#!/usr/bin/env python
# coding: utf-8

# # Support for dask arrays
#
# It is possible to operate on dask arrays, saving memory (and sometimes even time).

# In[1]:


# Necessary imports
import dask
import dask.multiprocessing
import dask.threaded
import physt
import numpy as np

import dask.array as da
from physt import h1, h2

get_ipython().run_line_magic('matplotlib', 'inline')


# In[2]:


# Create two arrays
np.random.seed(42)

SIZE = 2 ** 21
CHUNK = int(SIZE / 16)

million = np.random.rand(SIZE)#.astype(int)
million2 = (3 * million + np.random.normal(0., 0.3, SIZE))#.astype(int)

# Chunk them for dask
chunked = da.from_array(million, chunks=CHUNK)
chunked2 = da.from_array(million2, chunks=CHUNK)


# ## Create histograms
#
# `h1`, `h2`, ... have their counterparts in `physt.compat.dask`. They should work similarly;
# however, they are not complete and unexpected errors may occur.

# In[3]:


from physt.compat.dask import h1 as d1
from physt.compat.dask import h2 as d2


# In[4]:


# Use chunks to create a 1D histogram
ha = d1(chunked2, "fixed_width", 0.2)
check_ha = h1(million2, "fixed_width", 0.2)
ok = (ha == check_ha)
print("Check: ", ok)
ha.plot()
ha


# In[5]:


# Use chunks to create a 2D histogram
hb = d2(chunked, chunked2, "fixed_width", .2, axis_names=["x", "y"])
check_hb = h2(million, million2, "fixed_width", .2, axis_names=["x", "y"])
hb.plot(show_zero=False, cmap="rainbow")
ok = (hb == check_hb)
print("Check: ", ok)
hb


# In[6]:


# And another cross-check: the "y" projection of the 2D histogram should match the 1D one
hh = hb.projection("y")
hh.plot()
print("Check: ", np.array_equal(hh.frequencies, ha.frequencies))  # Just frequencies


# In[7]:


# Use dask also for normal (in-memory) arrays; they are automatically split into chunks
d1(million2, "fixed_width", 0.2) == ha


# ## Some timings
#
# Your results may vary substantially. These numbers are just for illustration, obtained on a
# 4-core (8-thread) machine. The real gain comes when the data do not fit into memory
# (see the sketch at the end of this notebook).

# ### Efficiency

# In[8]:


# Standard (pure NumPy input, no dask)
get_ipython().run_line_magic('time', 'h1(million2, "fixed_width", 0.2)')


# In[9]:


# Same array, but using dask
get_ipython().run_line_magic('time', 'd1(million2, "fixed_width", 0.2)')


# In[10]:


# Most efficient: dask with already chunked data
get_ipython().run_line_magic('time', 'd1(chunked2, "fixed_width", 0.2)')


# ### Different scheduling

# In[11]:


get_ipython().run_line_magic('time', 'd1(chunked2, "fixed_width", 0.2)')


# In[12]:


get_ipython().run_cell_magic('time', '', '# Hyper-threading or not?\ngraph, name = d1(chunked2, "fixed_width", 0.2, compute=False)\ndask.threaded.get(graph, name, num_workers=4)\n')


# In[13]:


# Multiprocessing is not so efficient for small arrays
get_ipython().run_line_magic('time', 'd1(chunked2, "fixed_width", 0.2, dask_method=dask.multiprocessing.get)')
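
# The cell below is an illustrative addition, not part of the original timings: the same computation
# run through dask's single-threaded (synchronous) scheduler as a baseline for the threaded and
# multiprocessing timings above. It assumes that `dask.local.get_sync` is available in the installed
# dask version and that `dask_method` accepts any scheduler `get` function, as suggested by the cell
# using `dask.multiprocessing.get`.

# In[14]:


from dask.local import get_sync

# Synchronous baseline: no parallelism, but also no scheduling overhead
get_ipython().run_line_magic('time', 'd1(chunked2, "fixed_width", 0.2, dask_method=get_sync)')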
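
# ### Out-of-core sketch
#
# The cell below is a minimal illustrative sketch, not part of the original notebook: it shows the
# situation where the real gain appears, i.e. data too large to hold in memory at once. The array is
# generated lazily by dask and only materialized chunk by chunk; the sizes and the bin width are
# assumptions chosen purely for illustration.

# In[15]:


# Lazily generated dask array (~1 GiB of float64 if it were materialized at once)
big = da.random.random(2 ** 27, chunks=2 ** 20)

# The histogram is filled chunk by chunk, so peak memory use stays around a single chunk
d1(big, "fixed_width", 0.05)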