#!/usr/bin/env python # coding: utf-8 # This is a version of a notebook I (Elise) made to try out some of the things I learned in Jinhui Qin's dask tutorial on calculations I would normally do. I'm posting a version that was improved by Doug. I am guessing this part might be most helpful to people: # # client = dask.distributed.Client( # n_workers=2, threads_per_worker=2, processes=True) # In[ ]: get_ipython().system('lscpu') # In[ ]: get_ipython().system('cat /proc/meminfo | head -1') # In[ ]: from datetime import datetime, timedelta import functools import sys import time import dask import dask.distributed import matplotlib.pyplot as plt import netCDF4 import numpy import xarray from salishsea_tools import evaltools get_ipython().run_line_magic('matplotlib', 'inline') # In[ ]: print(sys.version) print(xarray.__version__) print(dask.__version__) print(netCDF4.__version__) print(numpy.__version__) # In[ ]: j0, j1, i0, i1 = 230, 470, 0, 200 # In[ ]: with netCDF4.Dataset("/ocean/eolson/MEOPAR/NEMO-forcing/grid/mesh_mask201702_noLPE.nc") as mesh: tmask=numpy.copy(mesh.variables['tmask'][:,:,j0:j1,i0:i1]) e3t0=numpy.copy(mesh.variables['e3t_0'][:,:,j0:j1,i0:i1]) # In[ ]: flistmuZ = evaltools.index_model_files( datetime(2015, 6, 1), datetime(2015, 6, 9), "/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/", "long", 10, "ptrc_T", 1 ) # In[ ]: flistmuZ['t_0'][0], flistmuZ['t_n'][len(flistmuZ)-1] # In[ ]: def calc_depth_avgs(flist, tmask, e3t0): sum1 = numpy.empty((len(flist)*24*10, j1-j0, i1-i0)) sum1uZ = numpy.empty((len(flist)*24*10, j1-j0, i1-i0)) for ind, row in flist.iterrows(): ds = netCDF4.Dataset(row['paths']) diatoms = ds.variables['diatoms'][:,:,j0:j1,i0:i1] uZ = ds.variables['microzooplankton'][:,:,j0:j1,i0:i1] sum1[(24*10*ind):(24*10*(ind+1)),:,:] = numpy.sum(tmask*e3t0*diatoms, 1) sum1uZ[(24*10*ind):(24*10*(ind+1)),:,:] = numpy.sum(tmask*e3t0*uZ, 1) diatom_int = numpy.mean(sum1, 0) microzoo_int = numpy.mean(sum1uZ, 0) return diatom_int, microzoo_int # In[ ]: t0=datetime.now() diatoms_muZ_Int, uZ_muZ_Int = calc_depth_avgs(flistmuZ, tmask, e3t0) t1=datetime.now() print(t1-t0) # In[ ]: def calc_depth_avgs_no_loop(flist, tmask, e3t0): with netCDF4.Dataset(flist) as ds: diatoms = ds.variables['diatoms'][:,:,j0:j1,i0:i1] diatoms_int = (diatoms * tmask * e3t0).sum(axis=1).mean(axis=0) uZ = ds.variables['microzooplankton'][:,:,j0:j1,i0:i1] uZ_int = (uZ * tmask * e3t0).sum(axis=1).mean(axis=0) return diatoms_int, uZ_int # In[ ]: t0=datetime.now() diatoms_int, uZ_int = calc_depth_avgs_no_loop( "/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc", tmask, e3t0) numpy.testing.assert_equal(diatoms_int, diatoms_muZ_Int) numpy.testing.assert_equal(uZ_int, uZ_muZ_Int) t1=datetime.now() print(t1-t0) # In[ ]: with xarray.open_dataset("/ocean/eolson/MEOPAR/NEMO-forcing/grid/mesh_mask201702_noLPE.nc") as mesh: x_tmask = mesh.tmask.isel(y=slice(j0, j1), x=slice(i0, i1)).data x_e3t0 = mesh.e3t_0.isel(y=slice(j0, j1), x=slice(i0, i1)).data # In[ ]: def calc_depth_avgs_xarray(flist, x_tmask, x_e3t0): with xarray.open_dataset(flist) as ds: diatoms = ds.diatoms.isel(y=slice(j0, j1), x=slice(i0, i1)) diatoms_int = (diatoms * x_tmask * x_e3t0).sum(axis=1).mean(axis=0) uZ = ds.microzooplankton.isel(y=slice(j0, j1), x=slice(i0, i1)) uZ_int = (uZ * x_tmask * x_e3t0).sum(axis=1).mean(axis=0) return diatoms_int, uZ_int # In[ ]: t0=datetime.now() x_diatoms_int, x_uZ_int = calc_depth_avgs_xarray( "/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc", tmask, e3t0) numpy.testing.assert_allclose(x_diatoms_int, diatoms_muZ_Int) numpy.testing.assert_allclose(x_uZ_int, uZ_muZ_Int) t1=datetime.now() print(t1-t0) # In[ ]: client = dask.distributed.Client( n_workers=2, threads_per_worker=2, processes=True) client # In[ ]: def calc_depth_avgs_dask(flist, chunks, tmask, e3t0): with xarray.open_dataset(flist, chunks=chunks) as ds: diatoms = ds.diatoms.isel(y=slice(j0, j1), x=slice(i0, i1)) diatoms_int = (diatoms * tmask * e3t0).sum(axis=1).mean(axis=0) uZ = ds.microzooplankton.isel(y=slice(j0, j1), x=slice(i0, i1)) uZ_int = (uZ * tmask * e3t0).sum(axis=1).mean(axis=0) return diatoms_int, uZ_int # In[ ]: def check_results(diatoms_calc, diatoms_exp, uZ_calc, uZ_exp): numpy.testing.assert_allclose(diatoms_calc, diatoms_exp) numpy.testing.assert_allclose(uZ_calc, uZ_exp) # In[ ]: chunks = {"time_counter": 1, "deptht": 40, "y": 898, "x": 398} d_diatoms_int, d_uZ_int = calc_depth_avgs_dask( "/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc", chunks, tmask, e3t0) check_results(d_diatoms_int, diatoms_muZ_Int, d_uZ_int, uZ_muZ_Int) # In[ ]: chunks = {"time_counter": 3, "deptht": 40*3, "y": 898*3, "x": 398*3} d_diatoms_int, d_uZ_int = calc_depth_avgs_dask( "/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc", chunks, tmask, e3t0) check_results(d_diatoms_int, diatoms_muZ_Int, d_uZ_int, uZ_muZ_Int) # In[ ]: chunks = {"time_counter": 3, "deptht": 40*3, "y": 898*3, "x": 398*3} d_diatoms_int, d_uZ_int = calc_depth_avgs_dask( "/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc", chunks, tmask, e3t0) check_results(d_diatoms_int, diatoms_muZ_Int, d_uZ_int, uZ_muZ_Int) # In[ ]: chunks = {"time_counter": 4, "deptht": 40*4, "y": 898*4, "x": 398*4} d_diatoms_int, d_uZ_int = calc_depth_avgs_dask( "/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc", chunks, tmask, e3t0) check_results(d_diatoms_int, diatoms_muZ_Int, d_uZ_int, uZ_muZ_Int) # In[ ]: flistmuZ = evaltools.index_model_files( datetime(2015, 6, 1), datetime(2015, 6, 19), "/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/", "long", 10, "ptrc_T", 1 ) # In[ ]: flistmuZ # In[ ]: t0=datetime.now() m_diatoms_muZ_Int, m_uZ_muZ_Int = calc_depth_avgs(flistmuZ, tmask, e3t0) t1=datetime.now() print(t1-t0) # In[ ]: def calc_depth_avgs_mfdataset(flist, chunks, tmask, e3t0): kwargs = dict(combine="nested", concat_dim="time_counter", parallel=True) with xarray.open_mfdataset(flist, chunks=chunks, **kwargs) as ds: diatoms = ds.diatoms.isel(y=slice(j0, j1), x=slice(i0, i1)) diatoms_int = (diatoms * tmask * e3t0).sum(axis=1).mean(axis=0) uZ = ds.microzooplankton.isel(y=slice(j0, j1), x=slice(i0, i1)) uZ_int = (uZ * tmask * e3t0).sum(axis=1).mean(axis=0) return diatoms_int, uZ_int # In[ ]: chunks = {"time_counter": 4, "deptht": 40*4, "y": 898*4, "x": 398*4} mf_diatoms_int, mf_uZ_int = calc_depth_avgs_mfdataset( ["/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150531-20150609.nc", "/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/SalishSea_1h_20150531_20150709_ptrc_T_20150610-20150619.nc",], chunks, tmask, e3t0) check_results(mf_diatoms_int, m_diatoms_muZ_Int, mf_uZ_int, m_uZ_muZ_Int) # In[ ]: client.close() # Final: # In[ ]: with xarray.open_dataset("/ocean/eolson/MEOPAR/NEMO-forcing/grid/mesh_mask201702_noLPE.nc") as mesh: x_tmask = mesh.tmask.isel(y=slice(j0, j1), x=slice(i0, i1)).data x_e3t0 = mesh.e3t_0.isel(y=slice(j0, j1), x=slice(i0, i1)).data # In[ ]: flistmuZ = evaltools.index_model_files( datetime(2015, 6, 1), datetime(2015, 9, 1), "/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/", "long", 10, "ptrc_T", 1 ) # In[ ]: flistmuZ # In[ ]: flistHC = evaltools.index_model_files( flistmuZ['t_0'][0], flistmuZ['t_n'][len(flistmuZ)-1] - timedelta(days=1), '/results/SalishSea/hindcast.201905/', 'nowcast', 1, 'ptrc_T', 1) # In[ ]: flistHC # In[ ]: def calc_depth_avgs_mfdataset(flist, chunks, tmask, e3t0): kwargs = dict(combine="nested", concat_dim="time_counter", parallel=True) with xarray.open_mfdataset(flist, chunks=chunks, **kwargs) as ds: diatoms = ds.diatoms.isel(y=slice(j0, j1), x=slice(i0, i1)) diatoms_int = (diatoms * tmask * e3t0).sum(axis=1).mean(axis=0) uZ = ds.microzooplankton.isel(y=slice(j0, j1), x=slice(i0, i1)) uZ_int = (uZ * tmask * e3t0).sum(axis=1).mean(axis=0) return diatoms_int, uZ_int # In[ ]: client = dask.distributed.Client( n_workers=2, threads_per_worker=2, processes=True) client # In[ ]: chunks = {"time_counter": 3, "deptht": 40*3, "y": 898*3, "x": 398*3} # In[ ]: x_diatoms_muZ_int, x_uZ_muZ_int = calc_depth_avgs_mfdataset( [row["paths"] for _, row in flistmuZ.iterrows()], chunks, x_tmask, x_e3t0 ) # In[ ]: x_diatomsHC_int, x_uZHC_int = calc_depth_avgs_mfdataset( [row["paths"] for _, row in flistHC.iterrows()], chunks, x_tmask, x_e3t0 ) # In[ ]: t0=datetime.now() diatomsmuZInt = x_diatoms_muZ_int.compute() t1=datetime.now() print(t1-t0) # In[ ]: t0=datetime.now() diatomsHCInt = x_diatomsHC_int.compute() t1=datetime.now() print(t1-t0) # In[ ]: fig, ax = plt.subplots(1, 3, figsize=(15, 4)) m0 = ax[0].pcolormesh(diatomsmuZInt) plt.colorbar(m0, ax=ax[0]) ax[0].set_title('muZ Diatoms') m1 = ax[1].pcolormesh(diatomsHCInt) plt.colorbar(m1,ax=ax[1]) ax[1].set_title('HC Diatoms') m2 = ax[2].pcolormesh(diatomsmuZInt - diatomsHCInt) plt.colorbar(m2,ax=ax[2]) ax[2].set_title('Diff') # In[ ]: t0=datetime.now() uZmuZInt = x_uZ_muZ_int.compute() t1=datetime.now() print(t1-t0) # In[ ]: t0=datetime.now() uZHCInt = x_uZHC_int.compute() t1=datetime.now() print(t1-t0) # In[ ]: fig, ax = plt.subplots(1, 3, figsize=(15, 4)) m0 = ax[0].pcolormesh(uZmuZInt) plt.colorbar(m0, ax=ax[0]) ax[0].set_title('muZ microZ') m1 = ax[1].pcolormesh(uZHCInt) plt.colorbar(m1,ax=ax[1]) ax[1].set_title('HC microZ') m2 = ax[2].pcolormesh(uZmuZInt - uZHCInt) plt.colorbar(m2,ax=ax[2]) ax[2].set_title('Diff') # In[ ]: client.close() # In[ ]: # In[ ]: # In[ ]: