import os
import fsspec
from datetime import datetime, timedelta
import ujson
import sys
sys.path.insert(1, '/home/peterm790/shared/users/petermarsh/kerchunk')
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
import kerchunk
import xarray as xr
import dask
import numpy as np
from pathlib import Path
hourlyrun='00'
fs = fsspec.filesystem('s3', anon='True', skip_instance_cache=True)
latest = sorted(fs.glob('s3://noaa-nwm-pds/nwm*'))[-1]
ens_files={}
for i in range(1,8):
ens_files[i] = sorted(fs.glob(f's3://{latest}/medium_range_mem{i}/nwm.t{hourlyrun}z.medium_range.channel_rt_{i}.f0*.nc'))[:48]
json_dir = '/home/peterm790/shared/users/petermarsh/ensemble_NWM/jsons/'
fs2 = fsspec.filesystem("")
def gen_json(u, json_dir):
fstem = Path(u).stem
outf = f'{json_dir}{fstem}.json'
with fs.open(u, **so) as infile:
h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
with fs2.open(outf, 'wb') as f:
f.write(ujson.dumps(h5chunks.translate()).encode());
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')
%%time
for ens in ens_files:
for file in ens_files[ens]:
gen_json(file, json_dir)
CPU times: user 1min 1s, sys: 11.9 s, total: 1min 13s Wall time: 9min 51s
ens_jsons = {}
for ens in ens_files:
flist = fs2.glob(f'{json_dir}nwm.t00z.medium_range.channel_rt_{i}*')
ens_jsons[ens] = flist#[f's3:/{f}' for f in flist]
for ens in ens_jsons:
mzz = MultiZarrToZarr(ens_jsons[ens],
remote_protocol='s3',
remote_options={'anon':True}, #JSON files
concat_dims = ['time'],
identical_dims = ['feature_id', 'reference_time'],
)
d = mzz.translate()
with fs2.open(f'/home/peterm790/shared/users/petermarsh/ensemble_NWM/{ens}.json', 'wb') as f:
f.write(ujson.dumps(d).encode());
flist = fs2.glob('/home/peterm790/shared/users/petermarsh/ensemble_NWM/*.json')
import re
ex = re.compile(r'.*?(\d+).json')
flist[2]
'/home/peterm790/shared/users/petermarsh/ensemble_NWM/3.json'
ex.match(flist[2]).groups()[0]
'3'
def postprocess(refs):
with open(flist[0]) as json_file:
esri_pe_string = eval(ujson.load(json_file)['refs']['crs/.zattrs'])['esri_pe_string']
crs_attrs = eval(refs['crs/.zattrs'])
crs_attrs["esri_pe_string"] = esri_pe_string
refs['crs/.zattrs'] = ujson.dumps(crs_attrs)
return refs
mzz = MultiZarrToZarr(flist,
remote_protocol='s3',
remote_options={'anon':True},
coo_map={'ensemble' : ex, 'crs':''},
concat_dims = ['ensemble'],
identical_dims = ['feature_id', 'reference_time', 'time'],
preprocess = kerchunk.combine.drop('crs'),
postprocess = postprocess)
out = mzz.translate()
date = latest.split('.')[-1]
with fs2.open(f'/home/peterm790/shared/users/petermarsh/ensemble_NWM/complete/{date}.json', 'wb') as f:
f.write(ujson.dumps(out).encode());
ds = xr.open_dataset(
"reference://",
backend_kwargs={"storage_options": {"fo": out, "remote_protocol": "s3"},
"consolidated": False},
engine="zarr",
chunks={}
)
ds
<xarray.Dataset> Dimensions: (crs: 0, ensemble: 7, feature_id: 2776738, time: 48, reference_time: 1) Coordinates: * crs (crs) float64 * ensemble (ensemble) object '1' '2' '3' '4' '5' '6' '7' * feature_id (feature_id) float64 101.0 179.0 181.0 ... 1.18e+09 1.18e+09 * reference_time (reference_time) datetime64[ns] 2022-06-28 * time (time) datetime64[ns] 2022-06-28T01:00:00 ... 2022-06-30 Data variables: nudge (ensemble, time, feature_id) float64 dask.array<chunksize=(1, 1, 925580), meta=np.ndarray> qBtmVertRunoff (ensemble, time, feature_id) float64 dask.array<chunksize=(1, 1, 925580), meta=np.ndarray> qBucket (ensemble, time, feature_id) float64 dask.array<chunksize=(1, 1, 925580), meta=np.ndarray> qSfcLatRunoff (ensemble, time, feature_id) float64 dask.array<chunksize=(1, 1, 925580), meta=np.ndarray> streamflow (ensemble, time, feature_id) float64 dask.array<chunksize=(1, 1, 925580), meta=np.ndarray> velocity (ensemble, time, feature_id) float64 dask.array<chunksize=(1, 1, 925580), meta=np.ndarray> Attributes: (12/20) Conventions: CF-1.6 NWM_version_number: v2.1 TITLE: OUTPUT FROM NWM v2.1 cdm_datatype: Station code_version: v5.2.0-beta2 dev: dev_ prefix indicates development/internal me... ... ... model_output_type: channel_rt model_output_valid_time: 2022-06-28_01:00:00 model_total_valid_times: 204 proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ... station_dimension: feature_id stream_order_output: 1
ds.crs.attrs
{'esri_pe_string': 'GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]];-400 -400 1000000000;-100000 10000;-100000 10000;8.98315284119521E-09;0.001;0.001;IsHighPrecision'}
with open('/shared/users/petermarsh/ensemble_NWM/complete/20220627.json') as json_file:
data_old = ujson.load(json_file)
with open('/shared/users/petermarsh/ensemble_NWM/complete/20220628.json') as json_file:
data_new = ujson.load(json_file)
data_new['refs']['qBtmVertRunoff/0.1.0']
['noaa-nwm-pds/nwm.20220628/medium_range_mem7/nwm.t00z.medium_range.channel_rt_7.f002.conus.nc', 5407356, 1633493]
data_old['refs']['qBtmVertRunoff/0.1.0']
['noaa-nwm-pds/nwm.20220627/medium_range_mem7/nwm.t00z.medium_range.channel_rt_7.f002.conus.nc', 5451216, 1638332]