
Multi-node execution #51

@cpelley

Options available

Manual running

module load scitools

# start scheduler
dask-scheduler --no-dashboard --protocol tcp &

# start workers
dask worker <scheduler-ip>:<port> --nworkers 4 --nthreads 1 &

# start client
python dask_computation.py --address <scheduler-ip>:<port>

Summary: The main benefit of this approach is that it separates the creation of the scheduler, the workers and the client.
It is likely the only out-of-the-box approach available when considering other schedulers (e.g. ray). The downside is that it does not enable dynamic scaling of the cluster.
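
For illustration, the client side of the manual setup could look something like the sketch below. This is not the contents of the real `dask_computation.py`; the function names and the placeholder workload are hypothetical, and only the `--address` option is taken from the command above.

```python
import argparse


def parse_args(argv=None):
    """Parse the --address option used when starting the client manually."""
    parser = argparse.ArgumentParser(description="Run a dask computation.")
    parser.add_argument(
        "--address",
        default=None,
        help="Address of an already running scheduler, e.g. <scheduler-ip>:<port>",
    )
    return parser.parse_args(argv)


def connect(address):
    """Return a client attached to the externally started scheduler."""
    # Imported lazily so that argument parsing works without dask installed.
    from dask.distributed import Client

    # Note: with address=None, Client() normally starts a local cluster instead.
    return Client(address)


def run(argv=None):
    """Connect to the scheduler, run a placeholder workload, then disconnect."""
    args = parse_args(argv)
    client = connect(args.address)
    try:
        # Placeholder workload: square ten numbers on the workers and sum them.
        futures = client.map(lambda x: x * x, range(10))
        return sum(client.gather(futures))
    finally:
        client.close()
```

Because the client only needs an address, the same script can be pointed at any scheduler that is already running, which is what keeps the three pieces independent.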

Using dask-mpi (mpirun/mpiexec)

Not yet tested.

pip install dask-mpi
mpiexec -n 4 python dask_computation.py --mpi

Summary: I did not pursue this approach, since other schedulers (i.e. not dask) are unlikely to provide an equivalent mechanism.
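
For reference, an untested sketch of what the `--mpi` code path might do. It assumes the default behaviour documented for `dask_mpi.initialize()`, where rank 0 becomes the scheduler, rank 1 carries on running the client script and every remaining rank becomes a worker. The helper `mpi_rank_roles` and the function `run_with_mpi` are hypothetical names used only for illustration.

```python
def mpi_rank_roles(n_ranks):
    """Map each MPI rank to the role it is assumed to take under dask-mpi."""
    if n_ranks < 3:
        # With fewer than 3 ranks there would be no rank left to act as a worker.
        raise ValueError("need at least 3 ranks: scheduler, client and one worker")
    roles = {0: "scheduler", 1: "client"}
    roles.update({rank: "worker" for rank in range(2, n_ranks)})
    return roles


def run_with_mpi():
    """Client-side entry point when launched under mpiexec."""
    # Imported lazily: requires dask-mpi, mpi4py and an MPI launcher.
    from dask_mpi import initialize
    from dask.distributed import Client

    initialize()  # scheduler and worker ranks do their work inside this call
    return Client()  # the client rank continues and connects to the scheduler
```

Under that assumption, `mpiexec -n 4` would give two workers rather than four, since two ranks are taken by the scheduler and the client.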

Using dask-jobqueue

pip install dask-jobqueue

from dask_jobqueue import PBSCluster

total_memory = 60  # Total memory per worker job/node in GB
num_workers = 8  # Number of worker processes per worker job/node
num_nodes = 2  # Number of worker jobs (one job per node)
walltime = '00:10:00'  # Maximum runtime of the workers
# Set cores and processes to be the same for single-threaded workers
cluster = PBSCluster(
    protocol="tcp",  # unencrypted
    cores=num_workers,  # Total number of cores per worker job/node
    processes=num_workers,  # Number of worker processes per worker job/node
    memory=f"{total_memory} GB",  # Total memory per worker job, i.e. per node
    walltime=walltime,  # Maximum runtime of the worker job
)
cluster.scale(jobs=num_nodes)  # Scale the number of worker jobs to the number of nodes
    # Parameters
    # ----------
    # n : int
    #    Target number of workers
    # jobs : int
    #    Target number of jobs
    # memory : str
    #    Target amount of memory
    # cores : int
    #    Target number of cores

or

cluster.scale(<number of workers>)

or

cluster.adapt(minimum=1, maximum=10)  # Automatically scale the number of workers based on the workload
    # Parameters
    # ----------
    # minimum : int
    #    Minimum number of workers to keep around for the cluster
    # maximum : int
    #    Maximum number of workers to keep around for the cluster
    # minimum_memory : str
    #    Minimum amount of memory for the cluster
    # maximum_memory : str
    #    Maximum amount of memory for the cluster
    # minimum_jobs : int
    #    Minimum number of jobs
    # maximum_jobs : int
    #    Maximum number of jobs

Summary: This approach provides the greatest flexibility, giving a dynamic cluster that can scale to requirements. However, it does mean having the scheduler, workers and client managed centrally, which is less than ideal for sharing a cluster amongst independent programs. Another downside is that other schedulers (e.g. ray) don't necessarily allow managing the cluster setup in this same way.
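
To make the per-job versus per-worker settings concrete, here is a small sketch of the arithmetic as I understand dask-jobqueue applies it: the cores and memory of a job are shared between its worker processes, and a target number of workers is rounded up to whole jobs. The helper names are hypothetical and this does not call dask-jobqueue itself.

```python
import math


def describe_pbs_cluster(cores, processes, memory_gb, jobs):
    """Derive per-worker resources from the per-job settings."""
    return {
        "threads_per_worker": cores // processes,
        "memory_per_worker_gb": memory_gb / processes,
        "total_workers": jobs * processes,
    }


def jobs_for_workers(n_workers, processes):
    """Number of whole jobs assumed to be needed for a target number of workers."""
    return math.ceil(n_workers / processes)


# The values used in the PBSCluster example above.
layout = describe_pbs_cluster(cores=8, processes=8, memory_gb=60, jobs=2)
print(layout)
# {'threads_per_worker': 1, 'memory_per_worker_gb': 7.5, 'total_workers': 16}
```

So the example above should yield 16 single-threaded workers with 7.5 GB each, and asking for 10 workers would, under the same assumption, still submit 2 jobs.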

Testing these approaches with a script

https://gist.github.com/cpelley/ff537e9dd5fb97ee681fa7207575330b
