As we covered at the beginning, Dask can run work on multiple machines using the distributed scheduler.
Until now we have actually been using the distributed scheduler for our work, but just on a single machine.
When we instantiate a Client() object with no arguments it will attempt to locate a Dask cluster. It will check your local Dask config and environment variables to see if connection information has been specified. If not, it will create an instance of LocalCluster and use that.
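As a rough illustration of that lookup, the sketch below sets and reads a scheduler address through Dask's configuration system. It assumes the scheduler-address config key (which can also be supplied through the DASK_SCHEDULER_ADDRESS environment variable); the address itself is only a placeholder.

```python
import dask

# Placeholder address; an administrator would normally set this in a YAML
# config file or via the DASK_SCHEDULER_ADDRESS environment variable.
example_address = "tcp://10.51.100.80:8786"

with dask.config.set({"scheduler-address": example_address}):
    # Inside this block, Client() with no arguments would be expected to try
    # this address before falling back to creating a LocalCluster.
    configured = dask.config.get("scheduler-address", None)

print(configured)
```

The context manager restores the previous configuration on exit, so the setting does not leak into the rest of the session.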
Specifying connection information in config is useful for system administrators who want to provide access to their users. We do this in the Dask Helm Chart for Kubernetes: the chart installs a multi-node Dask cluster and a Jupyter server on a Kubernetes cluster, and Jupyter is preconfigured to discover the distributed cluster.
Let's explore the LocalCluster object ourselves and see what it is doing.
from dask.distributed import LocalCluster, Client
cluster = LocalCluster()
cluster
Creating a cluster object will create a Dask scheduler and a number of Dask workers. If no arguments are specified, it will autodetect the number of CPU cores and the amount of memory your system has and create workers to fill them appropriately.
You can also specify these arguments yourself. Let's have a look at the docstring to see the options we have available.
These arguments can also be passed to Client and, in the case where it creates a LocalCluster, they will just be passed on down the line.
?LocalCluster
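For example, a small cluster with explicit sizes might be created as in the sketch below. The specific values are arbitrary; processes=False (workers run as threads in the current process) and dashboard_address=":0" (pick any free port) are choices made only so the example runs quickly and without port clashes.

```python
from dask.distributed import Client, LocalCluster

# Two single-threaded workers instead of the autodetected defaults.
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=":0",
) as cluster, Client(cluster) as client:
    client.wait_for_workers(2)
    worker_threads = client.nthreads()  # maps worker address -> thread count
    total = client.submit(sum, [1, 2, 3]).result()

print(len(worker_threads), total)
```

Using the cluster and client as context managers closes both when the block exits.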
Our cluster object has attributes and methods which we can use to access information about our cluster. For instance, we can get the log output from the scheduler and all the workers with the get_logs() method.
cluster.get_logs()
We can access the URL that the Dask dashboard is being hosted at.
cluster.dashboard_link
In order for Dask to use our cluster we still need to create a Client object, but as we have already created a cluster we can pass that directly to our client.
client = Client(cluster)
client
del client, cluster
A common way to distribute your work onto multiple machines is via SSH. Dask has a cluster manager called SSHCluster which will handle creating SSH connections for you.
from dask.distributed import SSHCluster
When constructing this cluster manager we need to pass a list of addresses, either hostnames or IP addresses, which we will SSH into and attempt to start a Dask scheduler or worker on.
cluster = SSHCluster(["localhost", "hostA", "hostB"])
cluster
When we create our SSHCluster object we have given a list of three hostnames.
The first host in the list will be used as the scheduler; all other hosts will be used as workers. If you're on the same network it wouldn't be unreasonable to set your local machine as the scheduler and then use other machines as workers.
If your servers are remote to you, in the cloud for instance, you may want the scheduler to be a remote machine too to avoid network bottlenecks.
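The host ordering described above can be sketched with two small helpers. Both split_roles and start_ssh_cluster are hypothetical names introduced for illustration. Actually starting the cluster assumes SSH access to every host and the asyncssh package, so the second helper is defined here but not called.

```python
def split_roles(hosts):
    """Return (scheduler_host, worker_hosts): first host is the scheduler."""
    scheduler_host, *worker_hosts = hosts
    return scheduler_host, worker_hosts


def start_ssh_cluster(hosts, threads_per_worker=2):
    """Start an SSHCluster; needs SSH access to every host and asyncssh."""
    # Imported lazily so split_roles works without the SSH dependencies.
    from dask.distributed import SSHCluster

    # worker_options is forwarded to each worker that gets started.
    return SSHCluster(hosts, worker_options={"nthreads": threads_per_worker})


scheduler_host, worker_hosts = split_roles(["localhost", "hostA", "hostB"])
print(scheduler_host, worker_hosts)
```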
Both of the clusters we have seen so far are fixed-size clusters. We are either running locally and using all the resources in our machine, or we are using an explicit number of other machines via SSH.
With some cluster managers it is possible to increase and decrease the number of workers, either by calling cluster.scale(n) in your code, where n is the desired number of workers, or by letting Dask do this dynamically by calling cluster.adapt(minimum=1, maximum=100), where minimum and maximum are your preferred limits for Dask to abide by.
It is a good idea to keep your minimum at 1 or above, as Dask will start running work on a single worker in order to profile how long things take and extrapolate how many additional workers it thinks it needs. Getting new workers may take time depending on your setup, so keeping this at 1 or above means this profiling will start immediately.
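Both approaches can be tried on a single machine, since LocalCluster supports them too. The sketch below is a minimal illustration with arbitrary limits; processes=False and dashboard_address=":0" are again only there to keep the example lightweight.

```python
from dask.distributed import Client, LocalCluster

with LocalCluster(
    n_workers=1, threads_per_worker=1, processes=False, dashboard_address=":0"
) as cluster, Client(cluster) as client:
    # Manual scaling: ask for exactly three workers and wait for them.
    cluster.scale(3)
    client.wait_for_workers(3)
    workers_after_scale = len(client.nthreads())

    # Adaptive scaling: let Dask choose between one and four workers.
    adaptive = cluster.adapt(minimum=1, maximum=4)
    limits = (adaptive.minimum, adaptive.maximum)

print(workers_after_scale, limits)
```

With adaptive scaling enabled, the worker count is no longer fixed: Dask may remove idle workers down to the minimum and add them back up to the maximum as load changes.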
We currently have cluster managers for Kubernetes, Hadoop/Yarn, cloud platforms and batch systems including PBS, SLURM and SGE.
These cluster managers allow users who have access to resources such as these to bootstrap Dask clusters onto them. If an institution wishes to provide a central service that users can request Dask clusters from, there is also Dask Gateway.
The minimum requirements for a functioning Dask cluster are a scheduler process and one worker process.
We can start these processes manually via the CLI. Let's start with the scheduler.
$ dask-scheduler
2022-07-07 14:11:35,661 - distributed.scheduler - INFO - -----------------------------------------------
2022-07-07 14:11:37,405 - distributed.scheduler - INFO - State start
2022-07-07 14:11:37,408 - distributed.scheduler - INFO - -----------------------------------------------
2022-07-07 14:11:37,409 - distributed.scheduler - INFO - Clear task state
2022-07-07 14:11:37,409 - distributed.scheduler - INFO - Scheduler at: tcp://10.51.100.80:8786
2022-07-07 14:11:37,409 - distributed.scheduler - INFO - dashboard at: :8787
Then we can connect a worker on the address that the scheduler is listening on.
$ dask-worker tcp://10.51.100.80:8786 --nworkers=auto
2022-07-07 14:12:53,915 - distributed.nanny - INFO - Start Nanny at: 'tcp://10.51.100.80:58051'
2022-07-07 14:12:53,922 - distributed.nanny - INFO - Start Nanny at: 'tcp://10.51.100.80:58052'
2022-07-07 14:12:53,924 - distributed.nanny - INFO - Start Nanny at: 'tcp://10.51.100.80:58053'
2022-07-07 14:12:53,925 - distributed.nanny - INFO - Start Nanny at: 'tcp://10.51.100.80:58054'
2022-07-07 14:12:55,222 - distributed.worker - INFO - Start worker at: tcp://10.51.100.80:58065
2022-07-07 14:12:55,222 - distributed.worker - INFO - Listening to: tcp://10.51.100.80:58065
2022-07-07 14:12:55,223 - distributed.worker - INFO - dashboard at: 10.51.100.80:58068
2022-07-07 14:12:55,223 - distributed.worker - INFO - Waiting to connect to: tcp://10.51.100.80:8786
2022-07-07 14:12:55,223 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,223 - distributed.worker - INFO - Threads: 3
2022-07-07 14:12:55,223 - distributed.worker - INFO - Memory: 4.00 GiB
2022-07-07 14:12:55,224 - distributed.worker - INFO - Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-hlvac6m5
2022-07-07 14:12:55,225 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,227 - distributed.worker - INFO - Start worker at: tcp://10.51.100.80:58066
2022-07-07 14:12:55,227 - distributed.worker - INFO - Listening to: tcp://10.51.100.80:58066
2022-07-07 14:12:55,227 - distributed.worker - INFO - dashboard at: 10.51.100.80:58070
2022-07-07 14:12:55,227 - distributed.worker - INFO - Waiting to connect to: tcp://10.51.100.80:8786
2022-07-07 14:12:55,227 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,227 - distributed.worker - INFO - Threads: 3
2022-07-07 14:12:55,228 - distributed.worker - INFO - Memory: 4.00 GiB
2022-07-07 14:12:55,228 - distributed.worker - INFO - Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-e1suf_7o
2022-07-07 14:12:55,229 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,231 - distributed.worker - INFO - Start worker at: tcp://10.51.100.80:58063
2022-07-07 14:12:55,233 - distributed.worker - INFO - Listening to: tcp://10.51.100.80:58063
2022-07-07 14:12:55,233 - distributed.worker - INFO - dashboard at: 10.51.100.80:58067
2022-07-07 14:12:55,233 - distributed.worker - INFO - Waiting to connect to: tcp://10.51.100.80:8786
2022-07-07 14:12:55,233 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,234 - distributed.worker - INFO - Threads: 3
2022-07-07 14:12:55,234 - distributed.worker - INFO - Memory: 4.00 GiB
2022-07-07 14:12:55,235 - distributed.worker - INFO - Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-oq39ihb4
2022-07-07 14:12:55,236 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,246 - distributed.worker - INFO - Registered to: tcp://10.51.100.80:8786
2022-07-07 14:12:55,246 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,249 - distributed.core - INFO - Starting established connection
2022-07-07 14:12:55,264 - distributed.worker - INFO - Registered to: tcp://10.51.100.80:8786
2022-07-07 14:12:55,264 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,267 - distributed.worker - INFO - Registered to: tcp://10.51.100.80:8786
2022-07-07 14:12:55,267 - distributed.core - INFO - Starting established connection
2022-07-07 14:12:55,267 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,269 - distributed.core - INFO - Starting established connection
2022-07-07 14:12:55,273 - distributed.worker - INFO - Start worker at: tcp://10.51.100.80:58064
2022-07-07 14:12:55,273 - distributed.worker - INFO - Listening to: tcp://10.51.100.80:58064
2022-07-07 14:12:55,273 - distributed.worker - INFO - dashboard at: 10.51.100.80:58069
2022-07-07 14:12:55,273 - distributed.worker - INFO - Waiting to connect to: tcp://10.51.100.80:8786
2022-07-07 14:12:55,274 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,274 - distributed.worker - INFO - Threads: 3
2022-07-07 14:12:55,275 - distributed.worker - INFO - Memory: 4.00 GiB
2022-07-07 14:12:55,275 - distributed.worker - INFO - Local Directory: /Users/jtomlinson/Projects/dask/dask-tutorial/dask-worker-space/worker-zfie55ku
2022-07-07 14:12:55,276 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,299 - distributed.worker - INFO - Registered to: tcp://10.51.100.80:8786
2022-07-07 14:12:55,300 - distributed.worker - INFO - -------------------------------------------------
2022-07-07 14:12:55,302 - distributed.core - INFO - Starting established connection
Then in Python we can connect a client to this cluster and submit some work.
>>> from dask.distributed import Client
>>> client = Client("tcp://10.51.100.80:8786")
>>> client.submit(lambda: 1+1)
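Note that client.submit returns a Future immediately rather than the computed value. A minimal sketch of collecting results follows. It uses a throwaway in-process client as a stand-in for the cluster at the address above, an assumption made so the example runs without any external processes, and square is a hypothetical helper.

```python
from dask.distributed import Client


def square(x):
    return x * x


# Stand-in for Client("tcp://10.51.100.80:8786"): a temporary in-process cluster.
with Client(processes=False, dashboard_address=":0") as client:
    future = client.submit(lambda: 1 + 1)   # returns a Future right away
    single = future.result()                # blocks until the value is ready

    futures = client.map(square, range(4))  # one Future per input
    squares = client.gather(futures)        # collect all results as a list

print(single, squares)
```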
We can also do this in Python by importing cluster components and creating them directly.
from dask.distributed import Scheduler, Worker, Client
async with Scheduler() as scheduler:
    async with Worker(scheduler.address) as worker:
        async with Client(scheduler.address, asynchronous=True) as client:
            print(await client.submit(lambda: 1 + 1))
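The top-level async with above works in a notebook or IPython session, where an event loop is already running. In a plain Python script the same steps would need to be wrapped in a coroutine and handed to asyncio.run, roughly as sketched here. The port=0 and dashboard_address=":0" arguments are additions that ask for free ports, so the example does not clash with other running schedulers.

```python
import asyncio

from dask.distributed import Client, Scheduler, Worker


async def main():
    # Scheduler, worker and client all live in this one process.
    async with Scheduler(port=0, dashboard_address=":0") as scheduler:
        async with Worker(scheduler.address) as worker:
            async with Client(scheduler.address, asynchronous=True) as client:
                future = client.submit(lambda: 1 + 1)
                return await future


result = asyncio.run(main())
print(result)
```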
Most of the time we never have to create these components ourselves and can instead rely on cluster manager objects to do this for us. But in some situations it can be useful to be able to construct a cluster manually.
You may also see a Nanny process being referenced from time to time. This is a wrapper for the worker that handles restarting the process if it is killed. When we run dask-worker via the CLI a nanny is automatically created for us.
By default Dask uses a custom TCP-based remote procedure call protocol to communicate between processes. The scheduler and workers all listen on TCP ports for communication.
When you start a scheduler it typically listens on port 8786. When a worker is created it listens on a random high port and communicates that port to the scheduler when it first connects.
The scheduler maintains a list of all workers and their addresses, which the workers can also access. As a result, both the scheduler and any of the workers can open connections to any other worker at any time. Connections are closed automatically when not in use.
The Client will only ever connect to the scheduler and all communication to the workers will pass through it. This means that when deploying Dask clusters the scheduler and workers must typically be on the same network and able to access each other via IP and port directly. But the client can run anywhere as long as it can access the scheduler communication port. It is common to configure firewall rules or load balancers to provide access to just the scheduler port.
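When a client cannot connect, a quick first check is whether the scheduler port is reachable from the client machine at all. The helper below is a generic TCP check using only the standard library. The name can_reach is hypothetical, and the local listener merely stands in for a scheduler. A successful connection shows the port is open, not that a Dask scheduler is answering on it.

```python
import socket


def can_reach(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Demonstrate against a throwaway local listener standing in for a scheduler.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
    listener.bind(("127.0.0.1", 0))  # port 0 asks the OS for any free port
    listener.listen(1)
    port = listener.getsockname()[1]
    reachable = can_reach("127.0.0.1", port)

print(reachable)
```

Against a real deployment the call would look like can_reach("10.51.100.80", 8786), using the scheduler address shown earlier.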
Dask also supports other network protocols such as TLS, websockets and UCX.
Dask cluster components can use certificates to mutually authenticate and communicate securely if run in an untrusted environment. You can either generate certificates for the scheduler, worker and client automatically and distribute those or you can generate temporary credentials.
Some cluster managers such as dask-cloudprovider will automatically enable TLS and generate one-time certificates when exposing clusters to the internet from the public cloud.
from dask.distributed import Scheduler, Worker, Client
from distributed.security import Security
security = Security.temporary()
async with Scheduler(security=security) as scheduler:
    async with Worker(scheduler.address, security=security) as worker:
        async with Client(
            scheduler.address, security=security, asynchronous=True
        ) as client:
            print(await client.submit(lambda: 1 + 1))
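The example above uses temporary credentials. For the other option, where certificates are generated ahead of time and distributed, a Security object can instead point at files on disk. The sketch below shows the client side only. The file names and the scheduler host name are hypothetical placeholders, and the files are not read until a connection is made.

```python
from distributed.security import Security

# Hypothetical file names; generate and distribute real certificates yourself.
security = Security(
    tls_ca_file="ca.pem",
    tls_client_cert="client-cert.pem",
    tls_client_key="client-key.pem",
    require_encryption=True,
)

# The object would then be passed along when connecting, for example:
# Client("tls://scheduler-host:8786", security=security)
print(security.require_encryption)
```

The scheduler and workers take their own certificate and key through the corresponding tls_scheduler_cert, tls_scheduler_key, tls_worker_cert and tls_worker_key arguments.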
Dask can also communicate via websockets instead of TCP. There is a very small performance overhead to doing this, but it means that the dashboard and communication happen on the same port and can be reverse proxied by a layer 7 proxy like nginx. This is necessary for some deployment scenarios where you cannot expose ports but you can proxy web services.
On systems with high-performance networking such as InfiniBand or NVLink, Dask can also leverage UCX, which provides a unified communication protocol that automatically upgrades communication to use the fastest hardware available. This is vital for good performance on HPC systems with InfiniBand or systems with multiple GPU workers.