adaptive_scheduler.scheduler module¶

Scheduler classes for Adaptive Scheduler.

class adaptive_scheduler.scheduler.BaseScheduler(cores, *, python_executable=None, log_folder='', mpiexec_executable=None, executor_type='process-pool', num_threads=1, extra_scheduler=None, extra_env_vars=None, extra_script=None, batch_folder='')[source]¶

Bases: ABC

Base object for a Scheduler.

  • cores (int | tuple[int, ...]) – Number of cores per job (so per learner.)

  • python_executable (str | None) – The Python executable that should run adaptive-scheduler. By default it uses the same Python as where this function is called.

  • log_folder (str | Path) – The folder in which to put the log-files.

  • mpiexec_executable (str | None) – mpiexec executable. By default mpiexec will be used (so probably from conda).

  • executor_type (Union[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'], tuple[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'], ...]]) – The executor that is used, by default concurrent.futures.ProcessPoolExecutor is used. One can use "ipyparallel", "dask-mpi", "mpi4py", "loky", "sequential", or "process-pool".

  • num_threads (int | tuple[int, ...]) – MKL_NUM_THREADS, OPENBLAS_NUM_THREADS, OMP_NUM_THREADS, and NUMEXPR_NUM_THREADS will be set to this number.

  • extra_scheduler (list[str] | tuple[list[str], ...] | None) – Extra #SLURM (depending on scheduler type) arguments, e.g. ["--exclusive=user", "--time=1"].

  • extra_env_vars (list[str] | tuple[list[str], ...] | None) – Extra environment variables that are exported in the job script. e.g. ["TMPDIR='/scratch'", "PYTHONPATH='my_dir:$PYTHONPATH'"].

  • extra_script (str | tuple[str, ...] | None) – Extra script that will be executed after any environment variables are set, but before the main scheduler is run.

  • batch_folder (str | Path) – The folder in which to put the batch files.

Return type:

BaseScheduler object.


The filename of the job script.

Return type:


cancel(job_names, *, with_progress_bar=True, max_tries=5)[source]¶

Cancel all jobs in job_names.

  • job_names (list[str]) – List of job names.

  • with_progress_bar (bool) – Display a progress bar using tqdm.

  • max_tries (int) – Maximum number of attempts to cancel a job.

Return type:


property ext: str¶

The extension of the job script.

extra_env_vars(*, index=None)[source]¶

Environment variables that need to exist in the job script.

Return type:


extra_scheduler(*, index=None)[source]¶

Scheduler options that go in the job script.

Return type:


extra_script(*, index=None)[source]¶

Script that will be run before the main scheduler.

Return type:



Get the job_ids from the job_names in the queue.

Return type:


abstract job_script(options, *, index=None)[source]¶

Get a jobscript in string form.

Return type:



  • job_script – A job script that can be submitted to the scheduler.

  • index – The index of the job that is being run. This is used when specifying different resources for different jobs.

property launcher: Path¶

The filename of the log (with JOB_ID_VARIABLE).

Return type:



Scheduler output file names (with JOB_ID_VARIABLE).

Return type:


abstract queue(*, me_only=True)[source]¶

Get the current running and pending jobs.


me_only (bool) – Only see your jobs.


Mapping of job_id -> dict with name and state, for example {job_id: {"job_name": "TEST_JOB-1", "state": "R" or "Q"}}.

Return type:



This function might return extra information about the job, however this is not used elsewhere in this package.


Get the current running and pending jobs as a pandas.DataFrame.

Return type:


static sanatize_job_id(job_id)[source]¶

Sanatize the job_id.

Return type:


property single_job_script: bool¶
start_job(name, *, index=None)[source]¶

Writes a job script and submits it to the scheduler.

Return type:


property submit_cmd: str¶

Command to start a job, e.g. qsub fname.batch or sbatch fname.sbatch.

write_job_script(name, options, index=None)[source]¶

Writes a job script.

Return type:



alias of SLURM

class adaptive_scheduler.scheduler.LocalMockScheduler(cores, *, python_executable=None, log_folder='', mpiexec_executable=None, executor_type='process-pool', num_threads=1, extra_scheduler=None, extra_env_vars=None, extra_script=None, batch_folder='', mock_scheduler_kwargs=None)[source]¶

Bases: BaseScheduler

A scheduler that can be used for testing and runs locally.

extra_scheduler(*, index=None)[source]¶

Get the extra scheduler options.

Return type:


job_script(options, *, index=None)[source]¶

Get a jobscript in string form.

Return type:



  • job_script – A job script that can be submitted to PBS.

  • index – The index of the job that is being run. This is used when specifying different resources for different jobs.


Currently, there is a problem that this will not properly cleanup. for example ipengine … & will be detached and go on, normally a scheduler will take care of this.

queue(*, me_only=True)[source]¶

Get the queue of the scheduler.

Return type:

dict[str, dict]

start_job(name, *, index=None)[source]¶

Start a job.

Return type:


