This notebook dives straight into a live coding demo of how you could plug your own execution system in as a backend to Parsl. An execution system in this context is a system that takes some code (a Python function) and executes it on some compute provider, e.g. a laptop, a GPU cluster, or a cloud vendor.
Now, why would you want to do this?
In this notebook we will cover the concurrent.futures.Executor interface that Parsl executors build on, Balsam's execution model, and how to wire the two together into a working BalsamExecutor.
Parsl executors extend the concurrent.futures.Executor class. This gives us a widely used standard interface that at its core is very simple.
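To make the interface concrete, here is a minimal sketch of an executor that satisfies it; the class name and the inline execution are illustrative assumptions, not how Parsl or Balsam actually run tasks:
from concurrent.futures import Executor, Future

class InlineExecutor(Executor):
    """ Illustrative sketch: runs the function inline and returns a Future. """

    def submit(self, fn, *args, **kwargs):
        # A real backend would ship fn elsewhere and complete the
        # Future later; here we execute it immediately.
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as e:
            future.set_exception(e)
        return future

    def shutdown(self, wait=True):
        pass
The key contract is that submit() returns a Future immediately; a remote backend like Balsam completes that Future later, once the result arrives.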
Balsam is an HPC workflows and edge computing system out of ALCF. Balsam's execution model follows a pilot job model: a scheduler job is requested for a large walltime, onto which workers are launched. These workers connect back to the main system and can run an arbitrary number of tasks. Balsam uses a PostgreSQL database to store the task definitions, making it robust in the face of both software and hardware failures.
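As a hedged illustration of that last point, once BALSAM_DB_PATH points at an initialized database (we set it below), the persisted task definitions can be inspected through Balsam's Django models; this snippet is an assumption based on the Balsam 0.3 API used in this notebook:
from balsam.core.models import BalsamJob

# Each task is a row in PostgreSQL, so its state survives restarts.
for job in BalsamJob.objects.all()[:5]:
    print(job.name, job.state)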
Let's first import both Parsl and Balsam and confirm that we have the latest versions available
import balsam
import parsl
print("Balsam version :", balsam.__version__)
print("Parsl version :", parsl.__version__)
Submit a batch job to the Cobalt scheduler on Theta using Balsam
import os
os.environ["BALSAM_DB_PATH"] = '~/myWorkflow'
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
from balsam.scripts.cli_commands import submitlaunch
from argparse import Namespace
import sys

# Step 1: get a test batch job launched with Balsam.
def balsam_submit_batch_job(nodes=1, walltime=10):
    # Balsam's install and the PostgreSQL binaries must be importable/on PATH.
    sys.path.insert(0, '/soft/datascience/Balsam/0.3.8/env/lib/python3.6/site-packages/')
    sys.path.insert(0, '/soft/datascience/Balsam/0.3.8/')
    os.environ['PATH'] = '/soft/datascience/Balsam/0.3.8/env/bin/:' + os.environ['PATH']
    os.environ['PATH'] += ':/soft/datascience/PostgreSQL/9.6.12/bin/'

    args = Namespace(nodes=nodes,
                     time_minutes=walltime,
                     job_mode='serial',
                     project='CSC249ADCD01',
                     wf_filter='',
                     sched_flags='',
                     queue='debug-cache-quad')
    submitlaunch(args)
We need to create an application profile for the tasks we want Balsam to execute. In our case, we want Balsam to execute a task that unpacks a serialized Parsl function invocation, runs it, and writes the serialized result back to disk.
from balsam.core.models import BalsamJob, ApplicationDefinition

def register_balsam_executor():
    # Register (or fetch) the application that Balsam will run for every task.
    return ApplicationDefinition.objects.get_or_create(
        name='execute_parsl_task',
        envscript='/home/yadunand/setup_parsl_test_env.sh',   # Activates the parsl conda env
        executable=os.path.abspath('execute_parsl_task.py'),  # The task runner script described above
    )
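The notebook does not list execute_parsl_task.py itself, so here is a hedged sketch of what it needs to do, assuming the input package is produced with parsl.serialize.pack_apply_message and that the -i/-o flags match the args string used when launching tasks below:
import argparse
from parsl.serialize import unpack_apply_message, serialize

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', dest='infile', help='path to the packed function invocation')
    parser.add_argument('-o', dest='outfile', help='path to write the serialized result')
    args = parser.parse_args()

    # Unpack the function, args, and kwargs shipped from the submit side.
    with open(args.infile, 'rb') as f:
        func, f_args, f_kwargs = unpack_apply_message(f.read())

    result = func(*f_args, **f_kwargs)

    # Serialize the result for the submit-side poller to pick up.
    with open(args.outfile, 'wb') as f:
        f.write(serialize(result))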
Balsam internally creates a database entry in PostgreSQL, and the returned job object can be used to check the state of the function execution.
from balsam.launcher.futures import FutureTask, wait

def _cbk():
    pass

def launch_balsam_task(func_name, task_id, fn_package, result_package):
    job = BalsamJob(name=f"{func_name}.{task_id}",
                    workflow="parsl-balsam",
                    application="execute_parsl_task",
                    args=f"-i {fn_package} -o {result_package}",
                    node_packing_count=16)  # TODO: test/tune tasks packed per node
    job.data["task_id"] = task_id
    job.save()
    return FutureTask(job, _cbk)
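launch_balsam_task expects fn_package to already exist on disk. A hedged sketch of the submit-side packaging (package_function is a hypothetical helper, not part of Balsam or Parsl) might look like:
import os
import uuid
from parsl.serialize import pack_apply_message

def package_function(func, args, kwargs, workdir='balsam_workdir'):
    # Write the packed function invocation where the task runner can read it.
    os.makedirs(workdir, exist_ok=True)
    task_id = str(uuid.uuid4())
    fn_package = os.path.abspath(os.path.join(workdir, f"{task_id}.pkl"))
    with open(fn_package, 'wb') as f:
        f.write(pack_apply_message(func, args, kwargs))
    return task_id, fn_package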
We need a mechanism that waits for all launched jobs and updates the results when they become available. One mechanism available from Balsam is a poll-based system, triggered when job.done is called.
import threading
import time
from concurrent.futures import Future

# Ideally a thread sits in the background polling tasks, rather than having
# the main thread block.
def task_status_poller(job_list, kill_event, poll_freq=2):
    while not kill_event.is_set():
        print([job.done() for job in job_list])
        # Collect finished jobs first, then drop them from the watch list.
        done_jobs = [job for job in job_list if job.done()]
        for job in done_jobs:
            job_list.remove(job)
        time.sleep(poll_freq)
    print("Dying")

kill_event = threading.Event()
thread = threading.Thread(target=task_status_poller,
                          args=([Future(), Future()], kill_event))
# Start the thread, let it poll for a few seconds, then signal it to stop
thread.start()
time.sleep(4)
kill_event.set()
thread.join()
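What the poller should do with a finished job is read its result package back. Here is a hedged sketch, assuming the task runner wrote the result with parsl.serialize.serialize as in the earlier sketch (fetch_result is a hypothetical helper):
from parsl.serialize import deserialize

def fetch_result(result_package):
    # Read back and deserialize the result written by execute_parsl_task.py.
    with open(result_package, 'rb') as f:
        return deserialize(f.read())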
# Test everything so far.
# balsam_submit_batch_job()
# register_balsam_executor()
#task = os.path.abspath('balsam_workdir/c4eb5544-741b-420f-bd10-829fb85fedf0.pkl')
#j1 = launch_balsam_task('foo', 1, task, task+'.1.out')
#j2 = launch_balsam_task('foo', 2, task, task+'.2.out')
#kill_event = threading.Event()
#thread = threading.Thread(target=task_status_poller,
#                          args=([j1, j2], kill_event, ))
# Start the thread, poll for up to 5 minutes, then kill it
#thread.start()
#time.sleep(300)
#kill_event.set()
Here we will create a new executor with the following methods:

- start() -> starts the task status polling thread, a new batch job, and registers the execute application
- shutdown() -> sets the event for the polling thread to terminate
- submit() -> launches a task via Balsam
- scaling_enabled() -> returns False to disable auto-scaling

import threading
import time
from parsl.utils import RepresentationMixin
from parsl.executors.status_handling import NoStatusHandlingExecutor
from concurrent.futures import Future
import os
import uuid
from parsl.serialize import pack_apply_message, deserialize
class BalsamExecutor(NoStatusHandlingExecutor, RepresentationMixin):

    def __init__(self, nodes=1, walltime=10, workdir='balsam_workdir', label='BalsamExecutor'):
        self.label = label
        self._tasks = {}
        self.workdir = os.path.abspath(workdir)
        self.task_count = 0
        self.nodes = nodes
        self.walltime = walltime
        self._kill_event = threading.Event()

    def task_status_poller(self, kill_event, poll_delay=20):
        """ This should run as a background thread """
        pass

    def start(self):
        """ Called when the DFK starts the executor as the config is loaded
        1. Make workdir
        2. Start task_status_poller
        3. Register the Balsam application
        4. Launch a Cobalt job via Balsam
        """
        pass

    def shutdown(self):
        """ Shut down threads etc. """
        pass

    def submit(self, func, resource_spec, *args, **kwargs):
        # Skeleton: return a Future so Parsl's DataFlowKernel has something
        # to track; a real implementation completes it from the poller.
        print(f"{func} submitted")
        future = Future()
        self._tasks[self.task_count] = future
        self.task_count += 1
        return future

    def scale_in(self):
        pass

    def scale_out(self):
        pass

    def scaling_enabled(self):
        return False
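To connect this skeleton back to the pieces built earlier, here is a hedged sketch of a filled-in submit(), reusing launch_balsam_task and the hypothetical package_function helper sketched above; the plain skeleton BalsamExecutor is what we load for the quick tests below:
class SketchBalsamExecutor(BalsamExecutor):

    def submit(self, func, resource_spec, *args, **kwargs):
        # Package the invocation, hand it to Balsam, and map the Balsam
        # future onto the Parsl-facing Future the poller would complete.
        task_id, fn_package = package_function(func, args, kwargs, workdir=self.workdir)
        result_package = fn_package + '.out'
        balsam_future = launch_balsam_task(func.__name__, task_id, fn_package, result_package)
        future = Future()
        self._tasks[task_id] = (future, balsam_future, result_package)
        self.task_count += 1
        return future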
Let's load the executor we defined above and run some quick tests
from parsl.config import Config
config = Config(executors=[BalsamExecutor()])
parsl.clear()
parsl.load(config)
We'll create two apps: one Python app and one Bash app.
from parsl import python_app, bash_app
@python_app
def double(x):
return x * 2
@bash_app
def platinfo(stdout='test.out'):
return 'uname -a'
Let's launch a few tasks that invoke the double app defined above
futures = {}
for i in range(4):
    f = double(i)
    futures[i] = f

for i in futures:
    print(futures[i])

for i in futures:
    print(futures[i].result())
parsl.clear()
We have basically implemented the full sequence: Parsl serializes each app invocation, the BalsamExecutor hands it to Balsam as a BalsamJob, Balsam's pilot job runs execute_parsl_task.py on the compute nodes, and the poller completes the corresponding Future with the result.