#!/usr/bin/env python
# coding: utf-8

# # Processing CMIP6 data in Zarr format with Dask & AWS Fargate
#
# This notebook demonstrates how to work with the CMIP6 Zarr data available as part of the AWS Open Data Program (https://registry.opendata.aws/cmip6/).
# It uses Amazon SageMaker and AWS Fargate to provide an environment with a Jupyter notebook and a Dask cluster. An example AWS CloudFormation template is available at https://github.com/awslabs/amazon-asdi/tree/main/examples/dask for quickly creating this environment in your own AWS account so you can run this notebook yourself.

# ## Python Imports

# In[1]:


get_ipython().run_line_magic('matplotlib', 'inline')

import boto3
import botocore
import datetime
import matplotlib.pyplot as plt
import matplotlib
import xarray as xr
import numpy as np
import s3fs
import fsspec
import dask
from dask.distributed import performance_report, Client, progress

font = {'family': 'sans-serif',
        'weight': 'normal',
        'size': 18}
matplotlib.rc('font', **font)


# ## Scale out Dask Workers

# In[2]:


ecs = boto3.client('ecs')
resp = ecs.list_clusters()
clusters = resp['clusterArns']
if len(clusters) > 1:
    print("Please manually select your cluster")
cluster = clusters[0]
cluster


# In[3]:


# Scale up the Fargate cluster
numWorkers = 70
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])


# ## Set up the Dask Client to talk to our Fargate Dask Distributed Cluster

# In[4]:


client = Client('Dask-Scheduler.local-dask:8786')
client


# ## Open 2-m air temperature for a single ensemble member from the GFDL-ESM4 model

# In[5]:


@dask.delayed
def s3open(path):
    fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False,
                           config_kwargs={'max_pool_connections': 50})
    return s3fs.S3Map(path, s3=fs)


# In[6]:


get_ipython().run_cell_magic('time', '', "files_mapper = [s3open('s3://cmip6-pds/CMIP6/ScenarioMIP/NOAA-GFDL/GFDL-ESM4/ssp119/r1i1p1f1/day/tas/gr1/v20180701/')]\nds = xr.open_mfdataset(files_mapper, engine='zarr', parallel=True)\n")


# In[7]:


print('ds size in GB {:0.2f}\n'.format(ds.nbytes / 1e9))
ds.info()


# The `ds.info()` output above shows that the data are organized along time, lat, and lon dimensions (plus a bnds dimension for coordinate bounds), and that the main data variable is `tas`, the 2-m air temperature, accompanied by bounds variables such as `lat_bnds` and `lon_bnds`.

# ## Convert units from K to F

# In[8]:


ds['tas'] = (ds.tas - 273.15) * 9.0 / 5.0 + 32.0
ds.tas.attrs['units'] = 'F'


# ## Calculate the mean 2-m air temperature for all times

# In[9]:


# calculate the mean along the time dimension
temp_mean = ds['tas'].mean(dim='time')


# The expressions above didn't actually compute anything. They just build the dask task graph.
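# As a quick optional check, the cell below simply displays `temp_mean`: its repr shows a chunked, dask-backed
# DataArray (a task graph and chunk layout) rather than computed values, confirming that nothing has been read
# from S3 or calculated yet.

# In[ ]:


# temp_mean is still lazy here; inspecting it does not trigger the computation
temp_mean
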
# To run the computation on the cluster, we call the `persist` method, which starts the work on the Dask workers and keeps the result in distributed memory; `progress` displays its status:

# In[10]:


temp_mean = temp_mean.persist()
progress(temp_mean)


# ### Plot Average Surface Temperature

# In[11]:


# bring the persisted result into local memory and plot it
temp_mean = temp_mean.compute()
temp_mean.plot(figsize=(20, 10))
plt.title('2015-2100 Mean 2-m Air Temperature')


# ### Repeat for standard deviation

# In[12]:


temp_std = ds['tas'].std(dim='time')


# In[13]:


get_ipython().run_line_magic('time', 'temp_std = temp_std.compute()')


# In[14]:


temp_std.plot(figsize=(20, 10))
plt.title('2015-2100 Standard Deviation 2-m Air Temperature')


# ## Plot temperature time series for points

# In[15]:


# location coordinates
locs = [
    {'name': 'Santa Barbara', 'lon': -119.70, 'lat': 34.42},
    {'name': 'Colorado Springs', 'lon': -104.82, 'lat': 38.83},
    {'name': 'Honolulu', 'lon': -157.84, 'lat': 21.29},
    {'name': 'Seattle', 'lon': -122.33, 'lat': 47.61},
]

# convert westward longitudes to degrees east
for l in locs:
    if l['lon'] < 0:
        l['lon'] = 360 + l['lon']
locs


# In[16]:


ds_locs = xr.Dataset()
air_temp_ds = ds

# iterate through the locations and create a dataset
# containing the temperature values for each location
for l in locs:
    name = l['name']
    lon = l['lon']
    lat = l['lat']
    var_name = name

    ds2 = air_temp_ds.sel(lon=lon, lat=lat, method='nearest')

    lon_attr = '%s_lon' % name
    lat_attr = '%s_lat' % name

    ds2.attrs[lon_attr] = ds2.lon.values.tolist()
    ds2.attrs[lat_attr] = ds2.lat.values.tolist()

    ds2 = ds2.rename({'tas': var_name}).drop(('lat', 'lon', 'height', 'lat_bnds', 'lon_bnds'))

    ds_locs = xr.merge([ds_locs, ds2], compat='override')

ds_locs.data_vars


# ### Convert to dataframe

# In[17]:


df_f = ds_locs.to_dataframe()
df_f.describe()


# ## Plot the entire time series of temperatures

# In[18]:


ax = df_f.plot(figsize=(20, 10), title="GFDL-ESM4/ssp119/r1i1p1f1", grid=True)
ax.set(xlabel='Date', ylabel='2-m Air Temperature (deg F)')
plt.show()


# ## Cluster scale down
#
# When we are temporarily done with the cluster, we can scale it down to zero workers to save on costs.

# In[19]:


numWorkers = 0
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])


# In[ ]:
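# ## Optional: profiling and clean-up
#
# The imports above include `performance_report`, which is not used elsewhere in this notebook. With the workers
# scaled back up, a computation can be wrapped in it to write an HTML profile of the cluster's work. This is a
# minimal sketch: the filename is arbitrary and the standard-deviation calculation is just an example workload.

# In[ ]:


# Write an HTML report (task stream, worker profiles, bandwidth) for the wrapped computation
with performance_report(filename='dask-report.html'):
    ds['tas'].std(dim='time').compute()


# When you are completely finished, rather than just scaling the workers to zero, you can also disconnect this
# notebook's Dask client from the scheduler:

# In[ ]:


client.close()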