Python API

Lifecycle

dask_ctl.lifecycle.get_cluster(name[, ...])

Get a cluster by name.

dask_ctl.lifecycle.create_cluster([...])

Create a cluster from a spec file.

dask_ctl.lifecycle.scale_cluster(name, n_workers)

Scale a cluster by name.

dask_ctl.lifecycle.delete_cluster(name)

Close a cluster by name.

dask_ctl.lifecycle.list_clusters()

List all clusters.

dask_ctl.lifecycle.get_cluster(name: str, asynchronous=False) -> distributed.deploy.cluster.Cluster

Get a cluster by name.

name

Name of cluster to get a cluster manager for.

asynchronous

Return an awaitable instead of starting a loop.

Cluster

Cluster manager representing the named cluster.

>>> from dask.distributed import LocalCluster
>>> from dask_ctl.lifecycle import get_cluster
>>> cluster = LocalCluster(scheduler_port=8786)
>>> get_cluster("proxycluster-8786")
ProxyCluster(proxycluster-8786, 'tcp://localhost:8786', workers=4, threads=12, memory=17.18 GB)

dask_ctl.lifecycle.create_cluster(spec_path: Optional[str] = None, local_fallback: bool = False, asynchronous: bool = False) -> distributed.deploy.cluster.Cluster

Create a cluster from a spec file.

spec_path

Path to a cluster spec file. Defaults to dask-cluster.yaml.

local_fallback

Create a LocalCluster if the spec file is not found.

asynchronous

Start the cluster in asynchronous mode.

Cluster

Cluster manager representing the spec.

With the spec:

# /path/to/spec.yaml
version: 1
module: "dask.distributed"
class: "LocalCluster"

>>> from dask_ctl.lifecycle import create_cluster
>>> create_cluster("/path/to/spec.yaml")
LocalCluster(b3973c71, 'tcp://127.0.0.1:8786', workers=4, threads=12, memory=17.18 GB)
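
One plausible reading of the spec above is that `module` names an importable module and `class` names an attribute of that module. The sketch below shows that lookup using only the standard library. `resolve_class` is a hypothetical helper, and the stand-in spec points at `collections.OrderedDict` so that it runs without Dask installed; dask-ctl's own loader may differ in detail.

```python
import importlib


def resolve_class(spec: dict) -> type:
    """Hypothetical helper: import spec["module"] and return spec["class"] from it."""
    module = importlib.import_module(spec["module"])
    return getattr(module, spec["class"])


# Stand-in spec with the same keys as the YAML example, but pointing at a
# standard-library class (an assumption made purely so the sketch is runnable).
spec = {"version": 1, "module": "collections", "class": "OrderedDict"}

cls = resolve_class(spec)
instance = cls()
print(cls.__name__)
```

With a real spec, the resolved class would be a cluster manager such as `LocalCluster` rather than a dictionary type.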

dask_ctl.lifecycle.scale_cluster(name: str, n_workers: int) -> None

Scale a cluster by name.

Constructs a cluster manager for the named cluster and calls .scale(n_workers) on it.

name

Name of cluster to scale.

n_workers

Number of workers to scale to.

>>> from dask_ctl.lifecycle import scale_cluster
>>> scale_cluster("mycluster", 10)

dask_ctl.lifecycle.delete_cluster(name: str) -> None

Close a cluster by name.

Constructs a cluster manager for the named cluster and calls .close() on it.

name

Name of cluster to close.

>>> from dask_ctl.lifecycle import delete_cluster
>>> delete_cluster("mycluster")
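
The by-name helpers for scaling and closing share one pattern: build a manager from the cluster name, then call a single method on it. The sketch below mimics that pattern with a stub so it runs without Dask. `StubCluster`, `scale_by_name` and `close_by_name` are hypothetical names used only for illustration, and the in-memory registry is an assumption standing in for real cluster discovery.

```python
class StubCluster:
    """Hypothetical stand-in for a cluster manager; not a real Dask class."""

    registry = {}  # maps cluster name -> StubCluster instance

    def __init__(self, name, n_workers=0):
        self.name = name
        self.n_workers = n_workers
        self.closed = False
        StubCluster.registry[name] = self

    @classmethod
    def from_name(cls, name):
        # Look the manager up by name, as a from_name classmethod would.
        return cls.registry[name]

    def scale(self, n_workers):
        self.n_workers = n_workers

    def close(self):
        self.closed = True


def scale_by_name(name, n_workers):
    """Construct a manager for the named cluster and scale it."""
    StubCluster.from_name(name).scale(n_workers)


def close_by_name(name):
    """Construct a manager for the named cluster and close it."""
    StubCluster.from_name(name).close()


cluster = StubCluster("mycluster")
scale_by_name("mycluster", 10)
close_by_name("mycluster")
print(cluster.n_workers, cluster.closed)
```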

dask_ctl.lifecycle.list_clusters() -> List[distributed.deploy.cluster.Cluster]

List all clusters.

Discover clusters and return a list of cluster managers representing each one.

list

List of cluster managers, one for each discovered cluster.

>>> from dask.distributed import LocalCluster
>>> from dask_ctl.lifecycle import list_clusters
>>> cluster = LocalCluster(scheduler_port=8786)
>>> list_clusters()
[ProxyCluster(proxycluster-8786, 'tcp://localhost:8786', workers=4, threads=12, memory=17.18 GB)]

dask_ctl.lifecycle.get_snippet(name: str) -> str

Get a code snippet for connecting to a cluster.

name

Name of cluster to get a snippet for.

str

Code snippet.

>>> from dask.distributed import LocalCluster
>>> from dask_ctl.lifecycle import get_snippet
>>> cluster = LocalCluster(scheduler_port=8786)
>>> get_snippet("proxycluster-8786")
from dask.distributed import Client
from dask_ctl.proxy import ProxyCluster

cluster = ProxyCluster.from_name("proxycluster-8786")
client = Client(cluster)

Discovery

dask_ctl.discovery.discover_cluster_names([...])

Generator to discover cluster names.

dask_ctl.discovery.discover_clusters([discovery])

Generator to discover clusters.

dask_ctl.discovery.list_discovery_methods()

Lists registered discovery methods.

async dask_ctl.discovery.discover_cluster_names(discovery: Optional[str] = None) -> AsyncIterator[Tuple[str, Callable]]

Generator to discover cluster names.

Cluster discovery methods are asynchronous. This async generator iterates through each discovery method and then iterates through each cluster name discovered.

Discovery can also be restricted to a specific discovery method.

discovery

Discovery method to use, as listed in list_discovery_methods(). Default is None which uses all discovery methods.

tuple

Each tuple contains the name of the cluster and a class which can be used to represent it.

>>> from dask.distributed import LocalCluster
>>> from dask_ctl.discovery import discover_cluster_names
>>> cluster = LocalCluster(scheduler_port=8786)
>>> [name async for name in discover_cluster_names()]
[('proxycluster-8786', dask_ctl.proxy.ProxyCluster)]
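
The nested iteration described above (each discovery method in turn, then each name that method yields) can be sketched with plain `asyncio`. Everything in this block is a hypothetical stand-in: `FakeCluster`, `discover_a`, `discover_b`, the `METHODS` registry and `discover_names` are illustrative names, not part of dask-ctl.

```python
import asyncio
from typing import AsyncIterator, Optional, Tuple


class FakeCluster:
    """Hypothetical stand-in for a cluster manager class."""


async def discover_a() -> AsyncIterator[Tuple[str, type]]:
    # A made-up discovery method that finds two clusters.
    yield "a-1", FakeCluster
    yield "a-2", FakeCluster


async def discover_b() -> AsyncIterator[Tuple[str, type]]:
    # A second made-up discovery method that finds one cluster.
    yield "b-1", FakeCluster


# Assumed registry of discovery methods, keyed by method name.
METHODS = {"a": discover_a, "b": discover_b}


async def discover_names(
    discovery: Optional[str] = None,
) -> AsyncIterator[Tuple[str, type]]:
    """Yield (name, class) pairs from every method, or only from `discovery`."""
    for method_name, discover in METHODS.items():
        if discovery is None or discovery == method_name:
            async for name, cls in discover():
                yield name, cls


async def collect(discovery: Optional[str] = None) -> list:
    return [name async for name, _ in discover_names(discovery)]


all_names = asyncio.run(collect())
only_b = asyncio.run(collect("b"))
print(all_names)
print(only_b)
```

Passing a method name narrows the outer loop to that one method, which is the restriction the `discovery` parameter describes.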

async dask_ctl.discovery.discover_clusters(discovery=None) -> AsyncIterator[distributed.deploy.spec.SpecCluster]

Generator to discover clusters.

This generator takes the names and classes output from discover_cluster_names() and constructs the cluster object using the cls.from_name(name) classmethod.

Discovery can also be restricted to a specific discovery method.

discovery

Discovery method to use, as listed in list_discovery_methods(). Default is None which uses all discovery methods.

Cluster

A cluster manager for each discovered cluster.

>>> from dask.distributed import LocalCluster
>>> from dask_ctl.discovery import discover_clusters
>>> cluster = LocalCluster(scheduler_port=8786)
>>> [cluster async for cluster in discover_clusters()]
[ProxyCluster(proxycluster-8786, 'tcp://localhost:8786', workers=4, threads=12, memory=17.18 GB)]

dask_ctl.discovery.list_discovery_methods() -> Dict[str, Callable]

Lists registered discovery methods.

Dask cluster discovery methods are registered via the dask_cluster_discovery entrypoint. This function lists all methods registered via that entrypoint.

dict

A mapping of discovery methods containing the functions themselves and metadata about where they came from.

>>> from dask_ctl.discovery import list_discovery_methods
>>> list_discovery_methods()
{'proxycluster': {
    'discover': <function dask_ctl.proxy.discover()>,
    'package': 'dask-ctl',
    'version': '<package version>',
    'path': '<path to package>'}
}
>>> list(list_discovery_methods())  
['proxycluster']
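
Because discovery methods are registered through the `dask_cluster_discovery` entry point group, the registered names can also be inspected with the standard library alone. The sketch below is one way to do that, not a description of how dask-ctl implements its lookup. `registered_discovery_names` is a hypothetical helper, and the result is an empty list when no package providing that group is installed.

```python
from importlib.metadata import entry_points


def registered_discovery_names(group: str = "dask_cluster_discovery") -> list:
    """Hypothetical helper: names of entry points registered under `group`."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+ exposes a selectable entry point collection.
        selected = eps.select(group=group)
    else:
        # Python 3.8 and 3.9 return a plain dict keyed by group name.
        selected = eps.get(group, [])
    # De-duplicate and sort so the result is stable across environments.
    return sorted({ep.name for ep in selected})


names = registered_discovery_names()
print(names)
```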