import time
import datetime as dt
import glob
import logging
import dask
import fsspec
import ujson
import xarray as xr
from distributed import Client
from kerchunk.combine import MultiZarrToZarr
from kerchunk.grib2 import scan_grib
from tqdm import tqdm
# Local filesystem handle; caches disabled so repeated listings see fresh files.
fs = fsspec.filesystem('', skip_instance_cache=True, use_listings_cache=False)
# HTTPS filesystem used to glob/read the remote GRIB2 files.
fs_dd = fsspec.filesystem('https')
# (currently unused) example message filter for scan_grib
afilter = {"typeOfLevel": "heightAboveGround"}
# fsspec object where to put the actual location for kerchunk output (json)
fs_write = fsspec.filesystem('')
local_json_dir = 'cmc-hrdps'
# Start from a clean output directory; only "directory does not exist" is
# expected here, so catch that narrowly instead of a bare except.
try:
    fs.rm(local_json_dir, recursive=True)
except FileNotFoundError:
    pass
fs.mkdirs(local_json_dir, exist_ok=True)
def make_json_name_cmc(file_url, grib_message_number, temp_dir):
    """Build a unique reference-JSON path for one GRIB message.

    The name combines the URL's final path component with a zero-padded
    message index, e.g. '<temp_dir>/<basename>_m007.json'.
    """
    basename = file_url.rsplit('/', 1)[-1]
    return f'{temp_dir}/{basename}_m{grib_message_number:03d}.json'
def gen_json_cmc(file_url, json_dir):
    """Kerchunk one GRIB2 file into per-message JSON reference files.

    scan_grib returns a list with one reference dict per GRIB message;
    each is written to its own JSON file under json_dir.

    Returns the path of the LAST reference file written, or None when the
    file contains no messages.  NOTE(review): callers needing every
    per-message path must collect the names themselves.
    """
    out = scan_grib(file_url)
    out_file_name = None  # avoid NameError if scan_grib finds no messages
    for i, message in enumerate(out):
        out_file_name = make_json_name_cmc(file_url, i, json_dir)
        with fs.open(out_file_name, "w") as f:
            f.write(ujson.dumps(message))
    return out_file_name
# url_base = "https://dd.weather.gc.ca/model_hrdps/continental/2.5km/"
# HPFX mirror of the MSC Datamart HRDPS continental 2.5 km model output.
url_base = "https://hpfx.collab.science.gc.ca/20230503/WXO-DD/model_hrdps/continental/2.5km/"
# Path component: presumably model run (00Z) / forecast hour (003) — TODO confirm
MR_FH = "00/003/"
# Glob patterns for the four pressure-level (ISBL) variables to kerchunk.
var_list = ["*WIND_ISBL_*.grib2", "*WDIR_ISBL_*.grib2", "*TMP_ISBL_*.grib2", "*DEPR_ISBL_*.grib2"]
# NOTE: %%time is an IPython cell magic — this file is notebook residue,
# not importable Python.
%%time
dd_gribs = []
# for var in var_list:
# dd_gribs.append(fs_dd.glob(url_base + MR_FH + var,recursive=True))
# Glob each variable pattern on the remote server; flatten into one list
# (+= extends, unlike the append variant commented out above).
for var in tqdm(var_list):
    dd_gribs += fs_dd.glob(url_base + MR_FH + var,recursive=True)
100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:00<00:00, 7.24it/s]
CPU times: user 80.8 ms, sys: 13.7 ms, total: 94.5 ms Wall time: 569 ms
len(dd_gribs)
112
%%time
# kerchunk grib2 files globbed on dd TO json files on the local filesystem
# NO DASK
# Serial pass: one gen_json_cmc call per remote GRIB2 file; gen_json_cmc
# returns only the last per-message JSON path written for that file.
dd_reference_jsons = []
for f in tqdm(dd_gribs):
    # tic = time.perf_counter()
    dd_reference_jsons.append(gen_json_cmc(f, local_json_dir))
    # toc = time.perf_counter()
    # print(f"Kerchunked {f} in {toc - tic:0.4f} seconds")
