Threads and processes
Parallel processing in Python is split into two groups: multithreading and multiprocessing.
Multithreading means you have one Python process. Because Python is implemented with a Global Interpreter Lock (GIL), only one Python instruction can run at a time, even across multiple threads. This is very limiting, but not the end of the world for multithreading. One loophole is that this applies only to Python instructions: as long as it doesn't touch Python's internal memory model (such as changing reference counts), compiled code is allowed to release the GIL. This includes JIT-compiled code like Numba!
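A quick way to see the loophole: time.sleep is a C-level call that releases the GIL, so several threads can wait concurrently, even though pure-Python bytecode could not overlap this way. A minimal timing sketch (exact times will vary by machine):

```python
import threading
import time

def sleeper(seconds):
    # time.sleep is a C-level call that releases the GIL,
    # so all of these threads can wait at the same time.
    time.sleep(seconds)

start = time.time()
threads = [threading.Thread(target=sleeper, args=(0.2,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start

# Four 0.2 s sleeps overlap: total wall time is ~0.2 s, not 0.8 s.
print(f"{elapsed:.2f} s")
```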
The other method is multiprocessing. This involves creating two or more Python processes, each with its own memory space, then either transferring data between them (via pickle) or sharing selected portions of memory (less common before Python 3.8, but possible). This is much heavier-weight than threading, but can be used effectively.
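To get a feel for the transfer cost, here is the pickling round trip that multiprocessing performs under the hood when it sends an array through a pipe (a sketch of the mechanism, not something you'd write by hand in practice):

```python
import pickle

import numpy as np

a = np.arange(1_000_000, dtype=np.float64)
payload = pickle.dumps(a)   # what multiprocessing does to push `a` through a pipe
b = pickle.loads(payload)   # what the receiving process does to reconstruct it

assert (a == b).all()       # same values...
assert b is not a           # ...but a full copy: ~8 MB serialized and copied
print(len(payload), "bytes through the pipe")
```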
import asyncio
import concurrent.futures
import sys # noqa: F401
import threading
import time
import matplotlib.pyplot as plt
import numpy as np
def prepare(height, width):
c = np.sum(
np.broadcast_arrays(*np.ogrid[-1j : 0j : height * 1j, -1.5 : 0 : width * 1j]),
axis=0,
)
fractal = np.zeros_like(c, dtype=np.int32)
return c, fractal
def run(c, fractal, maxiterations=20):
z = c
for i in range(1, maxiterations + 1):
z = z**2 + c # Compute z
diverge = abs(z) > 2 # Divergence criteria
z[diverge] = 2 # Keep number size small
fractal[~diverge] = i # Fill in non-diverged iteration number
return fractal
size = 4000, 3000
c, fractal = prepare(*size)
%%time
fractal = run(c, fractal)
fig, ax = plt.subplots(figsize=(12, 8))
ax.imshow(fractal);
import threading
c, fractal = prepare(*size)
def piece(i):
ci = c[10 * i : 10 * (i + 1), :]
fi = fractal[10 * i : 10 * (i + 1), :]
run(ci, fi)
workers = []
for i in range(size[0] // 10):
workers.append(threading.Thread(target=piece, args=(i,)))
Note: You can also use the OO interface, which I sometimes prefer.
class Worker(threading.Thread):
def __init__(self, c, fractal, i):
        super().__init__()
self.c = c
self.fractal = fractal
self.i = i
def run(self):
run(self.c[10*self.i : 10*(self.i + 1), :], self.fractal[10*self.i : 10*(self.i + 1), :])
workers = []
for i in range(size[0]//10):
workers.append(Worker(c, fractal, i))
%%time
for worker in workers:
worker.start()
for worker in workers:
worker.join()
len(workers)
fig, ax = plt.subplots(figsize=(12, 8))
ax.imshow(fractal);
Python 3 introduced an "executor" interface that manages workers for you. Instead of creating threads or processes with a run method, you create an executor and send work to it. It has the same interface for threads and processes.
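As a sketch of that shared interface: besides map (used below), executors offer submit, which returns a Future, and as_completed, which yields futures as they finish. Swapping ThreadPoolExecutor for ProcessPoolExecutor leaves this code unchanged (apart from the usual `if __name__ == "__main__"` guard that processes need):

```python
import concurrent.futures

def square(x):
    return x * x

# ThreadPoolExecutor and ProcessPoolExecutor share this interface:
# submit() returns a Future; as_completed() yields them as they finish.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(square, x) for x in range(10)]
    results = sorted(f.result() for f in concurrent.futures.as_completed(futures))

print(results)
```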
executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
def piece(i):
ci = c[10 * i : 10 * (i + 1), :]
fi = fractal[10 * i : 10 * (i + 1), :]
run(ci, fi)
c, fractal = prepare(*size)
%%time
futures = executor.map(piece, range(size[0] // 10))
for _ in futures: # iterating over them waits for the results
pass
fig, ax = plt.subplots(figsize=(12, 8))
ax.imshow(fractal);
Python has native support for async functions, which don't use threads at all (they were originally even implemented on top of generators). This is a control scheme that makes all suspension points explicit, using the "await" keyword. That makes it much easier to write concurrent code that shares data, since you can see the await points: you are running in a single thread, except that control can temporarily pass elsewhere wherever "await" appears.
This can, however, be integrated with threading, and can take advantage of functions that release the GIL.
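A minimal self-contained sketch of that integration (assuming Python 3.9+ for asyncio.to_thread): gather schedules several thread-backed tasks, and the coroutine suspends at one explicit await point until they all finish.

```python
import asyncio
import time

def blocking_work(x):
    time.sleep(0.1)   # a blocking, GIL-releasing call, run in a worker thread
    return x * x

async def main():
    # Each to_thread() call runs in a thread; gather awaits them all,
    # suspending main() at this single, explicit await point.
    return await asyncio.gather(*(asyncio.to_thread(blocking_work, x) for x in range(4)))

results = asyncio.run(main())
print(results)   # gather preserves order: [0, 1, 4, 9]
```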
async def compute_async():
await asyncio.gather(*(asyncio.to_thread(piece, x) for x in range(size[0] // 10)))
c, fractal = prepare(*size)
IPython supports top-level await, but not inside some magics, including the time-related ones. So we'll replace the time magic ourselves.
start = time.time()
await compute_async()
print(time.time() - start)
fig, ax = plt.subplots(figsize=(12, 8))
ax.imshow(fractal);
It's often difficult to get "perfect scaling," N times more work from N threads, in real situations. Even though this problem is "embarrassingly parallel" (none of the workers need to know other workers' results), there can be scheduling overhead, contention for memory, or slow-downs due to Python's Global Interpreter Lock.
One way to avoid the global interpreter lock is to send work to separate processes. Python interpreters in separate processes do not share memory and therefore do not need to coordinate.
However, that means we can't send data by simply sharing variables. We have to send it through a multiprocessing.Queue (which serializes, i.e. pickles, the data so that it can go through a pipe). As of Python 3.8, we also have the new multiprocessing.shared_memory module!
You can share arrays among processes if you declare them as shared memory before launching the subprocesses. Python has a special type for this:
%%writefile multiproc.py
import concurrent.futures
import multiprocessing.shared_memory
import time
import numpy as np
size = 4000, 3000
def prepare(height, width):
c = np.sum(
np.broadcast_arrays(*np.ogrid[-1j : 0j : height * 1j, -1.5 : 0 : width * 1j]),
axis=0,
)
fractal = np.zeros_like(c, dtype=np.int32)
return c, fractal
def run(c, fractal, maxiterations=20):
z = c
for i in range(1, maxiterations + 1):
z = z**2 + c # Compute z
diverge = abs(z) > 2 # Divergence criteria
z[diverge] = 2 # Keep number size small
fractal[~diverge] = i # Fill in non-diverged iteration number
return fractal
c, orig_fractal = prepare(*size)
def piece(i):
mem = multiprocessing.shared_memory.SharedMemory(name="perfdistnumpy")
fractal = np.ndarray(shape=c.shape, dtype=np.int32, buffer=mem.buf)
ci = c[10 * i : 10 * (i + 1), :]
fi = fractal[10 * i : 10 * (i + 1), :]
run(ci, fi)
mem.close()
if __name__ == "__main__":
    d_size = np.int32().itemsize * orig_fractal.size
mem = multiprocessing.shared_memory.SharedMemory(
name="perfdistnumpy", create=True, size=d_size
)
try:
fractal = np.ndarray(shape=c.shape, dtype=np.int32, buffer=mem.buf)
fractal[...] = orig_fractal
executor = concurrent.futures.ProcessPoolExecutor(max_workers=8)
start = time.time()
futures = executor.map(piece, range(size[0] // 10))
for _ in futures: # iterating over them waits for the results
pass
print(time.time() - start, "seconds")
print(fractal)
finally:
mem.close()
mem.unlink()
This memory is shared across processes and can even outlive the owning process, so make sure you close (once per process) and unlink (exactly once) the memory you take! Having a fixed name (like above) can make cleanup safer.
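For reference, here is the full SharedMemory lifecycle, sketched within a single process: create, attach by name, drop the NumPy views, close each handle, and unlink exactly once. One subtlety: the NumPy views must be deleted before close(), or the still-exported buffer raises BufferError.

```python
import numpy as np
from multiprocessing import shared_memory

# Create a block (normally done once, before launching workers).
owner = shared_memory.SharedMemory(create=True, size=4 * np.int32().itemsize)
a = np.ndarray((4,), dtype=np.int32, buffer=owner.buf)
a[:] = [1, 2, 3, 4]

# A "worker" attaches to the same block by name; no data is copied.
view = shared_memory.SharedMemory(name=owner.name)
b = np.ndarray((4,), dtype=np.int32, buffer=view.buf)
b[0] = 99                  # writes are visible through every handle
assert a[0] == 99
value_seen = int(a[0])

del a, b                   # drop the NumPy views before closing the buffers
view.close()               # every attachment closes its own handle...
owner.close()
owner.unlink()             # ...but exactly one unlink frees the block
```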
!{sys.executable} multiproc.py
Still, there should be a better way. Our array slices in piece are fragile: an indexing error can ruin the result. Can't the problem of scattering work be generalized? Let's try an external package: Dask.
import dask.array
c, fractal = prepare(*size)
c = dask.array.from_array(c, chunks=(10, size[1]))
fractal = dask.array.from_array(fractal, chunks=(10, size[1]))
%%time
fractal = run(c, fractal)
That's AWESOME! So fast for so little. Let's take a look!
fractal
What the heck? This is not an array: it is a description of how to make an array. Dask has stepped through our procedure and built an execution graph, encoding all the dependencies so that it can correctly apply it to individual chunks. When we execute this graph, Dask will send a chunk to each processor in the computer and combine results.
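At its core, a Dask graph is just a dict mapping keys to values or to task tuples of (function, *argument-keys); that dict format is Dask's documented graph specification. The tiny recursive get below is a toy illustration of how such a graph is executed, not Dask's actual scheduler:

```python
from operator import add, mul

# A task graph in Dask's core format: keys map to literal values
# or to tuples of (function, *argument-keys).
dsk = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),      # z = x + y
    "w": (mul, "z", "z"),      # w = z * z
}

def get(graph, key):
    """Toy recursive scheduler: resolve a key by computing its dependencies."""
    task = graph[key]
    if isinstance(task, tuple):
        func, *deps = task
        return func(*(get(graph, dep) for dep in deps))
    return task

print(get(dsk, "w"))   # → 9
```

A real scheduler walks the same structure, but dispatches independent keys to different threads, processes, or machines.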
%%time
fractal = fractal.compute()
fig, ax = plt.subplots(figsize=(12, 8))
ax.imshow(fractal);
Much better, and less exciting. Dask did our chunking for us. It isn't any faster than single-threaded, though; for that we need to point it at a scheduler and workers. These could be local, or they could be many computers anywhere in the world!
We seem to have paid for this simplicity: it took twice as long as the carefully sliced pieces in the executor. The reason is that our code is not as simple as it looks. It has masking and piecemeal assignments, which in principle could introduce complex dependencies. We know that everything will be fine if we just chop the array into independent sections, and that is exactly how we implemented our thread- and executor-based solutions.
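We can verify that independence claim on a small grid: running run on the whole array and on separate row-chunks gives identical results. This sketch is self-contained, repeating the same grid construction and iteration logic from above at a smaller 40×30 size:

```python
import numpy as np

def run(c, fractal, maxiterations=20):
    # Same iteration as above: z -> z**2 + c, clamping diverged points.
    z = c
    for i in range(1, maxiterations + 1):
        z = z**2 + c
        diverge = abs(z) > 2
        z[diverge] = 2
        fractal[~diverge] = i
    return fractal

# Small 40x30 grid, built the same way as prepare() above.
c = np.sum(np.broadcast_arrays(*np.ogrid[-1j:0j:40j, -1.5:0:30j]), axis=0)

whole = run(c, np.zeros(c.shape, dtype=np.int32))

chunked = np.zeros(c.shape, dtype=np.int32)
for i in range(4):                      # four independent row-chunks
    run(c[10 * i : 10 * (i + 1)], chunked[10 * i : 10 * (i + 1)])

assert (whole == chunked).all()         # row-chunks really are independent
```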
Let me show you what Dask has to do for a 1×1 chunking of our problem.
c, fractal = prepare(1, 1) # try 2, 2
c = dask.array.from_array(c, chunks=(1, 1))
fractal = dask.array.from_array(fractal, chunks=(1, 1))
fractal = run(c, fractal, maxiterations=1) # try more iterations
fractal.visualize()
If that were all, I'd probably stick to chopping up the grid by hand (when possible). However, exactly the same interface that distributes work across cores in my laptop can distribute work around the world, just by pointing it to a remote scheduler.
This is truly the lazy busy researcher approach!
Note to self: launch dask-scheduler & dask-worker --nthreads 8 127.0.0.1:8786 & in a terminal now.
import dask.distributed
client = dask.distributed.Client("127.0.0.1:8786")
client
c, fractal = prepare(*size)
c = dask.array.from_array(c, chunks=(100, size[1]))
fractal = dask.array.from_array(fractal, chunks=(100, size[1]))
fractal = run(c, fractal)
%%time
fractal = client.compute(fractal, sync=True)
Well, that was exciting!
In the end, this example took longer than the single-core version, but it illustrates how array operations can be distributed in a simple way.
I haven't shown very much of what Dask can do. It's a general toolkit for delayed and distributed evaluation. As such, it provides a nice way to work on Pandas-like DataFrames that are too large for memory:
import dask.dataframe
df = dask.dataframe.read_csv("data/nasa-exoplanets.csv")
df
We don't see the data because they haven't been loaded. But we can get them if we need them.
df[["pl_hostname", "pl_pnum"]].compute()
Additionally, Dask isn't the only project filling this need. There are also larger frameworks like Ray and PySpark, and many smaller projects.
(Distributed computing hasn't been fully figured out yet.)