Parsl WorkQueueExecutor tutorial

WorkQueue installation

To use the WorkQueueExecutor (WQEX) in Parsl, install the full CCTools software package within an appropriate Anaconda or Miniconda environment.

  1. Install Parsl from the GitHub master branch
  2. conda install -y -c conda-forge ndcctools
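
To confirm that Parsl and the Work Queue Python bindings are importable from the same environment, a quick check such as the one below can help. This is only a sketch: the module name of the bindings has varied across CCTools releases, so adjust the import if needed.

In [ ]:
# Sanity check (sketch): confirm Parsl and the Work Queue bindings import
# from the environment where CCTools was installed.
import parsl
print(parsl.__version__)

try:
    import work_queue                     # module name used by older CCTools releases
except ImportError:
    from ndcctools import work_queue      # newer releases namespace the bindings
print("Work Queue bindings imported successfully")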

Fine-grained resource management

In this tutorial we present three typical ways of using the WorkQueueExecutor (WQEX) in Parsl: the default configuration, autolabel, and per-task resource specification.

  1. The default configuration is suitable for apps that need the resources of an entire node, e.g., apps that already parallelize internally.
  2. Autolabel lets Work Queue automatically determine the resources needed by each category of tasks and then allocate tasks so that the computing resources are used efficiently.
  3. Per-task resource specification allows a user to state the exact resources an app needs.

We will go through each of these use cases in the following.

In [1]:
# Imports for this notebook
from parsl.config import Config
from parsl.executors import WorkQueueExecutor
from parsl.providers import LocalProvider

import parsl
from parsl.app.app import python_app

import time
import multiprocessing
import glob
import pandas as pd
import json

Default configuration

Now we define an initial WQEX configuration. Like the configurations of other executors (e.g., HTEX), we need to define a provider for WQEX. The LocalProvider below launches ONE Work Queue worker on the local node.

In [2]:
# Default configuration of WQEX
config = Config(
    executors=[
        WorkQueueExecutor(port=50055,
                          provider=LocalProvider()
        )
    ]
)

dfk = parsl.load(config)

Now we define a simple sleeper app that sleeps for a short duration. We know this app uses minimal resources.

In [3]:
@python_app
def sleeper(dur=1):
    """ 
    An app that sleeps for certain duration
    """
    import time
    time.sleep(dur)
    return dur

Then we invoke the sleeper app multiple times. With this default configuration, each worker can only run one task at a time, so the following sleeper tasks essentially run serially.

In [4]:
start = time.time()
tasks = [sleeper() for i in range(5)]
res = [t.result() for t in tasks]
print(res)
print(f"Task finished in {time.time() - start} seconds")
[1, 1, 1, 1, 1]
Task finished in 28.411840200424194 seconds

As we see below, there are many cores on the local node. This default config is therefore not ideal for tasks like sleeper that use only minimal resources; it is better suited for apps that need an entire compute node.

In [5]:
multiprocessing.cpu_count()
Out[5]:
56

Clean up the loaded Parsl WQEX default config

In [6]:
dfk.cleanup()
parsl.clear()

How can we use WQEX for apps that do not need the entire node?

  1. Autolabel
  2. Per-task resource specification

Autolabel

Let us enable autolabel in this new configuration. To enable autolabel, we just need to set autolabel=True. Note that we also set autocategory=True. Autolabel in WQEX automatically monitors the resource pattern of apps per category. By default, autocategory=False, which means that WQEX places ALL Parsl apps into one category, parsl-default. However, different apps may present different resource patterns, so it is better to categorize apps at a finer granularity. Setting autocategory=True lets Work Queue automatically categorize Parsl apps based on their function names.

In [7]:
# Configuration with autolabel and autocategory enabled
config = Config(
    executors=[
        WorkQueueExecutor(port=50055,
                          autolabel=True,
                          autocategory=True,
                          provider=LocalProvider()
        )
    ]
)

dfk = parsl.load(config)

Next we define an app called mp_task that stresses multiple cores, implemented with the multiprocessing module. We then run five mp_task tasks. By default, each mp_task uses 2 CPU cores and runs for around 2 seconds.

In [8]:
@python_app
def mp_task(cores=2):
    """
    An app that uses multiprocessing to mimic an app that uses multiple cores
    """
    from multiprocessing import Process
    import time
    def stress(dur=2):
        start = time.time()
        while time.time() - start < dur:
            continue
    
    processes = []
    for i in range(cores):
        p = Process(target=stress, args=(2,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    return f'Done with {cores} cores'


# Submit 5 mp_task tasks
tasks = []
for i in range(5):
    fut = mp_task()
    tasks.append(fut)
res = [t.result() for t in tasks]
print(res)
['Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores']

Work Queue automatically monitors the resource usage of each category (based on the app's function name). Let us take a deeper look at what happens behind the scenes. We define a function to parse the logs and extract the resources allocated to each task.

In [9]:
def parse_logs():
    """
    Parse the resource assignment of Work Queue from the runinfo logs
    """
    dirs = glob.glob("runinfo/*")
    log = "{}/WorkQueueExecutor/transaction_log".format(sorted(dirs)[-1])
    with open(log) as f:
        lines = f.readlines()

    resources = ['task_id', 'task_type', 'cores', 'memory', 'disk']
    df = pd.DataFrame(columns=resources)
    task_ids = {}
    for line in lines:
        if "WAITING" in line and "WAITING_RETRIEVAL" not in line and 'taskid' not in line:
                line = line.split()
                task_id = line[3]
                task_category = line[5]
                task_ids[task_id] = task_category

        # timestamp master-pid TASK id (continue next line)
        # DONE SUCCESS exit-code exceeded measured
        if "RUNNING" in line and 'taskid' not in line:
            line = line.split()
            task_id = line[3]
            s = json.loads(line[-1])

            # append the new resources to the panda's data frame.
            # Resources are represented in a json array as
            # [value, "unit", such as [1000, "MB"],
            # so we only take the first element of the array:
            df.loc[len(df)] = [task_id, task_ids[task_id]] + list(float(s[r][0]) for r in resources[2:])
    return df

Below are the resources assigned by Work Queue to each mp_task task.

As we see, the first mp_task task was allocated all 56 cores, about 257 GB of memory, and about 119 GB of disk. This is because Work Queue assigns all the resources of the local node to the first task of a category, in order to profile its resource utilization. After that, Work Queue updates the resource allocation for this category (keyed by function name) accordingly. Thus, the later tasks are allocated only 2 cores, 57 MB of memory, and 2 MB of disk.

In summary, with autolabel, Work Queue first monitors the resource usage of some tasks in each category (the number is tuned by the autolabel_window parameter; see the sketch at the end of this subsection) and then allocates resources to later tasks based on that monitoring, in order to achieve high throughput. If a task fails due to insufficient resources, Work Queue automatically retries it with a larger allocation.

In [10]:
df = parse_logs()
df[df['task_type'] == 'mp_task'].head()
Out[10]:
task_id task_type cores memory disk
0 1 mp_task 56.0 257036.0 118788.0
1 2 mp_task 2.0 57.0 2.0
2 3 mp_task 2.0 57.0 2.0
3 4 mp_task 2.0 57.0 2.0
4 5 mp_task 2.0 57.0 2.0

The above shows how autolabel works. It is a good fit when one is not sure about the resource requirements of the apps.
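
As mentioned above, the autolabel_window option of WorkQueueExecutor controls how many tasks per category are monitored before Work Queue settles on an allocation. The cell below is only a sketch (not executed in this notebook); the window size of 5 is an illustrative choice.

In [ ]:
# Sketch: profile the first several tasks of each category before
# Work Queue fixes an allocation. The window size here is illustrative.
config_window = Config(
    executors=[
        WorkQueueExecutor(port=50055,
                          autolabel=True,
                          autolabel_window=5,
                          autocategory=True,
                          provider=LocalProvider()
        )
    ]
)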

Per-task resource specification

Besides autolabel, WQEX supports specifying the resource requirements of a Parsl app via the special keyword argument parsl_resource_specification. There are two ways to specify the resource requirements for each app.

Declare resource requirements in the kwargs when you define an app

This declaration sets the default resource requirements for the mp_task_spec app. Currently, WQEX supports specifying three types of resources: cores, memory (in MB), and disk (in MB).

In [11]:
@python_app
def mp_task_spec(cores=2, parsl_resource_specification={'cores': 2, 'memory': 100, 'disk': 100}):
    from multiprocessing import Process
    import time
    def stress(dur=2):
        start = time.time()
        while time.time() - start < dur:
            continue
    
    processes = []
    for i in range(cores):
        p = Process(target=stress, args=(2,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    return f'Done with {cores} cores'

If we invoke the app without further specifying resource requirements, WQEX allocates to each invocation the default resources declared when the app was defined.

In [12]:
tasks = []
for i in range(3):
    # Invoke the app using the default resource specification
    fut = mp_task_spec(cores=2)
    tasks.append(fut)
res = [t.result() for t in tasks]
print(res)
['Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores']

Below are the resources allocated by Work Queue to each mp_task_spec task. As we see, all invocations of mp_task_spec are allocated the same resources: 2 cores, 100 MB of memory, and 100 MB of disk.

In [13]:
df = parse_logs()
df[df['task_type'] == 'mp_task_spec'].head(n=10)
Out[13]:
task_id task_type cores memory disk
5 6 mp_task_spec 2.0 100.0 100.0
6 7 mp_task_spec 2.0 100.0 100.0
7 8 mp_task_spec 2.0 100.0 100.0

In addition to using the default resource specification, we can also change the resource requirements when we invoke the app.

In [14]:
tasks = []
for i in range(3):
    # Vary the resource specification per invocation
    fut = mp_task_spec(cores=i+1,
                       parsl_resource_specification={'cores': i+1, 'memory': 100, 'disk': 100})
    tasks.append(fut)
res = [t.result() for t in tasks]
print(res)
['Done with 1 cores', 'Done with 2 cores', 'Done with 3 cores']

Below are the resources assigned by Work Queue to these mp_task_spec tasks. As we see, the invocations are allocated different numbers of cores, matching the per-invocation specifications.

In [15]:
df = parse_logs()
df[df['task_type'] == 'mp_task_spec'].tail(n=3)
Out[15]:
task_id task_type cores memory disk
8 9 mp_task_spec 1.0 100.0 100.0
9 10 mp_task_spec 2.0 100.0 100.0
10 11 mp_task_spec 3.0 100.0 100.0

It is worth mentioning that any resource specification overrides what Work Queue autolabel infers.

  1. With autolabel enabled, we do not have to specify all three types of resources for an app; we can specify only some of them. For example, below we specify only cores and disk, and autolabel automatically infers what is needed for memory and fills that gap.

  2. With autolabel disabled, if we do not specify all three resources (cores, memory, and disk), Work Queue allocates all the resources of a worker (i.e., a compute node) to run the app. This is usually not a configuration we should use; see the sketch after the next cell.

In [16]:
fut = mp_task_spec(cores=2, parsl_resource_specification={'cores': 2, 'disk': 50})
fut.result()
df = parse_logs()
df[df['task_type'] == 'mp_task_spec'].tail(n=1)
Out[16]:
task_id task_type cores memory disk
11 12 mp_task_spec 2.0 60.0 50.0
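
Conversely, when autolabel is disabled, each invocation should carry a full specification so that a single task does not claim the entire worker, as noted above. The cell below is only a sketch (not executed here); the values are illustrative.

In [ ]:
# Sketch: with autolabel disabled, specify cores, memory (MB), and disk (MB)
# explicitly so one task does not occupy the whole worker.
fut = mp_task_spec(cores=2,
                   parsl_resource_specification={'cores': 2, 'memory': 100, 'disk': 100})

When done, the loaded configuration can be cleaned up as before with dfk.cleanup() and parsl.clear().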