NWM ReferenceFileSystem JSON

Create ReferenceFileSystem JSON file for a collection of NWM NetCDF files on S3

In [1]:
import os
import fsspec
import ujson   # fast json
from fsspec_reference_maker.hdf import SingleHdf5ToZarr 
from fsspec_reference_maker.combine import MultiZarrToZarr
import xarray as xr
import pandas as pd
In [2]:
import fsspec_reference_maker
fsspec_reference_maker.__version__
Out[2]:
'0.0.1+57.g395385b'
In [3]:
fs = fsspec.filesystem('s3', anon=True)

Create the list of dates

NWM output is stored in yyyy/yyyymmddhhmm format, so generate a list of datetimes and use them to build the URLs

In [4]:
import pandas as pd
dates = pd.date_range(start='1979-02-01 01:00',end='2020-12-31 23:00', freq='1h')
In [5]:
len(dates)
Out[5]:
367439
In [6]:
def date2cfile(date):
    # Create S3 URL from date
    yyyymmddhh = date.strftime('%Y%m%d%H')
    yyyy = date.strftime('%Y')
    cfile = f's3://noaa-nwm-retrospective-2-1-pds/model_output/{yyyy}/{yyyymmddhh}00.CHRTOUT_DOMAIN1.comp'
    return cfile

We need to include the "s3://" prefix on the file paths so that fsspec will recognize that these NetCDF files are on S3:

In [7]:
urls = [date2cfile(date) for date in dates]

# fsspec open options: read-only, anonymous access, caching only the first
# block of each file (where most of the HDF5 metadata lives)
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')
In [8]:
print(urls[0])
print(urls[-1])
s3://noaa-nwm-retrospective-2-1-pds/model_output/1979/197902010100.CHRTOUT_DOMAIN1.comp
s3://noaa-nwm-retrospective-2-1-pds/model_output/2020/202012312300.CHRTOUT_DOMAIN1.comp
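As a quick sanity check (not part of the original run), one of these files can be opened directly before kicking off the full reference generation; this assumes the h5netcdf engine is installed:

# Optional sanity check (not in the original notebook): confirm the first file
# exists on S3 and opens cleanly with xarray before generating any references.
print(fs.info(urls[0]))                           # object size and metadata
with fs.open(urls[0], **so) as f:
    print(xr.open_dataset(f, engine='h5netcdf'))  # assumes h5netcdf is available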

Start a Dask Gateway cluster

This was done using a special function on the JupyterHub system I'm using that propagates certain environment variables and AWS credentials. The rest of this notebook should work no matter how you spin up Dask workers/clusters. This cluster contains 30 workers.

In [9]:
import os
import sys
sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
import ebdpy as ebd

ebd.set_credentials(profile='esip-qhub')

profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 30
client,cluster = ebd.start_dask_cluster(profile=profile,worker_max=worker_max, 
                                      region=region, use_existing_cluster=True,
                                      adaptive_scaling=False, wait_for_cluster=False, 
                                      environment='pangeo', worker_profile='Pangeo Worker', 
                                      propagate_env=True)
/home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/dask_gateway/client.py:21: FutureWarning: format_bytes is deprecated and will be removed in a future release. Please use dask.utils.format_bytes instead.
  from distributed.utils import LoopRunner, format_bytes
No Cluster running.
Starting new cluster.
Setting Fixed Scaling workers=30
Reconnect client to clear cache
client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub):
https://jupyter.qhub.esipfed.org/gateway/clusters/dev.358879f07f6b4d0b82e92a13e338f7bf/status
Propagating environment variables to workers

Install the latest fsspec-reference-maker on the dask workers

Since the environment on this JupyterHub does not include the latest version of fsspec-reference-maker, this code propagates a pip install to each of the workers.

In [10]:
import subprocess
import logging
from distributed import WorkerPlugin

class PipPlugin(WorkerPlugin):
    """
    Install packages on a worker as it starts up.

    Parameters
    ----------
    packages : List[str]
        A list of packages to install with pip on startup.
    """
    def __init__(self, packages):
        self.packages = packages

    def setup(self, worker):
        logger = logging.getLogger("distributed.worker")
        subprocess.call(['python', '-m', 'pip', 'install', '--upgrade'] + self.packages)
        logger.info("Installed %s", self.packages)
In [11]:
plugin = PipPlugin(['fsspec-reference-maker'])
In [12]:
client.register_worker_plugin(plugin);

Function to generate combined references for a given URL list

This function loops over a list of data file URLs, creates a reference dict for each file with SingleHdf5ToZarr, and appends it to single_references. Then single_references (the list of reference dicts) is passed to MultiZarrToZarr, which combines them into a single reference that is returned.

In [13]:
def gen_reference(urls):
    
    def preprocess(ds):
        return ds.set_coords(['latitude', 'longitude'])
    
    single_references = []
    for u in urls:
        with fs.open(u, **so) as infile:
            single_references.append(SingleHdf5ToZarr(infile, u, inline_threshold=300).translate())
    
    mzz = MultiZarrToZarr(
        single_references,
        remote_protocol="s3",
        remote_options={'anon':True},
        xarray_open_kwargs={
            "decode_cf" : False,
            "mask_and_scale" : False,
            "drop_variables": ["crs", "reference_time"]
        },
        xarray_concat_args={
            'dim' : 'time'
        },
        preprocess=preprocess
    )

    return [mzz.translate(template_count=None)]

Parallel creation of references using Dask Bag

While it's certainly possible to pass all ~367,000 URLs to the function above, it would be very slow and would probably run into memory issues. Instead, we can use dask.bag to parallelize reference generation. Below, I take the list of URLs, use dask.bag to split it into 400 partitions, and run the gen_reference() function on each partition. What is returned is a list of 400 combined references, which can then be further combined with MultiZarrToZarr into a single file.

Note: This will take a long time, depending on the number of workers used. With 30 workers on a dedicated cluster, this took 2h 20min to complete.

In [14]:
import dask.bag as db
In [15]:
b = db.from_sequence(urls, npartitions=400).map_partitions(gen_reference)
In [ ]:
%%time
out = b.compute(retries=10)
In [25]:
len(out)
Out[25]:
400
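Since computing out took over two hours, it may be worth persisting the 400 intermediate reference dicts before combining them, so the Dask computation does not have to be repeated if the final combine step needs to be rerun. A minimal sketch using the ujson import from the top of the notebook (the partials/ directory and filenames are illustrative):

# Optional (not part of the original run): save each of the 400 intermediate
# reference dicts to its own JSON file so the expensive compute is not lost.
os.makedirs('partials', exist_ok=True)
for i, ref in enumerate(out):
    with open(f'partials/partial_{i:03d}.json', 'w') as f:
        f.write(ujson.dumps(ref))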

Now, combine the 400 reference dicts into one, and write to JSON

In [5]:
def preprocess(ds):
    return ds.set_coords(['latitude', 'longitude'])

mzz = MultiZarrToZarr(
    out,
    remote_protocol="s3",
    remote_options={'anon':True},
    xarray_open_kwargs={
        "decode_cf" : False,
        "mask_and_scale" : False,
        "drop_variables": ["crs", "reference_time"]
    },
    xarray_concat_args={
        'dim' : 'time'
    },
    preprocess=preprocess
)
In [6]:
%time mzz.translate('./full-workflow-combined.json', template_count=None)
CPU times: user 46.7 s, sys: 2.18 s, total: 48.9 s
Wall time: 56.1 s

Open the data with fsspec and xarray

Open data with xarray and plot

In [7]:
%%time
fs = fsspec.filesystem('reference', fo='./full-workflow-combined.json', remote_protocol='s3', remote_options=dict(anon=True))
ds = xr.open_dataset(fs.get_mapper(""), engine='zarr')
<timed exec>:2: RuntimeWarning: Failed to open Zarr store with consolidated metadata, falling back to try reading non-consolidated metadata. This is typically much slower for opening a dataset. To silence this warning, consider:
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
/home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'qBtmVertRunoff' has multiple fill values {-9999000, 0}, decoding all values to NaN.
  new_vars[k] = decode_cf_variable(
/home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'qBucket' has multiple fill values {-999900000, 0}, decoding all values to NaN.
  new_vars[k] = decode_cf_variable(
/home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'qSfcLatRunoff' has multiple fill values {-999900000, 0}, decoding all values to NaN.
  new_vars[k] = decode_cf_variable(
/home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'q_lateral' has multiple fill values {0, -99990}, decoding all values to NaN.
  new_vars[k] = decode_cf_variable(
/home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'streamflow' has multiple fill values {0, -999900}, decoding all values to NaN.
  new_vars[k] = decode_cf_variable(
/home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'velocity' has multiple fill values {0, -999900}, decoding all values to NaN.
  new_vars[k] = decode_cf_variable(
CPU times: user 29 s, sys: 2.09 s, total: 31 s
Wall time: 35.2 s
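The consolidated-metadata warning above is harmless; as the message itself suggests, it can be silenced by telling the zarr backend not to look for consolidated metadata. A minor variation on the call above:

# Same open as above, but skip the (absent) consolidated metadata explicitly
# to avoid the RuntimeWarning.
ds = xr.open_dataset(fs.get_mapper(""), engine='zarr',
                     backend_kwargs={'consolidated': False})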
In [8]:
ds
Out[8]:
<xarray.Dataset>
Dimensions:         (time: 367439, feature_id: 2776738)
Coordinates:
  * feature_id      (feature_id) float64 101.0 179.0 181.0 ... 1.18e+09 1.18e+09
    latitude        (feature_id) float32 ...
    longitude       (feature_id) float32 ...
  * time            (time) datetime64[ns] 1979-02-01T01:00:00 ... 2020-12-31T...
Data variables:
    elevation       (time, feature_id) float32 ...
    order           (time, feature_id) float64 ...
    qBtmVertRunoff  (time, feature_id) float64 ...
    qBucket         (time, feature_id) float64 ...
    qSfcLatRunoff   (time, feature_id) float64 ...
    q_lateral       (time, feature_id) float64 ...
    streamflow      (time, feature_id) float64 ...
    velocity        (time, feature_id) float64 ...
Attributes: (12/19)
    Conventions:                CF-1.6
    TITLE:                      OUTPUT FROM WRF-Hydro v5.2.0-beta2
    _NCProperties:              version=2,netcdf=4.7.4,hdf5=1.10.7,
    cdm_datatype:               Station
    code_version:               v5.2.0-beta2
    dev:                        dev_ prefix indicates development/internal me...
    ...                         ...
    model_output_type:          channel_rt
    model_output_valid_time:    1979-02-01_01:00:00
    model_total_valid_times:    1416
    proj4:                      +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
    station_dimension:          feature_id
    stream_order_output:        1

Plotting code below courtesy of Rich Signell (USGS)

In [9]:
ds1 = ds.sel(time='2017-08-27 18:00:00', method='nearest')
In [10]:
var = 'streamflow'
df = ds1[var].to_pandas().to_frame()
In [11]:
date_title = pd.to_datetime(ds1.time.values).strftime('%Y-%m-%d %H:%M:%S')
date_title = f'{var}: {date_title}'
In [12]:
df = df.assign(latitude=ds1.latitude)
df = df.assign(longitude=ds1.longitude)
df.rename(columns={0: var}, inplace=True)
In [13]:
df
Out[13]:
streamflow latitude longitude
feature_id
1.010000e+02 0.38 31.086876 -94.640541
1.790000e+02 NaN 46.022163 -67.986412
1.810000e+02 NaN 46.016491 -67.998726
1.830000e+02 NaN 46.020847 -67.998833
1.850000e+02 NaN 46.019711 -67.998619
... ... ... ...
1.180002e+09 NaN 32.158100 -115.953217
1.180002e+09 NaN 32.162037 -115.935883
1.180002e+09 NaN 32.159569 -115.981506
1.180002e+09 NaN 32.155148 -115.958038
1.180002e+09 NaN 32.158184 -115.937431

2776738 rows × 3 columns

In [14]:
import hvplot.pandas
import geoviews as gv
from holoviews.operation.datashader import rasterize
import cartopy.crs as ccrs
import numpy as np
import pandas as pd
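A minimal sketch of how the snapshot built above could be rendered with these libraries (the hvplot and rasterize options are illustrative, not the original plotting code by Rich Signell):

# Sketch only: geographic points plot of the snapshot, rasterized with
# datashader so the ~2.7 million channel points render quickly.
viz = df.hvplot.points(x='longitude', y='latitude', c=var, geo=True,
                       crs=ccrs.PlateCarree(), cmap='viridis', title=date_title)
rasterize(viz, aggregator='mean')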