In this tutorial, you will first try a few Parsl dataflows (examples 1-4) on your local machine to get a sense of the library. Then, in examples 5-7, you will run similar dataflows on resources you have access to, such as clouds (e.g., Amazon Web Services), HPC systems, and clusters, to see how more complex workflows can be expressed with Parsl.
This section walks through getting a simple "mock" science application running with Parsl on your local machine. Note: the mock science apps are included in the apps directory of the tutorial repository.
The first Parsl script runs simulate.sh to generate a single random number, which it writes to stdout. You can run the following script repeatedly to see different "simulated" results.
<img src="figures/part01.png" align="left">
from parsl import App, DataFlowKernel
from parsl.configs.local import localIPP, localThreads
dfk = DataFlowKernel(config=localThreads)
@App('bash', dfk)
def mysim(stdout="output/p1.out", stderr="output/p1.err"):
    """Set this example up as a bash app by returning the
    command line app to be called, in this case simulate"""
    return "app/simulate"
# call the mysim app and wait for the result
mysim().result()
# open the output file and read the result
with open('output/p1.out', 'r') as f:
    print(f.read())
# clean up the DFK
dfk.cleanup()
Debugging parallel dataflows is often more complicated than debugging serial programs. The easiest way to debug a Parsl application is through Python's logging module. The following example shows how to enable logging; here we send the debug stream to the console. You can also log to a file using the set_file_logger function.
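Parsl's log messages go through Python's standard logging module, so they can also be routed to a file with the standard library alone. The sketch below is an illustration, not Parsl's own set_file_logger implementation; it assumes Parsl's modules log under the "parsl" logger hierarchy, and enable_parsl_file_logging is a hypothetical helper name.

```python
import logging
import os
import tempfile


def enable_parsl_file_logging(path, level=logging.DEBUG):
    """Send records from the 'parsl' logger hierarchy to a file (stdlib only)."""
    logger = logging.getLogger("parsl")  # assumed name of Parsl's top-level logger
    logger.setLevel(level)
    handler = logging.FileHandler(path)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s [%(levelname)s] %(message)s"))
    logger.addHandler(handler)
    return handler


log_path = os.path.join(tempfile.mkdtemp(), "parsl-debug.log")
handler = enable_parsl_file_logging(log_path)
# A child logger such as 'parsl.dataflow.dflow' propagates its records
# up to the handler attached to 'parsl'.
logging.getLogger("parsl.dataflow.dflow").debug("task 0 launched")
handler.flush()
with open(log_path) as f:
    log_text = f.read()
print(log_text)
```

Attaching the handler to the parent logger means every module-level logger underneath it is captured without configuring each one separately.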
When debugging individual apps, it is often best to first capture stdout and stderr to files. These files record any text the app prints, which may reveal how it is behaving.
from parsl import App, DataFlowKernel
from parsl.configs.local import localIPP, localThreads
from parsl import set_stream_logger
# set the stream logger to print debug messages
set_stream_logger()
dfk = DataFlowKernel(config=localThreads)
@App('bash', dfk)
def mysim(stdout="output/p1-debug.out", stderr="output/p1-debug.err"):
    """Set this example up as a bash app by returning the
    command line app to be called, in this case simulate"""
    return "app/simulate"
mysim().result()
with open('output/p1-debug.out', 'r') as f:
    print(f.read())
dfk.cleanup()
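Conceptually, the stdout and stderr keyword arguments of a bash app send the command's output streams to the named files. The standard-library sketch below does the same for a single shell command outside Parsl, which can help check that an app script behaves as expected before wrapping it. It is an illustration, not Parsl's implementation; run_with_captured_output is a hypothetical helper, and echo stands in for app/simulate.

```python
import os
import subprocess
import tempfile


def run_with_captured_output(command, stdout_path, stderr_path):
    """Run a shell command, sending its stdout and stderr to separate files."""
    with open(stdout_path, "w") as out, open(stderr_path, "w") as err:
        proc = subprocess.run(command, shell=True, stdout=out, stderr=err)
    return proc.returncode


workdir = tempfile.mkdtemp()
out_path = os.path.join(workdir, "p1-debug.out")
err_path = os.path.join(workdir, "p1-debug.err")
# 'echo 42' stands in for app/simulate; the second echo writes to stderr
rc = run_with_captured_output("echo 42; echo 'warning: mock' 1>&2", out_path, err_path)
with open(out_path) as f:
    captured_out = f.read()
with open(err_path) as f:
    captured_err = f.read()
print(rc, captured_out.strip(), captured_err.strip())
```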
The second example mirrors the first; however, instead of calling an external application, it uses a Python function that simulates a science app by again generating a random number. In this case the Python app returns the simulated value as a Python object rather than writing it to an external file.
from parsl import App, DataFlowKernel
from parsl.configs.local import localIPP, localThreads
dfk = DataFlowKernel(config=localThreads)
@App('python', dfk)
def mysim():
    """Generate a random integer and return it"""
    # import inside the app so the function is self-contained when it runs on a worker
    from random import randint
    return randint(1, 100)
print(mysim().result())
dfk.cleanup()
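Calling mysim() returns immediately with a future, and .result() blocks until the app has finished and hands back its return value. Because the localThreads configuration runs apps on a local thread pool, the same pattern can be imitated with Python's concurrent.futures module. This is an analogy for intuition only, not how Parsl is implemented.

```python
from concurrent.futures import ThreadPoolExecutor
from random import randint


def mysim():
    """Generate a random integer and return it."""
    return randint(1, 100)


with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(mysim)  # returns immediately with a Future
    value = future.result()      # blocks until mysim has returned
print(value)
```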
The third example shows how Parsl can be used to run many simulations in parallel. We define the same Parsl simulation app (simulate), and the Python script then calls it in a loop. Because the calls are independent, Parsl can execute them in parallel, up to the number of available workers. Note: rather than use stdout, the simulation app redirects its output to a specified file.
<img src="figures/part03.png" align="left">
from parsl import App, DataFlowKernel
from parsl.configs.local import localIPP, localThreads
dfk = DataFlowKernel(config=localThreads)
@App('bash', dfk)
def mysim(outputs=[], stdout="output/p3.out", stderr="output/p3.err"):
    """Call simulate and return results in the output file"""
    return 'app/simulate > {0}'.format(outputs[0])
# loop to execute the simulation app 5 times
results = []
for i in range(5):
    out_file = "output/p3_sim_{0}".format(i)
    results.append(mysim(outputs=[out_file]))
# print each job status; right after launch most apps will still be running
print("Job Status: {}".format([r.done() for r in results]))
# wait for all apps to complete
for r in results:
    r.result()
# print each job status, they will now be finished
print("Job Status: {}".format([r.done() for r in results]))
# collect up the output files and print their values
outputs = [r.outputs[0] for r in results]
for o in outputs:
    with open(o.filename, 'r') as f:
        print(f.read().strip())
dfk.cleanup()
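The same fan-out pattern can be reproduced with concurrent.futures to see why the first status line usually reports unfinished jobs: submitting a task returns at once, and done() only becomes True once that task has returned. In this stdlib sketch a short sleep stands in for the simulation's run time, so the exact first status line depends on timing.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait
from random import randint


def mysim(i):
    """Stand-in for app/simulate: sleep briefly, then return a random number."""
    time.sleep(0.2)
    return randint(1, 100)


with ThreadPoolExecutor(max_workers=5) as pool:
    results = [pool.submit(mysim, i) for i in range(5)]
    print("Job Status: {}".format([r.done() for r in results]))  # usually all False
    wait(results)  # block until every future has finished
    print("Job Status: {}".format([r.done() for r in results]))  # all True
    values = [r.result() for r in results]
print(values)
```

With five workers the five sleeps overlap, so the batch takes roughly one sleep's worth of time rather than five.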
After all the parallel simulations in an ensemble run have completed, it is typically necessary to gather and analyze their results with some kind of post-processing analysis program or script. The fourth example introduces such a post-processing step: the files created by all of the parallel runs of simulate are averaged by the "analysis application" stats.
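The contents of app/stats are not shown in this tutorial. Assuming it reads one number per line from each input file and prints their mean, a Python function with that behavior might look like the following hypothetical sketch; the file names mirror the p4_sim_{i} outputs, and the numbers are made up for the example.

```python
import os
import tempfile


def stats(paths):
    """Average the numbers found in a set of files, one number per line."""
    values = []
    for path in paths:
        with open(path) as f:
            values.extend(float(line) for line in f if line.strip())
    return sum(values) / len(values)


# Write five mock simulation outputs, one number each.
workdir = tempfile.mkdtemp()
paths = []
for i, number in enumerate([10, 20, 30, 40, 50]):
    path = os.path.join(workdir, "p4_sim_{0}".format(i))
    with open(path, "w") as f:
        f.write("{0}\n".format(number))
    paths.append(path)

average = stats(paths)
print(average)  # 30.0
```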
Note: in this example we do not block on the outputs of the simulation app; instead, the futures are passed directly to the analysis application. Parsl manages these dependencies and executes the analysis app only after all of the simulation apps have completed.
<img src="figures/part04.png" align="left">
from parsl import App, DataFlowKernel
from parsl.configs.local import localIPP, localThreads
dfk = DataFlowKernel(config=localThreads)
@App('bash', dfk)
def mysim(outputs=[],
          stdout="output/p4_sim.out",
          stderr="output/p4_sim.err"):
    """Call simulate and return results in the output file"""
    return 'app/simulate > {0}'.format(outputs[0])
@App('bash', dfk)
def stats(inputs=[],
          outputs=[],
          stderr='output/p4_stats.err',
          stdout='output/p4_stats.out'):
    """call stats with all simulation results as inputs"""
    return "app/stats {0} > {1}".format(" ".join(inputs), outputs[0])
# call the simulation app 5 times
results = []
for i in range(5):
    out_file = "output/p4_sim_{0}".format(i)
    results.append(mysim(outputs=[out_file]))
# collect the output data futures
sim_outputs = [r.outputs[0] for r in results]
# run the stats app
s = stats(inputs=sim_outputs, outputs=["output/p4_stats.txt"])
# wait for the result
s.result()
# print the result
with open('output/p4_stats.txt', 'r') as f:
    print(f.read())
dfk.cleanup()
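The stats call above receives futures rather than finished files. A rough stdlib analogue is to pass concurrent.futures futures into a downstream task that calls .result() on each one. Note the difference: in this sketch the downstream task occupies a thread while it waits, whereas Parsl's DataFlowKernel holds a dependent app back until its inputs are ready, so no worker is tied up waiting.

```python
from concurrent.futures import ThreadPoolExecutor


def mysim(i):
    """Stand-in simulation that returns a deterministic value."""
    return (i + 1) * 10


def stats(*sim_futures):
    """Wait for every upstream future, then average their results."""
    values = [f.result() for f in sim_futures]
    return sum(values) / len(values)


with ThreadPoolExecutor(max_workers=6) as pool:
    sims = [pool.submit(mysim, i) for i in range(5)]
    # pass the futures themselves, not their results; nothing blocks here
    s = pool.submit(stats, *sims)
    average = s.result()
print(average)  # 30.0
```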
This section introduces execution of Parsl scripts on remote computational resources.
Parsl supports a variety of resource providers as well as methods for submitting workloads to those resources (e.g., pilot jobs). Example configurations for compute resources such as UChicago Midway, NERSC Cori, and ANL Cooley are included in the config directory of the tutorial repository.
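In the pilot job model, a placeholder job is submitted to the resource's scheduler; once it starts, it runs long-lived workers that then receive many individual tasks without each task going through the batch queue. The thread-based sketch below only illustrates that idea, with threads standing in for workers on compute nodes; it is not how IPyParallel is implemented, and pilot_worker is a hypothetical name.

```python
import queue
import threading


def pilot_worker(tasks, results):
    """A long-lived worker: keeps pulling tasks until it receives None."""
    while True:
        item = tasks.get()
        if item is None:  # sentinel: the pilot job is being shut down
            break
        task_id, func, args = item
        results[task_id] = func(*args)


tasks = queue.Queue()
results = {}
# "Acquire" resources once by starting a fixed set of workers...
workers = [threading.Thread(target=pilot_worker, args=(tasks, results))
           for _ in range(2)]
for w in workers:
    w.start()
# ...then stream many small tasks to them without new resource requests.
for i in range(6):
    tasks.put((i, pow, (2, i)))
for _ in workers:
    tasks.put(None)  # one shutdown sentinel per worker
for w in workers:
    w.join()
print([results[i] for i in range(6)])  # [1, 2, 4, 8, 16, 32]
```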
A Parsl script can be executed from a login node by using these configuration files directly. Parsl also supports a more advanced submission model in which an SSH channel is used to submit workloads from an external machine (e.g., your laptop). In this case the machine on which the script is executed must be accessible from the cluster (e.g., ensure firewall rules allow pilot job connections back to your laptop).
In this example we show how one might develop a script to run on a remote resource. We first develop a Parsl script for sorting a file and run it using the local ThreadPoolExecutor on a laptop. We then extend the example to submit the job via the IPyParallel pilot job model, first locally and then on a remote resource. Finally, we show how Parsl's SSH channel can be used to connect to the remote resource directly from your laptop.
This script uses the sort application to sort the numbers in an unsorted file.
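The -g flag asks sort to compare lines by general numeric value, so 10 is placed after 2, whereas a plain text sort would typically put "10" before "2". A rough Python equivalent, assuming every non-blank line holds a number that float() can parse (sort -g itself is more forgiving), is sketched below; sort_g is a hypothetical helper.

```python
def sort_g(lines):
    """Order non-blank lines by numeric value, approximating `sort -g`."""
    return sorted((line for line in lines if line.strip()), key=float)


unsorted = ["10", "2", "1.5", "-3", "2e1"]
print(",".join(sort_g(unsorted)))  # -3,1.5,2,10,2e1
```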
First we use the local thread executor to run the sort command locally.
from parsl import App, DataFlowKernel
from parsl.configs.local import localIPP, localThreads
dfk = DataFlowKernel(config=localThreads)
@App('bash', dfk)
def sort(unsorted,
         outputs=[],
         stderr='output/p5_a_sort.err',
         stdout='output/p5_a_sort.out'):
    """Call sort executable on the input file"""
    return "sort -g {} > {}".format(unsorted, outputs[0])
# call the sort app on the unsorted.txt file
# save the results to a_sorted.txt
s = sort("input/unsorted.txt", outputs=["output/a_sorted.txt"])
# wait for the result
output_file = s.outputs[0].result()
# print the contents of the unsorted and sorted files
print("Contents of the unsorted.txt file:")
with open('input/unsorted.txt', 'r') as f:
    print(f.read().replace("\n", ","))
print("\nContents of the sorted output file:")
with open(output_file, 'r') as f:
    print(f.read().replace("\n", ","))
dfk.cleanup()
We now use IPyParallel to run the sort command using a pilot job model. In this case we use the localIPP configuration to tell Parsl to use local IPyParallel.
Apart from the configuration and instantiation of the DataFlowKernel, this script is identical to the previous one.
from parsl import App, DataFlowKernel
from parsl.configs.local import localIPP, localThreads
dfk = DataFlowKernel(config=localIPP)
@App('bash', dfk)
def sort(unsorted: str,
         outputs: list = [],
         stderr: str = 'output/p5_b_sort.err',
         stdout: str = 'output/p5_b_sort.out'):
    """Call sort executable on the input file"""
    return "sort -g {} > {}".format(unsorted, outputs[0])
s = sort("input/unsorted.txt", outputs=["output/b_sorted.txt"])
output_file = s.outputs[0].result()
print("Contents of the unsorted.txt file:")
with open('input/unsorted.txt', 'r') as f:
    print(f.read().replace("\n", ","))
print("\nContents of the sorted output file:")
with open(output_file, 'r') as f:
    print(f.read().replace("\n", ","))
dfk.cleanup()
We now take the previous example and run it on a cluster using IPyParallel. To run this example you will need to execute the script from a login node and uncomment the configuration needed for your cluster.
Note: you will need to either install the Parsl library on that login node or use an existing environment that provides it. You will also need to download the parsl-tutorial repository and ensure that it is available on the worker nodes via a shared file system.
from parsl import App, DataFlowKernel
import os
# use the Midway configuration
from config.midway import config
# from config.cori import config
# from config.cooley import config
dfk = DataFlowKernel(config=config)
@App('bash', dfk)
def sort(unsorted, outputs=[]):
    """Call sort executable on the input file"""
    return "sort -g {} > {}".format(unsorted, outputs[0])
# absolute paths let workers find the files on the shared file system
# regardless of their working directory
s = sort(os.path.abspath("input/unsorted.txt"),
         outputs=[os.path.abspath("output/sorted_c.txt")])
output_file = s.outputs[0].result()
print("Contents of the unsorted.txt file:")
with open('input/unsorted.txt', 'r') as f:
    print(f.read().replace("\n", ","))
print("\nContents of the sorted output file:")
with open(output_file, 'r') as f:
    print(f.read().replace("\n", ","))
dfk.cleanup()
In the previous example we needed to execute the script from a login node. Parsl can also run over a remote SSH channel. However, this requires that the machine on which the script is executed can host the IPyParallel controller and is therefore reachable from the remote machine (e.g., network access, firewall rules, etc.).
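Before launching, it can save time to confirm that the machine running the script actually accepts connections from the cluster. A minimal check, run on the login node, is to try opening a TCP connection back to the script's host. The host and port to test are up to you, since the ports the IPyParallel controller listens on are not covered in this tutorial; is_reachable is a hypothetical helper, and the self-check below only exercises it on localhost.

```python
import socket


def is_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, DNS failure, or timed out
        return False


# Self-check on localhost: a listening socket is reachable, a closed one is not.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
server.listen(1)
port = server.getsockname()[1]
listening_ok = is_reachable("127.0.0.1", port)
server.close()
closed_ok = is_reachable("127.0.0.1", port)
print(listening_ok, closed_ok)
```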
You will also need to configure your SSH agent to enable creation of the SSH connection to the login node. In a terminal type:
$ ssh-add
Note: Parsl does not yet provide automated file staging. Instead, for this example we use a Python app to first create the unsorted file, and then we use the Parsl SSH connection to explicitly stage the output file back to the host machine.
To run this script you must update the shared_dir variable as well as the configuration details below.
If you are running this notebook in a shared environment (e.g., as part of a tutorial) you will need to configure SSH agent forwarding. First enable agent forwarding on your host machine:
Add the following entry to your ~/.ssh/config file (e.g., with $ vi ~/.ssh/config):
Host <HOSTNAME>
    ForwardAgent yes
Then SSH to that host while forwarding your agent
$ ssh-add
$ ssh -A <HOSTNAME>
Finally, on the remote machine, print the SSH_AUTH_SOCK path and paste it into the os.environ['SSH_AUTH_SOCK'] line of the script below:
$ echo $SSH_AUTH_SOCK
from parsl import App, DataFlowKernel
# from parsl import set_stream_logger
import os
from config.midway import config
# directory shared with worker nodes
shared_dir = "<SHARED_DIRECTORY>"
# set remote connection details
config["sites"][0]["auth"] = {"channel": "ssh",
                              "hostname": "<LOGIN_NODE>",
                              "username": "<USERNAME>",
                              "scriptDir": shared_dir}
# if using a shared Jupyter environment with agent forwarding
# os.environ['SSH_AUTH_SOCK'] = '/tmp/ssh-uBccLlzXVT/agent.6420'
dfk = DataFlowKernel(config=config)
@App('python', dfk)
def create_unsorted_file(outputs=[]):
    """Create an unsorted file by generating random numbers"""
    from random import randint
    with open(outputs[0], 'w') as f:
        for _ in range(50):
            f.write("{0}\n".format(randint(1, 100)))
@App('bash', dfk)
def sort(unsorted, outputs=[]):
    """Call sort executable on the input file"""
    return "sort -g {0} > {1}".format(unsorted, outputs[0])
# create the unsorted file
unsorted = create_unsorted_file(outputs=[os.path.join(shared_dir, "unsorted-generated.txt")])
# sort the file into a new file called sorted_d.txt
s = sort(unsorted.outputs[0],
         outputs=[os.path.join(shared_dir, "sorted_d.txt")])
# wait for the app to complete
output_file = s.outputs[0].result()
# use Parsl's SSH channel to copy the sorted file
dfk.executor.execution_provider.channel.pull_file(output_file, '.')
with open(os.path.basename(output_file), 'r') as f:
    print(f.read().replace("\n", ","))
dfk.cleanup()
The final example is a basic "Hello World!" example that shows how to run MPI applications. Here we have a simple MPI code, mpi_hello.c, in which each MPI rank sleeps for a user-specified duration and then prints the name of the processor it is running on, followed by "Hello World!".
The following script is designed to be run from the login node. As in the previous examples, you will need to update the shared directory. You may optionally use an SSH channel by following the previous instructions; if running from the login node, set remote to False, and you will not need the remote configuration details.
Because the installed MPI libraries differ from site to site, you will need to specify the site-specific MPI compiler (e.g., mpicc, cc) and the way to launch the MPI job (e.g., mpirun, srun).
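The mpi_hello app below chooses its command line based on the launcher name. Factoring that choice into a small helper makes it easy to check the generated command for your site before submitting anything. mpi_launch_command and the /shared path are hypothetical; the mpirun -np and srun -n forms mirror the ones used in the script, and &> is bash syntax that sends both stdout and stderr to the output file.

```python
import os


def mpi_launch_command(launcher, executable, nproc, output_path):
    """Build a bash command that starts `nproc` MPI ranks of `executable`."""
    if launcher == "mpirun":
        return "mpirun -np {0} {1} &> {2}".format(nproc, executable, output_path)
    if launcher == "srun":
        return "srun -n {0} {1} &> {2}".format(nproc, executable, output_path)
    raise ValueError("unsupported launcher: {0}".format(launcher))


exe = os.path.join("/shared", "mpi_apps", "mpi_hello")
print(mpi_launch_command("mpirun", exe, 20, "hello.txt"))
# mpirun -np 20 /shared/mpi_apps/mpi_hello &> hello.txt
```

Raising on an unknown launcher, rather than silently returning nothing, surfaces a misconfigured site immediately.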
from parsl import App, DataFlowKernel
import os
from parsl import set_stream_logger
set_stream_logger()
from config.midway import config
# set to False when running this script directly on the login node
remote = True
# directory shared with worker nodes
shared_dir = "<SHARED_DIRECTORY>"
# set remote connection details
if remote:
    config["sites"][0]["auth"] = {"channel": "ssh",
                                  "hostname": "<LOGIN_NODE>",
                                  "username": "<USERNAME>",
                                  "scriptDir": shared_dir}
# set information about the number of nodes needed
config["sites"][0]["execution"]["block"]["walltime"] = '00:10:00'
config["sites"][0]["execution"]["block"]["nodes"] = 2
# set the site-specific compiler
compiler = 'mpicc'  # or 'cc'
# set the site-specific runner
runner = 'mpirun'  # or 'srun'
dfk = DataFlowKernel(config=config)
@App('bash', dfk)
def compile_app(dirpath, stdout=None, stderr=None, compiler="mpicc"):
    """Compile mpi app using site-specific compiler.
    E.g., midway compiler = mpicc, Cori compiler = cc
    """
    return '''cd {0}; make clean; make CC={1} '''.format(dirpath, compiler)
@App('bash', dfk)
def mpi_hello(dirpath, launcher="mpirun", app="mpi_hello", nproc=20, outputs=[]):
    """Call the compiled MPI executable with the site's launcher.
    Supports mpirun (the default) and srun."""
    import os
    if launcher == "mpirun":
        return "mpirun -np {} {} &> {};".format(nproc, os.path.join(dirpath, app), outputs[0])
    elif launcher == "srun":
        return "srun -n {} {} &> {};".format(nproc, os.path.join(dirpath, app), outputs[0])
    else:
        raise ValueError("Unsupported launcher: {}".format(launcher))
# compile the app and wait for it to complete (.result())
compile_app(dirpath=os.path.join(shared_dir, "mpi_apps"),
            stdout=os.path.join(shared_dir, "mpi_apps.compile.out"),
            stderr=os.path.join(shared_dir, "mpi_apps.compile.err"),
            compiler=compiler
            ).result()
# run the mpi app
hello = mpi_hello(os.path.join(shared_dir, "mpi_apps"),
                  launcher=runner,
                  outputs=[os.path.join(shared_dir, "mpi_apps", "hello.txt")])
output_file = hello.outputs[0].result()
# if running remotely using SSH, copy the file back to the host
if remote:
    dfk.executor.execution_provider.channel.pull_file(output_file, '.')
    result_path = os.path.basename(output_file)
else:
    result_path = output_file
# read the result file
with open(result_path, 'r') as f:
    print(f.read())
dfk.cleanup()