#!/usr/bin/env python
# coding: utf-8

# # Download specific station data from the NWM
# Read the Zarr dataset on S3.
# Use a local Dask cluster and write the subset to Zarr to speed things up!

# In[17]:


import xarray as xr
import fsspec
import numpy as np
import dask


# ### Start a Dask cluster
# This is not required, but it speeds up computations. Here we start a local cluster by just doing:
# ```
# from dask.distributed import Client
# client = Client()
# ```
# but there are [many other ways to set up Dask clusters](https://docs.dask.org/en/latest/setup.html) that can scale larger than this. In particular, if you are running on HPC, you might try Dask-Jobqueue (a minimal SLURM sketch is included at the end of this notebook).

# In[2]:


from dask.distributed import Client

client = Client()


# In[3]:


client


# Open Zarr datasets in xarray using a mapper from fsspec. We use `anon=True` for free-access public buckets like those in the AWS Open Data Program, and `requester_pays=True` for requester-pays public buckets.

# In[4]:


url = 's3://noaa-nwm-retro-v2-zarr-pds'


# In[7]:


fs = fsspec.filesystem('s3', anon=True)


# In[8]:


get_ipython().run_cell_magic('time', '', 'ds = xr.open_zarr(fs.get_mapper(url), consolidated=True)\n')


# In[12]:


print(f"Variable size: {ds['streamflow'].nbytes/1e12:.1f} TB")


# In[13]:


# Boolean mask over feature_id selecting reaches in the Gulf of Maine region
idx = (ds.latitude > 41.0) & (ds.latitude < 51.0) & (ds.longitude > -75.0) & (ds.longitude < -62.0)


# In[18]:


get_ipython().run_cell_magic('time', '', "with dask.config.set(**{'array.slicing.split_large_chunks': False}):\n    ds_out = ds[['streamflow']].isel(feature_id=idx).sel(time=slice('2000-01-01', None))\n")


# We want to specify the chunking just once per dimension, but xarray's Zarr encoding wants the chunking for each variable. So we use a helper function that builds the per-variable chunking for a dataset:

# In[19]:


def gchunks(ds_chunk, chunks):
    """Build a per-variable Zarr encoding from per-dimension chunk sizes."""
    group_chunks = {}
    for var in ds_chunk.variables:
        # Use the chunk size requested in `chunks` for each dimension,
        # defaulting to full-length chunks for dimensions not listed there.
        group_chunks[var] = []
        for di in ds_chunk[var].dims:
            if di in chunks.keys():
                if chunks[di] > len(ds_chunk[di]):
                    group_chunks[var].append(len(ds_chunk[di]))
                else:
                    group_chunks[var].append(chunks[di])
            else:
                group_chunks[var].append(len(ds_chunk[di]))
        ds_chunk[var] = ds_chunk[var].chunk(tuple(group_chunks[var]))
        group_chunks[var] = {'chunks': tuple(group_chunks[var])}
    return group_chunks


# In[20]:


encoding = gchunks(ds_out, {'time': 672, 'feature_id': 10000})


# In[22]:


get_ipython().run_cell_magic('time', '', "# write local zarr\nds_out.to_zarr('./zarr/gulf_of_maine', mode='w', encoding=encoding)\n")


# In[ ]:


get_ipython().run_cell_magic('time', '', "# write zarr to S3\n#fs2 = fsspec.filesystem('s3', anon=False, profile='esip-qhub')\n#ds_out.to_zarr(fs2.get_mapper('esip-qhub/usgs/rsignell/testing/zarr/gulf_of_maine'),\n#               mode='w', encoding=encoding)\n")
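

# The title promises "specific station data", so as a final check we can read the local
# Zarr store back and pull a single station's streamflow series. This cell is a minimal
# sketch added here (it is not part of the original workflow); rather than assuming a
# particular reach ID, it simply grabs the first feature_id in the subset.

# In[ ]:


# Open the local store written above (lazy; nothing is loaded yet)
ds_local = xr.open_zarr('./zarr/gulf_of_maine')

# Placeholder choice of station: the first feature_id in the subset.
# Substitute a real NWM reach ID of interest here.
station_id = int(ds_local['feature_id'][0])

# Select that one station and load just its time series into memory
flow_series = ds_local['streamflow'].sel(feature_id=station_id).load()
flow_series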
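

# The cluster discussion near the top mentions Dask-Jobqueue for HPC systems. The cell
# below is a minimal sketch of a SLURM-based cluster, not something used in the original
# workflow: the queue name, cores, memory, walltime, and number of jobs are placeholder
# assumptions that must be adapted to your scheduler, and it needs the `dask_jobqueue`
# package. Following the pattern of the optional S3 write above, it is left commented out.

# In[ ]:


# from dask_jobqueue import SLURMCluster
# from dask.distributed import Client
#
# cluster = SLURMCluster(queue='regular',       # placeholder partition name
#                        cores=36,              # cores per batch job (assumption)
#                        memory='100GB',        # memory per batch job (assumption)
#                        walltime='01:00:00')   # placeholder walltime
# cluster.scale(jobs=4)                         # ask SLURM for 4 worker jobs
# client = Client(cluster)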