import ipyparallel as ipp
client = ipp.Client()
view = client.load_balanced_view()
%px import time
def task1(x):
    # Do some work.
    time.sleep(0.1 * x)
    return x * 2

def task2(x):
    # Do some work.
    time.sleep(0.1 * x)
    return x * 3

def task3(x):
    # Do some work.
    time.sleep(0.1 * x)
    return x * 4
from concurrent.futures import Future
from functools import partial
def chain_apply(view, func, future):
    """Chain a call to view.apply_async(func, future.result()) when future is ready.

    Returns a Future for the subsequent result.
    """
    f2 = Future()

    # when `future` is ready, submit a new task for func on its result
    def apply_func(f):
        if f.exception():
            f2.set_exception(f.exception())
            return
        print('submitting %s(%s)' % (func.__name__, f.result()))
        ar = view.apply_async(func, f.result())
        # when ar is done, pass its result through to f2
        ar.add_done_callback(lambda ar: f2.set_result(ar.get()))

    future.add_done_callback(apply_func)
    return f2
def chain_map(view, func, list_of_futures):
    """Chain a new callback on a list of futures."""
    return [chain_apply(view, func, f) for f in list_of_futures]
results1 = list(map(partial(view.apply, task1), [1, 2, 3]))
results2 = chain_map(view, task2, results1)
results3 = chain_map(view, task3, results2)
print("Waiting for results")
[ r.result() for r in results3 ]
Waiting for results
submitting task2(2)
submitting task2(4)
submitting task2(6)
submitting task3(6)
submitting task3(12)
submitting task3(18)
[24, 48, 72]
This sort of thing is right in distributed's wheelhouse.
IPython Parallel 5.1 lets you turn an IPython cluster into a dask one with a single method, which starts dask workers on the existing engines and returns a connected Executor:
executor = client.become_distributed(ncores=1)
executor
<Executor: scheduler="172.16.3.46:60106" processes=8 cores=8>
results1 = executor.map(task1, [1, 2, 3])
results2 = executor.map(task2, results1)
results3 = executor.map(task3, results2)
executor.gather(results3)
[24, 48, 72]
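Because distributed resolves futures that are passed as arguments to submit, the same three-stage pipeline can also be written per element, with no callback plumbing at all. A minimal sketch reusing the executor above (submit and gather are standard distributed client methods; the name chained is just for illustration):

# futures returned by submit can be passed straight into the next submit;
# distributed tracks the dependencies and runs each stage when its input is ready
chained = [
    executor.submit(task3, executor.submit(task2, executor.submit(task1, x)))
    for x in [1, 2, 3]
]
executor.gather(chained)  # expected: [24, 48, 72]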