#!/usr/bin/env python
# coding: utf-8

# # Processing ERA5 data in NetCDF Format
#
# This notebook demonstrates how to work with the ECMWF ERA5 reanalysis, available as part of the AWS Public Dataset Program (https://registry.opendata.aws/ecmwf-era5/).
#
# This notebook uses Amazon SageMaker and AWS Fargate to provide an environment with a Jupyter notebook and a Dask cluster. An example AWS CloudFormation template is available at https://github.com/awslabs/amazon-asdi/tree/main/examples/dask for quickly creating this environment in your own AWS account so you can run this notebook.

# ## Python Imports

# In[1]:


get_ipython().run_line_magic('matplotlib', 'inline')

import boto3
import botocore
import datetime
import matplotlib.pyplot as plt
import matplotlib
import xarray as xr
import numpy as np
import s3fs
import fsspec
import dask
from dask.distributed import performance_report, Client, progress

font = {'family': 'sans-serif', 'weight': 'normal', 'size': 18}
matplotlib.rc('font', **font)


# ## Scale out Dask Workers

# In[2]:


ecs = boto3.client('ecs')
resp = ecs.list_clusters()
clusters = resp['clusterArns']
if len(clusters) > 1:
    print("Please manually select your cluster")
cluster = clusters[0]
cluster


# If more than one cluster was listed above, update the `cluster` variable to the ARN of your Dask cluster before running the command below.

# In[13]:


numWorkers = 70
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])


# ## Set up the Dask Client to talk to our Fargate Dask Distributed Cluster

# In[14]:


client = Client('Dask-Scheduler.local-dask:8786')
client


# In[15]:


# Make garbage collection explicit to prevent deadlocks
import gc

from distributed import WorkerPlugin


class WorkerExplicitGC(WorkerPlugin):
    def setup(self, worker):
        self.worker = worker
        gc.disable()

    def transition(self, key, start, finish, *args, **kwargs):
        if finish == 'executing':
            self.worker._throttled_gc.collect()
            if gc.isenabled():
                gc.disable()


plugin = WorkerExplicitGC()
client.register_worker_plugin(plugin)


# ## Open an Example File and Check the Native Chunking
#
# We want to chunk in a similar way for maximum performance.

# In[16]:


url = 's3://era5-pds/2010/01/data/air_temperature_at_2_metres.nc'
ncfile = fsspec.open(url)
ds = xr.open_dataset(ncfile.open())
ds.air_temperature_at_2_metres.encoding


# ## Open 2-m air temperature as a single dataset

# In[17]:


start_year = 1979
end_year = 2020
years = list(np.arange(start_year, end_year + 1, 1))
months = ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]

file_pattern = 's3://era5-pds/{year}/{month}/data/air_temperature_at_2_metres.nc'


@dask.delayed
def s3open(path):
    fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False)
    return fs.open(path)


files_mapper = [s3open(file_pattern.format(year=year, month=month)) for year in years for month in months]


# In[18]:


get_ipython().run_cell_magic('time', '', "\nds = xr.open_mfdataset(files_mapper, engine='h5netcdf', chunks={'lon':200,'lat':200,'time0':720}, concat_dim='time0', combine='nested', coords='minimal', compat='override', parallel=True)\n")


# In[19]:


print('ds size in GB {:0.2f}\n'.format(ds.nbytes / 1e9))
ds.info()


# The `ds.info()` output above shows that the data has three dimensions (lat, lon, and time0) and one data variable, air_temperature_at_2_metres.
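# Before converting units or computing statistics, it can be useful to sanity-check the lazily combined dataset. The cell below is a minimal, optional sketch that assumes the dimension names shown above (`time0`, `lat`, `lon`); it only touches coordinate metadata, so it does not trigger a large computation.

# In[ ]:


# Quick sanity check: print the time span covered and the size of the lat/lon grid
# before kicking off any heavy computations.
print('time range: {} to {}'.format(ds.time0.values.min(), ds.time0.values.max()))
print('grid size: {} lats x {} lons'.format(ds.sizes['lat'], ds.sizes['lon']))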
# ## Convert units from K to F

# In[20]:


ds['air_temperature_at_2_metres'] = (ds.air_temperature_at_2_metres - 273.15) * 9.0 / 5.0 + 32.0
ds.air_temperature_at_2_metres.attrs['units'] = 'F'


# ## Calculate the mean 2-m air temperature for all times

# In[21]:


# calculate the mean along the time dimension
temp_mean = ds['air_temperature_at_2_metres'].mean(dim='time0')


# The expression above didn't actually compute anything; it just built the Dask task graph. To trigger the computation, we call the `persist` method:

# In[22]:


temp_mean = temp_mean.persist()
progress(temp_mean)


# ### Plot Average Surface Temperature

# In[23]:


temp_mean.compute()
temp_mean.plot(figsize=(20, 10))
plt.title('1979-2020 Mean 2-m Air Temperature')


# ### Repeat for standard deviation

# In[24]:


temp_std = ds['air_temperature_at_2_metres'].std(dim='time0')


# In[25]:


temp_std = temp_std.persist()
progress(temp_std)


# In[15]:


temp_std.compute()
temp_std.plot(figsize=(20, 10))
plt.title('1979-2020 Standard Deviation 2-m Air Temperature')


# ## Plot temperature time series for points

# In[26]:


# location coordinates
locs = [
    {'name': 'Santa Barbara', 'lon': -119.70, 'lat': 34.42},
    {'name': 'Colorado Springs', 'lon': -104.82, 'lat': 38.83},
    {'name': 'Honolulu', 'lon': -157.84, 'lat': 21.29},
    {'name': 'Seattle', 'lon': -122.33, 'lat': 47.61},
]

# convert westward longitudes to degrees east
for l in locs:
    if l['lon'] < 0:
        l['lon'] = 360 + l['lon']
locs


# In[27]:


ds_locs = xr.Dataset()
air_temp_ds = ds

# iterate through the locations and create a dataset
# containing the temperature values for each location
for l in locs:
    name = l['name']
    lon = l['lon']
    lat = l['lat']
    var_name = name

    ds2 = air_temp_ds.sel(lon=lon, lat=lat, method='nearest')

    lon_attr = '%s_lon' % name
    lat_attr = '%s_lat' % name

    ds2.attrs[lon_attr] = ds2.lon.values.tolist()
    ds2.attrs[lat_attr] = ds2.lat.values.tolist()

    ds2 = ds2.rename({'air_temperature_at_2_metres': var_name}).drop(('lat', 'lon'))

    ds_locs = xr.merge([ds_locs, ds2])

ds_locs.data_vars


# ### Convert to dataframe

# In[28]:


df_f = ds_locs.to_dataframe()
df_f.describe()


# ## Plot temperature time series

# In[29]:


ax = df_f.plot(figsize=(20, 10), title="ERA5", grid=1)
ax.set(xlabel='Date', ylabel='2-m Air Temperature (deg F)')
plt.show()


# ## Cluster scale down
#
# When we are temporarily done with the cluster, we can scale it down to save on costs.

# In[30]:


numWorkers = 0
ecs.update_service(cluster=cluster, service='Dask-Worker', desiredCount=numWorkers)
ecs.get_waiter('services_stable').wait(cluster=cluster, services=['Dask-Worker'])


# In[ ]:
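# Once the workers are scaled down, you can optionally close the connection to the Dask scheduler as well. This is a small optional cleanup step using the standard `dask.distributed` client API.

# In[ ]:


# Optional cleanup: close the client connection to the Dask scheduler.
client.close()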