#!/usr/bin/env python
# coding: utf-8

# # Explore the National Water Model Reanalysis
# Use [Xarray](http://xarray.pydata.org/en/stable/), [Dask](https://dask.org) and [hvPlot](https://hvplot.holoviz.org) from the [HoloViz](https://holoviz.org) tool suite to explore the National Water Model Reanalysis Version 2. We read from a cloud-optimized [Zarr](https://zarr.readthedocs.io/en/stable/) dataset that is part of the [AWS Open Data Program](https://aws.amazon.com/opendata/), and we use a Dask cluster to parallelize computation and reading of data chunks.

# In[1]:

import xarray as xr
import fsspec
import numpy as np

# In[2]:

import hvplot.pandas
import hvplot.xarray
import geoviews as gv
from holoviews.operation.datashader import rasterize
import cartopy.crs as ccrs

# ### Start a Dask cluster
# This is not required, but it speeds up computations. One can start a local cluster by just doing:
# ```
# from dask.distributed import Client
# client = Client()
# ```
# but there are [many other ways to set up Dask clusters](https://docs.dask.org/en/latest/setup.html) that can scale larger than this.
#
# Since we used [Qhub](https://www.quansight.com/post/announcing-qhub) to install JupyterHub with a Dask Gateway running on Kubernetes, we can start a Dask cluster (with a specified environment and worker profile), scale it, and connect to it as follows:

# In[3]:

from dask_gateway import Gateway
from dask.distributed import Client

gateway = Gateway()
# see the Gateway options to use in new_cluster by doing: gateway.cluster_options()
cluster = gateway.new_cluster(environment='pangeo', profile='Pangeo Worker')
cluster.scale(20)
client = Client(cluster)
cluster

# client.close(); cluster.shutdown()  # shutdown client and cluster

# Open the Zarr dataset in Xarray using a mapper from fsspec. We use `anon=True` for free-access public buckets like the AWS Open Data Program, and `requester_pays=True` for requester-pays public buckets.

# In[4]:

url = 's3://noaa-nwm-retro-v2-zarr-pds'

# In[5]:

get_ipython().run_cell_magic('time', '', 'ds = xr.open_zarr(fsspec.get_mapper(url, anon=True), consolidated=True)\n')

# In[6]:

var = 'streamflow'

# In[7]:

ds[var]

# In[8]:

print(f'Variable size: {ds[var].nbytes/1e12:.1f} TB')

# ### Find the site with the largest streamflow on June 1, 2017

# In[9]:

get_ipython().run_cell_magic('time', '', "imax = ds[var].sel(time='2017-06-01 00:00:00').argmax().values\n")

# Let's plot the whole hindcast time series at that location.

# In[10]:

get_ipython().run_cell_magic('time', '', 'ds[var][:,imax].hvplot(grid=True)\n')

# ### Compute mean discharge during April 2010 on all rivers

# In[11]:

streamflow_April_2010 = ds[var].sel(time=slice('2010-04-01 00:00', '2010-04-30 23:00'))

# In[12]:

print(f'Variable size: {streamflow_April_2010.nbytes/1e9:.1f} GB')

# In[13]:

get_ipython().run_cell_magic('time', '', "var_mean = streamflow_April_2010.mean(dim='time').compute()\n")

# ### Visualize the mean discharge with hvplot
# Convert the Xarray DataArray to a Pandas DataFrame so we can use hvplot.points for visualization.

# In[14]:

df = var_mean.to_pandas().to_frame()

# The dataframe just has streamflow, so add longitude and latitude as columns.

# In[15]:

df = df.assign(latitude=ds['latitude'])
df = df.assign(longitude=ds['longitude'])
df.rename(columns={0: "transport"}, inplace=True)
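# Before mapping, a quick tabular sanity check (a minimal sketch, assuming the `df` built above):
# list the reaches with the largest mean April 2010 discharge.

# In[ ]:

# Top ten reaches by mean April 2010 streamflow (values are in the dataset's streamflow units)
df.nlargest(10, 'transport')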
# In[16]:

p = df.hvplot.points('longitude', 'latitude', crs=ccrs.PlateCarree(),
                     c='transport', colorbar=True, size=14)

# We don't want to plot all 2.7M points individually, so aggregate to 0.02 degree resolution and rasterize with datashader. Use a log scale for visualization since there is a large dynamic range in streamflow.

# In[17]:

g = rasterize(p, aggregator='mean', x_sampling=0.02, y_sampling=0.02, width=500).opts(
    tools=['hover'], aspect='equal', logz=True, cmap='viridis', clim=(1e-2, np.nan))

# Plot the rasterized streamflow data on an OpenStreetMap tile service basemap.

# In[18]:

g * gv.tile_sources.OSM

# When finished, close the client and shut down the cluster to release resources.

# In[ ]:

# client.close(); cluster.shutdown()
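# Optionally, before shutting down, keep a copy of the interactive map. This is a minimal sketch
# using HoloViews' `hv.save`; the output filename is arbitrary, and the saved page is a static
# snapshot (it will not re-rasterize on zoom like the live notebook).

# In[ ]:

import holoviews as hv

# Render the rasterized streamflow overlaid on the OSM basemap and write it to a standalone HTML file
hv.save(g * gv.tile_sources.OSM, 'nwm_mean_streamflow_april_2010.html')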