Access the 63,341 NetCDF 64-bit offset files from the HYbrid Coordinate Ocean Model (HYCOM) Global Ocean Forecast System Reanalysis (1994-2015) on AWS Open Data as a single Kerchunk-generated virtual dataset
import fsspec
import xarray as xr
import hvplot.xarray  # registers the .hvplot accessor on xarray objects
import intake
Open using Intake:
cat = intake.open_catalog('https://ncsa.osn.xsede.org/esip/rsignell/hycom.yaml')
%%time
ds = cat['gofs-3pt1'].read()
CPU times: user 2.54 s, sys: 414 ms, total: 2.96 s
Wall time: 6.2 s
# The much more verbose, non-Intake way to open the dataset using the xarray kerchunk engine:
# combined_parquet_aws = 's3://esip/rsignell/hycom.parq'
# so = dict(anon=True)  # data stored on AWS Open Data S3
# to = dict(anon=True,  # refs stored on OSN
#           client_kwargs={'endpoint_url': 'https://ncsa.osn.xsede.org'})
# ds = xr.open_dataset(combined_parquet_aws, engine='kerchunk', chunks={},
#                      backend_kwargs=dict(storage_options=dict(target_options=to,
#                                          remote_protocol='s3', lazy=True,
#                                          remote_options=so)))
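For reference, here is a minimal sketch of how a reference set like this can be generated with Kerchunk. The bucket path and file pattern below are hypothetical, and the full workflow (63,341 files) would normally be parallelized with Dask; the published references at the OSN URL above already exist, so there is no need to run this.

from kerchunk.netCDF3 import NetCDF3ToZarr
from kerchunk.combine import MultiZarrToZarr
from kerchunk.df import refs_to_dataframe

so = dict(anon=True)
fs = fsspec.filesystem('s3', **so)
flist = fs.glob('s3://hycom-gofs-3pt1/*.nc')  # hypothetical bucket and pattern

# one reference set per NetCDF 64-bit offset file
singles = [NetCDF3ToZarr('s3://' + f, storage_options=so).translate() for f in flist]

# combine along time into a single virtual dataset, then write as parquet refs
combined = MultiZarrToZarr(singles, concat_dims=['time'],
                           remote_protocol='s3', remote_options=so).translate()
refs_to_dataframe(combined, 'hycom.parq')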
Case 1: Read a horizontal slice of a 3D field at a specific time step.
We don't need a cluster for this, since only one chunk of data (24 MB) is actually loaded.
ds['water_temp'].sel(depth=0, time='2012-10-29 17:00', method='nearest')
<xarray.DataArray 'water_temp' (lat: 3251, lon: 4500)> Size: 117MB
dask.array<getitem, shape=(3251, 4500), dtype=float64, chunksize=(3251, 4500), chunktype=numpy.ndarray>
Coordinates:
    depth    float64 8B 0.0
  * lat      (lat) float64 26kB -80.0 -79.96 -79.92 -79.88 ... 89.92 89.96 90.0
  * lon      (lon) float64 36kB -180.0 -179.9 -179.8 ... 179.8 179.8 179.9
    time     datetime64[ns] 8B 2012-10-29T18:00:00
Attributes:
    NAVO_code:      15
    comment:        in-situ temperature
    long_name:      Water Temperature
    standard_name:  sea_water_temperature
    units:          degC
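The repr above shows that the selected slice lives in a single dask chunk. A quick programmatic check (a sketch; numblocks is the dask array's block count per axis):

print(ds['water_temp'].sel(depth=0, time='2012-10-29 17:00', method='nearest').data.numblocks)  # expect (1, 1): one chunk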
%%time
da = ds['water_temp'].sel(depth=0, time='2012-10-29 17:00', method='nearest').load()
CPU times: user 3.27 s, sys: 617 ms, total: 3.88 s
Wall time: 6.05 s
da.hvplot.quadmesh(x='lon', y='lat', geo=True, global_extent=True, tiles='ESRI', cmap='viridis', rasterize=True)
Case 2: Load a time series at a specific location.
Because each chunk contains only one time value, we want to read chunks in parallel using a Dask cluster. Here we use Coiled to create a cluster on AWS in the same region as the data, and we specify a cheap ARM instance type to keep the cost low.
cluster_type = 'Coiled'

if cluster_type == 'Local':
    from dask.distributed import Client
    client = Client()

if cluster_type == 'Coiled':
    import coiled
    cluster = coiled.Cluster(
        region="us-west-2",
        arm=True,
        worker_vm_types=["t4g.small"],  # cheap, small ARM instances: 2 CPUs, 2 GB RAM
        worker_options={'nthreads': 2},
        n_workers=100,
        wait_for_workers=True,
        compute_purchase_option="spot_with_fallback",
        name='hycom',
        software='esip-pangeo-arm',
        workspace='esip-lab',
        timeout=300  # leave cluster running for 5 min in case we want to use it again
    )
    client = cluster.get_client()
We open the dataset again and tell Dask to load 20 time values (20 chunks) per task. Loading multiple time steps means we incur object storage latency only once per task, at the cost of more memory per task. We picked 20 because it fits within the 2 GB of RAM on the t4g.small instances (and larger values don't give a significant performance benefit).
ds = cat['gofs-3pt1'].read(chunks={"time": 20}) # Intake way to re-open the dataset with different Dask chunks
# The non-Intake way to re-open the dataset with different Dask chunks:
# ds = xr.open_dataset(combined_parquet_aws, engine='kerchunk', chunks={'time': 20},
#                      backend_kwargs=dict(storage_options=dict(target_options=to,
#                                          remote_protocol='s3', lazy=True,
#                                          remote_options=so)))
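A quick way to confirm the new chunking took effect (a sketch):

print(ds['water_temp'].data.chunksize)  # the first (time) entry should now be 20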
Extract the time series:
%%time
# retries=10 guards against transient S3 read failures when loading many chunks
da = ds['water_temp'].isel(depth=0).sel(lon=-69.6, lat=42.5, method='nearest').load(retries=10)
CPU times: user 2.21 s, sys: 259 ms, total: 2.47 s
Wall time: 2min 30s
Interactive visualization of the time series:
da.hvplot(x='time', grid=True)
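When you're done, shut the cluster down rather than waiting for the idle timeout, so no further cost is incurred (assuming the Coiled cluster created above):

client.close()
cluster.shutdown()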