Distributed, Advanced

Distributed futures

In [ ]:
from dask.distributed import Client
c = Client(n_workers=4)

In the previous chapter, we showed that executing a calculation (created using delayed) with the distributed executor is identical to any other executor. However, we now have access to additional functionality, and control over what data is held in memory.

To begin, the futures interface (derived from the built-in concurrent.futures) allows map-reduce like functionality. We can submit individual functions for evaluation with one set of inputs, or evaluated over a sequence of inputs with submit() and map(). Notice that the call returns immediately, giving one or more futures, whose status begins as "pending" and later becomes "finished". There is no blocking of the local Python session.

Here is the simplest example of submit in action:

In [ ]:
def inc(x):
    return x + 1

fut = c.submit(inc, 1)

We can re-execute the following cell as often as we want as a way to poll the status of the future. This could of course be done in a loop, pausing for a short time on each iteration. We could continue with our work, or view a progressbar of work still going on, or force a wait until the future is ready.

In the meantime, the status dashboard (link above next to the Cluster widget) has gained a new element in the task stream, indicating that inc() has completed, and the progress section at the problem shows one task complete and held in memory.

In [ ]:

Possible alternatives you could investigate:

from dask.distributed import wait, progress

would show a progress bar in this notebook, rather than having to go to the dashboard. This progress bar is also asynchronous, and doesn't block the execution of other code in the meanwhile.


would block and force the notebook to wait until the computation pointed to by fut was done. However, note that the result of inc() is sitting in the cluster, it would take no time to execute the computation now, because Dask notices that we are asking for the result of a computation it already knows about. More on this later.

In [ ]:
# grab the information back - this blocks if fut is not ready
# equivalent action when only considering a single future
# fut.result()

Here we see an alternative way to execute work on the cluster: when you submit or map with the inputs as futures, the computation moves to the data rather than the other way around, and the client, in the local Python session, need never see the intermediate values. This is similar to building the graph using delayed, and indeed, delayed can be used in conjunction with futures. Here we use the delayed object total from before.

In [ ]:
# Some trivial work that takes time
# repeated from the Distributed chapter.

from dask import delayed
import time

def inc(x):
    return x + 1

def dec(x):
    return x - 1

def add(x, y):
    return x + y

x = delayed(inc)(1)
y = delayed(dec)(2)
total = delayed(add)(x, y)
In [ ]:
# notice the difference from total.compute()
# notice that this cell completes immediately
fut = c.compute(total)
In [ ]:
c.gather(fut) # waits until result is ready


submit takes a function and arguments, pushes these to the cluster, returning a Future representing the result to be computed. The function is passed to a worker process for evaluation. Note that this cell returns immediately, while computation may still be ongoing on the cluster.

In [ ]:
fut = c.submit(inc, 1)

This looks a lot like doing compute(), above, except now we are passing the function and arguments directly to the cluster. To anyone used to concurrent.futures, this will look familiar. This new fut behaves the same way as the one above. Note that we have now over-written the previous definition of fut, which will get garbage-collected, and, as a result, that previous result is released by the cluster

Exercise: Rebuild the above delayed computation using Client.submit instead

The arguments passed to submit can be futures from other submit operations or delayed objects. The former, in particular, demonstrated the concept of moving the computation to the data which is one of the most powerful elements of programming with Dask.

In [ ]:
# Your code here
In [ ]:
x = c.submit(inc, 1)
y = c.submit(dec, 2)
total = c.submit(add, x, y)

print(total)     # This is still a future
c.gather(total)   # This blocks until the computation has finished

Each futures represents a result held, or being evaluated by the cluster. Thus we can control caching of intermediate values - when a future is no longer referenced, its value is forgotten. In the solution, above, futures are held for each of the function calls. These results would not need to be re-evaluated if we chose to submit more work that needed them.

We can explicitly pass data from our local session into the cluster using scatter(), but usually better is to construct functions that do the loading of data within the workers themselves, so that there is no need to serialise and communicate the data. Most of the loading functions within Dask, such as dd.read_csv, work this way. Similarly, we normally don't want to gather() results that are too big in memory.

