This notebook investigates the performance of parallelizing scikit-image operations with Dask. We find that Dask provides only a very small speedup, despite consuming 400% CPU. We benchmark the various components of the computation and conclude that we are likely bound by something other than CPU processing, such as memory access.
This work was done in collaboration with scikit-image developers at the SciPy 2017 sprints.
This data is courtesy of Emmanuelle Gouillart: 250MB download
import numpy as np
dat = np.fromfile('Al8Cu_1000_g13_t4_200_250.vol',
dtype=np.float32).reshape((50, 1104, 1104))
dat /= abs(dat).max()
dat.shape
(50, 1104, 1104)
%matplotlib inline
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(10, 10))
plt.imshow(dat[40, :, :], cmap='gray')
<matplotlib.image.AxesImage at 0x7f55f84ff710>
Let's look at a common operation: smoothing the image with a Gaussian filter. Calling this directly on the full image takes around 2.5 seconds.
import skimage.filters
%time _ = skimage.filters.gaussian(dat, sigma=3)
CPU times: user 2.24 s, sys: 64 ms, total: 2.3 s Wall time: 2.3 s
We split this array into four chunks along the two large spatial dimensions, leaving the short dimension (50) unchunked. We then map the skimage.filters.gaussian function over each block with an overlap (depth) of 9, which gives the Gaussian filter (sigma=3) enough surrounding data to blend smoothly across chunk boundaries.
import dask.array as da
%%time
x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)
y = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')
CPU times: user 4 ms, sys: 0 ns, total: 4 ms Wall time: 5.18 ms
y
dask.array<trim, shape=(50, 1104, 1104), dtype=float64, chunksize=(50, 552, 552)>
%time _ = y.compute()
CPU times: user 5.49 s, sys: 668 ms, total: 6.16 s Wall time: 1.94 s
We see a modest speedup of 20-30%, far below the 4x (300%) speedup we might expect on our four-core machine.
The rest of this notebook tries to identify the burdens involved in this computation.
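As a quick baseline (a sketch, not part of the original run; it assumes a Dask version where the scheduler can be selected with the scheduler= keyword), we can execute the same graph on a single thread to separate per-task overhead from any effect of parallelism:

# Same graph, one thread: compare against the 1.94 s threaded run above.
%time _ = y.compute(scheduler='synchronous')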
%%time
x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)
y = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')
CPU times: user 4 ms, sys: 0 ns, total: 4 ms Wall time: 6.46 ms
There are about 100 tasks for four chunks. We can probably bring this down.
len(y.dask)
109
y.visualize(optimize_graph=True)
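To see how far Dask's own graph optimizations already go toward reducing those ~100 tasks, we can count the tasks in the optimized graph (a sketch; it assumes a Dask version that provides dask.optimize and the __dask_graph__ protocol):

import dask

# Number of tasks remaining after Dask's standard graph optimizations.
(y_opt,) = dask.optimize(y)
len(y_opt.__dask_graph__())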
The dask.array.map_overlap function (which is the main component of skimage.apply_parallel) makes a fair number of getitem and np.concatenate calls. These can be wasteful. Let's benchmark the stitching machinery on its own with a simple no-op computation.
%%time
x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)
# y = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')
y = x.map_overlap(lambda x: x, depth=9, boundary='none')
CPU times: user 8 ms, sys: 0 ns, total: 8 ms Wall time: 6.76 ms
%time _ = y.compute()
CPU times: user 468 ms, sys: 280 ms, total: 748 ms Wall time: 309 ms
This overhead is non-trivial (around 300 ms) and something we can work to reduce, but it also isn't our limiting factor.
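Some rough accounting with the timings above makes the point concrete (these numbers are just the measurements from this notebook):

serial = 2.3           # s, plain skimage.filters.gaussian on the full array
parallel = 1.94        # s, threaded map_overlap run from above
noop_overhead = 0.31   # s, map_overlap with an identity function
print("measured speedup:           %.2fx" % (serial / parallel))
print("speedup with zero overhead: %.2fx" % (serial / (parallel - noop_overhead)))

Even if we removed the stitching overhead entirely we would still be far from a 4x speedup.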
Let's look at a profile of our computation running in multiple threads.
from dask.diagnostics import Profiler, ResourceProfiler, visualize
x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)
y = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')
with Profiler() as prof, ResourceProfiler(dt=0.1) as rprof:
y.compute(optimize_graph=False)
from bokeh.plotting import show, output_notebook
output_notebook()
show(visualize([prof, rprof], show=False))
Let's make some observations:

- The calls to skimage.filters.gaussian take up most of the time.

dat.nbytes / 1e6
243.7632
Let's just call gaussian on the full dat array in four threads at the same time. This is weak scaling rather than strong scaling. If gaussian isn't amenable to running in multiple threads at once, then this should take roughly four times as long as a single call. If gaussian is entirely bound by CPU and introduces no other constraints, then it should run in about the same time as a single call.
There is no stitching here, so this is a best case.
%time _ = skimage.filters.gaussian(dat, sigma=3) # single computation in single thread
CPU times: user 2.41 s, sys: 76 ms, total: 2.49 s Wall time: 2.51 s
from concurrent.futures import ThreadPoolExecutor, wait
e = ThreadPoolExecutor()
%%time
futures = [e.submit(skimage.filters.gaussian, dat, sigma=3) for _ in range(4)]
wait(futures)
CPU times: user 18.1 s, sys: 660 ms, total: 18.8 s Wall time: 4.72 s
We see a slowdown of roughly 2x (4.72 s versus 2.51 s), which puts us right in the middle: this scikit-image function is only partially parallelizable.
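In other words, a back-of-the-envelope calculation from the timings above puts us at roughly half of perfect weak scaling:

single = 2.51        # s, one gaussian call in one thread
four_at_once = 4.72  # s, four gaussian calls in four threads at once
# 1.0 would be perfect scaling (four times the work in the same wall time).
print("parallel efficiency: %.0f%%" % (100 * single / four_at_once))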
Let's try another algorithm to see whether we get a different result. We use the skimage.restoration.denoise_tv_chambolle function. It seems to use a great deal of RAM, so we work on a subset of the data.
import skimage.restoration

%time _ = skimage.restoration.denoise_tv_chambolle(dat[:10, ...])
CPU times: user 4.18 s, sys: 980 ms, total: 5.16 s Wall time: 5.17 s
%%time
with ResourceProfiler(dt=0.1) as rprof:
futures = [e.submit(skimage.restoration.denoise_tv_chambolle, dat[:10, ...]) for _ in range(4)]
wait(futures)
CPU times: user 55.1 s, sys: 12.6 s, total: 1min 7s Wall time: 17.7 s
show(visualize([rprof], show=False))
Next we try dask.distributed, which will use processes instead of threads; processes might have better properties here.
from dask.distributed import Client
client = Client(set_as_default=False)
client
%%time
from dask.distributed import wait as dask_wait
futures = [client.submit(skimage.restoration.denoise_tv_chambolle, dat[:10, ...], pure=False)
for _ in range(4)]
dask_wait(futures)
/home/mrocklin/workspace/distributed/distributed/worker.py:652: UserWarning: Serialized function is very large. This is likely due to closing over large local variables. Serialized function is 49 MB warnings.warn(large_function_warning % (len(b) / 1e6))
CPU times: user 2.61 s, sys: 592 ms, total: 3.2 s Wall time: 19.3 s
Let's try pre-scattering the data to the workers.
%%time
future = client.scatter(dat[:10, ...], broadcast=True)
futures = [client.submit(skimage.restoration.denoise_tv_chambolle, future, pure=False)
for _ in range(4)]
dask_wait(futures)
CPU times: user 2.22 s, sys: 388 ms, total: 2.61 s Wall time: 18.6 s
Pre-scattering the data doesn't help much. More importantly, processes are no faster than threads here (roughly 18-19 s either way), which is a strong indication that the problem isn't GIL-bound.
%load_ext snakeviz
%%snakeviz
_ = skimage.filters.gaussian(dat, sigma=3)
*** Profile stats marshalled to file '/tmp/tmpyqxp_1hp'.
This renders a visual diagnostic in a separate window showing that almost all of the time is spent inside the scipy.ndimage._nd_image.correlate1d function. Unfortunately cProfile seems unable to dig any deeper than this.
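Since almost all of the time is inside scipy.ndimage, one follow-up (a sketch, not part of the original run) is to time the underlying ndimage filter directly; skimage.filters.gaussian delegates to it, so this checks that the scikit-image wrapper itself adds little overhead:

import scipy.ndimage

# Time the core separable Gaussian filter that skimage wraps.
%time _ = scipy.ndimage.gaussian_filter(dat, sigma=3)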
I suspect that these scikit-image functions are bound by some resource other than CPU processing power. My first guess is that they access or allocate memory in inefficient ways. It might be worth rewriting one or two functions to minimize memory allocation and access and then repeating this sort of experiment.
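One crude way to probe the memory hypothesis (a sketch; this experiment is not part of the original notebook) is to run an operation with almost no arithmetic per byte, once alone and four times concurrently. If the concurrent copies slow down substantially, the threads are competing for memory bandwidth rather than for CPU:

import time
from concurrent.futures import ThreadPoolExecutor, wait

def stream_through_memory():
    # Elementwise multiply: trivial arithmetic, dominated by memory traffic.
    return dat * 2.0

start = time.perf_counter()
stream_through_memory()
print("1 thread:  %.2f s" % (time.perf_counter() - start))

start = time.perf_counter()
with ThreadPoolExecutor(4) as pool:
    wait([pool.submit(stream_through_memory) for _ in range(4)])
print("4 threads: %.2f s" % (time.perf_counter() - start))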
On the dask side we can reduce the task-graph overhead a bit more. We might also consider rewriting the skimage.apply_parallel function by hand with dask.delayed to remove some of the generalities of dask.array.map_blocks that may not be relevant here, as sketched below.
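A rough sketch of what that hand-written version might look like (illustrative only: a two-way split along one axis rather than the 2x2 split used above, with simplified edge handling):

import dask
import numpy as np
import skimage.filters

depth = 9                      # overlap, ~3 * sigma
half = dat.shape[1] // 2
lazy_gaussian = dask.delayed(skimage.filters.gaussian)

# Filter two overlapping halves, then trim the overlap and stitch by hand.
top = lazy_gaussian(dat[:, :half + depth, :], sigma=3)
bottom = lazy_gaussian(dat[:, half - depth:, :], sigma=3)
top, bottom = dask.compute(top, bottom)
smoothed = np.concatenate([top[:, :half, :], bottom[:, depth:, :]], axis=1)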
This is a good example of how parallel computing may be ineffective, especially if you are not bound by raw computing power.
Fortunately, the variety of visual diagnostic and parallel computing tools at our disposal lets us look at the problem from several angles and helps guide our work. In particular we used the following:

- Dask's Profiler and ResourceProfiler, visualized with Bokeh
- concurrent.futures thread pools for the weak-scaling experiments
- dask.distributed for process-based execution
- snakeviz / cProfile for single-threaded profiling