Convert lots of small NetCDFs to one big Zarr

The National Water Model writes a new NetCDF file for each hour, resulting in 8760 files per year and 227904 files for the entire 26-year reanalysis (1993-01-01 00:00 through 2018-12-31 23:00).
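
A quick sanity check on that file count (a minimal sketch using pandas, the same way the file list is built below):

import pandas as pd
hours = pd.date_range(start='1993-01-01 00:00', end='2018-12-31 23:00', freq='1h')
len(hours)   # 227904 = 26 years * 8760 hours + 6 leap days * 24 hours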

For small datasets, rechunking the data to Zarr would be as simple as:

import xarray as xr
ds = xr.open_mfdataset('*.nc')
ds = ds.chunk({'time':672, 'feature_id':30000})
ds.to_zarr('all_nc.zarr', consolidated=True)

For large datasets, this approach is slow and uses too much memory, so here we process the data in batches of 672 hourly files (one time chunk) at a time.

For each batch, we create an xarray dataset with open_mfdataset, then use Rechunker to write a rechunked Zarr dataset for that batch. Each batch (one time chunk) is then appended along the time dimension, building up the overall dataset.

The nice part of this approach is that if something goes wrong with a batch, we can fix the problem and just carry on appending.
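
The append itself is just xarray's to_zarr: the first batch creates the store with mode='w', and every later batch is appended along time. A minimal sketch of the pattern used in the big loop below (ds_batch is a hypothetical, already-rechunked batch dataset):

# first batch: create the Zarr store
ds_batch.to_zarr('zarr/nwm', consolidated=True, mode='w')
# later batches: append along the time dimension
ds_batch.to_zarr('zarr/nwm', consolidated=True, append_dim='time')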

In [1]:
import numpy as np
import xarray as xr
import pandas as pd
import numcodecs
from dask.distributed import Client, progress, LocalCluster, performance_report
import zarr
import time
import shutil
In [2]:
import os
os.chdir('/usgs/gamone/data2/rsignell/data/NWM2')

Build a list of filenames for open_mfdataset

In [3]:
dates = pd.date_range(start='1993-01-01 00:00',end='2018-12-31 23:00', freq='1h')

files = ['./nc/{}/{}.CHRTOUT_DOMAIN1.comp'.format(date.strftime('%Y'),date.strftime('%Y%m%d%H%M')) for date in dates]
In [4]:
len(files)
Out[4]:
227904
In [5]:
dset = xr.open_dataset(files[0])
In [6]:
dset
Out[6]:
<xarray.Dataset>
Dimensions:         (feature_id: 2729077, reference_time: 1, time: 1)
Coordinates:
  * time            (time) datetime64[ns] 1993-01-01
  * reference_time  (reference_time) datetime64[ns] 1992-11-01
  * feature_id      (feature_id) int32 101 179 181 ... 1180001803 1180001804
    latitude        (feature_id) float32 ...
    longitude       (feature_id) float32 ...
Data variables:
    crs             |S1 b''
    order           (feature_id) int32 ...
    elevation       (feature_id) float32 ...
    streamflow      (feature_id) float64 ...
    q_lateral       (feature_id) float64 ...
    velocity        (feature_id) float64 ...
    qSfcLatRunoff   (feature_id) float64 ...
    qBucket         (feature_id) float64 ...
    qBtmVertRunoff  (feature_id) float64 ...
Attributes:
    featureType:                timeSeries
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
    model_initialization_time:  1992-11-01_00:00:00
    station_dimension:          feature_id
    model_output_valid_time:    1993-01-01_00:00:00
    model_total_valid_times:    1464
    stream_order_output:        1
    cdm_datatype:               Station
    Conventions:                CF-1.6
    code_version:               v5.1.0-alpha11
    model_output_type:          channel_rt
    model_configuration:        retrospective
    dev_OVRTSWCRT:              1
    dev_NOAH_TIMESTEP:          3600
    dev_channel_only:           0
    dev_channelBucket_only:     0
    dev:                        dev_ prefix indicates development/internal me...

A nice chunk size for object storage is on the order of 100 MB.

In [7]:
time_chunk_size = 672   
feature_chunk_size = 30000
In [8]:
81312/672
Out[8]:
121.0
In [9]:
nh_chunks = len(dset.feature_id)/feature_chunk_size
nh_chunks
Out[9]:
90.96923333333334
In [10]:
nt_chunks = int(np.ceil(len(files)/time_chunk_size))
nt_chunks
Out[10]:
340
In [11]:
(time_chunk_size * feature_chunk_size )*8 / 1e6
Out[11]:
161.28

... 161 MB per chunk (672 time steps × 30000 features × 8 bytes): close enough to 100 MB.

Create a preprocess function to drop the coordinates and variables that trip up open_mfdataset

In [12]:
def drop_coords(ds):
    # drop coords/variables that we don't want open_mfdataset to try to combine;
    # the feature_id coordinate is added back after the merge
    ds = ds.drop(['reference_time', 'feature_id', 'crs'])
    return ds.reset_coords(drop=True)
In [13]:
#cluster.close(); client.close()

Create a local dask cluster

In [14]:
#cluster = LocalCluster()   # for this first step, having 32 workers is faster
cluster = LocalCluster(n_workers=8, threads_per_worker=1) 
cluster
In [15]:
client = Client(cluster)

Tell blosc not to use threads since we are using dask to parallelize

In [16]:
numcodecs.blosc.use_threads = False
In [17]:
zarr_step = 'zarr/step'
zarr_chunked = 'zarr/nwm'

zarr_temp = 'zarr/tmp'

Step our way through the dataset, reading one chunk along the time dimension at a time, to keep Dask from reading too many chunks before writing and blowing out memory. The first time chunk is written to Zarr; the rest are appended.

Set up Rechunker

In [18]:
from rechunker import rechunk
In [19]:
chunk_mem_factor = 0.8  #fraction of worker memory for each chunk (seems to be the max possible)
worker_mem = cluster.worker_spec[0]['options']['memory_limit']/1e9
max_mem = '{:.2f}GB'.format(chunk_mem_factor * worker_mem)
print(max_mem)
13.50GB
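
Inside the loop, Rechunker is driven by just two calls: rechunk builds a plan from the source dataset, a per-variable chunk plan, the memory limit, and the target and temp stores, and execute runs that plan on the Dask cluster. A minimal sketch (ds_batch is a hypothetical single batch; the stores and chunk sizes are the ones defined above):

from rechunker import rechunk

chunk_plan = {'streamflow': (time_chunk_size, feature_chunk_size)}   # one entry per 2D variable
array_plan = rechunk(ds_batch, chunk_plan, max_mem, zarr_step, temp_store=zarr_temp)
array_plan.execute()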

Do the big loop

In [20]:
%%time
for i in range(nt_chunks):
    print(i)
    istart = i * time_chunk_size
    istop = int(np.min([(i+1) * time_chunk_size, len(files)]))
    
    ds = xr.open_mfdataset(files[istart:istop], parallel=True,
                           preprocess=drop_coords, combine='by_coords',
                           concat_dim='time', join='override')
       
    # add back in the 'feature_id' coordinate removed by preprocessing 
    ds.coords['feature_id'] = dset.coords['feature_id']
    
    # chunk this step to zarr using rechunker

    # remove the temp and step zarr stores left over from the previous iteration
    for store in [zarr_temp, zarr_step]:
        shutil.rmtree(store, ignore_errors=True)
        while os.path.exists(store):   # wait for the removal to finish
            pass
    
    chunk_plan={}
    for var in ds.data_vars:
        if len(ds[var].dims)==2:
            var_chunk = (time_chunk_size, feature_chunk_size)
            chunk_plan[var] = var_chunk

    array_plan = rechunk(ds, chunk_plan, max_mem, zarr_step, 
                     temp_store=zarr_temp)
    
    with performance_report(filename="dask-report.html"):
        result = array_plan.execute(retries=10)

    # read back in the zarr chunk rechunker wrote
    ds = xr.open_zarr(zarr_step)

    if i==0:
        ds.to_zarr(zarr_chunked, consolidated=True, mode='w')
    else:
        ds.to_zarr(zarr_chunked, consolidated=True, append_dim='time')
...
337
338
339
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<timed exec> in <module>

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/api.py in rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options, executor)
    301         target_options=target_options,
    302         temp_store=temp_store,
--> 303         temp_options=temp_options,
    304     )
    305     plan = executor.prepare_plan(copy_spec)

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/api.py in _setup_rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options)
    381                 temp_store_or_group=temp_group,
    382                 temp_options=options,
--> 383                 name=name,
    384             )
    385             copy_spec.write.array.attrs.update(variable_attrs)  # type: ignore

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/api.py in _setup_array_rechunk(source_array, target_chunks, max_mem, target_store_or_group, target_options, temp_store_or_group, temp_options, name)
    484         itemsize,
    485         max_mem,
--> 486         consolidate_reads=consolidate_reads,
    487     )
    488 

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/algorithm.py in rechunking_plan(shape, source_chunks, target_chunks, itemsize, max_mem, consolidate_reads, consolidate_writes)
    127 
    128     if consolidate_writes:
--> 129         write_chunks = consolidate_chunks(shape, target_chunks, itemsize, max_mem)
    130     else:
    131         write_chunks = tuple(target_chunks)

~/miniconda3/envs/pangeo/lib/python3.7/site-packages/rechunker/algorithm.py in consolidate_chunks(shape, chunks, itemsize, max_mem, chunk_limits)
     51                 chunk_limit_per_axis[n_ax] = shape[n_ax]
     52             else:
---> 53                 raise ValueError(f"Invalid chunk_limits {chunk_limits}.")
     54 
     55     chunk_mem = itemsize * prod(chunks)

ValueError: Invalid chunk_limits (96, 2729077).

The workflow threw an error on the last, partial time chunk: the final batch contains only 96 hourly files (227904 - 339*672), so Rechunker rejects a chunk plan that asks for 672 time steps, even though a partial last chunk is perfectly valid in Zarr. Here we just rechunk that last batch with plain xarray/Dask instead of Rechunker and append it to the overall Zarr dataset.

In [22]:
ds1 = ds.chunk({'feature_id':feature_chunk_size, 'time':time_chunk_size})
In [23]:
ds1.to_zarr('./zarr/last_step', consolidated=True, mode='w')
Out[23]:
<xarray.backends.zarr.ZarrStore at 0x7f53d6518530>
In [24]:
ds2 = xr.open_zarr('/usgs/gamone/data2/rsignell/data/NWM2/zarr/last_step', consolidated=True)
In [25]:
ds2.to_zarr(zarr_chunked, consolidated=True, append_dim='time')
Out[25]:
<xarray.backends.zarr.ZarrStore at 0x7f544c8b4410>

Check the resulting chunked dataset for the correct start time and stop time, and for any gaps. If there are no gaps, the difference between consecutive hourly time steps should have a single unique value of 3600 s.

In [26]:
ds1 = xr.open_zarr('/usgs/gamone/data2/rsignell/data/NWM2/zarr/nwm', consolidated=True)
print(ds1.time[0].values)
print(ds1.time[-1].values)
1993-01-01T00:00:00.000000000
2018-12-31T23:00:00.000000000
In [27]:
d1 = ds1.time.diff(dim='time').values/1e9   # convert timedelta64 nanoseconds to seconds
In [30]:
np.unique(d1)
Out[30]:
array([3600], dtype='timedelta64[ns]')
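
As one more sanity check on the chunking itself, the chunk shape Zarr stored for a variable can be read back from its encoding (a quick sketch; we expect the (672, 30000) chunks requested above):

ds1.streamflow.encoding['chunks']   # expect (672, 30000)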
In [ ]:
#cluster.close();  client.close()
In [36]:
import hvplot.xarray
ds1.streamflow[:,1000].plot()
Matplotlib is building the font cache; this may take a moment.
Out[36]:
[<matplotlib.lines.Line2D at 0x7f53f34c1110>]