The full API of the distributed scheduler gives details of interacting with the cluster, which remember, can be on your local machine or possibly on a massive computational resource.

The futures API offers a work submission style that can easily emulate the map/reduce paradigm (see c.map()) that may be familiar to many people. The intermediate results, represented by futures, can be passed to new tasks without having to bring the pull locally from the cluster, and new work can be assigned to work on the output of previous jobs that haven't even begun yet.

Generally, any Dask operation that is executed using .compute() can be submitted for asynchronous execution using c.compute() instead, and this applies to all collections. Here is an example with the calculation previously seen in the Bag chapter. We have replaced the .compute() method there with the distributed client version, so, again, we could continue to submit more work (perhaps based on the result of the calculation), or, in the next cell, follow the progress of the computation. A similar progress-bar appears in the monitoring UI page.

In [ ]:
%run prep.py -d accounts
In [ ]:
import dask.bag as db
import os
import json
filename = os.path.join('data', 'accounts.*.json.gz')
lines = db.read_text(filename)
js = lines.map(json.loads)

f = c.compute(js.filter(lambda record: record['name'] == 'Alice')
In [ ]:
from dask.distributed import progress
# note that progress must be the last line of a cell
# in order to show up
In [ ]:
# get result.
In [ ]:
# release values by deleting the futures
del f, fut, x, y, total


Considering which data should be loaded by the workers, as opposed to passed, and which intermediate values to persist in worker memory, will in many cases determine the computation efficiency of a process.

In the example here, we repeat a calculation from the Array chapter - notice that each call to compute() is roughly the same speed, because the loading of the data is included every time.

In [ ]:
%run prep.py -d random
In [ ]:
import h5py
import os
f = h5py.File(os.path.join('data', 'random.hdf5'), mode='r')
dset = f['/x']
import dask.array as da
x = da.from_array(dset, chunks=(1000000,))

%time x.sum().compute()
%time x.sum().compute()

If, instead, we persist the data to RAM up front (this takes a few seconds to complete - we could wait() on this process), then further computations will be much faster.

In [ ]:
# changes x from a set of delayed prescriptions
# to a set of futures pointing to data in RAM
# See this on the UI dashboard.
x = c.persist(x)
In [ ]:
%time x.sum().compute()
%time x.sum().compute()

Naturally, persisting every intermediate along the way is a bad idea, because this will tend to fill up all available RAM and make the whole system slow (or break!). The ideal persist point is often at the end of a set of data cleaning steps, when the data is in a form which will get queried often.

Exercise: how is the memory associated with x released, once we know we are done with it?

In [ ]:

Asynchronous computation

One benefit of using the futures API is that you can have dynamic computations that adjust as things progress. Here we implement a simple naive search by looping through results as they come in, and submit new points to compute as others are still running.

Watching the diagnostics dashboard as this runs you can see computations are being concurrently run while more are being submitted. This flexibility can be useful for parallel algorithms that require some level of synchronization.

Lets perform a very simple minimization using dynamic programming. The function of interest is known as Rosenbrock:

In [ ]:
# a simple function with interesting minima
import time

def rosenbrock(point):
    """Compute the rosenbrock function and return the point and result"""
    score = (1 - point[0])**2 + 2 * (point[1] - point[0]**2)**2
    return point, score

Initial setup, including creating a graphical figure. We use Bokeh for this, which allows for dynamic update of the figure as results come in.

In [ ]:
from bokeh.io import output_notebook, push_notebook
from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure, show
import numpy as np

# set up plot background
N = 500
x = np.linspace(-5, 5, N)
y = np.linspace(-5, 5, N)
xx, yy = np.meshgrid(x, y)
d = (1 - xx)**2 + 2 * (yy - xx**2)**2
d = np.log(d)

p = figure(x_range=(-5, 5), y_range=(-5, 5))
p.image(image=[d], x=-5, y=-5, dw=10, dh=10, palette="Spectral11");

We start off with a point at (0, 0), and randomly scatter test points around it. Each evaluation takes ~100ms, and as result come in, we test to see if we have a new best point, and choose random points around that new best point, as the search box shrinks.

We print the function value and current best location each time we have a new best value.

In [ ]:
from dask.distributed import as_completed
from random import uniform

scale = 5                  # Initial random perturbation scale
best_point = (0, 0)        # Initial guess
best_score = float('inf')  # Best score so far
startx = [uniform(-scale, scale) for _ in range(10)]
starty = [uniform(-scale, scale) for _ in range(10)]

# set up plot
source = ColumnDataSource({'x': startx, 'y': starty, 'c': ['grey'] * 10})
p.circle(source=source, x='x', y='y', color='c')
t = show(p, notebook_handle=True)

# initial 10 random points
futures = [c.submit(rosenbrock, (x, y)) for x, y in zip(startx, starty)]
iterator = as_completed(futures)

for res in iterator:
    # take a completed point, is it an improvement?
    point, score = res.result()
    if score < best_score:
        best_score, best_point = score, point
        print(score, point)

    x, y = best_point
    newx, newy = (x + uniform(-scale, scale), y + uniform(-scale, scale))
    # update plot
    source.stream({'x': [newx], 'y': [newy], 'c': ['grey']}, rollover=20)
    # add new point, dynamically, to work on the cluster
    new_point = c.submit(rosenbrock, (newx, newy))
    iterator.add(new_point)  # Start tracking new task as well

    # Narrow search and consider stopping
    scale *= 0.99
    if scale < 0.001:


When something goes wrong in a distributed job, it is hard to figure out what the problem was and what to do about it. When a task raises an exception, the exception will show up when that result, or other result that depend upon it, is gathered.

Consider the following delayed calculation to be computed by the cluster. As usual, we get back a future, which the cluster is working on to compute (this happens very slowly for the trivial procedure).

In [ ]:
def ratio(a, b):
    return a // b

ina = [5, 25, 30]
inb = [5, 5, 6]
out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])
f = c.compute(out)

