#!/usr/bin/env python
# coding: utf-8

# # HRRR Forecast Collection Best Time Series
# Read a collection of GRIB2 files on AWS as a single dataset using the Zarr library, via fsspec's ReferenceFileSystem. This notebook also demonstrates how to generate the JSON file that fsspec uses, extracting the metadata from each GRIB2 file with [Reference File Maker](https://github.com/intake/fsspec-reference-maker) and speeding that step up with a Dask cluster.
# Requires the development version of `fsspec_reference_maker`:
# * `pip install --user git+https://github.com/intake/fsspec-reference-maker`

# In[1]:

import xarray as xr
import hvplot.xarray
import datetime as dt
import pandas as pd
import dask
import panel as pn
import json
import fsspec
from fsspec_reference_maker.grib2 import scan_grib
from fsspec_reference_maker.combine import MultiZarrToZarr

# #### Create a best time series dataset
# There is a new HRRR forecast every hour, so use forecast hour 1 from
# past forecasts and then append the latest forecast.

# In[2]:

fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)

# In[3]:

today = dt.datetime.utcnow().strftime('%Y%m%d')

# In[4]:

last_file = fs.glob(f's3://noaa-hrrr-bdp-pds/hrrr.{today}/conus/*wrfsfcf01.grib2')[-1]

# In[5]:

end_time = dt.datetime.strptime(last_file, 'noaa-hrrr-bdp-pds/hrrr.%Y%m%d/conus/hrrr.t%Hz.wrfsfcf01.grib2')

# In[6]:

start_time = end_time - dt.timedelta(days=7)

# In[7]:

last_file

# In[8]:

glob_pattern_for_latest_forecast = end_time.strftime('s3://noaa-hrrr-bdp-pds/hrrr.%Y%m%d/conus/hrrr.t%Hz.wrfsfcf*.grib2')
print(glob_pattern_for_latest_forecast)

# In[9]:

dates = pd.date_range(start=start_time, end=end_time, freq='1h')

# In[10]:

files = [date.strftime('s3://noaa-hrrr-bdp-pds/hrrr.%Y%m%d/conus/hrrr.t%Hz.wrfsfcf01.grib2') for date in dates]
latest_files = fs.glob(glob_pattern_for_latest_forecast)
latest_files = [f's3://{file}' for file in latest_files]
print(latest_files[-1])

# In[11]:

# skip f00 and f01 of the latest cycle: its f01 is already in `files`
files.extend(latest_files[2:])
print(files[0])
print(files[-1])
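# Optional sanity check (a minimal sketch, not part of the original workflow): the
# most recent forecast cycle is occasionally still being uploaded, so it can help to
# confirm that every path in `files` exists on the bucket before launching the
# parallel scan below.

# In[ ]:

# Drop any GRIB2 paths that are not on S3 yet (assumes `fs` and `files` from above).
missing = [u for u in files if not fs.exists(u)]
if missing:
    print(f'Dropping {len(missing)} file(s) not yet on S3, e.g. {missing[0]}')
    files = [u for u in files if u not in set(missing)]
print(f'{len(files)} GRIB2 files to scan')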
# #### Create Dask gateway cluster with credentials to write to specified S3 bucket
# This is for the ESIP qhub: you will need to modify it to work elsewhere.

# In[12]:

import os
import sys
sys.path.append(os.path.join(os.environ['HOME'], 'shared', 'users', 'lib'))
import ebdpy as ebd

ebd.set_credentials(profile='esip-qhub')

profile = 'esip-qhub'
region = 'us-west-2'
endpoint = f's3.{region}.amazonaws.com'
ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
worker_max = 30
client, cluster = ebd.start_dask_cluster(profile=profile, worker_max=worker_max, region=region,
                                         use_existing_cluster=True, adaptive_scaling=False,
                                         wait_for_cluster=False, environment='pangeo',
                                         worker_profile='Pangeo Worker', propagate_env=True)

# #### Create individual ReferenceFileSystem jsons
# The `afilter` below controls which GRIB variables we want [cfgrib](https://github.com/ecmwf/cfgrib#filter-heterogeneous-grib-files) to capture.

# In[13]:

afilter = {'typeOfLevel': 'heightAboveGround', 'level': 2}

# In[14]:

so = {"anon": True, "default_cache_type": "readahead"}
common = ['time', 'step', 'latitude', 'longitude', 'valid_time']

# In[15]:

def gen_json(u):
    date = u.split('/')[3].split('.')[1]
    name = u.split('/')[5].split('.')[1:3]
    outfname = f'{json_dir}{date}.{name[0]}.{name[1]}.json'
    out = scan_grib(u, common, so, inline_threashold=100, filter=afilter)
    with fs2.open(outfname, "w") as f:
        f.write(json.dumps(out))

# #### Bucket to store individual JSONs (needs write permission, so anon=False)

# In[16]:

json_dir = 's3://esip-qhub/noaa/hrrr/jsons/'
fs2 = fsspec.filesystem('s3', anon=False, skip_instance_cache=True)

# In[17]:

try:
    fs2.rm(json_dir, recursive=True)
except:  # ignore if the directory does not exist yet
    pass

# #### Compute the individual JSONs in parallel from a list of Dask delayed objects

# In[18]:

get_ipython().run_cell_magic('time', '', '_ = dask.compute(*[dask.delayed(gen_json)(u) for u in files], retries=10);\n')

# #### Use `MultiZarrToZarr()` to combine into single reference

# In[19]:

flist2 = fs2.ls(json_dir)
furls = sorted(['s3://'+f for f in flist2])
print(furls[0])
print(furls[-1])

# In[20]:

# mzz = MultiZarrToZarr(furls,
#     storage_options={'anon': False},
#     remote_protocol='s3',
#     remote_options={'anon': True},   # JSON files
#     xarray_open_kwargs={
#         'decode_cf': False,
#         'mask_and_scale': False,
#         'decode_times': False,
#         'use_cftime': False,
#         'drop_variables': ['reference_time', 'crs'],
#         'decode_coords': False
#     },
#     xarray_concat_args={
#         # "data_vars": "minimal",
#         # "coords": "minimal",
#         # "compat": "override",
#         "join": "override",
#         "combine_attrs": "override",
#         "dim": "time"
#     }
# )

mzz = MultiZarrToZarr(furls,
                      storage_options={'anon': False},
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      xarray_concat_args={'dim': 'valid_time'})

# In[21]:

get_ipython().run_cell_magic('time', '', "#%%prun -D multizarr_profile \nmzz.translate('hrrr_best.json')\n")

# In[22]:

rpath = 's3://esip-qhub-public/noaa/hrrr/hrrr_best.json'
fs2.put_file(lpath='hrrr_best.json', rpath=rpath)

# ## Access data and plot

# In[23]:

rpath = 's3://esip-qhub-public/noaa/hrrr/hrrr_best.json'
s_opts = {'requester_pays': True, 'skip_instance_cache': True}
r_opts = {'anon': True}
fs = fsspec.filesystem("reference", fo=rpath, ref_storage_args=s_opts,
                       remote_protocol='s3', remote_options=r_opts)
m = fs.get_mapper("")
ds2 = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False))
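# As an aside (not in the original notebook): with recent versions of fsspec and
# xarray, the same combined reference can be opened in a single call through the
# `reference://` protocol, without building the mapper by hand. A minimal sketch,
# assuming the same `rpath` and access options as above; the exact keyword names
# (e.g. `target_options`) depend on the fsspec version installed.

# In[ ]:

ds_alt = xr.open_dataset(
    "reference://", engine="zarr",
    backend_kwargs=dict(
        consolidated=False,
        storage_options=dict(
            fo=rpath,                        # the combined reference JSON on S3
            target_options={'requester_pays': True, 'skip_instance_cache': True},
            remote_protocol='s3',            # protocol of the referenced GRIB2 chunks
            remote_options={'anon': True},
        ),
    ),
)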
# In[24]:

ds2.data_vars

# In[25]:

var = 't2m'

# In[26]:

ds2[var].nbytes/1e9

# In[27]:

ds2[var]

# Hvplot wants lon [-180,180], not [0,360]:

# In[28]:

ds2 = ds2.assign_coords(longitude=(((ds2.longitude + 180) % 360) - 180))

# hvplot has a slider for time steps, but we want a dropdown list, so we use Panel.

# In[29]:

extra_dims = list(ds2[var].dims[:-2])
mesh = ds2[var].hvplot.quadmesh(x='longitude', y='latitude', rasterize=True,
                                geo=True, tiles='OSM', cmap='turbo')
pn.panel(mesh, widgets={k: pn.widgets.Select for k in extra_dims})

# #### Extract a time series at a point
# We are reading GRIB2 files, which compress the entire spatial domain as a single chunk. Reading all the time values at a single point therefore has to load and uncompress *all* the data for that variable. With 30 cores, accessing a week's worth of data takes just under two minutes.

# In[30]:

get_ipython().run_cell_magic('time', '', "ds2[var][:,500,500].hvplot(x='valid_time', grid=True)\n")

# In[31]:

client.close(); cluster.shutdown()
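# Follow-up sketch (not in the original notebook): to extract the time series at a
# specific longitude/latitude rather than an arbitrary index pair like `[:,500,500]`,
# note that the HRRR grid is curvilinear, so `latitude`/`longitude` are 2D coordinates
# and `.sel(..., method='nearest')` cannot be used directly. The target point below is
# hypothetical; the lookup works without the cluster, just more slowly.

# In[ ]:

import numpy as np

target_lon, target_lat = -71.06, 42.36   # example point (Boston area), adjust as needed

# crude nearest-cell search over the 2D coordinate arrays (fine at ~3 km spacing)
dist2 = (ds2.longitude - target_lon)**2 + (ds2.latitude - target_lat)**2
iy, ix = np.unravel_index(dist2.values.argmin(), dist2.shape)
ds2[var][:, iy, ix].hvplot(x='valid_time', grid=True)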