In this notebook we will learn how to save a numpy array in a zarr file for distributed computation.
We will use the Reflection response for Marchenko redatuming as sample data.
%load_ext autoreload
%autoreload 2
%matplotlib inline
import os
import shutil
import warnings
warnings.filterwarnings('ignore')
import numpy as np
import matplotlib.pyplot as plt
import scipy as sp
import dask.array as da
import pylops
import pylops_distributed
import zarr
from pylops.utils import dottest
Set env variable with path
#os.environ["STORE_PATH"] = ...
Local cluster
#client = pylops_distributed.utils.backend.dask(processes=False, threads_per_worker=2,
# n_workers=2)
#client
SSH cluster
from dask.distributed import Scheduler, Client
client = Client('be-linrgsn045:8786')
client
Client
|
Cluster
|
R = np.load('../data/marchenko/input.npz')['R']
R = np.swapaxes(R, 0, 1) # R[r, s, f] (for data with dipole source, you need to integrate over sources)
nr, ns, nt = R.shape
print(R.shape)
(101, 101, 800)
# Add negative time to operator and bring to frequency
nfmax = 500 # max frequency for MDC (#samples)
Rtwosided = np.concatenate((np.zeros((nr, ns, nt-1)), R), axis=-1)
Rtwosided_fft = np.fft.rfft(Rtwosided, 2*nt-1, axis=-1)/np.sqrt(2*nt-1)
Rtwosided_fft = Rtwosided_fft[...,:nfmax]
Rtwosided_fft = Rtwosided_fft.transpose(2, 0, 1) # R[f, r, s]
Rtwosided_fft = Rtwosided_fft.astype(np.complex64)
shutil.rmtree('../data/marchenko/input.zarr')
Rstore = zarr.DirectoryStore('../data/marchenko/input.zarr')
Rzaar = zarr.create(store=Rstore, overwrite=True,
shape=(nfmax, ns, nr), chunks=(nfmax // 4, ns, nr),
dtype=np.complex64)
Rzaar.info
Type | zarr.core.Array |
---|---|
Data type | complex64 |
Shape | (500, 101, 101) |
Chunk shape | (125, 101, 101) |
Order | C |
Read-only | False |
Compressor | Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0) |
Store type | zarr.storage.DirectoryStore |
No. bytes | 40804000 (38.9M) |
No. bytes stored | 396 |
Storage ratio | 103040.4 |
Chunks initialized | 0/4 |
Let's write inside
Rzaar[:, : ,:] = Rtwosided_fft
Now we get it into dask
Rda = da.from_zarr('../data/marchenko/input.zarr/')
Rda
|
R = np.load('../data/marchenko/3DMarchenko_data_samesrcrec.npz')['data']
R = np.swapaxes(R, 0, 1) # R[r, s, f] (for data with dipole source, you need to integrate over sources)
nr, ns, nt = R.shape
print(R.shape)
(2451, 2451, 601)
shutil.rmtree(os.environ["STORE_PATH"]+'Marchenko3D/input3D.zarr')
Rstore = zarr.DirectoryStore(os.environ["STORE_PATH"]+'Marchenko3D/input3D.zarr')
Rzaar = zarr.create(store=Rstore, overwrite=True,
shape=(nfmax, ns, nr), chunks=(nfmax // 20, ns, nr),
dtype=np.complex64)
Rzaar.info
Type | zarr.core.Array |
---|---|
Data type | complex64 |
Shape | (500, 2451, 2451) |
Chunk shape | (25, 2451, 2451) |
Order | C |
Read-only | False |
Compressor | Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0) |
Store type | zarr.storage.DirectoryStore |
No. bytes | 24029604000 (22.4G) |
No. bytes stored | 399 |
Storage ratio | 60224571.4 |
Chunks initialized | 0/20 |
Perform fft in batches and save to zaar
recs_batch = 30
irecs_in = np.arange(0, nr, recs_batch)
irecs_end = irecs_in + recs_batch
irecs_end[-1] = nr-1
for irec_in, irec_end in zip(irecs_in, irecs_end):
print('Processing batch %d/%d' % (irec_in / recs_batch, len(irecs_in)))
Rtwosided = np.concatenate((np.zeros(((irec_end - irec_in), ns, nt-1)), R[irec_in:irec_end]), axis=-1)
Rtwosided_fft = np.fft.rfft(Rtwosided, 2*nt-1, axis=-1)/np.sqrt(2*nt-1)
Rtwosided_fft = Rtwosided_fft[..., :nfmax]
Rtwosided_fft = Rtwosided_fft.transpose(2, 0, 1) # R[f, r, s]
Rtwosided_fft = Rtwosided_fft.astype(np.complex64)
Rzaar[:, int(irec_in):int(irec_end) ,:] = Rtwosided_fft
Rda = da.from_zarr(os.environ["STORE_PATH"]+'Marchenko3D/input3D.zarr')
Rda
|
plt.figure(figsize=(10, 10))
plt.imshow(np.abs(Rda[100].compute()))
plt.axis('tight');
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client