We only get to know what happened when we gather the result (this is also true for out.compute(), except we could not have done other stuff in the meantime). For the first set of inputs, it works fine.

In [ ]:

But if we introduce bad input, an exception is raised. The exception happens in ratio, but only comes to our attention when calculating the sum.

In [ ]:
ina = [5, 25, 30]
inb = [5, 0, 6]
out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])
f = c.compute(out)

The display in this case makes the origin of the exception obvious, but this is not always the case. How should this be debugged, how would we go about finding out the exact conditions that caused the exception?

The first step, of course, is to write well-tested code which makes appropriate assertions about its input and clear warnings and error messages when something goes wrong. This applies to all code.

The most typical thing to do is to execute some portion of the computation in the local thread, so that we can run the Python debugger and query the state of things at the time that the exception happened. Obviously, this cannot be performed on the whole data-set when dealing with Big Data on a cluster, but a suitable sample will probably do even then.

In [ ]:
import dask
with dask.config.set(scheduler="sync"):
    # do NOT use c.compute(out) here - we specifically do not
    # want the distributed scheduler
In [ ]:
# uncomment to enter post-mortem debugger
# %debug

The trouble with this approach is that Dask is meant for the execution of large datasets/computations - you probably can't simply run the whole thing in one local thread, else you wouldn't have used Dask in the first place. So the code above should only be used on a small part of the data that also exhibits the error. Furthermore, the method will not work when you are dealing with futures (such as f, above, or after persisting) instead of delayed-based computations.

As an alternative, you can ask the scheduler to analyze your calculation and find the specific sub-task responsible for the error, and pull only it and its dependencies locally for execution.

In [ ]:
In [ ]:
# uncomment to enter post-mortem debugger
# %debug

Finally, there are errors other than exceptions, when we need to look at the state of the scheduler/workers. In the standard "LocalCluster" we started, we have direct access to these.

In [ ]:
[(k, v.state) for k, v in c.cluster.scheduler.tasks.items() if v.exception is not None]