How to merge multiple sorted remote data streams using the heapq.merge function that ships with Python.
from __future__ import print_function
import heapq
from IPython.display import display
from IPython import parallel
rc = parallel.Client()
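Before bringing engines into the picture, it may help to see what heapq.merge does with plain local lists. It takes several inputs that are each already sorted and lazily yields one sorted stream. The sketch below is a minimal example; the lists a, b and c are made-up data.

```python
import heapq

# Three made-up inputs, each already sorted in ascending order.
a = [1, 4, 9]
b = [2, 3, 10]
c = [0, 5]

# heapq.merge returns a lazy iterator; list() drains it.
merged = list(heapq.merge(a, b, c))
print(merged)  # [0, 1, 2, 3, 4, 5, 9, 10]

# For sorted inputs, the result matches a full sort of the concatenation.
assert merged == sorted(a + b + c)
```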
Imagine we have a routine that can load or create a sorted subset of our data on an engine, based on a parameter (such as the index of the part of the data to read):
def load_data(arg):
    """Load a dataset in the global namespace. The dataset *must* be sorted.
    Return the *name* of the variable in which the dataset was loaded."""
    global data
    # Here, real data loading would occur
    s = 4-arg
    step = arg+1
    # list() so the engine holds a real list on both Python 2 and Python 3
    data = list(range(s, s+4*step**2, step))
    return 'data'
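To preview the subsets that load_data produces without starting a cluster, the sketch below copies its formula into a hypothetical local helper, make_subset. It assumes four engines that receive the arguments 0 to 3.

```python
def make_subset(arg):
    """Hypothetical local copy of the formula inside load_data (no engine involved)."""
    s = 4 - arg
    step = arg + 1
    return list(range(s, s + 4 * step**2, step))


# Assume four engines, receiving arguments 0, 1, 2 and 3.
subsets = {arg: make_subset(arg) for arg in range(4)}
for arg, subset in subsets.items():
    print(arg, subset)

# Each subset is sorted, which is the precondition heapq.merge relies on,
# and subset number `arg` holds 4 * (arg + 1) values.
for arg, subset in subsets.items():
    assert subset == sorted(subset)
    assert len(subset) == 4 * (arg + 1)
```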
We want a function that takes a single-engine View and the name of a variable on that engine, and returns a local iterator over the remote object. It should look something like this skeleton function:
def remote_iterator(view, name):
    """Return an iterator on an object living on a remote engine."""
    # TODO: create an iterator remotely
    while True:
        pass
        # TODO: yield the next item
        # TODO: turn remote StopIteration into local StopIteration
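Errors raised on engines show up in the Client as a RemoteError rather than as the original exception class. You therefore have to be a little careful when trying to catch remote errors, because an except NameError clause will not match a NameError raised on an engine: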
try:
    rc[-1].execute("foo = barbarbar", block=True)
except NameError:
    print("caught NameError")
except Exception as e:
    print("Oops! Didn't catch %r" % e)
    raise e
print("safe and sound")
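The failure above can be reproduced without a cluster. In this sketch, FakeRemoteError and run_remotely are hypothetical stand-ins for RemoteError and an engine. The only point is that a wrapper exception that is not a subclass of NameError slips past except NameError.

```python
class FakeRemoteError(Exception):
    """Hypothetical stand-in for RemoteError.

    Like the real class, it is not a subclass of NameError; it only records
    the remote exception's class name and message as attributes.
    """

    def __init__(self, ename, evalue):
        super().__init__("%s(%s)" % (ename, evalue))
        self.ename = ename
        self.evalue = evalue


def run_remotely(code):
    """Run code in a private namespace and wrap any failure, as an engine would."""
    try:
        exec(code, {})
    except Exception as e:
        raise FakeRemoteError(type(e).__name__, str(e))


caught = None
try:
    run_remotely("foo = barbarbar")
except NameError:
    caught = "NameError"  # never reached: the wrapper is not a NameError
except FakeRemoteError as e:
    caught = "FakeRemoteError(ename=%r)" % e.ename
print(caught)  # FakeRemoteError(ename='NameError')
```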
A RemoteError has three attributes:
- err.ename: the class name of the remote error (e.g. NameError, ValueError)
- err.evalue: the string value of the error message
- err.traceback: the remote traceback as a list of strings
For simple builtin exceptions, you can re-raise remote errors as the original exception class with a case like the following:
def assign_foo():
    try:
        rc[-1].execute("foo = barbarbar", block=True)
    except parallel.RemoteError as e:
        if e.ename == 'NameError':
            raise NameError(e.evalue)
        else:
            raise e
With this re-cast, exception handlers in the calling code treat the remote NameError as if it had been raised locally:
try:
    assign_foo()
except NameError:
    print("caught NameError")
except Exception as e:
    print("Oops! Didn't catch %r" % e)
    raise e
print("safe and sound")
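The re-cast in assign_foo handles only NameError. One possible generalization looks up the class named by ename in the Python 3 builtins module and falls back to the original wrapper otherwise. It assumes that ename is the plain class name of the remote exception. FakeRemoteError is again a hypothetical stand-in so the sketch runs locally. With a cluster, you would pass the caught parallel.RemoteError instead.

```python
import builtins


class FakeRemoteError(Exception):
    """Hypothetical stand-in for parallel.RemoteError (ename + evalue only)."""

    def __init__(self, ename, evalue):
        super().__init__("%s(%s)" % (ename, evalue))
        self.ename = ename
        self.evalue = evalue


def reraise_as_builtin(err):
    """Re-raise err as the builtin exception class named by err.ename.

    Falls back to re-raising err itself when ename is not a builtin
    exception (e.g. a class that only exists on the engine) or when the
    class cannot be built from a single message argument.
    """
    cls = getattr(builtins, err.ename, None)
    if isinstance(cls, type) and issubclass(cls, Exception):
        try:
            local_exc = cls(err.evalue)
        except TypeError:
            # e.g. UnicodeDecodeError requires five constructor arguments
            raise err
        raise local_exc
    raise err


# Try three remote error names and record which local class comes out.
results = []
for ename in ("NameError", "ZeroDivisionError", "MyEngineOnlyError"):
    try:
        reraise_as_builtin(FakeRemoteError(ename, "demo"))
    except Exception as e:
        results.append(type(e).__name__)
print(results)  # ['NameError', 'ZeroDivisionError', 'FakeRemoteError']
```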
Can you fill out this remote_iterator function?
Potentially useful:
def remote_iterator(view, name):
    """Return an iterator on an object living on a remote engine."""
    # TODO: create an iterator remotely
    while True:
        pass
        # TODO: yield the next item
        # TODO: turn remote StopIteration into local StopIteration
A local example that should be a good guideline for the remote version:
%load soln/remote_iter_hint.py
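The hint file's contents are not reproduced in this text. The sketch below is an independent local example of the same pattern, using the hypothetical name manual_iterator. It creates an iterator once, fetches one item per step, and turns StopIteration into the end of the generator.

```python
def manual_iterator(seq):
    """Yield the items of seq by explicitly calling next() on an iterator.

    This mirrors the structure the remote version needs: create an iterator
    once, then fetch one item per step until StopIteration signals the end.
    """
    it = iter(seq)
    while True:
        try:
            item = next(it)
        except StopIteration:
            # Since Python 3.7 (PEP 479), a StopIteration escaping a generator
            # becomes RuntimeError, so end the generator with return instead.
            return
        yield item


print(list(manual_iterator([3, 1, 2])))  # [3, 1, 2]
```

Ending with return rather than re-raising StopIteration matters on Python 3.7 and later. On those versions, PEP 479 converts a StopIteration that escapes a generator body into a RuntimeError.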
And the solution:
%load soln/remote_iter.py
And an ever-so-slightly fancier solution:
%load soln/remote_iter_slightly_better.py
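For readers without the soln files, here is one possible sketch. It is not the contents of those files. It relies on only two single-engine view methods, execute(code, block=True) and pull(name, block=True). LocalView and FakeRemoteError are hypothetical stand-ins that run everything in a local dict so the sketch is self-contained. With a cluster, you would pass rc[e] as the view and parallel.RemoteError as remote_error.

```python
import heapq


class FakeRemoteError(Exception):
    """Hypothetical stand-in for parallel.RemoteError."""

    def __init__(self, ename, evalue):
        super().__init__("%s(%s)" % (ename, evalue))
        self.ename = ename
        self.evalue = evalue


class LocalView(object):
    """Hypothetical single-engine view that runs code in a private namespace.

    It imitates only execute(code, block=True) and pull(name, block=True).
    """

    def __init__(self, namespace):
        self.ns = namespace

    def execute(self, code, block=True):
        try:
            exec(code, self.ns)
        except Exception as e:
            raise FakeRemoteError(type(e).__name__, str(e))

    def pull(self, name, block=True):
        return self.ns[name]


def remote_iterator_sketch(view, name, remote_error=FakeRemoteError):
    """Yield the items of the object called name in the view's namespace."""
    it_name = '_%s_iter' % name
    item_name = '_%s_item' % name
    # Create the iterator once, on the "engine" side.
    view.execute('%s = iter(%s)' % (it_name, name), block=True)
    while True:
        try:
            # Advance the remote iterator and store the item remotely...
            view.execute('%s = next(%s)' % (item_name, it_name), block=True)
        except remote_error as e:
            if e.ename == 'StopIteration':
                return  # remote StopIteration ends the local generator
            raise
        # ...then copy that single item back.
        yield view.pull(item_name, block=True)


views = [LocalView({'data': [1, 4, 7]}), LocalView({'data': [2, 3, 9]})]
merged = list(heapq.merge(*[remote_iterator_sketch(v, 'data') for v in views]))
print(merged)  # [1, 2, 3, 4, 7, 9]
```

Each item costs two round trips in this sketch, one execute and one pull. Fetching items in batches would reduce latency, at the cost of some extra bookkeeping.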
Now we bring IPython.parallel into action:
dview = rc.direct_view()
print('Engine IDs:', rc.ids)
# Load the data on the engines
data_refs = dview.map(load_data, rc.ids)
data_refs
list(data_refs)
# And we now make a local object which represents the remote iterator
iterators = [remote_iterator(rc[e], ref) for e,ref in zip(rc.ids, data_refs)]
for it in iterators:
    print(list(it))
Now, let's merge those datasets into a single sorted one:
print('Locally merge the remote sets:')
# The iterators above were exhausted by list(), so create fresh ones
iterators = [remote_iterator(rc[e], ref) for e,ref in zip(rc.ids, data_refs)]
remote = list(heapq.merge(*iterators))
print(remote)
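heapq.merge is lazy. It asks each input for its next item only when it needs one, so with remote iterators, items are fetched from the engines on demand rather than all at once. In the local sketch below, a hypothetical counting helper stands in for the remote streams and records how many items each stream actually hands over.

```python
import heapq
import itertools

# How many items each stream has produced so far.
pulled = {'a': 0, 'b': 0}


def counting(label, items):
    """Hypothetical helper: yield items while counting how many were requested."""
    for x in items:
        pulled[label] += 1
        yield x


streams = [counting('a', [1, 3, 5, 7]), counting('b', [2, 4, 6, 8])]
# Take only the first three merged values.
first_three = list(itertools.islice(heapq.merge(*streams), 3))
print(first_three, pulled)
```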
Now repeat the operation by copying the data from the engines into our local namespace and doing a regular merge here:
# Key step here: pull data from each engine:
local_data = [rc[e][ref] for e,ref in zip(rc.ids, data_refs)]
print('Local data:')
for subset in local_data:
    print(subset)
print('Sorted:')
local = list(heapq.merge(*local_data))
print(local)
print("local == remote: %s" % (local==remote))