# Source code for adaptive_scheduler._server_support.common
from __future__ import annotations

import asyncio
import glob
import itertools
import logging
import shutil
import socket
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from pathlib import Path
from typing import TYPE_CHECKING

import structlog
import zmq
import zmq.asyncio
import zmq.ssh
from rich.console import Console

from adaptive_scheduler.utils import (
    _progress,
    _remove_or_move_files,
)

if TYPE_CHECKING:
    from adaptive_scheduler.scheduler import BaseScheduler
# Standard-library logger for the server-side machinery, fixed at INFO level.
logger = logging.getLogger("adaptive_scheduler.server")
logger.setLevel(logging.INFO)
# structlog wrapper around ``logger``; the rest of the module logs through this.
log = structlog.wrap_logger(logger)
# Module-level rich console for terminal output.
console = Console()
[docs]
class MaxRestartsReachedError(Exception):
    """Raised when the maximum number of job restarts has been reached.

    Jobs can fail immediately because of an error in your Python code,
    which would otherwise result in them being restarted indefinitely.
    """
[docs]
def get_allowed_url() -> str:
    """Get an allowed url for the database manager.

    Returns
    -------
    url
        A URL that can be used for the database manager, with the format
        ``tcp://ip_of_this_machine:allowed_port``.
    """
    # NOTE(review): on some systems the hostname resolves to a loopback
    # address (e.g. 127.0.1.1), which other nodes cannot reach — confirm
    # this is acceptable for multi-node setups.
    hostname = socket.gethostname()
    host_ip = socket.gethostbyname(hostname)
    # Presumably the port is only known to be free at selection time; it
    # could be taken again before the database manager binds it.
    free_ports = zmq.ssh.tunnel.select_random_ports(1)
    port = free_ports[0]
    return f"tcp://{host_ip}:{port}"
def _get_matching_files(f: Path, variable: str) -> list[Path]:
modified_name = f.name.replace(variable, "*")
return list(f.parent.glob(modified_name))
def _get_all_files(job_names: list[str], scheduler: BaseScheduler) -> set[Path]:
log_fnames = [scheduler.log_fname(name) for name in job_names]
_output_fnames = [scheduler.output_fnames(name) for name in job_names]
output_fnames: list[Path] = list(itertools.chain(*_output_fnames))
batch_fnames = [scheduler.batch_fname(name) for name in job_names]
fnames = log_fnames + output_fnames + batch_fnames
all_files = []
for f in fnames:
matching_files = _get_matching_files(f, scheduler._JOB_ID_VARIABLE)
all_files.extend(matching_files)
# For schedulers that use a single batch file
name_prefix = job_names[0].rsplit("-", 1)[0]
batch_file = scheduler.batch_fname(name_prefix)
if batch_file.exists():
all_files.append(batch_file)
return set(all_files)
[docs]
def cleanup_scheduler_files(
    job_names: list[str],
    scheduler: BaseScheduler,
    *,
    with_progress_bar: bool = True,
    move_to: str | Path | None = None,
) -> None:
    """Clean up the scheduler's log, output, and batch files.

    Parameters
    ----------
    job_names
        List of job names.
    scheduler
        A scheduler instance from `adaptive_scheduler.scheduler`.
    with_progress_bar
        Display a progress bar while the files are processed.
    move_to
        Move the files to this directory instead of deleting them.
        If None the files are removed.
    """
    to_rm = _get_all_files(job_names, scheduler)
    _remove_or_move_files(
        to_rm,
        with_progress_bar=with_progress_bar,
        move_to=move_to,
        desc="Removing logs and batch files",
    )
IPYTHON_PROFILE_PATTERN = "profile_adaptive_scheduler_"
def _ipython_profiles() -> list[Path]:
return list(Path("~/.ipython/").expanduser().glob(f"{IPYTHON_PROFILE_PATTERN}*"))
def _delete_old_ipython_profiles(
    scheduler: BaseScheduler,
    *,
    with_progress_bar: bool = True,
) -> None:
    """Delete adaptive_scheduler IPython profiles whose job is no longer queued.

    The part of a profile folder's name after ``IPYTHON_PROFILE_PATTERN`` is
    treated as a job id; the folder is removed when that id is not among the
    keys of ``scheduler.queue()``.

    Parameters
    ----------
    scheduler
        A scheduler instance from `adaptive_scheduler.scheduler`.
    with_progress_bar
        Whether to show progress bars while submitting and completing the
        deletions.
    """
    # We need the job_ids because only job_names wouldn't be
    # enough information. There might be other job_managers
    # running.
    running_job_ids = set(scheduler.queue().keys())
    prefix_len = len(IPYTHON_PROFILE_PATTERN)
    to_delete = [
        folder
        for folder in _ipython_profiles()
        # ``_ipython_profiles`` only yields names starting with the prefix, so
        # slicing the folder *name* (not the full path) gives the job id.
        if folder.name[prefix_len:] not in running_job_ids
    ]
    with ThreadPoolExecutor(256) as ex:
        desc = "Submitting deleting old IPython profiles tasks"
        # Respect ``with_progress_bar`` here too, matching the call below.
        pbar = _progress(to_delete, with_progress_bar, desc=desc)
        futs = [ex.submit(shutil.rmtree, folder) for folder in pbar]
        desc = "Finishing deleting old IPython profiles"
        for fut in _progress(futs, with_progress_bar, desc=desc):
            # Re-raise any error from the deletion in the calling thread.
            fut.result()
[docs]
def periodically_clean_ipython_profiles(
    scheduler: BaseScheduler,
    interval: float = 600,
) -> asyncio.Task:
    """Periodically remove old IPython profiles.

    In the `RunManager.cleanup` method the profiles will be removed. However,
    one might want to remove them earlier.

    Parameters
    ----------
    scheduler
        A scheduler instance from `adaptive_scheduler.scheduler`.
    interval
        The interval, in seconds, at which to remove old profiles.

    Returns
    -------
    asyncio.Task
        The task running the cleanup loop; cancel it to stop the cleaning.

    Raises
    ------
    NotImplementedError
        If ``scheduler.executor_type`` is a tuple (multiple executors).
    """
    if isinstance(scheduler.executor_type, tuple):
        msg = (
            "This function is not implemented for multiple executors."
            " Please open an issue on GitHub if you need this feature."
        )
        raise NotImplementedError(msg)

    async def clean(interval: float) -> None:
        while True:
            try:
                # NOTE: this call is synchronous, so it blocks the event loop
                # while the cleanup runs.
                _delete_old_ipython_profiles(scheduler, with_progress_bar=False)
            except Exception:  # noqa: BLE001
                # Best-effort: keep the loop alive, but don't hide the failure.
                log.exception("got exception when cleaning old IPython profiles")
            await asyncio.sleep(interval)

    # NOTE(review): ``asyncio.get_event_loop()`` is deprecated when no loop is
    # running; kept for backward compatibility with existing callers.
    ioloop = asyncio.get_event_loop()
    coro = clean(interval)
    return ioloop.create_task(coro)
def _maybe_path(fname: str | Path | None) -> Path | None: # pragma: no cover
"""Convert a string to a Path or return None."""
if fname is None:
return None
if isinstance(fname, str):
return Path(fname)
return fname