#!/usr/bin/env python
# coding: utf-8

# In[1]:


import os
import fsspec
from datetime import datetime, timedelta
import ujson
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
import xarray as xr
import dask
import numpy as np
from pathlib import Path


# ### Model run

# In[2]:


hourlyrun = '00'


# In[3]:


# Anonymous (public) access to the NOAA NWM bucket; skip_instance_cache
# ensures we see newly arrived files rather than a cached listing.
fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)


# ### Latest data available

# In[4]:


latest = sorted(fs.glob('s3://noaa-nwm-pds/nwm*'))[-1]


# ### Build a dictionary listing the first 9 forecast hours (f001-f009) for each of the 7 ensemble members

# In[5]:


ens_files = {}
for i in range(1, 8):
    ens_files[i] = sorted(fs.glob(f's3://{latest}/medium_range_mem{i}/nwm.t{hourlyrun}z.medium_range.channel_rt_{i}.f00*.nc'))


# In[6]:


json_dir = '/home/peterm790/shared/users/petermarsh/ensemble_NWM/jsons/'


# ### Create JSON reference files from the NetCDF files

# In[7]:


# Local filesystem for writing the reference JSONs.
fs2 = fsspec.filesystem('file')


# In[8]:


# Options for opening the remote NetCDF files: caching only the first block
# is enough, since kerchunk reads just the HDF5 metadata.
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')


# In[9]:


def gen_json(u, json_dir):
    """Scan one remote NetCDF file and write its kerchunk reference JSON."""
    fstem = Path(u).stem
    outf = f'{json_dir}{fstem}.json'
    with fs.open(u, **so) as infile:
        # Chunks smaller than 300 bytes are inlined into the JSON itself.
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        with fs2.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())


# In[10]:


get_ipython().run_cell_magic('time', '', 'for ens in ens_files:\n    for file in ens_files[ens]:\n        gen_json(file, json_dir)\n')


# In[11]:


# Collect the reference JSONs written above, grouped by ensemble member.
ens_jsons = {}
for ens in ens_files:
    ens_jsons[ens] = sorted(fs2.glob(f'{json_dir}nwm.t{hourlyrun}z.medium_range.channel_rt_{ens}*'))


# ### Create a single kerchunk JSON for each ensemble member
# The post-process step appends the member number to each variable name
# (e.g. 'streamflow' becomes 'streamflow_1'), so the members can later be
# merged into a single dataset without name collisions.

# In[12]:


for ens in ens_jsons:
    def post_process(d):
        # Rename every key except the group metadata and the shared
        # coordinates, e.g. 'streamflow/0.0' -> 'streamflow_1/0.0'.
        for key in list(d.keys()):
            parts = key.split('/')
            var = parts[0]
            end = '/'.join(parts[1:])
            if var not in ['.zgroup', '.zattrs', 'time', 'feature_id']:
                d[f'{var}_{ens}/{end}'] = d.pop(key)
        return d

    mzz = MultiZarrToZarr(ens_jsons[ens],
                          remote_protocol='s3',
                          remote_options={'anon': True},
                          concat_dims=['time'],
                          identical_dims=['feature_id', 'reference_time'],
                          postprocess=post_process,
                          )
    d = mzz.translate()
    with fs2.open(f'/home/peterm790/shared/users/petermarsh/ensemble_NWM/{ens}.json', 'wb') as f:
        f.write(ujson.dumps(d).encode())


# ### Create a single virtual dataset containing all 7 ensemble members

# In[13]:


flist = fs2.glob('/home/peterm790/shared/users/petermarsh/ensemble_NWM/*.json')


# In[14]:


# Because each member's variables now carry a unique suffix, this pass
# simply merges the seven reference sets over their common coordinates.
mzz = MultiZarrToZarr(flist,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      concat_dims=['time'],
                      identical_dims=['feature_id', 'reference_time'],
                      )
d = mzz.translate()


# In[15]:


get_ipython().run_cell_magic('time', '', 'fs = fsspec.filesystem("reference", fo=d, ref_storage_args={\'skip_instance_cache\': True},\n                       remote_protocol=\'s3\', remote_options={\'anon\': True})\nm = fs.get_mapper("")\nds_kerchunk = xr.open_dataset(m, engine="zarr", backend_kwargs={\'consolidated\': False}, chunks={})\n')


# In[16]:


ds_kerchunk
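

# ### Quick usage check: ensemble-mean streamflow
# A minimal sketch of how the virtual dataset might be used; it assumes the
# channel_rt files expose a 'streamflow' variable, which the post-processing
# above has renamed to 'streamflow_1' ... 'streamflow_7'. Adjust the variable
# name if your files differ.

# In[17]:


# Stack the per-member variables along a new 'member' dimension and average.
# Everything stays lazy (dask) until values are actually requested.
members = [ds_kerchunk[f'streamflow_{e}'].rename('streamflow') for e in range(1, 8)]
ensemble = xr.concat(members, dim='member')
ensemble_mean = ensemble.mean(dim='member')

# Pull the ensemble-mean hydrograph for a single reach into memory.
print(ensemble_mean.isel(feature_id=0).compute().values)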