Source code for beat.telemetry
import abc
import json
import logging
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, Union
from mpi4py import MPI
from petsc4py import PETSc
logger = logging.getLogger(__name__)
[docs]
class BaseMonitor(abc.ABC):
    """Abstract interface shared by all telemetry monitors.

    Subclasses must provide ``track_time``, ``record_ksp`` and
    ``advance_step``; the class cannot be instantiated until all three
    are overridden.
    """

    @abc.abstractmethod
    @contextmanager
    def track_time(self, name: str):
        """Context manager wrapping a region of code labelled ``name``."""
        yield

    @abc.abstractmethod
    def record_ksp(self, ksp: PETSc.KSP) -> None:
        """Hook that receives a PETSc KSP object."""

    @abc.abstractmethod
    def advance_step(self, t0: float, t1: float) -> None:
        """Hook that receives the two time values ``t0`` and ``t1``.

        NOTE(review): presumably the start and end time of a step --
        confirm against callers.
        """
[docs]
class NullMonitor(BaseMonitor):
    """Monitor whose hooks all do nothing.

    Satisfies the ``BaseMonitor`` interface without recording or logging
    anything.
    """

    @contextmanager
    def track_time(self, name: str):
        """Yield once; nothing is recorded for ``name``."""
        yield

    def record_ksp(self, ksp: PETSc.KSP) -> None:
        """Ignore ``ksp``."""
        return None

    def advance_step(self, t0: float, t1: float) -> None:
        """Ignore ``t0`` and ``t1``."""
        return None
[docs]
class PerformanceMonitor(BaseMonitor):
    """Monitor that accumulates timings and KSP statistics.

    Timings are summed per label across the lifetime of the monitor, KSP
    statistics are updated by ``record_ksp``, and ``advance_step`` logs
    the current state every ``log_frequency`` calls.  A final summary can
    be logged with ``display_summary`` or written as JSON with
    ``save_summary``; both act only on rank 0 of ``comm``.
    """

    def __init__(self, log_frequency: int = 1, comm: MPI.Intracomm = MPI.COMM_WORLD):
        """Create a monitor with empty counters.

        Args:
            log_frequency: ``advance_step`` logs on every call whose step
                counter is a multiple of this value.  Values ``<= 0``
                disable the periodic log line.
            comm: Communicator whose rank 0 emits/saves the summary.
        """
        self.log_frequency = log_frequency
        self.comm = comm
        self.step_counter = 0
        # Label -> accumulated wall-clock seconds (time.perf_counter deltas).
        self.timings: Dict[str, float] = {}
        self.ksp_total_iterations = 0
        self.ksp_max_iterations = 0
        self.ksp_last_iterations = 0
        self.ksp_last_residual_norm = 0.0
        self.ksp_last_converged_reason = 0

    @contextmanager
    def track_time(self, name: str):
        """Time the enclosed block and add the elapsed seconds to ``timings[name]``.

        The elapsed time is recorded even if the block raises.
        """
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed = time.perf_counter() - start
            self.timings[name] = self.timings.get(name, 0.0) + elapsed

    def record_ksp(self, ksp: PETSc.KSP) -> None:
        """Record iteration count, residual norm and converged reason of ``ksp``.

        This is best-effort: if any of the PETSc getters raises
        ``PETSc.Error`` the failure is logged at debug level and *none* of
        the counters are modified, so the ``ksp_last_*`` fields always
        describe one and the same solve.
        """
        # Read everything first so a failure half-way through cannot leave
        # the counters partially updated.
        try:
            iterations = int(ksp.getIterationNumber())
            residual_norm = float(ksp.getResidualNorm())
            converged_reason = int(ksp.getConvergedReason())
        except PETSc.Error:
            logger.debug(
                "Could not read KSP statistics; keeping previous values",
                exc_info=True,
            )
            return

        self.ksp_last_iterations = iterations
        self.ksp_total_iterations += iterations
        self.ksp_max_iterations = max(self.ksp_max_iterations, iterations)
        self.ksp_last_residual_norm = residual_norm
        self.ksp_last_converged_reason = converged_reason

    def advance_step(self, t0: float, t1: float) -> None:
        """Increment the step counter and periodically log the current state.

        A log line is emitted (on every rank; there is no rank check here)
        when ``log_frequency`` is positive and the step counter is a
        multiple of it.  The timings in the line are the totals
        accumulated so far, not per-step values.

        Args:
            t0: First time value shown in the log line.
            t1: Second time value shown in the log line.
        """
        self.step_counter += 1
        if self.log_frequency <= 0 or self.step_counter % self.log_frequency != 0:
            return
        # Skip building the message entirely when INFO records are dropped.
        if not logger.isEnabledFor(logging.INFO):
            return

        timing_text = ", ".join(f"{name}={value:.6f}s" for name, value in self.timings.items())
        logger.info(
            f"PDE step timing step={self.step_counter}, "
            f"t=({t0:.5f}, {t1:.5f}), "
            f"ksp_iterations={self.ksp_last_iterations}, "
            f"ksp_residual_norm={self.ksp_last_residual_norm:.6e}, "
            f"ksp_converged_reason={self.ksp_last_converged_reason}, "
            f"{timing_text}",
        )

    def display_summary(self) -> None:
        """Logs a nicely formatted summary of the accumulated timings and metrics.

        Only rank 0 of ``comm`` logs; other ranks return immediately.
        """
        if self.comm.rank != 0:
            return

        summary = ["\n" + "=" * 50]
        summary.append(f"{'PERFORMANCE SUMMARY':^50}")
        summary.append("=" * 50)
        summary.append(f"Total Steps: {self.step_counter}")
        summary.append(f"KSP Total Iterations: {self.ksp_total_iterations}")
        summary.append(f"KSP Max Iterations: {self.ksp_max_iterations}")
        summary.append("-" * 50)
        summary.append(f"{'Metric':<35} | {'Time (s)':>10}")
        summary.append("-" * 50)

        # Sort timings by duration (descending)
        sorted_timings = sorted(self.timings.items(), key=lambda x: x[1], reverse=True)
        for name, duration in sorted_timings:
            summary.append(f"{name:<35} | {duration:>10.4f}")

        summary.append("=" * 50 + "\n")
        logger.info("\n".join(summary))

    def save_summary(self, filepath: Union[str, Path]) -> None:
        """Saves the performance metrics to a JSON file.

        Only rank 0 of ``comm`` writes; other ranks return immediately.
        Missing parent directories are created.

        Args:
            filepath: Destination of the JSON file.
        """
        if self.comm.rank != 0:
            return

        data = {
            "total_steps": self.step_counter,
            "ksp": {
                "total_iterations": self.ksp_total_iterations,
                "max_iterations": self.ksp_max_iterations,
            },
            "timings": self.timings,
        }
        path = Path(filepath)
        path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so the output does not depend on the locale.
        with path.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=4)
        logger.info(f"Performance summary saved to {path}")