Create ReferenceFileSystem JSON file for a collection of NWM NetCDF files on S3
import os
import fsspec
import ujson # fast json
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
import xarray as xr
import dask
import hvplot.xarray
fs = fsspec.filesystem('s3', anon=True)
fs.ls('noaa-nwm-retrospective-2-1-pds/model_output/1979/')[:5]
['noaa-nwm-retrospective-2-1-pds/model_output/1979/', 'noaa-nwm-retrospective-2-1-pds/model_output/1979/197902010100.CHRTOUT_DOMAIN1.comp', 'noaa-nwm-retrospective-2-1-pds/model_output/1979/197902010100.GWOUT_DOMAIN1.comp', 'noaa-nwm-retrospective-2-1-pds/model_output/1979/197902010100.LAKEOUT_DOMAIN1.comp', 'noaa-nwm-retrospective-2-1-pds/model_output/1979/197902010200.CHRTOUT_DOMAIN1.comp']
fs.ls('noaa-nwm-retrospective-2-1-pds/model_output/2020/')[-5:]
['noaa-nwm-retrospective-2-1-pds/model_output/2020/202012312200.GWOUT_DOMAIN1.comp', 'noaa-nwm-retrospective-2-1-pds/model_output/2020/202012312200.LAKEOUT_DOMAIN1.comp', 'noaa-nwm-retrospective-2-1-pds/model_output/2020/202012312300.CHRTOUT_DOMAIN1.comp', 'noaa-nwm-retrospective-2-1-pds/model_output/2020/202012312300.GWOUT_DOMAIN1.comp', 'noaa-nwm-retrospective-2-1-pds/model_output/2020/202012312300.LAKEOUT_DOMAIN1.comp']
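To count how many CHRTOUT files a given year holds, fs.glob works directly on the bucket (a minimal sketch; the pattern is inferred from the listings above):
# Count the CHRTOUT files for 1979 without listing everything by hand
len(fs.glob('noaa-nwm-retrospective-2-1-pds/model_output/1979/*.CHRTOUT_DOMAIN1.comp'))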
import pandas as pd
dates = pd.date_range(start='1979-02-01 01:00',end='2020-12-31 23:00', freq='1h')
len(dates)
367439
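A quick arithmetic check that the hourly range is what we expect (inclusive of both endpoints):
# Hours between the endpoints, plus one for the inclusive start
assert len(dates) == (dates[-1] - dates[0]) / pd.Timedelta('1h') + 1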
def date2cfile(date):
    # Create S3 URL from date
    yyyymmddhh = date.strftime('%Y%m%d%H')
    yyyy = date.strftime('%Y')
    cfile = f's3://noaa-nwm-retrospective-2-1-pds/model_output/{yyyy}/{yyyymmddhh}00.CHRTOUT_DOMAIN1.comp'
    return cfile
We need to include the "s3://" prefix in the URLs so that fsspec will recognize that these NetCDF files are on S3:
urls = [date2cfile(date) for date in dates]
# storage options passed to fs.open when reading the NetCDF files
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')
print(urls[0])
print(urls[-1])
s3://noaa-nwm-retrospective-2-1-pds/model_output/1979/197902010100.CHRTOUT_DOMAIN1.comp
s3://noaa-nwm-retrospective-2-1-pds/model_output/2020/202012312300.CHRTOUT_DOMAIN1.comp
import os
import sys
sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
import ebdpy as ebd
profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 30
client, cluster = ebd.start_dask_cluster(profile=profile, worker_max=worker_max,
                                         region=region, use_existing_cluster=True,
                                         adaptive_scaling=False, wait_for_cluster=False,
                                         worker_profile='Medium Worker',
                                         propagate_env=True)
Region: us-west-2
No Cluster running. Starting new cluster.
Setting Cluster Environment Variable AWS_DEFAULT_REGION us-west-2
Setting Fixed Scaling workers=30
Reconnect client to clear cache
client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub):
https://jupyter.qhub.esipfed.org/gateway/clusters/dev.177bac8ad0a144679b62be366c04ad2c/status
Propagating environment variables to workers
Using environment: users/pangeo
We passed AWS credentials to the Dask workers via environment variables above. The workers don't have an AWS credentials file with profiles defined, so we don't specify a profile here; we just set anon=False and let the workers find the credentials via the environment variables:
fs2 = fsspec.filesystem('s3', anon=False)
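As a quick sanity check, we can confirm the workers actually received the credentials (a sketch; it assumes the standard AWS_ACCESS_KEY_ID variable is what got propagated):
# Each worker should report True if the AWS credentials reached its environment
client.run(lambda: 'AWS_ACCESS_KEY_ID' in os.environ)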
If the directory exists, remove it (and all the files)
json_dir = 's3://esip-qhub/testing/nwm_reanalysis_v21/jsons/'
try:
    fs2.rm(json_dir, recursive=True)
except:
    pass
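An equivalent form that avoids the bare except, if you prefer an explicit existence check (a minimal sketch using fsspec's exists):
# Only attempt removal when the prefix actually exists
if fs2.exists(json_dir):
    fs2.rm(json_dir, recursive=True)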
urls[0].split('/')[-1]
'197902010100.CHRTOUT_DOMAIN1.comp'
def gen_json(u):
    # Scan one remote NetCDF/HDF5 file and write its Zarr reference set as JSON on S3
    with fs.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        # Name the output JSON after the input file
        p = u.split('/')
        fname = p[-1]
        outf = f'{json_dir}{fname}.json'
        print(outf)
        with fs2.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
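Before mapping gen_json over many files with Dask, a single-file smoke test is cheap insurance (it writes one JSON into json_dir and prints its path):
# Smoke test: process just the first file serially
gen_json(urls[0])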
import dask.bag as db
len(urls)
367439
# small test batch: just the first 90 files
b = db.from_sequence(urls[:90], npartitions=3)
# whole enchilada: all 367,439 files
# b = db.from_sequence(urls, npartitions=900)
b1 = b.map(gen_json)
%%time
from dask.distributed import performance_report
with performance_report(filename="dask-report-whole.html"):
    b1.compute(retries=10)
CPU times: user 228 ms, sys: 151 ms, total: 379 ms Wall time: 48.1 s
%%time
json_list = fs2.ls(json_dir)
json_list = sorted(['s3://'+f for f in json_list])
json_list[0]
CPU times: user 16.2 ms, sys: 0 ns, total: 16.2 ms Wall time: 74.1 ms
's3://esip-qhub/testing/nwm_reanalysis_v21/jsons/197902010100.CHRTOUT_DOMAIN1.comp.json'
len(json_list)
90
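As a quick consistency check, we can confirm that every input file produced a reference JSON, using the naming convention from gen_json above (a minimal sketch):
# Compare expected output names (derived from the input URLs) against what landed on S3
expected = {u.split('/')[-1] + '.json' for u in urls[:90]}
actual = {j.split('/')[-1] for j in json_list}
assert expected == actual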
Since MultiZarrToZarr only writes local files, we close the remote client and create a local cluster instead:
client.close()
from dask.distributed import Client
client = Client(n_workers=1)
client
Client: Client-da346f8a-e273-11ec-8395-ea80a2936d8d
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
Workers: 1 | Total threads: 3 | Total memory: 16.00 GiB
import kerchunk
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options=dict(anon=True),
                      concat_dims=['time'],
                      identical_dims=['crs', 'latitude', 'longitude'],
                      preprocess=kerchunk.combine.drop('reference_time'))
%%time
# combine the per-file references held by mzz into a single in-memory reference dict
d = mzz.translate()
CPU times: user 591 ms, sys: 44.8 ms, total: 636 ms Wall time: 3.06 s
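translate() returns an in-memory dict rather than writing a file; assuming the standard version-1 kerchunk reference layout, we can peek at it:
# The version-1 spec keeps the chunk references under the 'refs' key
print(d['version'], len(d['refs']))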
fs = fsspec.filesystem('file')   # note: rebinds fs to the local filesystem
combined_json = 'nwm_combined.json'
with fs.open(combined_json, 'wb') as f:
    # serialize the combined reference dict to JSON and write it locally
    f.write(ujson.dumps(d).encode())
rpath = 's3://esip-qhub-public/testing/nwm/nwm_reanalysis_v21.json'
fs2.put_file(lpath=combined_json, rpath=rpath)
s_opts = {'requester_pays':True, 'skip_instance_cache':True}
r_opts = {'anon':True}
fs = fsspec.filesystem('reference', fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", chunks={}, backend_kwargs=dict(consolidated=False))
SerializationWarning: variable 'qBtmVertRunoff' has multiple fill values {-9999000, 0}, decoding all values to NaN.
SerializationWarning: variable 'qBucket' has multiple fill values {-999900000, 0}, decoding all values to NaN.
SerializationWarning: variable 'qSfcLatRunoff' has multiple fill values {-999900000, 0}, decoding all values to NaN.
SerializationWarning: variable 'q_lateral' has multiple fill values {0, -99990}, decoding all values to NaN.
SerializationWarning: variable 'streamflow' has multiple fill values {0, -999900}, decoding all values to NaN.
SerializationWarning: variable 'velocity' has multiple fill values {0, -999900}, decoding all values to NaN.
print(ds)
<xarray.Dataset>
Dimensions:         (time: 90, feature_id: 2776738)
Coordinates:
  * feature_id      (feature_id) float64 101.0 179.0 181.0 ... 1.18e+09 1.18e+09
    latitude        (feature_id) float32 dask.array<chunksize=(2776738,), meta=np.ndarray>
    longitude       (feature_id) float32 dask.array<chunksize=(2776738,), meta=np.ndarray>
  * time            (time) datetime64[ns] 1979-02-01T01:00:00 ... 1979-02-04T...
Data variables:
    crs             object ...
    elevation       (time, feature_id) float32 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    order           (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    qBtmVertRunoff  (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    qBucket         (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    qSfcLatRunoff   (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    q_lateral       (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    streamflow      (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
    velocity        (time, feature_id) float64 dask.array<chunksize=(1, 2776738), meta=np.ndarray>
Attributes: (12/18)
    Conventions:               CF-1.6
    TITLE:                     OUTPUT FROM WRF-Hydro v5.2.0-beta2
    cdm_datatype:              Station
    code_version:              v5.2.0-beta2
    dev:                       dev_ prefix indicates development/internal me...
    dev_NOAH_TIMESTEP:         3600
    ...                        ...
    model_output_type:         channel_rt
    model_output_valid_time:   1979-02-01_01:00:00
    model_total_valid_times:   1416
    proj4:                     +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
    station_dimension:         feature_id
    stream_order_output:       1
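The same dataset can also be opened in one step with fsspec's reference:// protocol, which builds the ReferenceFileSystem and mapper internally (a sketch; the requester-pays options from s_opts may need to be passed along as target_options for reading the JSON itself):
# One-step open: fsspec constructs the reference filesystem from storage_options
ds2 = xr.open_dataset(
    'reference://', engine='zarr', chunks={},
    backend_kwargs=dict(
        consolidated=False,
        storage_options=dict(fo=rpath, remote_protocol='s3', remote_options=r_opts),
    ),
)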
Load all the time steps at a specific streamgage (a specific feature_id):
%%time
ds.streamflow[:,1000].hvplot(x='time', grid=True)
CPU times: user 693 ms, sys: 62.6 ms, total: 756 ms Wall time: 21 s
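Positional indexing ([:, 1000]) grabs an arbitrary gauge; to pick a gauge by its feature_id value instead, label-based selection works the same way (a sketch using 101.0, the first feature_id shown in the repr above):
# Label-based selection of a single gauge's full time series
ds.streamflow.sel(feature_id=101.0).hvplot(x='time', grid=True)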