#!/usr/bin/env python
# coding: utf-8

# # Master hindcast extractor
# 
# My latest code for optimized extraction of long hindcast fields
# 
# The local extraction method uses [Dask](https://dask.org) and [xarray.open_mfdataset](http://xarray.pydata.org/en/stable/io.html#reading-multi-file-datasets) based on Doug's optimized workflow in [dask-expts](https://nbviewer.jupyter.org/urls/bitbucket.org/salishsea/analysis-doug/raw/default/notebooks/dask-expts/dask_expts.ipynb). \
# *I get about 1 h extraction time per year of hourly surface fields across 12 Salish workers using this method.*
# 
# The ERDDAP method is much slower but publicly accessible and requires no user-side CPU resources. \
# *Extraction time is about 12 h per year of hourly surface fields.*
# 
# Both methods extract in monthly chunks and save to yearly files. The code is designed to combine variables on the *same* model grid (temperature and nitrate, for example) but not across different grids, so velocity is kept separate from temperature and separate from wind fields; run the extraction multiple times for variables on different grids.
# 
# ***

# In[1]:


import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import dask
import os
from dask.distributed import Client
from datetime import datetime, timedelta
from calendar import monthrange
from contextlib import ExitStack
from tqdm.notebook import tqdm
from salishsea_tools import grid_tools


# ***
# 
# ### Functions
# 
# Helper functions

# In[2]:


def make_prefix(date, paths, model='NEMO', res='1h'):
    """Construct path prefix for local SalishSeaCast results given date object and paths dict
    e.g., /results/SalishSea/hindcast.201905/ddmmmyy/SalishSea_1h_yyyymmdd_yyyymmdd
    """

    if model == 'NEMO':
        if (date.year >= 2013) and (date.year <= 2016):  # date range covered by the NEMO1 archive
            path = paths['NEMO1']
        else:
            path = paths['NEMO2']
        datestr = '_'.join(np.repeat(date.strftime('%Y%m%d'), 2))
        prefix = os.path.join(path, date.strftime('%d%b%y').lower(), f'SalishSea_{res}_{datestr}')
    elif model == 'HRDPS':
        if date > datetime(2014, 9, 11):
            prefix = os.path.join(paths['HRDPS'], 'operational', 'ops_' + date.strftime('y%Ym%md%d'))
        else:
            prefix = os.path.join(paths['HRDPS'], 'gemlam', 'gemlam_' + date.strftime('y%Ym%md%d'))
    else:
        raise ValueError(f'Unknown model: {model}')

    return prefix


def load_paths():
    """Return dict of paths to the local results archives, the ERDDAP server, and the output directory"""

    paths = {
        'NEMO1': '/results/SalishSea/hindcast.201905',
        'NEMO2': '/results2/SalishSea/nowcast-green.201905',
        'HRDPS': '/results/forcing/atmospheric/GEM2.5',
        'erddap': 'https://salishsea.eos.ubc.ca/erddap/griddap',
        'out': '/ocean/bmoorema/research/MEOPAR/analysis-ben/data/SalishSeaCast',
    }

    return paths


def load_netCDF_keys(filesystem='erddap'):
    """Return the dict mapping variable names to netCDF file keys
    (ERDDAP dataset ID stems or local filename tags) for the given filesystem
    """

    # NetCDF file keys master dict
    if filesystem == 'erddap':
        key_dict = {
            'temperature': 'g3DTracer',
            'salinity': 'g3DTracer',
            'nitrate': 'g3DBiology',
            'uVelocity': 'g3DuGrid',
            'vVelocity': 'g3DvGrid',
            'u_wind': 'aSurfaceAtmosphere',
            'v_wind': 'aSurfaceAtmosphere',
        }
    elif filesystem == 'local':
        key_dict = {
            'votemper': 'grid_T',
            'vosaline': 'grid_T',
            'nitrate': 'ptrc_T',
            'vozocrtx': 'grid_U',
            'vomecrty': 'grid_V',
            'u_wind': 'ops',
            'v_wind': 'ops',
        }
    else:
        raise ValueError(f'Unknown filesystem: {filesystem}')

    return key_dict


def extract_variables(
    data_vars, ds, variables, key_dict, dates=[None], dim='time',
    indices={'gridX': slice(None), 'gridY': slice(None)},
):
    """Load the requested variables from the open datasets in ds over the date
    and index windows, appending to (or initializing) the data_vars dict
    """

    # Define time index dict
    tindex = {dim: slice(*dates)}

    # Loop through variables
    for var in variables:

        # Initialize data array
        if var not in data_vars:
            data_vars[var] = ds[key_dict[var]][var].isel(indices).sel(tindex).load()

        # Concatenate data arrays
        else:
            data_vars[var] = xr.concat(
                (data_vars[var], ds[key_dict[var]][var].isel(indices).sel(tindex).load()),
                dim=dim,
            )

    return data_vars
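
# As a quick sanity check, the cell below sketches how `make_prefix` and
# `load_netCDF_keys` combine with `extract_variables` to pull one day of
# surface temperature from a single local results file. The date and depth
# level are arbitrary illustrations, and the cell assumes the local /results
# filesystem from `load_paths` is mounted.

# In[ ]:


paths = load_paths()
key_dict = load_netCDF_keys(filesystem='local')
prefix = make_prefix(datetime(2015, 6, 1), paths, model='NEMO', res='1h')

# Open one day of grid_T results and load the surface temperature field
with xr.open_dataset(prefix + '_grid_T.nc') as grid_T:
    data_vars = extract_variables(
        {}, {'grid_T': grid_T}, ['votemper'], key_dict, dim='time_counter',
        indices={'deptht': 0, 'y': slice(None), 'x': slice(None)},
    )
data_vars['votemper'].shape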

# Master function

# In[21]:


def extract_hindcast(
    client, variables, daterange, mask=None, res='1h', version='19-05',
    model='NEMO', filesystem='local', n_workers=12,
    indices={'x': slice(None), 'y': slice(None)},
    cnk={'time_counter': None, 'y': None, 'x': None},
):
    """Extract the requested variables over daterange in monthly chunks and
    save the results to yearly netCDF files. Reads either from the local
    filesystem (filesystem='local', using xarray.open_mfdataset with the
    scheduler provided by the dask.distributed client) or from the
    SalishSeaCast ERDDAP server (filesystem='erddap').
    """

    # Prepare variable definitions
    years, months, days = [[getattr(date, key) for date in daterange] for key in ['year', 'month', 'day']]
    paths = load_paths()
    key_dict = load_netCDF_keys(filesystem=filesystem)
    if any(var in variables for var in ['u_wind', 'v_wind']):
        res, version = '', '1'
    ds, keys = {}, list(set(key_dict[var] for var in variables))
    encoding = {var: {'zlib': True} for var in variables}
    prefix_out = os.path.join(paths['out'], filesystem, model)

    # Initiate loading protocol based on filesystem
    if filesystem == 'local':
        prefix_out = f'{prefix_out}_{key_dict[variables[0]]}_'
        if mask is not None:
            meshmask = (['y', 'x'], mask[indices['y'], indices['x']])
        kwg = {'combine': 'nested', 'concat_dim': 'time_counter', 'parallel': True}
    elif filesystem == 'erddap':
        prefix_out = f'{prefix_out}_{key_dict[variables[0]][1:]}_'
        if mask is not None:
            meshmask = (['gridY', 'gridX'], mask[indices['gridY'], indices['gridX']])
        for key in keys:
            ds[key] = xr.open_dataset(os.path.join(paths['erddap'], f'ubcSS{key}Fields{res}V{version}'))
        attrs = ds[key_dict[variables[0]]].attrs
    else:
        raise ValueError(f'Unknown filesystem: {filesystem}')

    # Loop through years
    for year in range(years[0], years[1] + 1):

        # Initialize data_vars dict and parse months
        data_vars = {}
        monthday = [[1, 1], [12, 31]]
        monthspan = [1, 13]
        if year == years[0]:
            monthspan[0] = months[0]
        if year == years[1]:
            monthspan[1] = months[1] + 1

        # Extract data month by month
        for month in tqdm(range(*monthspan), desc=f'Loading {year}'):

            # Parse daterange
            day, monthdays = 1, monthrange(year, month)[1]
            if (year == years[0]) and (month == months[0]):
                day = days[0]
                monthdays = monthdays - day + 1
                monthday[0] = [month, day]
            if (year == years[1]) and (month == months[1]):
                monthdays = days[1]
                monthday[1] = [month, monthdays]
            startdate = datetime(year, month, day)

            # Load variables from local filesystem using xarray.open_mfdataset and dask
            if filesystem == 'local':
                prefixes = [
                    make_prefix(startdate + timedelta(days=d), paths, model=model, res=res)
                    for d in range(monthdays)
                ]
                with ExitStack() as stack:
                    for key in keys:
                        tag = '_' + key  # NEMO files are named prefix_grid_T.nc, prefix_ptrc_T.nc, etc.
                        if model == 'HRDPS':
                            tag = ''  # HRDPS files carry no variable-group tag
                        flist = [prefix + tag + '.nc' for prefix in prefixes]
                        ds[key] = stack.enter_context(xr.open_mfdataset(flist, chunks=cnk, **kwg))
                    attrs = ds[key_dict[variables[0]]].attrs
                    data_vars = extract_variables(data_vars, ds, variables, key_dict, dim='time_counter', indices=indices)

            # Load variables from ERDDAP using specified month range
            elif filesystem == 'erddap':
                dates = [startdate, startdate + timedelta(monthdays)]
                data_vars = extract_variables(data_vars, ds, variables, key_dict, dates=dates, indices=indices)

        # Save year's worth of data as netCDF file
        if mask is not None:
            data_vars['meshmask'] = meshmask
        datestr = '_'.join(datetime(year, *md).strftime('%Y%m%d') for md in monthday)
        with xr.Dataset(data_vars=data_vars, attrs=attrs) as obj:
            obj.to_netcdf(prefix_out + datestr + '.nc', encoding=encoding)
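
# For reference, a minimal sketch of an ERDDAP extraction (slow, but needs no
# local filesystem or dask workers, so `client` may be None). The one-month
# daterange and the NEMO index window below are illustrative only, and the
# output directory from `load_paths` must already exist. The dataset ID is
# assembled inside `extract_hindcast` as ubcSSg3DTracerFields1hV19-05.

# In[ ]:


extract_hindcast(
    None, ['temperature', 'salinity'], (datetime(2017, 1, 1), datetime(2017, 1, 31)),
    filesystem='erddap',
    indices={'gridX': slice(115, 360), 'gridY': slice(310, 788), 'depth': 0},
)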

# ***
# 
# ### Perform the extraction

# In[4]:


# Define indices and variables
paths = load_paths()
#indices = {'gridX': slice(115, 360), 'gridY': slice(310, 788), 'depth': 0}  # NEMO
#cnk = {'time_counter': 3, 'deptht': 40*3, 'y': 898*3, 'x': 398*3}
indices = {'x': slice(100, 170), 'y': slice(110, 190)}  # HRDPS
cnk = {'time_counter': 3, 'y': 266*3, 'x': 256*3}
variables = ['u_wind', 'v_wind']
mask_NEMO = xr.open_dataset('/data/bmoorema/MEOPAR/grid/mesh_mask201702.nc')
grid_NEMO = xr.open_dataset('/data/bmoorema/MEOPAR/grid/coordinates_seagrid_SalishSea201702.nc', decode_times=False)


# In[7]:


# Build HRDPS masks
mask_HRDPS = {}
dates = [datetime(2011, 9, 21), datetime(2011, 9, 22), datetime(2014, 9, 12)]
for date in dates:
    with xr.open_dataset(make_prefix(date, paths, model='HRDPS') + '.nc') as grid_HRDPS:
        mask_HRDPS[date] = grid_tools.build_HRDPS_mask(grid_HRDPS, grid_NEMO, mask_NEMO.tmask[0, 0, ...])


# In[16]:


client = Client(n_workers=12, threads_per_worker=2, processes=True)


# In[22]:


dateranges = [
    (datetime(2010, 1, 1), dates[0]),
    (dates[1], dates[2] - timedelta(days=1)),
    (dates[2], datetime(2019, 12, 31)),
]
for daterange, date in zip(dateranges, dates):
    extract_hindcast(client, variables, daterange, mask=mask_HRDPS[date], model='HRDPS', indices=indices, cnk=cnk)

client.close()
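
# A quick read-back check of one of the saved yearly files. The filename
# follows the prefix_out + datestr pattern built in `extract_hindcast`; the
# 2015 wind file shown here is illustrative and assumes the extraction above
# completed.

# In[ ]:


outfile = os.path.join(paths['out'], 'local', 'HRDPS_ops_20150101_20151231.nc')
with xr.open_dataset(outfile) as ds_out:
    print(ds_out)  # confirm u_wind, v_wind and meshmask are present
    ds_out.u_wind.isel(time_counter=0).plot()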