#!/usr/bin/env python
# coding: utf-8

# # Xarray Dask Example Notebook optimized for terrabyte Portal use
# This notebook can be run in the default micromamba Environment, or you can create your own user Environment (called tutorialEnv in this example) using:
#
# `!micromamba install -y -n tutorialEnv jupyter xarray rioxarray odc-stac odc-geo pystac-client dask folium geopandas zarr jupyter-server-proxy`
#
# and then start a new Jupyter session from the portal, specifying the name of your newly created env in the custom Environment field.

# In[1]:


#!micromamba install -y -n tutorialEnv jupyter xarray rioxarray odc-stac odc-geo pystac-client dask folium geopandas zarr jupyter-server-proxy


# In[1]:


import os
import time
import socket
import xarray
import rioxarray
from odc import stac as odc_stac
from odc.geo import geobox
from pystac_client import Client as pystacclient
import dask
from dask.distributed import Client
import folium
import folium.plugins as folium_plugins
import geopandas as gpd
import numpy as np


# In[3]:


import pystac_client


# In[4]:


pystac_client.__version__


# In[5]:


# for now we use the home folder ~
dir_out = '~/xarray-dask-tutorial'

# STAC settings
catalog_url = 'https://stac.terrabyte.lrz.de/public/api'
collection = 'sentinel-2-c1-l2a'

# input data settings
year = 2023
resolution = 60
mgrs = 'MGRS-32UPU'
max_cloud_cover = 60
bands = ['nir', 'red']

filename_base = f'S2_y{year}_res{resolution}_median_{mgrs}.zarr'
filename = os.path.join(dir_out, filename_base)
dask_tmpdir = os.path.join(dir_out, 'scratch', 'localCluster')
dask_chunksize = 1024
# from testing, running without threads is the faster option
dask_threads = 1


# # Explore the STAC catalog

# In[13]:


catalog = pystacclient.open(catalog_url)

# list the IDs of all STAC catalog collections
for coll in catalog.get_all_collections():
    print(coll.id)


# # Query data from the STAC catalog

# In[15]:


query = {
    'eo:cloud_cover': {'gte': 0, 'lte': max_cloud_cover},
    'grid:code': {'eq': mgrs}
}
start = f'{year}-01-01T00:00:00Z'
end = f'{year}-12-31T23:59:59Z'

resultsS2Stac = catalog.search(collections=[collection], datetime=[start, end], query=query, limit=500)
items = list(resultsS2Stac.items())
print(f'Found {len(items)} Scenes')


# # Visualize the covered area

# In[8]:


map = folium.Map()
layer_control = folium.LayerControl(position='topright', collapsed=True)
fullscreen = folium_plugins.Fullscreen()

style = {'fillColor': '#00000000', 'color': '#0000ff', 'weight': 1}
footprints = folium.GeoJson(
    gpd.GeoDataFrame.from_features([item.to_dict() for item in items]).to_json(),
    name='STAC item footprints',
    style_function=lambda x: style,
    control=True
)
footprints.add_to(map)
layer_control.add_to(map)
fullscreen.add_to(map)
map.fit_bounds(map.get_bounds())
map


# # Load the STAC items into xarray

# In[9]:


# inspect one of the returned STAC items
items[1]


# In[16]:


cube = odc_stac.load(
    items,
    bands=bands,
    resolution=resolution,
    groupby='solar_day',
    chunks={'x': dask_chunksize, 'y': dask_chunksize},  # spatial chunking
    anchor=geobox.AnchorEnum.FLOATING  # preserve the original pixel grid
)
# temporal chunking: a single chunk along time, as required for the quantile computation
cube = cube.chunk(chunks={'time': -1})
# write CF-compliant CRS representation
#cube = cube.rio.write_crs(cube.coords['spatial_ref'].values)
cube


# In[17]:


cube['ndvi'] = (cube.nir - cube.red) / (cube.nir + cube.red)
cube


# # Define the computation

# In[18]:


median = (cube.quantile(0.5, dim='time', skipna=True, keep_attrs=True)
          .rename({b: f'{b}_median' for b in list(cube.keys())}))
median
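# Before spinning up the cluster, it can help to check how large the lazy result actually is.
# The following cell is an optional sketch that was not part of the original workflow: it only
# inspects dask metadata (no data is loaded), and the reported sizes refer to the uncompressed
# in-memory arrays, not the compressed zarr output.

# In[ ]:


# estimated uncompressed sizes of the lazy input cube and of the median result
print(f'input cube:     {cube.nbytes / 1024**3:.2f} GiB')
print(f'median result:  {median.nbytes / 1024**3:.2f} GiB')
# number of tasks in the dask graph (a rough indicator of scheduling overhead)
print(f'tasks in graph: {len(median.__dask_graph__())}')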
# # Start the dask client
# Here we start the dask client to scale the computation to the available resources.
# Once started, a link to the dask dashboard will be shown, which displays details on the dask computation status.

# In[19]:


host = os.getenv('host')
jl_port = os.getenv('port')

# create the URL pointing to the jupyter-server-proxy
dask_url = f'https://portal.terrabyte.lrz.de/node/{host}/{jl_port}' + '/proxy/{port}/status'  # dask will insert the final port chosen by the cluster
dask.config.set({'temporary_directory': dask_tmpdir,
                 'distributed.dashboard.link': dask_url})

# some settings to increase network timeouts and allow the dashboard to plot larger graphs
dask.config.set({'distributed.comm.timeouts.tcp': '180s',
                 'distributed.comm.timeouts.connect': '120s',
                 'distributed.dashboard.graph-max-items': 55000,
                 'distributed.deploy.lost-worker-timeout': '90s',
                 'distributed.scheduler.allowed-failures': 180,
                 })

# set the dashboard address so dask chooses a free random port; this avoids conflicts when multiple dask clusters run on the same node
client = Client(threads_per_worker=dask_threads, dashboard_address='127.0.0.1:0')
client


# # Start the computation
# Here the actual computation is started and the result is written to the output file.
# Check the dask dashboard for the computation progress.

# In[20]:


get_ipython().run_cell_magic('time', '', "# the 'invalid value encountered in divide' warning (NDVI division by zero) can be ignored\ndelayed = median.to_zarr(filename, mode='w', compute=False, consolidated=True)\ndask.compute(delayed)\n")


# In[21]:


client.cluster.close()
time.sleep(5)
client.close()


# # Load and visualize the result from file

# In[22]:


result = xarray.open_zarr(filename)
result


# In[23]:


result.ndvi_median.plot(robust=True)


# In[24]:


result['red_median'].plot()


# In[25]:


result['nir_median'].plot()
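# The median bands can optionally also be exported as GeoTIFF with rioxarray (already imported above).
# This is a minimal sketch, not part of the original workflow: it assumes the CRS is stored in the
# 'spatial_ref' coordinate written by odc-stac (attribute 'crs_wkt'), and the output filename is only
# an example.

# In[ ]:


# example output path (assumption); expanduser resolves the '~' in dir_out
cog_path = os.path.expanduser(os.path.join(dir_out, f'S2_y{year}_res{resolution}_ndvi_median_{mgrs}.tif'))
# attach the CRS so the GeoTIFF is georeferenced (assumes 'crs_wkt' is present on the spatial_ref coordinate)
ndvi_median = result.ndvi_median.rio.write_crs(result.spatial_ref.attrs['crs_wkt'])
ndvi_median.rio.to_raster(cog_path)  # add driver='COG' for a Cloud Optimized GeoTIFF if your GDAL supports it
print(f'written {cog_path}')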