To use WQEX in Parsl, install the full CCTools software package within an appropriate Anaconda or Miniconda environment.
In this tutorial we present three typical uses of the WorkQueueExecutor (WQEX) in Parsl: the default configuration, autolabel, and per-task resource specification.
We will go through each of these use cases in turn.
# Imports for this notebook
from parsl.config import Config
from parsl.executors import WorkQueueExecutor
from parsl.providers import LocalProvider
import parsl
from parsl.app.app import python_app
import time
import multiprocessing
import glob
import pandas as pd
import json
Now we define an initial WQEX configuration. Like the configuration of other executors (e.g. HTEX), we need to define a provider for WQEX. This launches ONE Work Queue worker on the local node.
# Default configuration of WQEX
config = Config(
    executors=[
        WorkQueueExecutor(port=50055,
                          provider=LocalProvider())
    ]
)
dfk = parsl.load(config)
Now we define a simple sleeper app that sleeps for a short duration. We know this app uses minimal resources.
@python_app
def sleeper(dur=1):
    """
    An app that sleeps for a certain duration
    """
    import time
    time.sleep(dur)
    return dur
Then we invoke the sleeper app multiple times. With this default configuration, each worker can only run one task at a time, so the following sleeper tasks essentially run serially.
start = time.time()
tasks = [sleeper() for i in range(5)]
res = [t.result() for t in tasks]
print(res)
print(f"Task finished in {time.time() - start} seconds")
[1, 1, 1, 1, 1] Task finished in 28.411840200424194 seconds
As we see below, there are multiple cores on the local node. This default config is not ideal for running tasks like sleeper that use only minimal resources; it is suitable for apps that need an entire compute node.
multiprocessing.cpu_count()
56
Clean up the loaded Parsl WQEX default config.
dfk.cleanup()
parsl.clear()
Let us enable autolabel in this new configuration. To enable autolabel, we just need to set autolabel=True. Note that we also set autocategory=True. Autolabel in WQEX automatically monitors the resource pattern of apps per category. By default, autocategory=False, which means that WQEX places ALL Parsl apps into one category, parsl-default. However, different apps may present different resource patterns, so it is better to categorize apps at a finer granularity. Setting autocategory=True enables Work Queue to automatically categorize Parsl apps based on the function name.
# Configuration with autolabel and autocategory enabled
config = Config(
    executors=[
        WorkQueueExecutor(port=50055,
                          autolabel=True,
                          autocategory=True,
                          provider=LocalProvider())
    ]
)
dfk = parsl.load(config)
Next we define an app called mp_task that can stress multiple cores, implemented with the multiprocessing module. We then run five mp_task tasks. By default, each mp_task uses 2 CPU cores and lasts around 2 seconds.
@python_app
def mp_task(cores=2):
    """
    An app that uses multiprocessing to mimic an app that uses multiple cores
    """
    from multiprocessing import Process
    import time

    def stress(dur=2):
        start = time.time()
        while time.time() - start < dur:
            continue

    processes = []
    for i in range(cores):
        p = Process(target=stress, args=(2,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    return f'Done with {cores} cores'
# Submit 5 mp_task apps
tasks = []
for i in range(5):
    fut = mp_task()
    tasks.append(fut)
res = [t.result() for t in tasks]
print(res)
['Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores']
Work Queue automatically monitors the resource usage of this category (based on the function name of the app). Let us take a deeper look at what happens behind the scenes. We define a function that parses the logs to get the resources allocated to all the tasks.
def parse_logs():
    """
    Parse the resource assignment of Work Queue from the runinfo logs
    """
    dirs = glob.glob("runinfo/*")
    log = "{}/WorkQueueExecutor/transaction_log".format(sorted(dirs)[-1])
    with open(log) as f:
        lines = f.readlines()
    resources = ['task_id', 'task_type', 'cores', 'memory', 'disk']
    df = pd.DataFrame(columns=resources)
    task_ids = {}
    for line in lines:
        # A WAITING entry looks like:
        # timestamp master-pid TASK id WAITING category ...
        if "WAITING" in line and "WAITING_RETRIEVAL" not in line and 'taskid' not in line:
            line = line.split()
            task_id = line[3]
            task_category = line[5]
            task_ids[task_id] = task_category
        # A RUNNING entry ends with a JSON dict of the allocated resources
        if "RUNNING" in line and 'taskid' not in line:
            line = line.split()
            task_id = line[3]
            s = json.loads(line[-1])
            # Append the new resources to the pandas data frame.
            # Resources are represented as a json array of
            # [value, "unit"], such as [1000, "MB"],
            # so we only take the first element of the array:
            df.loc[len(df)] = [task_id, task_ids[task_id]] + list(float(s[r][0]) for r in resources[2:])
    return df
Here are the resources assigned to each mp_task task by Work Queue.
As we see, the first mp_task task was allocated 56 cores, about 257GB of memory, and about 119GB of disk. This is because Work Queue assigns all the resources of the local node to the first task in a category, in order to profile its resource utilization. After that, Work Queue updates the resource allocation for this category (based on function name) accordingly. Thus, the later tasks are allocated only 2 cores, 57MB of memory, and 2MB of disk.
In summary, with autolabel, Work Queue first monitors the resources of some tasks in each category (the number is tuned by autolabel_window). It then allocates resources to later tasks based on those measurements, for high throughput. If a task fails due to a lack of resources, Work Queue automatically retries it with more resources.
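As a sketch, a configuration that widens the measurement window might look like the following. This assumes the autolabel_window parameter of WorkQueueExecutor, which controls how many tasks per category are measured before Work Queue settles on an allocation; the port number is arbitrary.

```python
from parsl.config import Config
from parsl.executors import WorkQueueExecutor
from parsl.providers import LocalProvider

# Sketch: measure the first 5 tasks of each category before
# Work Queue fixes the per-category resource allocation.
config_window = Config(
    executors=[
        WorkQueueExecutor(port=50056,
                          autolabel=True,
                          autolabel_window=5,
                          autocategory=True,
                          provider=LocalProvider())
    ]
)
```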
df = parse_logs()
df[df['task_type'] == 'mp_task'].head()
|   | task_id | task_type | cores | memory | disk |
|---|---------|-----------|-------|--------|------|
| 0 | 1 | mp_task | 56.0 | 257036.0 | 118788.0 |
| 1 | 2 | mp_task | 2.0 | 57.0 | 2.0 |
| 2 | 3 | mp_task | 2.0 | 57.0 | 2.0 |
| 3 | 4 | mp_task | 2.0 | 57.0 | 2.0 |
| 4 | 5 | mp_task | 2.0 | 57.0 | 2.0 |
The above shows how autolabel works. It is good for the use case where one is not quite sure about the resource requirements of the apps.
Besides autolabel, WQEX supports specifying the resource requirements of a Parsl app via the special kwarg parsl_resource_specification. There are two ways to specify the resource requirements for each app.
The declaration below sets the default resource requirements for the mp_task_spec app. Currently, WQEX supports specifying three types of resources: cores, memory, and disk.
@python_app
def mp_task_spec(cores=2, parsl_resource_specification={'cores': 2, 'memory': 100, 'disk': 100}):
    from multiprocessing import Process
    import time

    def stress(dur=2):
        start = time.time()
        while time.time() - start < dur:
            continue

    processes = []
    for i in range(cores):
        p = Process(target=stress, args=(2,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    return f'Done with {cores} cores'
If we invoke the app without further specifying the resource requirements, WQEX allocates each invocation the default resources given when we defined the app.
tasks = []
for i in range(3):
    # Invoke the app using the default resource specification
    fut = mp_task_spec(cores=2)
    tasks.append(fut)
res = [t.result() for t in tasks]
print(res)
['Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores']
Here are the resources allocated to each mp_task_spec task by Work Queue. As we see, all invocations of mp_task_spec are allocated the same resources: 2 cores, 100MB of memory, and 100MB of disk.
df = parse_logs()
df[df['task_type'] == 'mp_task_spec'].head(n=10)
|   | task_id | task_type | cores | memory | disk |
|---|---------|-----------|-------|--------|------|
| 5 | 6 | mp_task_spec | 2.0 | 100.0 | 100.0 |
| 6 | 7 | mp_task_spec | 2.0 | 100.0 | 100.0 |
| 7 | 8 | mp_task_spec | 2.0 | 100.0 | 100.0 |
In addition to using the default resource specification, we can also change the resource requirements when we invoke the app.
tasks = []
for i in range(3):
    # Vary the resource specification per invocation
    fut = mp_task_spec(cores=i+1,
                       parsl_resource_specification={'cores': i+1, 'memory': 100, 'disk': 100})
    tasks.append(fut)
res = [t.result() for t in tasks]
print(res)
['Done with 1 cores', 'Done with 2 cores', 'Done with 3 cores']
Here are the resources assigned to each mp_task_spec task by Work Queue. As we see, these invocations of mp_task_spec use different resources.
df = parse_logs()
df[df['task_type'] == 'mp_task_spec'].tail(n=3)
|   | task_id | task_type | cores | memory | disk |
|---|---------|-----------|-------|--------|------|
| 8 | 9 | mp_task_spec | 1.0 | 100.0 | 100.0 |
| 9 | 10 | mp_task_spec | 2.0 | 100.0 | 100.0 |
| 10 | 11 | mp_task_spec | 3.0 | 100.0 | 100.0 |
It is worth mentioning that any resource specification overrides what Work Queue autolabel infers.
With autolabel enabled, we do not have to specify all three types of resources for an app; we can specify only some of them. For example, below we specify only cores and disk, and autolabel automatically infers what is needed for memory and fills that gap.
With autolabel disabled, if we do not specify all three types of resources (cores, memory, and disk), Work Queue allocates all the resources of a worker (i.e., a compute node) to run the app, which is usually not the configuration we want.
fut = mp_task_spec(cores=2, parsl_resource_specification={'cores': 2, 'disk': 50})
fut.result()
df = parse_logs()
df[df['task_type'] == 'mp_task_spec'].tail(n=1)
|   | task_id | task_type | cores | memory | disk |
|---|---------|-----------|-------|--------|------|
| 11 | 12 | mp_task_spec | 2.0 | 60.0 | 50.0 |
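Finally, mirroring the cleanup of the default configuration earlier in the tutorial, we release the loaded configuration when we are done:

```python
# Clean up the loaded Parsl WQEX config, as we did for the default one
dfk.cleanup()
parsl.clear()
```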