Source code for beat.monodomain_solver
import logging
from dataclasses import dataclass, field
from typing import Protocol
import numpy as np
from .monodomain_model import MonodomainModel
from .telemetry import BaseMonitor, NullMonitor
logger = logging.getLogger(__name__)
EPS = 1e-12
[docs]
class ODESolver(Protocol):
def to_dolfin(self) -> None: ...
def from_dolfin(self) -> None: ...
def ode_to_pde(self) -> None: ...
def pde_to_ode(self) -> None: ...
def step(self, t0: float, dt: float) -> None: ...
[docs]
@dataclass
class MonodomainSplittingSolver:
pde: MonodomainModel
ode: ODESolver
theta: float = 1.0
monitor: BaseMonitor = field(default_factory=NullMonitor)
def __post_init__(self) -> None:
# assert np.isclose(self.theta, 1.0), "Only first order splitting is implemented"
self.ode.to_dolfin() # numpy array (ODE solver) -> dolfin function
self.ode.ode_to_pde() # dolfin function in ODE space (quad?) -> CG1 dolfin function
self.pde.assign_previous()
def solve(self, interval, dt):
T0, T = interval
if dt is None:
dt = T - T0
t0 = T0
t1 = T0 + dt
while t1 < T + EPS:
logger.debug(f"Solving on t = ({t0:.2f}, {t0:.2f})")
self.step((t0, t1))
t0 = t1
t1 = t0 + dt
def step(self, interval):
# Extract some parameters for readability
theta = self.theta
# Extract time domain
t0, t1 = interval
logger.debug(f"Stepping from {t0} to {t1} using theta = {theta}")
dt = t1 - t0
t = t0 + theta * dt
logger.debug(f"Tentative ODE step with t0={t0:.5f} dt={theta * dt:.5f}")
with self.monitor.track_time("total_step"):
with self.monitor.track_time("ode_step"):
self.ode.step(t0=t0, dt=theta * dt)
with self.monitor.track_time("ode_to_dolfin"):
# numpy array (ODE solver) -> dolfin function
self.ode.to_dolfin()
with self.monitor.track_time("ode_to_pde"):
# dolfin function in ODE space (quad?) -> CG1 dolfin function
self.ode.ode_to_pde()
with self.monitor.track_time("pde_assign_previous_before"):
self.pde.assign_previous()
logger.debug("PDE step")
with self.monitor.track_time("pde_step"):
self.pde.step((t0, t1))
with self.monitor.track_time("pde_to_ode"):
# CG1 dolfin function -> dolfin function in ODE space (quad?)
self.ode.pde_to_ode()
with self.monitor.track_time("ode_from_dolfin"):
self.ode.from_dolfin()
# If first order splitting, we are done. Otherwise, we need to do a corrective ODE step.
if np.isclose(theta, 1.0):
with self.monitor.track_time("pde_assign_previous_after"):
# But first update previous value in PDE
self.pde.assign_previous()
else:
logger.debug(f"Corrective ODE step with t0={t:5f} and dt={(1.0 - theta) * dt:.5f}")
with self.monitor.track_time("corrective_ode_step"):
self.ode.step(t, (1.0 - theta) * dt)
with self.monitor.track_time("corrective_ode_to_dolfin"):
# numpy array (ODE solver) -> dolfin function
self.ode.to_dolfin()
with self.monitor.track_time("corrective_ode_to_pde"):
# dolfin function in ODE space (quad?) -> CG1 dolfin function
self.ode.ode_to_pde()
with self.monitor.track_time("corrective_pde_assign_previous"):
self.pde.assign_previous()
# Alert the monitor that the step is finished so it can handle logging/aggregation
self.monitor.advance_step(t0, t1)