#!/usr/bin/env python
# coding: utf-8

# # NWM ReferenceFileSystem JSON
# Create ReferenceFileSystem JSON file for a collection of NWM NetCDF files on S3

# In[1]:


import os
import fsspec
import ujson   # fast json
from fsspec_reference_maker.hdf import SingleHdf5ToZarr
from fsspec_reference_maker.combine import MultiZarrToZarr
import xarray as xr
import dask
import hvplot.xarray


# In[2]:


fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)


# In[3]:


best_hour = 'f001'
var = 'channel_rt'


# #### Cheat on file list
# Globbing all the files takes a long time (> 5 minutes), so instead just read the dates and generate 24 files for each date, which of course assumes no missing files.

# In[4]:


#%%time
#flist = fs.glob(f'noaa-nwm-pds/nwm.*/short_range/nwm.*.short_range.{var}.{best_hour}.conus.nc')


# In[5]:


days = fs.glob(f'noaa-nwm-pds/nwm.*')


# In[6]:


print(days[0])
print(days[-1])


# In[7]:


flist = []
for day in days[2:-2]:
    for i in range(24):
        flist.append(f'{day}/short_range/nwm.t{i:02d}z.short_range.{var}.{best_hour}.conus.nc')


# In[8]:


flist.extend(fs.glob(f'{days[-1]}/short_range/nwm.*.short_range.{var}.{best_hour}.conus.nc'))


# In[9]:


fs.size(flist[0])/1e6


# In[10]:


ds = xr.open_dataset(fs.open(flist[0]))


# In[11]:


ds.streamflow.encoding


# In[12]:


ds.nbytes/1e6


# In[13]:


print(flist[0])
print(flist[-1])


# #### Join the "best time series" from past forecasts with the latest forecast
# * Remove the first day of data since this is a rolling collection and we don't want to be trying to access files that will soon be removed
# * Use all the files from the last forecast cycle

# In[14]:


last_dir = f'{os.path.dirname(flist[-1])}'
last_dir


# In[15]:


last_file = os.path.basename(flist[-1]).split('.')
last_file


# In[16]:


last_files = fs.glob(f'{last_dir}/{last_file[0]}.{last_file[1]}.{last_file[2]}.{var}.*.conus.nc')
last_files


# Skip the first of the last_files since it's a duplicate:

# In[17]:


flist.extend(last_files[1:])


# In[18]:


print(flist[0])
print(flist[-1])
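# Since the list above is mostly generated rather than globbed, it can contain paths that
# don't actually exist if a forecast file is missing. A quick optional spot check (a sketch
# added here, not part of the original run; the sample size of roughly ten is arbitrary):

# In[ ]:


# check roughly ten evenly spaced paths from the generated list with fs.exists()
sample = flist[::max(1, len(flist) // 10)]
missing = [f for f in sample if not fs.exists(f)]
print(f'{len(missing)} of {len(sample)} sampled paths are missing')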
# We need to prepend the "s3://" prefix to the file paths so that fsspec will recognize that these NetCDF files are on S3. There is no "storage_options" argument at this step, so the S3 read parameters are passed via the `so` dict when each file is opened.

# In[19]:


urls = ["s3://" + f for f in flist]
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')


# In[20]:


print(urls[0])
print(urls[-1])


# In[21]:


fs.size(urls[10])


# In[22]:


import os
import sys
sys.path.append(os.path.join(os.environ['HOME'], 'shared', 'users', 'lib'))
import ebdpy as ebd

ebd.set_credentials(profile='esip-qhub')

profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 30
client, cluster = ebd.start_dask_cluster(profile=profile, worker_max=worker_max,
                                         region=region, use_existing_cluster=True,
                                         adaptive_scaling=False, wait_for_cluster=False,
                                         environment='pangeo', worker_profile='Pangeo Worker',
                                         propagate_env=True)


# #### Create the individual JSON files directly on S3
# We passed AWS credentials to the Dask workers via environment variables above. The workers don't have the AWS credentials file with profiles defined, so we don't specify a profile here; we just set `anon=False` and let the workers find the credentials via the environment variables:

# In[23]:


fs2 = fsspec.filesystem('s3', anon=False)


# If the directory exists, remove it (and all the files)

# In[24]:


json_dir = 's3://esip-qhub/usgs/nwm_forecast/jsons/'


# In[25]:


try:
    fs2.rm(json_dir, recursive=True)
except FileNotFoundError:
    pass


# In[26]:


def gen_json(u):
    with fs.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        p = u.split('/')
        date = p[3]
        fname = p[5]
        outf = f'{json_dir}{date}.{fname}.json'
        print(outf)
        with fs2.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())


# #### Send the list of delayed tasks to the Dask workers to compute

# In[27]:


get_ipython().run_cell_magic('time', '', 'dask.compute(*[dask.delayed(gen_json)(u) for u in urls], retries=10);\n')


# In[28]:


flist2 = fs2.ls(json_dir)
furls = sorted(['s3://' + f for f in flist2])
furls[0]


# In[29]:


len(furls)


# In[30]:


fs.size(flist[0])


# In[31]:


mzz = MultiZarrToZarr(furls[-240:],
    storage_options={'anon': False},   # JSON files (require credentials)
    remote_protocol='s3',
    remote_options={'anon': True},     # referenced NetCDF files are public
    xarray_open_kwargs={
        'decode_cf': False,
        'mask_and_scale': False,
        'decode_times': False,
        'use_cftime': False,
        'drop_variables': ['reference_time', 'crs'],
        'decode_coords': False
    },
    xarray_concat_args={
#        "data_vars": "minimal",
#        "coords": "minimal",
#        "compat": "override",
        "join": "override",
        "combine_attrs": "override",
        "dim": "time"
    }
)


# In[32]:


get_ipython().run_cell_magic('time', '', "#%%prun -D multizarr_profile \nmzz.translate('nwm.json')\n")


# #### Copy the local consolidated JSON file to S3

# In[33]:


rpath = 's3://esip-qhub/usgs/forecast/nwm.json'
fs2.put_file(lpath='nwm.json', rpath=rpath)


# #### Try opening the consolidated JSON file from S3

# In[34]:


s_opts = {'requester_pays': True, 'skip_instance_cache': True}
r_opts = {'anon': True}
fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine="zarr")


# In[35]:


ds


# In[36]:


get_ipython().run_cell_magic('time', '', "ds.streamflow[:,1000].hvplot(x='time', grid=True)\n")


# In[37]:


cluster.shutdown()
client.close()


# In[ ]:
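# The consolidated reference dataset can also be opened lazily with Dask chunks, so only
# the selected reaches/times are fetched from S3 (a sketch, not part of the run above;
# the chunk size along `time` is an arbitrary illustration, not a tuned value):

# In[ ]:


# reuse the mapper `m` created above; chunks= makes the read lazy
ds_lazy = xr.open_dataset(m, engine="zarr", chunks={"time": 24})
print(ds_lazy.streamflow)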