Create ReferenceFileSystem JSON file for a collection of NWM NetCDF files on S3
import os
import fsspec
import ujson # fast json
from fsspec_reference_maker.hdf import SingleHdf5ToZarr
from fsspec_reference_maker.combine import MultiZarrToZarr
import xarray as xr
import dask
import hvplot.xarray
import fsspec_reference_maker
fsspec_reference_maker.__version__
'0.0.1+6.gc3757ec'
fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)
best_hour='f001'
var = 'channel_rt'
Globbing all the files takes a long time (> 5 minutes), so instead we just read the dates and generate 24 file names for each date. This of course assumes no files are missing.
#%%time
#flist = fs.glob(f'noaa-nwm-pds/nwm.*/short_range/nwm.*.short_range.{var}.{best_hour}.conus.nc')
days = fs.glob(f'noaa-nwm-pds/nwm.*')
print(days[0])
print(days[-1])
noaa-nwm-pds/nwm.20210803
noaa-nwm-pds/nwm.20210901
flist = []
for day in days[2:-2]:
    for i in range(24):
        flist.append(f'{day}/short_range/nwm.t{i:02d}z.short_range.{var}.{best_hour}.conus.nc')
flist.extend(fs.glob(f'{days[-1]}/short_range/nwm.*.short_range.{var}.{best_hour}.conus.nc'))
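Since the generated names assume no missing files, a quick spot check of the first day's paths can catch gaps early (a sketch; checking the whole list this way would be slow):
# Sketch: spot-check one day's worth of generated paths against S3
missing = [f for f in flist[:24] if not fs.exists(f)]
print(f'{len(missing)} missing of the first 24 generated paths')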
fs.size(flist[0])/1e6
ds = xr.open_dataset(fs.open(flist[0]))
ds.streamflow.encoding
ds.nbytes/1e6
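The streamflow data is compressed on disk; comparing the in-memory size to the on-disk size gives a rough compression ratio (a sketch using the values already computed above):
# Rough compression ratio: uncompressed bytes vs. compressed file size
print(ds.nbytes / fs.size(flist[0]))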
print(flist[0])
print(flist[-1])
last_dir = os.path.dirname(flist[-1])
last_dir
last_file = os.path.basename(flist[-1]).split('.')
last_file
last_files = fs.glob(f'{last_dir}/{last_file[0]}.{last_file[1]}.{last_file[2]}.{var}.*.conus.nc')
last_files
Skip the first of the last_files since it's a duplicate:
flist.extend(last_files[1:])
print(flist[0])
print(flist[-1])
We need to prepend the "s3://" protocol to the list of files so that fsspec will recognize that these NetCDF files are on S3:
urls = ["s3://" + f for f in flist]
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')  # cache only the first block, where the HDF5 metadata lives
print(urls[0])
print(urls[-1])
fs.size(urls[10])
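The whole approach depends on S3 supporting byte-range reads; as a quick sanity check (a sketch), we can read just the first 8 bytes of one file and look for the HDF5 signature:
# HDF5 files begin with the 8-byte signature \x89HDF\r\n\x1a\n
with fsspec.open(urls[10], **so) as f:
    print(f.read(8))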
import os
import sys
sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
import ebdpy as ebd
ebd.set_credentials(profile='esip-qhub')
profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 30
client, cluster = ebd.start_dask_cluster(profile=profile, worker_max=worker_max,
                                         region=region, use_existing_cluster=True,
                                         adaptive_scaling=False, wait_for_cluster=False,
                                         environment='pangeo', worker_profile='Pangeo Worker',
                                         propagate_env=True)
We passed AWS credentials to the Dask workers via environment variables above. The workers don't have an AWS credentials file with profiles defined, so we don't specify a profile here; we just set anon=False and let the workers find the credentials via the environment variables:
fs2 = fsspec.filesystem('s3', anon=False)
If the directory already exists, remove it (and all the files it contains):
json_dir = 's3://esip-qhub/usgs/nwm_forecast/jsons/'
try:
    fs2.rm(json_dir, recursive=True)
except FileNotFoundError:
    pass  # nothing to remove
def gen_json(u):
    with fsspec.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        # name each output JSON <date>.<original filename>.json
        p = u.split('/')
        date = p[3]
        fname = p[5]
        outf = f'{json_dir}{date}.{fname}.json'
        print(outf)
        with fs2.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
%%time
dask.compute(*[dask.delayed(gen_json)(u) for u in urls], retries=10);
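If a Dask cluster isn't available, the same function can be run serially as a smoke test (a sketch; processing all of the files this way would take much longer):
# Serial fallback (sketch): generate references for just a couple of files
for u in urls[:2]:
    gen_json(u)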
flist2 = fs2.ls(json_dir)
furls = sorted(['s3://'+f for f in flist2])
furls[0]
's3://esip-qhub/usgs/nwm_forecast/jsons/nwm.20210701.nwm.t00z.short_range.channel_rt.f001.conus.nc.json'
len(furls)
679
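Before combining, it's worth opening one of the single-file references to confirm it is valid (a sketch; the fs_single and ds_single names are just for illustration, and the options mirror the ones passed to MultiZarrToZarr below):
# Sketch: validate one single-file reference before combining
fs_single = fsspec.filesystem('reference', fo=furls[0],
                              ref_storage_args={'anon': False},  # JSONs live on esip-qhub
                              remote_protocol='s3',
                              remote_options={'anon': True})     # data lives on noaa-nwm-pds
ds_single = xr.open_dataset(fs_single.get_mapper(''), engine='zarr')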
from dask.distributed import Client
#client.close()
client = Client(n_workers=1)
client
mzz = MultiZarrToZarr(furls,
                      storage_options={'anon': False},  # for the JSON files on esip-qhub
                      remote_protocol='s3',
                      remote_options={'anon': True},    # for the NetCDF files on noaa-nwm-pds
                      xarray_open_kwargs={
                          'decode_cf': False,
                          'mask_and_scale': False,
                          'decode_times': False,
                          'use_cftime': False,
                          'drop_variables': ['reference_time', 'crs'],
                          'decode_coords': False
                      },
                      xarray_concat_args={
                          # "data_vars": "minimal",
                          # "coords": "minimal",
                          # "compat": "override",
                          "join": "override",
                          "combine_attrs": "override",
                          "dim": "time"
                      })
%%time
#%%prun -D multizarr_profile
mzz.translate('nwm.json')
CPU times: user 3.5 s, sys: 233 ms, total: 3.73 s
Wall time: 23.4 s
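The output is plain JSON mapping Zarr keys either to inlined data or to (url, offset, length) triples; here is a quick peek (a sketch; depending on the spec version the mapping is flat or nested under a 'refs' key):
# Sketch: inspect a few entries of the combined reference file
with open('nwm.json') as f:
    refs = ujson.load(f)
refs = refs.get('refs', refs)  # handle both flat and versioned layouts
print(list(refs)[:5])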
rpath = 's3://esip-qhub-public/noaa/nwm/nwm_forecast.json'
fs2.put_file(lpath='nwm.json', rpath=rpath)
s_opts = {'requester_pays': True, 'skip_instance_cache': True}
r_opts = {'anon': True}
fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine="zarr")
ds
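A quick check that the concatenation produced the expected time axis (a sketch):
# Sketch: confirm the stitched time coordinate spans the full period
print(ds.time[0].values, ds.time[-1].values, len(ds.time))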
%%time
ds.streamflow[:,1000].hvplot(x='time', grid=True)
cluster.shutdown(); client.close()