adaptive_scheduler.scheduler module¶

Scheduler classes for Adaptive Scheduler.

class adaptive_scheduler.scheduler.BaseScheduler(cores, *, python_executable=None, log_folder='', mpiexec_executable=None, executor_type='process-pool', num_threads=1, extra_scheduler=None, extra_env_vars=None, extra_script=None, batch_folder='')[source]¶

Bases: ABC

Base object for a Scheduler.

Parameters:
  • cores (int | tuple[int, ...]) – Number of cores per job (that is, per learner).

  • python_executable (str | None) – The Python executable that should run adaptive-scheduler. Defaults to the Python executable of the process in which this is called.

  • log_folder (str | Path) – The folder in which to put the log-files.

  • mpiexec_executable (str | None) – mpiexec executable. By default mpiexec will be used (so probably from conda).

  • executor_type (Union[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'], tuple[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'], ...]]) – The executor to use. Defaults to "process-pool", which uses concurrent.futures.ProcessPoolExecutor. The other options are "ipyparallel", "dask-mpi", "mpi4py", "loky", and "sequential".

  • num_threads (int | tuple[int, ...]) – MKL_NUM_THREADS, OPENBLAS_NUM_THREADS, OMP_NUM_THREADS, and NUMEXPR_NUM_THREADS will be set to this number.

  • extra_scheduler (list[str] | tuple[list[str], ...] | None) – Extra scheduler directives added to the job script (#SBATCH for SLURM, #PBS for PBS), e.g. ["--exclusive=user", "--time=1"].

  • extra_env_vars (list[str] | tuple[list[str], ...] | None) – Extra environment variables that are exported in the job script, e.g. ["TMPDIR='/scratch'", "PYTHONPATH='my_dir:$PYTHONPATH'"].

  • extra_script (str | tuple[str, ...] | None) – Extra script that will be executed after any environment variables are set, but before the main scheduler is run.

  • batch_folder (str | Path) – The folder in which to put the batch files.

Return type:

BaseScheduler object.
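
As a rough illustration of how num_threads and extra_env_vars could end up in a job script, the sketch below renders them as export lines. The helper render_env_block is hypothetical and not part of adaptive_scheduler; the layout of the script that the package actually generates may differ.

```python
from __future__ import annotations

# The four thread-related variables named in the num_threads description.
THREAD_VARS = (
    "MKL_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "OMP_NUM_THREADS",
    "NUMEXPR_NUM_THREADS",
)


def render_env_block(num_threads: int, extra_env_vars: list[str] | None = None) -> str:
    """Hypothetical helper: build the `export ...` lines of a job script."""
    lines = [f"export {var}={num_threads}" for var in THREAD_VARS]
    # Each extra entry is already a NAME=value string, so it is exported as-is.
    lines += [f"export {entry}" for entry in (extra_env_vars or [])]
    return "\n".join(lines)


print(render_env_block(2, ["TMPDIR='/scratch'"]))
```

Passing the extra variables as ready-made NAME=value strings keeps the quoting under the caller's control, which matters for values such as 'my_dir:$PYTHONPATH'.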

batch_fname(name)[source]¶

The filename of the job script.

Return type:

Path

cancel(job_names, *, with_progress_bar=True, max_tries=5)[source]¶

Cancel all jobs in job_names.

Parameters:
  • job_names (list[str]) – List of job names.

  • with_progress_bar (bool) – Display a progress bar using tqdm.

  • max_tries (int) – Maximum number of attempts to cancel a job.

Return type:

None
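
One way to read max_tries is as a bound on a cancel-and-recheck loop. The sketch below is an assumption about that behavior, not the package's implementation; cancel_with_retries, cancel_once, and still_queued are hypothetical names, and the queue is faked with a plain set.

```python
from typing import Callable, Iterable, List, Set


def cancel_with_retries(
    job_names: Iterable[str],
    cancel_once: Callable[[List[str]], None],
    still_queued: Callable[[], Set[str]],
    max_tries: int = 5,
) -> bool:
    """Hypothetical retry loop: return True once none of the jobs remain queued."""
    targets = set(job_names)
    for _ in range(max_tries):
        remaining = sorted(targets & still_queued())
        if not remaining:
            return True
        cancel_once(remaining)
    # Out of attempts: report whether the last attempt cleared everything.
    return not (targets & still_queued())


# A fake queue in which each cancel attempt only removes one job.
fake_queue = {"job-0", "job-1", "other"}


def fake_cancel(names: List[str]) -> None:
    fake_queue.discard(names[0])


print(cancel_with_retries(["job-0", "job-1"], fake_cancel, lambda: set(fake_queue)))
```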

property ext: str¶

The extension of the job script.

extra_env_vars(*, index=None)[source]¶

Environment variables that need to exist in the job script.

Return type:

str

extra_scheduler(*, index=None)[source]¶

Scheduler options that go in the job script.

Return type:

str

extra_script(*, index=None)[source]¶

Script that will be run before the main scheduler.

Return type:

str

job_names_to_job_ids(*job_names)[source]¶

Get the job_ids from the job_names in the queue.

Return type:

list[str]

abstract job_script(options, *, index=None)[source]¶

Get a jobscript in string form.

Parameters:

index – The index of the job that is being run. This is used when specifying different resources for different jobs.

Returns:

A job script that can be submitted to the scheduler.

Return type:

str

property launcher: Path¶

log_fname(name)[source]¶

The filename of the log (with JOB_ID_VARIABLE).

Return type:

Path

output_fnames(name)[source]¶

Scheduler output file names (with JOB_ID_VARIABLE).

Return type:

list[Path]
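
The phrase "with JOB_ID_VARIABLE" suggests that these paths contain a placeholder that is only resolved once the scheduler assigns a job id. The sketch below illustrates that idea under stated assumptions: the placeholder value "${JOB_ID}", the file name layout, and both helper names are hypothetical and chosen only for the example.

```python
from pathlib import Path

# Assumed placeholder; each real scheduler defines its own job-id variable.
JOB_ID_VARIABLE = "${JOB_ID}"


def log_fname_template(log_folder: str, name: str) -> Path:
    """Hypothetical: a log path that still contains the job-id placeholder."""
    return Path(log_folder) / f"{name}-{JOB_ID_VARIABLE}.log"


def resolve(template: Path, job_id: str) -> Path:
    """Substitute a concrete job id for the placeholder."""
    return Path(str(template).replace(JOB_ID_VARIABLE, job_id))


template = log_fname_template("logs", "run-0")
print(template)
print(resolve(template, "91722"))
```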

abstract queue(*, me_only=True)[source]¶

Get the current running and pending jobs.

Parameters:

me_only (bool) – Only see your jobs.

Returns:

Mapping of job_id -> dict with name and state, for example {job_id: {"job_name": "TEST_JOB-1", "state": "R" or "Q"}}.

Return type:

dict[str, dict]

Notes

This function might return extra information about the job, however this is not used elsewhere in this package.
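
To make the shape of the returned mapping concrete, the sketch below works on a hand-written dictionary in the documented format. The job ids and the two helper functions are made up for illustration; no scheduler is contacted.

```python
from typing import Dict, List

# A hand-written queue in the documented {job_id: {"job_name": ..., "state": ...}} shape.
queue: Dict[str, Dict[str, str]] = {
    "101": {"job_name": "TEST_JOB-1", "state": "R"},
    "102": {"job_name": "TEST_JOB-2", "state": "Q"},
    "103": {"job_name": "OTHER-0", "state": "R"},
}


def job_names_in_state(queue: Dict[str, Dict[str, str]], state: str) -> List[str]:
    """Names of all jobs whose state equals `state`."""
    return sorted(info["job_name"] for info in queue.values() if info["state"] == state)


def job_ids_for_names(queue: Dict[str, Dict[str, str]], *job_names: str) -> List[str]:
    """Job ids of the queued jobs whose name is one of `job_names`."""
    wanted = set(job_names)
    return [job_id for job_id, info in queue.items() if info["job_name"] in wanted]


print(job_names_in_state(queue, "R"))
print(job_ids_for_names(queue, "TEST_JOB-2", "OTHER-0"))
```

The second helper mirrors what a lookup such as job_names_to_job_ids has to do conceptually: invert the mapping from job id to job name.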

queue_df()[source]¶

Get the current running and pending jobs as a pandas.DataFrame.

Return type:

DataFrame

static sanatize_job_id(job_id)[source]¶

Sanitize the job_id.

Return type:

str

property single_job_script: bool¶

start_job(name, *, index=None)[source]¶

Writes a job script and submits it to the scheduler.

Return type:

None

property submit_cmd: str¶

Command to start a job, e.g. qsub fname.batch or sbatch fname.sbatch.

write_job_script(name, options, index=None)[source]¶

Writes a job script.

Return type:

None

adaptive_scheduler.scheduler.DefaultScheduler¶

alias of SLURM

class adaptive_scheduler.scheduler.LocalMockScheduler(cores, *, python_executable=None, log_folder='', mpiexec_executable=None, executor_type='process-pool', num_threads=1, extra_scheduler=None, extra_env_vars=None, extra_script=None, batch_folder='', mock_scheduler_kwargs=None)[source]¶

Bases: BaseScheduler

A scheduler that can be used for testing and runs locally.

extra_scheduler(*, index=None)[source]¶

Get the extra scheduler options.

Return type:

str

job_script(options, *, index=None)[source]¶

Get a jobscript in string form.

Parameters:

index – The index of the job that is being run. This is used when specifying different resources for different jobs.

Returns:

A job script that can be submitted to the scheduler.

Return type:

str

Notes

Currently, this does not clean up properly. For example, ipengine … & will be detached and keep running; normally a scheduler would take care of this.

queue(*, me_only=True)[source]¶

Get the queue of the scheduler.

Return type:

dict[str, dict]

start_job(name, *, index=None)[source]¶

Start a job.

Return type:

None

class adaptive_scheduler.scheduler.PBS(cores, *, python_executable=None, log_folder='', mpiexec_executable=None, executor_type='process-pool', num_threads=1, extra_scheduler=None, extra_env_vars=None, extra_script=None, batch_folder='', cores_per_node=None)[source]¶

Bases: BaseScheduler

PBS scheduler.

job_script(options, *, index=None)[source]¶

Get a jobscript in string form.

Parameters:

index – The index of the job that is being run. This is used when specifying different resources for different jobs. Currently not implemented for PBS!

Returns:

A job script that can be submitted to PBS.

Return type:

str

output_fnames(name)[source]¶

Get the output filenames.

Return type:

list[Path]

queue(*, me_only=True)[source]¶

Get the status of all jobs in the queue.

Return type:

dict[str, dict]

static sanatize_job_id(job_id)[source]¶

Changes ‘91722.hpc05.hpc’ into ‘91722’.

Return type:

str
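
The transformation in this docstring amounts to keeping everything before the first dot. A minimal sketch follows; strip_host_suffix is a hypothetical stand-in, and the package's own implementation may handle additional cases.

```python
def strip_host_suffix(job_id: str) -> str:
    """Keep only the part of a job id that precedes the first dot."""
    return job_id.split(".")[0]


print(strip_host_suffix("91722.hpc05.hpc"))  # → 91722
```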

start_job(name, *, index=None)[source]¶

Writes a job script and submits it to the scheduler.

Return type:

None

class adaptive_scheduler.scheduler.SLURM(*, cores=None, nodes=None, cores_per_node=None, partition=None, exclusive=True, python_executable=None, log_folder='', mpiexec_executable=None, executor_type='process-pool', num_threads=1, extra_scheduler=None, extra_env_vars=None, extra_script=None, batch_folder='')[source]¶

Bases: BaseScheduler

SLURM scheduler.

cores, nodes, cores_per_node, extra_scheduler, executor_type, extra_script, exclusive, extra_env_vars, num_threads and partition can be either a single value or a tuple of values. If a tuple is given, then the length of the tuple should be the same as the number of learners (jobs) that are run. This allows for different resources for different jobs.
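
The single-value-or-tuple convention can be pictured as a small lookup keyed by the job index. The helper value_for_job below is hypothetical and only illustrates the convention; it is not the package's code, and raising on a missing index is an assumption.

```python
from __future__ import annotations

from typing import Any


def value_for_job(value: Any, index: int | None) -> Any:
    """Return the setting that applies to the job at `index`.

    A tuple holds one entry per job; any other value is shared by all jobs.
    """
    if isinstance(value, tuple):
        if index is None:
            raise ValueError("A per-job tuple needs a job index.")
        return value[index]
    return value


partition = ("short", "long")  # one entry per learner (job)
cores = 4                      # shared by every job

print(value_for_job(partition, 1), value_for_job(cores, 1))
```

Because only tuples are treated as per-job values, a list such as ["--time=10"] still counts as a single setting, which matches the list[str] | tuple[list[str], ...] type of extra_scheduler.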

Parameters:
  • cores (int | tuple[int, ...] | None) – Number of cores per job (that is, per learner). Specify either cores, or nodes together with cores_per_node.

  • nodes (int | tuple[int, ...] | None) – Number of nodes per job (that is, per learner). Specify either nodes together with cores_per_node, or cores.

  • cores_per_node (int | tuple[int, ...] | None) – Number of cores per node. Specify either nodes together with cores_per_node, or cores.

  • partition (str | tuple[str, ...] | None) – The SLURM partition to submit the job to.

  • exclusive (bool | tuple[bool, ...]) – Whether to request exclusive nodes (for SLURM, this adds the --exclusive option).

  • log_folder (str | Path) – The folder in which to put the log-files.

  • mpiexec_executable (str | None) – mpiexec executable. By default mpiexec will be used (so probably from conda).

  • executor_type (Union[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'], tuple[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'], ...]]) – The executor to use. Defaults to "process-pool", which uses concurrent.futures.ProcessPoolExecutor. The other options are "ipyparallel", "dask-mpi", "mpi4py", "loky", and "sequential".

  • num_threads (int | tuple[int, ...]) – MKL_NUM_THREADS, OPENBLAS_NUM_THREADS, OMP_NUM_THREADS, and NUMEXPR_NUM_THREADS will be set to this number.

  • extra_scheduler (list[str] | tuple[list[str], ...] | None) – Extra #SBATCH arguments, e.g. ["--exclusive=user", "--time=1"], or a tuple of lists, e.g. (["--time=10"], ["--time=20"]) for two jobs.

  • extra_env_vars (list[str] | tuple[list[str], ...] | None) – Extra environment variables that are exported in the job script, e.g. ["TMPDIR='/scratch'", "PYTHONPATH='my_dir:$PYTHONPATH'"].

  • extra_script (str | tuple[str, ...] | None) – Extra script that will be executed after any environment variables are set, but before the main scheduler is run.
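
The either/or rule for cores versus nodes and cores_per_node can be expressed as a small check. The function total_cores below is a hypothetical illustration of that rule for single integer values; whether and how the SLURM class itself validates these arguments is not stated here.

```python
from __future__ import annotations


def total_cores(
    cores: int | None = None,
    nodes: int | None = None,
    cores_per_node: int | None = None,
) -> int:
    """Return the cores requested per job, given one of the two specifications."""
    if cores is not None:
        if nodes is not None or cores_per_node is not None:
            raise ValueError("Give either cores, or nodes and cores_per_node, not both.")
        return cores
    if nodes is None or cores_per_node is None:
        raise ValueError("Without cores, both nodes and cores_per_node are required.")
    return nodes * cores_per_node


print(total_cores(cores=8))
print(total_cores(nodes=2, cores_per_node=24))
```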

static cancel_jobs(name, *, dry=False)[source]¶

Cancel jobs with names matching the pattern ‘{name}-{i}’ where i is an integer.

Parameters:
  • name (str) – The base name of the jobs to cancel. Jobs with names that start with ‘{name}-’ followed by an integer will be canceled.

  • dry (bool) – If True, perform a dry run and print the job IDs that would be canceled without actually canceling them. Default is False.

Raises:

RuntimeError – If there is an error while canceling the jobs.

Return type:

None
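
The name pattern '{name}-{i}' can be checked with a regular expression. The sketch below shows one such check; matches_job_pattern is a hypothetical helper, and the way cancel_jobs actually selects jobs may differ.

```python
import re


def matches_job_pattern(name: str, job_name: str) -> bool:
    """True if job_name is exactly '{name}-{i}' with i a non-negative integer."""
    # re.escape guards against base names that contain regex metacharacters.
    return re.fullmatch(rf"{re.escape(name)}-\d+", job_name) is not None


for candidate in ["my_job-1", "my_job-12", "my_job-a", "my_job_extra-1", "my_job"]:
    print(candidate, matches_job_pattern("my_job", candidate))
```

Using a full match rather than a prefix test keeps a job such as "my_job_extra-1" from being selected when the base name is "my_job".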

Examples

>>> SLURM.cancel_jobs("my_job")
# Cancels all running jobs with names like "my_job-1", "my_job-2", etc.
>>> SLURM.cancel_jobs("my_job", dry=True)
# Prints the job IDs that would be canceled without actually canceling them.

job_script(options, *, index=None)[source]¶

Get a jobscript in string form.

Parameters:

index – The index of the job that is being run. This is used when specifying different resources for different jobs.

Returns:

A job script that can be submitted to SLURM.

Return type:

str

property partitions: dict[str, int]¶

Get the partitions of the SLURM scheduler.

static queue(*, me_only=True)[source]¶

Get the queue of jobs.

Return type:

dict[str, dict[str, str]]

start_job(name, *, index=None)[source]¶

Writes a job script and submits it to the scheduler.

Return type:

None

adaptive_scheduler.scheduler.slurm_partitions(*, timeout=5, with_ncores=True)[source]¶

Get the available SLURM partitions. Raises subprocess.TimeoutExpired if the query does not finish within the given timeout.

Return type:

list[str] | dict[str, int | None]
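
The timeout behavior corresponds to what subprocess.run does when its timeout argument is exceeded. The sketch below demonstrates this with a stand-in command run through the current Python interpreter; run_with_timeout is a hypothetical helper, and the command that slurm_partitions actually executes is not shown here.

```python
import subprocess
import sys
from typing import List


def run_with_timeout(cmd: List[str], timeout: float) -> str:
    """Run cmd and return its stdout; raises subprocess.TimeoutExpired if it is too slow."""
    result = subprocess.run(
        cmd, capture_output=True, text=True, timeout=timeout, check=True
    )
    return result.stdout


# Stand-in command; a real query would call the scheduler's own command-line tool.
print(run_with_timeout([sys.executable, "-c", "print('ok')"], timeout=30))

try:
    run_with_timeout([sys.executable, "-c", "import time; time.sleep(10)"], timeout=0.5)
except subprocess.TimeoutExpired:
    print("timed out")
```

Callers on a machine without a responsive scheduler can catch subprocess.TimeoutExpired and fall back to a default instead of blocking.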