Source code for fenicsx_pulse.utils
import dolfinx
import numpy as np
import numpy.typing as npt
[docs]
def vertex_to_dofmap(V: dolfinx.fem.FunctionSpace) -> npt.NDArray[np.int32]:
"""Compute a mapping from vertices to dofs in a function space
Parameters
----------
V : dolfinx.fem.FunctionSpace
The function space
Returns
-------
npt.NDArray[np.int32]
The mapping from vertices to dofs
"""
mesh = V.mesh
num_vertices_per_cell = dolfinx.cpp.mesh.cell_num_entities(mesh.topology.cell_type, 0)
dof_layout = np.empty((num_vertices_per_cell,), dtype=np.int32)
for i in range(num_vertices_per_cell):
var = V.dofmap.dof_layout.entity_dofs(0, i)
assert len(var) == 1
dof_layout[i] = var[0]
num_vertices = mesh.topology.index_map(0).size_local + mesh.topology.index_map(0).num_ghosts
c_to_v = mesh.topology.connectivity(mesh.topology.dim, 0)
assert (c_to_v.offsets[1:] - c_to_v.offsets[:-1] == c_to_v.offsets[1]).all(), (
"Single cell type supported"
)
vertex_to_dof_map = np.empty(num_vertices, dtype=np.int32)
vertex_to_dof_map[c_to_v.array] = V.dofmap.list[:, dof_layout].reshape(-1)
return vertex_to_dof_map
[docs]
def unroll_dofmap(dofs: npt.NDArray[np.int32], bs: int) -> npt.NDArray[np.int32]:
"""
Given a two-dimensional dofmap of size `(num_cells, num_dofs_per_cell)`
Expand the dofmap by its block size such that the resulting array
is of size `(num_cells, bs*num_dofs_per_cell)`
"""
num_cells, num_dofs_per_cell = dofs.shape
unrolled_dofmap = np.repeat(dofs, bs).reshape(num_cells, num_dofs_per_cell * bs) * bs
unrolled_dofmap += np.tile(np.arange(bs), num_dofs_per_cell)
return unrolled_dofmap
[docs]
def evaluate_at_vertex_tag(
u: dolfinx.fem.Function,
vt: dolfinx.mesh.MeshTags,
tag: int,
) -> npt.NDArray[np.int32]:
"""
Given a function `u` and a vertex tag `vt` return the values of `u` at the vertices
tagged with `tag`
Parameters
----------
u: dolfinx.fem.Function
The function to evaluate
vt: dolfinx.mesh.MeshTags
The vertex tags
tag: int
The tag to evaluate at
Returns
-------
npt.NDArray[np.int32]
The values of `u` at the vertices tagged with `tag`
"""
v2d = vertex_to_dofmap(u.function_space)
block_index = v2d[vt.find(tag)]
dofs = unroll_dofmap(block_index.reshape(-1, 1), u.function_space.dofmap.bs)
return u.x.array[dofs]
[docs]
def gather_broadcast_array(comm, local_array):
"""
Collects local arrays from all processes on the root process
and distributes the global array to all processes.
Assumes that the local arrays are either the same of empty / None on all processes.
Parameters
----------
comm: MPI.Comm
The MPI communicator
local_array: np.ndarray
The local array
Returns
-------
np.ndarray
The global array on the root process
"""
all_arrays = comm.gather(local_array, root=0)
if comm.rank == 0:
nonempty_arrays = [array for array in all_arrays if array is not None and len(array) > 0]
# Check that all nonempty arrays are the same
if len(nonempty_arrays) == 0:
global_array = np.array([])
elif len(nonempty_arrays) == 1:
global_array = nonempty_arrays[0]
else:
all_same = True
for array in nonempty_arrays[1:]:
all_same = all_same and np.allclose(array, nonempty_arrays[0])
if not all_same:
raise ValueError("Local arrays are not the same", nonempty_arrays)
global_array = nonempty_arrays[0]
else:
global_array = np.array([])
return comm.bcast(global_array, root=0)