Create ReferenceFileSystem JSON file for a collection of NWM NetCDF files on S3
This repository uses fsspec-reference-maker
, which can be installed with
pip install git+https://github.com/intake/fsspec-reference-maker
import os
import fsspec
import ujson # fast json
from fsspec_reference_maker.hdf import SingleHdf5ToZarr
from fsspec_reference_maker.combine import MultiZarrToZarr
import xarray as xr
import pandas as pd
import fsspec_reference_maker
fsspec_reference_maker.__version__
'0.0.1+57.g395385b'
# Anonymous (unauthenticated) S3 filesystem for reading the public NWM bucket.
fs = fsspec.filesystem('s3', anon=True)
NWM is stored in yyyy/yyyymmddhhmm
format, so generate a list of datetimes and use them to generate URLs
import pandas as pd
# Hourly timestamps spanning the full NWM 2.1 retrospective period
# (first file 1979-02-01 01:00, last 2020-12-31 23:00).
dates = pd.date_range(start='1979-02-01 01:00',end='2020-12-31 23:00', freq='1h')
len(dates)
367439
def date2cfile(date, product='CHRTOUT_DOMAIN1',
               bucket='noaa-nwm-retrospective-2-1-pds'):
    """Build the S3 URL of an NWM retrospective output file for a timestamp.

    Parameters
    ----------
    date : datetime-like
        Any object with a ``strftime`` method (``pandas.Timestamp``,
        ``datetime.datetime``); the model valid time.
    product : str, optional
        NWM output file type. Default ``'CHRTOUT_DOMAIN1'`` (channel routing),
        matching the original hard-coded value.
    bucket : str, optional
        S3 bucket holding the archive. Default is the NOAA NWM 2.1
        retrospective bucket, matching the original hard-coded value.

    Returns
    -------
    str
        Full ``s3://`` URL. Files are keyed as
        ``model_output/yyyy/yyyymmddhh00.<product>.comp``.
    """
    yyyymmddhh = date.strftime('%Y%m%d%H')
    yyyy = date.strftime('%Y')
    # Minute field is always "00" -- the archive stores hourly output only.
    return f's3://{bucket}/model_output/{yyyy}/{yyyymmddhh}00.{product}.comp'
