from __future__ import print_function

import heapq

from IPython.display import display
from IPython import parallel

# Client handle used by everything below; indexing it (rc[...]) gives the
# per-engine views used to run code and fetch variables remotely.
rc = parallel.Client()


def load_data(arg):
    """Load a dataset in the global namespace.

    The dataset *must* be sorted.

    ``arg`` determines both the start (``4 - arg``) and the step
    (``arg + 1``) of the generated range, so each distinct ``arg`` produces
    a different sequence (ascending whenever ``arg >= 0``).

    Return the *name* of the variable in which the dataset was loaded.
    """
    global data
    # Here, real data loading would occur
    s = 4 - arg
    step = arg + 1
    # The span 4 * step**2 walked in increments of `step` gives 4 * step items.
    data = range(s, s + 4 * step ** 2, step)
    return 'data'


def remote_iterator(view, name):
    """Return an iterator on an object living on a remote engine."""
    # NOTE(review): exercise placeholder -- as written this never yields and
    # spins forever if called; it has to be filled in before the code at the
    # bottom of the file can run.
    # TODO: create an iterator remotely
    while True:
        pass
        # TODO: yield the next item
        # TODO: turn remote StopIteration into local StopIteration


# First attempt: try to catch a remote failure as a plain NameError.
# Anything else is reported and re-raised.
try:
    rc[-1].execute("foo = barbarbar", block=True)
except NameError:
    print("caught NameError")
except Exception as e:
    print("Oops!  Didn't catch %r" % e)
    # Bare raise re-raises the active exception with its traceback intact.
    raise

print("safe and sound")


def assign_foo():
    """Run the failing remote assignment, translating a remote NameError.

    Raises:
        NameError: if the engine reports an error whose ``ename`` is
            ``'NameError'``; the message is the remote ``evalue``.
        parallel.RemoteError: for any other remote error, re-raised as is.
    """
    try:
        rc[-1].execute("foo = barbarbar", block=True)
    except parallel.RemoteError as e:
        if e.ename == 'NameError':
            raise NameError(e.evalue)
        else:
            # Not a NameError: propagate the original remote error untouched.
            raise


# Second attempt: with the translation in assign_foo, a local
# `except NameError` can handle the remote failure.
try:
    assign_foo()
except NameError:
    print("caught NameError")
except Exception as e:
    print("Oops!  Didn't catch %r" % e)
    raise

print("safe and sound")


# The exercise stub is repeated here (it rebinds the name defined above).
def remote_iterator(view, name):
    """Return an iterator on an object living on a remote engine."""
    # NOTE(review): exercise placeholder -- as written this never yields and
    # spins forever if called; replace it with one of the solutions below.
    # TODO: create an iterator remotely
    while True:
        pass
        # TODO: yield the next item
        # TODO: turn remote StopIteration into local StopIteration


# IPython magics from the original notebook cells.  They are not Python
# syntax; run them in an interactive IPython session to pull in a solution.
# %load soln/remote_iter_hint.py
# %load soln/remote_iter.py
# %load soln/remote_iter_slightly_better.py

dview = rc.direct_view()
print('Engine IDs:', rc.ids)

# Load the data on the engines
data_refs = dview.map(load_data, rc.ids)

# Bare expressions kept from the notebook cells (they only display a value
# when run interactively).
data_refs

list(data_refs)

# And we now make a local object which represents the remote iterator
iterators = [remote_iterator(rc[e], ref) for e, ref in zip(rc.ids, data_refs)]
for it in iterators:
    print(list(it))

print('Locally merge the remote sets:')
# The iterators above were consumed by list(), so build fresh ones.
iterators = [remote_iterator(rc[e], ref) for e, ref in zip(rc.ids, data_refs)]
remote = list(heapq.merge(*iterators))
print(remote)

# Key step here: pull data from each engine:
local_data = [rc[e][ref] for e, ref in zip(rc.ids, data_refs)]
print('Local data:')
for subset in local_data:
    print(subset)
print('Sorted:')
local = list(heapq.merge(*local_data))
print(local)
print("local == remote: %s" % (local == remote))