#!/usr/bin/env python
# coding: utf-8

# ## Scikit-Image and Dask Experiment
#
# This notebook investigates the performance of parallelizing scikit-image operations. We find that Dask provides only a very small speedup, despite consuming 400% CPU. We benchmark various components of the computation and find that we are likely bound by something other than CPU processing, such as memory access.
#
# This work was done in collaboration with the following scikit-image developers at the SciPy 2017 sprints:
#
# - Emmanuelle Gouillart
# - Stefan Van der Walt
# - Nelle Varoquaux

# ### Download and load data
#
# This data is courtesy of Emmanuelle Gouillart: [250MB download](https://drive.google.com/file/d/0Byr4wsGTdf46a3NTV2V0NkZBTXc/view)

# In[1]:

import numpy as np

dat = np.fromfile('Al8Cu_1000_g13_t4_200_250.vol', dtype=np.float32).reshape((50, 1104, 1104))
dat /= abs(dat).max()


# In[2]:

dat.shape


# In[3]:

get_ipython().run_line_magic('matplotlib', 'inline')
import matplotlib.pyplot as plt

fig = plt.figure(figsize=(10, 10))
plt.imshow(dat[40, :, :], cmap='gray')


# ### Split array into equal chunks, perform overlapping gaussian filter
#
# Let's look at a common operation, smoothing the image with a Gaussian filter. We find that calling this normally on the image takes around 2.5 seconds.

# In[4]:

import skimage.filters

get_ipython().run_line_magic('time', '_ = skimage.filters.gaussian(dat, sigma=3)')


# ### Parallelize with Dask.array
#
# We split this array into four chunks along the two large spatial dimensions, leaving the short dimension (50) whole.
#
# We then map the `skimage.filters.gaussian` function over each block, providing an overlap of 9, which should give the Gaussian filter enough room to stay smooth across chunk transitions.

# In[5]:

import dask.array as da


# In[6]:

get_ipython().run_cell_magic('time', '', "x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)\ny = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')\n")


# In[7]:

y


# In[8]:

get_ipython().run_line_magic('time', '_ = y.compute()')


# We see a modest speedup (20-30%), far below the 300% speedup we might expect on our four-core machine.
#
# The rest of this notebook tries to identify the costs involved in this computation.

# ### Task generation costs

# In[9]:

get_ipython().run_cell_magic('time', '', "x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)\ny = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')\n")


# #### Overlapping introduces some graph overhead
#
# There are about 100 tasks for four chunks. We can probably bring this down.

# In[10]:

len(y.dask)


# In[11]:

y.visualize(optimize_graph=True)


# ### Memory copy costs from stitching
#
# The `dask.array.map_overlap` function (which is the main component of `skimage.apply_parallel`) does a fair number of `getitem` and `np.concatenate` calls. These can be wasteful. Let's benchmark them with a simple no-op computation.

# In[12]:

get_ipython().run_cell_magic('time', '', "x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)\n# y = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')\ny = x.map_overlap(lambda x: x, depth=9, boundary='none')\n")


# In[13]:

get_ipython().run_line_magic('time', '_ = y.compute()')


# This cost is non-trivial, and something that we can work to reduce, but it also isn't our limiting factor.
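# As a rough check on where the stitching cost comes from (a quick sketch, not
# part of the original experiment), we can compare the no-op `map_overlap` at
# `depth=0`, which skips the overlap slicing and `np.concatenate` work, against
# the `depth=9` version above. The difference between the two timings
# approximates the cost of building and trimming the ghost regions alone.

# In[ ]:

import time

x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)

for depth in (0, 9):
    # depth=0 reduces map_overlap to a plain blockwise map; depth=9 adds the halo exchange.
    y = x.map_overlap(lambda block: block, depth=depth, boundary='none')
    start = time.time()
    y.compute()
    print('no-op map_overlap, depth=%d: %.2f s' % (depth, time.time() - start))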
# ### Look at single-machine diagnostics
#
# Let's look at a profile of our computation across many threads.

# In[14]:

from dask.diagnostics import Profiler, ResourceProfiler, visualize

x = da.from_array(dat, chunks=(50, dat.shape[1] // 2, dat.shape[2] // 2), name=False)
y = x.map_overlap(skimage.filters.gaussian, depth=9, sigma=3, boundary='none')

with Profiler() as prof, ResourceProfiler(dt=0.1) as rprof:
    y.compute(optimize_graph=False)


# In[15]:

from bokeh.plotting import show, output_notebook
output_notebook()

show(visualize([prof, rprof], show=False))


# Let's make some observations:
#
# 1. All of our cores report being busy at the same time (400% CPU usage), so our lack of speedup isn't a GIL issue.
# 2. The green bars are for `skimage.filters.gaussian`. These take up most of the time.
# 3. The blue-purple bars on the right are the re-stitching component from dask.array. These are significant and should be reduced, but will only offer modest improvements.
# 4. We're using 3GB of memory for what is only a 250MB dataset.

# In[16]:

dat.nbytes / 1e6


# ### So how well does skimage.filters.gaussian parallelize normally?
#
# Let's just call gaussian on `dat` in four threads at the same time. This is weak scaling rather than strong scaling. If gaussian isn't amenable to being run in multiple threads at the same time, then this should take four times as long as a single call. If gaussian is entirely bound by CPU and introduces no other constraints, then it should run in the same time as a single computation.
#
# There is no stitching here, so this is the best case.

# In[17]:

get_ipython().run_line_magic('time', '_ = skimage.filters.gaussian(dat, sigma=3) # single computation in single thread')


# In[18]:

from concurrent.futures import ThreadPoolExecutor, wait

e = ThreadPoolExecutor()


# In[19]:

get_ipython().run_cell_magic('time', '', 'futures = [e.submit(skimage.filters.gaussian, dat, sigma=3) for _ in range(4)]\nwait(futures)\n')


# It looks like we're seeing a difference of about 2x, so this is right in the middle: this scikit-image function is partially parallelizable.

# ### How about a different algorithm, like denoising?
#
# Let's try another algorithm to see if we get a different result. We try the `skimage.restoration.denoise_tv_chambolle` function. This seems to use a great deal of RAM, so we choose to work on a subset of the data.

# In[20]:

import skimage.restoration  # needed explicitly; importing skimage.filters does not pull in skimage.restoration

get_ipython().run_line_magic('time', '_ = skimage.restoration.denoise_tv_chambolle(dat[:10, ...])')


# In[21]:

get_ipython().run_cell_magic('time', '', 'with ResourceProfiler(dt=0.1) as rprof:\n    futures = [e.submit(skimage.restoration.denoise_tv_chambolle, dat[:10, ...]) for _ in range(4)]\n    wait(futures)\n')


# In[22]:

show(visualize([rprof], show=False))


# ### Let's try this with the distributed scheduler
#
# This will use processes, which might have better properties.

# In[23]:

from dask.distributed import Client

client = Client(set_as_default=False)
client


# In[24]:

get_ipython().run_cell_magic('time', '', 'from dask.distributed import wait as dask_wait\n\nfutures = [client.submit(skimage.restoration.denoise_tv_chambolle, dat[:10, ...], pure=False)\n           for _ in range(4)]\ndask_wait(futures)\n')
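# As a simpler cross-check on the process-based result above (a sketch, not part
# of the original notebook), we can run the same weak-scaling test with a plain
# `concurrent.futures.ProcessPoolExecutor`. If the lack of speedup were purely a
# GIL problem, four processes should come much closer to the single-call time
# than the four threads did. Note that this timing also includes the cost of
# pickling the data subset over to each worker process.

# In[ ]:

import time
from concurrent.futures import ProcessPoolExecutor, wait

start = time.time()
with ProcessPoolExecutor(max_workers=4) as pe:
    # Same denoising call as above, one copy per process.
    futures = [pe.submit(skimage.restoration.denoise_tv_chambolle, dat[:10, ...])
               for _ in range(4)]
    wait(futures)
print('four processes: %.1f s' % (time.time() - start))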
# Let's try pre-scattering the data around.

# In[25]:

get_ipython().run_cell_magic('time', '', '\nfuture = client.scatter(dat[:10, ...], broadcast=True)\nfutures = [client.submit(skimage.restoration.denoise_tv_chambolle, future, pure=False)\n           for _ in range(4)]\ndask_wait(futures)\n')


# This doesn't help much.
#
# It is a strong indication that the problem isn't GIL contention.

# ### Let's profile the gaussian function on a single machine

# In[26]:

get_ipython().run_line_magic('load_ext', 'snakeviz')


# In[27]:

get_ipython().run_cell_magic('snakeviz', '', '_ = skimage.filters.gaussian(dat, sigma=3)\n')


# This renders a visual diagnostic in a separate window, which shows that almost all of the time is spent within the `scipy.ndimage._nd_image.correlate1d` function. Unfortunately cProfile seems unable to dive more deeply than this.

# ## Conclusions
#
# ### Concrete next steps
#
# I suspect that these scikit-image functions are bound by some resource other than CPU processing power. My first guess is that they access or create memory in inefficient ways. It might be worth rewriting one or two functions to avoid extra memory allocation and access, and then retrying this sort of experiment.
#
# On the Dask side we can reduce the task graph overhead a bit more. We might also consider rewriting the `skimage.apply_parallel` function by hand with dask.delayed to remove some of the generalities of `dask.array.map_blocks` that may not be relevant here (a sketch of this idea follows the conclusions).
#
# ### General take-aways
#
# This is a good example of how parallel computing can be ineffective, especially when you are not bound by raw computing power.
#
# Fortunately, the variety of visual diagnostic and parallel computing tools at our disposal gives us several perspectives on the problem and helps guide our work. In particular we used the following:
#
# 1. Snakeviz, to visualize cProfile output and understand which functions take up the most time in a single thread.
# 2. The single-machine Dask diagnostics. These provide useful static plots that show when each task ran and the CPU and memory use during the computation. The resource profiler (CPU and memory use) in particular was helpful for normal computations, not just when using Dask.
# 3. The Dask.distributed dashboard, which was useful for identifying communication and load-balancing issues, though it is difficult to include in static documents like this one, so we did not discuss it much here.
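# Below is a minimal sketch of the hand-rolled dask.delayed approach mentioned
# in the next steps (an illustration of the idea under assumed block shapes, not
# part of the original experiment). We slice the volume into a 2x2 grid of
# overlapping spatial blocks ourselves, filter each block through dask.delayed,
# trim the halo, and stitch the results back together with NumPy. The depth of 9
# and the block layout mirror the map_overlap setup used earlier; the short
# first axis is a single chunk, so it needs no overlap.

# In[ ]:

import dask

depth = 9
mid0 = dat.shape[1] // 2
mid1 = dat.shape[2] // 2


@dask.delayed
def filter_block(block):
    # Filter one padded block; runs in parallel (threads by default) at compute time.
    return skimage.filters.gaussian(block, sigma=3)


# Overlapping slices for the 2x2 spatial decomposition.
slices = [
    (slice(None), slice(0, mid0 + depth),    slice(0, mid1 + depth)),
    (slice(None), slice(0, mid0 + depth),    slice(mid1 - depth, None)),
    (slice(None), slice(mid0 - depth, None), slice(0, mid1 + depth)),
    (slice(None), slice(mid0 - depth, None), slice(mid1 - depth, None)),
]
blocks = dask.compute(*[filter_block(dat[s]) for s in slices])

# Trim the halo from each filtered block and reassemble the full volume.
result = np.empty_like(dat)
result[:, :mid0, :mid1] = blocks[0][:, :mid0, :mid1]
result[:, :mid0, mid1:] = blocks[1][:, :mid0, depth:]
result[:, mid0:, :mid1] = blocks[2][:, depth:, :mid1]
result[:, mid0:, mid1:] = blocks[3][:, depth:, depth:]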