#!/usr/bin/env python
# coding: utf-8

# ## Scale with Dask
# 
# The [Planetary Computer Hub](http://planetarycomputer.microsoft.com/compute) is a JupyterHub paired with [Dask Gateway](https://gateway.dask.org/) for easily creating Dask clusters to distribute your computation on a cluster of machines.

# ### Creating a cluster
# 
# Use `dask_gateway.GatewayCluster` to quickly create a Dask cluster.

# In[1]:


import dask_gateway

cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
cluster.scale(4)
print(cluster.dashboard_link)


# **Don't forget the `client = cluster.get_client()` line.** That's what ensures the cluster will actually be used for your Dask computations. Otherwise, you'll end up using Dask's [local scheduler](https://docs.dask.org/en/stable/10-minutes-to-dask.html#scheduling), which runs the computation using multiple threads on a single machine rather than on the cluster. When you're using a cluster, always keep [the dashboard](https://docs.dask.org/en/stable/dashboard.html) open (more below). If you aren't seeing any tasks in the dashboard, you might have forgotten to create a Dask client.

# ### Open the dashboard
# 
# The [Dask Dashboard](https://docs.dask.org/en/latest/diagnostics-distributed.html) provides invaluable information on the activity of your cluster. Clicking the "Dashboard" link above will open the Dask dashboard in a new browser tab.
# 
# Dask Dashboard in a new tab.
# 
# We also include the [dask-labextension](https://github.com/dask/dask-labextension) for laying out the Dask dashboard as tabs in the JupyterLab workspace.
# 
# Dask Dashboard in JupyterLab.
# 
# To use the dask-labextension, copy the "Dashboard" address from the cluster repr, click the orange Dask logo on the left-hand navigation bar, and paste the dashboard address.
# 
# You can close your cluster, freeing up its resources, by calling `cluster.close()`.

# In[2]:


cluster.close()


# ### Autoscale the cluster to your workload
# 
# Dask clusters can [automatically adapt](https://docs.dask.org/en/latest/setup/adaptive.html) their size based on the size of the workload. Use `cluster.adapt(minimum, maximum)` to enable adaptive mode.

# In[3]:


import dask_gateway

cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
cluster.adapt(minimum=2, maximum=50)


# Dask will add workers as necessary when a computation is submitted. As an example, we'll compute the minimum daily temperature averaged over all of Hawaii, using the [Daymet dataset](http://aka.ms/ai4edata-daymet).

# In[4]:


import planetary_computer
import pystac_client
import xarray as xr

catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)
asset = catalog.get_collection("daymet-daily-hi").assets["zarr-abfs"]

ds = xr.open_zarr(
    asset.href,
    **asset.extra_fields["xarray:open_kwargs"],
    storage_options=asset.extra_fields["xarray:storage_options"],
)
ds


# The `.compute()` call in the next cell is what triggers computation and causes Dask to scale the cluster up to a dozen or so workers.

# In[5]:


import matplotlib.pyplot as plt

timeseries = ds["tmin"].mean(dim=["x", "y"]).compute()

fig, ax = plt.subplots(figsize=(12, 6))
timeseries.plot(ax=ax);


# In[6]:


cluster.close()


# ### Customize your cluster
# 
# `dask_gateway.GatewayCluster` creates a cluster with some default settings, which might not be appropriate for your workload. For example, we might have a memory-intensive workload that requires more memory per CPU core. Or we might need to set environment variables on the workers.
# 
# To customize your cluster, create a `Gateway` object and then customize the options.

# In[7]:


import dask_gateway

gateway = dask_gateway.Gateway()
cluster_options = gateway.cluster_options()


# Cluster options.
# 
# In a Jupyter Notebook, you can use the HTML widget to customize the options, or you can adjust the values programmatically in Python. We'll ask for 16 GiB of memory per worker.

# In[8]:


cluster_options["worker_memory"] = 16
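
# As mentioned above, we might also need to set environment variables on the workers. Which options are exposed depends on how your Dask Gateway deployment is configured, so the cell below is only a sketch: it assumes this deployment offers an `environment` option, and `MY_SETTING` is just a placeholder name. Inspect `cluster_options` to see which options are actually available.

# In[ ]:


# Hypothetical: assumes the deployment exposes an "environment" option;
# check `cluster_options` for the options your deployment supports.
cluster_options["environment"] = {"MY_SETTING": "some-value"}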

# Now create your cluster. **Make sure to pass the `cluster_options` object to `gateway.new_cluster`.**

# In[9]:


cluster = gateway.new_cluster(cluster_options)
client = cluster.get_client()
cluster.scale(2)


# ### Learn more
# 
# The [Dask documentation](https://docs.dask.org/en/latest/) has much more information on using Dask for scalable computing. This JupyterHub deployment uses [Dask Gateway](https://gateway.dask.org/) to manage creating Dask clusters.
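
# Finally, as before, close this cluster when you're done to free up its resources.

# In[ ]:


cluster.close()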