#!/usr/bin/env python
# coding: utf-8

# # Fast NODD GRIB Aggregations

# ## Overview
#
# In this tutorial we demonstrate how to build fast kerchunk aggregations of NODD GRIB2 weather forecasts. This workflow primarily involves [xarray-datatree](https://xarray-datatree.readthedocs.io/en/latest/), [pandas](https://pandas.pydata.org/) and the new `grib_tree` function released in kerchunk v0.2.3.
#
#
# ### About the Dataset
#
# In this demo we will be looking at GRIB2 files generated by the [**NOAA Global Ensemble Forecast System (GEFS)**](https://www.ncei.noaa.gov/products/weather-climate-models/global-ensemble-forecast). This dataset is a weather forecast model made up of 21 separate forecasts, or ensemble members. GEFS has global coverage and is produced four times a day (every 6 hours, starting at midnight), with forecasts going out to 16 days.
#
# More information on this dataset can be found [here](https://registry.opendata.aws/noaa-gefs).
#
#
# ## Prerequisites
# | Concepts | Importance | Notes |
# | --- | --- | --- |
# | [Kerchunk Basics](../foundations/kerchunk_basics) | Required | Core |
# | [Pandas Tutorial](https://foundations.projectpythia.org/core/pandas/pandas.html#) | Required | Core |
# | [Kerchunk and Xarray-Datatree](https://projectpythia.org/kerchunk-cookbook/notebooks/using_references/Datatree.html) | Required | IO |
# | [Xarray-Datatree Overview](https://xarray-datatree.readthedocs.io/en/latest/quick-overview.html) | Required | IO |
#
# - **Time to learn**: 30 minutes

# ## Motivation
#
# **Kerchunk** provides a unified way to represent a variety of chunked, compressed data formats (e.g. NetCDF/HDF5, GRIB2, TIFF, …) by generating *references*. The task flow shown here can build large aggregations of **NODD GRIB forecasts** in a fraction of the time by using the `idx` files.

# ## Imports

# In[1]:


import copy
import datetime

import fsspec
import pandas as pd
import xarray as xr

from kerchunk.grib2 import (
    AggregationType,
    build_idx_grib_mapping,
    extract_datatree_chunk_index,
    grib_tree,
    map_from_index,
    parse_grib_idx,
    reinflate_grib_store,
    scan_grib,
    strip_datavar_chunks,
)


# ## Building the Aggregation directly from the GRIB files
#
# We use the newly implemented kerchunk `grib_tree` function to build a hierarchical data model from a set of scanned GRIB messages. This data model can be opened directly using either zarr or xarray-datatree. *This way of building the aggregation is very slow.* Here we're going to use xarray-datatree to open and view it:

# In[2]:


s3_files = [
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006",
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af012",
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af018",
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af024",
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af030",
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af036",
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af042",
]


# In[3]:


# converting the scanned references into the hierarchical data model
grib_tree_store = grib_tree(
    [
        group
        for f in s3_files
        for group in scan_grib(f, storage_options=dict(anon=True))
    ],
    remote_options=dict(anon=True),
)
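
# Because `scan_grib` is the slow step, it can be worth saving the resulting reference store so it does not have to be rebuilt. The cell below is a minimal, optional sketch of one way to do that with the standard-library `json` module (kerchunk reference stores are plain JSON-serializable dicts); the filename is arbitrary and not part of the original tutorial.

# In[ ]:


import json

# Optional: persist the reference store so the slow scan_grib step
# does not need to be repeated. The filename here is just an example.
with open("gefs_grib_tree_store.json", "w") as f:
    json.dump(grib_tree_store, f)

# It can later be reloaded with:
# with open("gefs_grib_tree_store.json") as f:
#     grib_tree_store = json.load(f)
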
# In[4]:


# Transforming the output to a datatree to view it
s3_dt = xr.open_datatree(
    fsspec.filesystem(
        "reference",
        fo=grib_tree_store,
        remote_protocol="s3",
        remote_options={"anon": True},
    ).get_mapper(""),
    engine="zarr",
    consolidated=False,
)


# In[5]:


# In this tree model, the variables are organized into hierarchical groups,
# first by "stepType" and then by "typeOfLevel"
s3_dt


# > **Note**: If you try out this notebook, the above way of building the aggregation takes a few minutes for this `GEFS` S3 repository. In general it takes even longer, depending on the size of the GRIB files.

# ## Building the aggregation faster with `idx` files
#
# This method of aggregating GRIB files requires that each GRIB file be accompanied by its corresponding `idx` file in text format. These files, also called index files, contain some key metadata of the GRIB messages in each file. This metadata is then used to build a reference index, which helps in aggregation.
#
# The advantage is that building the aggregation this way doesn't involve scanning the whole archive of GRIB files, but only a single file.
#
# ### Index Dataframe made from a single Grib file
#
# Here is what the contents of an `idx` file look like:
#
# ```
# 1:0:d=2017010100:HGT:10 mb:12 hour fcst:ENS=low-res ctl
# 2:48163:d=2017010100:TMP:10 mb:12 hour fcst:ENS=low-res ctl
# 3:68112:d=2017010100:RH:10 mb:12 hour fcst:ENS=low-res ctl
# 4:79092:d=2017010100:UGRD:10 mb:12 hour fcst:ENS=low-res ctl
# 5:102125:d=2017010100:VGRD:10 mb:12 hour fcst:ENS=low-res ctl
# 6:122799:d=2017010100:HGT:50 mb:12 hour fcst:ENS=low-res ctl
# 7:178898:d=2017010100:TMP:50 mb:12 hour fcst:ENS=low-res ctl
# 8:201799:d=2017010100:RH:50 mb:12 hour fcst:ENS=low-res ctl
# 9:224321:d=2017010100:UGRD:50 mb:12 hour fcst:ENS=low-res ctl
# 10:272234:d=2017010100:VGRD:50 mb:12 hour fcst:ENS=low-res ctl
# 11:318288:d=2017010100:HGT:100 mb:12 hour fcst:ENS=low-res ctl
# 12:379010:d=2017010100:TMP:100 mb:12 hour fcst:ENS=low-res ctl
# 13:405537:d=2017010100:RH:100 mb:12 hour fcst:ENS=low-res ctl
# 14:441517:d=2017010100:UGRD:100 mb:12 hour fcst:ENS=low-res ctl
# 15:497421:d=2017010100:VGRD:100 mb:12 hour fcst:ENS=low-res ctl
# ```
#
# The general format of an `idx` entry is: **index:offset:date_with_runtime:variable:level:forecast_time:**
#
# The fields are separated by ":" (colon), and we need to convert them into a `DataFrame` to build the index.
#
# > **Note**: This way of building the reference index only works for a particular **horizon** file, irrespective of the run time of the model, because the messages from the same time horizon have an identical structure.

# In[6]:


# converting the idx data into a dataframe
idxdf = parse_grib_idx(
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006",
    storage_options=dict(anon=True),
)
idxdf.head()
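
# To see where the DataFrame above comes from, the optional sketch below reads the raw index text with fsspec and splits it on ":" by hand. This is only an illustration of the file format; `parse_grib_idx` already does this (plus offset/length bookkeeping) for you. It assumes the index sits next to the GRIB file with a `.idx` suffix, which is the convention `parse_grib_idx` relies on.

# In[ ]:


# Minimal sketch: read the raw idx text and split each line into fields.
with fsspec.open(
    "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006.idx", anon=True
) as f:
    raw_idx = f.read().decode("utf-8")

# One row per GRIB message; columns are the colon-separated fields shown above.
manual_idx = pd.DataFrame([line.split(":") for line in raw_idx.splitlines() if line])
manual_idx.head()
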
# ### Building a mapping between the index data and grib metadata
#
# Creation of the reference index, or **k_index**, first involves generating a single mapping from the grib/zarr metadata (the `grib_tree` output) to the attributes in the idx files. These mappings are unique for each time horizon, so while building the index we need a separate mapping for the 1-hour forecast, the 2-hour forecast, and so on.
#
# This mapping can be made from any arbitrary GRIB file, provided it belongs to the **same time horizon** as the files we're trying to index.
#
# We'll start by examining the GRIB data. The metadata that we'll be extracting is static in nature. We're going to use a single node by [accessing](https://projectpythia.org/kerchunk-cookbook/notebooks/using_references/Datatree.html#accessing-the-datatree) it with datatree.

# In[7]:


s3_dt.groups


# In[8]:


# Parsing the grib metadata from a single datatree node and converting it into a dataframe
grib_df = extract_datatree_chunk_index(s3_dt["sulwrf/avg/nominalTop"], grib_tree_store)
grib_df.head()


# > **Note**: The above process is part of the mapping creation; the call to `extract_datatree_chunk_index` is handled inside the `build_idx_grib_mapping` function.

# In[9]:


# creating a mapping for a single horizon file, which will be used later
mapping = build_idx_grib_mapping(
    "s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z.pgrb2af006",
    storage_options=dict(anon=True),
    validate=True,
)
mapping.head()


# > **Note**: Reference indexes can be combined across many horizons, but **each horizon must be indexed separately**.

# ### Building the index
#
# Now, by parsing the runtime and other attributes from the idx file, we can build a fully compatible kerchunk index (k_index) for that **particular file**. Before creating the index, we need to clean the mapping and index dataframes, because some variables tend to contain duplicate values, as demonstrated below.

# In[10]:


# cleaning the mapping
mapping.loc[~mapping["attrs"].duplicated(keep="first"), :].head()


# In[11]:


# this step will be performed for every idx dataframe, reusing the "mapping" dataframe we created previously
mapped_index = map_from_index(
    pd.Timestamp("2017-01-01T06"),
    mapping.loc[~mapping["attrs"].duplicated(keep="first"), :],
    idxdf.loc[~idxdf["attrs"].duplicated(keep="first"), :],
)
mapped_index.head()


# ### Final step of building the aggregation
#
# For the final step of the aggregation, we will create an index for each GRIB file covering a two-month period, from `2017-01-01` to `2017-02-28`, using the 6-hour horizon files. The generated index can then be combined with indexes of other time horizons and stored for later use.

# In[12]:


get_ipython().run_cell_magic('time', '', 'mapped_index_list = []\n\ndeduped_mapping = mapping.loc[~mapping["attrs"].duplicated(keep="first"), :]\n\n# this is done manually because the runtime and forecast time vary between models, i.e. GEFS, GFS, HRRR etc.\nfor date in pd.date_range("2017-01-01", "2017-02-28"):\n    for runtime in range(0, 24, 6):\n        fname = f"s3://noaa-gefs-pds/gefs.{date.strftime(\'%Y%m%d\')}/{runtime:02}/gec00.t{runtime:02}z.pgrb2af006"\n\n        idxdf = parse_grib_idx(basename=fname, storage_options=dict(anon=True))\n\n        mapped_index = map_from_index(\n            pd.Timestamp(date + datetime.timedelta(hours=runtime)),\n            deduped_mapping,\n            idxdf.loc[~idxdf["attrs"].duplicated(keep="first"), :],\n        )\n\n        mapped_index_list.append(mapped_index)\n\ns3_kind = pd.concat(mapped_index_list)\n')


# > **Tip**: To confirm the above step, check out this [notebook](https://gist.github.com/Anu-Ra-g/efa01ad1c274c1bd1c14ee01666caa77)
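
# The combined k_index is just a pandas DataFrame, so one convenient way to store it for later use, as suggested above, is a parquet file. The cell below is only an optional sketch, not part of the original tutorial: the filename is arbitrary, and it assumes a parquet engine such as pyarrow is installed and that all columns serialize cleanly.

# In[ ]:


# Optional sketch: persist the k_index so the two-month loop above
# does not need to be rerun (requires a parquet engine such as pyarrow).
s3_kind.to_parquet("gefs_f006_k_index.parquet", index=False)

# It can later be reloaded with:
# s3_kind = pd.read_parquet("gefs_f006_k_index.parquet")
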
# ## Using the aggregation
#
# The difference between the `.idx` index and the kerchunk index we built is that the former indexes the GRIB messages while the latter indexes the variables in those messages, within a time horizon. To reinflate the index (the variables in the messages), we now need a tree model from the `grib_tree` function. The important point to note here is that the tree model should be built from GRIB files of the same repository that we are indexing.

# In[13]:


grib_tree_store = grib_tree(
    scan_grib(
        "s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006",
        storage_options=dict(anon=True),
    ),
    remote_options=dict(anon=True),
)


# Then we run that tree model through the `strip_datavar_chunks` function, which strips the variable chunks from the tree *in place*. This step is optional.

# In[14]:


deflated_s3_grib_tree_store = copy.deepcopy(
    grib_tree_store
)  # so the original tree model is not affected
strip_datavar_chunks(deflated_s3_grib_tree_store)


# After stripping the tree model, we need to introduce two new axes: the forecast `step` used for mapping, and a `valid_time` date range for indexing the new reinflated tree.

# In[15]:


axes = [
    pd.Index(
        [
            pd.timedelta_range(
                start="0 hours", end="6 hours", freq="6h", closed="right", name="6 hour"
            ),
        ],
        name="step",
    ),
    pd.date_range(
        "2017-01-01T00:00", "2017-02-28T18:00", freq="360min", name="valid_time"
    ),
]
axes


# For reinflating, we need to pick one of the aggregation types, which are: `horizon`, `valid_time`, `run_time` and `best_available`. We also need to select the variable(s) that we are reinflating.

# In[16]:


s3_store = reinflate_grib_store(
    axes=axes,
    aggregation_type=AggregationType.HORIZON,
    chunk_index=s3_kind.loc[s3_kind.varname.isin(["sulwrf", "prmsl"])],
    zarr_ref_store=deflated_s3_grib_tree_store,
)


# ## Viewing the new subset of the datatree
#
# In this step, we can view the new subset as a `datatree` model.

# In[17]:


s3_dt_subset = xr.open_datatree(
    fsspec.filesystem(
        "reference", fo=s3_store, remote_protocol="s3", remote_options={"anon": True}
    ).get_mapper(""),
    engine="zarr",
    consolidated=False,
)


# In[18]:


s3_dt_subset.groups


# In[19]:


da = s3_dt_subset.sulwrf.avg.nominalTop.sulwrf.isel(model_horizons=0)
da


# #### Create an index for time

# In[20]:


da = da.rename({"valid_times": "time"}).drop_vars(["step"])


# In[21]:


da = da.set_index(time="time")


# In[22]:


da


# In[23]:


da.sel(time="2017-01-02", method="nearest")


# #### Try extracting some data!
#
# Time series extraction with GRIB is slow because one needs to read and uncompress the entire dataset: each data chunk covers the entire spatial domain!

# In[25]:


get_ipython().run_cell_magic('time', '', 'da[:,90,180].load()\n')
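
# Once the chunks are loaded, the result is an ordinary xarray DataArray. As a small, optional follow-up (not part of the original tutorial), the sketch below pulls the same arbitrary grid point (integer positions 90/180) into a pandas Series indexed by time, which is convenient for plotting or resampling. It assumes the leading dimension is the `time` index set up above.

# In[ ]:


# Optional sketch: turn the extracted grid-point series into a pandas Series
# indexed by time for quick inspection (or ts.plot() if matplotlib is installed).
ts = da[:, 90, 180].load().to_pandas()
ts.head()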