import dask.array
from dask.diagnostics import ResourceProfiler
from multiprocessing.pool import ThreadPool
import dask.config
import matplotlib.pyplot as plt
import numpy as np
%%time
# All-zeros array split along the first axis into chunks of roughly 10 MB,
# each chunk spanning the full second axis.
x = dask.array.zeros((12500, 10000), chunks=('10MB', -1))

# Persist the same array under four store names so it can be re-opened
# below as four independent on-disk inputs.
for store_path in ('saved_x1.zarr', 'saved_y1.zarr',
                   'saved_x2.zarr', 'saved_y2.zarr'):
    dask.array.to_zarr(x, store_path, overwrite=True)
CPU times: user 3.15 s, sys: 1.26 s, total: 4.4 s
Wall time: 1.46 s
x
|
# Re-open the Zarr stores as dask arrays. Each variable is bound to the
# store of the same name (previously y1 and x2 were read from each other's
# stores, which was harmless only because every store holds identical data).
x1 = dask.array.from_zarr('saved_x1.zarr')
y1 = dask.array.from_zarr('saved_y1.zarr')
x2 = dask.array.from_zarr('saved_x2.zarr')
y2 = dask.array.from_zarr('saved_y2.zarr')
def evaluate(x1, y1, x2, y2):
    """Return mean absolute differences for three stacked fields.

    ``x1`` and ``y1`` are stacked into ``u``; ``x2`` and ``y2`` are stacked
    into ``v``. A third field, ``u ** 2 + v ** 2``, is derived from those two.
    For each field, the absolute difference between its two stacked members
    (index 0 minus index 1 along the stacking axis) is averaged over the
    last axis.

    Returns:
        A list of three arrays, one per field, in the order
        ``[u, v, u ** 2 + v ** 2]``.
    """
    u = dask.array.stack([x1, y1])
    v = dask.array.stack([x2, y2])
    components = [u, v, u ** 2 + v ** 2]
    return [
        abs(c[0] - c[1]).mean(axis=-1)
        for c in components
    ]
rprof = ResourceProfiler(dt=0.1)
with dask.config.set(pool=ThreadPool(1)):
with rprof:
%time results = evaluate(x1, y1, x2, y2)
%time dask.compute(results)
CPU times: user 25.8 ms, sys: 4.81 ms, total: 30.6 ms
Wall time: 30.7 ms
CPU times: user 5 s, sys: 1.76 s, total: 6.76 s
Wall time: 6.7 s
results
[dask.array<mean_agg-aggregate, shape=(12500,), dtype=float64, chunksize=(125,), chunktype=numpy.ndarray>, dask.array<mean_agg-aggregate, shape=(12500,), dtype=float64, chunksize=(125,), chunktype=numpy.ndarray>, dask.array<mean_agg-aggregate, shape=(12500,), dtype=float64, chunksize=(125,), chunktype=numpy.ndarray>]
# Plot the memory recorded by the profiler against elapsed time.
# The 0.1 factor is assumed to match the profiler's sampling interval, so
# sample index * 0.1 gives approximate seconds since profiling started.
samples = rprof.results
elapsed_seconds = 0.1 * np.arange(len(samples))
memory_usage = [sample.mem for sample in samples]
plt.plot(elapsed_seconds, memory_usage)
plt.ylabel('Memory usage (MB)')
plt.xlabel('Time (seconds)')
Text(0.5, 0, 'Time (seconds)')