This is a version of a notebook I (Elise) made to try out some of the things I learned in Jinhui Qin's dask tutorial on calculations I would normally do. I'm posting a version that was improved by Doug. I am guessing this part might be most helpful to people:
client = dask.distributed.Client( n_workers=2, threads_per_worker=2, processes=True)
!lscpu
!cat /proc/meminfo | head -1
from datetime import datetime, timedelta
import functools
import sys
import time
import dask
import dask.distributed
import matplotlib.pyplot as plt
import netCDF4
import numpy
import xarray
from salishsea_tools import evaltools
%matplotlib inline
print(sys.version)
print(xarray.__version__)
print(dask.__version__)
print(netCDF4.__version__)
print(numpy.__version__)
j0, j1, i0, i1 = 230, 470, 0, 200
with netCDF4.Dataset("/ocean/eolson/MEOPAR/NEMO-forcing/grid/mesh_mask201702_noLPE.nc") as mesh:
tmask=numpy.copy(mesh.variables['tmask'][:,:,j0:j1,i0:i1])
e3t0=numpy.copy(mesh.variables['e3t_0'][:,:,j0:j1,i0:i1])
flistmuZ = evaltools.index_model_files(
datetime(2015, 6, 1), datetime(2015, 6, 9),
"/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/",
"long", 10, "ptrc_T", 1
)
flistmuZ['t_0'][0], flistmuZ['t_n'][len(flistmuZ)-1]
def calc_depth_avgs(flist, tmask, e3t0):
sum1 = numpy.empty((len(flist)*24*10, j1-j0, i1-i0))
sum1uZ = numpy.empty((len(flist)*24*10, j1-j0, i1-i0))
for ind, row in flist.iterrows():
ds = netCDF4.Dataset(row['paths'])
diatoms = ds.variables['diatoms'][:,:,j0:j1,i0:i1]
uZ = ds.variables['microzooplankton'][:,:,j0:j1,i0:i1]
sum1[(24*10*ind):(24*10*(ind+1)),:,:] = numpy.sum(tmask*e3t0*diatoms, 1)
sum1uZ[(24*10*ind):(24*10*(ind+1)),:,:] = numpy.sum(tmask*e3t0*uZ, 1)
diatom_int = numpy.mean(sum1, 0)
microzoo_int = numpy.mean(sum1uZ, 0)
return diatom_int, microzoo_int
t0=datetime.now()
diatoms_muZ_Int, uZ_muZ_Int = calc_depth_avgs(flistmuZ, tmask, e3t0)
t1=datetime.now()
print(t1-t0)
def calc_depth_avgs_no_loop(flist, tmask, e3t0):
with netCDF4.Dataset(flist) as ds:
diatoms = ds.variables['diatoms'][:,:,j0:j1,i0:i1]
diatoms_int = (diatoms * tmask * e3t0).sum(axis=1).mean(axis=0)
uZ = ds.variables['microzooplankton'][:,:,j0:j1,i0:i1]
uZ_int = (uZ * tmask * e3t0).sum(axis=1).mean(axis=0)
return diatoms_int, uZ_int
t0=datetime.now()
diatoms_int, uZ_int = calc_depth_avgs_no_loop(
"/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc",
tmask, e3t0)
numpy.testing.assert_equal(diatoms_int, diatoms_muZ_Int)
numpy.testing.assert_equal(uZ_int, uZ_muZ_Int)
t1=datetime.now()
print(t1-t0)
with xarray.open_dataset("/ocean/eolson/MEOPAR/NEMO-forcing/grid/mesh_mask201702_noLPE.nc") as mesh:
x_tmask = mesh.tmask.isel(y=slice(j0, j1), x=slice(i0, i1)).data
x_e3t0 = mesh.e3t_0.isel(y=slice(j0, j1), x=slice(i0, i1)).data
def calc_depth_avgs_xarray(flist, x_tmask, x_e3t0):
with xarray.open_dataset(flist) as ds:
diatoms = ds.diatoms.isel(y=slice(j0, j1), x=slice(i0, i1))
diatoms_int = (diatoms * x_tmask * x_e3t0).sum(axis=1).mean(axis=0)
uZ = ds.microzooplankton.isel(y=slice(j0, j1), x=slice(i0, i1))
uZ_int = (uZ * x_tmask * x_e3t0).sum(axis=1).mean(axis=0)
return diatoms_int, uZ_int
t0=datetime.now()
x_diatoms_int, x_uZ_int = calc_depth_avgs_xarray(
"/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc",
tmask, e3t0)
numpy.testing.assert_allclose(x_diatoms_int, diatoms_muZ_Int)
numpy.testing.assert_allclose(x_uZ_int, uZ_muZ_Int)
t1=datetime.now()
print(t1-t0)
client = dask.distributed.Client(
n_workers=2, threads_per_worker=2, processes=True)
client
def calc_depth_avgs_dask(flist, chunks, tmask, e3t0):
with xarray.open_dataset(flist, chunks=chunks) as ds:
diatoms = ds.diatoms.isel(y=slice(j0, j1), x=slice(i0, i1))
diatoms_int = (diatoms * tmask * e3t0).sum(axis=1).mean(axis=0)
uZ = ds.microzooplankton.isel(y=slice(j0, j1), x=slice(i0, i1))
uZ_int = (uZ * tmask * e3t0).sum(axis=1).mean(axis=0)
return diatoms_int, uZ_int
def check_results(diatoms_calc, diatoms_exp, uZ_calc, uZ_exp):
numpy.testing.assert_allclose(diatoms_calc, diatoms_exp)
numpy.testing.assert_allclose(uZ_calc, uZ_exp)
chunks = {"time_counter": 1, "deptht": 40, "y": 898, "x": 398}
d_diatoms_int, d_uZ_int = calc_depth_avgs_dask(
"/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc",
chunks, tmask, e3t0)
check_results(d_diatoms_int, diatoms_muZ_Int, d_uZ_int, uZ_muZ_Int)
chunks = {"time_counter": 3, "deptht": 40*3, "y": 898*3, "x": 398*3}
d_diatoms_int, d_uZ_int = calc_depth_avgs_dask(
"/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc",
chunks, tmask, e3t0)
check_results(d_diatoms_int, diatoms_muZ_Int, d_uZ_int, uZ_muZ_Int)
chunks = {"time_counter": 3, "deptht": 40*3, "y": 898*3, "x": 398*3}
d_diatoms_int, d_uZ_int = calc_depth_avgs_dask(
"/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc",
chunks, tmask, e3t0)
check_results(d_diatoms_int, diatoms_muZ_Int, d_uZ_int, uZ_muZ_Int)
chunks = {"time_counter": 4, "deptht": 40*4, "y": 898*4, "x": 398*4}
d_diatoms_int, d_uZ_int = calc_depth_avgs_dask(
"/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc",
chunks, tmask, e3t0)
check_results(d_diatoms_int, diatoms_muZ_Int, d_uZ_int, uZ_muZ_Int)
flistmuZ = evaltools.index_model_files(
datetime(2015, 6, 1), datetime(2015, 6, 19),
"/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/",
"long", 10, "ptrc_T", 1
)
flistmuZ
t0=datetime.now()
m_diatoms_muZ_Int, m_uZ_muZ_Int = calc_depth_avgs(flistmuZ, tmask, e3t0)
t1=datetime.now()
print(t1-t0)
def calc_depth_avgs_mfdataset(flist, chunks, tmask, e3t0):
kwargs = dict(combine="nested", concat_dim="time_counter", parallel=True)
with xarray.open_mfdataset(flist, chunks=chunks, **kwargs) as ds:
diatoms = ds.diatoms.isel(y=slice(j0, j1), x=slice(i0, i1))
diatoms_int = (diatoms * tmask * e3t0).sum(axis=1).mean(axis=0)
uZ = ds.microzooplankton.isel(y=slice(j0, j1), x=slice(i0, i1))
uZ_int = (uZ * tmask * e3t0).sum(axis=1).mean(axis=0)
return diatoms_int, uZ_int
chunks = {"time_counter": 4, "deptht": 40*4, "y": 898*4, "x": 398*4}
mf_diatoms_int, mf_uZ_int = calc_depth_avgs_mfdataset(
["/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc",
"/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150610-20150619.nc",],
chunks, tmask, e3t0)
check_results(mf_diatoms_int, m_diatoms_muZ_Int, mf_uZ_int, m_uZ_muZ_Int)
client.close()
Final:
with xarray.open_dataset("/ocean/eolson/MEOPAR/NEMO-forcing/grid/mesh_mask201702_noLPE.nc") as mesh:
x_tmask = mesh.tmask.isel(y=slice(j0, j1), x=slice(i0, i1)).data
x_e3t0 = mesh.e3t_0.isel(y=slice(j0, j1), x=slice(i0, i1)).data
flistmuZ = evaltools.index_model_files(
datetime(2015, 6, 1), datetime(2015, 9, 1),
"/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/",
"long", 10, "ptrc_T", 1
)
flistmuZ
flistHC = evaltools.index_model_files(
flistmuZ['t_0'][0], flistmuZ['t_n'][len(flistmuZ)-1] - timedelta(days=1),
'/results/SalishSea/hindcast.201905/', 'nowcast', 1, 'ptrc_T', 1)
flistHC
def calc_depth_avgs_mfdataset(flist, chunks, tmask, e3t0):
kwargs = dict(combine="nested", concat_dim="time_counter", parallel=True)
with xarray.open_mfdataset(flist, chunks=chunks, **kwargs) as ds:
diatoms = ds.diatoms.isel(y=slice(j0, j1), x=slice(i0, i1))
diatoms_int = (diatoms * tmask * e3t0).sum(axis=1).mean(axis=0)
uZ = ds.microzooplankton.isel(y=slice(j0, j1), x=slice(i0, i1))
uZ_int = (uZ * tmask * e3t0).sum(axis=1).mean(axis=0)
return diatoms_int, uZ_int
client = dask.distributed.Client(
n_workers=2, threads_per_worker=2, processes=True)
client
chunks = {"time_counter": 3, "deptht": 40*3, "y": 898*3, "x": 398*3}
x_diatoms_muZ_int, x_uZ_muZ_int = calc_depth_avgs_mfdataset(
[row["paths"] for _, row in flistmuZ.iterrows()], chunks, x_tmask, x_e3t0
)
x_diatomsHC_int, x_uZHC_int = calc_depth_avgs_mfdataset(
[row["paths"] for _, row in flistHC.iterrows()], chunks, x_tmask, x_e3t0
)
t0=datetime.now()
diatomsmuZInt = x_diatoms_muZ_int.compute()
t1=datetime.now()
print(t1-t0)
t0=datetime.now()
diatomsHCInt = x_diatomsHC_int.compute()
t1=datetime.now()
print(t1-t0)
fig, ax = plt.subplots(1, 3, figsize=(15, 4))
m0 = ax[0].pcolormesh(diatomsmuZInt)
plt.colorbar(m0, ax=ax[0])
ax[0].set_title('muZ Diatoms')
m1 = ax[1].pcolormesh(diatomsHCInt)
plt.colorbar(m1,ax=ax[1])
ax[1].set_title('HC Diatoms')
m2 = ax[2].pcolormesh(diatomsmuZInt - diatomsHCInt)
plt.colorbar(m2,ax=ax[2])
ax[2].set_title('Diff')
t0=datetime.now()
uZmuZInt = x_uZ_muZ_int.compute()
t1=datetime.now()
print(t1-t0)
t0=datetime.now()
uZHCInt = x_uZHC_int.compute()
t1=datetime.now()
print(t1-t0)
fig, ax = plt.subplots(1, 3, figsize=(15, 4))
m0 = ax[0].pcolormesh(uZmuZInt)
plt.colorbar(m0, ax=ax[0])
ax[0].set_title('muZ microZ')
m1 = ax[1].pcolormesh(uZHCInt)
plt.colorbar(m1,ax=ax[1])
ax[1].set_title('HC microZ')
m2 = ax[2].pcolormesh(uZmuZInt - uZHCInt)
plt.colorbar(m2,ax=ax[2])
ax[2].set_title('Diff')
client.close()