#!/usr/bin/env python
# coding: utf-8

# # GRIB2 NWP Archive aggregation using .idx files
#
# Work with NODD GFS data on GCP (theoretically works with AWS as well)
#
# 1) Demonstrate building a DataTree, an index-file mapping and a static metadata file. Use the mapping to rapidly build a one-month dataset.
# 2) Demonstrate using the zarr chunk_getitems hack to parallelize getting a timeseries
#
# ## Import some stuff...

# In[1]:


get_ipython().run_line_magic('load_ext', 'autoreload')
get_ipython().run_line_magic('autoreload', '2')

import logging
import importlib

importlib.reload(logging)
logging.basicConfig(
    format="%(asctime)s.%(msecs)03dZ %(processName)s %(threadName)s %(levelname)s:%(name)s:%(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
    level=logging.WARNING,
)
logger = logging.getLogger("jupyter")


# In[2]:


import datetime
import copy

import xarray as xr
import numpy as np
import pandas as pd
import fsspec
import kerchunk
from kerchunk.grib2 import (
    grib_tree,
    scan_grib,
    extract_datatree_chunk_index,
    strip_datavar_chunks,
    reinflate_grib_store,
    AggregationType,
    read_store,
    write_store,
    parse_grib_idx,
    build_idx_grib_mapping,
    map_from_index,
)
import gcsfs

pd.set_option('display.max_columns', None)


# ## Extract the zarr store metadata
#
# Pick a file, any file... It must be on GCS so that the coords use the same file store as the data vars.

# In[3]:


get_ipython().run_cell_magic('time', '', '# Pick two files to build a grib_tree with the correct dimensions\ngfs_files = [\n    "gs://global-forecast-system/gfs.20230928/00/atmos/gfs.t00z.pgrb2.0p25.f000",\n    "gs://global-forecast-system/gfs.20230928/00/atmos/gfs.t00z.pgrb2.0p25.f001"\n]\n\n# This operation reads two of the large Grib2 files from GCS\n# scan_grib extracts the zarr kerchunk metadata for each individual grib message\n# grib_tree builds a zarr/xarray compatible hierarchical view of the dataset\ngfs_grib_tree_store = grib_tree([group for f in gfs_files for group in scan_grib(f)])\n# it is slow even in parallel because it requires a huge amount of IO\n')


# In[4]:


get_ipython().run_cell_magic('time', '', '# The grib_tree can be opened directly using either zarr or xarray datatree\n# But this is too slow to build big aggregations\ngfs_dt = xr.open_datatree(fsspec.filesystem("reference", fo=gfs_grib_tree_store).get_mapper(""), engine="zarr", consolidated=False)\ngfs_dt\n')


# ## Separating static metadata from the chunk indexes

# In[5]:


# The key metadata associated with each grib message can be extracted into a table
gfs_kind = extract_datatree_chunk_index(gfs_dt, gfs_grib_tree_store, grib=True)
gfs_kind


# In[6]:


# The static zarr metadata associated with the dataset can be separated out - it only needs to be created once.
deflated_gfs_grib_tree_store = copy.deepcopy(gfs_grib_tree_store)
strip_datavar_chunks(deflated_gfs_grib_tree_store)

write_store("./", deflated_gfs_grib_tree_store)

print("Original references: ", len(gfs_grib_tree_store["refs"]))
print("Stripped references: ", len(deflated_gfs_grib_tree_store["refs"]))
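
# Since the chunk index is just a pandas DataFrame, one option is to persist it alongside
# the deflated store so neither has to be rebuilt next time. The cell below is a minimal
# sketch: the parquet filename is arbitrary and a parquet engine (pyarrow or fastparquet)
# is assumed to be installed.

# In[ ]:


# Hedged sketch: save the per-message chunk index next to the static zarr metadata.
# "gfs_chunk_index.parquet" is an arbitrary filename, not part of any kerchunk API.
gfs_kind.to_parquet("gfs_chunk_index.parquet")

# Reloading both pieces later avoids re-running scan_grib entirely:
reloaded_store = read_store("./")
reloaded_index = pd.read_parquet("gfs_chunk_index.parquet")
print(len(reloaded_store["refs"]), "static refs;", len(reloaded_index), "indexed grib messages")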

# ## Building it faster
#
# Okay that was fun - I promise you can recombine these pieces, but it still takes the same amount of time to run scan_grib.
#
# The k(erchunk) index data looks a lot like the .idx files that are present for every grib file in NODD's GCS archive though...
#
# ```
# 1:0:d=2023092800:PRMSL:mean sea level:1 hour fcst:
# 2:990253:d=2023092800:CLWMR:1 hybrid level:1 hour fcst:
# 3:1079774:d=2023092800:ICMR:1 hybrid level:1 hour fcst:
# 4:1332540:d=2023092800:RWMR:1 hybrid level:1 hour fcst:
# 5:1558027:d=2023092800:SNMR:1 hybrid level:1 hour fcst:
# 6:1638489:d=2023092800:GRLE:1 hybrid level:1 hour fcst:
# 7:1673516:d=2023092800:REFD:1 hybrid level:1 hour fcst:
# 8:2471710:d=2023092800:REFD:2 hybrid level:1 hour fcst:
# 9:3270627:d=2023092800:REFC:entire atmosphere:1 hour fcst:
# 10:4144435:d=2023092800:VIS:surface:1 hour fcst:
# ```
#
# But the metadata isn't quite the same... the attributes in the colon-separated fields have been mangled.

# In[7]:


# We can pull this out into a dataframe that starts to look a bit like the chunk index we
# extracted from the actual grib files above.
# But this method runs in under a second, reading an idx file that is less than 100k.
idxdf = parse_grib_idx(
    basename="gs://global-forecast-system/gfs.20230901/00/atmos/gfs.t00z.pgrb2.0p25.f006"
)
idxdf


# In[8]:


# Unfortunately, some accumulation variables have duplicate attributes, making them
# indistinguishable using only the idx file.
idxdf.loc[idxdf['attrs'].duplicated(keep=False), :]


# In[9]:


get_ipython().run_cell_magic('time', '', '# What we need is a mapping from our grib/zarr metadata to the attributes in the idx files\n# They are unique for each time horizon e.g. you need to build a unique mapping for the 1 hour\n# forecast, the 2 hour forecast... the 48 hour forecast.\n\n# let\'s make one for the 6 hour horizon. This requires reading both the grib and the idx file,\n# mapping the data for each grib message in order\nmapping = build_idx_grib_mapping(\n    basename="gs://global-forecast-system/gfs.20230928/00/atmos/gfs.t00z.pgrb2.0p25.f006",\n)\nmapping\n')


# In[10]:


# Now if we parse the run time from the idx file name `gfs.20230901/00/`
# we can build a fully compatible k_index
mapped_index = map_from_index(
    pd.Timestamp("2023-09-01T00"),
    mapping.loc[~mapping["attrs"].duplicated(keep="first"), :],
    idxdf.loc[~idxdf["attrs"].duplicated(keep="first"), :]
)
mapped_index


# In[11]:


mapping.loc[mapping.varname == "sdswrf"]


# In[12]:


get_ipython().run_cell_magic('time', '', 'mapped_index_list = []\n\ndeduped_mapping = mapping.loc[~mapping["attrs"].duplicated(keep="first"), :]\nfor date in pd.date_range("2023-09-01", "2023-09-30"):\n    for runtime in range(0, 24, 6):\n        horizon = 6\n        fname = f"gs://global-forecast-system/gfs.{date.strftime(\'%Y%m%d\')}/{runtime:02}/atmos/gfs.t{runtime:02}z.pgrb2.0p25.f{horizon:03}"\n\n        idxdf = parse_grib_idx(\n            basename=fname\n        )\n\n        mapped_index = map_from_index(\n            pd.Timestamp(date + datetime.timedelta(hours=runtime)),\n            deduped_mapping,\n            idxdf.loc[~idxdf["attrs"].duplicated(keep="first"), :],\n        )\n        mapped_index_list.append(mapped_index)\n\ngfs_kind = pd.concat(mapped_index_list)\ngfs_kind\n')


# ## We just aggregated 120 GFS grib files in 18 seconds!
#
# Let's build it back into a datatree!
#
# The reinflate_grib_store interface is pretty opaque, but it allows building any slice of an FMRC. A good area for future improvement - but for now, since we have just a single 6-hour-horizon slice, let's build that...

# In[13]:


axes = [
    pd.Index(
        [
            pd.timedelta_range(start="0 hours", end="6 hours", freq="6h", closed="right", name="6 hour"),
        ],
        name="step"
    ),
    pd.date_range("2023-09-01T06:00", "2023-10-01T00:00", freq="360min", name="valid_time")
]
axes
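
# Before reinflating, a quick sanity check - a hedged sketch using only variables defined
# above: 30 days of 4 cycles each is 120 six-hour runs, so the valid_time axis should line
# up one-to-one with the mapped indexes we just built.

# In[ ]:


# One mapped index per model run; the valid_time axis should have the same length.
print(f"{len(mapped_index_list)} mapped indexes, {len(axes[1])} valid times")
assert len(mapped_index_list) == len(axes[1]) == 120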

# In[14]:


# It is fast to rebuild the datatree - but let's pull out two variables to look at...
# If you skipped building the deflated store, read it back here:
# deflated_gfs_grib_tree_store = read_store("./")
gfs_store = reinflate_grib_store(
    axes=axes,
    aggregation_type=AggregationType.HORIZON,
    chunk_index=gfs_kind.loc[gfs_kind.varname.isin(["sdswrf", "t2m"])],
    zarr_ref_store=deflated_gfs_grib_tree_store
)


# In[15]:


gfs_dt = xr.open_datatree(fsspec.filesystem("reference", fo=gfs_store).get_mapper(""), engine="zarr", consolidated=False)
gfs_dt


# In[16]:


get_ipython().run_cell_magic('time', '', "# Reading the data - especially extracting point time series - isn't any faster once you have\n# the xarray datatree. This is just a much faster way of building the aggregations than\n# directly running scan_grib over all the data first.\ngfs_dt.sdswrf.avg.surface.sdswrf[0,0:10,300,400].compute()\n")


# In[17]:


get_ipython().run_cell_magic('time', '', 'gfs_dt.t2m.instant.heightAboveGround.t2m[0,0:10,300,400].compute()\n')


# In[18]:


gfs_dt.sdswrf.avg.surface.sdswrf[0,1,:,:].plot(figsize=(12,8))


# In[19]:


gfs_dt.sdswrf.avg.surface.sdswrf[0,2,:,:].plot(figsize=(12,8))


# In[20]:


gfs_dt.sdswrf.avg.surface.sdswrf[0,3,:,:].plot(figsize=(12,8))


# # Timeseries

# In[21]:


# from joblib import parallel_config
# with parallel_config(n_jobs=8):
#     res = gfs_dt.sdswrf.avg.surface.sdswrf.interp(longitude=[320.5, 300.2], latitude=[20.6, 45.7], method="linear")


# In[24]:


get_ipython().run_cell_magic('time', '', 'res = gfs_dt.sdswrf.avg.surface.sdswrf.interp(longitude=[320.5], latitude=[20.6], method="linear")\nres.plot()\n')
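
# The interpolated series above touches the grid cells surrounding each requested point.
# Where an exact grid point is good enough, nearest-neighbour selection is a simpler way to
# pull a point timeseries - a minimal sketch using standard xarray .sel() on the same
# latitude/longitude coordinates used by interp above.

# In[ ]:


# Hedged sketch: nearest-neighbour point extraction with plain xarray selection.
point = gfs_dt.sdswrf.avg.surface.sdswrf.sel(longitude=320.5, latitude=20.6, method="nearest")
series = point.compute().to_series()  # (step, valid_time) MultiIndex series
print(series.head())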