Source code for adaptive_scheduler._server_support.parse_logs
from __future__ import annotations
import datetime
import json
import os
from contextlib import suppress
from typing import TYPE_CHECKING
import pandas as pd
if TYPE_CHECKING:
from typing import Any
from adaptive_scheduler.scheduler import BaseScheduler
from .database_manager import DatabaseManager
def _get_infos(fname: str, *, only_last: bool = True) -> list[dict[str, Any]]:
status_lines: list[dict[str, Any]] = []
with open(fname, encoding="utf-8") as f: # noqa: PTH123
lines = f.readlines()
for line in reversed(lines):
with suppress(Exception):
info = json.loads(line)
if info["event"] == "current status":
status_lines.append(info)
if only_last:
return status_lines
return status_lines
[docs]
def parse_log_files(
database_manager: DatabaseManager,
scheduler: BaseScheduler,
*,
only_last: bool = True,
) -> pd.DataFrame:
"""Parse the log-files and convert it to a `~pandas.core.frame.DataFrame`.
Parameters
----------
job_names
List of job names.
database_manager
A `DatabaseManager` instance.
scheduler
A scheduler instance from `adaptive_scheduler.scheduler`.
only_last
Only look use the last printed status message.
Returns
-------
`~pandas.core.frame.DataFrame`
"""
_queue = scheduler.queue()
database_manager.update(_queue)
infos = []
for entry in database_manager.as_dicts():
log_fname = entry["log_fname"]
if log_fname is None or not os.path.exists(log_fname): # noqa: PTH110
continue
for info_dict in _get_infos(log_fname, only_last=only_last):
info_dict.pop("event") # this is always "current status"
info_dict["timestamp"] = datetime.datetime.strptime( # noqa: DTZ007
info_dict["timestamp"],
"%Y-%m-%d %H:%M.%S",
)
info_dict["elapsed_time"] = pd.to_timedelta(info_dict["elapsed_time"])
info_dict.update(entry)
if "npoints" in info_dict: # ensure it is an int, xref: #223
info_dict["npoints"] = int(info_dict["npoints"])
infos.append(info_dict)
for info_dict in infos:
info_from_queue = _queue.get(info_dict["job_id"])
if info_from_queue is None:
continue
info_dict["state"] = info_from_queue["state"]
info_dict["job_name"] = info_from_queue["job_name"]
return pd.DataFrame(infos)