#!/usr/bin/env python # coding: utf-8 # # Parsl tutorial # # Parsl is a native Python library that allows you to write functions that execute in parallel and tie them together with dependencies to create workflows. Parsl wraps Python functions as "Apps" using the **@App** decorator. Decorated functions can run in parallel when all their inputs are ready. # # For more comprehensive documentation and examples, please refer our [documentation](http://parsl.readthedocs.io/en/latest/). # In[ ]: import parsl import os from parsl.app.app import python_app, bash_app from parsl.configs.local_threads import config #parsl.set_stream_logger() # <-- log everything to stdout print(parsl.__version__) # ### Configuring Parsl # # Parsl separates code and execution. To do so, it relies on a configuration model to describe the pool of resources to be used for execution (e.g., clusters, clouds, threads). # # # We'll come back to configuration later in this tutorial. For now, we configure this example to use a local pool of [threads](https://en.wikipedia.org/wiki/Thread_computing) to facilitate local parallel execution. # In[ ]: parsl.load(config) # ## Apps # # # In Parsl an `app` is a piece of code that can be asynchronously executed on an execution resource (e.g., cloud, cluster, or local PC). Parsl provides support for pure Python apps (`python_app`) and also command-line apps executed via Bash (`bash_app`). # # ### Python Apps # # As a first example let's define a simple Python function that returns the string 'Hello World!'. This function is made into a Parsl App using the **@python_app** decorator. # In[ ]: @python_app def hello (): return 'Hello World!' print(hello().result()) # As can be seen above, Apps wrap standard Python function calls. As such, they can be passed arbitrary arguments and return standard Python objects. # In[ ]: @python_app def multiply (a, b): return a * b print(multiply(5,9).result()) # As Parsl apps are potentially executed remotely they must contain all required dependencies in the function body. For example, if an app requires the time library it should import the library in the function. # In[ ]: @python_app def slow_hello (): import time time.sleep(5) return 'Hello World!' print(slow_hello().result()) # ### Bash Apps # # Parsl’s Bash app allows you to wrap execution of external applications from the command-line as you would in a Bash shell. It can also be used to execute Bash scripts directly. To define a Bash app the wrapped Python function must return the command-line string to be executed. # # As a first example of a Bash app let's use the Linux command `echo` to return the string 'Hello World!'. This function is made into a Bash App using the **@bash_app** decorator. # # Note, that in this case the `echo` command will print 'Hello World!' to stdout. In order to use this output we need to tell Parsl to capture stdout. This is done by specifying the `stdout` keyword argument in the app function. The same approach can be used to capture `stderr`. # # In[ ]: @bash_app def echo_hello(stdout='echo-hello.stdout', stderr='echo-hello.stderr'): return 'echo "Hello World!"' echo_hello().result() with open('echo-hello.stdout', 'r') as f: print(f.read()) # ### Passing data # # Parsl Apps can exchange data as Python objects (as shown above) or in the form of files. In order to enforce dataflow semantics, Parsl must track the data that is passed into and out of an App. To make Parsl aware of these dependencies the app function includes `inputs` and `outputs` keyword arguments. # # We first create three test files named hello1.txt, hello2.txt, and hello3.txt containing the text "hello 1", "hello 2", and "hello 3". # In[ ]: for i in range(3): with open(os.path.join(os.getcwd(), 'hello-%s.txt' % i), 'w') as f: f.write('hello %s\n' % i) # We then write an App that will concentate these files using `cat`. We pass in the list of hello files (`input`) and concatenate the text into a new file named all_hellos.txt (`output`). # In[ ]: @bash_app def cat(inputs=[], outputs=[]): return 'cat %s > %s' %(" ".join(inputs), outputs[0]) concat = cat(inputs=[os.path.join(os.getcwd(), 'hello-0.txt'), os.path.join(os.getcwd(), 'hello-1.txt'), os.path.join(os.getcwd(), 'hello-2.txt')], outputs=[os.path.join(os.getcwd(),'all_hellos.txt')]) # Open the concatenated file with open(concat.outputs[0].result(), 'r') as f: print(f.read()) # ## Futures # When a normal Python function is invoked, the Python interpreter waits for the function to complete execution and returns the results. In case of long running functions it may not be desirable to wait for completion, instead it is preferable that functions are executed asynchronously. Parsl provides such asynchronous behavior by returning a future in lieu of results. A future is essentially an object that allows Parsl to track the status of an asynchronous task so that it may, in the future, be interrogated to find the status, results, exceptions, etc. # # Parsl provides two types of futures: AppFutures and DataFutures. While related, these two types of futures enable subtly different workflow patterns, as we will see. # # # ### AppFutures # AppFutures are the basic building block upon which Parsl scripts are built. Every invocation of a Parsl app returns an AppFuture which may be used to manage execution of the app and control the workflow. # # Here we show how AppFutures are used to wait for the result of a Python App. # In[ ]: @python_app def hello (): import time time.sleep(5) return 'Hello World!' app_future = hello() # Check if the app_future is resolved print ('Done: %s' % app_future.done()) # Print the result of the app_future. Note: this # call will block and wait for the future to resolve print ('Result: %s' % app_future.result()) print ('Done: %s' % app_future.done()) # ### DataFutures # # While AppFutures represent the execution of an asynchronous app, the DataFuture represents the files it produces. Parsl’s dataflow model, in which data flows from one app to another via files, requires such a construct to enable apps to validate creation of required files and to subsequently resolve dependencies when input files are created. When invoking an app, Parsl requires that a list of output files be specified (using the `outputs` keyword argument). A DataFuture for each file is returned by the app when it is executed. Throughout execution of the app Parsl will monitor these files to 1) ensure they are created, and 2) pass them to any dependent apps. # In[ ]: # App that echos an input message to an output file @bash_app def slowecho(message, outputs=[]): return 'sleep 5; echo %s &> {outputs[0]}' % (message) # Call echo specifying the output file hello = slowecho('Hello World!', outputs=[os.path.join(os.getcwd(), 'hello-world.txt')]) # The AppFuture's outputs attribute is a list of DataFutures print(hello.outputs) # Also check the AppFuture print ('Done: %s' % hello.done()) # Print the contents of the output DataFuture when complete with open(hello.outputs[0].result(), 'r') as f: print(f.read()) # Now that this is complete, check the DataFutures again, and the Appfuture print(hello.outputs) print ('Done: %s' % hello.done()) # ## Data Management # # Parsl is designed to enable implementation of dataflow patterns. These patterns enable workflows to be defined in which the data passed between apps manages the flow of execution. Dataflow programming models are popular as they can cleanly express, via implicit parallelism, the concurrency needed by many applications in a simple and intuitive way. # # ### Files # # Parsl’s file abstraction abstracts local access to a file. It therefore requires only the file path to be defined. Irrespective of where the script, or its apps are executed, Parsl uses this abstraction to access that file. When referencing a Parsl file in an app, Parsl maps the object to the appropriate access path. # In[ ]: from parsl.data_provider.files import File # App that copies the contents of 1 or more files to another file @bash_app def copy(inputs=[], outputs=[]): return 'cat %s &> %s' % (inputs[0], outputs[0]) # Create a test file open(os.path.join(os.getcwd(), 'cat-in.txt'), 'w').write('Hello World!\n') # Create Parsl file objects parsl_infile = File(os.path.join(os.getcwd(), 'cat-in.txt'),) parsl_outfile = File(os.path.join(os.getcwd(), 'cat-out.txt'),) # Call the copy app with the Parsl file copy_future = copy(inputs=[parsl_infile], outputs=[parsl_outfile]) # Read what was redirected to the output file with open(copy_future.outputs[0].result(), 'r') as f: print(f.read()) # ### Remote Files # # Parsl is also able to represent remotely accessible files. In this case, you can instantiate a file object using the remote location of the file. Parsl will implictly stage the file to the execution environment before executing any dependent apps. Parsl will also translate the location of the file into a local file path such that any dependent apps can access the file in the same way as a local file. Parsl supports files that are accessible via Globus, FTP, and HTTP. # # Here we create a File object using a publicly accessible file with random numbers. We can pass this file to the `sort_numbers` app in the same way we would a local file. # In[ ]: from parsl.data_provider.files import File @python_app def sort_numbers(inputs=[]): with open(inputs[0].filepath, 'r') as f: strs = [n.strip() for n in f.readlines()] strs.sort() return strs unsorted_file = File('https://raw.githubusercontent.com/Parsl/parsl-tutorial/master/input/unsorted.txt') f = sort_numbers(inputs=[unsorted_file]) print (f.result()) # ## Composing a workflow # # Now that we understand all the building blocks, we can create workflows with Parsl. Unlike other workflow systems, Parsl creates implicit workflows based on the passing of control or data between Apps. The flexibility of this model allows for the creation of a wide range of workflows from sequential through to complex nested, parallel workflows. As we will see below, a range of workflows can be created by passing AppFutures and DataFutures between Apps. # # # ### Sequential workflow # # Simple sequential or procedural workflows can be created by passing an AppFuture from one task to another. The following example shows one such workflow, which first generates a random number and then writes it to a file. # # In[ ]: # App that generates a random number @python_app def generate(limit): from random import randint return randint(1,limit) # App that writes a message to a file @bash_app def save(message, outputs=[]): return 'echo %s &> {outputs[0]}' % (message) # Generate the random number message = generate(10) print('Random number: %s' % message.result()) # Save the random number to a file saved = save(message, outputs=[os.path.join(os.getcwd(), 'sequential-output.txt')]) # Print the output file with open(saved.outputs[0].result(), 'r') as f: print('File contents: %s' % f.read()) # ### Parallel workflow # # The most common way that Parsl Apps are executed in parallel is via looping. The following example shows how a simple loop can be used to create many random numbers in parallel. # # In[ ]: # App that generates a random number @python_app def generate(limit): from random import randint return randint(1,limit) # Generate 5 random numers rand_nums = [] for i in range(5): rand_nums.append(generate(10)) # Wait for all apps to finish and collect the results outputs = [i.result() for i in rand_nums] # Print results print(outputs) # ### Parallel dataflow # # Parallel dataflows can be developed by passing data between Apps. In this example we create a set of files, each with a random number, we then concatenate these files into a single file and compute the sum of all numbers in that file. In the first two Apps files are exchanged. The final App returns the sum as a Python integer. # # # In[ ]: # App that generates a random number @bash_app def generate(outputs=[]): return "echo $(( RANDOM )) &> {outputs[0]}" # App that concatenates input files into a single output file @bash_app def concat(inputs=[], outputs=[], stdout="stdout.txt", stderr='stderr.txt'): return "cat {0} > {1}".format(" ".join(inputs), outputs[0]) # App that calculates the sum of values in a list of input files @python_app def total(inputs=[]): total = 0 with open(inputs[0], 'r') as f: for l in f: total += int(l) return total # Create 5 files with random numbers output_files = [] for i in range (5): output_files.append(generate(outputs=[os.path.join(os.getcwd(), 'random-%s.txt' % i)])) # Concatenate the files into a single file cc = concat(inputs=[i.outputs[0] for i in output_files], outputs=[os.path.join(os.getcwd(), 'all.txt')]) # Calculate the sum of the random numbers total = total(inputs=[cc.outputs[0]]) print (total.result()) # ## Examples # # ### Monte Carlo workflow # # Many scientific applications use the [monte-carlo method](https://en.wikipedia.org/wiki/Monte_Carlo_method#History) to compute results. # # If a circle with radius $r$ is inscribed inside a square with side length $2r$ then the area of the circle is $\pi r^2$ and the area of the square is $(2r)^2$. Thus, if $N$ uniformly distributed random points are dropped within the suqare then approximately $N\pi/4$ will be inside the circle. # # Each call to the function `pi()` is executed independently and in parallel. The `avg_three()` app is used to compute the average of the futures that were returned from the `pi()` calls. # # The dependency chain looks like this: # # ``` # App Calls pi() pi() pi() # \ | / # Futures a b c # \ | / # App Call avg_points() # | # Future avg_pi # ``` # In[ ]: # App that estimates pi by placing points in a box @python_app def pi(total): import random # Set the size of the box (edge length) in which we drop random points edge_length = 10000 center = edge_length / 2 c2 = center ** 2 count = 0 for i in range(total): # Drop a random point in the box. x,y = random.randint(1, edge_length),random.randint(1, edge_length) # Count points within the circle if (x-center)**2 + (y-center)**2 < c2: count += 1 return (count*4/total) # App that computes the average of the values @python_app def avg_points(a, b, c): return (a + b + c)/3 # Estimate three values for pi a, b, c = pi(10**6), pi(10**6), pi(10**6) # Compute the average of the three estimates avg_pi = avg_points(a, b, c) # Print the results print("A: {0:.5f} B: {1:.5f} C: {2:.5f}".format(a.result(), b.result(), c.result())) print("Average: {0:.5f}".format(avg_pi.result())) # ## Execution and configuration # # Parsl is designed to support arbitrary execution providers (e.g., PCs, clusters, supercomputers) and execution models (e.g., threads, pilot jobs, etc.). Instead, the configuration used to run the script tells Parsl how to execute apps on the desired environment. Parsl provides a high level abstraction, called a Block, for describing the resource configuration for a particular app or script. # # Information about the different execution providers and executors supported is included in the [Parsl documentation](https://parsl.readthedocs.io/en/latest/userguide/execution.html). # # Above we used built-in configuration for running with threads. Below we will illustrate how to create a config for different environments. # # ### Local execution with threads # # As we saw above, we can configure Parsl to execute apps on a local thread pool. This is a good way to parallelize execution on a local PC. The configuration object defines the executors that will be used as well as a range of other options such as authentication method (e.g., if using SSH), checkpoint files, and executor specific configuration. In the case of threads we define the maximum number of threads to be used. # In[ ]: from parsl.config import Config from parsl.executors.threads import ThreadPoolExecutor local_threads = Config( executors=[ ThreadPoolExecutor( max_threads=8, label='local_threads' ) ] ) # ### Local execution with pilot jobs # # We can also define a configuration that uses Parsl's HighThroughputExecutor. In this mode, pilot jobs are used to manage the submission. Parsl creates an interchange to manage execution and deploys one or more workers to execute tasks. The following config will instantiate this infrastructure locally, it can be extended to include a remote provider (e.g., Cori, Theta, etc.) for execution. # In[ ]: from parsl.providers import LocalProvider from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor local_htex = Config( executors=[ HighThroughputExecutor( label="htex_Local", worker_debug=True, cores_per_worker=1, provider=LocalProvider( channel=LocalChannel(), init_blocks=1, max_blocks=1, ), ) ], strategy=None, ) # ### Running a workflow using a configuration # # We can now run the same workflow using either of the two configurations defined above. # # First we clear the current configuration and then load one of the two configurations we defined above. You can change these configurations back and forth to see the same workflow executed using different execution methods. You will notice that executing using the HighThroughputExecutor takes longer as it has to start interchange/worker processes locally before executing the tasks. # # Note: the ''parsl-workflows'' notebook shows how to execute a Parsl workflow on a remote resource. # In[ ]: parsl.clear() #parsl.load(local_threads) parsl.load(local_htex) # In[ ]: @bash_app def generate(outputs=[]): return "echo $(( RANDOM )) &> {outputs[0]}" @bash_app def concat(inputs=[], outputs=[], stdout="stdout.txt", stderr='stderr.txt'): return "cat {0} > {1}".format(" ".join(inputs), outputs[0]) @python_app def total(inputs=[]): total = 0 with open(inputs[0], 'r') as f: for l in f: total += int(l) return total # Create 5 files with random numbers output_files = [] for i in range (5): output_files.append(generate(outputs=[os.path.join(os.getcwd(), 'random-%s.txt' % i)])) # Concatenate the files into a single file cc = concat(inputs=[i.outputs[0].filepath for i in output_files], outputs=[os.path.join(os.getcwd(), 'combined.txt')]) # Calculate the sum of the random numbers total = total(inputs=[cc.outputs[0]]) print (total.result())