import matplotlib
matplotlib.use('nbagg')
import pandas as pd
import sys
import os
import numpy as np
import matplotlib.pyplot as plt
import xarray as xr
import zarr
import warnings
from textwrap import wrap
from mpl_toolkits import mplot3d
import random
from scipy.ndimage.filters import uniform_filter
from scipy.ndimage.measurements import variance
from scipy.ndimage import label
from scipy.ndimage.morphology import binary_closing
from skimage.filters import gaussian, threshold_otsu
from skimage import measure
from re import split
import dask
from ipywidgets import interact
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, progress, wait, fire_and_forget
from dask import delayed, compute, visualize
from dask_image.ndfilters import uniform_filter as uf
from dask_image.ndfilters import gaussian_filter
from dask_image.ndmeasure import variance as varian
import dask_image.ndmeasure as da_measure
import dask.array as da
import time
# Silence every warning for the rest of the session.
warnings.filterwarnings('ignore')

# Put the parent of the current working directory on the import path so that
# the project-local ``src`` package can be imported below.
_parent_dir = os.path.abspath(os.path.join(os.path.abspath(''), '../'))
sys.path.insert(1, f"{_parent_dir}")
from src.utils import time_3d, get_pars_from_ini

# First token of the machine's hostname (split on ", ", "_", "-" or "!"),
# newlines removed. It is the key used to look up this machine's paths.
_hostname = os.popen('hostname').read()
location = split(', |_|-|!', _hostname)[0].replace("\n", "")

# Host-specific data and project directories taken from the 'loc' settings.
path_data = get_pars_from_ini(campaign='loc')[location]['path_data']
path_proj = get_pars_from_ini(campaign='loc')[location]['path_proj']
# @dask.delayed
def multiple_plot(ds):
    """Draw a 3x2 panel of radar fields for one time step and save it as a JPG.

    Each row shows one quantity (reflectivity, velocity, LDR): the left panel
    is the plain variable (``zhh14``, ``vel14``, ``ldrhh14``) and the right
    panel its ``SP`` counterpart. Both panels of a row share one colorbar,
    labelled with the ``units`` attribute of the left-hand variable. The
    figure is written to ``{path_proj}/results/plots/test/<timestamp>.jpg``,
    where ``<timestamp>`` is the time stamp with ``:`` replaced by ``-``.

    Parameters
    ----------
    ds : xarray.Dataset
        Must provide ``range``, ``azimuth``, ``DR``, ``alt3D``, ``time`` and
        the six plotted variables. The fields may be 2-D, or 3-D; for 3-D
        input only index 0 of the third axis is drawn (presumably a length-1
        time axis -- TODO confirm against the callers).

    Returns
    -------
    None
        When ``ds.zhh14`` has no extent (nothing is drawn or saved).
    numpy.ndarray
        ``ds.time.values`` for 2-D input.
    xarray.DataArray
        ``ds.time`` for 3-D input, so block-wise callers get an xarray object.
    """
    # Guard clause: nothing to plot for an empty block.
    if sum(ds.zhh14.shape) <= 0:
        return None

    # One entry per row: (variable, colormap, vmin, vmax, label, rasterized).
    # The right-hand panel of each row plots "<variable>SP".
    panels = (
        ('zhh14', 'jet', 0, 40, 'Reflectivity', False),
        ('vel14', 'hsv', -15, 15, 'Velocity', True),
        ('ldrhh14', 'seismic', -60, -20, 'LDR', True),
    )

    # Decide the dimensionality explicitly instead of waiting for pcolormesh
    # to raise: which exception it raises for 3-D input depends on the
    # Matplotlib version, and catching it could also hide unrelated errors.
    is_3d = ds.zhh14.ndim == 3

    def _plane(field):
        """Return the 2-D slice of *field* that is drawn."""
        return field[:, :, 0] if is_3d else field[:, :]

    x = _plane(ds.range * np.sin(np.deg2rad(ds.azimuth)) * ds.DR.values[0])
    y = _plane(ds.alt3D)

    plt.close('all')
    fig, axs = plt.subplots(3, 2, figsize=(11, 20), sharey=True)

    # Draw all six meshes first, remembering the left-hand mesh of every row
    # because it drives that row's shared colorbar.
    row_meshes = []
    for row_axes, (name, cmap, vmin, vmax, _, rasterized) in zip(axs, panels):
        drawn = [
            # shading='auto' accepts coordinates with the same shape as the
            # data as well as coordinates that are one element larger.
            ax.pcolormesh(x, y, _plane(ds[key]), cmap=cmap, vmin=vmin,
                          vmax=vmax, shading='auto', rasterized=rasterized)
            for ax, key in zip(row_axes, (name, f"{name}SP"))
        ]
        row_meshes.append(drawn[0])

    for row_axes, mesh, (name, _, _, _, label, _) in zip(axs, row_meshes, panels):
        plt.colorbar(mesh, ax=list(row_axes), pad=0.05,
                     label=f"{label} ({ds[name].units})")

    # Use one scalar time stamp for both the title and the file name, so the
    # title of a 3-D block is no longer rendered as a bracketed array.
    times = ds.time.values
    if is_3d:
        first_time, t = times[0], ds.time
    else:
        first_time, t = times, times
    stamp = f"{np.datetime_as_string(first_time, unit='s')}"

    fig.suptitle(f"{stamp}\n", fontsize=16, y=.92)
    save = stamp.replace(':', '-')
    fig.savefig(f"{path_proj}/results/plots/test/{save}.jpg", format='jpg')
    plt.close('all')
    return t
