#!/usr/bin/env python
# coding: utf-8

# In[91]:

import os
import fsspec
from datetime import datetime, timedelta
import ujson
import sys

# use a local checkout of kerchunk
sys.path.insert(1, '/home/peterm790/shared/users/petermarsh/kerchunk')
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
import kerchunk
import xarray as xr
import dask
import numpy as np
from pathlib import Path


# ### Model run

# In[92]:

hourlyrun = '00'


# In[93]:

fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)


# ### Latest data available

# In[94]:

latest = sorted(fs.glob('s3://noaa-nwm-pds/nwm*'))[-1]


# ### This creates a dictionary containing a list of the first 48 hours of data for each ensemble member

# In[95]:

ens_files = {}
for i in range(1, 8):
    ens_files[i] = sorted(fs.glob(f's3://{latest}/medium_range_mem{i}/nwm.t{hourlyrun}z.medium_range.channel_rt_{i}.f0*.nc'))[:48]


# In[96]:

json_dir = '/home/peterm790/shared/users/petermarsh/ensemble_NWM/jsons/'


# ### Create JSON reference files from these files

# In[97]:

fs2 = fsspec.filesystem("")  # local filesystem


# In[98]:

def gen_json(u, json_dir):
    # build a single-file kerchunk reference and write it to json_dir
    fstem = Path(u).stem
    outf = f'{json_dir}{fstem}.json'
    with fs.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        with fs2.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())


# In[99]:

so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')


# ### Passing this to a cluster can greatly speed this up

# In[11]:

get_ipython().run_cell_magic('time', '', 'for ens in ens_files:\n    for file in ens_files[ens]:\n        gen_json(file, json_dir)\n')
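# A hedged sketch of the cluster version mentioned above: map gen_json over
# all of the files with dask.bag. The Client() call and the USE_CLUSTER flag
# are illustrative additions, not part of the original run; point the client
# at whatever Dask scheduler you actually have.

# In[ ]:

USE_CLUSTER = False  # flip to True if a Dask cluster is available

if USE_CLUSTER:
    import dask.bag as db
    from dask.distributed import Client

    client = Client()  # assumption: a local (or remote) Dask cluster
    all_files = [f for member in ens_files.values() for f in member]
    # each worker calls gen_json(file, json_dir), writing one JSON per file
    db.from_sequence(all_files).map(gen_json, json_dir).compute()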
# ### Create a single kerchunk JSON for each ensemble member

# In[12]:

ens_jsons = {}
for ens in ens_files:
    # glob on the member number (ens), not the leftover loop variable i,
    # and on the selected model run rather than a hard-coded t00z
    flist = fs2.glob(f'{json_dir}nwm.t{hourlyrun}z.medium_range.channel_rt_{ens}*')
    ens_jsons[ens] = sorted(flist)


# In[13]:

for ens in ens_jsons:
    mzz = MultiZarrToZarr(ens_jsons[ens],  # the 48 hourly JSON reference files for this member
                          remote_protocol='s3',
                          remote_options={'anon': True},
                          concat_dims=['time'],
                          identical_dims=['feature_id', 'reference_time'],
                          )
    d = mzz.translate()
    with fs2.open(f'/home/peterm790/shared/users/petermarsh/ensemble_NWM/{ens}.json', 'wb') as f:
        f.write(ujson.dumps(d).encode())


# ### Create a single virtual dataset containing all 7 ensemble members

# In[100]:

flist = fs2.glob('/home/peterm790/shared/users/petermarsh/ensemble_NWM/*.json')


# In[101]:

import re

# pull the ensemble member number out of each file name, e.g. '.../3.json' -> '3'
ex = re.compile(r'.*?(\d+)\.json')


# In[102]:

flist[2]


# In[103]:

ex.match(flist[2]).groups()[0]


# In[205]:

def postprocess(refs):
    # copy the full esri_pe_string CRS attribute from one of the
    # single-member reference files into the combined reference set
    with open(flist[0]) as json_file:
        esri_pe_string = ujson.loads(ujson.load(json_file)['refs']['crs/.zattrs'])['esri_pe_string']
    crs_attrs = ujson.loads(refs['crs/.zattrs'])
    crs_attrs['esri_pe_string'] = esri_pe_string
    refs['crs/.zattrs'] = ujson.dumps(crs_attrs)
    return refs


# In[206]:

mzz = MultiZarrToZarr(flist,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      coo_map={'ensemble': ex, 'crs': ''},
                      concat_dims=['ensemble'],
                      identical_dims=['feature_id', 'reference_time', 'time'],
                      preprocess=kerchunk.combine.drop('crs'),
                      postprocess=postprocess)

out = mzz.translate()


# In[207]:

date = latest.split('.')[-1]


# In[208]:

with fs2.open(f'/home/peterm790/shared/users/petermarsh/ensemble_NWM/complete/{date}.json', 'wb') as f:
    f.write(ujson.dumps(out).encode())


# In[209]:

ds = xr.open_dataset(
    "reference://",
    backend_kwargs={
        "storage_options": {"fo": out, "remote_protocol": "s3", "remote_options": {"anon": True}},
        "consolidated": False,
    },
    engine="zarr",
    chunks={},
)


# In[212]:

ds


# In[211]:

ds.crs.attrs


# ### Byte ranges between variables on different days differ subtly, so automatically updating the sidecar files without accessing the new data is not possible

# In[65]:

with open('/shared/users/petermarsh/ensemble_NWM/complete/20220627.json') as json_file:
    data_old = ujson.load(json_file)


# In[117]:

with open('/shared/users/petermarsh/ensemble_NWM/complete/20220628.json') as json_file:
    data_new = ujson.load(json_file)


# In[38]:

data_new['refs']['qBtmVertRunoff/0.1.0']


# In[39]:

data_old['refs']['qBtmVertRunoff/0.1.0']
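# For illustration, a small check (not in the original notebook) that makes the
# point above concrete: compare the two days' reference sets key by key. Chunk
# keys such as 'qBtmVertRunoff/0.1.0' mostly map to [url, offset, length]
# triples, so counting unequal values shows how widespread the drift is.

# In[ ]:

shared = set(data_old['refs']) & set(data_new['refs'])
changed = [k for k in shared if data_old['refs'][k] != data_new['refs'][k]]
print(f'{len(changed)} of {len(shared)} shared references differ between the two days')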