adaptive_scheduler.server_support module¶
Imports for the server_support module.
- class adaptive_scheduler.server_support.BaseManager[source]¶
Bases:
object
- class adaptive_scheduler.server_support.DatabaseManager(url, scheduler, db_fname, learners, fnames, *, overwrite_db=True, initializers=None)[source]¶
Bases:
BaseManager
Database manager.
- Parameters:
  - url (str) – The url of the database manager, with the format tcp://ip_of_this_machine:allowed_port. Use get_allowed_url to get a url that will work.
  - scheduler (BaseScheduler) – A scheduler instance from adaptive_scheduler.scheduler.
  - db_fname (str | Path) – Filename of the database, e.g. 'running.json'.
  - learners (list[BaseLearner]) – List of learners corresponding to fnames.
  - fnames (list[str] | list[Path] | list[list[str]] | list[list[Path]]) – List of fnames corresponding to learners.
  - overwrite_db (bool) – Overwrite the existing database upon starting.
  - initializers (list[Callable[[], None]] | None) – List of functions that are called before the job starts; they can populate a cache.
- as_df()[source]¶
Return the database as a pandas.DataFrame.
- Return type:
pandas.DataFrame
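For illustration, a minimal sketch of setting up a DatabaseManager; the learners, fnames, and scheduler defined here are assumptions for the example, not part of the API:
>>> import adaptive
>>> import adaptive_scheduler
>>> from adaptive_scheduler import server_support
>>> def f(x):
...     return x**2
>>> learners = [adaptive.Learner1D(f, bounds=(-1, 1)) for _ in range(10)]
>>> fnames = [f"data/learner_{i}" for i in range(10)]
>>> scheduler = adaptive_scheduler.scheduler.DefaultScheduler(cores=2)
>>> db_manager = server_support.DatabaseManager(
...     url=server_support.get_allowed_url(),
...     scheduler=scheduler,
...     db_fname="running.json",
...     learners=learners,
...     fnames=fnames,
... )
>>> db_manager.start()
>>> df = db_manager.as_df()  # inspect the database as a pandas.DataFrame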
- class adaptive_scheduler.server_support.JobManager(job_names, database_manager, scheduler, interval=30, *, max_simultaneous_jobs=100, max_fails_per_job=50, save_dataframe=True, dataframe_format='pickle', loky_start_method='loky', log_interval=60, save_interval=300, runner_kwargs=None, goal=None)[source]¶
Bases:
BaseManager
Job manager.
- Parameters:
  - job_names (list[str]) – List of unique names used for the jobs, with the same length as learners. Note that a job name does not correspond to one specific learner.
  - database_manager (DatabaseManager) – A DatabaseManager instance.
  - scheduler (BaseScheduler) – A scheduler instance from adaptive_scheduler.scheduler.
  - interval (float) – Time in seconds between checking and starting jobs.
  - max_simultaneous_jobs (int) – Maximum number of simultaneously running jobs. By default no more than 100 jobs will be running. Keep in mind that if you do not specify a runner.goal, jobs will run forever, so jobs that were not initially started (because of this max_simultaneous_jobs condition) will never start.
  - max_fails_per_job (int) – Maximum number of times that a job can fail. This is here as a fail switch because a job might fail instantly because of a bug inside your code. The job manager will stop when the total number of failures exceeds n_jobs * max_fails_per_job.
  - save_dataframe (bool) – Whether to periodically save the learner's data as a pandas.DataFrame.
  - dataframe_format (Literal['parquet', 'csv', 'hdf', 'pickle', 'feather', 'excel', 'json']) – The format in which to save the pandas.DataFrame. See the type hint for the options.
  - loky_start_method (Literal['loky', 'loky_int_main', 'spawn', 'fork', 'forkserver']) – Loky start method, by default "loky".
  - log_interval (float) – Time in seconds between log entries.
  - save_interval (float) – Time in seconds between saving of the learners.
  - runner_kwargs (dict[str, Any] | None) – Extra keyword arguments to pass to the adaptive.Runner. Note that this dict will be serialized and pasted into the job_script.
  - goal (Callable[[BaseLearner], bool] | int | float | datetime | timedelta | None) – The goal passed to the adaptive.Runner. Note that this function will be serialized and pasted into the job_script. Can be a smart-goal that accepts Callable[[adaptive.BaseLearner], bool] | float | datetime | timedelta | None. See adaptive_scheduler.utils.smart_goal for more information.
- n_started¶
Total number of jobs started by the JobManager.
- Type:
int
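As a sketch, a JobManager that keeps at most 10 jobs running, reusing the db_manager, scheduler, and learners from the DatabaseManager example above:
>>> job_names = [f"adaptive-{i}" for i in range(len(learners))]
>>> job_manager = server_support.JobManager(
...     job_names,
...     database_manager=db_manager,
...     scheduler=scheduler,
...     max_simultaneous_jobs=10,
... )
>>> job_manager.start()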
- class adaptive_scheduler.server_support.KillManager(scheduler, database_manager, *, error='srun: error:', interval=600, max_cancel_tries=5, move_to=None)[source]¶
Bases:
BaseManager
Kill manager.
Automatically cancel jobs that contain an error (or other condition) in the log files.
- Parameters:
  - scheduler (BaseScheduler) – A scheduler instance from adaptive_scheduler.scheduler.
  - database_manager (DatabaseManager) – A DatabaseManager instance.
  - error (str | Callable[[list[str]], bool]) – If error is a string and is found in the log files, the job will be cancelled and restarted. If it is a callable, it is applied to the log text. Must take a single argument, a list of strings, and return True if the job has to be killed, or False if not.
  - interval (float) – Time in seconds between checking for the condition.
  - max_cancel_tries (int) – Try maximum max_cancel_tries times to cancel a job.
  - move_to (str | Path | None) – If a job is cancelled, the log is either removed (if move_to=None) or moved to a folder (e.g. if move_to='old_logs').
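A sketch of a KillManager that kills jobs based on a custom callable instead of the default error string (db_manager and scheduler as in the examples above):
>>> def has_error(lines):
...     return any("MemoryError" in line for line in lines)
>>> kill_manager = server_support.KillManager(
...     scheduler=scheduler,
...     database_manager=db_manager,
...     error=has_error,
...     move_to="old_logs",
... )
>>> kill_manager.start()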
- exception adaptive_scheduler.server_support.MaxRestartsReachedError[source]¶
Bases:
Exception
Max restarts reached.
Jobs can fail instantly because of an error in your Python code, which would result in jobs being restarted indefinitely.
- class adaptive_scheduler.server_support.RunManager(scheduler, learners, fnames, *, goal=None, check_goal_on_start=True, runner_kwargs=None, url=None, save_interval=300, log_interval=300, job_name='adaptive-scheduler', job_manager_interval=60, kill_interval=60, kill_on_error='srun: error:', move_old_logs_to='old_logs', db_fname=None, overwrite_db=True, job_manager_kwargs=None, kill_manager_kwargs=None, loky_start_method='loky', cleanup_first=False, save_dataframe=False, dataframe_format='pickle', max_log_lines=500, max_fails_per_job=50, max_simultaneous_jobs=100, initializers=None)[source]¶
Bases:
BaseManager
A convenience tool that starts the job, database, and kill manager.
- Parameters:
  - scheduler (BaseScheduler) – A scheduler instance from adaptive_scheduler.scheduler.
  - learners (list[BaseLearner]) – List of learners corresponding to fnames.
  - fnames (list[str] | list[Path]) – List of fnames corresponding to learners.
  - goal (Callable[[BaseLearner], bool] | int | float | datetime | timedelta | None) – The goal passed to the adaptive.Runner. Note that this function will be serialized and pasted into the job_script. Can be a smart-goal that accepts Callable[[adaptive.BaseLearner], bool] | float | datetime | timedelta | None. See adaptive_scheduler.utils.smart_goal for more information.
  - initializers (list[Callable[[], None]] | None) – List of functions that are called before the job starts; they can populate a cache.
  - check_goal_on_start (bool) – Checks whether a learner is already done. Only works if the learner is loaded.
  - runner_kwargs (dict | None) – Extra keyword arguments to pass to the adaptive.Runner. Note that this dict will be serialized and pasted into the job_script.
  - url (str | None) – The url of the database manager, with the format tcp://ip_of_this_machine:allowed_port. If None, a correct url will be chosen.
  - save_interval (float) – Time in seconds between saving of the learners.
  - log_interval (float) – Time in seconds between log entries.
  - job_name (str) – From this string the job names will be created, e.g. ["adaptive-scheduler-1", "adaptive-scheduler-2", ...].
  - job_manager_interval (float) – Time in seconds between checking and starting jobs.
  - kill_interval (float) – Check for the kill_on_error string inside the log-files every kill_interval seconds.
  - kill_on_error (str | Callable[[list[str]], bool] | None) – If a string and found in the log files, the job will be cancelled and restarted. If a callable, it is applied to the log text; it must take a single argument, a list of strings, and return True if the job has to be killed, or False if not. Set to None if no KillManager is needed.
  - move_old_logs_to (str | Path | None) – Move logs of killed jobs to this directory. If None, the logs will be deleted.
  - db_fname (str | Path | None) – Filename of the database, e.g. 'running.json'.
  - overwrite_db (bool) – Overwrite the existing database.
  - job_manager_kwargs (dict[str, Any] | None) – Keyword arguments for the JobManager that aren't set in __init__ here.
  - kill_manager_kwargs (dict[str, Any] | None) – Keyword arguments for the KillManager that aren't set in __init__ here.
  - loky_start_method (Literal['loky', 'loky_int_main', 'spawn', 'fork', 'forkserver']) – Loky start method, by default "loky".
  - cleanup_first (bool) – Cancel all previous jobs generated by the same RunManager and clean logfiles.
  - save_dataframe (bool) – Whether to periodically save the learner's data as a pandas.DataFrame.
  - dataframe_format (Literal['parquet', 'csv', 'hdf', 'pickle', 'feather', 'excel', 'json']) – The format in which to save the pandas.DataFrame. See the type hint for the options.
  - max_log_lines (int) – The maximum number of lines to display in the log viewer widget.
- database_manager¶
The database manager.
- Type:
DatabaseManager
- job_manager¶
The job manager.
- Type:
JobManager
- kill_manager¶
The kill manager.
- Type:
KillManager or None
- end_time¶
Time at which the jobs are all done or at which self.cancel() is called.
- Type:
float or None
Examples
Here is a minimal example of using the RunManager (assuming learners and fnames are already defined):
>>> import adaptive_scheduler
>>> scheduler = adaptive_scheduler.scheduler.DefaultScheduler(cores=10)
>>> run_manager = adaptive_scheduler.server_support.RunManager(
...     scheduler=scheduler, learners=learners, fnames=fnames,
... ).start()
Or an example using ipyparallel.Client:
>>> import adaptive_scheduler
>>> scheduler = adaptive_scheduler.scheduler.DefaultScheduler(
...     cores=10, executor_type="ipyparallel",
... )
>>> def goal(learner):
...     return learner.npoints > 2000
>>> run_manager = adaptive_scheduler.server_support.RunManager(
...     scheduler=scheduler,
...     learners=learners,
...     fnames=fnames,
...     goal=goal,
...     log_interval=30,
...     save_interval=30,
... )
>>> run_manager.start()
- elapsed_time()[source]¶
Total time elapsed since the RunManager was started.
- Return type:
float
- get_database()[source]¶
Get the database as a pandas.DataFrame.
- Return type:
pandas.DataFrame
- load_dataframes()[source]¶
Load the pandas.DataFrames with the most recently saved learners' data.
- Return type:
- load_learners()[source]¶
Load the learners in parallel using adaptive_scheduler.utils.load_parallel.
- Return type:
None
- parse_log_files(*, only_last=True)[source]¶
Parse the log-files and convert them to a DataFrame.
- Parameters:
  - only_last (bool) – Only use the last printed status message.
- Return type:
pandas.DataFrame
- start(wait_for=None)[source]¶
Start the RunManager and optionally wait for another RunManager to finish.
- Return type:
RunManager
- status()[source]¶
Return the current status of the RunManager.
- Return type:
str
- task_status()[source]¶
Print the stack of the asyncio.Tasks.
- Return type:
None
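A sketch of monitoring a started RunManager with the methods above (run_manager as in the earlier examples; output values are illustrative):
>>> run_manager.status()
'running'
>>> df = run_manager.parse_log_files(only_last=True)
>>> dfs = run_manager.load_dataframes()  # only meaningful if save_dataframe=True
>>> run_manager.elapsed_time()  # seconds since start
1234.5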
- adaptive_scheduler.server_support.cleanup_scheduler_files(job_names, scheduler, *, with_progress_bar=True, move_to=None)[source]¶
Clean up the scheduler log-files.
- Parameters:
  - job_names (list[str]) – List of job names.
  - scheduler (BaseScheduler) – A scheduler instance from adaptive_scheduler.scheduler.
  - with_progress_bar (bool) – Display a progress bar using tqdm.
  - move_to (str | Path | None) – Move the file to a different directory. If None, the file is removed.
- Return type:
None
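For example, to move the log-files of two hypothetical jobs to a folder instead of deleting them (scheduler as in the examples above):
>>> server_support.cleanup_scheduler_files(
...     job_names=["adaptive-scheduler-1", "adaptive-scheduler-2"],
...     scheduler=scheduler,
...     move_to="old_logs",
... )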
- adaptive_scheduler.server_support.get_allowed_url()[source]¶
Get an allowed url for the database manager.
- Returns:
A url that can be used for the database manager, with the format tcp://ip_of_this_machine:allowed_port.
- Return type:
str
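For example (the address shown is illustrative):
>>> server_support.get_allowed_url()
'tcp://10.0.0.1:57681'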
- adaptive_scheduler.server_support.logs_with_string_or_condition(error, database_manager)[source]¶
Get jobs whose log files contain a given string or satisfy a callable condition.
- Parameters:
  - error (str | Callable[[list[str]], bool]) – String that is searched for, or a callable that is applied to the log text. Must take a single argument, a list of strings, and return True if the job has to be killed, or False if not.
  - database_manager (DatabaseManager) – A DatabaseManager instance.
- Returns:
A list of (job_name, fnames) tuples for the jobs whose log files match.
- Return type:
list
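A sketch with both a plain string and a callable condition (db_manager as in the examples above):
>>> failed = server_support.logs_with_string_or_condition(
...     "srun: error:", db_manager,
... )
>>> def too_many_warnings(lines):
...     return sum("WARNING" in line for line in lines) > 100
>>> noisy = server_support.logs_with_string_or_condition(
...     too_many_warnings, db_manager,
... )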
- adaptive_scheduler.server_support.parse_log_files(database_manager, scheduler, *, only_last=True)[source]¶
Parse the log-files and convert them to a DataFrame.
- Parameters:
  - database_manager (DatabaseManager) – A DatabaseManager instance.
  - scheduler (BaseScheduler) – A scheduler instance from adaptive_scheduler.scheduler.
  - only_last (bool) – Only use the last printed status message.
- Return type:
pandas.DataFrame
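For example (db_manager and scheduler as in the examples above; the inspected columns are illustrative):
>>> df = server_support.parse_log_files(db_manager, scheduler, only_last=True)
>>> df.head()  # one row per job with its last logged status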
- adaptive_scheduler.server_support.periodically_clean_ipython_profiles(scheduler, interval=600)[source]¶
Periodically remove old IPython profiles.
In the RunManager.cleanup method the profiles will be removed. However, one might want to remove them earlier.
- Parameters:
  - scheduler (BaseScheduler) – A scheduler instance from adaptive_scheduler.scheduler.
  - interval (float) – The interval at which to remove old profiles.
- Return type:
asyncio.Task
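For example, to clean up stale profiles every 10 minutes while the jobs run (scheduler as above):
>>> task = server_support.periodically_clean_ipython_profiles(
...     scheduler, interval=600,
... )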
- adaptive_scheduler.server_support.slurm_run(learners, fnames, *, partition=None, nodes=1, cores_per_node=None, goal=None, folder='', name='adaptive', num_threads=1, save_interval=300, log_interval=300, cleanup_first=True, save_dataframe=True, dataframe_format='pickle', max_fails_per_job=50, max_simultaneous_jobs=100, exclusive=True, executor_type='process-pool', extra_scheduler=None, extra_run_manager_kwargs=None, extra_scheduler_kwargs=None, initializers=None)[source]¶
Run adaptive on a SLURM cluster.
cores, nodes, cores_per_node, extra_scheduler, executor_type, extra_script, exclusive, extra_env_vars, num_threads, and partition can each be either a single value or a tuple of values. If a tuple is given, its length should equal the number of learners (jobs) that are run. This allows for different resources for different jobs.
- Parameters:
  - learners (list[BaseLearner]) – A list of learners.
  - fnames (list[str] | list[Path]) – A list of filenames to save the learners.
  - partition (str | tuple[str, ...] | None) – The partition to use. If None, the default partition will be used (the one marked with a * in sinfo). Use adaptive_scheduler.scheduler.slurm_partitions to see the available partitions.
  - cores_per_node (int | tuple[int, ...] | None) – The number of cores per node to use. If None, all cores on the partition will be used.
  - goal (Callable[[BaseLearner], bool] | int | float | datetime | timedelta | None) – The goal of the adaptive run. If None, the run will continue indefinitely.
  - folder (str | Path) – The folder in which to save the adaptive_scheduler files, such as logs, the database, and .sbatch files.
  - name (str) – The name of the job.
  - num_threads (int | tuple[int, ...]) – The number of threads to use.
  - save_interval (float) – The interval at which to save the learners.
  - log_interval (float) – The interval at which to log the status of the run.
  - cleanup_first (bool) – Whether to clean up the folder before starting the run.
  - save_dataframe (bool) – Whether to save the pandas.DataFrames with the learners' data.
  - dataframe_format (Literal['parquet', 'csv', 'hdf', 'pickle', 'feather', 'excel', 'json']) – The format in which to save the pandas.DataFrames. See adaptive_scheduler.utils.save_dataframes for more information.
  - max_fails_per_job (int) – The maximum number of times a job can fail before it is cancelled.
  - max_simultaneous_jobs (int) – The maximum number of simultaneous jobs.
  - executor_type (Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'] | tuple[Literal['mpi4py', 'ipyparallel', 'dask-mpi', 'process-pool', 'loky', 'sequential'], ...]) – The executor that is used; by default concurrent.futures.ProcessPoolExecutor is used. One can use "ipyparallel", "dask-mpi", "mpi4py", "loky", "sequential", or "process-pool".
  - exclusive (bool | tuple[bool, ...]) – Whether to use exclusive nodes; adds "--exclusive" if True.
  - extra_scheduler (list[str] | tuple[list[str], ...] | None) – Extra #SLURM (depending on scheduler type) arguments, e.g. ["--exclusive=user", "--time=1"] or a tuple of lists, e.g. (["--time=10"], ["--time=20"]) for two jobs.
  - extra_run_manager_kwargs (dict[str, Any] | None) – Extra keyword arguments to pass to the RunManager.
  - extra_scheduler_kwargs (dict[str, Any] | None) – Extra keyword arguments to pass to the adaptive_scheduler.scheduler.SLURM.
  - initializers (list[Callable[[], None]] | None) – List of functions that are called before the job starts; they can populate a cache.
- Return type:
RunManager
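A sketch of a typical slurm_run call; the partition name is a placeholder for one of your cluster's partitions, and learners and fnames are assumed to be defined:
>>> run_manager = server_support.slurm_run(
...     learners,
...     fnames,
...     partition="mypartition",  # placeholder partition name
...     goal=0.01,  # smart-goal: run until the loss is below 0.01
...     save_dataframe=True,
...     folder="data/run",
... )
>>> run_manager.start()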
- adaptive_scheduler.server_support.start_one_by_one(*run_managers, goal=None, interval=120)[source]¶
Start a list of RunManagers after each other.
- Parameters:
  - run_managers (RunManager) – A list of RunManagers.
  - goal (Callable[[RunManager], bool] | None) – A callable that takes a RunManager as argument and returns a boolean. If goal is not None, each RunManager is started once goal returns True for the previous RunManager. If goal is None, each RunManager is started after the previous one has finished.
  - interval (float) – The interval at which to check whether goal is True. Only used if goal is not None.
- Returns:
The first element is the grouped task that starts all RunManagers. The second element is a list of tasks that start each RunManager.
- Return type:
tuple[asyncio.Future, list[asyncio.Future]]
Examples
>>> manager_1 = adaptive_scheduler.slurm_run(
...     learners[:5],
...     fnames[:5],
...     partition="hb120rsv2-low",
...     goal=0.01,
...     name="first",
... )
>>> manager_1.start()
>>> manager_2 = adaptive_scheduler.slurm_run(
...     learners[5:],
...     fnames[5:],
...     partition="hb120rsv2-low",
...     goal=0.01,
...     name="second",
... )
>>> # Start second when the first RunManager has more than 1000 points.
>>> def start_goal(run_manager):
...     df = run_manager.parse_log_files()
...     npoints = df.get("npoints")
...     if npoints is None:
...         return False
...     return npoints.sum() > 1000
>>> tasks = adaptive_scheduler.start_one_by_one(
...     manager_1,
...     manager_2,
...     goal=start_goal,
... )