In this tutorial we demonstrate how to build fast kerchunk aggregations of NODD GRIB2 weather forecasts. The workflow primarily involves xarray-datatree, pandas, and the new grib_tree function released in kerchunk v0.2.3.
In this demo we will be looking at GRIB2 files generated by the NOAA Global Ensemble Forecast System (GEFS). GEFS is a weather forecast model made up of 21 separate forecasts, or ensemble members. It has global coverage and is produced four times a day, every 6 hours starting at midnight, with forecasts going out to 16 days.
More information on this dataset can be found here
Concepts | Importance | Notes
---|---|---
Kerchunk Basics | Required | Core
Pandas Tutorial | Required | Core
Kerchunk and Xarray-Datatree | Required | IO
Xarray-Datatree Overview | Required | IO
Kerchunk provides a unified way to represent a variety of chunked, compressed data formats (e.g. NetCDF/HDF5, GRIB2, TIFF, …) by generating references. The task flow shown here can build large aggregations from NODD GRIB forecasts in a fraction of the time by using the idx files.
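For orientation, here is a minimal sketch of what a version-1 kerchunk reference store looks like (illustrative variable names and byte ranges, not values taken from this dataset): each chunk key maps to a list of [url, byte_offset, byte_length] pointing into the original file.
# Illustrative only: the shape of a kerchunk version-1 reference store
example_refs = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',
        "t/0.0.0": ["s3://bucket/some_file.grib2", 0, 47423],  # [url, offset, length]
    },
}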
import copy
import fsspec
import pandas as pd
import xarray as xr
import datetime
from kerchunk.grib2 import (
AggregationType,
build_idx_grib_mapping,
extract_datatree_chunk_index,
grib_tree,
map_from_index,
parse_grib_idx,
reinflate_grib_store,
scan_grib,
strip_datavar_chunks,
)
We are using the newly implemented kerchunk grib_tree function to build a hierarchical data model from a set of scanned GRIB messages. This data model can be opened directly using either zarr or xarray-datatree. Building the aggregation this way is very slow, because scan_grib has to read every message in every file. Here we're going to use xarray-datatree to open and view it:
s3_files = [
"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006",
"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af012",
"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af018",
"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af024",
"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af030",
"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af036",
"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af042",
]
# converting the references into the hierarchical datamodel
grib_tree_store = grib_tree(
[
group
for f in s3_files
for group in scan_grib(f, storage_options=dict(anon=True))
],
remote_options=dict(anon=True),
)
/home/rsignell/miniforge3/envs/grib-idx/lib/python3.12/site-packages/kerchunk/combine.py:376: UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [1483250400] warnings.warn(
# Transforming the output into a datatree to view it. The tree models the variables hierarchically
s3_dt = xr.open_datatree(
fsspec.filesystem(
"reference",
fo=grib_tree_store,
remote_protocol="s3",
remote_options={"anon": True},
).get_mapper(""),
engine="zarr",
consolidated=False,
)
# In this tree model, the variables are organized into hierarchical groups, first by "stepType" and then by "typeOfLevel."
s3_dt
<xarray.DatasetView> Size: 0B Dimensions: () Data variables: *empty*
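Individual groups of the same reference store can also be opened as plain xarray Datasets with the zarr engine, without building the whole datatree. A minimal sketch, reusing the grib_tree_store built above (the group path is one of those listed later in this notebook):
# Open a single group of the reference store as a plain Dataset (sketch)
sulwrf_ds = xr.open_dataset(
    fsspec.filesystem(
        "reference",
        fo=grib_tree_store,
        remote_protocol="s3",
        remote_options={"anon": True},
    ).get_mapper(""),
    engine="zarr",
    group="sulwrf/avg/nominalTop",
    consolidated=False,
)
sulwrf_ds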
Note: If you are trying out this notebook, the above way of building the aggregation will take a few minutes for this GEFS S3 repository. In general it takes longer, depending on the size of the GRIB files.
Aggregation using the idx files
This method of aggregating GRIB files requires that each GRIB file be accompanied by its corresponding idx file in text format. These files, also called index files, contain key metadata about the GRIB messages in each file. This metadata is then used to build a reference index, which drives the aggregation.
The advantage is that building the aggregation this way doesn't require scanning the whole archive of GRIB files, only a single file.
Here is what the contents of an idx file look like.
1:0:d=2017010100:HGT:10 mb:12 hour fcst:ENS=low-res ctl
2:48163:d=2017010100:TMP:10 mb:12 hour fcst:ENS=low-res ctl
3:68112:d=2017010100:RH:10 mb:12 hour fcst:ENS=low-res ctl
4:79092:d=2017010100:UGRD:10 mb:12 hour fcst:ENS=low-res ctl
5:102125:d=2017010100:VGRD:10 mb:12 hour fcst:ENS=low-res ctl
6:122799:d=2017010100:HGT:50 mb:12 hour fcst:ENS=low-res ctl
7:178898:d=2017010100:TMP:50 mb:12 hour fcst:ENS=low-res ctl
8:201799:d=2017010100:RH:50 mb:12 hour fcst:ENS=low-res ctl
9:224321:d=2017010100:UGRD:50 mb:12 hour fcst:ENS=low-res ctl
10:272234:d=2017010100:VGRD:50 mb:12 hour fcst:ENS=low-res ctl
11:318288:d=2017010100:HGT:100 mb:12 hour fcst:ENS=low-res ctl
12:379010:d=2017010100:TMP:100 mb:12 hour fcst:ENS=low-res ctl
13:405537:d=2017010100:RH:100 mb:12 hour fcst:ENS=low-res ctl
14:441517:d=2017010100:UGRD:100 mb:12 hour fcst:ENS=low-res ctl
15:497421:d=2017010100:VGRD:100 mb:12 hour fcst:ENS=low-res ctl
The general format of each idx line is index:offset:date_with_runtime:variable:level:forecast_time:extra_attributes. The fields are separated by ":" (colon), and we need to convert them into a DataFrame for building the index.
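To see how these lines become DataFrame columns, here is a minimal hand-rolled sketch (parse_grib_idx, used below, does this for us): the first two fields are the message number and byte offset, the rest of the line becomes the attrs string, and each message's byte length is the gap to the next offset.
# Hand-rolled sketch of idx parsing (illustrative; not the library's implementation)
lines = [
    "1:0:d=2017010100:HGT:10 mb:12 hour fcst:ENS=low-res ctl",
    "2:48163:d=2017010100:TMP:10 mb:12 hour fcst:ENS=low-res ctl",
    "3:68112:d=2017010100:RH:10 mb:12 hour fcst:ENS=low-res ctl",
]
records = [
    dict(zip(["idx", "offset", "date", "attrs"], line.split(":", maxsplit=3)))
    for line in lines
]
sketch_df = pd.DataFrame(records).astype({"idx": int, "offset": int})
sketch_df["length"] = sketch_df["offset"].shift(-1) - sketch_df["offset"]  # last length needs the file size
sketch_df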
Note: This way of building the reference index only works for a particular horizon file, irrespective of the run time of the model, since messages from the same time horizon have an identical structure.
# converting the idx data into a dataframe
idxdf = parse_grib_idx(
"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006",
storage_options=dict(anon=True),
)
idxdf.head()
offset | date | attrs | length | idx_uri | grib_uri | |
---|---|---|---|---|---|---|
idx | ||||||
1 | 0 | d=2017010106 | HGT:10 mb:6 hour fcst:ENS=low-res ctl | 47493 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... |
2 | 47493 | d=2017010106 | TMP:10 mb:6 hour fcst:ENS=low-res ctl | 19438 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... |
3 | 66931 | d=2017010106 | RH:10 mb:6 hour fcst:ENS=low-res ctl | 10835 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... |
4 | 77766 | d=2017010106 | UGRD:10 mb:6 hour fcst:ENS=low-res ctl | 22625 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... |
5 | 100391 | d=2017010106 | VGRD:10 mb:6 hour fcst:ENS=low-res ctl | 20488 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... |
Creation of the reference index, or k_index, first involves generating a single mapping from the grib/zarr metadata (the grib_tree output) to the attributes in the idx files. These mappings are unique to each time horizon, so while building the index we need to build a separate mapping for the 1 hour forecast, the 2 hour forecast, and so on.
This mapping can be made from any arbitrary GRIB file, provided that it belongs to the same time horizon as the files we're trying to index.
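The horizon itself is encoded in the file name (the fNNN suffix, e.g. f006 for the 6 hour forecast). A small hypothetical helper, not part of kerchunk, that pulls it out:
# Hypothetical helper (not part of kerchunk): forecast horizon in hours from a GEFS pgrb2a file name
def horizon_hours(fname: str) -> int:
    return int(fname.rsplit("f", 1)[-1])


horizon_hours("s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006")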
We'll start by examining the GRIB data. The metadata we'll be extracting is static in nature. We're going to use a single node, accessing it through the datatree.
s3_dt.groups
('/', '/cape', '/cfrzr', '/cicep', '/cin', '/crain', '/csnow', '/gh', '/mslhf', '/msshf', '/prmsl', '/pwat', '/r', '/r2', '/sde', '/sdlwrf', '/sdswrf', '/sdwe', '/soilw', '/sp', '/st', '/sulwrf', '/suswrf', '/t', '/t2m', '/tcc', '/tmax', '/tmin', '/tp', '/u', '/u10', '/v', '/v10', '/w', '/cape/instant', '/cfrzr/avg', '/cicep/avg', '/cin/instant', '/crain/avg', '/csnow/avg', '/gh/instant', '/mslhf/avg', '/msshf/avg', '/prmsl/instant', '/pwat/instant', '/r/instant', '/r2/instant', '/sde/instant', '/sdlwrf/avg', '/sdswrf/avg', '/sdwe/instant', '/soilw/instant', '/sp/instant', '/st/instant', '/sulwrf/avg', '/suswrf/avg', '/t/instant', '/t2m/instant', '/tcc/avg', '/tmax/max', '/tmin/min', '/tp/accum', '/u/instant', '/u10/instant', '/v/instant', '/v10/instant', '/w/instant', '/cape/instant/pressureFromGroundLayer', '/cfrzr/avg/surface', '/cicep/avg/surface', '/cin/instant/pressureFromGroundLayer', '/crain/avg/surface', '/csnow/avg/surface', '/gh/instant/isobaricInhPa', '/mslhf/avg/surface', '/msshf/avg/surface', '/prmsl/instant/meanSea', '/pwat/instant/atmosphereSingleLayer', '/r/instant/isobaricInhPa', '/r2/instant/heightAboveGround', '/sde/instant/surface', '/sdlwrf/avg/surface', '/sdswrf/avg/surface', '/sdwe/instant/surface', '/soilw/instant/depthBelowLandLayer', '/sp/instant/surface', '/st/instant/depthBelowLandLayer', '/sulwrf/avg/nominalTop', '/sulwrf/avg/surface', '/suswrf/avg/surface', '/t/instant/isobaricInhPa', '/t2m/instant/heightAboveGround', '/tcc/avg/atmosphere', '/tmax/max/heightAboveGround', '/tmin/min/heightAboveGround', '/tp/accum/surface', '/u/instant/isobaricInhPa', '/u10/instant/heightAboveGround', '/v/instant/isobaricInhPa', '/v10/instant/heightAboveGround', '/w/instant/isobaricInhPa')
# Parsing the grib metadata from a single datatree node and converting it into a dataframe
grib_df = extract_datatree_chunk_index(s3_dt["sulwrf/avg/nominalTop"], grib_tree_store)
grib_df.head()
varname | typeOfLevel | stepType | name | nominalTop | number | step | time | valid_time | uri | offset | length | inline_value | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | sulwrf | nominalTop | avg | Surface upward long-wave radiation flux | 0.0 | 0 | 0 days 06:00:00 | 2017-01-01 06:00:00 | 2017-01-01 12:00:00 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | 3924345 | 43221 | None |
1 | sulwrf | nominalTop | avg | Surface upward long-wave radiation flux | 0.0 | 0 | 0 days 12:00:00 | 2017-01-01 06:00:00 | 2017-01-01 18:00:00 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | 3951119 | 42550 | None |
2 | sulwrf | nominalTop | avg | Surface upward long-wave radiation flux | 0.0 | 0 | 0 days 18:00:00 | 2017-01-01 06:00:00 | 2017-01-02 00:00:00 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | 3948238 | 43273 | None |
3 | sulwrf | nominalTop | avg | Surface upward long-wave radiation flux | 0.0 | 0 | 1 days 00:00:00 | 2017-01-01 06:00:00 | 2017-01-02 06:00:00 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | 3876964 | 42613 | None |
4 | sulwrf | nominalTop | avg | Surface upward long-wave radiation flux | 0.0 | 0 | 1 days 06:00:00 | 2017-01-01 06:00:00 | 2017-01-02 12:00:00 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | 3865989 | 42782 | None |
Note: The above process is part of the mapping creation; the call to extract_datatree_chunk_index is handled inside the build_idx_grib_mapping function.
# creating a mapping for a single horizon file which is to be used later
mapping = build_idx_grib_mapping(
"s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z.pgrb2af006",
storage_options=dict(anon=True),
validate=True,
)
mapping.head()
The grib hierarchy in s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z.pgrb2af006 is not unique for 54 variables: ['gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'u', 'v', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 'gh', 't', 'r', 'u', 'v', 't', 'r', 'u', 'v', 'gh']
offset_idx | date | attrs | length_idx | idx_uri | grib_uri | varname | typeOfLevel | stepType | name | level | step | time | valid_time | uri | offset_grib | length_grib | inline_value | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
idx | ||||||||||||||||||
1 | 0 | d=2017010100 | HGT:10 mb:6 hour fcst:ENS=low-res ctl | 47423 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | gh | isobaricInhPa | instant | Geopotential height | 0.0 | 0 days 06:00:00 | 2017-01-01 | 2017-01-01 06:00:00 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | 0 | 47423 | None |
2 | 47423 | d=2017010100 | TMP:10 mb:6 hour fcst:ENS=low-res ctl | 19684 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | t | isobaricInhPa | instant | Temperature | 0.0 | 0 days 06:00:00 | 2017-01-01 | 2017-01-01 06:00:00 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | 47423 | 19684 | None |
3 | 67107 | d=2017010100 | RH:10 mb:6 hour fcst:ENS=low-res ctl | 10334 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | r | isobaricInhPa | instant | Relative humidity | 0.0 | 0 days 06:00:00 | 2017-01-01 | 2017-01-01 06:00:00 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | 67107 | 10334 | None |
4 | 77441 | d=2017010100 | UGRD:10 mb:6 hour fcst:ENS=low-res ctl | 23094 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | u | isobaricInhPa | instant | U component of wind | 0.0 | 0 days 06:00:00 | 2017-01-01 | 2017-01-01 06:00:00 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | 77441 | 23094 | None |
5 | 100535 | d=2017010100 | VGRD:10 mb:6 hour fcst:ENS=low-res ctl | 20427 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | v | isobaricInhPa | instant | V component of wind | 0.0 | 0 days 06:00:00 | 2017-01-01 | 2017-01-01 06:00:00 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | 100535 | 20427 | None |
Note: The reference index can be combined across many horizons, but each horizon must be indexed separately.
Now, if we parse the runtime and the other attributes from the idx file, we can build a fully compatible kerchunk index (k_index) for that particular file. Before creating the index, we need to clean some of the data in the mapping and index dataframes, because some variables tend to contain duplicate values, as demonstrated below.
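To see which entries are affected, we can count the duplicated idx attribute strings before dropping them (an optional sanity check):
# Optional check: which idx attribute strings occur more than once in the mapping?
mapping.loc[mapping["attrs"].duplicated(keep=False), "attrs"].value_counts().head()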
# cleaning the mapping
mapping.loc[~mapping["attrs"].duplicated(keep="first"), :].head()
offset_idx | date | attrs | length_idx | idx_uri | grib_uri | varname | typeOfLevel | stepType | name | level | step | time | valid_time | uri | offset_grib | length_grib | inline_value | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
idx | ||||||||||||||||||
1 | 0 | d=2017010100 | HGT:10 mb:6 hour fcst:ENS=low-res ctl | 47423 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | gh | isobaricInhPa | instant | Geopotential height | 0.0 | 0 days 06:00:00 | 2017-01-01 | 2017-01-01 06:00:00 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | 0 | 47423 | None |
2 | 47423 | d=2017010100 | TMP:10 mb:6 hour fcst:ENS=low-res ctl | 19684 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | t | isobaricInhPa | instant | Temperature | 0.0 | 0 days 06:00:00 | 2017-01-01 | 2017-01-01 06:00:00 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | 47423 | 19684 | None |
3 | 67107 | d=2017010100 | RH:10 mb:6 hour fcst:ENS=low-res ctl | 10334 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | r | isobaricInhPa | instant | Relative humidity | 0.0 | 0 days 06:00:00 | 2017-01-01 | 2017-01-01 06:00:00 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | 67107 | 10334 | None |
4 | 77441 | d=2017010100 | UGRD:10 mb:6 hour fcst:ENS=low-res ctl | 23094 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | u | isobaricInhPa | instant | U component of wind | 0.0 | 0 days 06:00:00 | 2017-01-01 | 2017-01-01 06:00:00 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | 77441 | 23094 | None |
5 | 100535 | d=2017010100 | VGRD:10 mb:6 hour fcst:ENS=low-res ctl | 20427 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | v | isobaricInhPa | instant | V component of wind | 0.0 | 0 days 06:00:00 | 2017-01-01 | 2017-01-01 06:00:00 | s3://noaa-gefs-pds/gefs.20170101/00/gec00.t00z... | 100535 | 20427 | None |
# this step will be performed for every idx dataframe where we will be using the "mapping" dataframe which we created previously
mapped_index = map_from_index(
pd.Timestamp("2017-01-01T06"),
mapping.loc[~mapping["attrs"].duplicated(keep="first"), :],
idxdf.loc[~idxdf["attrs"].duplicated(keep="first"), :],
)
mapped_index.head()
varname | typeOfLevel | stepType | name | step | level | time | valid_time | uri | offset | length | inline_value | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | gh | isobaricInhPa | instant | Geopotential height | 0 days 06:00:00 | 0.0 | 2017-01-01 06:00:00 | 2017-01-01 12:00:00 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | 0 | 47493 | None |
1 | t | isobaricInhPa | instant | Temperature | 0 days 06:00:00 | 0.0 | 2017-01-01 06:00:00 | 2017-01-01 12:00:00 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | 47493 | 19438 | None |
2 | r | isobaricInhPa | instant | Relative humidity | 0 days 06:00:00 | 0.0 | 2017-01-01 06:00:00 | 2017-01-01 12:00:00 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | 66931 | 10835 | None |
3 | u | isobaricInhPa | instant | U component of wind | 0 days 06:00:00 | 0.0 | 2017-01-01 06:00:00 | 2017-01-01 12:00:00 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | 77766 | 22625 | None |
4 | v | isobaricInhPa | instant | V component of wind | 0 days 06:00:00 | 0.0 | 2017-01-01 06:00:00 | 2017-01-01 12:00:00 | s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z... | 100391 | 20488 | None |
For the final step of the aggregation, we will create an index for each GRIB file covering a two-month period, from 2017-01-01 to 2017-02-28, using the 6-hour horizon files. The generated index can then be combined with indexes of other time horizons and stored for later use.
%%time
mapped_index_list = []
deduped_mapping = mapping.loc[~mapping["attrs"].duplicated(keep="first"), :]
# this process is manually done because the runtime and forecast time will vary between models i.e. GEFS, GFS, HRRR etc.
for date in pd.date_range("2017-01-01", "2017-02-28"):
for runtime in range(0, 24, 6):
fname = f"s3://noaa-gefs-pds/gefs.{date.strftime('%Y%m%d')}/{runtime:02}/gec00.t{runtime:02}z.pgrb2af006"
idxdf = parse_grib_idx(basename=fname, storage_options=dict(anon=True))
mapped_index = map_from_index(
pd.Timestamp(date + datetime.timedelta(hours=runtime)),
deduped_mapping,
idxdf.loc[~idxdf["attrs"].duplicated(keep="first"), :],
)
mapped_index_list.append(mapped_index)
s3_kind = pd.concat(mapped_index_list)
CPU times: user 9.21 s, sys: 303 ms, total: 9.51 s Wall time: 1min 15s
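The combined k_index can be saved at this point and reloaded later instead of re-running the loop. A minimal sketch, assuming a hypothetical local file name and an installed parquet engine such as pyarrow:
# Persist the combined index for later reuse (hypothetical path; requires a parquet engine)
s3_kind.to_parquet("gefs_f006_k_index.parquet")
# ...and reload it later with: pd.read_parquet("gefs_f006_k_index.parquet")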
Tip: To confirm the above step, check out this notebook
The difference between the .idx index and the kerchunk index we built is that the former indexes the GRIB messages while the latter indexes the variables in those messages, within a time horizon. Now we need a tree model from the grib_tree function to reinflate the index (the variables in the messages). The important point here is that the tree model should be built from GRIB files of the same repository that we are indexing.
grib_tree_store = grib_tree(
scan_grib(
"s3://noaa-gefs-pds/gefs.20170101/06/gec00.t06z.pgrb2af006",
storage_options=dict(anon=True),
),
remote_options=dict(anon=True),
)
/home/rsignell/miniforge3/envs/grib-idx/lib/python3.12/site-packages/kerchunk/combine.py:376: UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [1483250400] warnings.warn( /home/rsignell/miniforge3/envs/grib-idx/lib/python3.12/site-packages/kerchunk/combine.py:376: UserWarning: Concatenated coordinate 'step' contains less than expectednumber of values across the datasets: [6] warnings.warn(
Then we run that tree model through the strip_datavar_chunks function, which strips the data variable chunks from the tree in place. This step is optional.
deflated_s3_grib_tree_store = copy.deepcopy(
grib_tree_store
) # not affecting the original tree model
strip_datavar_chunks(deflated_s3_grib_tree_store)
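A quick, illustrative check that the stripping worked: the deflated store should hold far fewer reference entries than the original, since the per-chunk data references have been removed.
# Compare the number of reference entries before and after stripping
len(grib_tree_store["refs"]), len(deflated_s3_grib_tree_store["refs"])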
After stripping the tree model, we need to introduce two new axes for the reinflated tree: a step axis for the forecast horizon we mapped, and a valid_time axis covering the date range we indexed.
axes = [
pd.Index(
[
pd.timedelta_range(
start="0 hours", end="6 hours", freq="6h", closed="right", name="6 hour"
),
],
name="step",
),
pd.date_range(
"2017-01-01T00:00", "2017-02-28T18:00", freq="360min", name="valid_time"
),
]
axes
[Index([[0 days 06:00:00]], dtype='object', name='step'), DatetimeIndex(['2017-01-01 00:00:00', '2017-01-01 06:00:00', '2017-01-01 12:00:00', '2017-01-01 18:00:00', '2017-01-02 00:00:00', '2017-01-02 06:00:00', '2017-01-02 12:00:00', '2017-01-02 18:00:00', '2017-01-03 00:00:00', '2017-01-03 06:00:00', ... '2017-02-26 12:00:00', '2017-02-26 18:00:00', '2017-02-27 00:00:00', '2017-02-27 06:00:00', '2017-02-27 12:00:00', '2017-02-27 18:00:00', '2017-02-28 00:00:00', '2017-02-28 06:00:00', '2017-02-28 12:00:00', '2017-02-28 18:00:00'], dtype='datetime64[ns]', name='valid_time', length=236, freq='360min')]
Now for reinflating, we need to pick one of the aggregation types: horizon, valid_time, run_time or best_available. We also need to specify the variable(s) that we are reinflating.
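For reference, the members of the AggregationType enum imported earlier can be listed directly (a quick check, assuming it is a standard Python Enum):
# List the available aggregation types
[agg.name for agg in AggregationType]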
s3_store = reinflate_grib_store(
axes=axes,
aggregation_type=AggregationType.HORIZON,
chunk_index=s3_kind.loc[s3_kind.varname.isin(["sulwrf", "prmsl"])],
zarr_ref_store=deflated_s3_grib_tree_store,
)
/home/rsignell/miniforge3/envs/grib-idx/lib/python3.12/site-packages/kerchunk/_grib_idx.py:347: PerformanceWarning: indexing past lexsort depth may impact performance. if lookup not in unique_groups:
In this step, we can view the new subset as a datatree
model.
s3_dt_subset = xr.open_datatree(
fsspec.filesystem(
"reference", fo=s3_store, remote_protocol="s3", remote_options={"anon": True}
).get_mapper(""),
engine="zarr",
consolidated=False,
)
s3_dt_subset.groups
('/', '/prmsl', '/sulwrf', '/prmsl/instant', '/sulwrf/avg', '/prmsl/instant/meanSea', '/sulwrf/avg/nominalTop', '/sulwrf/avg/surface')
da = s3_dt_subset.sulwrf.avg.nominalTop.sulwrf.isel(model_horizons=0)
da
<xarray.DataArray 'sulwrf' (valid_times: 236, latitude: 181, longitude: 360)> Size: 123MB [15377760 values with dtype=float64] Coordinates: * latitude (latitude) float64 1kB 90.0 89.0 88.0 87.0 ... -88.0 -89.0 -90.0 * longitude (longitude) float64 3kB 0.0 1.0 2.0 3.0 ... 357.0 358.0 359.0 nominalTop float64 8B ... step (valid_times) timedelta64[ns] 2kB ... time (valid_times) datetime64[ns] 2kB ... valid_time (valid_times) datetime64[ns] 2kB ... Dimensions without coordinates: valid_times Attributes: (12/31) GRIB_NV: 0 GRIB_Nx: 360 GRIB_Ny: 181 GRIB_cfName: unknown GRIB_cfVarName: sulwrf GRIB_dataType: cf ... ... GRIB_typeOfLevel: nominalTop GRIB_units: W m**-2 GRIB_uvRelativeToGrid: 0 long_name: Surface upward long-wave radiat... standard_name: unknown units: W m**-2
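# Rename the stacked "valid_times" dimension to "time" and drop the per-step coordinate;
# the next cells promote "time" to an indexed coordinate so we can select by timestamp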
da= da.rename({'valid_times':'time'}).drop_vars(['step'])
/tmp/ipykernel_134388/309788138.py:1: UserWarning: rename 'valid_times' to 'time' does not create an index anymore. Try using swap_dims instead or use set_index after rename to create an indexed coordinate. da= da.rename({'valid_times':'time'}).drop_vars(['step'])
da = da.set_index(time='time')
da
<xarray.DataArray 'sulwrf' (time: 236, latitude: 181, longitude: 360)> Size: 123MB [15377760 values with dtype=float64] Coordinates: * latitude (latitude) float64 1kB 90.0 89.0 88.0 87.0 ... -88.0 -89.0 -90.0 * longitude (longitude) float64 3kB 0.0 1.0 2.0 3.0 ... 357.0 358.0 359.0 nominalTop float64 8B ... * time (time) datetime64[ns] 2kB 2016-12-31T18:00:00 ... 2017-02-28T... valid_time (time) datetime64[ns] 2kB ... Attributes: (12/31) GRIB_NV: 0 GRIB_Nx: 360 GRIB_Ny: 181 GRIB_cfName: unknown GRIB_cfVarName: sulwrf GRIB_dataType: cf ... ... GRIB_typeOfLevel: nominalTop GRIB_units: W m**-2 GRIB_uvRelativeToGrid: 0 long_name: Surface upward long-wave radiat... standard_name: unknown units: W m**-2
da.sel(time='2017-01-02', method='nearest')
<xarray.DataArray 'sulwrf' (latitude: 181, longitude: 360)> Size: 521kB [65160 values with dtype=float64] Coordinates: * latitude (latitude) float64 1kB 90.0 89.0 88.0 87.0 ... -88.0 -89.0 -90.0 * longitude (longitude) float64 3kB 0.0 1.0 2.0 3.0 ... 357.0 358.0 359.0 nominalTop float64 8B ... time datetime64[ns] 8B 2017-01-02 valid_time datetime64[ns] 8B ... Attributes: (12/31) GRIB_NV: 0 GRIB_Nx: 360 GRIB_Ny: 181 GRIB_cfName: unknown GRIB_cfVarName: sulwrf GRIB_dataType: cf ... ... GRIB_typeOfLevel: nominalTop GRIB_units: W m**-2 GRIB_uvRelativeToGrid: 0 long_name: Surface upward long-wave radiat... standard_name: unknown units: W m**-2
Time series extraction with GRIB is slow because one needs to read and uncompress the entire dataset: Each data chunk covers the entire spatial domain!
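We can inspect the chunking recorded by the zarr backend to confirm this (a hedged check: the encoding key may be absent depending on how the store was opened):
# The backend's chunk layout, if recorded; for these GRIB references each chunk is
# expected to span the full latitude/longitude grid for a single valid time
da.encoding.get("chunks")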
%%time
da[:,90,180].load()
CPU times: user 3.02 s, sys: 325 ms, total: 3.35 s Wall time: 46.8 s
<xarray.DataArray 'sulwrf' (time: 236)> Size: 2kB array([ nan, 290., 290., 289., 287., 285., 290., 294., 292., 291., 293., 291., 298., 298., 298., 295., 293., 294., 290., 284., 284., 289., 291., 289., 295., 299., 298., 295., 292., 296., 298., 292., 294., 296., 294., 292., 292., 291., 287., 285., 287., 290., 289., 287., 281., 287., 288., 279., 288., 287., 290., 288., 290., 290., 289., 289., 291., 294., 294., 295., 295., 290., 291., 289., 286., 290., 283., 270., 236., 266., 277., 277., 276., 281., 280., 276., 279., 283., 283., 278., 271., 282., 283., 287., 291., 294., 291., 285., 286., 290., 291., 288., 288., 292., 294., 295., 297., 297., 298., 289., 284., 291., 286., 266., 269., 253., 251., 275., 285., 290., 288., 284., 288., 294., 293., 292., 293., 295., 294., 290., 291., 290., 292., 291., 290., 291., 291., 293., 296., 300., 302., 299., 300., 303., 300., 296., 296., 297., 293., 295., 298., 298., 296., 296., 298., 301., 297., 297., 296., 300., 299., 299., 299., 302., 301., 299., 298., 295., 299., 301., 298., 286., 284., 283., 291., 298., 300., 298., 298., 300., 302., 301., 300., 302., 303., 303., 297., 302., 303., 301., 299., 300., 300., 295., 296., 298., 301., 301., 302., 308., 309., 306., 308., 311., 305., 302., 299., 293., 292., 239., 238., 282., 284., 294., 292., 294., 292., 295., 296., 293., 291., 289., 292., 295., 293., 289., 292., 296., 295., 291., 288., 286., 284., 280., 280., 279., 281., 281., 283., 287., 284., 279., 281., 286., 290., 287.]) Coordinates: latitude float64 8B 0.0 longitude float64 8B 180.0 nominalTop float64 8B 0.0 * time (time) datetime64[ns] 2kB 2016-12-31T18:00:00 ... 2017-02-28T... valid_time (time) datetime64[ns] 2kB 2017-01-01 ... 2017-02-28T18:00:00 Attributes: (12/31) GRIB_NV: 0 GRIB_Nx: 360 GRIB_Ny: 181 GRIB_cfName: unknown GRIB_cfVarName: sulwrf GRIB_dataType: cf ... ... GRIB_typeOfLevel: nominalTop GRIB_units: W m**-2 GRIB_uvRelativeToGrid: 0 long_name: Surface upward long-wave radiat... standard_name: unknown units: W m**-2