My latest code for optimized extraction of long hindcast fields
The local extraction method uses Dask and xarray.open_mfdataset, based on Doug's optimized workflow in dask-expts.
Using this method, I get about 1 h of extraction time per year of hourly surface fields across 12 Salish workers.
The ERDDAP method is much slower, but it is publicly accessible and requires no user-side CPU resources.
Extraction time is about 12 h per year of hourly surface fields.
Both methods extract in monthly chunks and save to yearly files. The code is designed to combine variables on the same model grid (temperature and nitrate, for example) but not across different grids, so velocity, temperature, and wind fields must be extracted in separate runs (see the sketch below).
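As a minimal sketch of that workflow (assuming the extract_hindcast function defined below, an active Dask client, and an illustrative date range):

from datetime import datetime

daterange = (datetime(2015, 1, 1), datetime(2015, 12, 31))  # illustrative range
for varset in (['votemper', 'vosaline'], ['vozocrtx'], ['vomecrty']):  # one call per model grid
    extract_hindcast(client, varset, daterange)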
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import dask
import os
from dask.distributed import Client
from datetime import datetime, timedelta
from calendar import monthrange
from contextlib import ExitStack
from tqdm.notebook import tqdm
from salishsea_tools import grid_tools
def make_prefix(date, paths, model='NEMO', res='1h'):
"""Construct path prefix for local SalishSeaCast results given date object
and paths dict. e.g.,
/results/SalishSea/hindcast.201905/ddmmmyy/SalishSea_1h_yyyymmdd_yyyymmdd
"""
if model == 'NEMO':
        if 2013 <= date.year <= 2016: path = paths['NEMO1']
else: path = paths['NEMO2']
datestr = '_'.join(np.repeat(date.strftime('%Y%m%d'), 2))
prefix = os.path.join(path, date.strftime('%d%b%y').lower(), f'SalishSea_{res}_{datestr}')
elif model == 'HRDPS':
if date > datetime(2014, 9, 11):
prefix = os.path.join(paths['HRDPS'], 'operational', 'ops_' + date.strftime('y%Ym%md%d'))
else:
prefix = os.path.join(paths['HRDPS'], 'gemlam', 'gemlam_' + date.strftime('y%Ym%md%d'))
else:
        raise ValueError(f'Unknown model: {model}')
return prefix
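For example (a quick check against the docstring pattern, using a minimal single-entry paths dict for illustration):

paths = {'NEMO1': '/results/SalishSea/hindcast.201905'}
print(make_prefix(datetime(2015, 1, 1), paths))
# /results/SalishSea/hindcast.201905/01jan15/SalishSea_1h_20150101_20150101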
def load_paths():
"""
"""
paths = {
'NEMO1': '/results/SalishSea/hindcast.201905',
'NEMO2': '/results2/SalishSea/nowcast-green.201905',
'HRDPS': '/results/forcing/atmospheric/GEM2.5',
'erddap': 'https://salishsea.eos.ubc.ca/erddap/griddap',
'out': '/ocean/bmoorema/research/MEOPAR/analysis-ben/data/SalishSeaCast',
}
return paths
def load_netCDF_keys(filesystem='erddap'):
    """Return dict mapping variable names to the netCDF file keys
    for the given filesystem ('erddap' or 'local')
    """
    # NetCDF file keys master dict
    if filesystem == 'erddap':
key_dict = {
'temperature': 'g3DTracer',
'salinity': 'g3DTracer',
'nitrate': 'g3DBiology',
'uVelocity': 'g3DuGrid',
'vVelocity': 'g3DvGrid',
'u_wind': 'aSurfaceAtmosphere',
'v_wind': 'aSurfaceAtmosphere',
}
elif filesystem == 'local':
key_dict = {
'votemper': 'grid_T',
'vosaline': 'grid_T',
'nitrate': 'ptrc_T',
'vozocrtx': 'grid_U',
'vomecrty': 'grid_V',
'u_wind': 'ops',
'v_wind': 'ops',
}
return key_dict
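For example, the mapping resolves each variable name to the results file (local) or ERDDAP dataset key that contains it:

key_dict = load_netCDF_keys(filesystem='local')
print(key_dict['votemper'])  # grid_T
print(load_netCDF_keys(filesystem='erddap')['nitrate'])  # g3DBiology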
def extract_variables(
data_vars, ds, variables, key_dict, dates=[None], dim='time',
indices={'gridX': slice(None), 'gridY': slice(None)}
):
"""
"""
# Define time index dict
tindex = {dim: slice(*dates)}
# Loop through variables
for var in variables:
# Initialize data array
        if var not in data_vars:
data_vars[var] = ds[key_dict[var]][var].isel(indices).sel(tindex).load()
# Concatenate data arrays
else:
data_vars[var] = xr.concat(
(data_vars[var], ds[key_dict[var]][var].isel(indices).sel(tindex).load()), dim=dim,
)
return data_vars
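The accumulate-or-concatenate pattern above reduces to this standalone toy example (synthetic data and hypothetical names, just to show the mechanics):

acc = {}
for month in range(2):
    da = xr.DataArray(np.arange(3), dims='time')  # stand-in for one month of data
    acc['var'] = da if 'var' not in acc else xr.concat((acc['var'], da), dim='time')
print(acc['var'].sizes)  # Frozen({'time': 6})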
Master function
def extract_hindcast(
client, variables, daterange, mask=None, res='1h', version='19-05',
model='NEMO', filesystem='local', n_workers=12,
indices={'x': slice(None), 'y': slice(None)},
cnk={'time_counter': None, 'y': None, 'x': None},
):
"""
"""
# Prepare variable definitions
years, months, days = [[getattr(date, key) for date in daterange] for key in ['year', 'month', 'day']]
paths = load_paths()
key_dict = load_netCDF_keys(filesystem=filesystem)
if any(var in variables for var in ['u_wind', 'v_wind']): res, version = '', '1'
ds, keys = {}, list(set([key_dict[var] for var in variables]))
    encoding = {var: {'zlib': True} for var in variables}
prefix_out = os.path.join(paths['out'], filesystem, model)
# Initiate loading protocol based on filesystem
if filesystem == 'local':
prefix_out = f'{prefix_out}_{key_dict[variables[0]]}_'
        if mask is not None: meshmask = (['y', 'x'], mask[indices['y'], indices['x']])
kwg = {'combine': 'nested', 'concat_dim': 'time_counter', 'parallel': True}
    elif filesystem == 'erddap':
        prefix_out = f'{prefix_out}_{key_dict[variables[0]][1:]}_'
        if mask is not None: meshmask = (['gridY', 'gridX'], mask[indices['gridY'], indices['gridX']])
for key in keys:
ds[key] = xr.open_dataset(os.path.join(paths['erddap'], f'ubcSS{key}Fields{res}V{version}'))
attrs = ds[key_dict[variables[0]]].attrs
else:
raise ValueError(f'Unknown filesystem: {filesystem}')
# Loop through years
for year in range(years[0], years[1] + 1):
# Initialize data_vars dict and parse months
data_vars = {}
monthday = [[1, 1], [12, 31]]
monthspan = [1, 13]
if year == years[0]: monthspan[0] = months[0]
if year == years[1]: monthspan[1] = months[1] + 1
# Extract data month by month
for month in tqdm(range(*monthspan), desc=f'Loading {year}'):
# Parse daterange
day, monthdays = 1, monthrange(year, month)[1]
if (year == years[0]) and (month == months[0]):
day = days[0]
monthdays = monthdays - day + 1
monthday[0] = [month, day]
if (year == years[1]) and (month == months[1]):
monthdays = days[1]
monthday[1] = [month, monthdays]
startdate = datetime(year, month, day)
# Load variables from local filesystem using xarray.mfdataset and dask
if filesystem == 'local':
prefixes = [
make_prefix(startdate + timedelta(days=day), paths, model=model, res=res)
for day in range(monthdays)
]
with ExitStack() as stack:
for key in keys:
                        tag = f'_{key}'
                        if model == 'HRDPS': tag = ''
                        flist = [prefix + tag + '.nc' for prefix in prefixes]
ds[key] = stack.enter_context(xr.open_mfdataset(flist, chunks=cnk, **kwg))
attrs = ds[key_dict[variables[0]]].attrs
data_vars = extract_variables(data_vars, ds, variables, key_dict, dim='time_counter', indices=indices)
# Load variables from ERDDAP using specified month range
            elif filesystem == 'erddap':
dates = [startdate, startdate + timedelta(monthdays)]
data_vars = extract_variables(data_vars, ds, variables, key_dict, dates=dates, indices=indices)
# Raise ValueError if filesystem is defined incorrectly
else:
raise ValueError(f'Unknown filesystem: {filesystem}')
# Save year's worth of data as netCDF file
if mask is not None: data_vars['meshmask'] = meshmask
datestr = '_'.join(datetime(year, *md).strftime('%Y%m%d') for md in monthday)
with xr.Dataset(data_vars=data_vars, attrs=attrs) as obj:
obj.to_netcdf(prefix_out + datestr + '.nc', encoding=encoding)
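Each call writes one netCDF file per year, named from the output prefix and the extracted date span. For example, extracting grid_T variables for all of 2015 from the local filesystem would produce (per the prefix_out and datestr logic above):

/ocean/bmoorema/research/MEOPAR/analysis-ben/data/SalishSeaCast/local/NEMO_grid_T_20150101_20151231.nc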
# Define indices and variables
paths = load_paths()
#indices = {'gridX': slice(115, 360), 'gridY': slice(310, 788), 'depth': 0} # NEMO
#cnk = {'time_counter': 3, 'deptht': 40*3, 'y': 898*3, 'x': 398*3}
indices = {'x': slice(100, 170), 'y': slice(110, 190)} # HRDPS
cnk = {'time_counter': 3, 'y': 266*3, 'x': 256*3}
variables = ['u_wind', 'v_wind']
mask_NEMO = xr.open_dataset('/data/bmoorema/MEOPAR/grid/mesh_mask201702.nc')
grid_NEMO = xr.open_dataset('/data/bmoorema/MEOPAR/grid/coordinates_seagrid_SalishSea201702.nc', decode_times=False)
# Build HRDPS masks
mask_HRDPS = {}
dates = [datetime(2011, 9, 21), datetime(2011, 9, 22), datetime(2014, 9, 12)]
for date in dates:
    with xr.open_dataset(make_prefix(date, paths, model='HRDPS') + '.nc') as grid_HRDPS:
        mask_HRDPS[date] = grid_tools.build_HRDPS_mask(grid_HRDPS, grid_NEMO, mask_NEMO.tmask[0, 0, ...])
client = Client(n_workers=12, threads_per_worker=2, processes=True)
dateranges = [
(datetime(2010, 1, 1), dates[0]),
(dates[1], dates[2]-timedelta(days=1)),
(dates[2], datetime(2019, 12, 31)),
]
for daterange, maskdate in zip(dateranges, mask_HRDPS):
extract_hindcast(client, variables, daterange, mask=mask_HRDPS[maskdate], model='HRDPS', indices=indices, cnk=cnk)
client.close()