#!/usr/bin/env python
# coding: utf-8

# In[1]:


import time
import datetime as dt
import glob
import logging

import dask
import fsspec
import ujson
import xarray as xr
from distributed import Client
from kerchunk.combine import MultiZarrToZarr
from kerchunk.grib2 import scan_grib
from tqdm import tqdm


# In[2]:


fs = fsspec.filesystem('', skip_instance_cache=True, use_listings_cache=False)
fs_dd = fsspec.filesystem('https')


# In[3]:


# message filter by level type (defined here but not applied in scan_grib below)
afilter = {"typeOfLevel": "heightAboveGround"}

# local filesystem object for writing the kerchunk reference output (JSON)
fs_write = fsspec.filesystem('')
local_json_dir = 'cmc-hrdps'
try:
    fs.rm(local_json_dir, recursive=True)
except FileNotFoundError:
    pass
fs.mkdirs(local_json_dir, exist_ok=True)


# In[4]:


def make_json_name_cmc(file_url, grib_message_number, temp_dir):
    # create a unique name for each reference file
    items = file_url.split('/')
    name = items[-1]
    return f'{temp_dir}/{name}_m{grib_message_number:03d}.json'


def gen_json_cmc(file_url, json_dir):
    out = scan_grib(file_url)
    # scan_grib outputs a list containing one reference per grib message;
    # each of these HRDPS files holds a single message, so one JSON is written
    for i, message in enumerate(out):
        out_file_name = make_json_name_cmc(file_url, i, json_dir)  # get name
        with fs.open(out_file_name, "w") as f:
            f.write(ujson.dumps(message))  # write reference to file
    return out_file_name


# # HRDPS grib file selection

# ## Create Input File List

# In[5]:


# url_base = "https://dd.weather.gc.ca/model_hrdps/continental/2.5km/"
url_base = "https://hpfx.collab.science.gc.ca/20230503/WXO-DD/model_hrdps/continental/2.5km/"
MR_FH = "00/003/"  # model run 00Z, forecast hour 003
var_list = ["*WIND_ISBL_*.grib2",
            "*WDIR_ISBL_*.grib2",
            "*TMP_ISBL_*.grib2",
            "*DEPR_ISBL_*.grib2"]


# In[6]:


get_ipython().run_cell_magic('time', '', 'dd_gribs = []\n# for var in var_list:\n#     dd_gribs.append(fs_dd.glob(url_base + MR_FH + var, recursive=True))\nfor var in tqdm(var_list):\n    dd_gribs += fs_dd.glob(url_base + MR_FH + var, recursive=True)\n')


# In[8]:


len(dd_gribs)


# # Kerchunk files on local filesystem

# In[9]:


get_ipython().run_cell_magic('time', '', '# kerchunk grib2 files globbed on dd TO json files on the local filesystem\n# NO DASK\ndd_reference_jsons = []\nfor f in tqdm(dd_gribs):\n    # tic = time.perf_counter()\n    dd_reference_jsons.append(gen_json_cmc(f, local_json_dir))\n    # toc = time.perf_counter()\n    # print(f"Kerchunked {f} in {toc - tic:0.4f} seconds")\n')


# In[11]:


# dd_reference_jsons


# In[12]:


wind_list = dd_reference_jsons[:28]
len(wind_list)


# In[13]:


wdir_list = dd_reference_jsons[28:56]
len(wdir_list)


# In[14]:


tmp_list = dd_reference_jsons[56:84]
len(tmp_list)


# In[15]:


depr_list = dd_reference_jsons[84:]
len(depr_list)


# In[16]:


# First temperature file
tmp1 = dd_reference_jsons[56]


# In[17]:


# open dataset as a zarr object using the fsspec reference filesystem and xarray
fs_tmp1 = fsspec.filesystem("reference", fo=tmp1, remote_protocol='https')
m_tmp1 = fs_tmp1.get_mapper("")
ds_tmp1 = xr.open_dataset(m_tmp1, engine="zarr",
                          backend_kwargs=dict(consolidated=False),
                          chunks='auto')


# In[18]:


ds_tmp1


# In[19]:


get_ipython().run_cell_magic('time', '', "ds_tmp1['t'].loc[720, 1720].values  # around 4 s!\n")


# Combine all jsons (reference zarrs) into one

# In[20]:


# combine the individual references into a single consolidated reference
mzz_tmp = MultiZarrToZarr(tmp_list,
                          concat_dims=['isobaricInhPa'],
                          identical_dims=['latitude', 'longitude',
                                          'valid_time', 'step'])


# In[21]:


d_tmp = mzz_tmp.translate()


# In[22]:


# open the combined dataset as a zarr object using the fsspec reference
# filesystem and xarray
fs_tmp = fsspec.filesystem("reference", fo=d_tmp, remote_protocol='https')
m_tmp = fs_tmp.get_mapper("")
ds_tmp = xr.open_dataset(m_tmp, engine="zarr",
                         backend_kwargs=dict(consolidated=False),
                         chunks={})


# In[24]:


ds_tmp


# Vertical profile at one point across all 28 \*TMP\* grib2 files, read over https

# In[23]:


get_ipython().run_cell_magic('time', '', "ds_tmp['t'].loc[:, 720, 1720].values  # wall time is roughly 28x the single-message read time above\n")


# In[ ]:
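

# Persisting the combined reference (a minimal sketch, not part of the original
# run): `translate()` returns a plain dict, so it can be written to JSON with
# `ujson` and reopened later without re-running `MultiZarrToZarr`, mirroring the
# per-message pattern in `gen_json_cmc` above. The output name
# `cmc-hrdps-tmp-combined.json` is an illustrative assumption, not a file from
# the original notebook.

# In[ ]:


# write the combined reference dict to a JSON file (file name is an assumption)
combined_json = 'cmc-hrdps-tmp-combined.json'
with fs_write.open(combined_json, 'w') as f:
    f.write(ujson.dumps(d_tmp))

# reopen the dataset directly from the saved reference file; `fo` accepts a
# path to a reference JSON as well as an in-memory dict
fs_comb = fsspec.filesystem("reference", fo=combined_json,
                            remote_protocol='https')
ds_comb = xr.open_dataset(fs_comb.get_mapper(""), engine="zarr",
                          backend_kwargs=dict(consolidated=False), chunks={})
ds_comb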