from __future__ import annotations
import asyncio
import shutil
import time
import warnings
import weakref
from contextlib import suppress
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable
import pandas as pd
from adaptive_scheduler.utils import (
LOKY_START_METHODS,
GoalTypes,
_at_least_adaptive_version,
_remove_or_move_files,
_time_between,
fname_to_learner_fname,
load_dataframes,
load_parallel,
sleep_unless_task_is_done,
smart_goal,
)
from adaptive_scheduler.widgets import info
from .base_manager import BaseManager
from .common import (
_delete_old_ipython_profiles,
_maybe_path,
cleanup_scheduler_files,
console,
get_allowed_url,
)
from .database_manager import DatabaseManager
from .job_manager import JobManager
from .kill_manager import KillManager
from .parse_logs import parse_log_files
if TYPE_CHECKING:
import adaptive
from adaptive_scheduler.scheduler import BaseScheduler
from adaptive_scheduler.utils import _DATAFRAME_FORMATS
# This is a global list of all active run managers.
# Entries are weak references so that tracking a RunManager here does not
# keep it alive; dead/finished entries are pruned in
# `_track_and_maybe_cancel_existing`.
ACTIVE_RUN_MANAGERS: list[weakref.ReferenceType[RunManager]] = []
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
class RunManager(BaseManager):
"""A convenience tool that starts the job, database, and kill manager.
Parameters
----------
scheduler
A scheduler instance from `adaptive_scheduler.scheduler`.
learners
List of `learners` corresponding to `fnames`.
fnames
List of `fnames` corresponding to `learners`.
goal
The goal passed to the `adaptive.Runner`. Note that this function will
be serialized and pasted in the ``job_script``. Can be a smart-goal
that accepts
``Callable[[adaptive.BaseLearner], bool] | float | datetime | timedelta | None``.
See `adaptive_scheduler.utils.smart_goal` for more information.
initializers
List of functions that are called before the job starts, can populate
a cache.
check_goal_on_start
Checks whether a learner is already done. Only works if the learner is loaded.
runner_kwargs
Extra keyword argument to pass to the `adaptive.Runner`. Note that this dict
will be serialized and pasted in the ``job_script``.
url
The url of the database manager, with the format
``tcp://ip_of_this_machine:allowed_port.``. If None, a correct url will be chosen.
save_interval
Time in seconds between saving of the learners.
log_interval
Time in seconds between log entries.
job_name
From this string the job names will be created, e.g.
``["adaptive-scheduler-1", "adaptive-scheduler-2", ...]``.
job_manager_interval
Time in seconds between checking and starting jobs.
kill_interval
Check for `kill_on_error` string inside the log-files every `kill_interval` seconds.
kill_on_error
If ``error`` is a string and is found in the log files, the job will
be cancelled and restarted. If it is a callable, it is applied
to the log text. Must take a single argument, a list of
strings, and return True if the job has to be killed, or
False if not. Set to None if no `KillManager` is needed.
move_old_logs_to
Move logs of killed jobs to this directory. If None the logs will be deleted.
db_fname
Filename of the database, e.g. 'running.json'.
overwrite_db
Overwrite the existing database.
job_manager_kwargs
Keyword arguments for the `JobManager` function that aren't set in ``__init__`` here.
kill_manager_kwargs
Keyword arguments for the `KillManager` function that aren't set in ``__init__`` here.
loky_start_method
Loky start method, by default "loky".
cleanup_first
Cancel all previous jobs generated by the same RunManager and clean logfiles.
save_dataframe
    Whether to periodically save the learner's data as a `pandas.DataFrame`.
dataframe_format
    The format in which to save the `pandas.DataFrame`. See the type hint for the options.
max_log_lines
The maximum number of lines to display in the log viewer widget.
Attributes
----------
job_names : list
List of job_names. Generated with ``self.job_name``.
database_manager : `DatabaseManager`
The database manager.
job_manager : `JobManager`
The job manager.
kill_manager : `KillManager` or None
The kill manager.
start_time : float or None
Time at which ``self.start()`` is called.
end_time : float or None
Time at which the jobs are all done or at which ``self.cancel()`` is called.
Examples
--------
Here is an example of using the `RunManager` with a modified ``job_script_function``.
>>> import adaptive_scheduler
>>> scheduler = adaptive_scheduler.scheduler.DefaultScheduler(cores=10)
>>> run_manager = adaptive_scheduler.server_support.RunManager(
... scheduler=scheduler
... ).start()
Or an example using `ipyparallel.Client`.
>>> from functools import partial
>>> import adaptive_scheduler
>>> scheduler = adaptive_scheduler.scheduler.DefaultScheduler(
... cores=10, executor_type="ipyparallel",
... )
>>> def goal(learner):
... return learner.npoints > 2000
>>> run_manager = adaptive_scheduler.server_support.RunManager(
... scheduler=scheduler,
... goal=goal,
... log_interval=30,
... save_interval=30,
... )
>>> run_manager.start()
"""
def __init__(
self,
scheduler: BaseScheduler,
learners: list[adaptive.BaseLearner],
fnames: list[str] | list[Path],
*,
goal: GoalTypes | None = None,
check_goal_on_start: bool = True,
runner_kwargs: dict | None = None,
url: str | None = None,
save_interval: float = 300,
log_interval: float = 300,
job_name: str = "adaptive-scheduler",
job_manager_interval: float = 60,
kill_interval: float = 60,
kill_on_error: str | Callable[[list[str]], bool] | None = "srun: error:",
move_old_logs_to: str | Path | None = "old_logs",
db_fname: str | Path | None = None,
overwrite_db: bool = True,
job_manager_kwargs: dict[str, Any] | None = None,
kill_manager_kwargs: dict[str, Any] | None = None,
loky_start_method: LOKY_START_METHODS = "loky",
cleanup_first: bool = False,
save_dataframe: bool = False,
dataframe_format: _DATAFRAME_FORMATS = "pickle",
max_log_lines: int = 500,
max_fails_per_job: int = 50,
max_simultaneous_jobs: int = 100,
initializers: list[Callable[[], None]] | None = None,
) -> None:
super().__init__()
# Set from arguments
self.scheduler = scheduler
self.goal = smart_goal(goal, learners)
self.check_goal_on_start = check_goal_on_start
self.runner_kwargs = runner_kwargs
self.save_interval = save_interval
self.log_interval = log_interval
self.job_name = job_name
self.job_manager_interval = job_manager_interval
self.kill_interval = kill_interval
self.kill_on_error = kill_on_error
self.move_old_logs_to = _maybe_path(move_old_logs_to)
self.db_fname = Path(db_fname or f"{job_name}-database.json")
self.overwrite_db = overwrite_db
self.job_manager_kwargs = job_manager_kwargs or {}
self.kill_manager_kwargs = kill_manager_kwargs or {}
self.loky_start_method = loky_start_method
self.save_dataframe = save_dataframe
self.dataframe_format = dataframe_format
self.max_log_lines = max_log_lines
self.max_fails_per_job = max_fails_per_job
self.max_simultaneous_jobs = max_simultaneous_jobs
self.initializers = initializers
# Track job start times, (job_name, start_time) -> request_time
self._job_start_time_dict: dict[tuple[str, str], str] = {}
for key in ["max_fails_per_job", "max_simultaneous_jobs"]:
if key in self.job_manager_kwargs:
msg = (
f"The `{key}` argument is not allowed in `job_manager_kwargs`."
" Please specify it in `RunManager.__init__` instead.",
)
raise ValueError(msg)
if self.save_dataframe:
_at_least_adaptive_version("0.14.0", "save_dataframe")
# Set in methods
self.start_time: float | None = None
self.end_time: float | None = None
self._start_one_by_one_task: (
tuple[
asyncio.Future,
list[asyncio.Task],
]
| None
) = None
# Set on init
self.learners = learners
self.fnames = fnames
if isinstance(self.fnames[0], (list, tuple)):
# For a BalancingLearner
assert isinstance(self.fnames[0][0], (str, Path))
else:
assert isinstance(self.fnames[0], (str, Path))
self.job_names = [f"{self.job_name}-{i}" for i in range(len(self.learners))]
if cleanup_first:
self.scheduler.cancel(self.job_names)
self.cleanup(remove_old_logs_folder=True)
self.url = url or get_allowed_url()
self.database_manager = DatabaseManager(
url=self.url,
scheduler=self.scheduler,
db_fname=self.db_fname,
learners=self.learners,
fnames=self.fnames,
overwrite_db=self.overwrite_db,
initializers=self.initializers,
)
self.job_manager = JobManager(
self.job_names,
self.database_manager,
scheduler=self.scheduler,
interval=self.job_manager_interval,
max_fails_per_job=self.max_fails_per_job,
max_simultaneous_jobs=self.max_simultaneous_jobs,
# Launcher command line options
save_dataframe=self.save_dataframe,
dataframe_format=self.dataframe_format,
loky_start_method=self.loky_start_method,
log_interval=self.log_interval,
save_interval=self.save_interval,
runner_kwargs=self.runner_kwargs,
goal=self.goal,
**self.job_manager_kwargs,
)
self.kill_manager: KillManager | None
if self.kill_on_error is not None:
self.kill_manager = KillManager(
scheduler=self.scheduler,
database_manager=self.database_manager,
error=self.kill_on_error,
interval=self.kill_interval,
move_to=self.move_old_logs_to,
**self.kill_manager_kwargs,
)
else:
self.kill_manager = None
def _setup(self) -> None:
self.database_manager.start()
if self.check_goal_on_start:
# Check if goal already reached
# Only works after the `database_manager` has started.
done_fnames = [
fname
for fname, learner in zip(self.fnames, self.learners)
if self.goal(learner)
]
self.database_manager._stop_requests(done_fnames) # type: ignore[arg-type]
self.job_manager.start()
if self.kill_manager:
self.kill_manager.start()
self.start_time = time.time()
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
def start(self, wait_for: RunManager | None = None) -> RunManager: # type: ignore[override]
"""Start the RunManager and optionally wait for another RunManager to finish."""
_track_and_maybe_cancel_existing(self)
if wait_for is not None:
self._start_one_by_one_task = start_one_by_one(wait_for, self)
else:
super().start()
return self
    async def _manage(self) -> None:
        # Poll the database while the job manager is alive and record, for
        # each job that actually started, when that start was requested.
        assert self.job_manager.task is not None
        while not self.job_manager.task.done():
            if self.job_manager._request_times:
                for job in self.database_manager.as_dicts():
                    start_time = job["start_time"]
                    job_name = job["job_name"]
                    # Check if the job actually started (not cancelled)
                    if (
                        start_time is not None
                        and job_name in self.job_manager._request_times
                        and (job_name, start_time) not in self._job_start_time_dict
                    ):
                        # Pop so each (job_name, start_time) pair is recorded once.
                        request_time = self.job_manager._request_times.pop(job_name)
                        self._job_start_time_dict[job_name, start_time] = request_time
            if await sleep_unless_task_is_done(
                self.database_manager.task,  # type: ignore[arg-type]
                5,  # seconds between polls
            ):  # if true, we are done
                break
        self.end_time = time.time()
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
def job_starting_times(self) -> list[float]:
"""Return the starting times of the jobs."""
return [
_time_between(end, start)
for (_, start), end in self._job_start_time_dict.items()
]
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
    def cancel(self) -> None:
        """Cancel the manager tasks and the jobs in the queue."""
        # Stop the background managers first, then the scheduler jobs.
        self.database_manager.cancel()
        self.job_manager.cancel()
        if self.kill_manager is not None:
            self.kill_manager.cancel()
        self.scheduler.cancel(self.job_names)
        if self.task is not None:
            self.task.cancel()
        self.end_time = time.time()
        if self._start_one_by_one_task is not None:
            # Also cancel a pending chained start created via `start(wait_for=...)`.
            self._start_one_by_one_task[0].cancel()
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
def cleanup(self, *, remove_old_logs_folder: bool = False) -> None:
"""Cleanup the log and batch files."""
for fname in self.fnames:
fname_cloudpickle = fname_to_learner_fname(fname)
with suppress(FileNotFoundError):
fname_cloudpickle.unlink()
if "ipyparallel" in self.scheduler.executor_type:
# executor_type is a tuple or string
_delete_old_ipython_profiles(self.scheduler)
cleanup_scheduler_files(
job_names=self.job_names,
scheduler=self.scheduler,
with_progress_bar=True,
move_to=self.move_old_logs_to,
)
if remove_old_logs_folder and self.move_old_logs_to is not None:
with suppress(FileNotFoundError):
shutil.rmtree(self.move_old_logs_to)
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
def parse_log_files(self, *, only_last: bool = True) -> pd.DataFrame:
"""Parse the log-files and convert it to a `~pandas.core.frame.DataFrame`.
Parameters
----------
only_last
Only look use the last printed status message.
Returns
-------
pandas.core.frame.DataFrame
"""
return parse_log_files(
self.database_manager,
self.scheduler,
only_last=only_last,
)
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
def task_status(self) -> None:
r"""Print the stack of the `asyncio.Task`\s."""
if self.job_manager.task is not None:
self.job_manager.task.print_stack()
if self.database_manager.task is not None:
self.database_manager.task.print_stack()
if self.kill_manager is not None and self.kill_manager.task is not None:
self.kill_manager.task.print_stack()
if self.task is not None:
self.task.print_stack()
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
def get_database(self) -> pd.DataFrame:
"""Get the database as a `pandas.DataFrame`."""
return pd.DataFrame(self.database_manager.as_dicts())
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
    def load_learners(self) -> None:
        """Load the learners in parallel using `adaptive_scheduler.utils.load_parallel`."""
        # Populates each learner in-place from its corresponding file in `self.fnames`.
        load_parallel(self.learners, self.fnames)
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
def elapsed_time(self) -> float:
"""Total time elapsed since the `RunManager` was started."""
if not self.is_started:
return 0
assert self.job_manager.task is not None # for mypy
if self.job_manager.task.done():
end_time = self.end_time
if end_time is None:
# task was cancelled before it began
assert self.job_manager.task.cancelled()
return 0
else:
end_time = time.time()
return end_time - self.start_time # type: ignore[operator]
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
    def status(self) -> str:
        """Return the current status of the `RunManager`."""
        if not self.is_started:
            return "not yet started"
        # `Task.result()` raises InvalidStateError while the task is still
        # running, CancelledError if it was cancelled, and re-raises whatever
        # exception made it fail; map each case onto a status string.
        try:
            assert self.job_manager.task is not None
            self.job_manager.task.result()
        except asyncio.InvalidStateError:
            status = "running"
        except asyncio.CancelledError:
            status = "cancelled"
        except Exception:  # noqa: BLE001
            status = "failed"
            console.log("`JobManager` failed because of the following")
            console.print_exception(show_locals=True)
        else:
            status = "finished"
        # A DatabaseManager failure forces the status to "failed",
        # regardless of the JobManager's state.
        try:
            assert self.database_manager.task is not None  # for mypy
            self.database_manager.task.result()
        except (asyncio.InvalidStateError, asyncio.CancelledError):
            pass
        except Exception:  # noqa: BLE001
            status = "failed"
            console.log("`DatabaseManager` failed because of the following")
            console.print_exception(show_locals=True)
        if status == "running":
            return "running"
        # Not running anymore: freeze `end_time` the first time we notice.
        if self.end_time is None:
            self.end_time = time.time()
        return status
    def _repr_html_(self) -> None:
        # Rich-display hook for Jupyter; delegates to the `info` widget.
        # NOTE(review): annotated `-> None` although `info(self)`'s result is
        # returned — presumably the annotation mirrors `info()` below; confirm.
        return info(self)
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
    def info(self) -> None:
        """Display information about this `RunManager` via the `info` widget."""
        return info(self)
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
def load_dataframes(self) -> pd.DataFrame:
"""Load the `pandas.DataFrame`s with the most recently saved learners data."""
if not self.save_dataframe:
msg = "The `save_dataframe` option was not set to True."
raise ValueError(msg)
return load_dataframes(self.fnames, format=self.dataframe_format) # type: ignore[return-value]
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
def remove_existing_data(
self,
*,
move_to: str | Path | None = None,
force: bool = False,
) -> None:
"""Remove the existing data files.
Parameters
----------
move_to
Move the files to this directory. If None the files will be deleted.
force
Remove the files even if the `RunManager` is already started.
"""
if self.status() != "running" and not force:
msg = (
"Data files can only be removed if the `RunManager` is not running."
" Use `force=True` to remove them anyway."
)
raise RuntimeError(msg)
_remove_or_move_files(self.fnames, move_to=move_to)
async def _wait_for_finished(
manager_first: RunManager,
manager_second: RunManager,
goal: Callable[[RunManager], bool] | None = None,
interval: float = 120,
) -> None:
if goal is None:
assert manager_first.task is not None # for mpypy
await manager_first.task
else:
while not goal(manager_first):
await asyncio.sleep(interval)
manager_second.start()
def _start_after(
manager_first: RunManager,
manager_second: RunManager,
goal: Callable[[RunManager], bool] | None = None,
interval: float = 120,
) -> asyncio.Task:
if manager_second.is_started:
msg = "The second manager must not be started yet."
raise ValueError(msg)
coro = _wait_for_finished(manager_first, manager_second, goal, interval)
return asyncio.create_task(coro)
# [docs] (Sphinx doc-extraction artifact; commented out — it is not code)
def start_one_by_one(
    *run_managers: RunManager,
    goal: Callable[[RunManager], bool] | None = None,
    interval: float = 120,
) -> tuple[asyncio.Future, list[asyncio.Task]]:
    """Start a list of RunManagers after each other.

    Parameters
    ----------
    run_managers
        A list of RunManagers.
    goal
        A callable that takes a RunManager as argument and returns a boolean.
        If `goal` is not None, the RunManagers will be started after `goal`
        returns True for the previous RunManager. If `goal` is None, the
        RunManagers will be started after the previous RunManager has finished.
    interval
        The interval at which to check if `goal` is True. Only used if `goal`
        is not None.

    Returns
    -------
    tuple[asyncio.Future, list[asyncio.Task]]
        The first element is the grouped task that starts all RunManagers.
        The second element is a list of tasks that start each RunManager.

    Raises
    ------
    ValueError
        If two managers share a ``job_name`` or ``db_fname``.

    Examples
    --------
    >>> manager_1 = adaptive_scheduler.slurm_run(
    ...     learners[:5],
    ...     fnames[:5],
    ...     partition="hb120rsv2-low",
    ...     goal=0.01,
    ...     name="first",
    ... )
    >>> manager_1.start()
    >>> manager_2 = adaptive_scheduler.slurm_run(
    ...     learners[5:],
    ...     fnames[5:],
    ...     partition="hb120rsv2-low",
    ...     goal=0.01,
    ...     name="second",
    ... )
    >>> # Start second when the first RunManager has more than 1000 points.
    >>> def start_goal(run_manager):
    ...     df = run_manager.parse_log_files()
    ...     npoints = df.get("npoints")
    ...     if npoints is None:
    ...         return False
    ...     return npoints.sum() > 1000
    >>> tasks = adaptive_scheduler.start_one_by_one(
    ...     manager_1,
    ...     manager_2,
    ...     goal=start_goal,
    ... )
    """
    # Managers must not collide on the scheduler job names or the database file.
    uniques = ["job_name", "db_fname"]
    for u in uniques:
        if len({getattr(r, u) for r in run_managers}) != len(run_managers):
            # BUG FIX: `msg` used to be a 1-tuple because of a trailing comma,
            # which garbled the ValueError message.
            msg = (
                f"All `RunManager`s must have a unique {u}."
                " If using `slurm_run` these are controlled through the `name` argument."
            )
            raise ValueError(msg)
    # Chain each manager onto the previous one; the first manager is assumed
    # to be started by the caller.
    tasks = [
        _start_after(run_managers[i], run_managers[i + 1], goal, interval)
        for i in range(len(run_managers) - 1)
    ]
    return asyncio.gather(*tasks), tasks
