#!/usr/bin/env python
# coding: utf-8

# # ReferenceMaker/ReferenceFileSystem GRIB2/HRRR Example
#
# Requires development version of `fsspec_reference_maker`
# * `pip install --user git+https://github.com/intake/fsspec-reference-maker`

# In[1]:

import json
import fsspec
from fsspec_reference_maker.grib2 import scan_grib
from fsspec_reference_maker.combine import MultiZarrToZarr
import os
import re
import hvplot.xarray
import datetime as dt
import dask
import ujson

# In[2]:

# Anonymous (public) S3 access; skip_instance_cache so repeated globs in a
# long-lived session see newly arrived objects instead of a cached listing.
fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)

# In[3]:

# Today's date (UTC) as YYYYMMDD.  datetime.utcnow() is deprecated since
# Python 3.12 and returns a naive datetime; now(timezone.utc) produces the
# same %Y%m%d string from an aware datetime.
today = dt.datetime.now(dt.timezone.utc).strftime('%Y%m%d')

# In[4]:

# Every 1-hour-forecast surface file from all of today's HRRR CONUS cycles.
files = fs.glob(f's3://noaa-hrrr-bdp-pds/hrrr.{today}/conus/*wrfsfcf01.grib2')
files

# In[5]:

# Cycle tag (e.g. 't12z') of the most recent run found above:
# path is '<bucket>/hrrr.YYYYMMDD/conus/hrrr.tHHz.wrfsfcf01.grib2',
# so split('/')[3] is the filename and split('.')[1] the cycle.
latest = files[-1].split('/')[3].split('.')[1]
print(latest)

# In[6]:

# For the latest cycle, add all later forecast hours so the "best available"
# time series extends into the future.  [2:] skips the first two forecast
# files (presumably f00 and the f01 already captured above — verify against
# the bucket's file ordering).
latest_files = fs.glob(
    f's3://noaa-hrrr-bdp-pds/hrrr.{today}/conus/hrrr.{latest}.wrfsfc*.grib2')
files.extend(latest_files[2:])

# In[7]:

files

# #### Create Dask gateway cluster with credentials to write to specified S3 bucket
# This is for the ESIP qhub: you will need to modify to work elsewhere.
# In[8]:

import os
import sys

# ebdpy is a qhub-local helper library shipped outside site-packages.
sys.path.append(os.path.join(os.environ['HOME'], 'shared', 'users', 'lib'))
import ebdpy as ebd

# First call sets default credentials; second re-sets them with the explicit
# region/endpoint that the Dask workers will use.
ebd.set_credentials(profile='esip-qhub')
profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 30
client, cluster = ebd.start_dask_cluster(
    profile=profile, worker_max=worker_max, region=region,
    use_existing_cluster=True, adaptive_scaling=False,
    wait_for_cluster=False, environment='pangeo',
    worker_profile='Pangeo Worker', propagate_env=True)

# ## Create reference jsons

# In[9]:

# Keep only 2-m-above-ground GRIB messages; shared coordinate variables are
# listed in `common`.  `so` = storage options passed through to scan_grib.
afilter = {'typeOfLevel': 'heightAboveGround', 'level': 2}
so = {"anon": True, "default_cache_type": "readahead"}
common = ['time', 'step', 'latitude', 'longitude', 'valid_time']

# In[10]:


def gen_json(u):
    """Scan one HRRR GRIB2 URL and write its reference JSON to `json_dir`.

    Output name is '<YYYYMMDD>.<cycle>.<forecast>.json', parsed from the URL
    's3://<bucket>/hrrr.YYYYMMDD/conus/hrrr.tHHz.wrfsfcfNN.grib2'.
    Relies on module-level `json_dir`, `fs2`, `common`, `so`, `afilter`.
    """
    date = u.split('/')[3].split('.')[1]      # 'hrrr.YYYYMMDD' -> date
    name = u.split('/')[5].split('.')[1:3]    # ['tHHz', 'wrfsfcfNN']
    outfname = f'{json_dir}{date}.{name[0]}.{name[1]}.json'
    # NOTE: 'inline_threashold' (sic) is the keyword spelling this version of
    # fsspec_reference_maker uses; do not "correct" it without upgrading.
    out = scan_grib(u, common, so, inline_threashold=100, filter=afilter)
    with fs2.open(outfname, "w") as f:
        f.write(ujson.dumps(out))


# In[11]:

json_dir = 's3://esip-qhub/noaa/hrrr/jsons/'
fs2 = fsspec.filesystem('s3', anon=False)

# In[12]:

# Clear JSONs from a previous run.  Only a missing prefix is expected here;
# any other S3 error (permissions, throttling) should surface, so the old
# bare `except: pass` is narrowed to FileNotFoundError.
try:
    fs2.rm(json_dir, recursive=True)
except FileNotFoundError:
    pass

# In[13]:

urls = [f's3://{file}' for file in files]
# so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')

# In[14]:

# Fan the per-file scans out over the Dask cluster; retries absorb transient
# S3 read failures.
get_ipython().run_cell_magic('time', '', '_ = dask.compute(*[dask.delayed(gen_json)(u) for u in urls], retries=10);\n')

# ## Use `MultiZarrToZarr()` to combine into single reference

# In[15]:

flist2 = fs2.ls(json_dir)
furls = sorted(['s3://' + f for f in flist2])
print(furls[0])
print(furls[-1])

# In[16]:

# Earlier, fuller set of MultiZarrToZarr kwargs kept for reference:
# mzz = MultiZarrToZarr(furls,
#     storage_options={'anon':False},
#     remote_protocol='s3',
#     remote_options={'anon' : 'True'}, #JSON files
#     xarray_open_kwargs={
#         'decode_cf' : False,
#         'mask_and_scale' : False,
#         'decode_times' : False,
#         'use_cftime' : False,
#         'drop_variables': ['reference_time',
#                            'crs'],
#         'decode_coords' : False
#     },
#     xarray_concat_args={
#         # "data_vars": "minimal",
#         # "coords": "minimal",
#         # "compat": "override",
#         "join": "override",
#         "combine_attrs": "override",
#         "dim": "time"
#     }
# )

# Combine the per-file reference JSONs into one dataset, concatenating the
# individual forecast files along the 'time' dimension.
mzz = MultiZarrToZarr(furls,
                      storage_options={'anon': False},
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      xarray_concat_args={'dim': 'time'})

# In[17]:

# Write the combined reference to a local JSON (timed; profiling line kept).
get_ipython().run_cell_magic('time', '', "#%%prun -D multizarr_profile \nmzz.translate('hrrr_best.json')\n")

# In[18]:

# Publish the combined reference file to a public bucket.
rpath = 's3://esip-qhub-public/noaa/hrrr/hrrr_best.json'
fs2.put_file(lpath='hrrr_best.json', rpath=rpath)

# ## Access data and plot

# In[19]:

import xarray as xr
from fsspec_reference_maker.grib2 import GRIBCodec
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import matplotlib.pyplot as plt
import pandas as pd

# In[20]:

# Open the published reference as a virtual zarr store: the JSON is read with
# s_opts, while the GRIB2 chunks it points at are fetched anonymously.
s_opts = {'requester_pays': True, 'skip_instance_cache': True}
r_opts = {'anon': True}
fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine="zarr")

# In[21]:

ds

# In[23]:

# Quick-look map of 10-m u-wind on an OSM basemap.
ds.u10.hvplot.quadmesh(x='longitude', y='latitude', rasterize=True,
                       geo=True, tiles='OSM', cmap='turbo')