#!/usr/bin/env python # coding: utf-8 # # Use Kerchunk to open a GEFS reforecast collection as an xarray-datatree # Data contained in these grib2 files described here: https://www.nco.ncep.noaa.gov/pmb/products/gens/ # # Approach: # * create a list of grib files to process # * use scan_grib to create JSON references for each message in each grib file. # * combine JSONs along the `valid_time` dimension (the forecast hour, or `tau` dimension) # * combine JSONs along the `ensemble dimension` # * combine JSONs for all variables (messages) with similar vertical coordinates # * combine along the `time` dimension (the daily forecast date, 00 each day) # * read the datasets (each with a different vertical coordinate) into a DataTree # # In[1]: get_ipython().run_line_magic('xmode', 'minimal') # In[2]: import os import fsspec from datetime import datetime, timedelta import xarray as xr import ujson import kerchunk from kerchunk.grib2 import scan_grib from kerchunk.combine import MultiZarrToZarr from pathlib import Path import dask from dask.distributed import LocalCluster, Client, performance_report import dask.bag as db from datatree import DataTree import panel as pn import hvplot.xarray # In[3]: xr.__version__ # Create a set of fsspec file systems to read the latest GEFS forecast and write the reference files locally # In[4]: fs_local = fsspec.filesystem('', skip_instance_cache = True, use_listings_cache=False) fs_s3 = fsspec.filesystem('s3', anon = True) # **s3://noaa-gefs-pds/gefs.{date}/{cycle}/atmos/pgrb2ap5/gep{ensemble_member}.t{cycle}.pgrb2a.0p50.f{forecast_hour}** # In[5]: bucket = 's3://noaa-gefs-retrospective' flist = fs_s3.ls(bucket) flist # In[6]: dates = fs_s3.glob(f'{bucket}/GEFSv12/reforecast/2019/??????????') print(len(dates)) print(dates[0]) print(dates[-1]) # In[7]: fs_s3.ls(dates[-1]) # ## Select a date # In[8]: date = Path(dates[-3]).name print(date) print(date[:4]) # ## Determine number of ensemble members # search for a specific cycle, forecast hour, variable group and date # In[9]: cycle = '00' # using the 00z cycle (cycles at 00z, 06z, 12z, 18z, but only 00 is distributed) tau = '000' #date = '2019123000' year = date[:4] f = fs_s3.glob(f's3://noaa-gefs-retrospective/GEFSv12/reforecast/{year}/{date}/???/Days:1-10/weasd_sfc_{date}_???.grib2') np_ensembles = len(f) print(np_ensembles) f # In[10]: ensembles = ['c00', 'p01', 'p02', 'p03', 'p04'] # ## Determine number of variables for each forecast # search for a ensemble member and date # In[11]: flist = sorted(fs_s3.glob(f's3://noaa-gefs-retrospective/GEFSv12/reforecast/{year}/{date}/c00/Days:1-10/*.grib2')) flist = [f's3://{f}' for f in flist] len(flist) # Make a list of variable names (extracting them from the grib file names) # In[12]: varnames = [] for f in flist: file = Path(f).parts[8] p = file.split('_') pp = p[:-2] var = '_'.join(pp) varnames.append(var) # In[13]: varnames # In[14]: n_ensembles = len(ensembles) # In[15]: s3_so = { 'anon': True, 'skip_instance_cache': True } # Try scan_grib on one grib file to deterine the number of messages # # Scanning one file works if they all have the same number, but not all of the reforecast files do. It turns out that in these grib files, each file is for a specific variable, and the messages contain not only all the tau values but also the different levels. So for surface vars, there are 80 messages, but for variables with 4 levels there are 320 messages, etc. # In[17]: get_ipython().run_cell_magic('time', '', 'out = scan_grib(flist[0], storage_options= s3_so)\n') # In[18]: n_messages = len(out) n_messages # In[19]: messages = [f'{i:03d}' for i in range(n_messages)] # In[ ]: #client.close(); cluster.close() # In[20]: individual_dir = 'individual_retro' # In[21]: try: fs_local.rm(individual_dir, recursive = True) except: pass fs_local.mkdirs(individual_dir, exist_ok=True) # In[22]: Path(flist[0]).parts # In[23]: def make_json_name(url, grib_message_number, json_dir): p = Path(url).parts return f'{json_dir}/{Path(p[8]).stem}_m{grib_message_number:03d}.json' # test make_json_name on one grib file # In[24]: make_json_name(flist[0], 0, individual_dir) # Define function to generate single JSONs # In[25]: def gen_json(file_url, json_dir): out = scan_grib(file_url, storage_options=s3_so) for i, message in enumerate(out): # scan_grib outputs a list containing one reference file per grib message out_file_name = make_json_name(file_url, i, json_dir) #get name with fs_local.open(out_file_name, "w") as f: f.write(ujson.dumps(message)) #write to file # Process all the messages in one grib file as a test # In[26]: get_ipython().run_cell_magic('time', '', 'gen_json(flist[0], individual_dir)\nflist[0]\n') # In[27]: json_list = sorted(fs_local.ls(individual_dir)) len(json_list) # In[28]: def return_ds(d): fs_ = fsspec.filesystem("reference", fo=d, remote_protocol='s3', remote_options={'anon':True}) m = fs_.get_mapper("") return xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False)) # In[29]: json_list[-1] # In[30]: return_ds(json_list[-1]) # Use dask.distributed to create Jsons in parallel # In[ ]: #client.close(); cluster.close() # In[ ]: n_workers = 31 threads_per_worker = 2 cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker) client = Client(cluster) # In[36]: flist = sorted(fs_s3.glob(f's3://noaa-gefs-retrospective/GEFSv12/reforecast/{year}/{date}/???/Days:1-10/*.grib2')) flist = [f's3://{f}' for f in flist] len(flist) # In[38]: b = db.from_sequence(flist, npartitions=n_workers*threads_per_worker) # In[39]: b1 = b.map(gen_json, individual_dir) # In[40]: get_ipython().run_cell_magic('time', '', 'with performance_report(filename="dask-report.html"):\n _ = b1.compute(retries=10)\n') # In[41]: json_list = sorted(fs_local.ls(individual_dir)) print(len(json_list)) print(json_list[0]) print(json_list[-1]) # In[42]: json_list[-1] # In[43]: return_ds(json_list[-1]) # ## Combine along the forecast time (tau) "valid_time" dimension # join along `valid_time` time dimension for each message in each ensemble member for a specific date # In[44]: valid_dir = 'valid' # In[45]: try: fs_local.rm(valid_dir, recursive=True) except: pass fs_local.mkdirs(valid_dir, exist_ok=True) # In[46]: json_list[0] # In[47]: #fs_local.glob('individual_retro/hgt_hybr_2019123000_*.json') # In[48]: #return_ds('/home/rsignell/EarthMap/Projects/notebooks/gefs/individual_retro/hgt_hybr_2019123000_c00_m004.json') # In[49]: fs_local.ls(individual_dir)[0] # In[50]: ensemble='c00' # In[51]: pattern = f'{individual_dir}/{varnames[0]}_{date}_{ensemble}_m???.json' print(pattern) json_list = fs_local.glob(pattern) len(json_list) # In[52]: def var80(var): pattern = f'{individual_dir}/{var}_{date}_{ensemble}_m???.json' json_list = fs_local.glob(pattern) if len(json_list)==80: v = var else: v = 'null' return v # In[53]: get_ipython().run_cell_magic('time', '', "ensemble='c00'\n\na = dask.compute(*[dask.delayed(var80)(v) for v in varnames], retries=10);\n\nv80 = [v for v in a if 'null' not in v]\n") # In[1]: len(v80) # In[55]: def combine_valid_time(date, ensemble): for var in v80: json_list = fs_local.glob(f'{individual_dir}/{var}_{date}_{ensemble}_*.json') mzz = MultiZarrToZarr(json_list, concat_dims = ['valid_time'], remote_protocol='s3', remote_options=dict(anon=True), identical_dims=['latitude', 'longitude', 'heightAboveGround', 'step']) name = f'{valid_dir}/{Path(json_list[0]).parts[-1]}' with fs_local.open(name, 'w') as f: f.write(ujson.dumps(mzz.translate())) # In[56]: json_list[0] # In[57]: get_ipython().run_cell_magic('time', '', '# Try one ensemble as a test\ncombine_valid_time(date, ensembles[0])\n') # In[58]: json_list = sorted(fs_local.ls(valid_dir)) len(json_list) # In[59]: ds = return_ds(json_list[0]) # In[60]: ds # ## Do ensembles in parallel # In[61]: ensembles # In[62]: get_ipython().run_cell_magic('time', '', '_ = dask.compute(*[dask.delayed(combine_valid_time)(date,ensemble) for ensemble in ensembles], retries=10);\n') # In[63]: json_list = sorted(fs_local.ls(valid_dir)) len(json_list) # ## Combine across ensemble members # In[64]: get_ipython().run_cell_magic('time', '', "ensemble_dir = 'ensemble'\n\ntry:\n fs_local.rm(ensemble_dir, recursive = True)\nexcept:\n pass\nfs_local.mkdirs(ensemble_dir)\n") # In[67]: def combine_ensemble(date): for var in v80: json_list = fs_local.glob(f'{valid_dir}/{var}_{date}_???_m000.json') mzz = MultiZarrToZarr(json_list, concat_dims = ['number'], remote_protocol='s3', remote_options=dict(anon=True), identical_dims=['latitude', 'longitude', 'heightAboveGround', 'step']) name = f'{ensemble_dir}/{Path(json_list[0]).parts[-1]}' with fs_local.open(name, 'w') as f: f.write(ujson.dumps(mzz.translate())) # In[68]: get_ipython().run_cell_magic('time', '', 'combine_ensemble(date)\n') # In[86]: json_list = fs_local.glob(f'{ensemble_dir}/*.json') len(json_list) # In[70]: return_ds(json_list[0]) # ### Determine which variables can be combined into a single dataset # In[ ]: # Here we open each reference message and determine what type of vertical level it contains. We will use this later to combine the messages alongs these levels # In[87]: get_ipython().run_cell_magic('time', '', "typeoflevel = {}\nfor i,ref in enumerate(json_list):\n try:\n ds = return_ds(ref)\n dim = [dim for dim in list(ds.dims) if dim not in ['valid_time', 'x', 'y', 'step', 'time','latitude', 'longitude', 'number']] #I manually figure out which dims are common\n typeoflevel[i] = dim[0]\n except:\n print(i, 'not included')\n pass\n") # In[88]: get_ipython().run_cell_magic('time', '', 'groups = {}\nfor key, value in sorted(typeoflevel.items()):\n groups.setdefault(value, []).append(key)\n') # In[89]: groups # We can now use this groups dictionary to combine the compatible messages # In[90]: groups_dir = 'groups' # In[91]: #try: # fs_local.rm(groups_dir, recursive = True) #except: # pass #fs_local.mkdirs(groups_dir) # In[92]: def combine_groups(json_list, group): mzz = MultiZarrToZarr(json_list, concat_dims = group, remote_protocol='s3', remote_options=dict(anon=True), identical_dims=['latitude', 'longitude', 'heightAboveGround', 'step']) name = f'{groups_dir}/{date}_{group}.json' with fs_local.open(name, 'w') as f: f.write(ujson.dumps(mzz.translate())) # In[77]: for group,glist in groups.items(): print(group,glist) # In[94]: json_list[0] # In[97]: for group,igroup in groups.items(): json_list=[] print(group, len(igroup)) for i in igroup: json_list.append(f'{ensemble_dir}/{v80[i]}_{date}_c00_m000.json') combine_groups(json_list, group) # This leaves us with 6 reference files which we can now use to open the GEFS data as an xarray-datatree # In[82]: json_list = fs_local.glob(f"{groups_dir}/*.json") json_list # In[98]: import xarray_fmrc # In[101]: Path(dates[-3]).stem # In[111]: ds0 = return_ds(f'{groups_dir}/{Path(dates[-3]).stem}_surface.json') # In[112]: ds0 = ds0.rename({'time':'forecast_reference_time'}) ds0 = ds0.rename({'valid_time':'time'}) # In[113]: ds0 # In[114]: ds1 = return_ds(f'{groups_dir}/{Path(dates[-2]).stem}_surface.json') ds1 = ds1.rename({'time':'forecast_reference_time'}) ds1 = ds1.rename({'valid_time':'time'}) # In[115]: dt = xarray_fmrc.from_model_runs([ds0, ds1]) # In[118]: dt # In[119]: get_ipython().run_cell_magic('time', '', 'dt.fmrc.constant_offset("3h")\n') # In[ ]: get_ipython().run_cell_magic('time', '', 'dt.fmrc.best()\n') # In[ ]: datasets = {} for level in groups: datasets[level] = return_ds(f'{groups_dir}/{date}_{level}.json') # In[ ]: dt = DataTree.from_dict(datasets) # In[ ]: dt.groups # In[ ]: ds_surface = dt['surface'].squeeze('surface',drop=True).squeeze('step',drop=True) # In[ ]: ds_surface # In[ ]: da = ds_surface['t'] # In[ ]: da = da.assign_coords(longitude=(((ds.longitude + 180) % 360) - 180)).sortby('longitude') # In[ ]: da # In[ ]: for coord in da.indexes: print(coord) pn.pane.HoloViews.widgets[coord]=pn.widgets.Select # In[ ]: get_ipython().run_cell_magic('time', '', "das = da.sel(valid_time='2020-01-09 18:00', method='nearest').load()\n") # In[ ]: das.hvplot(x='longitude', y='latitude', rasterize=True, alpha=0.65, cmap='viridis', global_extent=True, geo=True, tiles='OSM') # In[ ]: dt['potentialVorticity'] # In[ ]: dt # In[ ]: # In[ ]: group = 'surface' # In[ ]: json_list = fs_local.glob(f"{groups_dir}/*_{group}.json") json_list # In[ ]: return_ds(json_list[1]) # In[ ]: dates_dir = 'dates' # In[ ]: try: fs_local.rm(dates_dir, recursive = True) except: pass fs_local.mkdirs(dates_dir) # In[ ]: def combine_dates(json_list, group): mzz = MultiZarrToZarr(json_list, concat_dims = 'time', remote_protocol='s3', remote_options=dict(anon=True), identical_dims=['latitude', 'longitude', 'heightAboveGround', 'step']) name = f'{dates_dir}/{date}_{group}.json' with fs_local.open(name, 'w') as f: f.write(ujson.dumps(mzz.translate())) # In[ ]: # In[ ]: combine_dates(json_list[:2], 'surface') # In[ ]: json_list = fs_local.glob(f'{dates_dir}/*.json') json_list # In[ ]: ds_surface = return_ds(json_list[0])