API documentation

Adaptive Scheduler.

class adaptive_scheduler.MultiRunManager(run_managers=None)[source]

Bases: object

A manager that can contain multiple RunManagers.

Parameters:

run_managers (list[RunManager] | None) – Initial list of RunManagers to include.

run_managers

Dictionary of managed RunManagers, keyed by their names.

add_run_manager(run_manager, *, start=False, wait_for=None)[source]

Add a new RunManager to the MultiRunManager.

Parameters:
  • run_manager (RunManager) – The RunManager to add.

  • start (bool) – Whether to start the RunManager immediately after adding it.

  • wait_for (str | None) – The name of another RunManager to wait for before starting this one. Only applicable if start is True.

Raises:
  • ValueError – If a RunManager with the same name already exists.

  • KeyError – If the specified wait_for RunManager does not exist.

Return type:

None

cancel_all()[source]

Cancel all RunManagers.

Return type:

None

display()[source]

Display the widget.

Return type:

None

info()[source]

Display info about all RunManagers in a widget with an Update All button.

Return type:

VBox

remove_run_manager(name)[source]

Remove a RunManager from the MultiRunManager.

Parameters:

name (str) – The name of the RunManager to remove.

Raises:

KeyError – If no RunManager with the given name exists.

Return type:

None

start_all()[source]

Start all RunManagers.

Return type:

None

class adaptive_scheduler.PBS(cores, *, python_executable=None, log_folder='', mpiexec_executable=None, executor_type='process-pool', num_threads=1, extra_scheduler=None, extra_env_vars=None, extra_script=None, batch_folder='', cores_per_node=None)[source]

Bases: BaseScheduler

PBS scheduler.

job_script(options, *, index=None)[source]

Get a jobscript in string form.

Parameters:

index – The index of the job that is being run. This is used when specifying different resources for different jobs. Currently not implemented for PBS!

Return type:

str

Returns:

job_script – A job script that can be submitted to PBS.

output_fnames(name)[source]

Get the output filenames.

Return type:

list[Path]

queue(*, me_only=True)[source]

Get the status of all jobs in the queue.

Return type:

dict[str, dict]

static sanatize_job_id(job_id)[source]

Changes ‘91722.hpc05.hpc’ into ‘91722’.

Return type:

str
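
The transformation described above amounts to keeping the part of the job ID before the first dot. A minimal sketch under that assumption; the helper name strip_host_suffix is hypothetical and this is not necessarily how sanatize_job_id is implemented:

```python
def strip_host_suffix(job_id: str) -> str:
    """Keep only the part of a PBS job ID before the first dot."""
    return job_id.split(".", 1)[0]


short_id = strip_host_suffix("91722.hpc05.hpc")
print(short_id)
```

An ID without a host suffix passes through unchanged, because split returns the whole string when no dot is present.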

start_job(name, *, index=None)[source]

Writes a job script and submits it to the scheduler.

Return type:

None

class adaptive_scheduler.RunManager(scheduler, learners, fnames, *, goal=None, check_goal_on_start=True, dependencies=None, runner_kwargs=None, url=None, save_interval=300, log_interval=300, job_name='adaptive-scheduler', job_manager_interval=60, kill_interval=60, kill_on_error='srun: error:', move_old_logs_to='old_logs', db_fname=None, overwrite_db=True, job_manager_kwargs=None, kill_manager_kwargs=None, loky_start_method='loky', cleanup_first=False, save_dataframe=False, dataframe_format='pickle', max_log_lines=500, max_fails_per_job=50, max_simultaneous_jobs=100, initializers=None, quiet=False)[source]

Bases: BaseManager

A convenience tool that starts the job, database, and kill manager.

Parameters:
  • scheduler (BaseScheduler) – A scheduler instance from adaptive_scheduler.scheduler.

  • learners (list[BaseLearner]) – List of learners corresponding to fnames.

  • fnames (list[str] | list[Path]) – List of fnames corresponding to learners.

  • goal (Callable[[BaseLearner], bool] | int | float | datetime | timedelta | None) – The goal passed to the adaptive.Runner. Note that this function will be serialized and pasted in the job_script. Can be a smart-goal that accepts Callable[[adaptive.BaseLearner], bool] | float | datetime | timedelta | None. See adaptive_scheduler.utils.smart_goal for more information.

  • check_goal_on_start (bool) – Checks whether a learner is already done. Only works if the learner is loaded.

  • dependencies (dict[int, list[int]] | None) – Dictionary of dependencies, e.g., {1: [0]} means that the learners[1] depends on the learners[0]. This means that the learners[1] will only start when the learners[0] is done.

  • runner_kwargs (dict | None) – Extra keyword arguments to pass to the adaptive.Runner. Note that this dict will be serialized and pasted in the job_script.

  • url (str | None) – The url of the database manager, with the format tcp://ip_of_this_machine:allowed_port. If None, a correct url will be chosen.

  • save_interval (float) – Time in seconds between saving of the learners.

  • log_interval (float) – Time in seconds between log entries.

  • job_name (str) – From this string the job names will be created, e.g. ["adaptive-scheduler-1", "adaptive-scheduler-2", ...].

  • job_manager_interval (float) – Time in seconds between checking and starting jobs.

  • kill_interval (float) – Check for kill_on_error string inside the log-files every kill_interval seconds.

  • kill_on_error (str | Callable[[list[str]], bool] | None) – If kill_on_error is a string and it is found in the log files, the job will be cancelled and restarted. If it is a callable, it is applied to the log text; it must take a single argument, a list of strings, and return True if the job has to be killed, or False if not. Set to None if no KillManager is needed.

  • move_old_logs_to (str | Path | None) – Move logs of killed jobs to this directory. If None the logs will be deleted.

  • db_fname (str | Path | None) – Filename of the database, e.g. ‘running.json’.

  • overwrite_db (bool) – Overwrite the existing database.

  • job_manager_kwargs (dict[str, Any] | None) – Keyword arguments for the JobManager function that aren’t set in __init__ here.

  • kill_manager_kwargs (dict[str, Any] | None) – Keyword arguments for the KillManager function that aren’t set in __init__ here.

  • loky_start_method (Literal['loky', 'loky_int_main', 'spawn', 'fork', 'forkserver']) – Loky start method, by default “loky”.

  • cleanup_first (bool) – Cancel all previous jobs generated by the same RunManager and clean logfiles.

  • save_dataframe (bool) – Whether to periodically save the learner’s data as a pandas.DataFrame.

  • dataframe_format (Literal['parquet', 'csv', 'hdf', 'pickle', 'feather', 'excel', 'json']) – The format in which to save the pandas.DataFrame. See the type hint for the options.

  • max_log_lines (int) – The maximum number of lines to display in the log viewer widget.

  • max_fails_per_job (int) – Maximum number of times that a job can fail. This is here as a fail switch because a job might fail instantly because of a bug inside your code. The job manager will stop when n_jobs * total_number_of_jobs_failed > max_fails_per_job is true.

  • max_simultaneous_jobs (int) – Maximum number of simultaneously running jobs. By default no more than 100 jobs will be running. Keep in mind that if you do not specify a runner.goal, jobs will run forever, so the jobs that were not initially started (because of this max_simultaneous_jobs limit) will never start.

  • initializers (list[Callable[[], None]] | None) – List of functions that are called before the job starts, can populate a cache.

  • quiet (bool) – Whether to show a progress bar when creating learner files.
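
The callable form of kill_on_error described in the list above takes a list of log lines and returns a boolean. A minimal sketch of such a callable; the function name should_kill and the "MemoryError" marker are illustrative assumptions, and only "srun: error:" comes from the documented default:

```python
def should_kill(log_lines: list[str]) -> bool:
    """Return True if any log line contains one of the failure markers."""
    # "srun: error:" is the documented default; "MemoryError" is an assumed extra.
    markers = ("srun: error:", "MemoryError")
    return any(marker in line for line in log_lines for marker in markers)


healthy = ["starting job", "npoints: 120"]
broken = ["starting job", "srun: error: node01: task 0: Killed"]
print(should_kill(healthy), should_kill(broken))
```

A function with this shape could then be passed as kill_on_error=should_kill.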

job_names

List of job_names. Generated with self.job_name.

Type:

list

database_manager

The database manager.

Type:

DatabaseManager

job_manager

The job manager.

Type:

JobManager

kill_manager

The kill manager.

Type:

KillManager or None

start_time

Time at which self.start() is called.

Type:

float or None

end_time

Time at which the jobs are all done or at which self.cancel() is called.

Type:

float or None

Examples

Here is a basic example of using the RunManager.

>>> import adaptive_scheduler
>>> scheduler = adaptive_scheduler.scheduler.SLURM(cores=10)
>>> # `learners` and `fnames` are assumed to be defined already.
>>> run_manager = adaptive_scheduler.server_support.RunManager(
...     scheduler=scheduler,
...     learners=learners,
...     fnames=fnames,
... ).start()

Or an example using ipyparallel.Client.

>>> import adaptive_scheduler
>>> scheduler = adaptive_scheduler.scheduler.SLURM(
...     cores=10, executor_type="ipyparallel",
... )
>>> def goal(learner):
...     return learner.npoints > 2000
>>> # `learners` and `fnames` are assumed to be defined already.
>>> run_manager = adaptive_scheduler.server_support.RunManager(
...     scheduler=scheduler,
...     learners=learners,
...     fnames=fnames,
...     goal=goal,
...     log_interval=30,
...     save_interval=30,
... )
>>> run_manager.start()

cancel()[source]

Cancel the manager tasks and the jobs in the queue.

Return type:

None

cleanup(*, remove_old_logs_folder=False)[source]

Cleanup the log and batch files.

Return type:

None

elapsed_time()[source]

Total time elapsed since the RunManager was started.

Return type:

float

get_database()[source]

Get the database as a pandas.DataFrame.

Return type:

DataFrame

info(format='widget')[source]

Get run manager information in different formats.

Parameters:

format ({"text", "widget", "data"}, default "widget") –

  • “text”: Returns formatted string representation

  • “widget”: Displays and returns interactive widget

  • “data”: Returns structured RunManagerInfo object

Return type:

VBox | str | RunManagerInfo

job_starting_times()[source]

Return the starting times of the jobs.

Return type:

list[float]

load_dataframes()[source]

Load the `pandas.DataFrame`s with the most recently saved learners data.

Return type:

DataFrame

load_learners()[source]

Load the learners in parallel using adaptive_scheduler.utils.load_parallel.

Return type:

None

parse_log_files(*, only_last=True)[source]

Parse the log-files and convert them to a DataFrame.

Parameters:

only_last (bool) – Only use the last printed status message.

Return type:

pandas.core.frame.DataFrame

remove_existing_data(*, move_to=None, force=False)[source]

Remove the existing data files.

Parameters:
  • move_to (str | Path | None) – Move the files to this directory. If None the files will be deleted.

  • force (bool) – Remove the files even if the RunManager is already started.

Return type:

None

start(wait_for=None)[source]

Start the RunManager and optionally wait for another RunManager to finish.

Return type:

RunManager

status()[source]

Return the current status of the RunManager.

Return type:

str

task_status()[source]

Print the stack of the asyncio.Tasks.

Return type:

None

class adaptive_scheduler.SLURM(*, cores=None, nodes=None, cores_per_node=None, partition=None, memory=None, exclusive=False, python_executable=None, log_folder='', mpiexec_executable=None, executor_type='process-pool', num_threads=1, extra_scheduler=None, extra_env_vars=None, extra_script=None, batch_folder='')[source]

Bases: BaseScheduler

Base object for a Scheduler.

cores, nodes, cores_per_node, extra_scheduler, executor_type, extra_script, exclusive, extra_env_vars, num_threads, memory and partition can be either a single value or a tuple of values. If a tuple is given, then the length of the tuple should be the same as the number of learners (jobs) that are run. This allows for different resources for different jobs. The tuple elements are also allowed to be callables without arguments, which will be called when the job is submitted. These callables should return the value that is needed. See the type hints for the allowed types.
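
The single-value versus per-job tuple convention described above can be pictured as follows. This is a sketch of the idea only, not the library's code; the helper name resolve_for_job is hypothetical:

```python
from typing import Any


def resolve_for_job(value: Any, index: int) -> Any:
    """Pick the setting for job `index` from a single value or a per-job tuple."""
    if isinstance(value, tuple):
        element = value[index]
        # Tuple elements may be zero-argument callables, evaluated at submission.
        return element() if callable(element) else element
    # A single value applies to every job.
    return value


cores = (4, lambda: 8, 16)  # one entry per learner (job)
partition = "debug"         # same value for every job
resolved = [
    (resolve_for_job(cores, i), resolve_for_job(partition, i)) for i in range(3)
]
print(resolved)
```

Deferring evaluation through a callable is useful when the right value is only known at submission time, for example when it depends on which partition is currently free.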

Parameters:
  • cores (int | tuple[int | None | Callable[[], int | None], ...] | None) – Number of cores per job (so per learner). Use either cores, or nodes and cores_per_node.

  • nodes (int | tuple[int | None | Callable[[], int | None], ...] | None) – Number of nodes per job (so per learner). Use either nodes and cores_per_node, or cores.

  • cores_per_node (int | tuple[int | None | Callable[[], int | None], ...] | None) – Number of cores per node. Use either nodes and cores_per_node, or cores.

  • partition (str | tuple[str | None | Callable[[], str | None], ...] | None) – The SLURM partition to submit the job to.

  • memory (str | tuple[str | None | Callable[[], str | None], ...] | None) – Memory per job, e.g. "4GB" or "500MB". Adds --mem to the SBATCH options.

  • exclusive (bool | tuple[bool | Callable[[], bool], ...]) – Whether to use exclusive nodes (for SLURM this adds --exclusive as an option).

  • log_folder (str | Path) – The folder in which to put the log-files.

  • mpiexec_executable (str | None) – mpiexec executable. By default mpiexec will be used (so probably from conda).

  • executor_type (Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'] | tuple[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'] | Callable[[], Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential']], ...]) – The executor that is used, by default concurrent.futures.ProcessPoolExecutor is used. One can use "ipyparallel", "dask-mpi", "mpi4py", "loky", "sequential", or "process-pool".

  • num_threads (int | tuple[int | Callable[[], int], ...]) – MKL_NUM_THREADS, OPENBLAS_NUM_THREADS, OMP_NUM_THREADS, and NUMEXPR_NUM_THREADS will be set to this number.

  • extra_scheduler (list[str] | tuple[list[str] | Callable[[], list[str]], ...] | None) – Extra #SBATCH (depending on scheduler type) arguments, e.g. ["--exclusive=user", "--time=1"] or a tuple of lists, e.g. (["--time=10"], ["--time=20"]) for two jobs.

  • extra_env_vars (list[str] | tuple[list[str] | Callable[[], list[str]], ...] | None) – Extra environment variables that are exported in the job script. e.g. ["TMPDIR='/scratch'", "PYTHONPATH='my_dir:$PYTHONPATH'"].

  • extra_script (str | tuple[str | Callable[[], str], ...] | None) – Extra script that will be executed after any environment variables are set, but before the main scheduler is run.
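
To make the num_threads and extra_env_vars entries in the list above concrete, the sketch below builds the kind of export lines they describe. The exact layout of the generated job script is an assumption, and the helper name export_lines is hypothetical:

```python
def export_lines(num_threads: int, extra_env_vars: list[str]) -> list[str]:
    """Build shell `export` lines for the thread limits and extra variables."""
    thread_vars = [
        "MKL_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "OMP_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
    ]
    lines = [f"export {name}={num_threads}" for name in thread_vars]
    # Each extra entry is already a NAME=value assignment.
    lines += [f"export {assignment}" for assignment in extra_env_vars]
    return lines


lines = export_lines(1, ["TMPDIR='/scratch'"])
print("\n".join(lines))
```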

static cancel_jobs(name, *, dry=False)[source]

Cancel jobs with names matching the pattern ‘{name}-{i}’ where i is an integer.

Parameters:
  • name (str) – The base name of the jobs to cancel. Jobs with names that start with ‘{name}-’ followed by an integer will be canceled.

  • dry (bool) – If True, perform a dry run and print the job IDs that would be canceled without actually canceling them. Default is False.

Raises:

RuntimeError – If there is an error while canceling the jobs.

Return type:

None
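
The ‘{name}-{i}’ pattern described above can be checked with a regular expression. A sketch of the matching rule only; it does not query or cancel anything, and the helper name matches_job_pattern is hypothetical:

```python
import re


def matches_job_pattern(job_name: str, base: str) -> bool:
    """True if job_name is exactly '{base}-{i}' with i a non-negative integer."""
    return re.fullmatch(rf"{re.escape(base)}-\d+", job_name) is not None


queue_names = ["my_job-1", "my_job-22", "my_job", "my_job-final", "other-3"]
selected = [n for n in queue_names if matches_job_pattern(n, "my_job")]
print(selected)
```

Using fullmatch rather than a prefix test keeps names such as "my_job-final" from being selected.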

Examples

>>> SLURM.cancel_jobs("my_job")
# Cancels all running jobs with names like "my_job-1", "my_job-2", etc.
>>> SLURM.cancel_jobs("my_job", dry=True)
# Prints the job IDs that would be canceled without actually canceling them.

job_script(options, *, index=None)[source]

Get a jobscript in string form.

Parameters:

index – The index of the job that is being run. This is used when specifying different resources for different jobs.

Return type:

str

Returns:

job_script – A job script that can be submitted to SLURM.

property partitions: dict[str, int | None][source]

Get the partitions of the SLURM scheduler.

static queue(*, me_only=True)[source]

Get the queue of jobs.

Return type:

dict[str, dict[str, str]]

start_job(name, *, index=None)[source]

Writes a job script and submits it to the scheduler.

Return type:

None

class adaptive_scheduler.SlurmExecutor(name='adaptive-scheduler', folder=None, partition=None, nodes=1, cores_per_node=1, memory=None, num_threads=1, exclusive=False, executor_type='process-pool', extra_scheduler=None, extra_env_vars=None, goal=None, check_goal_on_start=True, runner_kwargs=None, url=None, save_interval=300, log_interval=300, job_manager_interval=60, kill_interval=60, kill_on_error='srun: error:', overwrite_db=True, job_manager_kwargs=None, kill_manager_kwargs=None, loky_start_method='loky', cleanup_first=True, save_dataframe=False, dataframe_format='pickle', max_log_lines=500, max_fails_per_job=50, max_simultaneous_jobs=100, quiet=True, extra_run_manager_kwargs=None, extra_scheduler_kwargs=None, size_per_learner=None, _sequences=<factory>, _sequence_mapping=<factory>, _disk_func_mapping=<factory>, _run_manager=None, _task_mapping=<factory>)[source]

Bases: AdaptiveSchedulerExecutorBase

An executor that runs jobs on SLURM.

Similar to concurrent.futures.Executor, but for SLURM. A key difference is that submit() returns a SlurmTask instead of a Future and that finalize() must be called in order to start the jobs.
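
The submit-then-finalize flow described above might look like the sketch below. It only defines a function, since actually running it requires a SLURM cluster with adaptive_scheduler installed. The job name "squares", the folder "squares-run", and the helper run_on_slurm are illustrative assumptions:

```python
def square(x: float) -> float:
    """The function to evaluate on the cluster."""
    return x * x


def run_on_slurm(values):
    """Submit square(x) for each value and start the jobs (needs a SLURM cluster)."""
    # Imported lazily so this sketch can be loaded without the package installed.
    from adaptive_scheduler import SlurmExecutor

    executor = SlurmExecutor(name="squares", folder="squares-run")  # assumed names
    tasks = [executor.submit(square, x) for x in values]  # SlurmTask objects
    run_manager = executor.finalize()  # no job is started before this call
    return tasks, run_manager
```

Results are not read here because, per the SlurmTask documentation, result() only returns when the value is already available.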

Parameters:
  • name (str) – The name of the job.

  • folder (str | Path | None) – The folder to save the adaptive_scheduler files such as logs, database, .sbatch, pickled tasks, and results files in. If the folder exists and has results, the results will be loaded!

  • partition (str | tuple[str | Callable[[], str], ...] | None) – The partition to use. If None, then the default partition will be used. (The one marked with a * in sinfo). Use adaptive_scheduler.scheduler.slurm_partitions to see the available partitions.

  • nodes (int | tuple[int | None | Callable[[], int | None], ...] | None) – The number of nodes to use.

  • cores_per_node (int | tuple[int | None | Callable[[], int | None], ...] | None) – The number of cores per node to use. If None, then all cores on the partition will be used.

  • memory (str | tuple[str | None | Callable[[], str | None], ...] | None) – Memory per job, e.g. "4GB" or "500MB". Adds --mem to the SBATCH options.

  • num_threads (int | tuple[int | Callable[[], int], ...]) – The number of threads to use.

  • exclusive (bool | tuple[bool | Callable[[], bool], ...]) – Whether to use exclusive nodes, adds "--exclusive" if True.

  • executor_type (Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'] | tuple[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'] | Callable[[], Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential']], ...]) – The executor that is used, by default concurrent.futures.ProcessPoolExecutor is used. One can use "ipyparallel", "dask-mpi", "mpi4py", "loky", "sequential", or "process-pool".

  • extra_scheduler (list[str] | tuple[list[str] | Callable[[], list[str]], ...] | None) – Extra #SBATCH (depending on scheduler type) arguments, e.g. ["--exclusive=user", "--time=1"] or a tuple of lists, e.g. (["--time=10"], ["--time=20"]) for two jobs.

  • extra_env_vars (list[str] | tuple[list[str] | Callable[[], list[str]], ...] | None) – Extra environment variables that are exported in the job script. e.g. ["TMPDIR='/scratch'", "PYTHONPATH='my_dir:$PYTHONPATH'"].

  • goal (Callable[[BaseLearner], bool] | int | float | datetime | timedelta | None) – The goal passed to the adaptive.Runner. Note that this function will be serialized and pasted in the job_script. Can be a smart-goal that accepts Callable[[adaptive.BaseLearner], bool] | float | datetime | timedelta | None. See adaptive_scheduler.utils.smart_goal for more information.

  • check_goal_on_start (bool) – Checks whether a learner is already done. Only works if the learner is loaded.

  • runner_kwargs (dict | None) – Extra keyword arguments to pass to the adaptive.Runner. Note that this dict will be serialized and pasted in the job_script.

  • url (str | None) – The url of the database manager, with the format tcp://ip_of_this_machine:allowed_port. If None, a correct url will be chosen.

  • save_interval (float) – Time in seconds between saving of the learners.

  • log_interval (float) – Time in seconds between log entries.

  • job_manager_interval (float) – Time in seconds between checking and starting jobs.

  • kill_interval (float) – Check for kill_on_error string inside the log-files every kill_interval seconds.

  • kill_on_error (str | Callable[[list[str]], bool] | None) – If kill_on_error is a string and it is found in the log files, the job will be cancelled and restarted. If it is a callable, it is applied to the log text; it must take a single argument, a list of strings, and return True if the job has to be killed, or False if not. Set to None if no KillManager is needed.

  • overwrite_db (bool) – Overwrite the existing database.

  • job_manager_kwargs (dict[str, Any] | None) – Keyword arguments for the JobManager function that aren’t set in __init__ here.

  • kill_manager_kwargs (dict[str, Any] | None) – Keyword arguments for the KillManager function that aren’t set in __init__ here.

  • loky_start_method (Literal['loky', 'loky_int_main', 'spawn', 'fork', 'forkserver']) – Loky start method, by default “loky”.

  • cleanup_first (bool) – Cancel all previous jobs generated by the same RunManager and clean logfiles.

  • save_dataframe (bool) – Whether to periodically save the learner’s data as a pandas.DataFrame.

  • dataframe_format (Literal['parquet', 'csv', 'hdf', 'pickle', 'feather', 'excel', 'json']) – The format in which to save the pandas.DataFrame. See the type hint for the options.

  • max_log_lines (int) – The maximum number of lines to display in the log viewer widget.

  • max_fails_per_job (int) – Maximum number of times that a job can fail. This is here as a fail switch because a job might fail instantly because of a bug inside your code. The job manager will stop when n_jobs * total_number_of_jobs_failed > max_fails_per_job is true.

  • max_simultaneous_jobs (int) – Maximum number of simultaneously running jobs. By default no more than 100 jobs will be running. Keep in mind that if you do not specify a runner.goal, jobs will run forever, so the jobs that were not initially started (because of this max_simultaneous_jobs limit) will never start.

  • quiet (bool) – Whether to show a progress bar when creating learner files.

  • extra_run_manager_kwargs (dict[str, Any] | None) – Extra keyword arguments to pass to the RunManager.

  • extra_scheduler_kwargs (dict[str, Any] | None) – Extra keyword arguments to pass to the adaptive_scheduler.scheduler.SLURM.

  • size_per_learner (int | None) – The size of each learner. If None, the whole sequence is passed to the learner.
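
The callable form of goal in the list above receives a learner and returns a boolean. A minimal sketch using a stand-in object; StubLearner is hypothetical and only mimics the npoints attribute that the examples in this documentation rely on:

```python
from dataclasses import dataclass


@dataclass
class StubLearner:
    """Hypothetical stand-in exposing the npoints attribute of a learner."""

    npoints: int


def goal(learner) -> bool:
    """Stop once the learner has at least 2000 points."""
    return learner.npoints >= 2000


print(goal(StubLearner(npoints=500)), goal(StubLearner(npoints=2500)))
```

Because the goal is serialized and pasted into the job script, keeping it a small function that depends only on its argument is the safer choice.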

check_goal_on_start: bool = True

cleanup()[source]

Return type:

None

cleanup_first: bool = True
cores_per_node: int | tuple[int | None | Callable[[], int | None], ...] | None = 1
dataframe_format: Literal['parquet', 'csv', 'hdf', 'pickle', 'feather', 'excel', 'json'] = 'pickle'
exclusive: bool | tuple[bool | Callable[[], bool], ...] = False
executor_type: Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'] | tuple[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'] | Callable[[], Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential']], ...] = 'process-pool'
extra_env_vars: list[str] | tuple[list[str] | Callable[[], list[str]], ...] | None = None
extra_run_manager_kwargs: dict[str, Any] | None = None
extra_scheduler: list[str] | tuple[list[str] | Callable[[], list[str]], ...] | None = None
extra_scheduler_kwargs: dict[str, Any] | None = None

finalize(*, start=True)[source]

Finalize the executor and return the RunManager.

Returns None if no learners were submitted.

Return type:

RunManager | None

folder: str | Path | None = None
goal: Callable[[BaseLearner], bool] | int | float | datetime | timedelta | None = None
job_manager_interval: float = 60
job_manager_kwargs: dict[str, Any] | None = None
kill_interval: float = 60
kill_manager_kwargs: dict[str, Any] | None = None
kill_on_error: str | Callable[[list[str]], bool] | None = 'srun: error:'
log_interval: float = 300
loky_start_method: Literal['loky', 'loky_int_main', 'spawn', 'fork', 'forkserver'] = 'loky'
max_fails_per_job: int = 50
max_log_lines: int = 500
max_simultaneous_jobs: int = 100
memory: str | tuple[str | None | Callable[[], str | None], ...] | None = None
name: str = 'adaptive-scheduler'

new(update=None)[source]

Create a new SlurmExecutor with the same parameters.

Return type:

SlurmExecutor

nodes: int | tuple[int | None | Callable[[], int | None], ...] | None = 1
num_threads: int | tuple[int | Callable[[], int], ...] = 1
overwrite_db: bool = True
partition: str | tuple[str | Callable[[], str], ...] | None = None
quiet: bool = True
runner_kwargs: dict | None = None
save_dataframe: bool = False
save_interval: float = 300

shutdown(wait=True, *, cancel_futures=False)[source]

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait (bool) – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures (bool) – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

Return type:

None

size_per_learner: int | None = None

submit(fn, /, *args, **kwargs)[source]

Submit a task to the executor.

Return type:

SlurmTask

url: str | None = None

class adaptive_scheduler.SlurmTask(executor, task_id)[source]

Bases: Future

A Future that loads the result from a SequenceLearner.

result(timeout=None)[source]

Return the result of the future if available.

Since this is an async task, this method will only return if the result is immediately available. Use await task to wait for the result.

Return type:

Any

adaptive_scheduler.slurm_run(learners, fnames, *, name='adaptive-scheduler', folder='', partition=None, nodes=1, cores_per_node=None, memory=None, num_threads=1, exclusive=False, executor_type='process-pool', extra_scheduler=None, extra_env_vars=None, goal=None, check_goal_on_start=True, dependencies=None, runner_kwargs=None, url=None, save_interval=300, log_interval=300, job_manager_interval=60, kill_interval=60, kill_on_error='srun: error:', overwrite_db=True, job_manager_kwargs=None, kill_manager_kwargs=None, loky_start_method='loky', cleanup_first=True, save_dataframe=True, dataframe_format='pickle', max_log_lines=500, max_fails_per_job=50, max_simultaneous_jobs=100, initializers=None, quiet=False, extra_run_manager_kwargs=None, extra_scheduler_kwargs=None)[source]

Run adaptive on a SLURM cluster.

cores_per_node, nodes, extra_scheduler, extra_env_vars, executor_type, exclusive, memory, num_threads and partition can be either a single value or a tuple of values. If a tuple is given, then the length of the tuple should be the same as the number of learners (jobs) that are run. This allows for different resources for different jobs. The tuple elements are also allowed to be callables without arguments, which will be called when the job is submitted. These callables should return the value that is needed. See the type hints for the allowed types.

Parameters:
  • learners (list[BaseLearner]) – A list of learners.

  • fnames (list[str] | list[Path]) – A list of filenames to save the learners.

  • name (str) – The name of the job.

  • folder (str | Path) – The folder to save the adaptive_scheduler files such as logs, database, and .sbatch files in.

  • partition (str | tuple[str | Callable[[], str], ...] | None) – The partition to use. If None, then the default partition will be used. (The one marked with a * in sinfo). Use adaptive_scheduler.scheduler.slurm_partitions to see the available partitions.

  • nodes (int | tuple[int | None | Callable[[], int | None], ...] | None) – The number of nodes to use.

  • cores_per_node (int | tuple[int | None | Callable[[], int | None], ...] | None) – The number of cores per node to use. If None, then all cores on the partition will be used.

  • memory (str | tuple[str | None | Callable[[], str | None], ...] | None) – Memory per job, e.g. "4GB" or "500MB". Adds --mem to the SBATCH options.

  • num_threads (int | tuple[int | Callable[[], int], ...]) – The number of threads to use.

  • exclusive (bool | tuple[bool | Callable[[], bool], ...]) – Whether to use exclusive nodes, adds "--exclusive" if True.

  • executor_type (Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'] | tuple[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'] | Callable[[], Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential']], ...]) – The executor that is used, by default concurrent.futures.ProcessPoolExecutor is used. One can use "ipyparallel", "dask-mpi", "mpi4py", "loky", "sequential", or "process-pool".

  • extra_scheduler (list[str] | tuple[list[str] | Callable[[], list[str]], ...] | None) – Extra #SBATCH (depending on scheduler type) arguments, e.g. ["--exclusive=user", "--time=1"] or a tuple of lists, e.g. (["--time=10"], ["--time=20"]) for two jobs.

  • extra_env_vars (list[str] | tuple[list[str] | Callable[[], list[str]], ...] | None) – Extra environment variables that are exported in the job script. e.g. ["TMPDIR='/scratch'", "PYTHONPATH='my_dir:$PYTHONPATH'"].

  • goal (Callable[[BaseLearner], bool] | int | float | datetime | timedelta | None) – The goal passed to the adaptive.Runner. Note that this function will be serialized and pasted in the job_script. Can be a smart-goal that accepts Callable[[adaptive.BaseLearner], bool] | float | datetime | timedelta | None. See adaptive_scheduler.utils.smart_goal for more information.

  • check_goal_on_start (bool) – Checks whether a learner is already done. Only works if the learner is loaded.

  • dependencies (dict[int, list[int]] | None) – Dictionary of dependencies, e.g., {1: [0]} means that the learners[1] depends on the learners[0]. This means that the learners[1] will only start when the learners[0] is done.

  • runner_kwargs (dict | None) – Extra keyword arguments to pass to the adaptive.Runner. Note that this dict will be serialized and pasted in the job_script.

  • url (str | None) – The url of the database manager, with the format tcp://ip_of_this_machine:allowed_port. If None, a correct url will be chosen.

  • save_interval (float) – Time in seconds between saving of the learners.

  • log_interval (float) – Time in seconds between log entries.

  • job_manager_interval (float) – Time in seconds between checking and starting jobs.

  • kill_interval (float) – Check for kill_on_error string inside the log-files every kill_interval seconds.

  • kill_on_error (str | Callable[[list[str]], bool] | None) – If kill_on_error is a string and it is found in the log files, the job will be cancelled and restarted. If it is a callable, it is applied to the log text; it must take a single argument, a list of strings, and return True if the job has to be killed, or False if not. Set to None if no KillManager is needed.

  • overwrite_db (bool) – Overwrite the existing database.

  • job_manager_kwargs (dict[str, Any] | None) – Keyword arguments for the JobManager function that aren’t set in __init__ here.

  • kill_manager_kwargs (dict[str, Any] | None) – Keyword arguments for the KillManager function that aren’t set in __init__ here.

  • loky_start_method (Literal['loky', 'loky_int_main', 'spawn', 'fork', 'forkserver']) – Loky start method, by default “loky”.

  • cleanup_first (bool) – Cancel all previous jobs generated by the same RunManager and clean logfiles.

  • save_dataframe (bool) – Whether to periodically save the learner’s data as a pandas.DataFrame.

  • dataframe_format (Literal['parquet', 'csv', 'hdf', 'pickle', 'feather', 'excel', 'json']) – The format in which to save the pandas.DataFrame. See the type hint for the options.

  • max_log_lines (int) – The maximum number of lines to display in the log viewer widget.

  • max_fails_per_job (int) – Maximum number of times that a job can fail. This is here as a fail switch because a job might fail instantly because of a bug inside your code. The job manager will stop when n_jobs * total_number_of_jobs_failed > max_fails_per_job is true.

  • max_simultaneous_jobs (int) – Maximum number of simultaneously running jobs. By default no more than 100 jobs will be running. Keep in mind that if you do not specify a runner.goal, jobs will run forever, so the jobs that were not initially started (because of this max_simultaneous_jobs limit) will never start.

  • initializers (list[Callable[[], None]] | None) – List of functions that are called before the job starts, can populate a cache.

  • quiet (bool) – Whether to show a progress bar when creating learner files.

  • extra_run_manager_kwargs (dict[str, Any] | None) – Extra keyword arguments to pass to the RunManager.

  • extra_scheduler_kwargs (dict[str, Any] | None) – Extra keyword arguments to pass to the adaptive_scheduler.scheduler.SLURM.

Return type:

RunManager
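
The dependencies mapping documented above, e.g. {1: [0]}, says that a learner may start only once all learners it depends on are done. A sketch of that rule, not the library's scheduling code; the helper name ready_to_start is hypothetical:

```python
def ready_to_start(
    dependencies: dict[int, list[int]], done: set[int], n_learners: int
) -> list[int]:
    """Indices of unfinished learners whose dependencies are all done."""
    ready = []
    for i in range(n_learners):
        if i in done:
            continue
        # Learners without an entry have no dependencies.
        if all(dep in done for dep in dependencies.get(i, [])):
            ready.append(i)
    return ready


dependencies = {1: [0], 2: [0, 1]}
first = ready_to_start(dependencies, done=set(), n_learners=3)
after_zero = ready_to_start(dependencies, done={0}, n_learners=3)
print(first, after_zero)
```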

adaptive_scheduler.start_one_by_one(*run_managers, goal=None, interval=120)[source]

Start a list of RunManagers one after another.

Parameters:
  • run_managers (RunManager) – The RunManagers to start, passed as positional arguments.

  • goal (Callable[[RunManager], bool] | None) – A callable that takes a RunManager as argument and returns a boolean. If goal is not None, the RunManagers will be started after goal returns True for the previous RunManager. If goal is None, the RunManagers will be started after the previous RunManager has finished.

  • interval (float) – The interval at which to check if goal is True. Only used if goal is not None.

Returns:

The first element is the grouped task that starts all RunManagers. The second element is a list of tasks that start each RunManager.

Return type:

tuple[asyncio.Future, list[asyncio.Future]]

Examples

>>> manager_1 = adaptive_scheduler.slurm_run(
...     learners[:5],
...     fnames[:5],
...     partition="hb120rsv2-low",
...     goal=0.01,
...     name="first",
... )
>>> manager_1.start()
>>> manager_2 = adaptive_scheduler.slurm_run(
...     learners[5:],
...     fnames[5:],
...     partition="hb120rsv2-low",
...     goal=0.01,
...     name="second",
... )
>>> # Start second when the first RunManager has more than 1000 points.
>>> def start_goal(run_manager):
...     df = run_manager.parse_log_files()
...     npoints = df.get("npoints")
...     if npoints is None:
...         return False
...     return npoints.sum() > 1000
>>> tasks = adaptive_scheduler.start_one_by_one(
...     manager_1,
...     manager_2,
...     goal=start_goal,
... )
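
The sequencing idea behind start_one_by_one can be sketched with plain asyncio: start a manager, poll a goal on it at a fixed interval, then start the next one. This illustrates the pattern only and is not the library's implementation; FakeManager and start_sequentially are hypothetical names:

```python
import asyncio


class FakeManager:
    """Hypothetical stand-in for a RunManager; it only records when it starts."""

    def __init__(self, name, log):
        self.name = name
        self.log = log
        self.started = False

    def start(self):
        self.started = True
        self.log.append(self.name)


async def start_sequentially(managers, goal, interval=0.01):
    """Start each manager once `goal` returns True for the previous one."""
    previous = None
    for manager in managers:
        if previous is not None:
            # Poll the goal on the previous manager at a fixed interval.
            while not goal(previous):
                await asyncio.sleep(interval)
        manager.start()
        previous = manager


log = []
managers = [FakeManager("first", log), FakeManager("second", log)]
asyncio.run(start_sequentially(managers, goal=lambda m: m.started))
print(log)
```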