This notebook combines Numba, a high performance Python compiler, with Dask Arrays.
In particular we show off two Numba features, and how they compose with Dask:
This was originally published as a blogpost here
Many array computing functions operate only on a local region of the array. This is common in image processing, signals processing, simulation, the solution of differential equations, anomaly detection, time series analysis, and more. Typically we write code that looks like the following:
def _smooth(x):
out = np.empty_like(x)
for i in range(1, x.shape[0] - 1):
for j in range(1, x.shape[1] - 1):
out[i, j] = (x[i + -1, j + -1] + x[i + -1, j + 0] + x[i + -1, j + 1] +
x[i + 0, j + -1] + x[i + 0, j + 0] + x[i + 0, j + 1] +
x[i + 1, j + -1] + x[i + 1, j + 0] + x[i + 1, j + 1]) // 9
return out
Or something similar. The numba.stencil
decorator makes this a bit easier to write down. You just write down what happens on every element, and Numba handles the rest.
import numba
@numba.stencil
def _smooth(x):
return (x[-1, -1] + x[-1, 0] + x[-1, 1] +
x[ 0, -1] + x[ 0, 0] + x[ 0, 1] +
x[ 1, -1] + x[ 1, 0] + x[ 1, 1]) // 9
When we run this function on a NumPy array, we find that it is slow, operating at Python speeds.
import numpy as np
x = np.ones((100, 100))
%timeit _smooth(x)
But if we JIT compile this function with Numba, then it runs more quickly.
@numba.njit
def smooth(x):
return _smooth(x)
%timeit smooth(x)
For those counting, that’s over 1000x faster!
Note: this function already exists as scipy.ndimage.uniform_filter
, which operates at the same speed.
In these applications people often have many such arrays and they want to apply this function over all of them. In principle they could do this with a for loop.
from glob import glob
import skimage.io
for fn in glob('/path/to/*.png'):
img = skimage.io.imread(fn)
out = smooth(img)
skimage.io.imsave(fn.replace('.png', '.out.png'), out)
If they wanted to then do this in parallel they would maybe use the multiprocessing or concurrent.futures modules. If they wanted to do this across a cluster then they could rewrite their code with PySpark or some other system.
Or, they could use Dask array, which will handle both the pipelining and the parallelism (single machine or on a cluster) all while still looking mostly like a NumPy array.
import dask_image
x = dask_image.imread('/path/to/*.png') # a large lazy array of all of our images
y = x.map_blocks(smooth, dtype='int8')
And then because each of the chunks of a Dask array are just NumPy arrays, we can use the map_blocks function to apply this function across all of our images, and then save them out.
This is fine, but lets go a bit further, and discuss generalized universal functions from NumPy.
Because we don't have a stack of images nearby, we're going to make a random array with similar structure.
import dask.array as da
x = da.random.randint(0, 127, size=(10000, 1000, 1000), chunks=('64 MB', None, None), dtype='int8')
x
Numba Docs: https://numba.pydata.org/numba-doc/dev/user/vectorize.html
NumPy Docs: https://docs.scipy.org/doc/numpy-1.16.0/reference/c-api. generalized-ufuncs.html
A generalized universal function (gufunc) is a normal function that has been
annotated with typing and dimension information. For example we can redefine
our smooth
function as a gufunc as follows:
@numba.guvectorize(
[(numba.int8[:, :], numba.int8[:, :])],
'(n, m) -> (n, m)'
)
def smooth(x, out):
out[:] = _smooth(x)
This function knows that it consumes a 2d array of int8's and produces a 2d array of int8's of the same dimensions.
This sort of annotation is a small change, but it gives other systems like Dask
enough information to use it intelligently. Rather than call functions like
map_blocks
, we can just use the function directly, as if our Dask Array was
just a very large NumPy array.
# Before gufuncs
y = x.map_blocks(smooth, dtype='int8')
# After gufuncs
y = smooth(x)
y
This is nice. If you write library code with gufunc semantics then that code just works with systems like Dask, without you having to build in explicit support for parallel computing. This makes the lives of users much easier.
Starting the Dask Client is optional. It will start the dashboard which is useful to gain insight on the computation.
from dask.distributed import Client, progress
client = Client(threads_per_worker=4,
n_workers=1,
processes=False,
memory_limit='4GB')
client
y.max().compute()
Numba also supports a JIT compilation with CUDA on compatible GPU devices.
This gives about a 200x speedup over a CPU on a single V100 GPU using numba.cuda.jit.
import numba.cuda
@numba.cuda.jit
def smooth_gpu(x, out):
i, j = cuda.grid(2)
n, m = x.shape
if 1 <= i < n - 1 and 1 <= j < m - 1:
out[i, j] = (x[i - 1, j - 1] + x[i - 1, j] + x[i - 1, j + 1] +
x[i , j - 1] + x[i , j] + x[i , j + 1] +
x[i + 1, j - 1] + x[i + 1, j] + x[i + 1, j + 1]) // 9
import cupy, math
x_gpu = cupy.ones((10000, 10000), dtype='int8')
out_gpu = cupy.zeros((10000, 10000), dtype='int8')
# I copied the four lines below from the Numba docs
threadsperblock = (16, 16)
blockspergrid_x = math.ceil(x_gpu.shape[0] / threadsperblock[0])
blockspergrid_y = math.ceil(x_gpu.shape[1] / threadsperblock[1])
blockspergrid = (blockspergrid_x, blockspergrid_y)
smooth_gpu[blockspergrid, threadsperblock](x_gpu, out_gpu)
Full notebook here