import logging
import typing
from dataclasses import dataclass, field
from enum import Enum
import basix
import dolfinx
import dolfinx.fem.petsc
import dolfinx.nls.petsc
import numpy as np
import scifem
import ufl
from .boundary_conditions import BoundaryConditions
from .cardiac_model import CardiacModel
from .geometry import HeartGeometry
from .units import Variable, mesh_factor
T = typing.TypeVar("T", dolfinx.fem.Function, np.ndarray)
logger = logging.getLogger(__name__)
[docs]
def interpolate(x0: T, x1: T, alpha: float):
r"""Interpolate between :math:`x_0` and :math:`x_1`
to find `math:`x_{1-\alpha}`
Parameters
----------
x0 : T
First point
x1 : T
Second point
alpha : float
Amount of interpolate
Returns
-------
T
`math:`x_{1-\alpha}`
"""
return alpha * x0 + (1 - alpha) * x1
[docs]
class Geometry(typing.Protocol):
"""Protocol for geometry objects used in mechanics problems."""
dx: ufl.Measure
ds: ufl.Measure
mesh: dolfinx.mesh.Mesh
facet_tags: dolfinx.mesh.MeshTags
markers: dict[str, tuple[int, int]]
@property
def facet_normal(self) -> ufl.FacetNormal: ...
def surface_area(self, marker: str) -> float: ...
[docs]
class Cavity(typing.NamedTuple):
marker: str
volume: dolfinx.fem.Constant
[docs]
class BaseBC(str, Enum):
"""Base boundary condition"""
fixed = "fixed"
free = "free"
[docs]
@dataclass
class StaticProblem:
model: CardiacModel
geometry: Geometry
parameters: dict[str, typing.Any] = field(default_factory=dict)
bcs: BoundaryConditions = field(default_factory=BoundaryConditions)
cavities: list[Cavity] = field(default_factory=list)
def __post_init__(self):
parameters = type(self).default_parameters()
parameters.update(self.parameters)
self.parameters = parameters
self._init_spaces()
self._init_forms()
logger.debug("Initialized StaticProblem with parameters:")
for key, value in self.parameters.items():
logger.debug(f" {key}: {value}")
logger.debug(f"Number of cavities: {len(self.cavities)}")
logger.debug(f"Boundary conditions: {self.bcs}")
def _init_spaces(self):
"""Initialize function spaces"""
logger.debug("Initializing function spaces...")
self._init_u_space()
self._init_p_space()
self._init_cavity_pressure_spaces()
self._init_rigid_body()
def _init_p_space(self):
logger.debug("Initializing pressure function space...")
if self.is_incompressible:
logger.debug(
"Model is incompressible, initializing pressure space with Lagrange multiplier",
)
# Need lagrange multiplier for incompressible model
p_family, p_degree = self.parameters["p_space"].split("_")
p_element = basix.ufl.element(
family=p_family,
cell=self.geometry.mesh.basix_cell(),
degree=int(p_degree),
)
self.p_space = dolfinx.fem.functionspace(self.geometry.mesh, p_element)
self.p = dolfinx.fem.Function(self.p_space)
self.p_old = dolfinx.fem.Function(self.p_space)
self.p_test = ufl.TestFunction(self.p_space)
self.dp = ufl.TrialFunction(self.p_space)
else:
logger.debug("Model is compressible, no pressure space needed")
self.p_space = None
self.p_old = None
self.p = None
self.p_test = None
self.dp = None
self.model.compressibility.register(self.p)
def _init_u_space(self):
logger.debug("Initializing displacement function space...")
u_family, u_degree = self.parameters["u_space"].split("_")
logger.debug(f"Displacement space: family={u_family}, degree={u_degree}")
u_element = basix.ufl.element(
family=u_family,
cell=self.geometry.mesh.basix_cell(),
degree=int(u_degree),
shape=(self.geometry.mesh.topology.dim,),
)
self.u_space = dolfinx.fem.functionspace(self.geometry.mesh, u_element)
self.u = dolfinx.fem.Function(self.u_space)
self.u_old = dolfinx.fem.Function(self.u_space)
self.u_test = ufl.TestFunction(self.u_space)
self.du = ufl.TrialFunction(self.u_space)
@property
def is_incompressible(self):
return not self.model.compressibility.is_compressible()
@staticmethod
def default_parameters():
return {
"u_space": "P_2",
"p_space": "P_1",
"base_bc": BaseBC.free,
"rigid_body_constraint": False,
"mesh_unit": "m",
"base_marker": "BASE",
"petsc_options": {
"ksp_type": "preonly",
"pc_type": "lu",
"pc_factor_mat_solver_type": "mumps",
},
}
@property
def num_cavity_pressure_states(self):
return len(self.cavities)
def _init_cavity_pressure_spaces(self):
logger.debug("Initializing cavity pressure function spaces...")
self.cavity_pressures = []
self.cavity_pressures_test = []
self.cavity_pressures_trial = []
self.cavity_pressures_old = []
if self.num_cavity_pressure_states > 0:
logger.debug(f"Number of cavity pressure states: {self.num_cavity_pressure_states}")
self.real_space = scifem.create_real_functionspace(self.geometry.mesh)
for _ in range(self.num_cavity_pressure_states):
cavity_pressure = dolfinx.fem.Function(self.real_space)
cavity_pressure_old = dolfinx.fem.Function(self.real_space)
cavity_pressure_test = ufl.TestFunction(self.real_space)
cavity_pressure_trial = ufl.TrialFunction(self.real_space)
self.cavity_pressures.append(cavity_pressure)
self.cavity_pressures_old.append(cavity_pressure_old)
self.cavity_pressures_test.append(cavity_pressure_test)
self.cavity_pressures_trial.append(cavity_pressure_trial)
else:
logger.debug("No cavity pressure states needed")
def _init_rigid_body(self):
logger.debug("Initializing rigid body function space...")
if self.parameters["rigid_body_constraint"]:
logger.debug("Rigid body constraint enabled")
self.rigid_space = scifem.create_real_functionspace(
self.geometry.mesh,
value_shape=(6,),
)
self.r = dolfinx.fem.Function(self.rigid_space)
self.r_old = dolfinx.fem.Function(self.rigid_space)
self.dr = ufl.TrialFunction(self.rigid_space)
self.q = ufl.TestFunction(self.rigid_space)
else:
logger.debug("No rigid body constraint needed")
self.rigid_space = None
self.r = None
self.r_old = None
self.dr = None
self.q = None
def _create_residual_form(self, form: dolfinx.fem.Form) -> list[dolfinx.fem.Form]:
return [
ufl.derivative(form, f, f_test) for f, f_test in zip(self.states, self.test_functions)
]
def _empty_form(self):
return [ufl.as_ufl(0.0) for _ in range(self.num_states)]
def _rigid_body_form(self, u: dolfinx.fem.Function) -> list[dolfinx.fem.Form]:
if not self.parameters["rigid_body_constraint"]:
return self._empty_form()
logger.debug("Creating rigid body constraint form...")
X = ufl.SpatialCoordinate(self.geometry.mesh)
RM = [
ufl.as_vector((1, 0, 0)),
ufl.as_vector((0, 1, 0)),
ufl.as_vector((0, 0, 1)),
ufl.cross(X, ufl.as_vector((1, 0, 0))),
ufl.cross(X, ufl.as_vector((0, 1, 0))),
ufl.cross(X, ufl.as_vector((0, 0, 1))),
]
form = sum(ufl.inner(u, zi) * self.r[i] * ufl.dx for i, zi in enumerate(RM))
forms = self._create_residual_form(form)
return forms
def _material_form(self, u: dolfinx.fem.Function, p: dolfinx.fem.Function):
logger.debug("Creating material form...")
F = ufl.grad(u) + ufl.Identity(3)
J = ufl.det(F)
var_C = ufl.grad(self.u_test).T * F + F.T * ufl.grad(self.u_test)
C = ufl.variable(F.T * F)
forms = self._empty_form()
forms[0] += ufl.inner(self.model.S(C), 0.5 * var_C) * self.geometry.dx
if self.is_incompressible:
forms[-1] += (J - 1.0) * self.p_test * self.geometry.dx
return forms
def _robin_form(
self,
u: dolfinx.fem.Function,
v: dolfinx.fem.Function | None = None,
) -> list[dolfinx.fem.Form]:
forms = self._empty_form()
if not self.bcs.robin:
return forms
logger.debug("Creating Robin boundary condition form...")
form = ufl.as_ufl(0.0)
N = self.geometry.facet_normal
F = ufl.grad(u) + ufl.Identity(3)
J = ufl.det(F)
# Pull back normal vector to the reference configuration
cof = J * ufl.inv(F).T
cofnorm = ufl.sqrt(ufl.dot(cof * N, cof * N))
NN = 1 / cofnorm * cof * N
for robin in self.bcs.robin:
if robin.damping:
# Should be applied to the velocity
continue
k = robin.value.to_base_units() * mesh_factor(str(self.parameters["mesh_unit"]))
if robin.perpendicular:
nn = ufl.Identity(u.ufl_shape[0]) - ufl.outer(NN, NN)
else:
nn = ufl.outer(NN, NN)
value = -nn * k * u
form += -ufl.dot(value, self.u_test) * cofnorm * self.geometry.ds(robin.marker)
forms[0] += form
return forms
def _neumann_form(self, u: dolfinx.fem.Function) -> list[dolfinx.fem.Form]:
forms = self._empty_form()
if not self.bcs.neumann:
return forms
logger.debug("Creating Neumann boundary condition form...")
F = ufl.grad(u) + ufl.Identity(3)
N = self.geometry.facet_normal
ds = self.geometry.ds
form = ufl.as_ufl(0.0)
for neumann in self.bcs.neumann:
t = neumann.traction.to_base_units()
n = t * ufl.det(F) * ufl.inv(F).T * N
form += ufl.inner(self.u_test, n) * ds(neumann.marker)
forms[0] += form
return forms
def _body_force_form(self, u: dolfinx.fem.Function) -> list[dolfinx.fem.Form]:
forms = self._empty_form()
if not self.bcs.body_force:
return forms
logger.debug("Creating body force form...")
form = ufl.as_ufl(0.0)
for body_force in self.bcs.body_force:
form += -ufl.derivative(ufl.inner(body_force, u) * self.geometry.dx, u, self.u_test)
forms[0] += form
return forms
def _cavity_pressure_form(
self,
u: dolfinx.fem.Function,
cavity_pressures: list[dolfinx.fem.Function] | None = None,
):
logger.debug("Creating cavity pressure form...")
if self.num_cavity_pressure_states == 0:
return self._empty_form()
if not isinstance(self.geometry, HeartGeometry):
raise RuntimeError("Cavity pressures are only supported for HeartGeometry")
V_u = self.geometry.volume_form(u)
form = ufl.as_ufl(0.0)
assert cavity_pressures is not None
assert len(self.cavities) == self.num_cavity_pressure_states
for i, (marker, cavity_volume) in enumerate(self.cavities):
area = self.geometry.surface_area(marker)
pendo = cavity_pressures[i]
marker_id = self.geometry.markers[marker][0]
form += pendo * (cavity_volume / area - V_u) * self.geometry.ds(marker_id)
return self._create_residual_form(form)
@property
def base_dirichlet(self):
bcs = []
# First add boundary conditions for the base
if self.parameters["base_bc"] == BaseBC.fixed:
marker = self.geometry.markers[self.parameters["base_marker"]][0]
base_facets = self.geometry.facet_tags.find(marker)
dofs_base = dolfinx.fem.locate_dofs_topological(self.u_space, 2, base_facets)
u_bc_base = dolfinx.fem.Function(self.u_space)
u_bc_base.x.array[:] = 0
bcs.append(dolfinx.fem.dirichletbc(u_bc_base, dofs_base))
# Now check if we have any for bcs
for dirichlet_bc in self.bcs.dirichlet:
if callable(dirichlet_bc):
bcs += dirichlet_bc(self.u_space)
return bcs
@property
def num_states(self):
return (
1
+ self.num_cavity_pressure_states
+ int(self.parameters["rigid_body_constraint"])
+ int(self.is_incompressible)
)
@property
def R(self):
# Order is always (u, cavity pressures, rigid body, p)
R = self._empty_form()
R_material = self._material_form(self.u, p=self.p)
R_cavity = self._cavity_pressure_form(self.u, self.cavity_pressures)
R_robin = self._robin_form(self.u)
R_neumann = self._neumann_form(self.u)
R_rigid = self._rigid_body_form(self.u)
R_body_force = self._body_force_form(self.u)
for i in range(self.num_states):
R[i] += R_material[i]
R[i] += R_cavity[i]
R[i] += R_robin[i]
R[i] += R_neumann[i]
R[i] += R_rigid[i]
R[i] += R_body_force[i]
return R
@property
def states(self):
u = [self.u]
if self.num_cavity_pressure_states > 0:
u += self.cavity_pressures
if self.parameters["rigid_body_constraint"]:
u.append(self.r)
if self.is_incompressible:
u.append(self.p)
return u
@property
def old_states(self):
u = [self.u_old]
if self.num_cavity_pressure_states > 0:
u += self.cavity_pressures_old
if self.parameters["rigid_body_constraint"]:
u.append(self.r_old)
if self.is_incompressible:
u.append(self.p_old)
return u
[docs]
def update_old_states(self):
"""Update old states to current values"""
logger.debug("Updating old states to current values...")
logger.debug("Updating old displacement state to current value...")
self.u_old.x.array[:] = self.u.x.array.copy()
for i in range(self.num_cavity_pressure_states):
logger.debug(f"Updating old cavity pressure state {i} to current value...")
self.cavity_pressures_old[i].x.array[:] = self.cavity_pressures[i].x.array.copy()
if self.parameters["rigid_body_constraint"]:
logger.debug("Updating old rigid body constraint state to current value...")
self.r_old.x.array[:] = self.r.x.array.copy()
if self.is_incompressible:
logger.debug("Updating old pressure state to current value...")
self.p_old.x.array[:] = self.p.x.array.copy()
[docs]
def reset_states(self):
"""Reset states to old values"""
logger.debug("Resetting states to old values...")
logger.debug("Resetting displacement state to old value...")
self.u.x.array[:] = self.u_old.x.array.copy()
for i in range(self.num_cavity_pressure_states):
logger.debug(f"Resetting cavity pressure state {i} to old value...")
self.cavity_pressures[i].x.array[:] = self.cavity_pressures_old[i].x.array.copy()
if self.parameters["rigid_body_constraint"]:
logger.debug("Resetting rigid body constraint state to old value...")
self.r.x.array[:] = self.r_old.x.array.copy()
if self.is_incompressible:
logger.debug("Resetting pressure state to old value...")
self.p.x.array[:] = self.p_old.x.array.copy()
@property
def test_functions(self):
u = [self.u_test]
if self.num_cavity_pressure_states > 0:
u += self.cavity_pressures_test
if self.parameters["rigid_body_constraint"]:
u.append(self.q)
if self.is_incompressible:
u.append(self.p_test)
return u
@property
def trial_functions(self):
u = [self.du]
if self.num_cavity_pressure_states > 0:
u += self.cavity_pressures_trial
if self.parameters["rigid_body_constraint"]:
u.append(self.dr)
if self.is_incompressible:
u.append(self.dp)
return u
def K(self, R):
K = []
for i in range(self.num_states):
K_row = [
ufl.derivative(R[i], f, df) for f, df in zip(self.states, self.trial_functions)
]
K.append(K_row)
return K
def _init_forms(self) -> None:
"""Initialize ufl forms"""
logger.debug("Initializing ufl forms...")
# Markers
if self.geometry.markers is None:
raise RuntimeError("Missing markers in geometry")
R = self.R
u = self.states
K = self.K(R)
assert len(R) == self.num_states
assert len(K) == self.num_states
assert all(len(Ki) == self.num_states for Ki in K)
bcs = self.base_dirichlet
logger.debug("Creating Newton solver...")
self._solver = scifem.NewtonSolver(
R,
K,
u,
bcs=bcs,
max_iterations=25,
petsc_options=self.parameters["petsc_options"],
)
def update_fields(self):
pass
def reset(self):
logger.debug("Resetting problem...")
self.reset_states()
self._init_forms()
def solve(self) -> bool:
logger.debug("Solving the system...")
self.update_old_states()
ret = self._solver.solve(rtol=1e-10, atol=1e-6)
self.update_fields()
return ret
[docs]
class DynamicProblem(StaticProblem):
def __post_init__(self):
super().__post_init__()
# Just make sure we have units on rho and dt
for key, default_unit in [("rho", "kg/m^3"), ("dt", "s")]:
if not isinstance(self.parameters[key], Variable):
self.parameters[key] = Variable(self.parameters[key], default_unit)
def _init_u_space(self):
super()._init_u_space()
self.u_old = dolfinx.fem.Function(self.u_space)
self.v_old = dolfinx.fem.Function(self.u_space)
self.a_old = dolfinx.fem.Function(self.u_space)
def _init_p_space(self):
super()._init_p_space()
if self.is_incompressible:
self.p_old = dolfinx.fem.Function(self.p_space)
else:
self.p_old = None
def _init_cavity_pressure_spaces(self):
super()._init_cavity_pressure_spaces()
self.cavity_pressures_old = []
for _ in range(self.num_cavity_pressure_states):
cavity_pressure_old = dolfinx.fem.Function(self.real_space)
self.cavity_pressures_old.append(cavity_pressure_old)
def _material_form(self, u, v, p):
F = ufl.grad(u) + ufl.Identity(3)
C = ufl.variable(F.T * F)
J = ufl.det(F)
F_dot = ufl.grad(v)
C_dot = ufl.variable(F_dot.T * F + F.T * F_dot)
var_C = ufl.grad(self.u_test).T * F + F.T * ufl.grad(self.u_test)
forms = self._empty_form()
forms[0] += ufl.inner(self.model.S(C, C_dot=C_dot), 0.5 * var_C) * self.geometry.dx
if self.is_incompressible:
forms[-1] += (J - 1.0) * self.p_test * self.geometry.dx
return forms
def _acceleration_form(self, a: dolfinx.fem.Function):
rho = self.parameters["rho"].to_base_units()
forms = self._empty_form()
forms[0] += ufl.inner(rho * a, self.u_test) * self.geometry.dx
return forms
def _robin_form(self, u: dolfinx.fem.Function, v: dolfinx.fem.Function | None = None):
forms = super()._robin_form(u)
N = self.geometry.facet_normal
for robin in self.bcs.robin:
if not robin.damping:
# Should be applied to the velocity
continue
assert v is not None
k = robin.value.to_base_units() * mesh_factor(str(self.parameters["mesh_unit"]))
value = ufl.inner(k * v, N)
forms[0] += ufl.inner(value * self.u_test, N) * self.geometry.ds(robin.marker)
return forms
@property
def R(self):
# Order is always (u, cavity pressures, rigid body, p)
alpha_m = self.parameters["alpha_m"]
alpha_f = self.parameters["alpha_f"]
a_new = self.a(u=self.u, u_old=self.u_old, v_old=self.v_old, a_old=self.a_old)
v_new = self.v(a=a_new, v_old=self.v_old, a_old=self.a_old)
u = interpolate(self.u_old, self.u, alpha_f)
v = interpolate(self.v_old, v_new, alpha_f)
a = interpolate(self.a_old, a_new, alpha_m)
R = self._empty_form()
R_material = self._material_form(u=u, v=v, p=self.p)
R_cavity = self._cavity_pressure_form(u, self.cavity_pressures)
R_robin = self._robin_form(u=u, v=v)
R_neumann = self._neumann_form(u)
R_rigid = self._rigid_body_form(u)
R_acceleration = self._acceleration_form(a)
for i in range(self.num_states):
R[i] += R_material[i]
R[i] += R_cavity[i]
R[i] += R_robin[i]
R[i] += R_neumann[i]
R[i] += R_rigid[i]
R[i] += R_acceleration[i]
return R
@staticmethod
def default_parameters():
parameters = StaticProblem.default_parameters()
parameters.update(
{
"dt": Variable(1e-3, "s"),
"rho": Variable(1e3, "kg/m^3"),
"alpha_m": 0.2,
"alpha_f": 0.4,
},
)
return parameters
[docs]
def v(
self,
a: T,
v_old: T,
a_old: T,
) -> T:
r"""
Velocity computed using the generalized
:math:`alpha`-method
.. math::
v_{i+1} = v_i + (1-\gamma) \Delta t a_i + \gamma \Delta t a_{i+1}
Parameters
----------
a : T
Current acceleration
v_old : T
Previous velocity
a_old: T
Previous acceleration
Returns
-------
T
The current velocity
"""
dt = self.parameters["dt"].to_base_units()
return v_old + (1 - self._gamma) * dt * a_old + self._gamma * dt * a
[docs]
def a(
self,
u: T,
u_old: T,
v_old: T,
a_old: T,
) -> T:
r"""
Acceleration computed using the generalized
:math:`alpha`-method
.. math::
a_{i+1} = \frac{u_{i+1} - (u_i + \Delta t v_i +
(0.5 - \beta) \Delta t^2 a_i)}{\beta \Delta t^2}
Parameters
----------
u : T
Current displacement
u_old : T
Previous displacement
v_old : T
Previous velocity
a_old: T
Previous acceleration
Returns
-------
T
The current acceleration
"""
dt = self.parameters["dt"].to_base_units()
dt2 = dt**2
beta = self._beta
return (u - (u_old + dt * v_old + (0.5 - beta) * dt2 * a_old)) / (beta * dt2)
[docs]
def update_fields(self) -> None:
"""Update old values of displacement, velocity
and acceleration
"""
u = self.u.x.array.copy()
u_old = self.u_old.x.array.copy()
v_old = self.v_old.x.array.copy()
a_old = self.a_old.x.array.copy()
a = self.a(
u=u,
u_old=u_old,
v_old=v_old,
a_old=a_old,
)
v = self.v(a=a, v_old=v_old, a_old=a_old)
self.a_old.x.array[:] = a
self.v_old.x.array[:] = v
self.u_old.x.array[:] = u
# self.update_base_values()
for i in range(self.num_cavity_pressure_states):
self.cavity_pressures_old[i].x.array[:] = self.cavity_pressures[i].x.array.copy()
@property
def _gamma(self) -> float:
"""Parameter in the generalized alpha-method"""
alpha_m = self.parameters["alpha_m"]
alpha_f = self.parameters["alpha_f"]
assert isinstance(alpha_m, float)
assert isinstance(alpha_f, float)
return 0.5 + alpha_f - alpha_m
@property
def _beta(self) -> float:
"""Parameter in the generalized alpha-method"""
return (self._gamma + 0.5) ** 2 / 4.0