Dependencies beyond the DAG

Ben Clifford [email protected]

This tutorial is about doing things with dependencies that are more advanced than in Parsl introductory material. It is intended both for presentation at ParslFest2020 (https://parsl-project.org/parslfest2020.html) but also as something you can work through independently.

All of the use cases and apps in this tutorial are very simplified, but they all have their origin in a real workflow used by DESC, which is the subject of another ParslFest session: the science has been removed, but the dependency structures, which are the topic of this talk, have been kept.

Most of this tutorial will work with Parsl 1.0.0, the latest released version at the time of writing. However, one section needs an in-development feature, which you can get from the URL in that section's header.

Getting started

First, I'll initialise parsl, ready for the rest of the tutorial. I'll load a local configuration with plenty of workers.

In [1]:
import parsl
from parsl.config import Config
parsl.clear()
parsl.load(
    Config(
        executors=[parsl.ThreadPoolExecutor(max_threads=10)]
    ))
Out[1]:
<parsl.dataflow.dflow.DataFlowKernel at 0x7f0c0c842c88>

1. Introduction to dependencies

First, let's look at simple dependencies between apps in Parsl.

Here's a simple application, which will return the number 10 but will also print some messages (so we can see it run) and delay for a few seconds (to help us see things happening over time):

In [15]:
# here's a simple app...
@parsl.python_app
def first():
    import time
    print("starting first")
    time.sleep(5)
    print("ending first")
    return 10

This app looks like a normal Python function, but with a decorator to turn it into a Parsl app.

That means when I invoke it, I won't get back the number 10... instead I'll get back a future...

In [16]:
fut = first()
type(fut)
starting first
Out[16]:
parsl.dataflow.futures.AppFuture

... and I can ask that future whether it is done yet (True/False), and wait for its result.

In [17]:
print(fut.done())
fut.result()
False
ending first
Out[17]:
10

That's one app - it doesn't take any input and produces some output.

Here's another one that takes an input value and doubles it, along with a simple demonstration:

In [18]:
@parsl.python_app
def double(n):
    print("executing double")
    return n*2

double(4).result()
executing double
Out[18]:
8

So far we haven't seen any dependency handling. But we can string these two apps together like this, and watch them execute. This executes pretty much just like a regular Python expression, with Parsl converting the Future that comes out of first into the contained value as it gets passed into double.

In [19]:
fut = double(first())
fut.result()
starting first
ending first
executing double
Out[19]:
20

So far, I haven't shown anything that couldn't be written in regular Python, almost identically but without the app decorators.

Here's another app, total, which will add up all values that it is passed:

In [20]:
@parsl.python_app
def total(*args):
    print("Totalling...")
    accumulator = 0
    for x in args:
        accumulator += x
    return accumulator

total(1,10,2).result()
Totalling...
Out[20]:
13

Now if we run this with a couple of first invocations, we can see some more interesting parsl stuff happening:

In [21]:
fut = total(first(), first())
print(f"total future is {fut}")
fut.result()
starting first
starting firsttotal future is <AppFuture super=<AppFuture at 0x7f0c0c624748 state=pending>>

ending first
ending first
Totalling...
Out[21]:
20

What happened is that both first calls ran at the same time, and then total ran after those two had completed.

The two first calls produced futures, which are passed into total, which itself returns a future - stored in fut. We get that final future right away without having to wait for any of the three apps to actually execute. So the rest of our main workflow code can keep executing without any big delay.

Remember in a "real" workflow, those three apps might take a long time - days - to execute, but the Future is returned immediately.

That's important in a more complicated workflow, because we might want the code to go off and do other things - for example, launch more apps in a for-loop.
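That loop-launching pattern can be sketched with the standard library alone (Parsl AppFutures are instances of Python's standard Future class, so the shape is the same). The `double` function here is a plain re-implementation for illustration, not a Parsl app:

```python
from concurrent.futures import ThreadPoolExecutor

# Plain-Python sketch of the launch-in-a-loop pattern: submit() returns
# a Future immediately, just as invoking a Parsl app does, so many tasks
# can be launched before any of them finish.
def double(n):
    return n * 2

with ThreadPoolExecutor(max_workers=10) as pool:
    futures = [pool.submit(double, n) for n in range(5)]  # all launched at once
    results = [f.result() for f in futures]               # block only at the end

print(results)  # [0, 2, 4, 6, 8]
```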

2. Composing subworkflows using Python functions... .result() is your enemy

If we weren't going to use Parsl dependencies, we could instead explicitly pass the results from first to total:

In [23]:
a_fut = first()
b_fut = first()
a = a_fut.result()
b = b_fut.result()
fut = total(a,b)
print(f"total future is {fut}")
fut.result()
starting first
starting first
ending first
ending first
Totalling...
total future is <AppFuture super=<AppFuture at 0x7f0c0c69df28 state=finished returned int>>
Out[23]:
20

Although the example above has the same concurrency (two first apps run together, then total runs after), the final total Future is not returned until after the two first apps have run. Remember, this could be days later.

If I want a more complicated workflow, perhaps with this code wrapped in a function, then some differences start to appear. Here are the two implementations above, wrapped in functions and called twice:

In [26]:
def add_firsts_a():
    return total(first(), first())

total(add_firsts_a(), add_firsts_a()).result()
starting first
starting first
starting first
starting first
ending first
ending first
ending first
ending first
Totalling...
Totalling...
Totalling...
Out[26]:
40
In [28]:
def add_firsts_b():
    a_fut = first()
    b_fut = first()
    a = a_fut.result()
    b = b_fut.result()
    return total(a,b)

total(add_firsts_b(), add_firsts_b()).result()
starting first
starting first
ending first
ending first
Totalling...
starting firststarting first

ending first
ending first
Totalling...
Totalling...
Out[28]:
40

With add_firsts_a, all four first invocations run at once. With add_firsts_b, only two of the first invocations run; then the next two are invoked later. The same result comes out at the end, but the concurrency is unnecessarily limited.

Rules of thumb:

  • .result() is your enemy. Avoid it apart from at the very end.
  • Return Futures as fast as you can so that other workflow code can run.
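To make those rules of thumb concrete, here is a small timing experiment using the stdlib ThreadPoolExecutor rather than Parsl (the Futures behave the same way). Calling .result() as each task is submitted serialises the work; submitting everything first lets the tasks overlap:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow():
    time.sleep(0.3)
    return 10

pool = ThreadPoolExecutor(max_workers=4)

# Anti-pattern: .result() after each submit waits before launching the next.
t0 = time.perf_counter()
serial = [pool.submit(slow).result() for _ in range(4)]
serial_time = time.perf_counter() - t0          # roughly 4 x 0.3s

# Rule of thumb applied: launch everything, collect results at the very end.
t0 = time.perf_counter()
futs = [pool.submit(slow) for _ in range(4)]
overlapped = [f.result() for f in futs]
overlapped_time = time.perf_counter() - t0      # roughly 0.3s

print(f"serial: {serial_time:.2f}s, overlapped: {overlapped_time:.2f}s")
pool.shutdown()
```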

3. Dynamically assembling dependencies in a more complicated workflow

In the previous examples, the dependency structure was completely fixed by the way in which apps were invoked in the Python source code.

In this section, I'm going to make a more complicated workflow: a Fantasy Sports League processor.

The idea is that you can assemble fantasy teams of three real players, and the workflow will compute how well each team would have performed, based on analysis of the players' real performance.

First, here is the list of available players:

In [29]:
players = ["Dugnutt", "Butch", "Bannister", "Jackson", "Ennis-Hill", "Abi"]
In [30]:
@parsl.python_app
def analyze_player(name):
    print(f"start analyzing {name}")
    import time
    n = len(name)
    time.sleep(n)
    print(f"end analyzing {name}")
    return n

analyze_player("Abi").result()
start analyzing Abi
end analyzing Abi
Out[30]:
3
In [31]:
def analyze_all_players() -> dict:
    player_futures = {}
    for p in players:
        player_futures[p] = analyze_player(p)
    return player_futures

If I run analyze_all_players() in the cell below, I'll see all six player analyses start concurrently, and immediately get back futures for all six players in a dict. Then over the next ten seconds, I'll see the analyses complete.

Remember returning futures right away is an important thing to do for concurrency.

In [32]:
analyze_all_players()
start analyzing Dugnutt
start analyzing Butchstart analyzing Bannister

start analyzing Jackson
start analyzing Ennis-Hill
start analyzing Abi
Out[32]:
{'Dugnutt': <AppFuture super=<AppFuture at 0x7f0c0c6343c8 state=pending>>,
 'Butch': <AppFuture super=<AppFuture at 0x7f0c0c6345f8 state=pending>>,
 'Bannister': <AppFuture super=<AppFuture at 0x7f0c0c634b00 state=pending>>,
 'Jackson': <AppFuture super=<AppFuture at 0x7f0c0c634e10 state=pending>>,
 'Ennis-Hill': <AppFuture super=<AppFuture at 0x7f0c0c63f080 state=pending>>,
 'Abi': <AppFuture super=<AppFuture at 0x7f0c0c63f1d0 state=pending>>}
end analyzing Abi
end analyzing Butch
end analyzing Dugnutt
end analyzing Jackson
end analyzing Bannister
end analyzing Ennis-Hill

Now let's define some teams. Because this is a fantasy league, a player can appear in multiple teams.

In [33]:
teams = [
    ("Team A", ["Dugnutt", "Bannister", "Ennis-Hill"]),
    ("Team B", ["Abi", "Butch", "Dugnutt"]),
    ("Team C", ["Butch", "Jackson", "Dugnutt"])
]

So for each team, I want to compute a team score based on combining the abilities of the three players. I'm going to use a simple sum to do this.

In [34]:
def analyze_teams(all_player_futs):
    final_results = []
    for (team_name, ps) in teams:
        player_futs = []
        for p in ps:
            player_futs.append(all_player_futs[p])
        final_results.append((team_name, total(*player_futs)))
        print(f"prepared team {team_name}")
    print("All teams prepared")
    return final_results
In [35]:
all_player_futs = analyze_all_players()
rs = analyze_teams(all_player_futs)
start analyzing Dugnutt
start analyzing Butch
start analyzing Bannister
start analyzing Jackson
start analyzing Ennis-Hill
start analyzing Abi
prepared team Team A
prepared team Team B
prepared team Team C
All teams prepared
end analyzing Abi
end analyzing Butch
end analyzing Dugnutt
Totalling...
end analyzing Jackson
Totalling...
end analyzing Bannister
end analyzing Ennis-Hill
Totalling...
In [36]:
for (n, f) in rs:
    if f.done():
        print(f"Team {n} has score {f.result()}")
    else:
        print(f"Team {n} no score yet")
Team Team A has score 26
Team Team B has score 15
Team Team C has score 19

Dependency-related features of this workflow:

  • Futures can be stored in more complicated Python data structures: dictionaries and lists.
  • A simple implementation might wait for all the player processing to complete before moving on to team assembly. The above implementation makes each team assembly depend only on the relevant players.
  • This means it can compute a team assembly as soon as the relevant players have completed - that can increase the efficiency of resource usage, because more tasks are available to run sooner.
  • As a special case of that, even if some players fail to compute, we can still compute all the teams which don't involve them - computation can go "deep" into the workflow on the successful branches, even if it can't process the full "width". There may still be valuable final science results, even if not everything is computed.
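The "process results as soon as they are ready" idea in the last two bullets can also be expressed with the stdlib as_completed helper. This sketch uses a plain ThreadPoolExecutor for illustration; the player names are from this tutorial, but the analysis is just a sleep:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def analyze(name):
    time.sleep(len(name) * 0.1)   # stand-in for real analysis work
    return len(name)

with ThreadPoolExecutor(max_workers=10) as pool:
    futs = {pool.submit(analyze, p): p for p in ["Abi", "Butch", "Ennis-Hill"]}
    completion_order = []
    for f in as_completed(futs):  # yields each Future as soon as it finishes
        completion_order.append(futs[f])

print(completion_order)           # shortest analyses finish first
```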

4. @join_app (feature in-development)

Note: This section won't work with parsl 1.0.0, but get https://github.com/Parsl/parsl/pull/1860 if you would like to try it.

In earlier sections, I've used functions to split up pieces of the workflow, much as functions would be used in normal code. The important thing there is that when a function is called, it must return fast, returning Futures for anything that will take a long time.

There are cases where this isn't enough, though. Python functions can take Futures as parameters and use those Futures to attach dependencies to any parsl apps that they launch.

But they can't inspect any value that will eventually appear inside any of those Futures, without using .result(), our enemy.

If a function can't inspect the value inside a Future, then it can't change what it does (for example, which apps it launches) based on that value. This is important when one stage of a workflow needs to decide what to do based on an earlier stage.

To address that, there is a new kind of Parsl app in development, called a @join_app. This is a cross between a regular Python function (it runs in the submitting process, and can launch apps) and a Python app (it can wait for dependencies before executing, and can see the values inside those dependencies).
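Plain Futures do offer a lower-level, non-blocking way to react to an eventual value: add_done_callback. It is much clumsier than a @join_app, but it conceptually occupies the same space - code that runs, and can branch, only once the value exists. A stdlib-only sketch:

```python
from concurrent.futures import Future

upstream = Future()
downstream = Future()

def on_done(f):
    # This runs only once upstream has a value, so it *can* branch on it.
    downstream.set_result("big" if f.result() > 100 else "small")

upstream.add_done_callback(on_done)
upstream.set_result(7)        # the callback fires here
print(downstream.result())    # 'small'
```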

The example I will use here is recursive computation of the Fibonacci sequence.

Here's how I can write this using @join_app. See how the choice of apps to run is made based on the value of n, which might come as a dependency from an earlier workflow step rather than be immediately available:

In [39]:
@parsl.join_app
def fibonacci(n):
    if n == 0:
        return total()
    elif n == 1:
        return total(1)
    else:
        return total(fibonacci(n-1), fibonacci(n-2))
In [40]:
fibonacci(3).result()
Totalling...
Totalling...
Totalling...
Totalling...
Totalling...
Out[40]:
2

Now I can make this fibonacci computation depend on a value coming from an earlier workflow step, rather than on a value that is available immediately; even so, the final result Future in fut is available immediately.

In [41]:
fut = fibonacci(first())
print("Waiting for result")
fut.result()
starting firstWaiting for result

ending first
Totalling...
Totalling...
Totalling...
[... around 170 further "Totalling..." lines elided - one per total invocation in the recursive fibonacci(10) call tree, some interleaved by concurrent printing ...]
Totalling...
Out[41]:
55

In this example, the important thing that has been enabled is that the fibonacci code is allowed to use an if statement to decide which apps it is going to launch (total in two cases, and total along with two recursive fibonacci calls in the third case).

5. Raw Future objects

Now I'm going to dig deeper into the mechanics of Futures.

The Future objects coming from invoking a parsl app have their values eventually set by the guts of parsl, when an app completes... but they're actually instances of the standard Python Future class: https://docs.python.org/3/library/concurrent.futures.html#future-objects

(Futures also appear in the executor API and you can see that in Yadu's ParslFest talk).

Parsl apps can use any Future as a dependency, not just Futures that come from invoking other parsl apps.

Here's a simple example of non-parsl Futures: I'll create a Future object that's not attached to any executing code, and then I'll use it as a dependency for a simple app, increment.

In [42]:
from concurrent.futures import Future
In [43]:
f = Future()
f
Out[43]:
<Future at 0x7f0c0c470c88 state=pending>

Now f contains a Future that will just sit there passively. Unless I do something to it, it will never get a value.

I can use that as a dependency for the total app:

In [44]:
f2 = total(1, f)
f2
Out[44]:
<AppFuture super=<AppFuture at 0x7f0c0c485128 state=pending>>

Because nothing is running to make f complete, that means the total app will never run either...

In [45]:
print(f.done())
print(f2.done())
False
False

... until something comes along and sets the result of the Future ...

In [46]:
f.set_result(10)
Totalling...
In [47]:
f2.result()
Out[47]:
11

... at which point total ran (because its input dependency had a value), and so the result of total is now also available in f2.

Parsl doesn't care how f got its value... just that it eventually did get one.
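For example (a stdlib-only sketch), a background timer thread can be the thing that completes the Future:

```python
import threading
from concurrent.futures import Future

f = Future()
# Complete the Future from a background timer thread after 0.2 seconds;
# nothing about the Future itself says where its value comes from.
threading.Timer(0.2, f.set_result, args=[42]).start()

print(f.done())    # False - the timer hasn't fired yet
print(f.result())  # blocks briefly, then prints 42
```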

Hacking more interesting dependency behaviour in futures

This is a way that I've prototyped different dependency behaviours without having to build them into the parsl codebase: by making Futures that behave the way I want, I can sketch the behaviour in a few lines of rough code without having to work on the core parsl code.

As a trade-off, almost all of the other benefits that parsl brings (remote execution, retries, monitoring information, checkpointing) are lost for this particular step - so it's definitely a prototyping exercise rather than solid parsl functionality.

In this section, I'll try to solve a particular problem:

Sometimes an app raises an exception instead of completing successfully. With normal Parsl behaviour, that causes all dependent apps to fail too. But what I'd like in this case is for the dependent app to still run, and to get to see that some of its upstream apps might have failed.

In this example, I would like an app that takes an average of the numbers returned by its dependencies - but if any dependencies fail, then we ignore that dependency rather than abort the whole computation.

Here's how it doesn't work right now:

In [52]:
@parsl.python_app
def rand():
    import random
    return random.random()

@parsl.python_app
def fail():
    raise RuntimeError("Parsl demo failure")
    
@parsl.python_app
def avg(*args):
    s = 0
    c = 0
    for x in args:
        c += 1
        s += x
    return (s/c, c)

avg(50,60).result()
Out[52]:
(55.0, 2)

Here's a small workflow with rand and avg put together:

In [55]:
avg(rand(),rand(), rand()).result()
Out[55]:
(0.6155617111331173, 3)

... but this fails (because there is a failing dependency):

In [56]:
avg(rand(), fail(), rand()).result()
---------------------------------------------------------------------------
DependencyError                           Traceback (most recent call last)
<ipython-input-56-48dd847f104a> in <module>
----> 1 avg(rand(), fail(), rand()).result()

/usr/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    423                 raise CancelledError()
    424             elif self._state == FINISHED:
--> 425                 return self.__get_result()
    426 
    427             self._condition.wait(timeout)

/usr/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/parsl/src/parsl/parsl/dataflow/dflow.py in handle_exec_update(self, task_id, future)
    270 
    271         try:
--> 272             res = self._unwrap_remote_exception_wrapper(future)
    273 
    274         except Exception as e:

~/parsl/src/parsl/parsl/dataflow/dflow.py in _unwrap_remote_exception_wrapper(future)
    417     @staticmethod
    418     def _unwrap_remote_exception_wrapper(future: Future) -> Any:
--> 419         result = future.result()
    420         if isinstance(result, RemoteExceptionWrapper):
    421             result.reraise()

/usr/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    423                 raise CancelledError()
    424             elif self._state == FINISHED:
--> 425                 return self.__get_result()
    426 
    427             self._condition.wait(timeout)

/usr/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

DependencyError: Dependency failure for task 471 with failed dependencies from tasks [469]

What I want is for that dependency failure not to happen, and for avg to see the failure and be able to change its behaviour.

I'm going to make an adapter Future that passes through normal results, but that turns a Future with an exception into one that returns a regular value.

In [57]:
class ExceptionHidingFuture(Future):
    def __init__(self, parent: Future):
        super().__init__()
        parent.add_done_callback(self.cb)
        
    def cb(self, parent):
        if parent.exception():
            self.set_result("LOL")
        else:
            self.set_result(parent.result())

First, what happens without this adapter:

In [58]:
f = Future()
f.set_exception(RuntimeError("DEMO"))
In [59]:
f.result()
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-59-007d59320aa9> in <module>
----> 1 f.result()

/usr/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    423                 raise CancelledError()
    424             elif self._state == FINISHED:
--> 425                 return self.__get_result()
    426 
    427             self._condition.wait(timeout)

/usr/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

RuntimeError: DEMO

And now the same again, but with the new adapter in the dependency chain (once with an exception showing the new exception handling code, and once with a normal result showing passthrough behaviour):

In [64]:
f = Future()
f2 = ExceptionHidingFuture(f)
f.set_exception(RuntimeError("DEMO"))
In [65]:
f2.result()
Out[65]:
'LOL'
In [66]:
f = Future()
f2 = ExceptionHidingFuture(f)
f.set_result(3)
In [67]:
f2.result()
Out[67]:
3

So now I can go back to my earlier attempt to average some random numbers and failures, wrapping the ExceptionHidingFuture around all of my dependencies:

First, add some functionality to avg to make it handle failed dependencies:

In [68]:
@parsl.python_app
def avg(*args):
    s = 0
    c = 0
    for x in args:
        if not isinstance(x, str):
            c += 1
            s += x
    return (s/c, c)
In [70]:
avg(ExceptionHidingFuture(rand()),
    ExceptionHidingFuture(fail()),
    ExceptionHidingFuture(rand())
   ).result()
Out[70]:
(0.6095140854247905, 2)

So with a 9-line definition, this prototypes some different dependency handling logic...

To become real parsl functionality, that would probably need to be wired more deeply into the parsl dataflow kernel... remember that this prototype loses a bunch of parsl's useful non-dependency functionality.

The End!

Summary:

  1. Basic dependencies
  2. .result() is your enemy
  3. More complicated dependency structures
  4. @join_app - changing behaviour based on dependency values
  5. Messing around in futures

Extra material

X1. What else is in a parsl future?

A Parsl app's Future also holds a reference to the task record for the app invocation, which can be interesting to poke at sometimes. For the purposes of this talk, the most interesting field is depends, which shows that this task had two dependencies which both returned an int - those are the first invocations.

In [11]:
fut.task_def
Out[11]:
{'depends': [<AppFuture super=<AppFuture at 0x7f04018e07f0 state=finished returned int>>,
  <AppFuture super=<AppFuture at 0x7f04190d0128 state=finished returned int>>],
 'executor': 'threads',
 'func_name': 'sum',
 'fn_hash': 'sum',
 'memoize': False,
 'hashsum': None,
 'exec_fu': <Future at 0x7f04018f4780 state=finished returned int>,
 'fail_count': 0,
 'fail_history': [],
 'from_memo': False,
 'ignore_for_cache': [],
 'join': False,
 'status': <States.exec_done: 3>,
 'try_id': 0,
 'id': 8,
 'time_invoked': datetime.datetime(2020, 10, 5, 18, 37, 27, 258395),
 'time_returned': datetime.datetime(2020, 10, 5, 18, 37, 32, 268765),
 'try_time_launched': datetime.datetime(2020, 10, 5, 18, 37, 32, 264279),
 'try_time_returned': datetime.datetime(2020, 10, 5, 18, 37, 32, 268526),
 'resource_specification': {},
 'args': [10, 10],
 'func': <function __main__.sum(*args)>,
 'kwargs': {},
 'app_fu': <AppFuture super=<AppFuture at 0x7f041912d978 state=finished returned int>>,
 'task_launch_lock': <unlocked _thread.lock object at 0x7f04018edbc0>}

X2. Dependencies put in place by file staging

Sometimes parsl will add in more dependencies automatically on app invocation, due to file staging.

Here's an app that counts lines in a file:

In [33]:
@parsl.python_app
def count_lines(file):
    return len(open(file).readlines())

If the app is invoked on a local file, there are no dependencies...

In [34]:
fut = count_lines(parsl.File("/etc/passwd"))
In [35]:
fut.result()
Out[35]:
21
In [36]:
fut.task_def['depends']
Out[36]:
[]

... but if it is invoked on an HTTP URL, then a staging task is implicitly launched and added in as a dependency:

In [37]:
fut2 = count_lines(parsl.File("http://www.cqx.ltd.uk/index.html"))
In [38]:
fut2.result()
Out[38]:
18
In [39]:
fut2.task_def['depends']
Out[39]:
[<DataFuture at 0x7f03ec6d2080 state=finished with file <<class 'parsl.data_provider.files.File'> at 0x7f03ec6c3b38 url=http://www.cqx.ltd.uk/index.html scheme=http netloc=www.cqx.ltd.uk path=/index.html filename=index.html local_path=index.html>>]

In general, data staging providers can insert arbitrary sub-workflows before and after app execution.