Download specific station data from the NWM

Read the zarr dataset on s3. Use a local Dask cluster and write to Zarr to speed things up!

In [17]:
import xarray as xr
import fsspec
import numpy as np
import dask

Start a Dask cluster

This is not required, but speeds up computations. Here we start a local cluster by just doing:

from dask.distributed import Client
client = Client()

but there are many other ways to set up Dask clusters that can scale larger than this. In particular if you are running on HPC you might try using Dask JobQueue.

In [2]:
from dask.distributed import Client
client = Client()
In [3]:
client
Out[3]:

Client

Cluster

  • Workers: 6
  • Cores: 36
  • Memory: 137.44 GB

Open Zarr datasets in Xarray using a mapper from fsspec. We use anon=True for free-access public buckets like the AWS Open Data Program, and requester_pays=True for requester-pays public buckets.

In [4]:
url = 's3://noaa-nwm-retro-v2-zarr-pds'
In [7]:
fs = fsspec.filesystem('s3', anon=True)
In [8]:
%%time
ds = xr.open_zarr(fs.get_mapper(url), consolidated=True)
CPU times: user 1.52 s, sys: 98.3 ms, total: 1.62 s
Wall time: 9.47 s
In [12]:
print(f"Variable size: {ds['streamflow'].nbytes/1e12:.1f} TB")
Variable size: 5.0 TB
In [13]:
idx = (ds.latitude > 41.0) & (ds.latitude < 51.0) & (ds.longitude > -75.0) & (ds.longitude < -62.0)
In [18]:
%%time
with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ds_out = ds[['streamflow']].isel(feature_id=idx).sel(time=slice('2000-01-01',None))
CPU times: user 664 ms, sys: 73.8 ms, total: 738 ms
Wall time: 19.2 s

We want to just specify the chunking in each dimension, but xarray wants us to specify the chunking for each variable. So use a function that creates the chunking for each variable in a dataset:

In [19]:
def gchunks(ds_chunk, chunks):
    group_chunks = {}

    for var in ds_chunk.variables:
        # pick appropriate chunks from above, and default to full length chunks for dimensions that are not in `chunks` above.
        group_chunks[var] = []
        for di in ds_chunk[var].dims:
            if di in chunks.keys():
                if chunks[di] > len(ds_chunk[di]):
                    group_chunks[var].append(len(ds_chunk[di]))
                else:
                    group_chunks[var].append(chunks[di])

            else:
                group_chunks[var].append(len(ds_chunk[di]))

        ds_chunk[var] = ds_chunk[var].chunk(tuple(group_chunks[var]))
        group_chunks[var] = {'chunks':tuple(group_chunks[var])}
    return group_chunks
In [20]:
encoding = gchunks(ds_out, {'time':672, 'feature_id':10000})
In [22]:
%%time
# write local zarr
ds_out.to_zarr('./zarr/gulf_of_maine', mode='w', encoding=encoding)
CPU times: user 4min 3s, sys: 10.7 s, total: 4min 13s
Wall time: 23min 1s
Out[22]:
<xarray.backends.zarr.ZarrStore at 0x2aac442298e0>
In [ ]:
%%time
# write zarr to S3
#fs2 = fsspec.filesystem('s3', anon=False, profile='esip-qhub')
#ds_out.to_zarr(fs2.get_mapper('esip-qhub/usgs/rsignell/testing/zarr/gulf_of_maine'), 
#                mode='w', encoding=encoding)