Stimulation with Purkinje system#
In this demo we show how to simulate endocardial stimulation using a Purkinje system for activation.
We use the package fractal_tree to generate the Purkinje system. First let import the neccessary packages
from collections import defaultdict
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
import gmsh # noqa: F401
import dolfin
import pyvista
import meshio
import cardiac_geometries
import beat
import beat.viz
import gotranx
from fractal_tree import generate_fractal_tree, FractalTreeParameters, Mesh
We create a function that generates the BiV geometry that we want to use for this demo.
def get_data(datadir="data_endocardial_stimulation"):
datadir = Path(datadir)
msh_file = datadir / "biv_ellipsoid.msh"
if not msh_file.is_file():
cardiac_geometries.create_biv_ellipsoid(
datadir,
char_length=0.15, # Reduce this value to get a finer mesh
center_lv_y=0.2,
center_lv_z=0.0,
a_endo_lv=5.0,
b_endo_lv=2.2,
c_endo_lv=2.2,
a_epi_lv=6.0,
b_epi_lv=3.0,
c_epi_lv=3.0,
center_rv_y=1.0,
center_rv_z=0.0,
a_endo_rv=6.0,
b_endo_rv=2.5,
c_endo_rv=2.7,
a_epi_rv=8.0,
b_epi_rv=5.5,
c_epi_rv=4.0,
create_fibers=True,
)
return cardiac_geometries.geometry.Geometry.from_folder(datadir)
We have a function that can load the solution from a file.
def load_from_file(heart_mesh, xdmffile, key="v", stop_index=None):
V = dolfin.FunctionSpace(heart_mesh, "Lagrange", 1)
v = dolfin.Function(V)
timesteps = beat.postprocess.load_timesteps_from_xdmf(xdmffile)
with dolfin.XDMFFile(Path(xdmffile).as_posix()) as f:
for i, t in timesteps.items():
f.read_checkpoint(v, key, i)
yield v.copy(deepcopy=True), t
and we have a function to compute the pseudo ECG
def compute_ecg_recovery():
datadir = Path("data_endocardial_stimulation")
xdmffile = datadir / "state.xdmf"
data = get_data(datadir=datadir)
# https://litfl.com/ecg-lead-positioning/
vs = load_from_file(geo.mesh, xdmffile, key="V")
leads = dict(
RA=(-15.0, 0.0, -10.0),
LA=(4.0, -12.0, -7.0),
RL=(0.0, 20.0, 3.0),
LL=(17.0, 11.0, 7.0),
V1=(-3.0, 4.0, -9.0),
V2=(0.0, 2.0, -8.0),
V3=(3.0, 1.0, -8.0),
V4=(6.0, 1.0, -6.0),
V5=(10.0, 2.0, 0.0),
V6=(10.0, -6.0, 2.0),
)
fname = datadir / "extracellular_potential.npy"
if 1: # not fname.is_file():
phie = defaultdict(list)
time = []
for v, t in vs:
time.append(t)
for name, point in leads.items():
phie[name].append(
beat.ecg.ecg_recovery(v=v, mesh=geo.mesh, sigma_b=1.0, point=point)
)
np.save(fname, {"phie": phie, "time": time})
phie_time = np.load(fname, allow_pickle=True).item()
phie = phie_time["phie"]
time = phie_time["time"]
fig, ax = plt.subplots(2, 5, sharex=True, figsize=(12, 8))
for i, (name, values) in enumerate(phie.items()):
axi = ax.flatten()[i]
axi.plot(time, values)
axi.set_title(name)
fig.savefig(datadir / "extracellular_potential.png")
ecg = beat.ecg.Leads12(**{k: np.array(v) for k, v in phie.items()})
fig, ax = plt.subplots(3, 4, sharex=True, figsize=(12, 8))
for i, name in enumerate(
[
"I",
"II",
"III",
"aVR",
"aVL",
"aVF",
"V1_",
"V2_",
"V3_",
"V4_",
"V5_",
"V6_",
]
):
y = getattr(ecg, name)
axi = ax.flatten()[i]
axi.plot(time, y)
axi.set_title(name)
fig.savefig(datadir / "ecg_12_leads.png")
First let use use gotranx to generate code for the ORd model.
here = Path.cwd()
model_path = here / "ORdmm_Land.py"
if not model_path.is_file():
ode = gotranx.load_ode(here / ".." / "odes" / "ORdmm_Land.ode")
code = gotranx.cli.gotran2py.get_code(
ode, scheme=[gotranx.schemes.Scheme.forward_generalized_rush_larsen]
)
model_path.write_text(code)
import ORdmm_Land
model = ORdmm_Land.__dict__
Next we generate the geometry that we will use for this demo and save it to the folder data_endocardial_stimulation
.
datadir = here / "data_endocardial_stimulation"
geo = get_data(datadir=datadir)
We can now generate the Purkinje system for the LV. We start by loading the mesh that is generated by GMSH
and extracting the endocardium of the LV. We first read the gmsh file
msh_file = datadir / "biv_ellipsoid.msh"
We also load file with meshio
msh = meshio.read(msh_file)
We can take a short look at the geometry in pyvista
V = dolfin.FunctionSpace(geo.mesh, "Lagrange", 1)
# pyvista.start_xvfb()
# plotter_markers = pyvista.Plotter()
# topology, cell_types, x = beat.viz.create_vtk_structures(V)
# grid = pyvista.UnstructuredGrid(topology, cell_types, x)
# plotter_markers.add_mesh(grid, show_edges=True)
# plotter_markers.view_zy()
# plotter_markers.camera.zoom(4)
# if not pyvista.OFF_SCREEN:
# plotter_markers.show()
# else:
# figure = plotter_markers.screenshot("biv_mesh_purkinje.png")
Next we extract the coordinates of the mesh
verts = geo.mesh.coordinates()
and then we extract the cells for the LV endocardium
tag = msh.field_data["ENDO_LV"][0]
inds = [i for i, x in enumerate(msh.cell_data["gmsh:physical"]) if x[0] == tag]
connectivity = np.vstack([msh.cells[i].data for i in inds])
To generate the purkinje tree we must first define the initial node for the purkinje system in the LV. Here we have uses paraview to an approximate coordinate for this which is located near the base on the endocardium.
init_node = [0, 2.4, 0.19]
Now that we have the approximate coordinate, we need to find the closest node in the mesh. We can do this by selecting the point with the shortest distance to this node.
index = np.linalg.norm(np.subtract(verts, init_node), axis=1).argmin()
# We create the mesh, and pass in the initial node
lv_mesh = Mesh(verts=verts, connectivity=connectivity, init_node=verts[index, :])
We also set the number of generations to 15 and we set the initial direction to point in the positive \(x\)-direction which is the direction from the base towards the apex (this can also be found by visualizing the mesh in Paraview). We also specify the initial length to be 3. This is about the length of the septal wall, so that the first branch will represent the His bundle. We set the length to 0.25 which will be the length each successive branch.
First we set a seed so that we know that the results are reproducible
np.random.seed(1)
Assume conduction velocty of 3 m/s = 3 mm/ms
conduction_velocity = 3.0
duration = 2.0
start = 1.0
dofs = V.tabulate_dof_coordinates()
param = FractalTreeParameters(
filename=datadir / "lv_tree",
init_length=7.0,
N_it=15,
length=0.5,
initial_direction=np.array([1, 0, 0]),
)
# Next we create the Purkinje networks for the LV
lv_tree = generate_fractal_tree(lv_mesh, param)
# We can now compute the distance from the root node to each node in the tree
dist_lv = np.zeros(lv_tree.nodes.shape[0])
for line in lv_tree.lines:
dist_lv[line[1]] += dist_lv[line[0]] + np.linalg.norm(
lv_tree.nodes[line[1], :] - lv_tree.nodes[line[0], :]
)
# Then we extract the end nodes of the tree and compute the distance from the root
lv_end_nodes = np.array(lv_tree.end_nodes, dtype=int)
lv_end_nodes_pos = lv_tree.nodes[lv_end_nodes]
lv_end_nodes_dist = dist_lv[lv_end_nodes]
# We can now compute the delay for each end node given the conduction velocity
delay_lv = lv_end_nodes_dist / conduction_velocity
# Finally, we create the expression for the stimulation protocol
delay_expr_lv = []
for pos, d in zip(lv_end_nodes_pos, delay_lv):
closest_dof = np.linalg.norm(dofs - pos, axis=1).argmin()
delay_expr_lv.append(
f"(time > {start + d} && time < {start + d + duration} && near(x[0], {dofs[closest_dof, 0]:.2f}, tol) && near(x[1], {dofs[closest_dof, 1]:.2f}, tol) && near(x[2], {dofs[closest_dof, 2]:.2f}, tol))"
)
# Repeat for RV
tag = msh.field_data["ENDO_RV"][0]
inds = [i for i, x in enumerate(msh.cell_data["gmsh:physical"]) if x[0] == tag]
connectivity = np.vstack([msh.cells[i].data for i in inds])
# and defining the initial node (again, we use Paraview to find this location).
init_node = [0, 3.19, 0.1]
index = np.linalg.norm(np.subtract(verts, init_node), axis=1).argmin()
rv_mesh = Mesh(verts=verts, connectivity=connectivity, init_node=verts[index, :])
# Set a new seed (note that if you don't like the generated system you can try a different seed)
0%| | 0/15 [00:00<?, ?it/s]
13%|█▎ | 2/15 [00:00<00:00, 16.16it/s]
27%|██▋ | 4/15 [00:00<00:00, 15.11it/s]
40%|████ | 6/15 [00:00<00:01, 8.79it/s]
53%|█████▎ | 8/15 [00:01<00:01, 5.28it/s]
60%|██████ | 9/15 [00:01<00:01, 4.03it/s]
67%|██████▋ | 10/15 [00:02<00:01, 2.75it/s]
73%|███████▎ | 11/15 [00:03<00:02, 1.88it/s]
80%|████████ | 12/15 [00:04<00:02, 1.39it/s]
87%|████████▋ | 13/15 [00:05<00:01, 1.23it/s]
93%|█████████▎| 14/15 [00:06<00:00, 1.14it/s]
100%|██████████| 15/15 [00:07<00:00, 1.05it/s]
100%|██████████| 15/15 [00:07<00:00, 1.90it/s]
np.random.seed(1234)
param = FractalTreeParameters(
filename=datadir / "rv_tree",
init_length=9.7,
N_it=15,
length=0.5,
initial_direction=np.array([1, 0, 0]),
)
rv_tree = generate_fractal_tree(rv_mesh, param)
0%| | 0/15 [00:00<?, ?it/s]
20%|██ | 3/15 [00:00<00:00, 14.77it/s]
33%|███▎ | 5/15 [00:00<00:01, 7.04it/s]
40%|████ | 6/15 [00:01<00:02, 4.48it/s]
47%|████▋ | 7/15 [00:01<00:02, 3.04it/s]
53%|█████▎ | 8/15 [00:02<00:03, 2.12it/s]
60%|██████ | 9/15 [00:03<00:04, 1.48it/s]
67%|██████▋ | 10/15 [00:05<00:04, 1.02it/s]
73%|███████▎ | 11/15 [00:07<00:05, 1.26s/it]
80%|████████ | 12/15 [00:09<00:04, 1.56s/it]
87%|████████▋ | 13/15 [00:12<00:03, 1.97s/it]
93%|█████████▎| 14/15 [00:15<00:02, 2.27s/it]
100%|██████████| 15/15 [00:17<00:00, 2.26s/it]
100%|██████████| 15/15 [00:17<00:00, 1.19s/it]
pyvista.start_xvfb()
lv_tree_vtu = pyvista.read((datadir / "lv_tree").with_suffix(".vtu"))
rv_tree_vtu = pyvista.read((datadir / "rv_tree").with_suffix(".vtu"))
plotter = pyvista.Plotter()
topology, cell_types, x = beat.viz.create_vtk_structures(V)
grid = pyvista.UnstructuredGrid(topology, cell_types, x)
plotter.add_mesh(grid, show_edges=True)
plotter.view_zy()
# plotter_markers.camera.zoom(4)
plotter.add_mesh(lv_tree_vtu)
plotter.add_mesh(rv_tree_vtu)
Actor (0x7f2c08c95a80)
Center: (2.9978950023651123, 3.74230033159256, 0.12935316562652588)
Pickable: True
Position: (0.0, 0.0, 0.0)
Scale: (1.0, 1.0, 1.0)
Visible: True
X Bounds 0.000E+00, 5.996E+00
Y Bounds 1.838E+00, 5.647E+00
Z Bounds -2.441E+00, 2.699E+00
User matrix: Identity
Has mapper: True
Property (0x7f2c08c95d80)
Ambient: 0.0
Ambient color: Color(name='lightblue', hex='#add8e6ff', opacity=255)
Anisotropy: 0.0
Color: Color(name='lightblue', hex='#add8e6ff', opacity=255)
Culling: "none"
Diffuse: 1.0
Diffuse color: Color(name='lightblue', hex='#add8e6ff', opacity=255)
Edge color: Color(name='black', hex='#000000ff', opacity=255)
Edge opacity: 1.0
Interpolation: 0
Lighting: True
Line width: 1.0
Metallic: 0.0
Opacity: 1.0
Point size: 5.0
Render lines as tubes: False
Render points as spheres: False
Roughness: 0.5
Show edges: False
Specular: 0.0
Specular color: Color(name='lightblue', hex='#add8e6ff', opacity=255)
Specular power: 100.0
Style: "Surface"
DataSetMapper (0x7f2c08c95900)
Scalar visibility: False
Scalar range: (0.0, 1.0)
Interpolate before mapping: True
Scalar map mode: default
Color mode: direct
Attached dataset:
UnstructuredGrid (0x7f2c08c87e20)
N Cells: 21529
N Points: 21530
X Bounds: 0.000e+00, 5.996e+00
Y Bounds: 1.838e+00, 5.647e+00
Z Bounds: -2.441e+00, 2.699e+00
N Arrays: 0
if not pyvista.OFF_SCREEN:
plotter.show()
else:
figure = plotter.screenshot("purkinje.png")
# -
We can now compute the distance from the root node to each node in the tree
dist_rv = np.zeros(rv_tree.nodes.shape[0])
for line in rv_tree.lines:
dist_rv[line[1]] += dist_rv[line[0]] + np.linalg.norm(
rv_tree.nodes[line[1], :] - rv_tree.nodes[line[0], :]
)
rv_end_nodes = np.array(rv_tree.end_nodes, dtype=int)
rv_end_nodes_pos = rv_tree.nodes[rv_end_nodes]
rv_end_nodes_dist = dist_rv[rv_end_nodes]
delay_rv = rv_end_nodes_dist / conduction_velocity
delay_expr_rv = []
for pos, d in zip(rv_end_nodes_pos, delay_rv):
closest_dof = np.linalg.norm(dofs - pos, axis=1).argmin()
delay_expr_rv.append(
f"(time > {start + d} && time < {start + d + duration} && near(x[0], {dofs[closest_dof, 0]:.2f}, tol) && near(x[1], {dofs[closest_dof, 1]:.2f}, tol) && near(x[2], {dofs[closest_dof, 2]:.2f}, tol))"
)
expr = (
f"time < {start + max(delay_lv.max(), delay_rv.max()) + duration} && ("
+ " || ".join(delay_expr_lv + delay_expr_rv)
+ ") ? stim_amp : 0.0"
)
mesh_unit = "mm"
time = dolfin.Constant(0.0)
stim_expr = dolfin.Expression(
expr,
time=time,
tol=0.05,
stim_amp=0.01,
degree=1,
)
markers = beat.utils.expand_layer_biv(
V=V,
mfun=geo.ffun,
endo_lv_marker=geo.markers["ENDO_LV"][0],
endo_rv_marker=geo.markers["ENDO_RV"][0],
epi_marker=geo.markers["EPI"][0],
endo_size=0.3,
epi_size=0.3,
)
init_states = {
0: model["init_state_values"](),
1: model["init_state_values"](),
2: model["init_state_values"](),
}
# endo = 0, epi = 1, M = 2
parameters = {
0: model["init_parameter_values"](amp=0.0, celltype=2),
1: model["init_parameter_values"](amp=0.0, celltype=0),
2: model["init_parameter_values"](amp=0.0, celltype=1),
}
fun = {
0: model["forward_generalized_rush_larsen"],
1: model["forward_generalized_rush_larsen"],
2: model["forward_generalized_rush_larsen"],
}
v_index = {
0: model["state_index"]("v"),
1: model["state_index"]("v"),
2: model["state_index"]("v"),
}
# Surface to volume ratio
chi = 140.0 * beat.units.ureg("mm**-1")
# Membrane capacitance
C_m = 0.01 * beat.units.ureg("uF/mm**2")
I_s = beat.stimulation.Stimulus(dolfin.dP(domain=geo.mesh), expr=stim_expr)
M = beat.conductivities.define_conductivity_tensor(
chi=chi, f0=geo.f0, g_il=0.17, g_it=0.019, g_el=0.62, g_et=0.24
)
params = {"preconditioner": "sor", "use_custom_preconditioner": False}
pde = beat.MonodomainModel(
time=time,
C_m=C_m.to(f"uF/{mesh_unit}**2").magnitude,
mesh=geo.mesh,
M=M,
I_s=I_s,
params=params,
)
ode = beat.odesolver.DolfinMultiODESolver(
v_ode=dolfin.Function(V),
v_pde=pde.state,
markers=markers,
num_states={i: len(s) for i, s in init_states.items()},
fun=fun,
init_states=init_states,
parameters=parameters,
v_index=v_index,
)
# +
T = 5
# Change to 500 to simulate the full cardiac cycle
t = 0.0
dt = 0.05
solver = beat.MonodomainSplittingSolver(pde=pde, ode=ode)
plotter_voltage = pyvista.Plotter()
plotter_voltage.view_zy()
viridis = plt.get_cmap("viridis")
grid.point_data["V"] = solver.pde.state.vector().get_local()
grid.set_active_scalars("V")
renderer = plotter_voltage.add_mesh(
grid,
show_edges=True,
lighting=False,
cmap=viridis,
clim=[-90.0, 40.0],
)
gif_file = Path("purkinje.gif")
gif_file.unlink(missing_ok=True)
plotter_voltage.open_gif(gif_file.as_posix())
plotter_voltage.camera.zoom("tight")
fname = (datadir / "state.xdmf").as_posix()
i = 0
while t < T + 1e-12:
if i % 20 == 0:
v = solver.pde.state.vector().get_local()
print(f"Solve for {t=:.2f}, {v.max() =}, {v.min() = }")
with dolfin.XDMFFile(dolfin.MPI.comm_world, fname) as xdmf:
xdmf.parameters["functions_share_mesh"] = True
xdmf.parameters["rewrite_function_mesh"] = False
xdmf.write_checkpoint(
solver.pde.state,
"V",
float(t),
dolfin.XDMFFile.Encoding.HDF5,
True,
)
grid.point_data["V"] = solver.pde.state.vector().get_local()
plotter_voltage.write_frame()
solver.step((t, t + dt))
i += 1
t += dt
plotter_voltage.close()
# # -
compute_ecg_recovery()
Solve for t=0.00, v.max() =-86.99999999999982, v.min() = -87.00000000000013
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solve for t=1.00, v.max() =-87.20069609464933, v.min() = -87.22531459848575
Solve for t=2.00, v.max() =-87.36449135432629, v.min() = -87.39787543121035
Solve for t=3.00, v.max() =-11.523230172621734, v.min() = -90.22536913049055
Solve for t=4.00, v.max() =61.34288137672245, v.min() = -91.43259453808955
Solve for t=5.00, v.max() =115.11440022390119, v.min() = -97.23138857358286

