Source code for mumott.core.john_transform

from math import floor
from numba import njit, int32, float32, float64, prange, void
from numpy.typing import NDArray
import numpy as np


[docs]def john_transform( field: NDArray[float], projections: NDArray[float], unit_vector_p: NDArray[float], unit_vector_j: NDArray[float], unit_vector_k: NDArray[float], offsets_j: NDArray[float], offsets_k: NDArray[float], float_type: str = 'float64') -> callable: r""" Frontend for performing the John transform with parallel CPU computing, using an algorithm akin to :func:`mumott.core.john_transform_cuda`. Parameters ---------- field The field to be projected, with 4 dimensions. The last index should have the same size as the last index of :attr:`projections`. projections A 4-dimensional numpy array where the projections are stored. The first index runs over the different projection directions. unit_vector_p The direction of projection in Cartesian coordinates. unit_vector_j One of the directions for the pixels of :attr:`projections``. unit_vector_k The other direction for the pixels of :attr:`projections`. offsets_j Offsets which align projections in the direction of `j` offsets_k Offsets which align projections in the direction of `k`. float_type Whether to use 'float64' (default) or 'float32'. The argument should be supplied as a string. The types of :attr:`field` and :attr:`projections` must match this type. Notes ----- The computation performed by this function may be written as .. math:: p(I, J, K)_i = \sum_{s=0}^{N-1} d \cdot \sum_{t=0}^3 t_w V_i(\lfloor \mathbf{r}_j + d s \cdot \mathbf{v} \rfloor + \mathbf{t}) where :math:`p(I, J, K)_i` is ``projection[I, J, K, i]``, :math:`V_i` is ``volume[..., i]``, and :math:`\mathbf{v}` is ``unit_vector_p[I]``. :math:`N` is the number of voxels in the maximal direction of ``unit_vector_p[I]``. :math:`d` is the step length, and is given by the norm of :math:`\Vert \mathbf{v}_I \Vert / \vert \max(\mathbf{v})) \vert`. :math:`t_w` and :math:`\mathbf{t}` are weights and index shifts, respectively. The latter are necessary to perform bilinear interpolation between the four closest voxels in the two directions orthogonal to the maximal direction of :math:`\mathbf{v}`. :math:`\mathbf{r}_j` is the starting position for the ray, and is given by .. math:: \mathbf{r}_j(J, K) = (J + 0.5 + o_J - 0.5 \cdot J_\text{max}) \cdot \mathbf{u} + \\ (K + 0.5 + o_K - 0.5 \cdot K_\text{max}) \cdot \mathbf{w} + \\ (\Delta_p) \cdot \mathbf{v} + 0.5 \cdot \mathbf{r}_\text{max} \\ where :math:`o_J` is ``offsets_j[I]``, :math:`o_K` is ``offsets_k[I]``, :math:`\mathbf{u}` is ``unit_vector_j[I]``, :math:`\mathbf{w}` is ``unit_vector_k[I]``, :math:`J_\text{max}` and :math:`K_\text{max}` are ``projections.shape[1]`` and ``projections.shape[2]`` respectively, and :math:`\mathbf{r}_\text{max}` is ``volume.shape[:3]``. :math:`\Delta_p` is an additional offset that places the starting position at the edge of the volume. """ # noqa if float_type == 'float64': numba_float = float64 elif float_type == 'float32': numba_float = float32 else: raise ValueError('float_type must be either "float64" or "float32", ' f'but a value of {float_type} was specified!') if str(field.dtype) != float_type: raise TypeError('The dtype of the argument "field" must be the same ' f'as the dtype specified by "float_type" ({float_type}), but ' f'field.dtype is {field.dtype}!') if str(projections.dtype) != float_type: raise TypeError('The dtype of the argument "projections" must be the same ' f'as the dtype specified by "float_type" ({float_type}), but ' f'projections.dtype is {projections.dtype}!') # Find zeroth, first and second directions for indexing. Zeroth is the maximal projection direction. # (x, y, z), (y, x, z), (z, x, y) direction_0_index = np.argmax(abs(unit_vector_p), axis=1).reshape(-1, 1).astype(np.int32) direction_1_index = 1 * (direction_0_index == 0).astype(np.int32).reshape(-1, 1).astype(np.int32) direction_2_index = (2 - (direction_0_index == 2)).astype(np.int32).reshape(-1, 1).astype(np.int32) # Step size for direction 1 and 2. step_sizes_1 = (np.take_along_axis(unit_vector_p, direction_1_index, 1) / np.take_along_axis(unit_vector_p, direction_0_index, 1)).astype(str(numba_float)).ravel() step_sizes_2 = (np.take_along_axis(unit_vector_p, direction_2_index, 1) / np.take_along_axis(unit_vector_p, direction_0_index, 1)).astype(str(numba_float)).ravel() # Shape in each of the three directions. dimensions_0 = np.array(field.shape, dtype=str(numba_float))[direction_0_index.ravel()] dimensions_1 = np.array(field.shape, dtype=str(numba_float))[direction_1_index.ravel()] dimensions_2 = np.array(field.shape, dtype=str(numba_float))[direction_2_index.ravel()] # Correction factor for length of line when taking a one-slice step. distance_multipliers = np.sqrt(1.0 + step_sizes_1 ** 2 + step_sizes_2 ** 2).astype(str(numba_float)) max_index = projections.shape[0] max_j = projections.shape[1] max_k = projections.shape[2] # CUDA chunking and memory size constants. channels = int(field.shape[-1]) # Indices to navigate each projection. s is the surface positioning. k_vectors = unit_vector_k.astype(str(numba_float)) j_vectors = unit_vector_j.astype(str(numba_float)) s_vectors = (unit_vector_k * (-0.5 * max_k + offsets_k.reshape(-1, 1)) + unit_vector_j * (-0.5 * max_j + offsets_j.reshape(-1, 1))).astype(str(numba_float)) # (0, 1, 2) if x main, (1, 0, 2) if y main, (2, 0, 1) if z main. direction_indices = np.stack((direction_0_index.ravel(), direction_1_index.ravel(), direction_2_index.ravel()), axis=1) # Bilinear interpolation over each slice. @njit(void(numba_float[:, :, :, ::1], int32, numba_float, numba_float, numba_float, int32[::1], numba_float[::1]), fastmath=True, nogil=True, cache=True) def bilinear_interpolation(field: NDArray[float], direction_0: int, r0: float, r1: float, r2: float, dimensions, accumulator: NDArray[float]): """ Kernel for bilinear interpolation. Replaces texture interpolation.""" if not ((0 <= r0 < dimensions[0]) and (-1 <= r1 < dimensions[1]) and (-1 <= r2 < dimensions[2])): return # At edges, use nearest-neighbor interpolation. r1_weight = int32(1 <= (int32(r1) + 1) < dimensions[1]) r2_weight = int32(1 <= (int32(r2) + 1) < dimensions[2]) r1_edge_weight = int32(int32(r1) == -1) r2_edge_weight = int32(int32(r2) == -1) weight_1 = numba_float((r1 - floor(r1)) * r1_weight) weight_2 = numba_float((r2 - floor(r2)) * r2_weight) t0 = (1 - weight_1) * (1 - weight_2) t1 = weight_1 * weight_2 t2 = (1 - weight_1) * weight_2 t3 = (1 - weight_2) * weight_1 if direction_0 == 0: x = int32(r0) y = int32(r1) + r1_edge_weight z = int32(r2) + r2_edge_weight for i in range(accumulator.size): accumulator[i] += field[x, y, z, i] * t0 accumulator[i] += field[x, y + r1_weight, z, i] * t3 accumulator[i] += field[x, y, z + r2_weight, i] * t2 accumulator[i] += field[x, y + r1_weight, z + r2_weight, i] * t1 elif direction_0 == 1: x = int32(r1) + r1_edge_weight y = int32(r0) z = int32(r2) + r2_edge_weight for i in range(accumulator.size): accumulator[i] += field[x, y, z, i] * t0 accumulator[i] += field[x + r1_weight, y, z, i] * t3 accumulator[i] += field[x, y, z + r2_weight, i] * t2 accumulator[i] += field[x + r1_weight, y, z + r2_weight, i] * t1 elif direction_0 == 2: x = int32(r1) + r1_edge_weight y = int32(r2) + r2_edge_weight z = int32(r0) for i in range(accumulator.size): accumulator[i] += field[x, y, z, i] * t0 accumulator[i] += field[x + r1_weight, y, z, i] * t3 accumulator[i] += field[x, y + r2_weight, z, i] * t2 accumulator[i] += field[x + r1_weight, y + r2_weight, z, i] * t1 @njit(void(numba_float[:, :, :, ::1], numba_float[:, :, :, ::1]), fastmath=True, nogil=True, parallel=True, cache=True) def john_transform_inner(field: NDArray[float], projection: NDArray[float]): """ Performs the John transform of a field. Relies on a large number of pre-defined constants outside the kernel body. """ for index in range(max_index): # Define compile-time constants. step_size_1 = step_sizes_1[index] step_size_2 = step_sizes_2[index] k_vectors_c = k_vectors[index] j_vectors_c = j_vectors[index] s_vectors_c = s_vectors[index] dimensions_0_c = dimensions_0[index] dimensions_1_c = dimensions_1[index] dimensions_2_c = dimensions_2[index] dimensions = np.empty(3, int32) dimensions[0] = dimensions_0_c dimensions[1] = dimensions_1_c dimensions[2] = dimensions_2_c direction_indices_c = direction_indices[index] distance_multiplier = distance_multipliers[index] for j in prange(max_j): accumulator = np.empty(channels, numba_float) # Could be chunked for very asymmetric samples. fj = numba_float(j) + 0.5 for k in range(max_k): for i in range(channels): accumulator[i] = 0. fk = numba_float(k) + 0.5 # Initial coordinates of projection. start_position_0 = (s_vectors_c[direction_indices_c[0]] + fj * j_vectors_c[direction_indices_c[0]] + fk * k_vectors_c[direction_indices_c[0]]) start_position_1 = (s_vectors_c[direction_indices_c[1]] + fj * j_vectors_c[direction_indices_c[1]] + fk * k_vectors_c[direction_indices_c[1]]) start_position_2 = (s_vectors_c[direction_indices_c[2]] + fj * j_vectors_c[direction_indices_c[2]] + fk * k_vectors_c[direction_indices_c[2]]) # Centering w.r.t volume. centering_step_1 = start_position_1 - step_size_1 * start_position_0 centering_step_2 = start_position_2 - step_size_2 * start_position_0 position_0 = numba_float(0) + 0.5 position_1 = step_size_1 * (numba_float(0) - 0.5 * dimensions[0] + 0.5) + \ centering_step_1 + 0.5 * dimensions[1] - 0.5 position_2 = step_size_2 * (numba_float(0) - 0.5 * dimensions[0] + 0.5) + \ centering_step_2 + 0.5 * dimensions[2] - 0.5 for i in range(dimensions[0]): bilinear_interpolation(field, direction_indices_c[0], position_0, position_1, position_2, dimensions, accumulator) position_0 += 1.0 position_1 += step_size_1 position_2 += step_size_2 for i in range(channels): projection[index, j, k, i] = accumulator[i] * distance_multiplier def john_transform_wrapper(field, projection): john_transform_inner(field, projection) return projection return john_transform_wrapper
[docs]def john_transform_adjoint(field: NDArray[float], projections: NDArray[float], unit_vector_p: NDArray[float], unit_vector_j: NDArray[float], unit_vector_k: NDArray[float], offsets_j: NDArray[float], offsets_k: NDArray[float], float_type: str = 'float64') -> callable: r""" Frontend for performing the adjoint of the John transform with parallel CPU computing, using an algorithm akin to :func:`mumott.core.john_transform_cuda`. Parameters ---------- field The field into which the adjoint is projected, with 4 dimensions. The last index should have the same size as the last index of :attr:`projections`. projections The projections from which the adjoint is calculated. The first index runs over the different projection directions. unit_vector_p The direction of projection in Cartesian coordinates. unit_vector_j One of the directions for the pixels of :attr:`projections`. unit_vector_k The other direction for the pixels of :attr:`projections`. offsets_j Offsets which align projections in the direction of `j` offsets_k Offsets which align projections in the direction of `k`. float_type Whether to use 'float64' (default) or 'float32'. The argument should be supplied as a string. The types of :attr:`field` and :attr:`projections` must match this type. Notes ----- The computation performed by this function may be written as .. math:: V_i(\mathbf{x}) = \sum_{s=0}^{N-1} \cdot \sum_{t = 0}^{4} t_w p_i(s, \mathbf{x} \cdot \mathbf{u} + \Delta_J + t_J, \mathbf{x} \cdot \mathbf{w} + \Delta_K + t_K) :math:`N` is the total number of projections. :math:`V_i` is ``volume[:, i]``, and :math:`\mathbf{v}` is ``projection_vector[s]``. :math:`\mathbf{u}` is ``unit_vector_j[s]``, :math:`\mathbf{w}` is ``unit_vector_k[s]``. :math:`\Delta_J` and :math:`\Delta_K` are additional offsets based on the unit vectors, shapes, and offsets, which align the centers of the projection and volume, so that the intersection of each ray is correctly computed. :math:`t_W` are weights for bilinear interpolation between the four pixels nearest to each ray; :math:`t_J` and :math:`t_K` are the index offsets necessary to perform this interpolation. """ # noqa if float_type == 'float64': numba_float = float64 elif float_type == 'float32': numba_float = float32 else: raise ValueError('float_type must be either "numba_float" or "float32", ' f'but a value of {float_type} was specified!') if str(field.dtype) != float_type: raise TypeError('The dtype of the argument "field" must be the same ' f'as the dtype specified by "float_type" ({float_type}), but ' f'field.dtype is {field.dtype}!') if str(projections.dtype) != float_type: raise TypeError('The dtype of the argument "projections" must be the same ' f'as the dtype specified by "float_type" ({float_type}), but ' f'projections.dtype is {projections.dtype}!') max_j = projections.shape[1] max_k = projections.shape[2] max_x, max_y, max_z = field.shape[:3] max_index = projections.shape[0] # Projection vectors. s for positioning the projection. p_vectors = unit_vector_p.astype(str(numba_float)) k_vectors = unit_vector_k.astype(str(numba_float)) j_vectors = unit_vector_j.astype(str(numba_float)) s_vectors = (unit_vector_k * (-0.5 * max_k + offsets_k.reshape(-1, 1)) + unit_vector_j * (-0.5 * max_j + offsets_j.reshape(-1, 1))).astype(str(numba_float)) # Translate volume steps to normalized projection steps. Can add support for non-square voxels. vector_norm = np.einsum('...i, ...i', p_vectors, np.cross(j_vectors, k_vectors)) norm_j = -np.cross(p_vectors, k_vectors) / vector_norm[..., None] norm_k = np.cross(p_vectors, j_vectors) / vector_norm[..., None] norm_offset_j = -np.einsum('...i, ...i', p_vectors, np.cross(s_vectors, k_vectors)) / vector_norm norm_offset_k = np.einsum('...i, ...i', p_vectors, np.cross(s_vectors, j_vectors)) / vector_norm channels = field.shape[-1] @njit(void(numba_float[:, :, :, ::1], numba_float, numba_float, numba_float, numba_float[::1]), fastmath=True, cache=True, nogil=True) def bilinear_interpolation_projection(projection: NDArray[float], r0: float, r1: float, r2: float, accumulator: NDArray[float]): if not (-1 <= r1 < max_j and -1 <= r2 < max_k): return # At edges, use nearest-neighbor interpolation. r1_weight = int32(0 <= int32(r1) + 1 < max_j) r2_weight = int32(0 <= int32(r2) + 1 < max_k) r1_edge_weight = int32(int32(r1) == -1) r2_edge_weight = int32(int32(r2) == -1) y_weight = numba_float((r1 - floor(r1)) * r1_weight) * (1 - r1_edge_weight) z_weight = numba_float((r2 - floor(r2)) * r2_weight) * (1 - r2_edge_weight) x = int32(r0) y = int32(r1) + r1_edge_weight z = int32(r2) + r2_edge_weight t0 = (1 - z_weight) * (1 - y_weight) t1 = z_weight * y_weight t2 = (1 - z_weight) * y_weight t3 = (1 - y_weight) * z_weight for i in range(channels): accumulator[i] += projection[x, y, z, i] * t0 accumulator[i] += projection[x, y + r1_weight, z, i] * t2 accumulator[i] += projection[x, y, z + r2_weight, i] * t3 accumulator[i] += projection[x, y + r1_weight, z + r2_weight, i] * t1 @njit(void(numba_float[:, :, :, ::1], numba_float[:, :, :, ::1]), fastmath=True, cache=True, nogil=True, parallel=True) def john_transform_adjoint_inner(field: NDArray[float], projection: NDArray[float]): """ Performs the John transform of a field. Relies on a large number of pre-defined constants outside the kernel body. """ # Indexing of volume coordinates. for x in range(max_x): fx = x - 0.5 * field.shape[0] + 0.5 for y in prange(max_y): z_acc = np.empty(channels, numba_float) fy = y - 0.5 * field.shape[1] + 0.5 for z in range(max_z): # Center of voxel and coordinate system. fz = z - 0.5 * field.shape[2] + 0.5 for j in range(channels): z_acc[j] = 0.0 for a in range(max_index): # Center with respect to projection. fj = (norm_offset_j[a] + fx * norm_j[a][0] + fy * norm_j[a][1] + fz * norm_j[a][2] - 0.5) fk = (norm_offset_k[a] + fx * norm_k[a][0] + fy * norm_k[a][1] + fz * norm_k[a][2] - 0.5) bilinear_interpolation_projection(projection, a, fj, fk, z_acc) for i in range(channels): field[x, y, z, i] = z_acc[i] def john_transform_adjoint_inner_wrapper(field, projection): john_transform_adjoint_inner(field, projection) return field return john_transform_adjoint_inner_wrapper