A full archive of ERA5 reanalysis data is available as a free-to-access public dataset on AWS. To aid efficient cloud access to this data, the provider has reproduced the native NetCDF variables as Zarr stores. While this is convenient for the end user, the burden of processing and the cost of duplicating the original data are significant.
Kerchunk provides a middle ground by creating a sidecar file to go along with the native NetCDF dataset. While this requires some preprocessing by the data provider, it does not require any duplication of data: a single ~200 MB sidecar file is enough to allow serverless, Zarr-like access to the native NetCDF datasets.
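For context, a Kerchunk sidecar (reference) file is just JSON mapping each Zarr chunk key either to inline metadata or to a byte range within an original file. A minimal sketch of the structure, shown here as a Python dict (the path, offset, and length are made-up examples, not real values from this dataset):

# illustrative structure of a version-1 kerchunk reference file
refs = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',                       # inline zarr group metadata
        "air_temperature_at_2_metres/.zarray": "{...}",        # chunk shape, dtype, compressor
        "air_temperature_at_2_metres/0.0.0":                   # [url, offset, length]
            ["s3://era5-pds/2020/01/data/air_temperature_at_2_metres.nc", 31022, 29876],
    },
}

fsspec's ReferenceFileSystem reads this mapping and fetches only the byte ranges it needs from S3.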
import fsspec
import ujson
import xarray as xr
import hvplot.xarray
import panel as pn
import cartopy.crs as ccrs
import zstandard
fs2 = fsspec.filesystem('s3', anon=True)
%%time
fs = fsspec.filesystem("reference", fo='s3://esip-qhub-public/ecmwf/ERA5_2020_2022_multivar.json.zst',
ref_storage_args={"compression": "zstd"},
remote_protocol='s3', remote_options={'anon':True})
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", backend_kwargs={'consolidated':False}, chunks={'time0':288})
ds
CPU times: user 4.33 s, sys: 382 ms, total: 4.72 s
Wall time: 33 s
<xarray.Dataset>
Dimensions:                               (time0: 21168, lat: 721, lon: 1440)
Coordinates:
  * lat                                   (lat) float32 90.0 89.75 ... -90.0
  * lon                                   (lon) float32 0.0 0.25 ... 359.5 359.8
  * time0                                 (time0) datetime64[ns] 2020-01-01 ....
Data variables:
    air_pressure_at_mean_sea_level        (time0, lat, lon) float32 dask.array<chunksize=(288, 100, 100), meta=np.ndarray>
    air_temperature_at_2_metres           (time0, lat, lon) float32 dask.array<chunksize=(288, 100, 100), meta=np.ndarray>
    dew_point_temperature_at_2_metres     (time0, lat, lon) float32 dask.array<chunksize=(288, 100, 100), meta=np.ndarray>
    eastward_wind_at_100_metres           (time0, lat, lon) float32 dask.array<chunksize=(288, 100, 100), meta=np.ndarray>
    eastward_wind_at_10_metres            (time0, lat, lon) float32 dask.array<chunksize=(288, 100, 100), meta=np.ndarray>
    lwe_thickness_of_surface_snow_amount  (time0, lat, lon) float32 dask.array<chunksize=(288, 100, 100), meta=np.ndarray>
    northward_wind_at_100_metres          (time0, lat, lon) float32 dask.array<chunksize=(288, 100, 100), meta=np.ndarray>
    northward_wind_at_10_metres           (time0, lat, lon) float32 dask.array<chunksize=(288, 100, 100), meta=np.ndarray>
    sea_surface_temperature               (time0, lat, lon) float32 dask.array<chunksize=(288, 100, 100), meta=np.ndarray>
    surface_air_pressure                  (time0, lat, lon) float32 dask.array<chunksize=(288, 100, 100), meta=np.ndarray>
Attributes:
    institution:  ECMWF
    source:       Reanalysis
    title:        ERA5 forecasts
ds = ds.sel(time0 = '2021-01-01T00:00:00')
variables = list(ds.data_vars)
variables.reverse()
#If desired this will load all the variables on this particular day into memory to increase the render speed of the map
#ds = ds.load()
sel = pn.widgets.Select(options=variables, name='Data Variable')
pn.Column(sel, ds.hvplot.image(z=sel, cmap='viridis',
                               coastline=True, projection=ccrs.Orthographic(45, -10),
                               global_extent=True, frame_height=500))
Below is a step-by-step tutorial reproducing this for a single variable.
Kerchunk is a new package that changes frequently, so it is best to install directly from GitHub by cloning the repository or using:
#!pip install git+https://github.com/fsspec/kerchunk
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
First, create a filesystem pointing to the location of the mean sea level pressure files on the S3 bucket. ERA5 is a public dataset, so anonymous access works fine.
fs = fsspec.filesystem('s3', anon=True)
flist = fs.glob('s3://era5-pds/2020/*/data/air_pressure_at_mean_sea_level.nc')[:2] # only consider two files to speed this up
flist  # check that the paths are consistent
['era5-pds/2020/01/data/air_pressure_at_mean_sea_level.nc', 'era5-pds/2020/02/data/air_pressure_at_mean_sea_level.nc']
Next, create a local filesystem for saving the sidecar files.
fs2 = fsspec.filesystem('')
First, it is necessary to create a sidecar file for each individual NetCDF file using kerchunk.hdf.SingleHdf5ToZarr; later, these will be combined into a single file.
from pathlib import Path
import os

json_dir = 'ERA5_jsons/'
os.makedirs(json_dir, exist_ok=True)  # create the output directory; exist_ok makes the cell re-runnable
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')  # arguments for fs.open(); caching the 'first' block helps when reading the HDF5 header
def gen_json(u):
    with fs.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        parts = u.split('/')  # separate the file path to create a unique name for each JSON
        year = parts[1]
        month = parts[2]
        fstem = Path(u).stem
        outf = f'{json_dir}{year}{month}{fstem}.json'
        print(outf)
        with fs2.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
Create the individual sidecar files. Here they are generated sequentially in a loop; for larger numbers of files this step is best run in parallel (see the sketch after the timing output below).
%%time
for file in flist:
    gen_json(file)
ERA5_jsons/202001air_pressure_at_mean_sea_level.json
ERA5_jsons/202002air_pressure_at_mean_sea_level.json
CPU times: user 14.3 s, sys: 1.89 s, total: 16.2 s
Wall time: 14min 52s
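One possible way to parallelize this step, sketched here with only the standard library (a Dask bag or cluster would scale better across the full archive; the worker count is an arbitrary choice):

from concurrent.futures import ThreadPoolExecutor

# generate the sidecar files concurrently instead of one at a time
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(gen_json, flist))

Because each call spends most of its time waiting on S3 reads, threads give a useful speedup despite the GIL.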
Check that these files have been successfully written by listing them:
json_list = fs2.glob(str(json_dir)+'*air_pressure_at_mean_sea_level.json')
Before combining these with kerchunk.combine.MultiZarrToZarr, open a single sidecar file to verify it works as expected.
%%time
fs_ = fsspec.filesystem("reference", fo=json_list[0], ref_storage_args={'skip_instance_cache':True},
remote_protocol='s3', remote_options={'anon':True})
m = fs_.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", backend_kwargs={'consolidated':False}, chunks={})
print(ds)
<xarray.Dataset>
Dimensions:                         (time0: 744, lat: 721, lon: 1440)
Coordinates:
  * lat                             (lat) float32 90.0 89.75 ... -89.75 -90.0
  * lon                             (lon) float32 nan 0.25 0.5 ... 359.5 359.8
  * time0                           (time0) datetime64[ns] 2020-01-01 ... 202...
Data variables:
    air_pressure_at_mean_sea_level  (time0, lat, lon) float32 dask.array<chunksize=(24, 100, 100), meta=np.ndarray>
Attributes:
    institution:  ECMWF
    source:       Reanalysis
    title:        ERA5 forecasts
CPU times: user 34 ms, sys: 0 ns, total: 34 ms
Wall time: 43.8 ms
All looks good, except that 0 lon has been translated to NaN. This is a result of no fill_value being assigned in the original NetCDF file, and Zarr defaulting to 0 as the fill_value. We can use the postprocess argument of MultiZarrToZarr to modify this.
import zarr

def modify_fill_value(out):
    # open the references dict as a zarr group and override the coordinate fill values
    out_ = zarr.open(out)
    out_.lon.fill_value = -999
    out_.lat.fill_value = -999
    return out

def postprocess(out):
    out = modify_fill_value(out)
    return out
Now we can use MultiZarrToZarr to combine the individual JSONs along the 'time0' dimension.
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      concat_dims=['time0'],  # the dimension along which the individual files will be merged
                      postprocess=postprocess)
%%time
d = mzz.translate()
CPU times: user 5.88 s, sys: 77.5 ms, total: 5.96 s
Wall time: 5.91 s
with fs2.open('combined.json', 'wb') as f:
    f.write(ujson.dumps(d).encode())
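Optionally, the combined reference file can be compressed with zstandard, as was done for the multi-variable sidecar file opened at the top of this notebook. A minimal sketch (the default compressor settings here are an assumption, not necessarily what the provider used):

# write a zstd-compressed copy of the combined references;
# zstandard was imported at the top of the notebook
cctx = zstandard.ZstdCompressor()
with fs2.open('combined.json.zst', 'wb') as f:
    f.write(cctx.compress(ujson.dumps(d).encode()))

A file compressed this way can be opened by passing ref_storage_args={"compression": "zstd"} to fsspec.filesystem("reference"), as in the first cell.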
%%time
fs = fsspec.filesystem("reference", fo='combined.json', ref_storage_args={'skip_instance_cache':True},
remote_protocol='s3', remote_options={'anon':True})
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", backend_kwargs={'consolidated':False}, chunks={})
print(ds)
<xarray.Dataset>
Dimensions:                         (time0: 1440, lat: 721, lon: 1440)
Coordinates:
  * lat                             (lat) float32 90.0 89.75 ... -89.75 -90.0
  * lon                             (lon) float32 0.0 0.25 0.5 ... 359.5 359.8
  * time0                           (time0) datetime64[ns] 2020-01-01 ... 202...
Data variables:
    air_pressure_at_mean_sea_level  (time0, lat, lon) float32 dask.array<chunksize=(24, 100, 100), meta=np.ndarray>
Attributes:
    institution:  ECMWF
    source:       Reanalysis
    title:        ERA5 forecasts
CPU times: user 33.4 ms, sys: 0 ns, total: 33.4 ms
Wall time: 32.4 ms
%%time
ds = ds.sel(time0 = '2020-01-01T12:00:00')
ds.air_pressure_at_mean_sea_level.hvplot.image(cmap='viridis', coastline=True,
                                               projection=ccrs.Orthographic(45, -10), global_extent=True)
CPU times: user 175 ms, sys: 15.7 ms, total: 191 ms
Wall time: 191 ms