Source code for adaptive_scheduler._server_support.base_manager
from __future__ import annotations

import abc
import asyncio
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from collections.abc import Coroutine


class ManagerAlreadyStartedError(Exception):
    """Signal that a manager was asked to start while already started.

    Raised by a manager's ``start`` method when its ``is_started`` flag
    is already set.
    """
def start(self) -> BaseManager:
    """Schedule this manager's ``_manage`` coroutine on the event loop.

    Runs ``_setup()``, then stores the event loop obtained from
    ``asyncio.get_event_loop()`` in ``self.ioloop``, the coroutine
    returned by ``_manage()`` in ``self._coro``, and the task created
    from that coroutine in ``self.task``.

    Returns
    -------
    BaseManager
        The manager itself, so the call can be chained.

    Raises
    ------
    ManagerAlreadyStartedError
        If ``self.is_started`` is truthy when the method is called.

    """
    # Guard clause: refuse to start twice, before any setup side effects.
    if self.is_started:
        msg = f"{self.__class__} is already started!"
        raise ManagerAlreadyStartedError(msg)

    self._setup()

    # Keep loop and coroutine in locals, mirroring each onto the instance
    # in the same order the attributes were originally assigned.
    loop = asyncio.get_event_loop()
    self.ioloop = loop
    coro = self._manage()
    self._coro = coro
    self.task = loop.create_task(coro)
    return self