Source code for adaptive_scheduler._server_support.kill_manager

from __future__ import annotations

import asyncio
from pathlib import Path
from typing import TYPE_CHECKING, Callable

from adaptive_scheduler.utils import (
    _remove_or_move_files,
    sleep_unless_task_is_done,
)

from .base_manager import BaseManager
from .common import _maybe_path, log

if TYPE_CHECKING:
    from adaptive_scheduler.scheduler import BaseScheduler

    from .database_manager import DatabaseManager


def logs_with_string_or_condition(
    error: str | Callable[[list[str]], bool],
    database_manager: DatabaseManager,
) -> list[tuple[str, list[str]]]:
    """Get jobs whose log-files contain ``error`` (or satisfy a condition).

    Parameters
    ----------
    error
        String that is searched for or callable that is applied to the log
        text. A callable must take a single argument, a list of strings (the
        lines of one log file), and return True if the job has to be killed,
        or False if not.
    database_manager
        A `DatabaseManager` instance.

    Returns
    -------
    have_error
        A list of ``(job_name, fnames)`` tuples, one per database entry that
        has a ``job_id`` and at least one output log matching ``error``.
        ``fnames`` is that entry's ``output_logs`` followed by its
        ``log_fname``.

    Raises
    ------
    TypeError
        If ``error`` is neither a `str` nor a callable.
    """
    if isinstance(error, str):
        has_error = lambda lines: error in "".join(lines)  # noqa: E731
    elif callable(error):
        has_error = error
    else:
        msg = "`error` can only be a `str` or `callable`."
        raise TypeError(msg)

    def file_has_error(fname: Path) -> bool:
        # EAFP: a log that is missing (or removed between listing and
        # reading) simply counts as "no match" instead of raising.
        try:
            # Job output can contain bytes that are not valid UTF-8; replace
            # them rather than raising, so a single malformed log cannot make
            # every scan fail.
            with fname.open(encoding="utf-8", errors="replace") as f:
                lines = f.readlines()
        except FileNotFoundError:
            return False
        return has_error(lines)

    have_error: list[tuple[str, list[str]]] = []
    for entry in database_manager.as_dicts():
        fnames = entry["output_logs"]
        # Entries without a job_id are skipped entirely.
        if entry["job_id"] is not None and any(
            file_has_error(Path(f)) for f in fnames
        ):
            all_fnames = [*fnames, entry["log_fname"]]
            have_error.append((entry["job_name"], all_fnames))
    return have_error
class KillManager(BaseManager):
    """Cancel jobs whose log files match an error string or condition.

    On every iteration the database is refreshed, the jobs' output logs are
    scanned with `logs_with_string_or_condition`, matching jobs are cancelled
    through the scheduler, and their log files are removed or moved.

    Parameters
    ----------
    scheduler
        A scheduler instance from `adaptive_scheduler.scheduler`.
    database_manager
        A `DatabaseManager` instance.
    error
        If ``error`` is a string and is found in the log files, the job will
        be cancelled and restarted. If it is a callable, it is applied to the
        log text. Must take a single argument, a list of strings, and return
        True if the job has to be killed, or False if not.
    interval
        Time in seconds between checking for the condition.
    max_cancel_tries
        Try maximum `max_cancel_tries` times to cancel a job.
    move_to
        If a job is cancelled the log is either removed (if ``move_to=None``)
        or moved to a folder (e.g. if ``move_to='old_logs'``).
    """

    def __init__(
        self,
        scheduler: BaseScheduler,
        database_manager: DatabaseManager,
        *,
        error: str | Callable[[list[str]], bool] = "srun: error:",
        interval: float = 600,
        max_cancel_tries: int = 5,
        move_to: str | Path | None = None,
    ) -> None:
        super().__init__()
        self.scheduler = scheduler
        self.database_manager = database_manager
        self.error = error
        self.interval = interval
        self.max_cancel_tries = max_cancel_tries
        # Normalized through ``_maybe_path`` before use as the move target.
        self.move_to = _maybe_path(move_to)
        # Running history of every cancelled job name and removed/moved file.
        self.cancelled: list[str] = []
        self.deleted: list[str] = []

    async def _manage(self) -> None:
        while True:
            try:
                self.database_manager.update()
                matches = logs_with_string_or_condition(
                    self.error,
                    self.database_manager,
                )
                job_names: list[str] = [name for name, _ in matches]
                stale_files: list[str] = [
                    fname for _, fnames in matches for fname in fnames
                ]

                self.scheduler.cancel(
                    job_names,
                    with_progress_bar=False,
                    max_tries=self.max_cancel_tries,
                )
                _remove_or_move_files(
                    stale_files,
                    with_progress_bar=False,
                    move_to=self.move_to,
                )
                self.cancelled.extend(job_names)
                self.deleted.extend(stale_files)

                finished = await sleep_unless_task_is_done(
                    self.database_manager.task,  # type: ignore[arg-type]
                    self.interval,
                )
                if finished:
                    # The database manager's task is done: stop watching.
                    return
            except asyncio.CancelledError:  # noqa: PERF203
                log.info("task was cancelled because of a CancelledError")
                raise
            except Exception as e:  # noqa: BLE001
                # Log and keep going; wait one interval before retrying.
                log.exception("got exception in kill manager", exception=str(e))
                finished = await sleep_unless_task_is_done(
                    self.database_manager.task,  # type: ignore[arg-type]
                    self.interval,
                )
                if finished:
                    return