Source code for adaptive_scheduler._server_support.base_manager

from __future__ import annotations

import abc
import asyncio
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from collections.abc import Coroutine


class ManagerAlreadyStartedError(Exception):
    """Raised when a manager is already started."""


[docs] class BaseManager(metaclass=abc.ABCMeta): def __init__(self) -> None: self.ioloop: asyncio.events.AbstractEventLoop | None = None self._coro: Coroutine | None = None self.task: asyncio.Task | None = None
[docs] def start(self) -> BaseManager: if self.is_started: msg = f"{self.__class__} is already started!" raise ManagerAlreadyStartedError(msg) self._setup() self.ioloop = asyncio.get_event_loop() self._coro = self._manage() self.task = self.ioloop.create_task(self._coro) return self
@property def is_started(self) -> bool: return self.task is not None
[docs] def cancel(self) -> bool | None: if self.is_started: assert self.task is not None # for mypy return self.task.cancel() return None
def _setup(self) -> None: # noqa: B027 """Is run in the beginning of `self.start`.""" @abc.abstractmethod async def _manage(self) -> None: # pragma: no cover pass