class adaptive_scheduler.scheduler.PBS(cores, *, python_executable=None, log_folder='', mpiexec_executable=None, executor_type='process-pool', num_threads=1, extra_scheduler=None, extra_env_vars=None, extra_script=None, batch_folder='', cores_per_node=None)[source]¶

Bases: BaseScheduler

PBS scheduler.

job_script(options, *, index=None)[source]¶

Get a jobscript in string form.

Return type:



  • job_script – A job script that can be submitted to PBS.

  • index – The index of the job that is being run. This is used when specifying different resources for different jobs. Currently not implemented for PBS!


Get the output filenames.

Return type:


queue(*, me_only=True)[source]¶

Get the status of all jobs in the queue.

Return type:

dict[str, dict]

static sanatize_job_id(job_id)[source]¶

Changes ‘91722.hpc05.hpc’ into ‘91722’.

Return type:


start_job(name, *, index=None)[source]¶

Writes a job script and submits it to the scheduler.

Return type:


class adaptive_scheduler.scheduler.SLURM(*, cores=None, nodes=None, cores_per_node=None, partition=None, exclusive=True, python_executable=None, log_folder='', mpiexec_executable=None, executor_type='process-pool', num_threads=1, extra_scheduler=None, extra_env_vars=None, extra_script=None, batch_folder='')[source]¶

Bases: BaseScheduler

Base object for a Scheduler.

cores, nodes, cores_per_node, extra_scheduler, executor_type, extra_script, exclusive, extra_env_vars, num_threads and partition can be either a single value or a tuple of values. If a tuple is given, then the length of the tuple should be the same as the number of learners (jobs) that are run. This allows for different resources for different jobs.

  • cores (int | tuple[int, ...] | None) – Number of cores per job (so per learner.) Either use cores or nodes and cores_per_node.

  • nodes (int | tuple[int, ...] | None) – Number of nodes per job (so per learner.) Either nodes and cores_per_node or use cores.

  • cores_per_node (int | tuple[int, ...] | None) – Number of cores per node. Either nodes and cores_per_node or use cores.

  • partition (str | tuple[str, ...] | None) – The SLURM partition to submit the job to.

  • exclusive (bool | tuple[bool, ...]) – Whether to use exclusive nodes (e.g., if SLURM it adds --exclusive as option).

  • log_folder (str | Path) – The folder in which to put the log-files.

  • mpiexec_executable (str | None) – mpiexec executable. By default mpiexec will be used (so probably from conda).

  • executor_type (Union[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'], tuple[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'], ...]]) – The executor that is used, by default concurrent.futures.ProcessPoolExecutor is used. One can use "ipyparallel", "dask-mpi", "mpi4py", "loky", "sequential", or "process-pool".

  • num_threads (int | tuple[int, ...]) – MKL_NUM_THREADS, OPENBLAS_NUM_THREADS, OMP_NUM_THREADS, and NUMEXPR_NUM_THREADS will be set to this number.

  • extra_scheduler (list[str] | tuple[list[str], ...] | None) – Extra #SLURM (depending on scheduler type) arguments, e.g. ["--exclusive=user", "--time=1"] or a tuple of lists, e.g. (["--time=10"], ["--time=20"]]) for two jobs.

  • extra_env_vars (list[str] | tuple[list[str], ...] | None) – Extra environment variables that are exported in the job script. e.g. ["TMPDIR='/scratch'", "PYTHONPATH='my_dir:$PYTHONPATH'"].

  • extra_script (str | tuple[str, ...] | None) – Extra script that will be executed after any environment variables are set, but before the main scheduler is run.

static cancel_jobs(name, *, dry=False)[source]¶

Cancel jobs with names matching the pattern ‘{name}-{i}’ where i is an integer.

  • name (str) – The base name of the jobs to cancel. Jobs with names that start with ‘{name}-’ followed by an integer will be canceled.

  • dry (bool) – If True, perform a dry run and print the job IDs that would be canceled without actually canceling them. Default is False.


RuntimeError – If there is an error while canceling the jobs.

Return type:



>>> SLURM.cancel_jobs("my_job")
# Cancels all running jobs with names like "my_job-1", "my_job-2", etc.
>>> SLURM.cancel_jobs("my_job", dry=True)
# Prints the job IDs that would be canceled without actually canceling them.
job_script(options, *, index=None)[source]¶

Get a jobscript in string form.

Return type:



  • job_script – A job script that can be submitted to SLURM.

  • index – The index of the job that is being run. This is used when specifying different resources for different jobs.

property partitions: dict[str, int]¶

Get the partitions of the SLURM scheduler.

static queue(*, me_only=True)[source]¶

Get the queue of jobs.

Return type:

dict[str, dict[str, str]]

start_job(name, *, index=None)[source]¶

Writes a job script and submits it to the scheduler.

Return type:


adaptive_scheduler.scheduler.slurm_partitions(*, timeout=5, with_ncores=True)[source]¶

Get the available slurm partitions, raises subprocess.TimeoutExpired after timeout.

Return type:

list[str] | dict[str, int | None]