The data contained in these grib2 files is described here: https://www.nco.ncep.noaa.gov/pmb/products/gens/
Approach:
- valid_time dimension (the forecast hour, or tau dimension)
- ensemble dimension
- time dimension (the daily forecast date, 00 each day)
%xmode minimal
Exception reporting mode: Minimal
import os
import fsspec
from datetime import datetime, timedelta
import xarray as xr
import ujson
import kerchunk
from kerchunk.grib2 import scan_grib
from kerchunk.combine import MultiZarrToZarr
from pathlib import Path
import dask
from dask.distributed import LocalCluster, Client, performance_report
import dask.bag as db
from datatree import DataTree
import panel as pn
import hvplot.xarray
xr.__version__
'2023.3.0'
Create a pair of fsspec file systems: one to read the GEFS reforecast data from S3 and one to write the reference files locally
fs_local = fsspec.filesystem('', skip_instance_cache = True, use_listings_cache=False)
fs_s3 = fsspec.filesystem('s3', anon = True)
For reference, the operational GEFS forecast files follow this naming pattern:
s3://noaa-gefs-pds/gefs.{date}/{cycle}/atmos/pgrb2ap5/gep{ensemble_member}.t{cycle}.pgrb2a.0p50.f{forecast_hour}
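For example, a single operational URL could be built from that template like this (a sketch; the field values here are hypothetical):
# Hypothetical values, just to show how the template is filled in.
fdate, fcycle, member, fhr = '20230101', '00', '01', '024'
url = (f's3://noaa-gefs-pds/gefs.{fdate}/{fcycle}/atmos/pgrb2ap5/'
       f'gep{member}.t{fcycle}.pgrb2a.0p50.f{fhr}')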
bucket = 's3://noaa-gefs-retrospective'
flist = fs_s3.ls(bucket)
flist
['noaa-gefs-retrospective/Description_of_reforecast_data.pdf', 'noaa-gefs-retrospective/GEFSv12', 'noaa-gefs-retrospective/index.html', 'noaa-gefs-retrospective/landsfc.pgrb2.0p25', 'noaa-gefs-retrospective/landsfc.pgrb2.0p50']
dates = fs_s3.glob(f'{bucket}/GEFSv12/reforecast/2019/??????????')
print(len(dates))
print(dates[0])
print(dates[-1])
365
noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019010100
noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019123100
fs_s3.ls(dates[-1])
['noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019123100/c00', 'noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019123100/p01', 'noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019123100/p02', 'noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019123100/p03', 'noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019123100/p04']
date = Path(dates[-3]).name
print(date)
print(date[:4])
2019122900
2019
Search for a specific cycle, forecast hour, variable group, and date
cycle = '00' # using the 00z cycle (cycles at 00z, 06z, 12z, 18z, but only 00 is distributed)
tau = '000'
#date = '2019123000'
year = date[:4]
f = fs_s3.glob(f's3://noaa-gefs-retrospective/GEFSv12/reforecast/{year}/{date}/???/Days:1-10/weasd_sfc_{date}_???.grib2')
n_ensembles = len(f)
print(n_ensembles)
f
5
['noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019122900/c00/Days:1-10/weasd_sfc_2019122900_c00.grib2', 'noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019122900/p01/Days:1-10/weasd_sfc_2019122900_p01.grib2', 'noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019122900/p02/Days:1-10/weasd_sfc_2019122900_p02.grib2', 'noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019122900/p03/Days:1-10/weasd_sfc_2019122900_p03.grib2', 'noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019122900/p04/Days:1-10/weasd_sfc_2019122900_p04.grib2']
ensembles = ['c00', 'p01', 'p02', 'p03', 'p04']
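Alternatively (a sketch relying on the path layout shown above), the member names could be derived from the glob results instead of hardcoded:
# parts[5] is the member directory (c00, p01, ...) in these S3 paths.
ensembles = sorted({Path(p).parts[5] for p in f})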
Search for an ensemble member and date
flist = sorted(fs_s3.glob(f's3://noaa-gefs-retrospective/GEFSv12/reforecast/{year}/{date}/c00/Days:1-10/*.grib2'))
flist = [f's3://{f}' for f in flist]
len(flist)
60
Make a list of variable names (extracting them from the grib file names)
varnames = []
for f in flist:
    file = Path(f).parts[8]   # the grib file name, e.g. acpcp_sfc_2019122900_c00.grib2
    p = file.split('_')
    pp = p[:-2]               # drop the date and member parts
    var = '_'.join(pp)
    varnames.append(var)
varnames
varnames
['acpcp_sfc', 'apcp_sfc', 'cape_sfc', 'cin_sfc', 'dlwrf_sfc', 'dswrf_sfc', 'gflux_sfc', 'gust_sfc', 'hgt_ceiling', 'hgt_hybr', 'hgt_pres', 'hgt_pres_abv700mb', 'hgt_sfc', 'hlcy_hgt', 'lhtfl_sfc', 'ncpcp_sfc', 'pbl_hgt', 'pres_hybr', 'pres_msl', 'pres_pvor', 'pres_sfc', 'pvort_isen', 'pwat_eatm', 'rh_hybr', 'sfcr_sfc', 'shtfl_sfc', 'soilw_bgrnd', 'spfh_2m', 'spfh_pres', 'spfh_pres_abv700mb', 'tcdc_eatm', 'tmax_2m', 'tmin_2m', 'tmp_2m', 'tmp_hybr', 'tmp_pres', 'tmp_pres_abv700mb', 'tmp_pvor', 'tmp_sfc', 'tozne_eatm', 'tsoil_bgrnd', 'uflx_sfc', 'ugrd_hgt', 'ugrd_hybr', 'ugrd_pres', 'ugrd_pres_abv700mb', 'ugrd_pvor', 'ulwrf_sfc', 'ulwrf_tatm', 'uswrf_sfc', 'vflx_sfc', 'vgrd_hgt', 'vgrd_hybr', 'vgrd_pres', 'vgrd_pres_abv700mb', 'vgrd_pvor', 'vvel_pres', 'vvel_pres_abv700mb', 'watr_sfc', 'weasd_sfc']
n_ensembles = len(ensembles)
s3_so = {
    'anon': True,
    'skip_instance_cache': True
}
Try scan_grib on one grib file to determine the number of messages
Scanning one file would work if all files had the same number of messages, but the reforecast files do not. In these grib files each file holds a single variable, and its messages span not only all the tau values but also the different vertical levels: surface variables have 80 messages, variables on 4 levels have 320 messages, and so on.
%%time
out = scan_grib(flist[0], storage_options=s3_so)
CPU times: user 7.84 s, sys: 1.63 s, total: 9.47 s
Wall time: 11.5 s
n_messages = len(out)
n_messages
80
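To see the variation described above, one could compare a surface variable with a pressure-level one (a sketch; each scan takes several seconds, and the two variable choices are just illustrative):
# Compare message counts: surface vs. multi-level variable.
for var in ['weasd_sfc', 'tmp_pres']:
    url = (f's3://noaa-gefs-retrospective/GEFSv12/reforecast/'
           f'{year}/{date}/c00/Days:1-10/{var}_{date}_c00.grib2')
    print(var, len(scan_grib(url, storage_options=s3_so)))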
messages = [f'{i:03d}' for i in range(n_messages)]
#client.close(); cluster.close()
individual_dir = 'individual_retro'
try:
    fs_local.rm(individual_dir, recursive=True)
except:
    pass
fs_local.mkdirs(individual_dir, exist_ok=True)
Path(flist[0]).parts
('s3:', 'noaa-gefs-retrospective', 'GEFSv12', 'reforecast', '2019', '2019122900', 'c00', 'Days:1-10', 'acpcp_sfc_2019122900_c00.grib2')
def make_json_name(url, grib_message_number, json_dir):
    p = Path(url).parts
    return f'{json_dir}/{Path(p[8]).stem}_m{grib_message_number:03d}.json'
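Equivalently (a sketch), pathlib can take the stem straight from the URL, avoiding the hard-coded parts[8] index:
def make_json_name_alt(url, grib_message_number, json_dir):
    # Path(url).stem drops both the directories and the .grib2 suffix.
    return f'{json_dir}/{Path(url).stem}_m{grib_message_number:03d}.json'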
Test make_json_name on one grib file
make_json_name(flist[0], 0, individual_dir)
'individual_retro/acpcp_sfc_2019122900_c00_m000.json'
Define a function to generate the individual JSONs
def gen_json(file_url, json_dir):
    out = scan_grib(file_url, storage_options=s3_so)
    for i, message in enumerate(out):   # scan_grib returns one reference set per grib message
        out_file_name = make_json_name(file_url, i, json_dir)   # get the output name
        with fs_local.open(out_file_name, "w") as f:
            f.write(ujson.dumps(message))   # write the reference JSON to disk
Process all the messages in one grib file as a test
%%time
gen_json(flist[0], individual_dir)
flist[0]
CPU times: user 8.02 s, sys: 2.15 s, total: 10.2 s
Wall time: 14.5 s
's3://noaa-gefs-retrospective/GEFSv12/reforecast/2019/2019122900/c00/Days:1-10/acpcp_sfc_2019122900_c00.grib2'
json_list = sorted(fs_local.ls(individual_dir))
len(json_list)
80
def return_ds(d):
    fs_ = fsspec.filesystem("reference", fo=d, remote_protocol='s3', remote_options={'anon': True})
    m = fs_.get_mapper("")
    return xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False))
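The same thing can be done without building the mapper by hand, via fsspec's reference:// protocol (a sketch of an equivalent, commonly used pattern):
def return_ds_alt(d):
    # Let xarray/fsspec construct the reference filesystem internally.
    return xr.open_dataset(
        "reference://", engine="zarr",
        backend_kwargs=dict(
            consolidated=False,
            storage_options=dict(fo=d, remote_protocol='s3',
                                 remote_options={'anon': True}),
        ),
    )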
json_list[-1]
'/home/rsignell/EarthMap/Projects/notebooks/gefs/individual_retro/acpcp_sfc_2019122900_c00_m079.json'
return_ds(json_list[-1])
<xarray.Dataset>
Dimensions:     (latitude: 721, longitude: 1440, number: 1, step: 1,
                 surface: 1, time: 1, valid_time: 1)
Coordinates:
  * latitude    (latitude) float64 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * longitude   (longitude) float64 0.0 0.25 0.5 0.75 ... 359.2 359.5 359.8
  * number      (number) int64 0
  * step        (step) timedelta64[ns] 10 days
  * surface     (surface) int64 0
  * time        (time) datetime64[ns] 2019-12-29
  * valid_time  (valid_time) datetime64[ns] 2020-01-08
Data variables:
    acpcp       (latitude, longitude) float64 ...
Attributes:
    centre:             kwbc
    centreDescription:  US National Weather Service - NCEP
    edition:            2
    subCentre:          2
Use dask.distributed to create the JSONs in parallel
#client.close(); cluster.close()
n_workers = 31
threads_per_worker = 2
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
client = Client(cluster)
flist = sorted(fs_s3.glob(f's3://noaa-gefs-retrospective/GEFSv12/reforecast/{year}/{date}/???/Days:1-10/*.grib2'))
flist = [f's3://{f}' for f in flist]
len(flist)
300
b = db.from_sequence(flist, npartitions=n_workers*threads_per_worker)
b1 = b.map(gen_json, individual_dir)
%%time
with performance_report(filename="dask-report.html"):
    _ = b1.compute(retries=10)
CPU times: user 59.1 s, sys: 23.9 s, total: 1min 22s
Wall time: 6min 52s
json_list = sorted(fs_local.ls(individual_dir))
print(len(json_list))
print(json_list[0])
print(json_list[-1])
86800
/home/rsignell/EarthMap/Projects/notebooks/gefs/individual_retro/acpcp_sfc_2019122900_c00_m000.json
/home/rsignell/EarthMap/Projects/notebooks/gefs/individual_retro/weasd_sfc_2019122900_p04_m079.json
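As a rough sanity check (illustrative): 5 members x 60 variables accounts for the 300 grib files scanned, and each variable/member pair contributes one m000 reference:
# 60 variables => 60 m000 reference files for the control member alone.
len(fs_local.glob(f'{individual_dir}/*_c00_m000.json'))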
json_list[-1]
'/home/rsignell/EarthMap/Projects/notebooks/gefs/individual_retro/weasd_sfc_2019122900_p04_m079.json'
return_ds(json_list[-1])
<xarray.Dataset>
Dimensions:     (latitude: 721, longitude: 1440, number: 1, step: 1,
                 surface: 1, time: 1, valid_time: 1)
Coordinates:
  * latitude    (latitude) float64 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * longitude   (longitude) float64 0.0 0.25 0.5 0.75 ... 359.2 359.5 359.8
  * number      (number) int64 4
  * step        (step) timedelta64[ns] 10 days
  * surface     (surface) int64 0
  * time        (time) datetime64[ns] 2019-12-29
  * valid_time  (valid_time) datetime64[ns] 2020-01-08
Data variables:
    sdwe        (latitude, longitude) float64 ...
Attributes:
    centre:             kwbc
    centreDescription:  US National Weather Service - NCEP
    edition:            2
    subCentre:          2
Join along the valid_time dimension for each message in each ensemble member for a specific date
valid_dir = 'valid'
try:
    fs_local.rm(valid_dir, recursive=True)
except:
    pass
fs_local.mkdirs(valid_dir, exist_ok=True)
json_list[0]
'/home/rsignell/EarthMap/Projects/notebooks/gefs/individual_retro/acpcp_sfc_2019122900_c00_m000.json'
#fs_local.glob('individual_retro/hgt_hybr_2019123000_*.json')
#return_ds('/home/rsignell/EarthMap/Projects/notebooks/gefs/individual_retro/hgt_hybr_2019123000_c00_m004.json')
fs_local.ls(individual_dir)[0]
'/home/rsignell/EarthMap/Projects/notebooks/gefs/individual_retro/ugrd_pres_2019122900_p02_m253.json'
ensemble='c00'
pattern = f'{individual_dir}/{varnames[0]}_{date}_{ensemble}_m???.json'
print(pattern)
json_list = fs_local.glob(pattern)
len(json_list)
individual_retro/acpcp_sfc_2019122900_c00_m???.json
80
def var80(var):
    # Keep only variables whose messages are all tau values at a single level
    # (80 messages = 80 forecast hours); multi-level variables are skipped.
    pattern = f'{individual_dir}/{var}_{date}_{ensemble}_m???.json'
    json_list = fs_local.glob(pattern)
    if len(json_list) == 80:
        v = var
    else:
        v = 'null'
    return v
%%time
ensemble='c00'
a = dask.compute(*[dask.delayed(var80)(v) for v in varnames], retries=10);
v80 = [v for v in a if 'null' not in v]
CPU times: user 1.76 s, sys: 809 ms, total: 2.57 s
Wall time: 17.4 s
len(v80)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Input In [1], in <cell line: 1>()
----> 1 len(v80)

NameError: name 'v80' is not defined
def combine_valid_time(date, ensemble):
    for var in v80:
        json_list = fs_local.glob(f'{individual_dir}/{var}_{date}_{ensemble}_*.json')
        mzz = MultiZarrToZarr(json_list,
                              concat_dims=['valid_time'],
                              remote_protocol='s3',
                              remote_options=dict(anon=True),
                              identical_dims=['latitude', 'longitude', 'heightAboveGround', 'step'])
        # the output inherits the name of the first (m000) input reference
        name = f'{valid_dir}/{Path(json_list[0]).parts[-1]}'
        with fs_local.open(name, 'w') as f:
            f.write(ujson.dumps(mzz.translate()))
json_list[0]
'/home/rsignell/EarthMap/Projects/notebooks/gefs/individual_retro/acpcp_sfc_2019122900_c00_m000.json'
%%time
# Try one ensemble as a test
combine_valid_time(date, ensembles[0])
CPU times: user 43.7 s, sys: 1min 5s, total: 1min 49s
Wall time: 1min 35s
json_list = sorted(fs_local.ls(valid_dir))
len(json_list)
37
ds = return_ds(json_list[0])
ds
<xarray.Dataset>
Dimensions:     (valid_time: 80, latitude: 721, longitude: 1440, number: 1,
                 step: 1, surface: 1, time: 1)
Coordinates:
  * latitude    (latitude) float64 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * longitude   (longitude) float64 0.0 0.25 0.5 0.75 ... 359.2 359.5 359.8
  * number      (number) int64 0
  * step        (step) timedelta64[ns] 03:00:00
  * surface     (surface) int64 0
  * time        (time) datetime64[ns] 2019-12-29
  * valid_time  (valid_time) datetime64[ns] 2019-12-29T03:00:00 ... 2020-01-08
Data variables:
    acpcp       (valid_time, latitude, longitude) float64 ...
Attributes:
    centre:             kwbc
    centreDescription:  US National Weather Service - NCEP
    edition:            2
    subCentre:          2
ensembles
['c00', 'p01', 'p02', 'p03', 'p04']
%%time
_ = dask.compute(*[dask.delayed(combine_valid_time)(date, ensemble) for ensemble in ensembles], retries=10);
CPU times: user 10.3 s, sys: 4.11 s, total: 14.4 s
Wall time: 1min 49s
json_list = sorted(fs_local.ls(valid_dir))
len(json_list)
185
%%time
ensemble_dir = 'ensemble'
try:
    fs_local.rm(ensemble_dir, recursive=True)
except:
    pass
fs_local.mkdirs(ensemble_dir)
CPU times: user 1.78 ms, sys: 213 µs, total: 1.99 ms
Wall time: 3.4 ms
def combine_ensemble(date):
    for var in v80:
        # every valid_time-combined file carries the m000 suffix of its first
        # input, so this glob picks up one file per ensemble member
        json_list = fs_local.glob(f'{valid_dir}/{var}_{date}_???_m000.json')
        mzz = MultiZarrToZarr(json_list,
                              concat_dims=['number'],
                              remote_protocol='s3',
                              remote_options=dict(anon=True),
                              identical_dims=['latitude', 'longitude', 'heightAboveGround', 'step'])
        name = f'{ensemble_dir}/{Path(json_list[0]).parts[-1]}'
        with fs_local.open(name, 'w') as f:
            f.write(ujson.dumps(mzz.translate()))
%%time
combine_ensemble(date)
CPU times: user 1.28 s, sys: 326 ms, total: 1.61 s
Wall time: 1.55 s
json_list = fs_local.glob(f'{ensemble_dir}/*.json')
len(json_list)
37
return_ds(json_list[0])
<xarray.Dataset>
Dimensions:     (number: 5, valid_time: 80, latitude: 721, longitude: 1440,
                 step: 1, surface: 1, time: 1)
Coordinates:
  * latitude    (latitude) float64 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * longitude   (longitude) float64 0.0 0.25 0.5 0.75 ... 359.2 359.5 359.8
  * number      (number) int64 0 1 2 3 4
  * step        (step) timedelta64[ns] 03:00:00
  * surface     (surface) int64 0
  * time        (time) datetime64[ns] 2019-12-29
  * valid_time  (valid_time) datetime64[ns] 2019-12-29T03:00:00 ... 2020-01-08
Data variables:
    acpcp       (number, valid_time, latitude, longitude) float64 ...
Attributes:
    centre:             kwbc
    centreDescription:  US National Weather Service - NCEP
    edition:            2
    subCentre:          2
Here we open each reference file and determine what type of vertical level it contains. We will use this later to combine the messages along these levels
%%time
typeoflevel = {}
for i, ref in enumerate(json_list):
    try:
        ds = return_ds(ref)
        # whichever dim remains after removing the common ones is the level type
        dim = [dim for dim in list(ds.dims)
               if dim not in ['valid_time', 'x', 'y', 'step', 'time', 'latitude', 'longitude', 'number']]
        typeoflevel[i] = dim[0]
    except:
        print(i, 'not included')
CPU times: user 705 ms, sys: 134 ms, total: 840 ms
Wall time: 702 ms
%%time
groups = {}
for key, value in sorted(typeoflevel.items()):
    groups.setdefault(value, []).append(key)
CPU times: user 13 µs, sys: 8 µs, total: 21 µs
Wall time: 24.6 µs
groups
{'surface': [0, 1, 2, 3, 4, 5, 6, 7, 9, 11, 12, 13, 16, 18, 19, 26, 28, 30, 32, 33, 35, 36], 'cloudCeiling': [8], 'heightAboveGroundLayer': [10], 'meanSea': [14], 'potentialVorticity': [15, 25, 29, 34], 'atmosphereSingleLayer': [17, 27], 'heightAboveGround': [20, 22, 23, 24], 'atmosphere': [21], 'nominalTop': [31]}
We can now use this groups dictionary to combine the compatible messages
groups_dir = 'groups'
#try:
# fs_local.rm(groups_dir, recursive = True)
#except:
# pass
#fs_local.mkdirs(groups_dir)
def combine_groups(json_list, group):
    mzz = MultiZarrToZarr(json_list,
                          concat_dims=group,
                          remote_protocol='s3',
                          remote_options=dict(anon=True),
                          identical_dims=['latitude', 'longitude', 'heightAboveGround', 'step'])
    name = f'{groups_dir}/{date}_{group}.json'
    with fs_local.open(name, 'w') as f:
        f.write(ujson.dumps(mzz.translate()))
for group, glist in groups.items():
    print(group, glist)
surface [0, 1, 2, 3, 4, 5, 6, 7, 9, 11, 12, 13, 16, 18, 19, 26, 28, 30, 32, 33, 35, 36]
cloudCeiling [8]
heightAboveGroundLayer [10]
meanSea [14]
potentialVorticity [15, 25, 29, 34]
atmosphereSingleLayer [17, 27]
heightAboveGround [20, 22, 23, 24]
atmosphere [21]
nominalTop [31]
json_list[0]
'/home/rsignell/EarthMap/Projects/notebooks/gefs/ensemble/acpcp_sfc_2019122900_c00_m000.json'
for group, igroup in groups.items():
    json_list = []
    print(group, len(igroup))
    for i in igroup:
        json_list.append(f'{ensemble_dir}/{v80[i]}_{date}_c00_m000.json')
    combine_groups(json_list, group)
surface 22
cloudCeiling 1
heightAboveGroundLayer 1
meanSea 1
potentialVorticity 4
atmosphereSingleLayer 2
heightAboveGround 4
ValueError: Values being mapped cannot also be identical
The heightAboveGround group fails because its concatenation dimension, 'heightAboveGround', is also listed in identical_dims, and MultiZarrToZarr refuses to treat a dimension as both; the atmosphere and nominalTop groups are never reached because the loop stops at the error.
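A possible workaround (a sketch, not run here) is to drop the concat dimension from identical_dims before combining:
def combine_groups_safe(json_list, group):
    # A dimension cannot be both concatenated and declared identical.
    ident = [d for d in ['latitude', 'longitude', 'heightAboveGround', 'step']
             if d != group]
    mzz = MultiZarrToZarr(json_list,
                          concat_dims=[group],
                          remote_protocol='s3',
                          remote_options=dict(anon=True),
                          identical_dims=ident)
    name = f'{groups_dir}/{date}_{group}.json'
    with fs_local.open(name, 'w') as f:
        f.write(ujson.dumps(mzz.translate()))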
This leaves us with six reference files (one per successfully combined level type), which we can now use to open the GEFS data as an xarray DataTree
json_list = fs_local.glob(f"{groups_dir}/*.json")
json_list
['/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019122900_atmosphereSingleLayer.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019122900_cloudCeiling.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019122900_heightAboveGroundLayer.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019122900_meanSea.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019122900_potentialVorticity.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019122900_surface.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019123000_atmosphereSingleLayer.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019123000_cloudCeiling.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019123000_heightAboveGroundLayer.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019123000_meanSea.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019123000_potentialVorticity.json', '/home/rsignell/EarthMap/Projects/notebooks/gefs/groups/2019123000_surface.json']
import xarray_fmrc
Path(dates[-3]).stem
'2019122900'
ds0 = return_ds(f'{groups_dir}/{Path(dates[-3]).stem}_surface.json')
ds0 = ds0.rename({'time':'forecast_reference_time'})
ds0 = ds0.rename({'valid_time':'time'})
ds0
<xarray.Dataset>
Dimensions:                  (surface: 1, number: 5, time: 80, latitude: 721,
                              longitude: 1440, step: 1,
                              forecast_reference_time: 1)
Coordinates:
  * latitude                 (latitude) float64 90.0 89.75 89.5 ... -89.75 -90.0
  * longitude                (longitude) float64 0.0 0.25 0.5 ... 359.5 359.8
  * number                   (number) int64 0 1 2 3 4
  * step                     (step) timedelta64[ns] 03:00:00
  * surface                  (surface) int64 0
  * forecast_reference_time  (forecast_reference_time) datetime64[ns] 2019-12-29
  * time                     (time) datetime64[ns] 2019-12-29T03:00:00 ... 20...
Data variables: (12/22)
    acpcp                    (surface, number, time, latitude, longitude) float64 ...
    cape                     (surface, number, time, latitude, longitude) float64 ...
    cin                      (surface, number, time, latitude, longitude) float64 ...
    dlwrf                    (surface, number, time, latitude, longitude) float64 ...
    dswrf                    (surface, number, time, latitude, longitude) float64 ...
    gflux                    (surface, number, time, latitude, longitude) float64 ...
    ...                       ...
    tp                       (surface, number, time, latitude, longitude) float64 ...
    uflx                     (surface, number, time, latitude, longitude) float64 ...
    ulwrf                    (surface, number, time, latitude, longitude) float64 ...
    uswrf                    (surface, number, time, latitude, longitude) float64 ...
    vflx                     (surface, number, time, latitude, longitude) float64 ...
    watr                     (surface, number, time, latitude, longitude) float64 ...
Attributes:
    centre:             kwbc
    centreDescription:  US National Weather Service - NCEP
    edition:            2
    subCentre:          2
ds1 = return_ds(f'{groups_dir}/{Path(dates[-2]).stem}_surface.json')
ds1 = ds1.rename({'time':'forecast_reference_time'})
ds1 = ds1.rename({'valid_time':'time'})
dt = xarray_fmrc.from_model_runs([ds0, ds1])
dt
<xarray.DatasetView>
Dimensions:                  (forecast_reference_time: 2, constant_forecast: 88,
                              constant_offset: 80)
Coordinates:
  * forecast_reference_time  (forecast_reference_time) datetime64[ns] 2019-12...
  * constant_forecast        (constant_forecast) datetime64[ns] 2019-12-29T03...
  * constant_offset          (constant_offset) timedelta64[ns] 03:00:00 ... 1...
Data variables:
    model_run_path           (forecast_reference_time) <U29 'model_run/2019-1...
%%time
dt.fmrc.constant_offset("3h")
<xarray.Dataset>
Dimensions:                  (latitude: 721, longitude: 1440, number: 5, step: 1,
                              surface: 1, forecast_reference_time: 2, time: 2)
Coordinates:
  * latitude                 (latitude) float64 90.0 89.75 89.5 ... -89.75 -90.0
  * longitude                (longitude) float64 0.0 0.25 0.5 ... 359.5 359.8
  * number                   (number) int64 0 1 2 3 4
  * step                     (step) timedelta64[ns] 03:00:00
  * surface                  (surface) int64 0
  * forecast_reference_time  (forecast_reference_time) datetime64[ns] 2019-12...
  * time                     (time) datetime64[ns] 2019-12-29T03:00:00 2019-1...
    forecast_offset          timedelta64[ns] 03:00:00
Data variables: (12/22)
    acpcp                    (time, surface, number, latitude, longitude) float64 ...
    cape                     (time, surface, number, latitude, longitude) float64 ...
    cin                      (time, surface, number, latitude, longitude) float64 ...
    dlwrf                    (time, surface, number, latitude, longitude) float64 ...
    dswrf                    (time, surface, number, latitude, longitude) float64 ...
    gflux                    (time, surface, number, latitude, longitude) float64 ...
    ...                       ...
    tp                       (time, surface, number, latitude, longitude) float64 ...
    uflx                     (time, surface, number, latitude, longitude) float64 ...
    ulwrf                    (time, surface, number, latitude, longitude) float64 ...
    uswrf                    (time, surface, number, latitude, longitude) float64 ...
    vflx                     (time, surface, number, latitude, longitude) float64 ...
    watr                     (time, surface, number, latitude, longitude) float64 ...
Attributes:
    centre:             kwbc
    centreDescription:  US National Weather Service - NCEP
    edition:            2
    subCentre:          2
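By analogy (a sketch; assuming the accessor also exposes constant_forecast, matching the constant_forecast coordinate above), the complementary slice pulls every model run's prediction for one fixed valid time:
# All model runs' forecasts valid at the same instant.
dt.fmrc.constant_forecast("2019-12-31")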
%%time
dt.fmrc.best()
2023-04-25 08:20:29,278 - distributed.nanny - WARNING - Restarting worker
The best() computation did not return here; the cluster workers restarted instead.
datasets = {}
for level in groups:
    ref = f'{groups_dir}/{date}_{level}.json'
    if fs_local.exists(ref):   # groups that failed to combine have no reference file
        datasets[level] = return_ds(ref)
dt = DataTree.from_dict(datasets)
dt.groups
ds_surface = dt['surface'].squeeze('surface',drop=True).squeeze('step',drop=True)
ds_surface
da = ds_surface['t']
da = da.assign_coords(longitude=(((da.longitude + 180) % 360) - 180)).sortby('longitude')
da
for coord in da.indexes:
    print(coord)
    pn.pane.HoloViews.widgets[coord] = pn.widgets.Select
%%time
das = da.sel(valid_time='2020-01-09 18:00', method='nearest').load()
das.hvplot(x='longitude', y='latitude', rasterize=True, alpha=0.65, cmap='viridis',
           global_extent=True, geo=True, tiles='OSM')
dt['potentialVorticity']
dt
group = 'surface'
json_list = fs_local.glob(f"{groups_dir}/*_{group}.json")
json_list
return_ds(json_list[1])
dates_dir = 'dates'
try:
    fs_local.rm(dates_dir, recursive=True)
except:
    pass
fs_local.mkdirs(dates_dir)
def combine_dates(json_list, group):
    mzz = MultiZarrToZarr(json_list,
                          concat_dims='time',
                          remote_protocol='s3',
                          remote_options=dict(anon=True),
                          identical_dims=['latitude', 'longitude', 'heightAboveGround', 'step'])
    name = f'{dates_dir}/{date}_{group}.json'
    with fs_local.open(name, 'w') as f:
        f.write(ujson.dumps(mzz.translate()))
combine_dates(json_list[:2], 'surface')
json_list = fs_local.glob(f'{dates_dir}/*.json')
json_list
ds_surface = return_ds(json_list[0])
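A quick check (illustrative) that the date concatenation worked: with two daily cycles combined, the surface dataset should now have two values along time:
# Expect time: 2 after concatenating the two dates.
print(ds_surface.sizes['time'])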