100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████| 112/112 [12:21<00:00, 6.62s/it]
CPU times: user 9min 1s, sys: 4.76 s, total: 9min 6s Wall time: 12min 21s
# dd_reference_jsons
# Split the flat list of reference JSONs back into one list per variable:
# the glob order was WIND, WDIR, TMP, DEPR — 28 files (pressure levels) each.
wind_list = dd_reference_jsons[:28]
len(wind_list)
28
wdir_list = dd_reference_jsons[28:56]
len(wdir_list)
28
tmp_list = dd_reference_jsons[56:84]
len(tmp_list)
28
depr_list = dd_reference_jsons[84:]
len(depr_list)  # was len(tmp_list) — copy-paste bug; check the list just built
28
# First temperature file
tmp1 = dd_reference_jsons[56]
#open dataset as zarr object using fsspec reference file system and xarray
fs_tmp1 = fsspec.filesystem("reference", fo=tmp1, remote_protocol='https')
m_tmp1 = fs_tmp1.get_mapper("")
# consolidated=False: kerchunk reference files carry no consolidated zarr metadata
ds_tmp1 = xr.open_dataset(m_tmp1, engine="zarr", backend_kwargs=dict(consolidated=False), chunks='auto')
ds_tmp1
<xarray.Dataset> Dimensions: (isobaricInhPa: 1, y: 1290, x: 2540, step: 1, time: 1, valid_time: 1) Coordinates: * isobaricInhPa (isobaricInhPa) int64 50 * step (step) timedelta64[ns] 03:00:00 * time (time) datetime64[ns] 2023-05-03 * valid_time (valid_time) datetime64[ns] 2023-05-03T03:00:00 Dimensions without coordinates: y, x Data variables: latitude (y, x) float64 dask.array<chunksize=(1290, 2540), meta=np.ndarray> longitude (y, x) float64 dask.array<chunksize=(1290, 2540), meta=np.ndarray> t (y, x) float64 dask.array<chunksize=(1290, 2540), meta=np.ndarray> Attributes: centre: cwao centreDescription: Canadian Meteorological Service - Montreal edition: 2 subCentre: 0
%%time
# Read one grid point from a single reference file; timing output below.
ds_tmp1['t'].loc[720,1720].values # around 4s !
CPU times: user 993 ms, sys: 43.6 ms, total: 1.04 s Wall time: 1.97 s
array(217.19422103)
Combine all per-message JSON reference files (reference zarrs) into a single consolidated reference
#combine individual references into a single consolidated reference
# Concatenate the 28 single-level TMP references along the pressure
# dimension; lat/lon/valid_time/step are identical across the inputs.
mzz_tmp = MultiZarrToZarr(tmp_list,
                          concat_dims = ['isobaricInhPa'],
                          identical_dims=['latitude', 'longitude', 'valid_time', 'step'] )
d_tmp = mzz_tmp.translate()
#open dataset as zarr object using fsspec reference file system and xarray
fs_tmp = fsspec.filesystem("reference", fo=d_tmp, remote_protocol='https')
m_tmp = fs_tmp.get_mapper("")
ds_tmp = xr.open_dataset(m_tmp, engine="zarr", backend_kwargs=dict(consolidated=False), chunks={})
ds_tmp
<xarray.Dataset> Dimensions: (isobaricInhPa: 28, y: 1290, x: 2540, step: 1, time: 1, valid_time: 1) Coordinates: * isobaricInhPa (isobaricInhPa) int64 50 100 150 175 ... 970 985 1000 1015 * step (step) timedelta64[ns] 03:00:00 * time (time) datetime64[ns] 2023-05-03 * valid_time (valid_time) datetime64[ns] 2023-05-03T03:00:00 Dimensions without coordinates: y, x Data variables: latitude (y, x) float64 dask.array<chunksize=(1290, 2540), meta=np.ndarray> longitude (y, x) float64 dask.array<chunksize=(1290, 2540), meta=np.ndarray> t (isobaricInhPa, y, x) float64 dask.array<chunksize=(1, 1290, 2540), meta=np.ndarray> Attributes: centre: cwao centreDescription: Canadian Meteorological Service - Montreal edition: 2 subCentre: 0
Vertical profile (all 28 isobaric levels) at one grid point, read over https from the 28 *TMP* grib2 files
%%time
# One value per pressure level: touches all 28 per-level chunks.
ds_tmp['t'].loc[:,720,1720].values # Wall time is about 28 * wall_time_of{ds_tmp1['t'].loc[720,1720].values} above
CPU times: user 44.9 s, sys: 1.04 s, total: 46 s Wall time: 1min 1s
array([217.19422103, 217.05503728, 223.39968592, 227.79916232, 228.71105117, 226.28093133, 223.36361208, 221.29990511, 222.47892497, 230.28447378, 237.37581923, 243.44962139, 248.84367273, 253.9872789 , 258.71164498, 262.8021069 , 266.13688732, 269.28591677, 270.76037277, 270.596149 , 272.31026406, 274.4411197 , 276.58866119, 278.60517044, 279.33524897, 279.76682893, 280.38654675, 280.98604342])