Source code for adaptive_scheduler._server_support.slurm_run

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable

from adaptive_scheduler.scheduler import SLURM, slurm_partitions
from adaptive_scheduler.utils import _get_default_args

from .common import console
from .run_manager import RunManager

if TYPE_CHECKING:
    import adaptive

    from adaptive_scheduler.utils import _DATAFRAME_FORMATS, EXECUTOR_TYPES, GoalTypes


def slurm_run(
    learners: list[adaptive.BaseLearner],
    fnames: list[str] | list[Path],
    *,
    partition: str | tuple[str, ...] | None = None,
    nodes: int | tuple[int, ...] = 1,
    cores_per_node: int | tuple[int, ...] | None = None,
    goal: GoalTypes | None = None,
    folder: str | Path = "",
    name: str = "adaptive",
    num_threads: int | tuple[int, ...] = 1,
    save_interval: float = 300,
    log_interval: float = 300,
    cleanup_first: bool = True,
    save_dataframe: bool = True,
    dataframe_format: _DATAFRAME_FORMATS = "pickle",
    max_fails_per_job: int = 50,
    max_simultaneous_jobs: int = 100,
    exclusive: bool | tuple[bool, ...] = True,
    executor_type: EXECUTOR_TYPES | tuple[EXECUTOR_TYPES, ...] = "process-pool",
    extra_scheduler: list[str] | tuple[list[str], ...] | None = None,
    extra_run_manager_kwargs: dict[str, Any] | None = None,
    extra_scheduler_kwargs: dict[str, Any] | None = None,
    initializers: list[Callable[[], None]] | None = None,
) -> RunManager:
    """Run adaptive on a SLURM cluster.

    ``cores``, ``nodes``, ``cores_per_node``, ``extra_scheduler``,
    ``executor_type``, ``extra_script``, ``exclusive``, ``extra_env_vars``,
    ``num_threads`` and ``partition`` can be either a single value or a tuple
    of values (the ones not in this function's signature can be passed through
    ``extra_scheduler_kwargs``). If a tuple is given, its length should equal
    the number of learners (jobs) that are run. This allows different
    resources for different jobs.

    Parameters
    ----------
    learners
        A list of learners.
    fnames
        A list of filenames to save the learners.
    partition
        The partition to use. If None, the default partition will be used
        (the one marked with a * in `sinfo`). Use
        `adaptive_scheduler.scheduler.slurm_partitions` to see the available
        partitions.
    nodes
        The number of nodes to use.
    cores_per_node
        The number of cores per node to use. If None, all cores on the
        partition will be used.
    goal
        The goal of the adaptive run. If None, the run will continue
        indefinitely.
    folder
        The folder to save the adaptive_scheduler files such as logs,
        database, and ``.sbatch`` files in.
    name
        The name of the job.
    num_threads
        The number of threads to use.
    save_interval
        The interval at which to save the learners.
    log_interval
        The interval at which to log the status of the run.
    cleanup_first
        Whether to clean up the folder before starting the run.
    save_dataframe
        Whether to save the `pandas.DataFrame`s with the learners' data.
    dataframe_format
        The format to save the `pandas.DataFrame`s in. See
        `adaptive_scheduler.utils.save_dataframes` for more information.
    max_fails_per_job
        The maximum number of times a job can fail before it is cancelled.
    max_simultaneous_jobs
        The maximum number of simultaneous jobs.
    exclusive
        Whether to use exclusive nodes; adds ``"--exclusive"`` if True.
    executor_type
        The executor that is used; by default
        `concurrent.futures.ProcessPoolExecutor` is used. One can use
        ``"ipyparallel"``, ``"dask-mpi"``, ``"mpi4py"``, ``"loky"``,
        ``"sequential"``, or ``"process-pool"``.
    extra_scheduler
        Extra ``#SBATCH`` arguments, e.g. ``["--exclusive=user", "--time=1"]``
        or a tuple of lists, e.g. ``(["--time=10"], ["--time=20"])`` for two
        jobs.
    extra_run_manager_kwargs
        Extra keyword arguments to pass to the `RunManager`.
    extra_scheduler_kwargs
        Extra keyword arguments to pass to the
        `adaptive_scheduler.scheduler.SLURM`.
    initializers
        List of functions that are called before the job starts; can populate
        a cache.

    Returns
    -------
    RunManager
    """
    if partition is None:
        partitions = slurm_partitions()
        assert isinstance(partitions, dict)
        partition, ncores = next(iter(partitions.items()))
        console.log(
            f"Using default partition {partition} (the one marked"
            f" with a '*' in `sinfo`) with {ncores} cores."
            " Use `adaptive_scheduler.scheduler.slurm_partitions`"
            " to see the available partitions.",
        )
    if executor_type == "process-pool" and (
        nodes > 1 if isinstance(nodes, int) else any(n > 1 for n in nodes)
    ):
        msg = (
            "process-pool can maximally use a single node,"
            " use e.g., ipyparallel for multi node."
        )
        raise ValueError(msg)
    folder = Path(folder)
    folder.mkdir(parents=True, exist_ok=True)
    if cores_per_node is None:
        partitions = slurm_partitions()
        assert isinstance(partitions, dict)
        cores_per_node = (
            tuple(partitions[p] for p in partition)  # type: ignore[misc]
            if isinstance(partition, tuple)
            else partitions[partition]
        )
    if extra_scheduler_kwargs is None:
        extra_scheduler_kwargs = {}
    if extra_scheduler is not None:
        # "extra_scheduler" used to be passed via extra_scheduler_kwargs;
        # this ensures backwards compatibility.
        assert "extra_scheduler" not in extra_scheduler_kwargs
        extra_scheduler_kwargs["extra_scheduler"] = extra_scheduler
    slurm_kwargs = dict(
        _get_default_args(SLURM),
        nodes=nodes,
        cores_per_node=cores_per_node,
        partition=partition,
        log_folder=folder / "logs",
        batch_folder=folder / "batch_scripts",
        executor_type=executor_type,
        num_threads=num_threads,
        exclusive=exclusive,
        **extra_scheduler_kwargs,
    )
    scheduler = SLURM(**slurm_kwargs)
    # Below are the defaults for the RunManager
    kw = dict(
        _get_default_args(RunManager),
        scheduler=scheduler,
        learners=learners,
        fnames=fnames,
        goal=goal,
        save_interval=save_interval,
        log_interval=log_interval,
        move_old_logs_to=folder / "old_logs",
        db_fname=folder / f"{name}.db.json",
        job_name=name,
        cleanup_first=cleanup_first,
        save_dataframe=save_dataframe,
        dataframe_format=dataframe_format,
        max_fails_per_job=max_fails_per_job,
        max_simultaneous_jobs=max_simultaneous_jobs,
        initializers=initializers,
    )
    if extra_run_manager_kwargs is None:
        extra_run_manager_kwargs = {}
    return RunManager(**dict(kw, **extra_run_manager_kwargs))
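
For reference, a minimal usage sketch (not part of the module source). It assumes a SLURM cluster is available; the function ``h``, the offsets, the file paths, and the folder name are made up for illustration, and ``slurm_run`` is assumed to be re-exported at the package level as ``adaptive_scheduler.slurm_run``:

from functools import partial

import adaptive

import adaptive_scheduler


def h(x: float, offset: float = 0) -> float:
    # Toy function; made up for this example.
    return x + offset


offsets = [0.0, 0.5, 1.0]
learners = [
    adaptive.Learner1D(partial(h, offset=o), bounds=(-1, 1)) for o in offsets
]
fnames = [f"data/learner_{i}.pickle" for i in range(len(offsets))]

run_manager = adaptive_scheduler.slurm_run(
    learners,
    fnames,
    # Stop once each learner has 200 points; goal may also be None to run
    # indefinitely.
    goal=lambda learner: learner.npoints >= 200,
    # `partition` is omitted, so the default partition (the one marked with
    # a '*' in `sinfo`) is auto-detected, as in the body above.
    folder="my_run",  # logs, database, and .sbatch files end up here
)
run_manager.start()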
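
And a sketch of the per-job resources described in the docstring: continuing with the three learners above, each tuple entry applies to the corresponding learner's job. The partition names, core counts, and time limits here are hypothetical:

run_manager = adaptive_scheduler.slurm_run(
    learners,  # three learners, so each tuple below has three entries
    fnames,
    partition=("compute", "compute", "gpu"),  # hypothetical partition names
    nodes=(1, 1, 2),
    cores_per_node=(32, 32, 16),  # hypothetical core counts
    # The multi-node job must use an executor that spans nodes, e.g. mpi4py;
    # "process-pool" is limited to a single node (see the check in the body).
    executor_type=("process-pool", "process-pool", "mpi4py"),
    extra_scheduler=(
        ["--time=1:00:00"],
        ["--time=1:00:00"],
        ["--time=4:00:00"],
    ),
)
run_manager.start()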