IPython parallel uses a canning module to prepare objects for serialization.
from ipyparallel.serialize import canning
import numpy as np
Canning is a no-op for simple or unrecognized types:
canning.can(5)
5
class C:
    pass
c = C()
canning.can(c)
<__main__.C at 0x10bb53898>
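One way to picture the dispatch behind can is a registry that maps types to wrapper classes and passes anything unregistered straight through. The sketch below assumes that design; CAN_MAP and CannedBuffer are hypothetical names, not ipyparallel's actual internals.

```python
class CannedBuffer:
    """Hypothetical wrapper for bytearray (a stand-in, not an ipyparallel class)."""

    def __init__(self, obj):
        self.buffers = [memoryview(obj)]


# Hypothetical registry: type -> wrapper used to can instances of that type.
CAN_MAP = {bytearray: CannedBuffer}


def can(obj):
    """Wrap obj if its type is registered; otherwise return it unchanged."""
    for cls, wrapper in CAN_MAP.items():
        if isinstance(obj, cls):
            return wrapper(obj)
    return obj  # no-op for simple or unrecognized types


class C:
    pass


c = C()
print(can(5))                                 # 5
print(can(c) is c)                            # True
print(type(can(bytearray(b"xyz"))).__name__)  # CannedBuffer
```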
Canning provides special CannedObject wrappers for custom serialization of particular types:
A = np.ones(2048)
cannedA = canning.can(A)
cannedA
<ipyparallel.serialize.canning.CannedArray at 0x10bb41a58>
This accomplishes two goals:
The main optimization feature of a CannedObject is that it has a .buffers attribute, which is a list of objects that provide the buffer API (bytes, memoryviews):
cannedA.buffers
[<memory at 0x10bba6f48>]
A serialized object is always a list of buffers. This allows an object to contribute an arbitrary number of discontiguous buffers to its serialization, and those buffers can be used as-is, without being concatenated during serialization*.
Serialization involves:
*Note: we don't do this for discontiguous numpy arrays, but we could.
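Judging from the outputs below (a pickled CannedArray whose buffers attribute holds None, followed by a raw memoryview), serialization can be sketched as: can the object, detach its buffers, pickle what is left, and return the pickle followed by the buffers. This sketch assumes a plain dict built by a hypothetical can_array as the canned form; the real CannedArray also records dtype and shape.

```python
import pickle
from array import array


def can_array(arr):
    """Hypothetical canning step: small metadata plus a list of zero-copy buffers."""
    return {"typecode": arr.typecode, "buffers": [memoryview(arr)]}


def serialize(arr):
    """Sketch of serialization: can, detach the buffers, pickle the rest."""
    canned = can_array(arr)
    buffers = canned["buffers"]
    # Leave one None placeholder per buffer so the receiver knows how many follow.
    canned["buffers"] = [None] * len(buffers)
    return [pickle.dumps(canned)] + buffers


A = array("d", [1.0] * 2048)
frames = serialize(A)
print(len(frames))       # 2
print(frames[1].nbytes)  # 16384
```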
from ipyparallel.serialize import serialize_object
bufs = serialize_object(A)
bufs
[b'\x80\x03cipyparallel.serialize.canning\nCannedArray\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00dtypeq\x03X\x03\x00\x00\x00<f8q\x04X\x07\x00\x00\x00buffersq\x05]q\x06NaX\x07\x00\x00\x00pickledq\x07\x89X\x05\x00\x00\x00shapeq\x08M\x00\x08\x85q\tub.', <memory at 0x10bba6a08>]
bufs[1].nbytes
16384
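Note that the second buffer refers to the same memory as cannedA.buffers[0]: like it, it is just a view on the original array's memory, not a copy. Each call creates a distinct memoryview object, so the printed addresses differ even though the underlying memory is shared: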
memoryview(A)
<memory at 0x10bb5c1c8>
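A quick way to check that two memoryviews share memory rather than hold copies, using only the standard library (array.array stands in for the numpy array here): compare their .obj attributes and mutate the source.

```python
from array import array

A = array("d", [1.0] * 4)
view1 = memoryview(A)  # e.g. the view held in a canned object's .buffers
view2 = memoryview(A)  # e.g. the view placed in the serialized list

print(view1 is view2)                     # False: two distinct memoryview objects
print(view1.obj is A and view2.obj is A)  # True: both export A's memory
A[0] = 42.0                               # mutate the original array...
print(view1[0], view2[0])                 # 42.0 42.0: ...and both views see it
```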
Serializing an object that doesn't get canned is just pickling, but the result is still a list: a list of length 1 containing the pickled bytes:
serialize_object(5)
[b'\x80\x03K\x05.']
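The single-element result can be reproduced with plain pickle. The \x80\x03 prefix in the output above is the pickle protocol 3 header, so this sketch pins protocol=3 explicitly; serialize_plain is a hypothetical helper, and the protocol used in practice may differ.

```python
import pickle


def serialize_plain(obj, protocol=3):
    """Hypothetical helper: an object with no buffers becomes a one-element list."""
    return [pickle.dumps(obj, protocol=protocol)]


print(serialize_plain(5))  # [b'\x80\x03K\x05.']
```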
The pickled object has had its buffers removed:
import pickle
cannedA2 = pickle.loads(bufs[0])
cannedA2
<ipyparallel.serialize.canning.CannedArray at 0x10bb41b70>
cannedA2.buffers
[None]
Deserialization involves restoring the buffers and then calling the CannedObject's get_object method:
cannedA2.buffers = bufs[1:]
cannedA2.get_object()
array([ 1., 1., 1., ..., 1., 1., 1.])
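Restoring buffers and rebuilding the object can be sketched without numpy. The frames below are built by hand to mimic what arrives (pickled metadata with a None placeholder, then raw bytes), and the hypothetical get_object uses memoryview.cast to reinterpret the bytes as doubles without copying, roughly the role numpy.frombuffer can play for arrays.

```python
import pickle
from array import array


def get_object(canned):
    """Hypothetical get_object: view the restored buffer as typed values, no copy."""
    return memoryview(canned["buffers"][0]).cast(canned["typecode"])


# Frames as they might arrive: pickled metadata (buffers stripped), then raw bytes.
payload = array("d", [1.0, 2.0, 3.0]).tobytes()
frames = [pickle.dumps({"typecode": "d", "buffers": [None]}), payload]

canned = pickle.loads(frames[0])
print(canned["buffers"])        # [None]

canned["buffers"] = frames[1:]  # step 1: restore the buffers
values = get_object(canned)     # step 2: rebuild the object
print(values.tolist())          # [1.0, 2.0, 3.0]
```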
Since one message can contain multiple objects, deserialize_object returns the unpacked object together with the remaining buffers:
from ipyparallel.serialize import deserialize_object
obj, remaining = deserialize_object(bufs)
obj
array([ 1., 1., 1., ..., 1., 1., 1.])
A message can contain multiple objects, which we can chain since every serialized object is a list of blobs:
from itertools import chain
objects = [5, 'hi', A]
message = list(chain(*(serialize_object(obj) for obj in objects)))
message
[b'\x80\x03K\x05.', b'\x80\x03X\x02\x00\x00\x00hiq\x00.', b'\x80\x03cipyparallel.serialize.canning\nCannedArray\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00dtypeq\x03X\x03\x00\x00\x00<f8q\x04X\x07\x00\x00\x00buffersq\x05]q\x06NaX\x07\x00\x00\x00pickledq\x07\x89X\x05\x00\x00\x00shapeq\x08M\x00\x08\x85q\tub.', <memory at 0x10bb5c588>]
remaining = message
received = []
while remaining:
    obj, remaining = deserialize_object(remaining)
    received.append(obj)
received
[5, 'hi', array([ 1., 1., 1., ..., 1., 1., 1.])]
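Keeping each object as a list of buffers pays off at the transport layer: ipyparallel hands these lists to a ZeroMQ-based messaging layer, which can send each buffer as a separate part of a multipart message. For a plain byte stream, a comparable effect can be sketched with length-prefixed frames; write_frames and read_frames are hypothetical helpers, and the format (a frame count, then an 8-byte length plus payload per frame) is an assumption made for illustration.

```python
import io
import struct
from array import array


def write_frames(stream, frames):
    """Hypothetical framing: frame count, then each frame as length + raw bytes."""
    stream.write(struct.pack("!I", len(frames)))
    for frame in frames:
        view = memoryview(frame)
        stream.write(struct.pack("!Q", view.nbytes))
        stream.write(view)  # written directly; frames are never joined first


def read_frames(stream):
    """Inverse of write_frames: returns a list of bytes objects."""
    (count,) = struct.unpack("!I", stream.read(4))
    frames = []
    for _ in range(count):
        (size,) = struct.unpack("!Q", stream.read(8))
        frames.append(stream.read(size))
    return frames


A = array("d", [1.0] * 2048)
stream = io.BytesIO()
write_frames(stream, [b"header", memoryview(A)])
stream.seek(0)
received = read_frames(stream)
print([len(f) for f in received])  # [6, 16384]
```

Each buffer is written straight from its memoryview, so on the sending side the 16 KiB array is never concatenated with the header into one large bytes object.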