#!/usr/bin/env python
# coding: utf-8

# # Plug your execution system into Parsl
#
# This notebook dives straight into a live coding demo of how you could plug your execution system into Parsl as a backend. An execution system in this context is a system that takes some code (a Python function) and executes it on some compute provider, e.g. a laptop, a GPU cluster, or a cloud vendor.
#
# Now, why would you want to do this?
#
# * Your system supports features our systems don't, e.g. resource-based task packing
# * Your system might have faster, more reliable, or more efficient task execution
# * You want to create a mix-and-match system where tasks execute in different environments using systems that suit them, and you want Parsl to combine these systems
#
#
# In this notebook we will cover:
#
# 1. The Parsl executor API
# 2. A Balsam overview
# 3. How basic operations can be done with Balsam
# 4. Writing a new Balsam executor that we'll test

# ## Notebook environment
#
# 1. We are in a Jupyter notebook hosted at ALCF, with access to the Cobalt scheduler
# 2. This notebook has both the Parsl and Balsam libraries available from the active Conda environment
# 3. We will not focus on the specifics of Balsam, but rather on how its task execution capabilities can be integrated into Parsl

# ### Parsl Executors
#
# Parsl executors extend the `concurrent.futures.Executor` class. This gives us a widely used standard interface that at its core is very simple.
#
# ![API Diagram](API_diagram.png)
#
#
# 1. An executor has a submit method
# 2. When submit is called with a function and its params, the executor will execute the function
# 3. The submit method immediately returns a Future, rather than waiting for the function execution to finish
# 4. The Future is asynchronously updated with the result when it becomes available
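# To make that contract concrete, here is a quick illustration (not part of the original demo) using the standard library's `ThreadPoolExecutor`, which implements the same `concurrent.futures.Executor` interface our new executor must satisfy: `submit` returns a Future immediately, and `result()` blocks until the value arrives.

# In[ ]:

from concurrent.futures import ThreadPoolExecutor
import time

def slow_double(x):
    time.sleep(1)
    return x * 2

with ThreadPoolExecutor(max_workers=2) as pool:
    fut = pool.submit(slow_double, 21)  # returns a Future immediately
    print("Done yet? :", fut.done())    # almost certainly False at this point
    print("Result    :", fut.result())  # blocks until the task finishes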
# ### Balsam
#
# Balsam is an HPC workflows and edge computing system out of ALCF. Balsam's execution model follows a pilot job model: a scheduler job is requested for a large walltime, onto which workers are launched. These workers connect back to the main system and can run an arbitrary number of tasks. Balsam uses a Postgres database to store the task definitions, making it robust in the face of both software and hardware failures.
#
# ![Balsam](balsam.png)

# Let's first import both Parsl and Balsam and confirm that we have the latest versions available

# In[ ]:

import balsam
import parsl

print("Balsam version :", balsam.__version__)
print("Parsl version  :", parsl.__version__)


# #### Step 1: Batch job submission
#
# Submit a batch job to the Cobalt scheduler on Theta using Balsam

# In[ ]:

import os
import sys

os.environ["BALSAM_DB_PATH"] = '~/myWorkflow'
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"

from balsam.scripts.cli_commands import submitlaunch

# Step 1 is to get a test job launched with Balsam.
def balsam_submit_batch_job(nodes=1, walltime=10):
    sys.path.insert(0, '/soft/datascience/Balsam/0.3.8/env/lib/python3.6/site-packages/')
    sys.path.insert(0, '/soft/datascience/Balsam/0.3.8/')

    # We also need balsam and postgresql to be on the PATH.
    os.environ['PATH'] = '/soft/datascience/Balsam/0.3.8/env/bin/:' + os.environ['PATH']
    os.environ['PATH'] += ':/soft/datascience/PostgreSQL/9.6.12/bin/'

    from argparse import Namespace
    args = Namespace(nodes=nodes,
                     time_minutes=walltime,
                     job_mode='serial',
                     project='CSC249ADCD01',
                     wf_filter='',
                     sched_flags='',
                     queue='debug-cache-quad')
    submitlaunch(args)


# #### Step 2: Register an application definition
#
# We need to create an application profile for the tasks we want Balsam to execute.
# In our case, we want Balsam to execute a task that will:
#
# ![Sequence Diagram](executor_sequence.png)
#
# 1. Read a serialized function-and-params package
# 2. Deserialize it
# 3. Execute the function
# 4. Serialize the results or exceptions. Let's assume our functions will never fail.
# 5. Write out the serialized results package for Parsl

# In[ ]:

from balsam.core.models import BalsamJob, ApplicationDefinition

def register_balsam_executor():
    return ApplicationDefinition.objects.get_or_create(
        name='execute_parsl_task',                            # Let's call it execute_parsl_task
        envscript='/home/yadunand/setup_parsl_test_env.sh',   # This script will activate the parsl conda env
        executable=os.path.abspath('execute_parsl_task.py'),  # This script does the 5 steps described above.
    )
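# The registration above points Balsam at an `execute_parsl_task.py` script that sits alongside this notebook; we don't show its actual contents here. As a rough sketch (our illustration, assuming Parsl's `unpack_apply_message`/`serialize` helpers and the `-i`/`-o` argument convention used in Step 3 below), the script could boil down to:

# In[ ]:

# Hypothetical sketch of execute_parsl_task.py -- not the actual script registered above.
from parsl.serialize import unpack_apply_message, serialize

def execute_task(input_path, output_path):
    # 1. Read the serialized function-and-params package
    with open(input_path, 'rb') as f:
        buf = f.read()

    # 2. Deserialize into (function, args, kwargs)
    func, f_args, f_kwargs = unpack_apply_message(buf)

    # 3. Execute the function (assumed to never fail, per step 4)
    result = func(*f_args, **f_kwargs)

    # 4 & 5. Serialize the result and write it out for Parsl to pick up
    with open(output_path, 'wb') as f:
        f.write(serialize(result))

# The real script would parse the -i/-o flags from its command line
# (e.g. with argparse) and call execute_task(input_path, output_path).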
# #### Step 3: Launch tasks via Balsam
#
# Balsam internally creates a database entry in Postgres, and the job object
# returned can be used to check the state of the function execution.

# In[ ]:

from balsam.launcher.futures import FutureTask, wait

def _cbk():
    pass

def launch_balsam_task(func_name, task_id, fn_package, result_package):
    job = BalsamJob(name=f"{func_name}.{task_id}",
                    workflow="parsl-balsam",
                    application="execute_parsl_task",
                    args=f"-i {fn_package} -o {result_package}",
                    node_packing_count=16  # TEST THIS TODO:
    )
    job.data["task_id"] = task_id
    job.save()
    return FutureTask(job, _cbk)


# #### Step 4: Check task status
#
# We need a mechanism that waits for all launched jobs and updates the results when they become available. One mechanism available from Balsam is a poll-based system triggered when job.done is called.

# In[ ]:

import threading
import time
from concurrent.futures import Future

# Ideally we'd have a thread that sits in the background polling tasks,
# rather than having the main thread block
def task_status_poller(job_list, kill_event, poll_freq=2):
    while not kill_event.is_set():
        print([job.done for job in job_list])
        # Collect completed jobs first, then drop them from the polling list
        for job in [job for job in job_list if job.done]:
            job_list.remove(job)
        time.sleep(poll_freq)
    print("Dying")

kill_event = threading.Event()
#task_status_poller([Future(), Future()], kill_event)
thread = threading.Thread(target=task_status_poller,
                          args=([Future(), Future()], kill_event, ))

# Start the thread, sleep a few seconds, and kill it
thread.start()
time.sleep(4)
kill_event.set()


# In[ ]:

# Test everything so far.
# balsam_submit_batch_job()


# In[ ]:

# register_balsam_executor()


# In[ ]:

#task = os.path.abspath('balsam_workdir/c4eb5544-741b-420f-bd10-829fb85fedf0.pkl')
#task_out = task + '.out'
#j1 = launch_balsam_task('foo', 1, task, task + '.1.out')
#j2 = launch_balsam_task('foo', 2, task, task + '.2.out')


# In[ ]:

#kill_event = threading.Event()
#thread = threading.Thread(target=task_status_poller,
#                          args=([j1, j2], kill_event, ))

# Start thread, poll for up to 5 minutes, and kill thread
#thread.start()
#time.sleep(300)
#kill_event.set()


# #### Step 5: Make a new executor
#
# Here we will create a new executor with the following methods:
#
# 1. `start()` -> starts the task status polling thread, a new batch job, and registers the execute application
# 2. `shutdown()` -> sets the event for the polling thread to terminate
# 3. `submit()` -> launches a task via Balsam
# 4. `scaling_enabled()` -> returns `False` to disable auto-scaling

# In[ ]:

import threading
import time
from parsl.utils import RepresentationMixin
from parsl.executors.status_handling import NoStatusHandlingExecutor
from concurrent.futures import Future
import os
import uuid

from parsl.serialize import pack_apply_message, deserialize


class BalsamExecutor(NoStatusHandlingExecutor, RepresentationMixin):

    def __init__(self, nodes=1, walltime=10, workdir='balsam_workdir', label='BalsamExecutor'):
        self.label = label
        self._tasks = {}
        self.workdir = os.path.abspath(workdir)
        self.task_count = 0
        self.nodes = nodes
        self.walltime = walltime
        self._kill_event = threading.Event()

    def task_status_poller(self, kill_event, poll_delay=20):
        """ This should run as a background thread """
        pass

    def start(self):
        """ Called when the DFK starts the executor as the config is loaded

        1. Make workdir
        2. Start task_status_poller
        3. Register balsam executor
        4. Launch cobalt job from balsam
        """
        pass

    def shutdown(self):
        """ Shutdown threads etc """
        pass

    def submit(self, func, resource_spec, *args, **kwargs):
        print(f"{func} submitted")

    def scale_in(self):
        pass

    def scale_out(self):
        pass

    def scaling_enabled(self):
        return False


# Let's load the executor we defined above and run some quick tests

# In[ ]:

from parsl.config import Config

config = Config(executors=[BalsamExecutor()])


# In[ ]:

parsl.clear()
parsl.load(config)


# We'll create two apps: one a Python app, the other a bash app.

# In[ ]:

from parsl import python_app, bash_app

@python_app
def double(x):
    return x * 2

@bash_app
def platinfo(stdout='test.out'):
    return 'uname -a'


# Let's launch a few tasks that invoke the `double` app defined above

# In[ ]:

futures = {}
for i in range(4):
    f = double(i)
    futures[i] = f


# In[ ]:

for i in futures:
    print(futures[i])


# In[ ]:

# Note: these will only resolve once submit() is fleshed out during the
# demo to return real Futures.
for i in futures:
    print(futures[i].result())


# In[ ]:

parsl.clear()


# ## Recap
#
# We basically implemented the sequence shown in this diagram:
#
# ![Sequence Diagram](executor_sequence.png)
#
# * We saw a walkthrough of Balsam interfaces that show you how to:
#     * Place batch job requests via Balsam
#     * Register an application that will execute Parsl tasks
#     * Launch a Balsam job for each Parsl task
# * We wrote a new BalsamExecutor that plugs into Parsl
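# If you want a head start on filling in the stubs above, here is one possible sketch (ours, not the demo's final code) of how `submit()` could combine the pieces from Steps 2-4: pack the function call with `pack_apply_message`, write the package into the workdir, launch a Balsam job for it, and hand back a Future for the polling thread to complete later. The package-file naming below is a hypothetical convention, not something Balsam or Parsl mandates.

# In[ ]:

# Hypothetical sketch only: assumes launch_balsam_task() from Step 3 and
# a <uuid>.pkl / <uuid>.pkl.out naming convention for the package files.
def submit_sketch(executor, func, resource_spec, *args, **kwargs):
    task_id = str(uuid.uuid4())

    # Serialize the function and its params into a package file for the workers
    fn_package = os.path.join(executor.workdir, f"{task_id}.pkl")
    with open(fn_package, 'wb') as f:
        f.write(pack_apply_message(func, args, kwargs))

    # Launch a Balsam job that runs execute_parsl_task.py on that package
    result_package = fn_package + '.out'
    launch_balsam_task(func.__name__, task_id, fn_package, result_package)

    # Return a Future immediately; the task_status_poller thread would later
    # read the result package, deserialize it, and call future.set_result(...)
    future = Future()
    executor._tasks[task_id] = (future, result_package)
    executor.task_count += 1
    return future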