Why? By translating data from third-party files into the Bluesky Event Model, downstream tooling can expect a common in-memory layout, regardless of variations in scientific domain or file format.
How? It is always possible to represent experimental data, fundamentally, as one or more time series because that is how the data is acquired in the first place.
Write a generator that yields one RunStart document followed by the other documents relating to that run. The signature of the generator is up to the implementor: it may take any arguments, positional or keyword, required or optional.
Note that it will not be enough to have one of these per format (e.g. TIFF series), because files of different origin will follow different conventions for encoding metadata in filenames, etc. We will likely need separate ingestors corresponding to each piece of software that generates the files to be ingested.
We will also need one additional layer, above the ingestors, to support a dialog box where users select file(s) and the appropriate ingestor or ingestors for those files are chosen; a rough sketch of such a selection layer follows.
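As a minimal sketch of that layer (the names INGESTOR_REGISTRY, register_ingestor, and candidate_ingestors are hypothetical, not part of any existing package), a file-selection dialog could consult a registry like this to decide which ingestors to offer for the chosen files:
INGESTOR_REGISTRY = {}  # label -> (predicate, ingestor generator function)

def register_ingestor(label, predicate, ingestor):
    "Associate an ingestor with a predicate that inspects a candidate filename."
    INGESTOR_REGISTRY[label] = (predicate, ingestor)

def candidate_ingestors(filenames):
    "Return labels of all registered ingestors whose predicate accepts every file."
    return [label
            for label, (predicate, _) in INGESTOR_REGISTRY.items()
            if all(predicate(name) for name in filenames)]

# For example, the TIFF series ingestor defined below might be registered as:
# register_ingestor('TIFF series',
#                   lambda name: name.lower().endswith(('.tif', '.tiff')),
#                   ingest_tiff_series)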
import base64
import hashlib
import glob
import os
import event_model
import tifffile
def ingest_tiff_series(pattern):
    """
    Wrap one TIFF series in the document model.
    """
    file_list = sorted(glob.glob(pattern))
    if not file_list:
        raise ValueError(f"No matches for {pattern!r}")
    # We would like a deterministic UID for this group of files.
    # This is one possible approach, maybe not the best one.
    m = hashlib.sha256()
    for filename in file_list:
        m.update(filename.encode())
    uid = base64.urlsafe_b64encode(m.digest()).decode()
    mtime = os.path.getmtime(file_list[0])
    # RunStart Document
    run_bundle = event_model.compose_run(uid=uid, time=mtime)
    yield 'start', run_bundle.start_doc
    # Peek at the first image to get shape, dtype.
    img = tifffile.imread(file_list[0])
    # EventDescriptor Document
    shape = img.shape
    dtype = 'number'  # must be one of the jsonschema types
    data_keys = {'image': {'shape': shape, 'dtype': dtype, 'source': pattern}}
    stream_bundle = run_bundle.compose_descriptor(data_keys=data_keys, name='primary')
    yield 'descriptor', stream_bundle.descriptor_doc
    # Events or EventPages
    for filename in file_list:
        mtime = os.path.getmtime(filename)
        img = tifffile.imread(filename)
        if img.shape != shape:
            raise ValueError(f"Expected series of images of shape {shape} "
                             f"but {filename} has shape {img.shape}")
        yield 'event', stream_bundle.compose_event(data={'image': img},
                                                   timestamps={'image': mtime},
                                                   time=mtime)
    # RunStop Document
    yield 'stop', run_bundle.compose_stop(time=mtime)
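Before involving any catalog, the generator can also be exercised directly, which is a convenient way to sanity-check an ingestor during development (a small usage sketch; 'files/*.tiff' is the same pattern used below):
documents = list(ingest_tiff_series('files/*.tiff'))
[name for name, doc in documents]  # expected: ['start', 'descriptor', 'event', ..., 'stop']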
BlueskyInMemoryCatalog
Here we instantiate an empty Catalog and then add one BlueskyRun to it. The upsert method takes a generator function, a tuple of positional arguments to be passed to that function, and a dictionary of keyword arguments to be passed. As stated above, the generator is expected to yield one RunStart document and subsequent documents related to that one run. It may yield only a partial run if the experiment is still in progress or was ungracefully interrupted.
If the uid of the run is the same as one previously passed to upsert, it will replace the previous one, as the name "upsert" (adopted from the database jargon for "update/insert") suggests.
from intake_bluesky.in_memory import BlueskyInMemoryCatalog
catalog = BlueskyInMemoryCatalog()
catalog.upsert(ingest_tiff_series, ('files/*.tiff',), {})
list(catalog)
['ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s=']
run = catalog['ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s=']
run.read_canonical() # a stream of documents like what the bluesky RunEngine would emit
<generator object BlueskyRun.read_canonical at 0x7f2ee3bc09e8>
for name, doc in run.read_canonical():
    print(name)
start
descriptor
event
event
event
event
event
event
event
event
event
event
stop
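Because these are the same (name, document) pairs that the bluesky RunEngine emits, they can be routed to any standard document consumer. As a small illustrative sketch (the EventCounter subclass is ours, not part of intake-bluesky), event_model.DocumentRouter dispatches each document to a method named after its type:
class EventCounter(event_model.DocumentRouter):
    def __init__(self):
        super().__init__()
        self.count = 0

    def event(self, doc):
        # Called once for each Event document routed through this instance.
        self.count += 1

counter = EventCounter()
for name, doc in run.read_canonical():
    counter(name, doc)
counter.count  # expected to equal the number of 'event' documents printed above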
The data from a particular stream can also be accessed as an xarray.Dataset, which is nice for interactive work.
run.primary.read()
<xarray.Dataset>
Dimensions:  (dim_0: 512, dim_1: 512, time: 10)
Coordinates:
  * time     (time) float64 1.559e+09 1.559e+09 ... 1.559e+09 1.559e+09
Dimensions without coordinates: dim_0, dim_1
Data variables:
    image    (time, dim_0, dim_1) int64 217 155 77 12 108 ... 211 34 108 2 182
    seq_num  (time) int64 1 2 3 4 5 6 7 8 9 10
    uid      (time) <U36 '42439914-0d37-473c-a0ce-0487cb05700e' ... '7a154272-065e-43bd-8d86-9b9edbc9459e'
run.primary.read()['image'].sum()
<xarray.DataArray 'image' ()>
array(334187042, dtype=int64)
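Since this is an ordinary xarray.Dataset, the usual xarray machinery applies. A couple of illustrative operations (not from the original session):
ds = run.primary.read()
ds['image'].mean(dim='time')   # average image over the ten exposures
ds['image'].isel(time=0)       # the first frame, with its coordinates attached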
import base64
import hashlib
import glob
import os
import event_model
import tifffile
def ingest_tiff_stack(filename):
    """
    Wrap one TIFF stack in the document model.
    """
    # We would like a deterministic UID for this file.
    # This is one possible approach, maybe not the best one.
    m = hashlib.sha256()
    m.update(filename.encode())
    uid = base64.urlsafe_b64encode(m.digest()).decode()
    mtime = os.path.getmtime(filename)
    # RunStart Document
    run_bundle = event_model.compose_run(uid=uid, time=mtime)
    yield 'start', run_bundle.start_doc
    img_stack = tifffile.imread(filename)
    # EventDescriptor Document
    shape = img_stack.shape[1:]
    dtype = 'number'  # must be one of the jsonschema types
    data_keys = {'image': {'shape': shape, 'dtype': dtype, 'source': filename}}
    stream_bundle = run_bundle.compose_descriptor(data_keys=data_keys, name='primary')
    yield 'descriptor', stream_bundle.descriptor_doc
    # Events or EventPages
    len_ = len(img_stack)
    yield 'event_page', stream_bundle.compose_event_page(data={'image': img_stack},
                                                         timestamps={'image': [mtime] * len_},
                                                         time=[mtime] * len_,
                                                         seq_num=list(range(1, len_ + 1)),
                                                         validate=False)  # Work around bug in event-model validator.
    # RunStop Document
    yield 'stop', run_bundle.compose_stop(time=mtime)
catalog.upsert(ingest_tiff_stack, ('files/stack/stack.tiff',), {})
list(catalog)
['ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s=', 'PylgaOkBfHebVoDhBEusZvBcnGF7WNiFYecrfQvjZdg=']
run = catalog['PylgaOkBfHebVoDhBEusZvBcnGF7WNiFYecrfQvjZdg=']
for name, doc in run.read_canonical():
    print(name)
start
descriptor
event
event
event
event
event
event
event
event
event
event
stop
run.primary.read()['image'].sum()
<xarray.DataArray 'image' ()>
array(334198927, dtype=int64)
The library mongoquery is used to provide a large subset of the MongoDB query API, even though no MongoDB is present.
list(catalog.search({'time': {'$lt': 1558887100}})) # narrows results to the TIFF Series example
[]
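Other common MongoDB operators are supported by mongoquery as well. For example (the time bounds here are arbitrary, chosen only for illustration):
list(catalog.search({'time': {'$gte': 1558887100, '$lt': 1559500000}}))
list(catalog.search({'$and': [{'time': {'$gt': 0}},
                              {'uid': {'$in': ['ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s=']}}]}))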
The BlueskyInMemoryCatalog "ingests" data into memory. A separate process would have to repeat the conversion from TIFF to documents. If the same data will be accessed repeatedly, it may be convenient to make a separate copy of the data in a richer format, such as msgpack, that can encode the documents more literally. This will roughly double the storage required but likely significantly expedite the data-loading process.
from suitcase.msgpack import export
for uid in catalog:
    export(catalog[uid].read_canonical(), 'native_copies')
It is now simple and fast to pull up a catalog of the re-serialized data.
from intake_bluesky.msgpack import BlueskyMsgpackCatalog
native_catalog = BlueskyMsgpackCatalog('native_copies/*.msgpack')
list(native_catalog)
['PylgaOkBfHebVoDhBEusZvBcnGF7WNiFYecrfQvjZdg=', 'ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s=']
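Runs loaded from the msgpack copies expose the same interface as before; for example, re-reading the TIFF-stack run by its uid (a usage sketch, output not shown):
native_run = native_catalog['PylgaOkBfHebVoDhBEusZvBcnGF7WNiFYecrfQvjZdg=']
native_run.primary.read()['image'].sum()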
There are a couple of layers of laziness available in this system. Until read_canonical() or read() is called, the ingestor generator is only pulled until it yields the first document (RunStart), which is enough to support search. Depending on the ingestor, this may mean that files need not be fully read, or even opened, until/unless the corresponding catalog entry is read. A second layer of laziness can be added within the ingestor itself by using dask to defer reading the image data, as in the following variant of the TIFF series ingestor.
import dask.array.image
def ingest_tiff_series_dask(pattern):
    """
    Wrap one TIFF series in the document model, reading the images lazily with dask.
    """
    file_list = sorted(glob.glob(pattern))
    if not file_list:
        raise ValueError(f"No matches for {pattern!r}")
    # We would like a deterministic UID for this group of files.
    # This is one possible approach, maybe not the best one.
    m = hashlib.sha256()
    for filename in file_list:
        m.update(filename.encode())
    uid = base64.urlsafe_b64encode(m.digest()).decode()
    mtime = os.path.getmtime(file_list[0])
    # RunStart Document
    run_bundle = event_model.compose_run(uid=uid, time=mtime)
    yield 'start', run_bundle.start_doc
    # Peek at the first image to get shape, dtype.
    img = tifffile.imread(file_list[0])
    # EventDescriptor Document
    shape = img.shape
    dtype = 'number'  # must be one of the jsonschema types
    data_keys = {'image': {'shape': shape, 'dtype': dtype, 'source': pattern}}
    stream_bundle = run_bundle.compose_descriptor(data_keys=data_keys, name='primary')
    yield 'descriptor', stream_bundle.descriptor_doc
    # Events or EventPages
    for filename in file_list:
        mtime = os.path.getmtime(filename)
        img = dask.array.image.imread(filename)
        img = img.reshape(img.shape[1:])  # dask.array.image.imread gives us an extra dimension of len 1
        if img.shape != shape:
            raise ValueError(f"Expected series of images of shape {shape} "
                             f"but {filename} has shape {img.shape}")
        yield 'event', stream_bundle.compose_event(data={'image': img},
                                                   timestamps={'image': mtime},
                                                   time=mtime)
    # RunStop Document
    yield 'stop', run_bundle.compose_stop(time=mtime)
catalog.upsert(ingest_tiff_series_dask, ('files/*.tiff',), {})
list(catalog) # The 'upsert' method has replaced the non-lazy version of this run that had the same uid.
['ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s=', 'PylgaOkBfHebVoDhBEusZvBcnGF7WNiFYecrfQvjZdg=']
for name, doc in catalog['ZWIjPOmIq1Ncr8XRNqpNIKIhi0e8j5KbVSJ287N_Z1s='].read_canonical():
    if name == 'event':
        break
doc
{'uid': '9ded224d-6dbf-40f3-ae4f-df736a2b23e3', 'time': 1559144355.3176703, 'data': {'image': dask.array<reshape, shape=(512, 512), dtype=int64, chunksize=(512, 512)>}, 'timestamps': {'image': 1559144355.3176703}, 'seq_num': 1, 'filled': {}, 'descriptor': '63d737c2-0301-4b65-be44-f9c098a65ce6'}
doc['data']['image'].compute()
array([[217, 155,  77, ..., 184, 130, 185],
       [ 17, 173, 229, ..., 168, 124, 144],
       [ 90, 210, 231, ...,  28,  16,  98],
       ...,
       [237, 235,  88, ..., 237, 222, 207],
       [184,   9, 252, ..., 232, 210,  91],
       [116, 240, 234, ..., 193, 187, 157]], dtype=int64)
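Because 'image' is a dask array, computations on it can be composed lazily; the underlying TIFF is not read until compute() is called (a small illustration):
lazy_total = doc['data']['image'].sum()  # builds a task graph; no file I/O yet
lazy_total.compute()                     # triggers reading the file and performing the reduction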