There is some computation we want to perform.
The inputs and outputs are multidimensional arrays 🔢 (a.k.a. tensors)
Input and/or output tensors are too big to fit comfortably into main memory
At least some parts of the computation can be parallelised by processing the data in chunks
Sometimes, embarrassingly parallel 😳
Sometimes, if the computation itself is only moderately complex, a significant amount of time is spent reading and/or writing data.
N.B.: this bottleneck may be due to limited I/O bandwidth, or to some part of the I/O stack not being parallel.
The main use case here is scientific data analysis, where shortening the loop from exploratory science -> interactive analysis -> many rounds of summarising, visualising, hypothesising, modelling, testing and repeating is crucial
Align the chunks!
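The idea can be sketched with plain NumPy (the array size and the 2-row chunking are made up for illustration):

```python
import numpy as np

# toy stand-in for a large on-disk array
a = np.arange(24, dtype='f8').reshape(6, 4)
chunk_rows = 2  # process 2 rows at a time, aligned with the chunking

# accumulate column sums chunk by chunk, never holding everything at once
total = np.zeros(a.shape[1])
for start in range(0, a.shape[0], chunk_rows):
    total += a[start:start + chunk_rows].sum(axis=0)

assert np.allclose(total, a.sum(axis=0))
```

Because each pass touches whole chunks and nothing else, every chunk can be processed independently, and in parallel.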
import dask.array as da
a = ... # what goes here?
x = da.from_array(a)
y = (x - x.mean(axis=1, keepdims=True)) / x.std(axis=1, keepdims=True)
u, s, v = da.linalg.svd_compressed(y, 20)
u = u.compute()
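The same computation, sketched on in-memory data with plain NumPy (a small random matrix stands in for the real input, the exact `np.linalg.svd` replaces Dask's randomised `svd_compressed`, and the top 5 components are kept instead of 20):

```python
import numpy as np

# small random matrix as a stand-in for the real (too-big) input
rng = np.random.default_rng(0)
a = rng.normal(size=(50, 20))

# standardise each row, then keep the top 5 singular components
y = (a - a.mean(axis=1, keepdims=True)) / a.std(axis=1, keepdims=True)
u, s, vt = np.linalg.svd(y, full_matrices=False)
u, s, vt = u[:, :5], s[:5], vt[:5]
```

Dask runs the same logical computation, but chunk by chunk and in parallel, without loading `a` into memory.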
h5py: can store tensors, chunk them, compress chunks, smooth NumPy integration, and supports hierarchies. BUT: no thread-based parallelism, no parallel compressed writes, not easy to plug in new compressors, no support for cloud object stores
bcolz: simple implementation (in a good way), simple storage format, chunked storage, but primarily for string data. BUT: chunking in one dimension only, no support for cloud object stores
3 years, 1,191 commits, 302 issues, 208 PRs, 40 releases, 9 repos, and at least 2 babies later... 😅
$ pip install zarr
$ conda install -c conda-forge zarr
import zarr
zarr.__version__
Zarr is designed for parallel workflows, with parallelism front and centre. It's designed to be used both as the source (input) of some parallel compute and as the sink (output), with both multi-thread and multi-process parallelism supported.
# create a directory store (saved on the local filesystem, basically) named `example.zarr`
store = zarr.DirectoryStore('example.zarr')
# create the root group using the initialised store
root = zarr.group(store)
root
# create a 2-dimensional 10k x 10k array of 32-bit integers (a leaf node in our hierarchy), with 1k x 1k chunks
hello = root.zeros('hello',
                   shape=(10000, 10000),
                   chunks=(1000, 1000),
                   dtype='<i4')
Alternatively, Zarr supports an h5py-style interface via root.create_dataset
# create a 2-dimensional 100M x 100M array of 32-bit integers, with 10k x 10k chunks
big = root.zeros('big',
                 shape=(100_000_000, 100_000_000),
                 chunks=(10_000, 10_000),
                 dtype='i4')
# Print info about the big dataset/array
big.info
# write along part of the first row and first column
import numpy as np
big[0, 0:20000] = np.arange(20000)
big[0:20000, 0] = np.arange(20000)
Show big.info again: note which chunks are now initialised, the increased stored size, etc.
big[0:1000, 0:1000] # read first 1kx1k chunk of the array
# Chunks are initialised on write
big.info
# Fill value (for this dtype: 0) is used when reading unwritten regions:
big[-1000:, -1000:]
# Reading the whole array (💔 my RAM)
big[:]
!ls -a example.zarr
!cat example.zarr/big/.zarray
Using the directory store means that Zarr will use a directory on your local filesystem, but there are other stores available, including a Zip store (zipfile), SQL store, Azure Blob store, S3 store (via s3fs), Google Cloud Storage store (via gcsfs), MongoDB store, Redis store, and the list goes on.
It's also very easy to build a store: a store only needs to implement a MutableMapping interface (basically a dictionary). Think Kedro datasets! More technically, it needs to be dict-like, which means implementing __getitem__, __setitem__ and __iter__.
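A minimal sketch of such a store, backed by a plain dict (zarr ships something similar itself; `__delitem__` and `__len__` are included because Python's MutableMapping ABC requires them too):

```python
from collections.abc import MutableMapping


class DictStore(MutableMapping):
    """Toy store: keys are paths like 'big/0.0', values are bytes."""

    def __init__(self):
        self._data = {}

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


store = DictStore()
store['greeting'] = b'hello'
assert store['greeting'] == b'hello'
```

Anything that satisfies this interface, local directory, cloud bucket, database, can sit underneath a Zarr hierarchy.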
So, for example, you could use Google Cloud Storage:
import gcsfs
gcs = gcsfs.GCSFileSystem(token='anon')
store = gcsfs.GCSMap("data/example.zarr", gcs=gcs, check=False)
root = zarr.group(store)
Under some circumstances, locking is required to avoid contention and data loss during concurrent writes. When does this happen? Briefly: whenever writes are not aligned with chunk boundaries, so that two writers can end up doing a read-modify-write on the same chunk. Chunk-aligned writes need no locking.
numcodecs
import zarr
from numcodecs import Blosc
store = zarr.DirectoryStore("example.zarr")
root = zarr.group(store)
compressor = Blosc(cname="zstd", clevel=1, shuffle=Blosc.BITSHUFFLE)
big2 = root.zeros("big2",
                  shape=(100_000_000, 100_000_000),
                  chunks=(10_000, 10_000),
                  dtype='i4',
                  compressor=compressor)
To plug in a new compressor, a codec just needs decode and encode methods, each of which expects something buffer-like and returns something buffer-like.
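A sketch of a codec with that shape, using zlib from the standard library (a real zarr codec would subclass numcodecs.abc.Codec and be registered; the class and codec_id here are made up):

```python
import zlib


class ZlibCodec:
    """Codec sketch: encode/decode map buffer-like to buffer-like."""

    codec_id = 'zlib-sketch'  # hypothetical identifier

    def encode(self, buf):
        return zlib.compress(bytes(buf))

    def decode(self, buf):
        return zlib.decompress(bytes(buf))


codec = ZlibCodec()
data = b'chunk' * 1000
assert codec.decode(codec.encode(data)) == data
```

Zarr applies the codec per chunk, so compression parallelises along with everything else.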
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, visualize, ProgressBar
from bokeh.io import output_notebook
output_notebook()
store = zarr.DirectoryStore("parallel.zarr")
store.clear()
root = zarr.group(store=store)
shape = 500000, 1000
chunks = 100000, 500
data = da.random.normal(loc=1000, scale=100, size=shape, chunks=chunks).astype("int32")
z_input = root.zeros("input", shape=data.shape, dtype=data.dtype, chunks=chunks)
with ProgressBar():
data.store(z_input, lock=False)
z_input.info
z_input[:1000]
step1 = da.from_array(z_input, chunks=z_input.chunks)
step1.visualize()
step2 = step1 * 2
step2.visualize()
step3 = step2.sum(axis=1)
step3.visualize()
z_output = root.zeros("output", shape=step3.shape, dtype=step3.dtype, chunks=chunks[0])
step4 = step3.store(z_output, lock=False, compute=False)
step4.visualize()
with Profiler() as prof, ResourceProfiler() as rprof:
with ProgressBar():
step4.compute()
visualize([prof, rprof])
z_output[:]
We've written a specification for the Zarr format, in the hopes that other people can create implementations (in other languages). So far: