Post-processing and Visualization#

This demo illustrates how to visualize the results of a cardiac simulation using pyvista, matplotlib, and adios4dolfinx.

We will:

  1. Load the mesh and simulation data (checkpoints) generated by the simulation demo.

  2. Visualization: Create 3D animations of the displacement, fiber stress, and fiber strain using PyVista.

  3. Regional Analysis: Compute average stress and strain in the 17-segment AHA model over time.

  4. Plotting: Generate time-trace plots of these regional metrics.

Requirements:

  • pyvista, matplotlib, adios4dolfinx

  • fenicsx-ldrb (for AHA segment generation/handling)

  • cardiac-geometries

  • imagemagick (optional, for creating GIFs/MP4s)


1. Imports and Setup#

from pathlib import Path
from enum import Enum
import shutil
import subprocess as sp
import json
from mpi4py import MPI
import dolfinx
import ufl
import pyvista as pv
import matplotlib.pyplot as plt
import numpy as np
import adios4dolfinx
import ldrb
import cardiac_geometries

Paths must match the output of the simulation demo Changing to ‘results_biv_complete’ to match the previous demo script

result_folder = Path("results_biv_complete")
screenshot_dir = Path("screenshots_biv_complete")
screenshot_dir.mkdir(exist_ok=True)
checkpoint_filename = result_folder / "function_checkpoint.bp"

2. Load Mesh and Timestamps#

We read the mesh and the list of available time steps from the checkpoint file.

mesh = adios4dolfinx.read_mesh(
    checkpoint_filename,
    comm=comm,
)
timestamps = adios4dolfinx.read_timestamps(
    checkpoint_filename,
    comm=comm,
    function_name="displacement",
)

Option to skip initial frames if you want to focus e.g on the last beat

start_time = 0.0
try:
    start_index = np.where(timestamps >= start_time)[0][0]
except IndexError:
    start_index = 0
timestamps = timestamps[start_index:]

Downsample frames for faster processing (plot every Nth frame)

N = 5
print(f"Found {len(timestamps)} time steps. Processing every {N}th step.")

3. Prepare Function Spaces#

  • W: Vector space for displacement (P2).

  • V: Scalar space for stress/strain (DG1).

4. ImageMagick Helpers (Animations)#

We would like to create a gif or movie from the screenshots with the two plots side by side. This can be done with external tools like ImageMagick and/or ffmpeg. We will first combine each plot into a single image side by side, and then create a mp4 from the combined images. Note that to combine images side by side, we can use ImageMagick which can be installed via package managers like brew on macOS or apt on Ubuntu. To create a mp4 from images, we can still use ImageMagick but it requires ffmpeg to be installed as well. First lets define some helper functions to do this.

class Direction(Enum):
    HORIZONTAL = "horizontal"
    VERTICAL = "vertical"
def magick_cmd():
    """Find the ImageMagick executable."""
    magick = shutil.which("magick")
    if magick is None:
        magick = shutil.which("convert")
    return magick
def combine_screenshots(screenshot_files: list[str], output_file: str, direction: Direction):
    """Combine images side-by-side or vertically."""
    cmd = magick_cmd()
    if cmd:
        append = "+append" if direction == Direction.HORIZONTAL else "-append"
        sp.run([cmd] + list(map(str, screenshot_files)) + [append, str(output_file)])
def create_gif(screenshot_files: list[str], output_file: str, delay: int = 5):
    """Create an animated GIF/MP4."""
    cmd = magick_cmd()
    if cmd:
        sp.run([cmd, "-delay", str(delay), "-loop", "0"] + list(map(str, screenshot_files)) + [str(output_file)])

5. Visualize Displacement#

We generate frames of the beating heart from two angles (Side and Top).

plotter = pv.Plotter(off_screen=True)
topology, cells, geometry = dolfinx.plot.vtk_mesh(W)
grid = pv.UnstructuredGrid(topology, cells, geometry)
values = np.zeros((geometry.shape[0], 3))
values[:, : len(u)] = u.x.array.reshape(geometry.shape[0], len(u))
grid["u"] = values
grid.set_active_vectors("u")
warped = grid.warp_by_vector("u", factor=1.0)
plotter.add_mesh_clip_plane(
    warped,
    show_edges=False,
    color="white",
    normal=(0.43382957710452097, 0.8998903772243152, 0.044600526996802344),
    origin=(-0.010140199723173711, -0.004698830488865986, -0.000526727463371799),
)
widget = plotter.plane_widgets[0]
widget.SetEnabled(not widget.GetEnabled())
outdir_u = screenshot_dir / "u"
outdir_u.mkdir(exist_ok=True, parents=True)
plotter.camera_position = [
    (-0.013579771455618183, -0.17527467868587382, -0.0160491475687889),
    (-0.013977611437439919, -0.001986214891076088, -0.002878248691558838),
    (0.9998162418488605, 0.0037246515399841644, -0.018804507601279535)
]

One thing to note here is how to find good camera positions for the plots as well as good clipping planes. This can be a bit of trial and error, but PyVista has an interactive plotting mode that can help with this. You can run plotter.show() in an interactive environment such as a Jupyter notebook or Python script to explore the geometry and find good views. Here is a short video showing how to do this:

for i, t in enumerate(timestamps[::N]):
    adios4dolfinx.read_function(checkpoint_filename, u, time=t, name="displacement")
    grid["u"][:, : len(u)] = u.x.array.reshape(geometry.shape[0], len(u))
    warped_n = grid.warp_by_vector(factor=1)
    warped.points[:, :] = warped_n.points
    plotter.add_text(f"Time: {t:.2f} s", name="timer")
    plotter.screenshot(outdir_u / f"side_u_sim_warped_{i}.png")
plotter = pv.Plotter(off_screen=True)
plotter.add_mesh(warped, color="white", show_edges=True)
plotter.camera_position = [
    (0.20028045308219225, -0.037228043080557334, -0.003401786822110152),
    (-0.013977611199999481, -6.356105320067668e-06, -0.002878247674289465),
    (0.16197623979674286, 0.9367431745355433, -0.3102836165548876)
]
for i, t in enumerate(timestamps[::N]):
    adios4dolfinx.read_function(checkpoint_filename, u, time=t, name="displacement")
    grid["u"][:, : len(u)] = u.x.array.reshape(geometry.shape[0], len(u))
    warped_n = grid.warp_by_vector(factor=1)
    warped.points[:, :] = warped_n.points
    plotter.add_text(f"Time: {t:.2f} s", name="timer")
    plotter.screenshot(outdir_u / f"top_u_sim_warped_{i}.png")

Create Animation

if magick_cmd():
    side_screenshots = [outdir_u / f"side_u_sim_warped_{i}.png" for i in range(len(timestamps[::N]))]
    top_screenshots = [outdir_u / f"top_u_sim_warped_{i}.png" for i in range(len(timestamps[::N]))]
    combined_screenshots = []
    for i, (side, top) in enumerate(zip(side_screenshots, top_screenshots)):
        combined = outdir_u / f"combined_u_sim_warped_{i}.png"
        combine_screenshots([side, top], str(combined), Direction.HORIZONTAL)
        combined_screenshots.append(str(combined))

    create_gif(combined_screenshots, str(screenshot_dir / "u_simulation_warped.mp4"), delay=5)

The resulting video should look something like this:

6. Regional Analysis (AHA Segments)#

We perform a regional analysis by computing the average fiber stress and strain in each of the 17 AHA segments.

  1. Generate Segments: We use ldrb to define the segments based on the geometry.

  2. Integrate: We define integration measures dx_aha(i) for each segment.

  3. Trace: We loop over time, compute the average, and store the history.

Need geometry markers to generate AHA (LV/RV/Epi etc)#

These should be in the geometry folder#

markers_file = result_folder / "geometry" / "markers.json"
markers = json.loads(markers_file.read_text())

Note: We need a mesh tags object ‘ffun’ describing surface markers. This was saved during the simulation setup.

ffun = adios4dolfinx.read_meshtags(checkpoint_filename, mesh, "ffun")
# Generate AHA Segments Function
aha_func = ldrb.aha.gernerate_aha_biv(
    mesh=mesh,
    ffun=ffun,
    markers=cardiac_geometries.mesh.transform_markers(markers, clipped=True),
    function_space="DG_0",
    base_max=0.75,
    mid_base=0.70,
    apex_mid=0.65,
)
# Convert to MeshTags for integration
entities = np.arange(mesh.topology.index_map(3).size_local, dtype=np.int32)
values = aha_func.x.array.astype(np.int32)
num_segments = len(np.unique(values))
aha_tags = dolfinx.mesh.meshtags(mesh, 3, entities, values)

We can also plot the segments to verify

plotter = pv.Plotter(off_screen=True)
plotter.add_mesh_clip_plane(
    grid,
    show_edges=True,
    cmap="tab20",
    n_colors=num_segments,
    normal=(0.43382957710452097, 0.8998903772243152, 0.044600526996802344),
    origin=(-0.010140199723173711, -0.004698830488865986, -0.000526727463371799),
)
widget = plotter.plane_widgets[0]
widget.SetEnabled(not widget.GetEnabled())
plotter.camera_position = [
    (-0.013579771455618183, -0.17527467868587382, -0.0160491475687889),
    (-0.013977611437439919, -0.001986214891076088, -0.002878248691558838),
    (0.9998162418488605, 0.0037246515399841644, -0.018804507601279535)
]
plotter.screenshot(screenshot_dir / "aha_slice.png")
plotter = pv.Plotter(off_screen=True)
plotter.add_mesh(grid, cmap="tab20", n_colors=num_segments, show_edges=True)
plotter.camera_position = [
    (0.20028045308219225, -0.037228043080557334, -0.003401786822110152),
    (-0.013977611199999481, -6.356105320067668e-06, -0.002878247674289465),
    (0.16197623979674286, 0.9367431745355433, -0.3102836165548876)
]
plotter.screenshot(screenshot_dir / "aha_top.png")

Create integration measure

dx_aha = ufl.Measure("dx", domain=mesh, subdomain_data=aha_tags)

7. Visualize Stress/Strain & Compute Averages#

fiber_stress_regions = {i: [] for i in range(num_segments)}
fiber_strain_regions = {i: [] for i in range(num_segments)}

Compute the forms for each AHA segment in advance

forms_regions = {i : dolfinx.fem.form(f * dx_aha(i)) for i in range(num_segments)}
volumes_regions = {i : dolfinx.fem.assemble_scalar(dolfinx.fem.form(dolfinx.fem.Constant(mesh, 1.0) * dx_aha(i))) for i in range(num_segments)}

Now we loop over stress and strain, generate plots, and compute regional averages. We store the regional averages in the dictionaries defined above for later plotting.

for fname, clim, region_dict in [("fiber_stress", (0, 60), fiber_stress_regions),
                    ("fiber_strain", (-0.3, 0.1), fiber_strain_regions)]:

    print(f"Processing {fname}...")
    cmap = plt.get_cmap("viridis")
    plotter = pv.Plotter(off_screen=True)
    topology, cells, geometry = dolfinx.plot.vtk_mesh(V)
    grid = pv.UnstructuredGrid(topology, cells, geometry)

    grid[fname] = f.x.array
    grid.set_active_scalars(fname)
    plotter.add_mesh_clip_plane(
        grid,
        cmap=cmap,
        clim=clim,
        normal=(0.43382957710452097, 0.8998903772243152, 0.044600526996802344),
        origin=(-0.010140199723173711, -0.004698830488865986, -0.000526727463371799),
    )
    widget = plotter.plane_widgets[0]
    widget.SetEnabled(not widget.GetEnabled())

    outdir = screenshot_dir / fname
    outdir.mkdir(exist_ok=True, parents=True)

    plotter.camera_position = [
        (-0.013579771455618183, -0.17527467868587382, -0.0160491475687889),
        (-0.013977611437439919, -0.001986214891076088, -0.002878248691558838),
        (0.9998162418488605, 0.0037246515399841644, -0.018804507601279535)
    ]

    for i, t in enumerate(timestamps[::N]):
        print(f"  Time {t:.2f} s")
        adios4dolfinx.read_function(checkpoint_filename, f, time=t, name=fname)

        # Compute regional averages
        for region in range(num_segments):
            integral = dolfinx.fem.assemble_scalar(forms_regions[region])
            avg_value = integral / volumes_regions[region] if volumes_regions[region] > 0 else 0.0
            region_dict[region].append(avg_value)

        grid[fname][:] = f.x.array
        plotter.add_text(f"Time: {t:.2f} s", name="timer")
        plotter.screenshot(outdir / f"side_{fname}_sim_{i}.png")

    # Save regional data so we don't have to recompute later
    np.save(screenshot_dir / f"{fname}_regions.npy", region_dict)

    plotter = pv.Plotter(off_screen=True)
    plotter.add_mesh(grid, cmap=cmap, clim=clim)
    plotter.camera_position = [
        (0.20028045308219225, -0.037228043080557334, -0.003401786822110152),
        (-0.013977611199999481, -6.356105320067668e-06, -0.002878247674289465),
        (0.16197623979674286, 0.9367431745355433, -0.3102836165548876)
    ]
    for i, t in enumerate(timestamps[::N]):
        adios4dolfinx.read_function(checkpoint_filename, f, time=t, name=fname)
        grid[fname][:] = f.x.array
        plotter.add_text(f"Time: {t:.2f} s", name="timer")
        plotter.screenshot(outdir / f"top_{fname}_sim_{i}.png")

    side_screenshots = [outdir / f"side_{fname}_sim_{i}.png" for i in range(len(timestamps[::N]))]
    top_screenshots = [outdir / f"top_{fname}_sim_{i}.png" for i in range(len(timestamps[::N]))]
    combined_screenshots = []
    for i, (side, top) in enumerate(zip(side_screenshots, top_screenshots)):
        combined_output = outdir / f"combined_{fname}_sim_{i}.png"
        combine_screenshots([side, top], str(combined_output), Direction.HORIZONTAL)
        combined_screenshots.append(str(combined_output))
    gif_output = outdir / f"{fname}_simulation.mp4"
    create_gif(combined_screenshots, str(gif_output), delay=1)

8. Plot Regional Fiber Stress/Strain#

fiber_strain_regions = np.load(screenshot_dir / "fiber_strain_regions.npy", allow_pickle=True).item()
fig, ax = plt.subplots(figsize=(10, 6))
for region in range(num_segments):
    ax.plot(np.array(timestamps[::N]), fiber_strain_regions[region], label=f"Segment {region}")
ax.set_title("Fiber Strain in AHA Segments")
ax.set_xlabel("Time (s)")
ax.set_ylabel("Fiber Strain")
ax.grid()
ax.legend()
fig.savefig(screenshot_dir / "fiber_strain_regions.png")
fiber_stress_regions = np.load(screenshot_dir / "fiber_stress_regions.npy", allow_pickle=True).item()
fig, ax = plt.subplots(figsize=(10, 6))
for region in range(num_segments):
    ax.plot(np.array(timestamps[::N]), fiber_stress_regions[region], label=f"Segment {region}")
ax.set_title("Fiber Stress in AHA Segments")
ax.set_xlabel("Time (s)")
ax.set_ylabel("Fiber Stress (kPa)")
ax.grid()
ax.legend()
fig.savefig(screenshot_dir / "fiber_stress_regions.png")

8. Plot Regional Traces#

Finally, we use matplotlib to plot the time evolution of stress/strain for the AHA segments.

if comm.rank == 0 and volumes_regions:
    print("Generating Regional Plots...")
    time_array = np.array(timestamps[::N])

    # Save raw data
    np.save(screenshot_dir / "regional_data.npy", data_history)

    for fname in data_history:
        fig, ax = plt.subplots(figsize=(12, 8))

        # Plot each segment
        for region, values in data_history[fname].items():
            ax.plot(time_array[:len(values)], values, label=f"Seg {region}")

        ax.set_title(f"Average {fname.replace('_', ' ').title()} by AHA Segment")
        ax.set_xlabel("Time [s]")
        ax.set_ylabel(f"{fname} {'[kPa]' if 'stress' in fname else '[-]'}")
        ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left', ncol=2)
        ax.grid(True)

        plt.tight_layout()
        plt.savefig(screenshot_dir / f"{fname}_regional_traces.png")
        plt.close(fig)
        print(f"Saved {fname}_regional_traces.png")