We need to include the "s3://" prefix to the list of files so that fsspec will recognize that these JSON files are on S3:
# One S3 URL per hourly timestep (~367k files).
urls = [date2cfile(date) for date in dates]
# Open kwargs for fsspec: read-only, anonymous access.
# NOTE(review): 'first' block cache is presumably chosen because HDF5
# metadata is concentrated near the start of each file -- confirm.
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')
# Sanity-check the first and last generated URLs.
print(urls[0])
print(urls[-1])
s3://noaa-nwm-retrospective-2-1-pds/model_output/1979/197902010100.CHRTOUT_DOMAIN1.comp s3://noaa-nwm-retrospective-2-1-pds/model_output/2020/202012312300.CHRTOUT_DOMAIN1.comp
This was done using a special function on the JupyterHub system I'm using that propagates certain environment variables and AWS credentials. The rest of this notebook should work no matter how you spin up Dask workers/clusters. This cluster contains 30 workers
# -- Dask cluster setup (specific to this JupyterHub deployment) --
import os
import sys
# Make the hub's shared helper library (ebdpy) importable.
sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
import ebdpy as ebd
# Set AWS credentials from the named profile; called again below with an
# explicit region/endpoint.
# NOTE(review): the first call looks redundant -- confirm before removing.
ebd.set_credentials(profile='esip-qhub')
profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
# Fixed-size (non-adaptive) cluster of 30 workers; environment variables and
# AWS credentials are propagated to the workers.
worker_max = 30
client,cluster = ebd.start_dask_cluster(profile=profile,worker_max=worker_max,
region=region, use_existing_cluster=True,
adaptive_scaling=False, wait_for_cluster=False,
environment='pangeo', worker_profile='Pangeo Worker',
propagate_env=True)
/home/conda/store/578732b5a39dadc3cadc71a29a33e58ded03e8a9d5c888edac6151d80eda2868-pangeo/lib/python3.8/site-packages/dask_gateway/client.py:21: FutureWarning: format_bytes is deprecated and will be removed in a future release. Please use dask.utils.format_bytes instead. from distributed.utils import LoopRunner, format_bytes
No Cluster running. Starting new cluster. Setting Fixed Scaling workers=30 Reconnect client to clear cache client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub): https://jupyter.qhub.esipfed.org/gateway/clusters/dev.e8fb0e1c2c8e4cad8e1966375c516f6a/status Propagating environment variables to workers
Since the environment on this JupyterHub does not include the latest version of fsspec-reference-maker
, this code propagates a pip install
to each of the workers
import subprocess
import logging
from distributed import WorkerPlugin
class PipPlugin(WorkerPlugin):
    """
    Install packages on a worker as it starts up.

    Parameters
    ----------
    packages : List[str]
        A list of packages to install with pip on startup.
    """

    def __init__(self, packages):
        self.packages = packages

    def setup(self, worker):
        """Run ``pip install --upgrade`` for the configured packages.

        Called by distributed once per worker when the plugin is registered.
        """
        logger = logging.getLogger("distributed.worker")
        # Use the interpreter running this worker, not whatever 'python'
        # resolves to on PATH, so pip installs into the right environment.
        returncode = subprocess.call(
            [sys.executable, '-m', 'pip', 'install', '--upgrade'] + self.packages
        )
        # The original ignored pip's exit status and unconditionally logged
        # success, silently leaving workers without the package on failure.
        if returncode == 0:
            logger.info("Installed %s", self.packages)
        else:
            # Don't raise: a failed install should not kill the worker, but it
            # must be visible in the worker log.
            logger.error("pip install of %s failed with exit code %s",
                         self.packages, returncode)
# Install the latest fsspec-reference-maker on every current and future worker.
plugin = PipPlugin(['git+https://github.com/intake/fsspec-reference-maker'])
# Trailing semicolon suppresses the notebook's display of the return value.
client.register_worker_plugin(plugin);
This function loops over a list of data file URLs, creates a reference (in dict
form), and appends those references to single_references
. Then, single_references
(a list of reference dicts) is passed to MultiZarrToZarr
to combine into a single reference, which is returned.
def gen_reference(urls):
    """Create one combined reference dict for a batch of NWM NetCDF files.

    Each URL is scanned with ``SingleHdf5ToZarr`` to produce a per-file
    reference dict; the batch is then merged along the ``time`` dimension
    with ``MultiZarrToZarr``.

    Parameters
    ----------
    urls : list of str
        ``s3://`` URLs of the NetCDF files in this batch.

    Returns
    -------
    list
        A one-element list holding the combined reference dict (list form so
        ``dask.bag.map_partitions`` can concatenate partition results).
    """

    def _promote_latlon(ds):
        # latitude/longitude arrive as data variables; mark them as coords.
        return ds.set_coords(['latitude', 'longitude'])

    per_file_refs = []
    for url in urls:
        with fs.open(url, **so) as remote_file:
            ref = SingleHdf5ToZarr(remote_file, url, inline_threshold=300).translate()
            per_file_refs.append(ref)

    combined = MultiZarrToZarr(
        per_file_refs,
        remote_protocol="s3",
        remote_options={'anon': True},
        xarray_open_kwargs={
            "decode_cf": False,
            "mask_and_scale": False,
            "drop_variables": ["crs", "reference_time"],
        },
        xarray_concat_args={'dim': 'time'},
        preprocess=_promote_latlon,
    )
    return [combined.translate(template_count=None)]