def _track_and_maybe_cancel_existing(current_run_manager: RunManager) -> None:
    """Cancel the existing run manager with the same name.

    Additionally, update the ACTIVE_RUN_MANAGERS list with the new run manager.
    Emit a warning when canceling another run manager.
    """
    # Cancel (and drop) every other live manager that reuses our job_name.
    for weak_rm in list(ACTIVE_RUN_MANAGERS):
        rm = weak_rm()
        if rm is None or rm is current_run_manager:
            continue
        if rm.job_name != current_run_manager.job_name:
            continue
        # Emit a warning before canceling
        warnings.warn(
            f"Cancelling a RunManager with job_name '{rm.job_name}' because"
            " another instance is created with the same name.",
            UserWarning,
            stacklevel=2,
        )
        rm.cancel()
        ACTIVE_RUN_MANAGERS.remove(weak_rm)
    # Register the current manager if it is not tracked yet.
    current_weak_rm = weakref.ref(current_run_manager)
    if not any(current_weak_rm() is rm() for rm in ACTIVE_RUN_MANAGERS):
        ACTIVE_RUN_MANAGERS.append(current_weak_rm)
    # Prune managers that are done, cancelled, or garbage collected.
    ACTIVE_RUN_MANAGERS[:] = [
        rm
        for rm in ACTIVE_RUN_MANAGERS
        if rm() is not None and rm().end_time is None  # type: ignore[union-attr]
    ]