#!/usr/bin/env python
# coding: utf-8

# # Chaining maps with IPython parallel (or dask)
#
# For [this question on Stack Overflow](http://stackoverflow.com/questions/37998484)

# In[1]:


import ipyparallel as ipp

client = ipp.Client()
view = client.load_balanced_view()

# import time on the engines, where the tasks will actually run
get_ipython().run_line_magic('px', 'import time')


def task1(x):
    ## Do some work.
    time.sleep(0.1 * x)
    return x * 2

def task2(x):
    ## Do some work.
    time.sleep(0.1 * x)
    return x * 3

def task3(x):
    ## Do some work.
    time.sleep(0.1 * x)
    return x * 4


# ## Option 1: chain results with IPython parallel

# In[2]:


from concurrent.futures import Future
from functools import partial


def chain_apply(view, func, future):
    """Chain a call to view.apply(func, future.result()) when future is ready.

    Returns a Future for the subsequent result.
    """
    f2 = Future()
    # when `future` is ready, submit a new task applying func to its result
    def apply_func(f):
        if f.exception():
            f2.set_exception(f.exception())
            return
        print('submitting %s(%s)' % (func.__name__, f.result()))
        ar = view.apply_async(func, f.result())
        # when ar is done, pass its result through to f2
        ar.add_done_callback(lambda ar: f2.set_result(ar.get()))
    future.add_done_callback(apply_func)
    return f2


def chain_map(view, func, list_of_futures):
    """Chain a new callback onto each future in a list of futures."""
    return [ chain_apply(view, func, f) for f in list_of_futures ]


# submit the first batch, then chain the second and third stages onto it
results1 = map(partial(view.apply, task1), [1, 2, 3])
results2 = chain_map(view, task2, results1)
results3 = chain_map(view, task3, results2)
print("Waiting for results")
[ r.result() for r in results3 ]


# ## Option 2: Use dask.distributed
#
# This sort of thing is right in [distributed](https://distributed.readthedocs.io)'s wheelhouse.
#
# ipyparallel 5.1 can turn an IPython cluster into a dask.distributed cluster with a single method call:

# In[3]:


executor = client.become_distributed(ncores=1)
executor


# In[4]:


# each map consumes the futures from the previous stage,
# so dask schedules the three stages as a dependency chain
results1 = executor.map(task1, [1, 2, 3])
results2 = executor.map(task2, results1)
results3 = executor.map(task3, results2)
executor.gather(results3)
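
# ## Aside: the chaining pattern without a cluster
#
# A minimal local sketch, assuming only the standard library: `chain_apply` and
# `chain_map` above rely only on the Future interface (`add_done_callback`,
# `result`, `exception`), so the same chaining can be exercised by submitting to
# a `concurrent.futures.ThreadPoolExecutor` in place of `view.apply_async`.
# The helpers `local_chain_apply` / `local_chain_map` are illustrative names,
# not part of ipyparallel or dask.

# In[5]:


from concurrent.futures import Future, ThreadPoolExecutor
import time  # the tasks reference `time`, which only the engines imported above

pool = ThreadPoolExecutor(max_workers=2)

def local_chain_apply(pool, func, future):
    """Submit func(future.result()) to the pool once `future` completes."""
    f2 = Future()
    def apply_func(f):
        if f.exception():
            f2.set_exception(f.exception())
            return
        nxt = pool.submit(func, f.result())
        def pass_through(g):
            # forward the chained result (or error) to f2
            if g.exception():
                f2.set_exception(g.exception())
            else:
                f2.set_result(g.result())
        nxt.add_done_callback(pass_through)
    future.add_done_callback(apply_func)
    return f2

def local_chain_map(pool, func, futures):
    return [local_chain_apply(pool, func, f) for f in futures]

r1 = [pool.submit(task1, x) for x in [1, 2, 3]]
r2 = local_chain_map(pool, task2, r1)
r3 = local_chain_map(pool, task3, r2)
[r.result() for r in r3]  # expected: [24, 48, 72]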