The National Water Model writes a new NetCDF file for each hour: 8,760 files per year, and 227,904 files for the full 26-year reanalysis (1993-01-01 00:00 through 2018-12-31 23:00).
For small datasets, rechunking the data to Zarr would be as simple as:
import xarray as xr
ds = xr.open_mfdataset('*.nc')
ds = ds.chunk({'time':672, 'feature_id':30000})
ds.to_zarr('all_nc.zarr', consolidated=True)
For large datasets, this approach is too slow and uses too much memory. Here we process the data in batches of 672 hourly files (one time chunk) at a time.
For each batch, we create an xarray dataset with open_mfdataset, then use Rechunker to write a rechunked Zarr dataset for that batch. We then append each batch (each time chunk) along the time dimension, building up the full dataset.
The nice part of this approach is that if something goes wrong with a batch, we can fix the problem and just carry on appending.
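As a rough illustration of the pattern only: in the minimal sketch below, files and n_batches stand in for the file list and chunk count built later in the notebook, and a plain chunk call stands in for the Rechunker step used in the real loop.
# Minimal sketch of the batch-and-append pattern (illustrative only)
import xarray as xr
batch_size = 672                          # one time chunk of hourly files
for i in range(n_batches):                # n_batches = ceil(len(files) / batch_size), built below
    batch = files[i*batch_size:(i+1)*batch_size]
    ds = xr.open_mfdataset(batch, combine='by_coords')
    ds = ds.chunk({'time': batch_size, 'feature_id': 30000})
    if i == 0:
        ds.to_zarr('all_nc.zarr', consolidated=True, mode='w')            # first batch: create the store
    else:
        ds.to_zarr('all_nc.zarr', consolidated=True, append_dim='time')   # later batches: append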
import numpy as np
import xarray as xr
import pandas as pd
import numcodecs
from dask.distributed import Client, progress, LocalCluster, performance_report
import zarr
import time
import shutil
import os
os.chdir('/usgs/gamone/data2/rsignell/data/NWM2')
Build a list of filenames for open_mfdataset
dates = pd.date_range(start='1993-01-01 00:00',end='2018-12-31 23:00', freq='1h')
files = ['./nc/{}/{}.CHRTOUT_DOMAIN1.comp'.format(date.strftime('%Y'),date.strftime('%Y%m%d%H%M')) for date in dates]
len(files)
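Before kicking off the long-running conversion, it can be worth confirming that every expected hourly file actually exists on disk (an optional check, not part of the original workflow):
# Optional sanity check: list any expected hourly files that are missing
missing = [f for f in files if not os.path.exists(f)]
print(f'{len(missing)} missing files out of {len(files)}')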
dset = xr.open_dataset(files[0])
dset
A nice chunk size for object storage is on the order of 100 MB.
time_chunk_size = 672
feature_chunk_size = 30000
nh_chunks = len(dset.feature_id)/feature_chunk_size
nh_chunks
nt_chunks = int(np.ceil(len(files)/time_chunk_size))
nt_chunks
(time_chunk_size * feature_chunk_size) * 8 / 1e6   # chunk size in MB, assuming 8 bytes per value
... About 161 MB per chunk, close enough to the 100 MB target.
Create a preprocessing function to drop coordinates that mess up open_mfdataset concatenation
def drop_coords(ds):
    ds = ds.drop(['reference_time', 'feature_id', 'crs'])
    return ds.reset_coords(drop=True)
#cluster.close(); client.close()
Create a local dask cluster
#cluster = LocalCluster() # for this first step, having 32 workers is faster
cluster = LocalCluster(n_workers=8, threads_per_worker=1)
cluster
client = Client(cluster)
Tell blosc not to use threads since we are using dask to parallelize
numcodecs.blosc.use_threads = False
zarr_step = 'zarr/step'
zarr_chunked = 'zarr/nwm'
zarr_temp = 'zarr/tmp'
Step our way through the dataset, reading one chunk along the time dimension at a time, to avoid Dask reading too many chunks before writing and blowing out memory. The first time chunk is written to Zarr, then the others are appended.
from rechunker import rechunk
chunk_mem_factor = 0.8 #fraction of worker memory for each chunk (seems to be the max possible)
worker_mem = cluster.worker_spec[0]['options']['memory_limit']/1e9
max_mem = '{:.2f}GB'.format(chunk_mem_factor * worker_mem)
print(max_mem)
%%time
for i in range(nt_chunks):
    print(i)
    istart = i * time_chunk_size
    istop = int(np.min([(i+1) * time_chunk_size, len(files)]))

    ds = xr.open_mfdataset(files[istart:istop], parallel=True,
                           preprocess=drop_coords, combine='by_coords',
                           concat_dim='time', join='override')

    # add back in the 'feature_id' coordinate removed by preprocessing
    ds.coords['feature_id'] = dset.coords['feature_id']

    # chunk this step to zarr using rechunker

    # remove the temp and step zarr datasets
    try:
        shutil.rmtree(zarr_temp)
        while os.path.exists(zarr_temp):   # check if it still exists
            pass
    except:
        pass

    try:
        shutil.rmtree(zarr_step)
        while os.path.exists(zarr_step):   # check if it still exists
            pass
    except:
        pass

    chunk_plan = {}
    for var in ds.data_vars:
        if len(ds[var].dims) == 2:
            var_chunk = (time_chunk_size, feature_chunk_size)
            chunk_plan[var] = var_chunk

    array_plan = rechunk(ds, chunk_plan, max_mem, zarr_step,
                         temp_store=zarr_temp)

    with performance_report(filename="dask-report.html"):
        result = array_plan.execute(retries=10)

    # read back in the zarr chunk rechunker wrote
    ds = xr.open_zarr(zarr_step)

    if i == 0:
        ds.to_zarr(zarr_chunked, consolidated=True, mode='w')
    else:
        ds.to_zarr(zarr_chunked, consolidated=True, append_dim='time')
The workflow threw an error on the last, partial chunk because Rechunker doesn't consider the chunk_plan valid when the chunk has fewer than 672 time steps, even though a partial last chunk is perfectly valid. Here we just rechunk that last partial chunk with xarray (without Rechunker) and append it to the overall Zarr dataset.
ds1 = ds.chunk({'feature_id':feature_chunk_size, 'time':time_chunk_size})
ds1.to_zarr('./zarr/last_step', consolidated=True, mode='w')
ds2 = xr.open_zarr('/usgs/gamone/data2/rsignell/data/NWM2/zarr/last_step', consolidated=True)
ds2.to_zarr(zarr_chunked, consolidated=True, append_dim='time')
Check the resulting chunked dataset for the correct start and stop times and for any gaps. If there are no gaps, we should get a single unique value of 3600 s for the difference between consecutive hourly time steps.
ds1 = xr.open_zarr('/usgs/gamone/data2/rsignell/data/NWM2/zarr/nwm', consolidated=True)
print(ds1.time[0].values)
print(ds1.time[-1].values)
d1 = ds1.time.diff(dim='time').values/1e9  # convert timedelta64 nanoseconds to seconds
np.unique(d1)
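As an additional sanity check (assuming the hourly date range built earlier is what the Zarr store should contain), we can compare the time coordinate directly against it:
# Extra check: the Zarr time coordinate should match the expected hourly range exactly
expected = pd.date_range(start='1993-01-01 00:00', end='2018-12-31 23:00', freq='1h')
print(len(ds1.time) == len(expected), (ds1.time.values == expected.values).all())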
#cluster.close(); client.close()
import hvplot.xarray
ds1.streamflow[:,1000].plot()
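Since hvplot.xarray is imported, the same series can also be plotted interactively via the hvplot accessor it registers, for example:
# Interactive Bokeh plot of the same time series using the hvplot accessor
ds1.streamflow[:, 1000].hvplot()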