IPython.parallel does little serialization of its own. It has custom zero-copy handling of numpy arrays, but beyond that it does only the bare minimum needed to make basic interactively defined functions and classes sendable.
There are a few projects that extend pickle to make just about anything sendable, and one of these is dill.
To install dill:
pip install --pre dill
First, as always, we create a task function, this time with a closure
def make_closure(a):
    """make a function with a closure, and return it"""
    def has_closure(b):
        return a * b
    return has_closure
closed = make_closure(5)
closed(2)
10
import pickle
Without help, pickle can't deal with closures
pickle.dumps(closed)
---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
<ipython-input-5-0f1f376cfea0> in <module>()
----> 1 pickle.dumps(closed)

/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc in dumps(obj, protocol)
   1372 def dumps(obj, protocol=None):
   1373     file = StringIO()
-> 1374     Pickler(file, protocol).dump(obj)
   1375     return file.getvalue()
   1376

/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc in dump(self, obj)
    222         if self.proto >= 2:
    223             self.write(PROTO + chr(self.proto))
--> 224         self.save(obj)
    225         self.write(STOP)
    226

/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.pyc in save_global(self, obj, name, pack)
    746                 raise PicklingError(
    747                     "Can't pickle %r: it's not found as %s.%s" %
--> 748                     (obj, module, name))
    749             else:
    750                 if klass is not obj:

PicklingError: Can't pickle <function has_closure at 0x10412c6e0>: it's not found as __main__.has_closure
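To see why plain pickle gives up here, it can help to look at what a closure carries. The following is a minimal sketch using only the standard library; it assumes nothing beyond the make_closure function defined above. The exact exception type raised by pickle differs between Python versions, so the sketch catches several.

```python
import pickle

def make_closure(a):
    """Return a function that closes over `a`."""
    def has_closure(b):
        return a * b
    return has_closure

closed = make_closure(5)

# The captured name is recorded on the code object, and its value
# lives in a cell attached to the function.
free_vars = closed.__code__.co_freevars
captured = closed.__closure__[0].cell_contents

# Plain pickle stores a function as a module and a name to look up later.
# A nested function cannot be found that way, so dumps fails.
# The exception type varies across Python versions.
try:
    pickle.dumps(closed)
    pickled_ok = True
except (pickle.PicklingError, AttributeError, TypeError):
    pickled_ok = False

print(free_vars)
print(captured)
print(pickled_ok)
```

A serializer that wants to send this function has to ship the code and the cell contents, not just a name.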
But after we import dill, magic happens
import dill
pickle.dumps(closed)[:64] + '...'
"cdill.dill\n_load_type\np0\n(S'FunctionType'\np1\ntp2\nRp3\n(cdill.dill..."
So from now on, pretty much everything is pickleable.
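A round trip is a quick way to confirm that the closure survives serialization. This is a small sketch that calls dill's own dumps and loads directly, so it does not rely on the import changing the behaviour of pickle.dumps. It assumes dill is installed; the ImportError guard is only there so the snippet does not crash without it.

```python
try:
    import dill  # third-party package, assumed installed as described earlier
except ImportError:
    dill = None

def make_closure(a):
    """Return a function that closes over `a`."""
    def has_closure(b):
        return a * b
    return has_closure

closed = make_closure(5)

restored = None
if dill is not None:
    payload = dill.dumps(closed)     # serialize the function and its closure
    restored = dill.loads(payload)   # rebuild an equivalent function
    print(restored(2))
```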
As usual, we start by creating our Client and View
from IPython import parallel
rc = parallel.Client()
view = rc.load_balanced_view()
Now let's try sending our function with a closure:
view.apply_sync(closed, 3)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-9-23a646829fdc> in <module>()
----> 1 view.apply_sync(closed, 3)

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in apply_sync(self, f, *args, **kwargs)

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in spin_after(f, self, *args, **kwargs)
     73 def spin_after(f, self, *args, **kwargs):
     74     """call spin after the method."""
---> 75     ret = f(self, *args, **kwargs)
     76     self.spin()
     77     return ret

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in apply_sync(self, f, *args, **kwargs)
    245         returning the result.
    246         """
--> 247         return self._really_apply(f, args, kwargs, block=True)
    248
    249     #----------------------------------------------------------------

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
     64     self._in_sync_results = True
     65     try:
---> 66         ret = f(self, *args, **kwargs)
     67     finally:
     68         self._in_sync_results = False

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
     49     n_previous = len(self.client.history)
     50     try:
---> 51         ret = f(self, *args, **kwargs)
     52     finally:
     53         nmsgs = len(self.client.history) - n_previous

/Users/minrk/dev/ip/mine/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, block, track, after, follow, timeout, targets, retries)
   1049
   1050         msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
-> 1051                                 metadata=metadata)
   1052         tracker = None if track is False else msg['tracker']
   1053

/Users/minrk/dev/ip/mine/IPython/parallel/client/client.pyc in send_apply_request(self, socket, f, args, kwargs, metadata, track, ident)
   1252         bufs = serialize.pack_apply_message(f, args, kwargs,
   1253             buffer_threshold=self.session.buffer_threshold,
-> 1254             item_threshold=self.session.item_threshold,
   1255         )
   1256

/Users/minrk/dev/ip/mine/IPython/kernel/zmq/serialize.pyc in pack_apply_message(f, args, kwargs, buffer_threshold, item_threshold)
    163     info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
    164
--> 165     msg = [pickle.dumps(can(f),-1)]
    166     msg.append(pickle.dumps(info, -1))
    167     msg.extend(arg_bufs)

/Users/minrk/dev/ip/mine/IPython/utils/codeutil.pyc in reduce_code(co)
     36 def reduce_code(co):
     37     if co.co_freevars or co.co_cellvars:
---> 38         raise ValueError("Sorry, cannot pickle code objects with closures")
     39     args = [co.co_argcount, co.co_nlocals, co.co_stacksize,
     40             co.co_flags, co.co_code, co.co_consts, co.co_names,

ValueError: Sorry, cannot pickle code objects with closures
Oops, no dice. For IPython to work with dill, there are one or two more steps. IPython will do these for you if you call DirectView.use_dill:
rc[:].use_dill()
<AsyncResult: use_dill>
This is equivalent to
from IPython.utils.pickleutil import use_dill
use_dill()
rc[:].apply(use_dill)
Now let's try again
view.apply_sync(closed, 3)
15
Yay! With dill enabled, IPython.parallel can send closures and other previously non-pickleable objects to our engines.
Results can be closures too. Let's have an engine build one and send it back:
remote_closure = view.apply_sync(make_closure, 4)
remote_closure(5)
20
But wait, there's more!
At this point, we can send and receive all kinds of stuff:
def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a
        return inner_again
    return inner
So outer returns a function with a closure, which returns a function with a closure.
Now, we can resolve the first closure on the engine, the second here, and the third on a different engine, after passing through a lambda we define here and call there, just for good measure.
view.apply_sync(lambda f: f(3), view.apply_sync(outer, 1)(2))
6
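The expected value can be checked without any engines by running the same chain of calls in one process. This sketch only mirrors the order of the calls; the comments mark which steps ran remotely in the example above, and the names first, second and result are introduced here for illustration.

```python
def outer(a):
    def inner(b):
        def inner_again(c):
            return c * b * a
        return inner_again
    return inner

first = outer(1)                    # stands in for view.apply_sync(outer, 1)
second = first(2)                   # resolved locally in the original as well
result = (lambda f: f(3))(second)   # stands in for the lambda called remotely
print(result)
```

With dill in use, each hop in the remote version only has to carry the captured values of a, b and c along with the function.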
Finally, let's check that normal execution still works:
%px foo = 5
print(rc[:]['foo'])
rc[:]['bar'] = lambda : 2 * foo
rc[:].apply_sync(parallel.Reference('bar'))
[5, 5, 5, 5, 5, 5, 5, 5]
[10, 10, 10, 10, 10, 10, 10, 10]
And test that the @interactive decorator works
%%file testdill.py
from IPython.parallel import interactive

@interactive
class C(object):
    a = 5

@interactive
class D(C):
    b = 10

@interactive
def foo(a):
    return a * b
Overwriting testdill.py
import testdill
v = rc[-1]
v['D'] = testdill.D
d = v.apply_sync(lambda : D())
print d.a, d.b
5 10
v['b'] = 10
v.apply_sync(testdill.foo, 5)
50
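The last call works because foo finds b in the engine's namespace, not in the testdill module where it was defined. The sketch below illustrates that idea with the standard library only. It is not IPython's implementation of @interactive; engine_ns is a hypothetical stand-in for an engine's interactive namespace, and remote_foo is a name introduced here.

```python
import types

def foo(a):
    return a * b  # `b` is a global name, looked up when foo is called

# Hypothetical stand-in for the namespace of an engine after v['b'] = 10.
engine_ns = {"b": 10}

# Rebuild foo so that its global lookups go to the stand-in namespace.
remote_foo = types.FunctionType(foo.__code__, engine_ns, foo.__name__)

result = remote_foo(5)
print(result)
```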