# @dask.delayed
def load(_time):
    """Open the low-resolution zarr store and return the plotting variables.

    The store is opened with ``xr.open_zarr``, entries whose ``time`` label
    repeats an earlier one are dropped, and the result is restricted to the
    variables needed for plotting and to ``_time``.

    Parameters
    ----------
    _time
        Anything accepted by ``Dataset.sel(time=...)`` (a label, a list of
        labels or a slice).

    Returns
    -------
    xarray.Dataset
        The selected variables at the requested time(s).
    """
    wanted = ['range', 'alt3D', 'azimuth', 'zhh14', 'zhh14SP', 'vel14',
              'vel14SP', 'ldrhh14', 'ldrhh14SP', 'DR']
    store = f'{path_data}/zarr_rckd/KUsKAs_Wn/lores.zarr'
    dataset = xr.open_zarr(store)
    # Boolean mask that is False for every repeated time label.
    first_occurrence = ~dataset.get_index("time").duplicated()
    deduplicated = dataset.sel(time=first_occurrence)
    return deduplicated[wanted].sel(time=_time)
!scancel -u alfonso8
# Dask cluster whose workers run as SLURM jobs on the "seseml" queue.
# The scheduler listens on a fixed address and serves its dashboard on
# port 7999.
_scheduler_options = {
    'host': '172.22.179.3:7222',
    'dashboard_address': '7999',
}
cluster = SLURMCluster(
    queue="seseml",
    memory='100GB',
    cores=40,
    processes=10,
    walltime='24:40:00',
    scheduler_options=_scheduler_options,
)
# Request a fixed pool of eight worker jobs; adaptive scaling stays disabled.
cluster.scale(jobs=8)
# cluster.adapt(maximum_jobs=8)
cluster
Tab(children=(HTML(value='<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-outpu…
%%bash
squeue -u alfonso8
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON) 548708 seseml dask-wor alfonso8 PD 0:00 1 (None) 548707 seseml dask-wor alfonso8 PD 0:00 1 (None) 548706 seseml dask-wor alfonso8 PD 0:00 1 (None) 548705 seseml dask-wor alfonso8 PD 0:00 1 (None)
# Connect a distributed client to the cluster created above; the bare name on
# the last line makes the notebook display the client summary.
client = Client(cluster)
client
Client-6341ef73-786a-11ec-8e86-b083fed70206
Connection method: Cluster object | Cluster type: dask_jobqueue.SLURMCluster |
Dashboard: http://172.22.179.3:7999/status |
b555944c
Dashboard: http://172.22.179.3:7999/status | Workers: 0 |
Total threads: 0 | Total memory: 0 B |
Scheduler-04877f8c-12aa-4656-9326-38d34bed8c9f
Comm: tcp://172.22.179.3:7222 | Workers: 0 |
Dashboard: http://172.22.179.3:7999/status | Total threads: 0 |
Started: Just now | Total memory: 0 B |
# Load the results table; its unnamed first CSV column is exposed as "dates".
_csv_path = f'{path_proj}/results/all_px_202111070706.csv'
ds_prop = pd.read_csv(_csv_path).rename(columns={'Unnamed: 0': 'dates'})
# The dates are kept exactly as read from the file; the conversion below is
# left disabled.
# ds_prop.dates = pd.to_datetime(ds_prop.dates)
times = list(ds_prop['dates'])
print(len(times))
11366
%%time
# Open the low-resolution zarr store and keep only the first entry for every
# repeated time label.
# Alternative store: f'{path_data}/zarr/KUsKAs_Wn/lores.zarr'
_zarr_store = f'{path_data}/zarr_rckd/KUsKAs_Wn/lores.zarr'
ds_xr = xr.open_zarr(_zarr_store)
_not_repeated = ~ds_xr.get_index("time").duplicated()
ds_xr = ds_xr.sel(time=_not_repeated)
# Number of remaining time steps (displayed as the cell result).
len(ds_xr.time.values)
78800
CPU times: user 1.28 s, sys: 128 ms, total: 1.41 s Wall time: 1.32 s
# Restrict the dataset to the time window of interest.
# Shorter alternative window:
# time_sel = slice('2019-09-16 03:11:00','2019-09-16 10:50:00')
_window_start = '2019-09-15 03:11:00'
_window_stop = '2019-09-17 03:50:00'
time_sel = slice(_window_start, _window_stop)
ds_xr = ds_xr.sel(time=time_sel)
# Number of time steps inside the window (displayed as the cell result).
len(ds_xr.time.values)
11059
# Variables needed by multiple_plot.
var2load = [
    'range', 'alt3D', 'azimuth', 'zhh14', 'zhh14SP', 'vel14',
    'vel14SP', 'ldrhh14', 'ldrhh14SP', 'DR',
]
# One block per time step, each holding the full cross_track and range extent.
_per_time_blocks = ds_xr[var2load].chunk({'time': 1, 'cross_track': -1, 'range': -1})
# The template tells map_blocks what the combined result looks like: one value
# per time step, chunked like the input.
_result_template = xr.ones_like(ds_xr.time).chunk({"time": 1})
tasks = _per_time_blocks.map_blocks(multiple_plot, template=_result_template)
%%time
res = tasks.compute()
# Author's note, translated from Spanish: this run "enters the except" --
# presumably multiple_plot's fallback for blocks that keep a length-1 time
# axis (the data are chunked with one time step per block above).
CPU times: user 4min 32s, sys: 22.8 s, total: 4min 55s Wall time: 4min 54s
# Alternative to map_blocks: one lazy multiple_plot call per time stamp, each
# receiving the dataset selected at that single time.
_plot_inputs = ds_xr[var2load]
_plot_later = delayed(multiple_plot)
task_delayed = [_plot_later(_plot_inputs.sel(time=stamp)) for stamp in ds_xr.time]
# Number of queued tasks (displayed as the cell result).
len(task_delayed)
%%time
# Run every delayed plotting task; dask.compute returns a tuple with one
# result per task, in the same order as task_delayed.
res = dask.compute(*task_delayed)