import os
import time
import glob
import numpy as np
import xarray as xr
os.environ['ROOK_URL'] = 'http://rook.dkrz.de/wps'
# os.environ['ROOK_URL'] = 'http://localhost:5000/wps'
from rooki import rooki
import rooki.operators as ops
rooki.url
def time_it(func, *args, **kwargs):
s = time.time()
print(f'[INFO] Running: {func.__name__} with {args}, {kwargs}')
resp = func(*args, **kwargs)
e = time.time()
print(f'[INFO] Call to {func.__name__} with {args}, {kwargs}')
print(f' took: {e-s:.2f} seconds')
return resp
def _check_response(resp, var_id=None, maximum=None):
assert resp.size > 100
ds = resp.datasets()[0]
assert isinstance(ds, xr.Dataset)
if var_id and maximum:
mx = float(ds[var_id].max())
# assert np.isclose(mx, maximum)
if not np.isclose(mx, maximum):
print(f"[Warning] max={mx}, expected={maximum}")
def test_workflow_subset_time_time_c3s_cmip5_small_method1():
wf = ops.Subset(
ops.Subset(
ops.Input(
'tas', ['c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest']
),
time="1867-01-01/1867-12-30",
),
time="1867-04-01/1867-06-30",
)
resp = wf.orchestrate()
_check_response(resp, 'tas', 313.928)
time_it(test_workflow_subset_time_time_c3s_cmip5_small_method1)
# Took (s):
# CEDA SLURM: 55, 51, 46, 46, 40, 45, 39
# DKRZ VM: 78, 180
def test_workflow_subset_time_time_c3s_cmip5_small_method2():
wf = ops.Input('tas', ['c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest'])
wf = ops.Subset(wf, time='1867-01-01/1867-12-30')
wf = ops.Subset(wf, time='1867-04-01/1867-06-30')
resp = wf.orchestrate()
_check_response(resp, 'tas', 313.928)
time_it(test_workflow_subset_time_time_c3s_cmip5_small_method2)
# Took (s):
# CEDA: 46, 48
# DKRZ: 115, 130
def test_workflow_subset_time_time_c3s_cmip5_large():
wf = ops.Subset(
ops.Subset(
ops.Input(
'tas', ['c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest']
),
time="1850-01-01/1861-12-30",
),
time="1861-01-01/1861-02-15",
)
resp = wf.orchestrate()
_check_response(resp, 'tas', 310.861)
time_it(test_workflow_subset_time_time_c3s_cmip5_large)
# Took (s):
# CEDA:
# DKRZ: 88, 93, 91
Clean up if needed
def cleanup_tmpdir():
for dr in glob.glob('/tmp/tmp*'):
try:
os.removedirs(dr)
print(f'REMOVED tmpdir: {dr}')
except:
pass # did not delete content owned by others
# cleanup_tmpdir()