While it's certainly possible to pass all 367,000 URLs to the function above, it would be very slow and probably run into memory issues. Instead, we can use dask.bag
to parallelize reference generation. Below, I take the list of URLs, use dask.bag
to split it into 400 partitions, and run the gen_reference()
function on each partition. What is returned is a list of 400 combined references, which can then be further combined with MultiZarrToZarr
into a single file.
Note: This will take a long time, depending on the number of workers used. With 30 workers on a dedicated cluster, this took 2h 20min to complete
import dask.bag as db
# Split the ~367k URLs into 400 partitions; each partition is reduced to a
# single combined reference dict by gen_reference, run in parallel on workers.
b = db.from_sequence(urls, npartitions=400).map_partitions(gen_reference)
%%time
out = b.compute(retries=10)
CPU times: user 15.2 s, sys: 1.11 s, total: 16.3 s Wall time: 2h 20min 29s
len(out)
400
def preprocess(ds):
    """Promote latitude/longitude from data variables to coordinates."""
    coord_names = ['latitude', 'longitude']
    return ds.set_coords(coord_names)
# Final combine: merge the 400 partition-level references into one dataset
# reference, concatenated along 'time' (same settings as in gen_reference).
mzz = MultiZarrToZarr(
out,
remote_protocol="s3",
remote_options={'anon':True},
xarray_open_kwargs={
"decode_cf" : False,
"mask_and_scale" : False,
"drop_variables": ["crs", "reference_time"]
},
xarray_concat_args={
'dim' : 'time'
},
preprocess=preprocess
)
%time mzz.translate('./full-workflow-combined.json', template_count=None)
CPU times: user 46.7 s, sys: 2.18 s, total: 48.9 s Wall time: 56.1 s
fsspec
and xarray
¶%%time
# Open the combined reference JSON as a read-only filesystem; chunk byte
# ranges are fetched anonymously from the original files on S3.
fs = fsspec.filesystem('reference', fo='./full-workflow-combined.json', remote_protocol='s3', remote_options=dict(anon=True))
# The reference filesystem presents a Zarr store that xarray can open lazily.
ds = xr.open_dataset(fs.get_mapper(""), engine='zarr')
<timed exec>:2: RuntimeWarning: Failed to open Zarr store with consolidated metadata, falling back to try reading non-consolidated metadata. This is typically much slower for opening a dataset. To silence this warning, consider: 1. Consolidating metadata in this existing store with zarr.consolidate_metadata(). 2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or 3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata. /home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'qBtmVertRunoff' has multiple fill values {-9999000, 0}, decoding all values to NaN. new_vars[k] = decode_cf_variable( /home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'qBucket' has multiple fill values {-999900000, 0}, decoding all values to NaN. new_vars[k] = decode_cf_variable( /home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'qSfcLatRunoff' has multiple fill values {-999900000, 0}, decoding all values to NaN. new_vars[k] = decode_cf_variable( /home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'q_lateral' has multiple fill values {0, -99990}, decoding all values to NaN. new_vars[k] = decode_cf_variable( /home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'streamflow' has multiple fill values {0, -999900}, decoding all values to NaN. 
new_vars[k] = decode_cf_variable( /home/conda/store/896e738a7fff13f931bce6a4a04b3575ecd1f4cbd0e7da9d83afcc7273e57b60-pangeo/lib/python3.8/site-packages/xarray/conventions.py:512: SerializationWarning: variable 'velocity' has multiple fill values {0, -999900}, decoding all values to NaN. new_vars[k] = decode_cf_variable(
CPU times: user 29 s, sys: 2.09 s, total: 31 s Wall time: 35.2 s
ds
<xarray.Dataset> Dimensions: (time: 367439, feature_id: 2776738) Coordinates: * feature_id (feature_id) float64 101.0 179.0 181.0 ... 1.18e+09 1.18e+09 latitude (feature_id) float32 ... longitude (feature_id) float32 ... * time (time) datetime64[ns] 1979-02-01T01:00:00 ... 2020-12-31T... Data variables: elevation (time, feature_id) float32 ... order (time, feature_id) float64 ... qBtmVertRunoff (time, feature_id) float64 ... qBucket (time, feature_id) float64 ... qSfcLatRunoff (time, feature_id) float64 ... q_lateral (time, feature_id) float64 ... streamflow (time, feature_id) float64 ... velocity (time, feature_id) float64 ... Attributes: (12/19) Conventions: CF-1.6 TITLE: OUTPUT FROM WRF-Hydro v5.2.0-beta2 _NCProperties: version=2,netcdf=4.7.4,hdf5=1.10.7, cdm_datatype: Station code_version: v5.2.0-beta2 dev: dev_ prefix indicates development/internal me... ... ... model_output_type: channel_rt model_output_valid_time: 1979-02-01_01:00:00 model_total_valid_times: 1416 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ... station_dimension: feature_id stream_order_output: 1
array([1.010000e+02, 1.790000e+02, 1.810000e+02, ..., 1.180002e+09, 1.180002e+09, 1.180002e+09])
[2776738 values with dtype=float32]
[2776738 values with dtype=float32]
array(['1979-02-01T01:00:00.000000000', '1979-02-01T02:00:00.000000000', '1979-02-01T03:00:00.000000000', ..., '2020-12-31T21:00:00.000000000', '2020-12-31T22:00:00.000000000', '2020-12-31T23:00:00.000000000'], dtype='datetime64[ns]')
[1020281833982 values with dtype=float32]
[1020281833982 values with dtype=float64]
[1020281833982 values with dtype=float64]
[1020281833982 values with dtype=float64]
[1020281833982 values with dtype=float64]
[1020281833982 values with dtype=float64]
[1020281833982 values with dtype=float64]
[1020281833982 values with dtype=float64]
# Select one hourly snapshot.
# NOTE(review): 2017-08-27 falls during Hurricane Harvey -- presumably the
# reason for this example date; confirm.
ds1 = ds.sel(time='2017-08-27 18:00:00', method='nearest')
var = 'streamflow'
# Series -> single-column DataFrame (column is named 0 until renamed below).
df = ds1[var].to_pandas().to_frame()
date_title = pd.to_datetime(ds1.time.values).strftime('%Y-%m-%d %H:%M:%S')
date_title = f'{var}: {date_title}'
# Attach per-feature coordinates so the points can be placed on a map.
df = df.assign(latitude=ds1.latitude)
df = df.assign(longitude=ds1.longitude)
df.rename(columns={0: var}, inplace=True)
df
streamflow | latitude | longitude | |
---|---|---|---|
feature_id | |||
1.010000e+02 | 0.38 | 31.086876 | -94.640541 |
1.790000e+02 | NaN | 46.022163 | -67.986412 |
1.810000e+02 | NaN | 46.016491 | -67.998726 |
1.830000e+02 | NaN | 46.020847 | -67.998833 |
1.850000e+02 | NaN | 46.019711 | -67.998619 |
... | ... | ... | ... |
1.180002e+09 | NaN | 32.158100 | -115.953217 |
1.180002e+09 | NaN | 32.162037 | -115.935883 |
1.180002e+09 | NaN | 32.159569 | -115.981506 |
1.180002e+09 | NaN | 32.155148 | -115.958038 |
1.180002e+09 | NaN | 32.158184 | -115.937431 |
2776738 rows × 3 columns
import hvplot.pandas
import geoviews as gv
from holoviews.operation.datashader import rasterize
import cartopy.crs as ccrs
import numpy as np
import pandas as pd
# Point plot of all ~2.8M stream features, colored by streamflow.
p = df.hvplot.points(x='longitude', y='latitude', geo=True,
c=var, colorbar=True, size=14, label=date_title)
# Rasterize with datashader (mean aggregation on a ~0.02-degree grid) so the
# plot stays responsive; log color scale with lower clim bound 1e-2.
g = rasterize(p, aggregator='mean', x_sampling=0.02,
y_sampling=0.02, width=500).opts(tools=['hover'],
aspect='equal', logz=True, cmap='viridis', clim=(1e-2, np.nan))
# Overlay on OpenStreetMap tiles.
g * gv.tile_sources.OSM