Create ReferenceFileSystem JSON file for a collection of NWM NetCDF files on S3
This repository uses fsspec-reference-maker, which can be installed with:
pip install git+https://github.com/intake/fsspec-reference-maker
import os
import fsspec
import ujson # fast json
from fsspec_reference_maker.hdf import SingleHdf5ToZarr
from fsspec_reference_maker.combine import MultiZarrToZarr
import xarray as xr
import pandas as pd
import fsspec_reference_maker
fsspec_reference_maker.__version__
fs = fsspec.filesystem('s3', anon=True)
NWM output is stored in yyyy/yyyymmddhhmm format, so generate a list of datetimes and use them to build the URLs.
import pandas as pd
dates = pd.date_range(start='1979-02-01 01:00',end='2020-12-31 23:00', freq='1h')
len(dates)
def date2cfile(date):
    # Create S3 URL from date
    yyyymmddhh = date.strftime('%Y%m%d%H')
    yyyy = date.strftime('%Y')
    cfile = f's3://noaa-nwm-retrospective-2-1-pds/model_output/{yyyy}/{yyyymmddhh}00.CHRTOUT_DOMAIN1.comp'
    return cfile
We need to include the "s3://" prefix in the file URLs so that fsspec will recognize that these NetCDF files are on S3:
urls = [date2cfile(date) for date in dates]
# options passed to fs.open() when reading each NetCDF file
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')
print(urls[0])
print(urls[-1])
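As a quick sanity check (my addition, not part of the original workflow), we can confirm that the first URL actually resolves to an object on S3 before kicking off the full job, using standard fsspec methods:
# sanity check: does the first file exist, and how big is it?
print(fs.exists(urls[0]), fs.size(urls[0]))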
The Dask cluster below is started using a special function on the JupyterHub system I'm using that propagates certain environment variables and AWS credentials. The rest of this notebook should work no matter how you spin up Dask workers/clusters. This cluster contains 30 workers.
import os
import sys
sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
import ebdpy as ebd
ebd.set_credentials(profile='esip-qhub')
profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 30
client, cluster = ebd.start_dask_cluster(profile=profile, worker_max=worker_max,
                                         region=region, use_existing_cluster=True,
                                         adaptive_scaling=False, wait_for_cluster=False,
                                         environment='pangeo', worker_profile='Pangeo Worker',
                                         propagate_env=True)
Since the environment on this JupyterHub does not include the latest version of fsspec-reference-maker, this code propagates a pip install to each of the workers.
import subprocess
import logging
from distributed import WorkerPlugin
class PipPlugin(WorkerPlugin):
    """
    Install packages on a worker as it starts up.

    Parameters
    ----------
    packages : List[str]
        A list of packages to install with pip on startup.
    """
    def __init__(self, packages):
        self.packages = packages

    def setup(self, worker):
        logger = logging.getLogger("distributed.worker")
        subprocess.call(['python', '-m', 'pip', 'install', '--upgrade'] + self.packages)
        logger.info("Installed %s", self.packages)
plugin = PipPlugin(['git+https://github.com/intake/fsspec-reference-maker'])
client.register_worker_plugin(plugin);
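To confirm the plugin actually ran, a quick check (my addition, using the standard distributed client.run call) is to ask each worker which version of fsspec-reference-maker it now reports:
def _worker_version():
    import fsspec_reference_maker
    return fsspec_reference_maker.__version__

client.run(_worker_version)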
This function loops over a list of data file URLs, creates a reference (in dict form) for each, and appends those references to single_references. Then, single_references (a list of reference dicts) is passed to MultiZarrToZarr to combine into a single reference, which is returned.
def gen_reference(urls):

    def preprocess(ds):
        return ds.set_coords(['latitude', 'longitude'])

    single_references = []
    for u in urls:
        with fs.open(u, **so) as infile:
            single_references.append(SingleHdf5ToZarr(infile, u, inline_threshold=300).translate())

    mzz = MultiZarrToZarr(
        single_references,
        remote_protocol="s3",
        remote_options={'anon': True},
        xarray_open_kwargs={
            "decode_cf": False,
            "mask_and_scale": False,
            "drop_variables": ["crs", "reference_time"]
        },
        xarray_concat_args={
            'dim': 'time'
        },
        preprocess=preprocess
    )
    return [mzz.translate(template_count=None)]
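Before launching the full Dask job, it can be worth a quick smoke test on just a couple of files (my addition, not in the original workflow); it runs locally and catches problems cheaply:
# run the reference generator on only the first two URLs as a smoke test
test_refs = gen_reference(urls[:2])
print(len(test_refs), type(test_refs[0]))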
While it's certainly possible to pass all 367,000 URLs to the function above, it would be very slow and probably run into memory issues. Instead, we can use dask.bag to parallelize reference generation. Below, I take the list of URLs, use dask.bag to split it into 400 partitions, and run the gen_reference() function on each partition. What is returned is a list of 400 combined references, which can then be further combined with MultiZarrToZarr into a single file.
Note: This will take a long time, depending on the number of workers used. With 30 workers on a dedicated cluster, this took 2h 20min to complete
import dask.bag as db
b = db.from_sequence(urls, npartitions=400).map_partitions(gen_reference)
%%time
out = b.compute(retries=10)
len(out)
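Since those 400 partial references took over two hours to compute, it is prudent to persist them to disk right away (my addition; the filename is arbitrary) so the final combine step can be re-run without repeating the Dask job. This uses the ujson import from the top of the notebook:
# save the intermediate per-partition references in case the final combine needs to be re-run
# 'partial_references.json' is a hypothetical filename, not from the original notebook
with open('partial_references.json', 'w') as f:
    ujson.dump(out, f)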
def preprocess(ds):
    return ds.set_coords(['latitude', 'longitude'])
mzz = MultiZarrToZarr(
    out,
    remote_protocol="s3",
    remote_options={'anon': True},
    xarray_open_kwargs={
        "decode_cf": False,
        "mask_and_scale": False,
        "drop_variables": ["crs", "reference_time"]
    },
    xarray_concat_args={
        'dim': 'time'
    },
    preprocess=preprocess
)
%time mzz.translate('./full-workflow-combined.json', template_count=None)
Open the combined reference file with fsspec and xarray
%%time
fs = fsspec.filesystem('reference', fo='./full-workflow-combined.json', remote_protocol='s3', remote_options=dict(anon=True))
ds = xr.open_dataset(fs.get_mapper(""), engine='zarr')
ds
ds1 = ds.sel(time='2017-08-27 18:00:00', method='nearest')
var = 'streamflow'
df = ds1[var].to_pandas().to_frame()
date_title = pd.to_datetime(ds1.time.values).strftime('%Y-%m-%d %H:%M:%S')
date_title = f'{var}: {date_title}'
df = df.assign(latitude=ds1.latitude)
df = df.assign(longitude=ds1.longitude)
df.rename(columns={0: var}, inplace=True)
df
import hvplot.pandas
import geoviews as gv
from holoviews.operation.datashader import rasterize
import cartopy.crs as ccrs
import numpy as np
import pandas as pd
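As a rough sketch of how the imports above might be used (my addition; the plot options and aggregation choice are assumptions, not taken from the original), the streamflow values can be rasterized with datashader and shown as a map of the river reaches:
# rasterize the millions of reach points so they render quickly in the browser
points = gv.Points(df, kdims=['longitude', 'latitude'], vdims=[var], crs=ccrs.PlateCarree())
rasterize(points, aggregator='mean').opts(
    cmap='viridis', colorbar=True, width=700, height=500,
    title=date_title, tools=['hover'])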