Ben Clifford benc@hawaga.org.uk
This tutorial is about doing things with dependencies that are more advanced than those covered in the Parsl introductory material. It is intended both for presentation at ParslFest2020 (https://parsl-project.org/parslfest2020.html) and as something you can work through independently.
All of the use cases and apps in this tutorial are very simplified, but they all originate in a real DESC workflow that is the subject of another ParslFest session: the science has been removed but the dependency structures, which are the topic of this talk, have been kept.
Most of this tutorial will work with Parsl 1.0.0, the latest released version at the time of writing. However, one section needs an in-development feature, which you can get from the URL in that section's header.
First, I'll initialise parsl, ready for the rest of the tutorial. I'll load a local configuration with plenty of workers.
import parsl
from parsl.config import Config

parsl.clear()
parsl.load(
    Config(
        executors=[parsl.ThreadPoolExecutor(max_threads=10)]
    ))
<parsl.dataflow.dflow.DataFlowKernel at 0x7fae1cb1fcc0>
First, let's look at simple dependencies between apps in Parsl.
Here's a simple application, which will return the number 10 but will also print some messages (so we can see it run) and delay for a few seconds (to help us see things happening over time):
# here's a simple app...
@parsl.python_app
def first():
    import time
    print("starting first")
    time.sleep(5)
    print("ending first")
    return 10
This app looks like a normal Python function, but with a decorator to turn it into a Parsl app.
That means when I invoke it, I won't get back the number 10... instead I'll get back a future...
fut = first()
type(fut)
starting first
parsl.dataflow.futures.AppFuture
ending first
... and I can ask that future whether it is done yet (true or false), and ask it to wait for the result.
print(fut.done())
fut.result()
True
10
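The same done() and result() methods exist on any standard-library Future, so the behaviour can be tried without Parsl at all. Here is a minimal sketch using concurrent.futures only; the gate event and the slow_ten function are names made up for this illustration, and the thread pool stands in for Parsl.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()  # lets us control when the task is allowed to finish

def slow_ten():
    gate.wait(timeout=10)  # stand-in for a long computation
    return 10

with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(slow_ten)     # returns a Future straight away
    done_before = fut.done()        # False: the task is still waiting on the gate
    gate.set()                      # let the task finish
    value = fut.result(timeout=10)  # blocks until the result is available
    done_after = fut.done()         # True once the result has been delivered

print(done_before, value, done_after)
```

The point to notice is that submit hands back the Future before the work has finished, just as invoking first() does above.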
That's one app - it doesn't take any input and produces some output.
Here's another one that takes an input value and doubles it, along with a simple demonstration:
@parsl.python_app
def double(n):
    print("executing double")
    return n*2
double(4).result()
executing double
8
So far we haven't seen any dependency handling. But we can string these two apps together like this, and watch them execute. This executes pretty much just like a regular Python expression, with Parsl converting the Future that comes out of first into the contained value as it gets passed into double.
fut = double(first())
fut.result()
starting first ending first executing double
20
So far, I haven't shown anything that couldn't be written in regular Python, almost identically but without the app decorators.
Here's another app, total, which will add up all the values that it is passed:
@parsl.python_app
def total(*args):
    print("Totalling...")
    accumulator = 0
    for x in args:
        accumulator += x
    return accumulator
total(1,10,2).result()
Totalling...
13
Now if we run this with a couple of first invocations, we can see some more interesting parsl stuff happening:
fut = total(first(), first())
print(f"total future is {fut}")
fut.result()
starting first starting first total future is <AppFuture super=<AppFuture at 0x7fae1c96aa90 state=pending>> ending first ending first Totalling...
20
What happened is that both first calls ran at the same time, and then total ran after those two had completed.
The two first calls produced futures, which are passed into total, which itself returns a future - stored in fut. We get that final future right away without having to wait for any of the three apps to actually execute. So the rest of our main workflow code can keep executing without any big delay.
Remember in a "real" workflow, those three apps might take a long time - days - to execute, but the Future is returned immediately.
That's important in a more complicated workflow, because we might want the code to go off and do other things - for example, launch more apps in a for-loop.
.result() is your enemy
If we weren't going to use Parsl dependencies, we could instead explicitly get the results and pass them from first to total:
a_fut = first()
b_fut = first()
a = a_fut.result()
b = b_fut.result()
fut = total(a,b)
print(f"total future is {fut}")
fut.result()
starting first starting first ending first ending first Totalling...total future is <AppFuture super=<AppFuture at 0x7fae1c973908 state=pending>>
20
Although the example above has the same concurrency (two first apps run together, then total runs after), the final total Future is not returned until after the two first apps have run. Remember this could be days later.
If I want a more complicated workflow, perhaps with this code wrapped in a function, then some differences start to appear. Here are the two implementations above, wrapped in functions and called twice:
def add_firsts_a():
    return total(first(), first())
total(add_firsts_a(), add_firsts_a()).result()
starting first starting first starting first starting first ending first ending first Totalling... ending first ending first Totalling... Totalling...
40
def add_firsts_b():
    a_fut = first()
    b_fut = first()
    a = a_fut.result()
    b = b_fut.result()
    return total(a,b)
total(add_firsts_b(), add_firsts_b()).result()
starting first starting first ending first ending first Totalling... starting first starting first ending first ending first Totalling... Totalling...
40
With add_firsts_a, all four first invocations run at once. With add_firsts_b, only two of the first invocations run at first; the next two are not invoked until those have finished. The same result comes out at the end, but the concurrency is unnecessarily limited.
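The cost of blocking can be measured with a rough standard-library analogue. This sketch does not use Parsl: a thread pool stands in for the executor, slow_ten stands in for first, and pair_nonblocking and pair_blocking are hypothetical names mirroring add_firsts_a and add_firsts_b. The 0.2 second sleep is an arbitrary choice.

```python
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)

def slow_ten():
    time.sleep(0.2)  # stand-in for a long-running app
    return 10

def pair_nonblocking():
    # Like add_firsts_a: hand back futures immediately, never wait here.
    return [pool.submit(slow_ten), pool.submit(slow_ten)]

def pair_blocking():
    # Like add_firsts_b: wait for both results before returning.
    futs = [pool.submit(slow_ten), pool.submit(slow_ten)]
    return sum(f.result() for f in futs)

start = time.monotonic()
futs = pair_nonblocking() + pair_nonblocking()  # all four tasks are submitted up front
total_a = sum(f.result() for f in futs)         # wait only at the very end
elapsed_a = time.monotonic() - start

start = time.monotonic()
total_b = pair_blocking() + pair_blocking()     # second pair starts after the first pair ends
elapsed_b = time.monotonic() - start

pool.shutdown()
print(total_a, total_b, f"{elapsed_a:.2f}s vs {elapsed_b:.2f}s")
```

Both variants compute the same total, but the blocking variant cannot take less than two sleeps in sequence, whereas the non-blocking one is free to overlap all four tasks.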
Rules of thumb:
.result() is your enemy. Avoid it apart from at the very end.
In the previous examples, the dependency structure was completely fixed by the way in which apps were invoked in the Python source code.
In this section, I'm going to make a more complicated workflow: a Fantasy Sports League processor.
The idea is that you can assemble fantasy teams of three real players, and the workflow will compute for each team how well it would have performed, based on analysis of the players' real performance.
First, here is the list of available players:
players = ["Dugnutt", "Butch", "Bannister", "Jackson", "Ennis-Hill", "Abi"]
@parsl.python_app
def analyze_player(name):
    print(f"start analyzing {name}")
    import time
    n = len(name)
    time.sleep(n)
    print(f"end analyzing {name}")
    return n
analyze_player("Abi").result()
start analyzing Abi end analyzing Abi
3
def analyze_all_players() -> dict:
    player_futures = {}
    for p in players:
        player_futures[p] = analyze_player(p)
    return player_futures
If I run analyze_all_players() in the cell below, I'll see all 6 player analyses start concurrently, and immediately get back futures for all 6 players in a dict. Then over the next 10 seconds, I'll see the analyses complete.
Remember that returning futures right away is important for concurrency.
analyze_all_players()
start analyzing Dugnutt start analyzing Butch start analyzing Bannister start analyzing Jackson start analyzing Ennis-Hill start analyzing Abi
{'Dugnutt': <AppFuture super=<AppFuture at 0x7fae1c1020f0 state=pending>>, 'Butch': <AppFuture super=<AppFuture at 0x7fae1c1024e0 state=pending>>, 'Bannister': <AppFuture super=<AppFuture at 0x7fae1c1029e8 state=pending>>, 'Jackson': <AppFuture super=<AppFuture at 0x7fae1c973080 state=pending>>, 'Ennis-Hill': <AppFuture super=<AppFuture at 0x7fae1c973780 state=pending>>, 'Abi': <AppFuture super=<AppFuture at 0x7fae1c973cc0 state=pending>>}
end analyzing Abi end analyzing Butch end analyzing Dugnutt end analyzing Jackson end analyzing Bannister end analyzing Ennis-Hill
Now let's define some teams. Because this is a fantasy league, a player can appear in multiple teams.
teams = [
    ("Team A", ["Dugnutt", "Bannister", "Ennis-Hill"]),
    ("Team B", ["Abi", "Butch", "Dugnutt"]),
    ("Team C", ["Butch", "Jackson", "Dugnutt"])
]
So for each team, I want to compute a team score based on combining the abilities of the three players. I'm going to use a simple sum to do this, computed by the total app from earlier.
def analyze_teams(all_player_futs):
    final_results = []
    for (team_name, ps) in teams:
        player_futs = []
        for p in ps:
            player_futs.append(all_player_futs[p])
        final_results.append( (team_name, total(*player_futs)))
        print(f"prepared team {team_name}")
    print("All teams prepared")
    return final_results
all_player_futs = analyze_all_players()
rs = analyze_teams(all_player_futs)
start analyzing Dugnutt start analyzing Butch start analyzing Bannister start analyzing Jacksonstart analyzing Ennis-Hill start analyzing Abiprepared team Team A prepared team Team B prepared team Team C All teams prepared end analyzing Abi end analyzing Butch end analyzing Dugnutt Totalling... end analyzing Jackson Totalling... end analyzing Bannister end analyzing Ennis-Hill Totalling...
for (n, f) in rs:
    if f.done():
        print(f"Team {n} has score {f.result()}")
    else:
        print(f"Team {n} no score yet")
Team Team A has score 26 Team Team B has score 15 Team Team C has score 19
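As a sanity check on those scores: analyze_player returns the length of the player's name, and each team score is the sum for its three players. That arithmetic can be reproduced in plain Python with no Parsl involved; player_score and expected_scores are names invented for this check.

```python
teams = [
    ("Team A", ["Dugnutt", "Bannister", "Ennis-Hill"]),
    ("Team B", ["Abi", "Butch", "Dugnutt"]),
    ("Team C", ["Butch", "Jackson", "Dugnutt"]),
]

def player_score(name):
    # Same scoring rule as analyze_player, minus the sleeping and printing.
    return len(name)

expected_scores = {
    team_name: sum(player_score(p) for p in members)
    for (team_name, members) in teams
}

print(expected_scores)  # {'Team A': 26, 'Team B': 15, 'Team C': 19}
```

For example, Team A is 7 + 9 + 10 = 26 (the hyphen in Ennis-Hill counts as a character).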
Dependency-related features of this workflow:
Note: This section won't work with parsl 1.0.0; if you would like to try it, get the in-development code from https://github.com/Parsl/parsl/pull/1860.
In earlier sections, I've used functions to split up pieces of the workflow, quite like functions would be used in normal code. The important thing there is that when a function is called, it must return fast, returning Futures for anything that will take a long time.
There are cases where this isn't enough though. Python functions can take Futures as parameters and use those Futures to attach dependencies to any parsl apps that they launch.
But they can't inspect any value that will eventually appear inside any of those Futures, without using .result(), our enemy.
If functions can't inspect the value inside a future, then they can't change what they do (for example, which apps are launched) based on those values. This is important when one stage of a workflow needs to decide what to do based on an earlier stage.
To address that, there is a new kind of Parsl app in development, called a @join_app. This is a cross between a regular Python function (it will run in the submitting process, and can launch apps), and a python app (it is able to wait for dependencies before executing, and can see the values inside those dependencies).
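To make that idea concrete without the in-development code, here is a rough standard-library sketch of the "join" behaviour: wait for a dependency, look at its value, choose what to launch, and expose the eventual inner result through a Future that was returned right away. This is only an illustration of the concept, not how Parsl implements @join_app; completed, copy_outcome and join_after are hypothetical names.

```python
from concurrent.futures import Future

def completed(value):
    """Hypothetical helper: a Future that already holds a value."""
    f = Future()
    f.set_result(value)
    return f

def copy_outcome(src, dst):
    """Copy the result or exception of a finished Future into another Future."""
    exc = src.exception()
    if exc is not None:
        dst.set_exception(exc)
    else:
        dst.set_result(src.result())

def join_after(dep, choose):
    """Return a Future immediately; once dep has a value, call choose(value),
    which must itself return a Future, and mirror that inner Future's outcome."""
    outer = Future()

    def on_dep_done(done_dep):
        try:
            inner = choose(done_dep.result())  # the decision sees the real value
        except Exception as e:
            outer.set_exception(e)
            return
        inner.add_done_callback(lambda f: copy_outcome(f, outer))

    dep.add_done_callback(on_dep_done)
    return outer

n = Future()  # a dependency with no value yet
label = join_after(n, lambda v: completed("small") if v < 5 else completed("large"))
pending_before = label.done()   # False: the decision has not been made yet
n.set_result(10)                # the dependency gets its value...
outcome = label.result(timeout=5)
print(pending_before, outcome)  # False large
```

The caller never calls .result() on n; the decision is deferred into a callback, so the outer Future can still be handed back immediately.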
The example I will use here is recursive computation of the Fibonacci sequence.
Here's how I can write this using @join_app. See how the choice of apps to run is made based on the value of n, which might come as a dependency from an earlier workflow step rather than be immediately available:
@parsl.join_app
def fibonacci(n):
    if n == 0:
        return total()
    elif n == 1:
        return total(1)
    else:
        return total(fibonacci(n-1), fibonacci(n-2))
fibonacci(3).result()
Totalling... Totalling... Totalling... Totalling... Totalling...
2
Now I can make this fibonacci computation depend on a value coming from an earlier workflow step, rather than a value that is available immediately, but even so the final result Future in fut is available immediately.
fut = fibonacci(first())
print("Waiting for result")
fut.result()
starting first Waiting for result ending first Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling...Totalling... Totalling...Totalling... Totalling...Totalling... Totalling... Totalling...Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling...Totalling... Totalling...Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... Totalling... Totalling...Totalling... Totalling... 
Totalling...Totalling... Totalling...Totalling... Totalling... Totalling... Totalling...Totalling... Totalling...Totalling... Totalling...Totalling... Totalling...Totalling... Totalling... Totalling... Totalling...Totalling... Totalling...Totalling... Totalling... Totalling... Totalling... Totalling... Totalling...
55
In this example, the important thing that has been enabled is that the fibonacci code is allowed to use an if statement to decide which apps it is going to launch (total in two cases, and total along with 2 x fibonacci in the third case).
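The shape of that recursion can be checked in plain Python. This sketch mirrors the branching of the join app with ordinary functions (no Parsl, no futures) and counts how many total invocations the recursion implies, assuming one total per fibonacci call as in the code above; fib and total_calls are names made up for this check.

```python
def fib(n):
    # Same branching as the join app, with plain integers instead of futures.
    if n == 0:
        return 0          # total() of no arguments
    elif n == 1:
        return 1          # total(1)
    else:
        return fib(n - 1) + fib(n - 2)

def total_calls(n):
    # Each fibonacci call launches exactly one total app,
    # plus whatever its two recursive calls launch.
    if n <= 1:
        return 1
    return 1 + total_calls(n - 1) + total_calls(n - 2)

print(fib(3), total_calls(3))    # 2 5
print(fib(10), total_calls(10))  # 55 177
```

The count of 5 for n = 3 matches the five "Totalling..." lines printed by fibonacci(3) above, and it grows quickly with n, which is why the run that starts from first() prints so many.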
Future objects
Now I'm going to dig deeper into the mechanics of Futures.
The Future objects coming from invoking a parsl app have their values eventually set by the guts of parsl, when an app completes... but they're actually instances of the standard python Future class. https://docs.python.org/3/library/concurrent.futures.html#future-objects
(Futures also appear in the executor API and you can see that in Yadu's ParslFest talk).
Parsl apps can use any future as a dependency, not just futures that come from invoking other parsl apps.
Here's a simple example of non-parsl Futures: I'll create a Future object that's not attached to any executing code, and then I'll use it as a dependency for a simple app, total.
from concurrent.futures import Future
f = Future()
f
<Future at 0x7fae1476aa90 state=pending>
Now f contains a Future that will just sit there passively. Unless I do something to it, it will never get a value.
I can use that as a dependency for the total app:
f2 = total(1, f)
f2
<AppFuture super=<AppFuture at 0x7fae1476ab00 state=pending>>
Because nothing is running to make f complete, that means the total app will never run either...
print(f.done())
print(f2.done())
False False
... until something comes along and sets the result of the Future ...
f.set_result(10)
Totalling...
f2.result()
11
... at which point total ran (because its input dependency had a value), and so the result of total is now also available in f2.
Parsl doesn't care how f got its value... just that it eventually did get one.
This is a way that I've prototyped different dependency behaviour without having to build it into the parsl codebase. By making Futures that behave as I want to, I can prototype the behaviour in a few lines of rough code without having to work on the core parsl code.
As a trade-off, almost all of the other benefits that parsl brings (remote execution, retries, monitoring information, checkpointing) are lost for this particular step - so it's definitely a prototyping exercise rather than solid parsl functionality.
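As a small illustration of "anything can complete a Future", here is a standard-library sketch in which a timer thread, rather than an app, eventually supplies the value. The 0.1 second delay is arbitrary. In a Parsl session, a Future like f could be passed to an app such as total in the same way as the hand-completed Future above; that step is left out here so the sketch runs without Parsl.

```python
import threading
from concurrent.futures import Future

f = Future()  # not attached to any executing code

# Some other mechanism (here a timer thread) completes the Future later.
timer = threading.Timer(0.1, f.set_result, args=[10])
timer.start()

done_at_start = f.done()      # the timer has almost certainly not fired yet
value = f.result(timeout=5)   # waits until the timer thread sets the result
timer.join()

print(done_at_start, value)
```

Whoever consumes f only sees a Future that eventually gets a value; it has no way to tell that a timer, rather than an app, produced it.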
In this section, I'll try to solve a particular problem:
Sometimes an app raises an Exception instead of completing successfully. With normal Parsl behaviour, that causes all dependent apps to fail too. But what I'd like in this case is that the dependent app still runs, and gets to see that some of its upstream apps might have failed.
In this example, I would like an app that takes an average of the numbers returned by its dependencies - but if any dependencies fail, then we ignore that dependency rather than abort the whole computation.
Here's how it doesn't work right now:
@parsl.python_app
def rand():
    import random
    return random.random()

@parsl.python_app
def fail():
    raise RuntimeError("Parsl demo failure")

@parsl.python_app
def avg(*args):
    s = 0
    c = 0
    for x in args:
        c += 1
        s += x
    return (s/c, c)
avg(50,60).result()
(55.0, 2)
Here's a small workflow with rand and avg put together:
avg(rand(),rand(), rand()).result()
(0.5941547596900922, 3)
... but this fails (because there is a failing dependency):
avg(rand(), fail(), rand()).result()
--------------------------------------------------------------------------- DependencyError Traceback (most recent call last) <ipython-input-38-48dd847f104a> in <module> ----> 1 avg(rand(), fail(), rand()).result() /usr/lib/python3.7/concurrent/futures/_base.py in result(self, timeout) 423 raise CancelledError() 424 elif self._state == FINISHED: --> 425 return self.__get_result() 426 427 self._condition.wait(timeout) /usr/lib/python3.7/concurrent/futures/_base.py in __get_result(self) 382 def __get_result(self): 383 if self._exception: --> 384 raise self._exception 385 else: 386 return self._result ~/parsl/src/parsl/parsl/dataflow/dflow.py in handle_exec_update(self, task_id, future) 270 271 try: --> 272 res = self._unwrap_remote_exception_wrapper(future) 273 274 except Exception as e: ~/parsl/src/parsl/parsl/dataflow/dflow.py in _unwrap_remote_exception_wrapper(future) 417 @staticmethod 418 def _unwrap_remote_exception_wrapper(future: Future) -> Any: --> 419 result = future.result() 420 if isinstance(result, RemoteExceptionWrapper): 421 result.reraise() /usr/lib/python3.7/concurrent/futures/_base.py in result(self, timeout) 423 raise CancelledError() 424 elif self._state == FINISHED: --> 425 return self.__get_result() 426 427 self._condition.wait(timeout) /usr/lib/python3.7/concurrent/futures/_base.py in __get_result(self) 382 def __get_result(self): 383 if self._exception: --> 384 raise self._exception 385 else: 386 return self._result DependencyError: Dependency failure for task 439 with failed dependencies from tasks [437]
What I want is for that dependency failure not to happen, and for avg to see the failure and be able to change its behaviour.
I'm going to make an adapter that passes through normal results, but that turns a Future with an exception into one that returns a regular value.
class ExceptionHidingFuture(Future):
    def __init__(self, parent: Future):
        super().__init__()
        parent.add_done_callback(self.cb)
    def cb(self, parent):
        if parent.exception():
            self.set_result("LOL")
        else:
            self.set_result(parent.result())
First, what happens without this adapter:
f = Future()
f.set_exception(RuntimeError("DEMO"))
f.result()
--------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-44-007d59320aa9> in <module> ----> 1 f.result() /usr/lib/python3.7/concurrent/futures/_base.py in result(self, timeout) 423 raise CancelledError() 424 elif self._state == FINISHED: --> 425 return self.__get_result() 426 427 self._condition.wait(timeout) /usr/lib/python3.7/concurrent/futures/_base.py in __get_result(self) 382 def __get_result(self): 383 if self._exception: --> 384 raise self._exception 385 else: 386 return self._result RuntimeError: DEMO
And now the same again, but with the new adapter in the dependency chain (once with an exception showing the new exception handling code, and once with a normal result showing passthrough behaviour):
f = Future()
f2 = ExceptionHidingFuture(f)
f.set_exception(RuntimeError("DEMO"))
f2.result()
'LOL'
f = Future()
f2 = ExceptionHidingFuture(f)
f.set_result(3)
f2.result()
3
So now I can go back to my earlier attempt to average some random numbers and failures, wrapping the ExceptionHidingFuture around all of my dependencies.
First add some functionality into avg to make it handle failed dependencies (the adapter turns a failure into a string, so any string argument is skipped):
@parsl.python_app
def avg(*args):
    s = 0
    c = 0
    for x in args:
        if not isinstance(x, str):
            c += 1
            s += x
    return (s/c, c)
avg(ExceptionHidingFuture(rand()),
ExceptionHidingFuture(fail()),
ExceptionHidingFuture(rand())
).result()
(0.7574856902166225, 2)
So with a 9-line definition, this prototypes some different dependency handling logic...
To become real parsl functionality that would probably need to be wired in more deeply into the parsl dataflow kernel... remember that this prototype loses a bunch of parsl's useful non-dependency functionality.
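One variation on the prototype, sketched here with the standard library only: instead of replacing a failure with a marker string, pass the exception object itself through as the value, so the consumer can filter on type and still see what went wrong. ExceptionAsValueFuture and avg_ignoring_failures are hypothetical names for this illustration; avg_ignoring_failures is a plain function standing in for the avg app, and the same prototype-only caveats apply.

```python
from concurrent.futures import Future

class ExceptionAsValueFuture(Future):
    """Adapter: completes with the parent's result, or with the parent's
    exception object as an ordinary value if the parent failed."""
    def __init__(self, parent: Future):
        super().__init__()
        parent.add_done_callback(self._copy)

    def _copy(self, parent: Future) -> None:
        exc = parent.exception()
        self.set_result(exc if exc is not None else parent.result())

def avg_ignoring_failures(*args):
    # Skip anything that is an exception object rather than a number.
    good = [x for x in args if not isinstance(x, BaseException)]
    return (sum(good) / len(good), len(good))

ok1, bad, ok2 = Future(), Future(), Future()
wrapped = [ExceptionAsValueFuture(f) for f in (ok1, bad, ok2)]

ok1.set_result(2.0)
bad.set_exception(RuntimeError("DEMO"))
ok2.set_result(4.0)

values = [w.result(timeout=5) for w in wrapped]
print(avg_ignoring_failures(*values))  # (3.0, 2)
```

Filtering on BaseException avoids mistaking a legitimate string result for a failure, which the "LOL" marker cannot rule out.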
Summary:
A Parsl app's Future also holds a reference to the task record for the app invocation, which can be interesting to poke at sometimes. For the purposes of this talk, the most interesting field is depends, which shows that this task had two dependencies which both returned an int - those are the first invocations.
fut.task_def
Sometimes parsl will add in more dependencies automatically on app invocation, due to file staging.
Here's an app that counts lines in a file:
@parsl.python_app
def count_lines(file):
    # use a context manager so the file is closed after counting
    with open(file) as f:
        return len(f.readlines())
If the app is invoked on a local file, there are no dependencies...
fut = count_lines(parsl.File("/etc/passwd"))
fut.result()
fut.task_def['depends']
... but if it is invoked on an HTTP URL, then a staging task is implicitly launched and added in as a dependency:
fut2 = count_lines(parsl.File("http://www.cqx.ltd.uk/index.html"))
fut2.result()
fut2.task_def['depends']
In general, data staging providers can insert arbitrary sub-workflows before and after app execution.