Source code for mumott.methods.basis_sets.gaussian_kernels

import logging
from typing import Any, Dict, Iterator, Tuple

import numpy as np
from numpy.typing import NDArray

from mumott import ProbedCoordinates, SphericalHarmonicMapper
from mumott.core.hashing import list_to_hash
from mumott.methods.utilities.tensor_operations import (framewise_contraction,
                                                        framewise_contraction_transpose)
from .base_basis_set import BasisSet
from .spherical_harmonics import SphericalHarmonics


logger = logging.getLogger(__name__)


[docs]class GaussianKernels(BasisSet): r""" Basis set class for gaussian kernels, a simple local representation on the sphere. The kernels follow a pseudo-even distribution similar to that described by Y. Kurihara `in 1965 <https://doi.org/10.1175/1520-0493%281965%29093%3C0399%3ANIOTPE%3E2.3.CO%3B2>`_, except with offsets added at the poles. Notes ----- The Gaussian kernel at location :math:`\rho_i` is given by .. math :: N_i \exp\left[ -\frac{1}{2} \left(\frac{d(\rho_i, r)}{\sigma}\right)^2 \right] .. math :: \sigma = \frac{\nu \pi}{2 (g + 1)} where :math:`\nu` is the kernel scale parameter and :math:`g` is the grid scale, and .. math :: d(\rho, r) = \arctan_2(\Vert \rho \times r \Vert, \rho \cdot r), that is, the great circle distance from the kernel location :math:`\rho` to the probed location :math:`r`. If Friedel symmetry is assumed, the expression is instead .. math :: d(\rho, r) = \arctan_2(\Vert \rho \times r \Vert, \vert \rho \cdot r \vert) The normalization factor :math:`\rho_i` is given by .. math :: N_i = \sum_j \exp\left[ -\frac{1}{2} \left( \frac{d(\rho_i, \rho_j)}{\sigma} \right)^2 \right] where the sum goes over the coordinates of all grid points. This leads to an approximately even spherical function, such that a set of coefficients which are all equal is approximately isotropic, to the extent possible with respect to restrictions imposed by grid resolution and scale parameter. Parameters ---------- probed_coordinates : ProbedCoordinates Optional. A container with the coordinates on the sphere probed at each detector segment by the experimental method. Its construction from the system geometry is method-dependent. By default, an empty instance of :class:`mumott.ProbedCoordinates` is created. grid_scale : int The size of the coordinate grid on the sphere. Denotes the number of azimuthal rings between the pole and the equator, where each ring has between ``2`` and ``2 * grid_scale`` points along the azimuth. kernel_scale_parameter : float The scale parameter of the kernel in units of :math:`\frac{\pi}{2 (g + 1)}`, where :math:`g` is ``grid_scale``. enforce_friedel_symmetry : bool If set to ``True``, Friedel symmetry will be enforced, using the assumption that points on opposite sides of the sphere are equivalent. kwargs Miscellaneous arguments which relate to segment integrations can be passed as keyword arguments: integration_mode Mode to integrate line segments on the reciprocal space sphere. Possible options are ``'simpson'``, ``'midpoint'``, ``'romberg'``, ``'trapezoid'``. ``'simpson'``, ``'trapezoid'``, and ``'romberg'`` use adaptive integration with the respective quadrature rule from ``scipy.integrate``. ``'midpoint'`` uses a single mid-point approximation of the integral. Default value is ``'simpson'``. n_integration_starting_points Number of points used in the first iteration of the adaptive integration. The number increases by the rule ``N`` &larr; ``2 * N - 1`` for each iteration. Default value is 3. integration_tolerance Tolerance for the maximum relative error between iterations before the integral is considered converged. Default is ``1e-5``. integration_maxiter Maximum number of iterations. Default is ``10``. enforce_sparsity If ``True``, makes matrix sparse by limiting the number of basis set elements that can map to each segment. Default is ``False``. sparsity_count Number of basis set elements that can map to each segment, if ``enforce_sparsity`` is set to ``True``. Default is ``3``. """ def __init__(self, probed_coordinates: ProbedCoordinates = None, grid_scale: int = 4, kernel_scale_parameter: float = 1., enforce_friedel_symmetry: bool = True, **kwargs): super().__init__(probed_coordinates, **kwargs) self._probed_coordinates_hash = hash(self.probed_coordinates) self._grid_scale = grid_scale self._kernel_scale_parameter = kernel_scale_parameter self._enforce_friedel_symmetry = enforce_friedel_symmetry self._projection_matrix = self._get_integrated_projection_matrix() def _get_kurihara_mesh(self, N) -> Tuple[NDArray, NDArray]: phi = [] theta = [] for i in np.arange(N, -1, -1): for j in np.arange(0, (2 * (i + 0.5))): phi.append((j + 0.5) / (2 * (i + 0.5)) * np.pi) phi.append((j + 0.5) / (2 * (i + 0.5)) * -np.pi) theta.append((i + 0.5) / (N + 1) * np.pi / 2) theta.append((i + 0.5) / (N + 1) * np.pi / 2) theta = np.array(theta) phi = np.mod(phi, 2 * np.pi) if not self._enforce_friedel_symmetry: theta = np.concatenate((theta, np.arccos(-np.cos(theta)))) phi = np.concatenate((phi, phi)) return theta, phi
[docs] def get_inner_product(self, u: NDArray, v: NDArray) -> NDArray: r""" Retrieves the inner product of two coefficient arrays, that is to say, the sum-product over the last axis. Parameters ---------- u The first coefficient array, of arbitrary shape and dimension, so long as the number of coefficients equals the length of this :class:`GaussianKernels` instance. v The second coefficient array, of the same shape as :attr:`u`. """ assert u.shape[-1] == len(self) assert u.shape == v.shape return np.einsum('...i, ...i -> ...', u, v, optimize='greedy')
def _get_spherical_distances(self, theta_1: NDArray[float], theta_2: NDArray[float], phi_1: NDArray[float], phi_2: NDArray[float], radius: float = 1., enforce_friedel_symmetry: bool = False) -> NDArray[float]: r""" Function for obtaining the distances between two point sets on a sphere, possibly with Friedel symmetry enforced. Arrays can have any shape, but they must all be broadcastable, and the polar angles of each set must have the same shape as the azimuthal angles. If the first and second set of points have the same shape, then the distances will be computed pointwise. Otherwise, the distances will be computed by broadcasting. Parameters ---------- theta_1 The polar angle of the first set of points, defined as :math:`\arccos(z_1)`. theta_2 The polar angle fo the second set of points, defined as :math:`\arccos(z_2)`. phi_2 The azimuthal angle of the first set of points, defined as :math:`\arctan_2(y_1, x_1)`. phi_2 The azimuthal angle of the second set of points, defined as :math:`\arctan_2(y_2, x_2)`. radius The radius of the sphere. Default is `1`, i.e., describing a unit sphere. enforce_friedel_symmetry If ``True`` (default), the point :math:`(x, y, z)` will be considered as equivalent to the point :math:`(-x, -y, -z)` and the maximum possible distance on the sphere will be :math:`\frac{\sqrt{x^2 + y^2 + z^2} \pi}{2}`, i.e., at a 90-degree angle. Otherwise, the two points will be considered distinct and the maximum distance will be :math:`\sqrt{x^2 + y^2 + z^2} \pi`. """ phi_diff = abs(phi_1 - phi_2) sine_factor = np.sin(theta_1) * np.sin(theta_2) * np.cos(phi_diff) cosine_factor = np.cos(theta_1) * np.cos(theta_2) if enforce_friedel_symmetry: return radius * np.arccos(abs(sine_factor + cosine_factor).clip(0., 1.)) else: return radius * np.arccos((sine_factor + cosine_factor).clip(-1., 1.)) def _get_projection_matrix(self, probed_coordinates: ProbedCoordinates = None) -> NDArray[float]: """ Computes the matrix necessary for forward and gradient calculations. Called when the coordinate system has been updated, or one of ``kernel_scale_parameter`` or ``grid_scale`` has been changed.""" if probed_coordinates is None: probed_coordinates = self.probed_coordinates theta, phi = self._get_kurihara_mesh(self._grid_scale) phi = phi.reshape(1, 1, 1, -1) theta = theta.reshape(1, 1, 1, -1) _, probed_polar_angles, probed_azim_angles = probed_coordinates.to_spherical probed_polar_angles = probed_polar_angles[..., np.newaxis] probed_azim_angles = probed_azim_angles[..., np.newaxis] # Find distances to all probed detector points on sphere distances = self._get_spherical_distances(theta, probed_polar_angles, phi, probed_azim_angles, enforce_friedel_symmetry=self._enforce_friedel_symmetry) # Probe at location of each kernel function to normalize over sphere mesh_distances = self._get_spherical_distances( theta.reshape(-1, 1), theta.reshape(1, -1), phi.reshape(-1, 1), phi.reshape(1, -1), enforce_friedel_symmetry=self._enforce_friedel_symmetry) std = (self._kernel_scale_parameter * np.sqrt(2 * np.pi)) / (2 * (self._grid_scale + 1)) matrix = np.exp(-(1 / 2) * (distances / std) ** 2) norm_matrix = np.exp(-(1 / 2) * (mesh_distances / std) ** 2) # The normalization factor is the inverse of the unnormalized function value at each grid point norm_factors = np.reciprocal(norm_matrix.sum(-1).reshape(1, 1, 1, -1)) return matrix * norm_factors
[docs] def forward(self, coefficients: NDArray, indices: NDArray = None) -> NDArray: """ Carries out a forward computation of projections from Gaussian kernel space into detector space, for one or several tomographic projections. Parameters ---------- coefficients An array of coefficients, of arbitrary shape so long as the last axis has the same size as :attr:`~.GaussianKernels.kernel_scale_parameter`, and if :attr:`indices` is ``None`` or greater than one, the first axis should have the same length as :attr:`indices` indices Optional. Indices of the tomographic projections for which the forward computation is to be performed. If ``None``, the forward computation will be performed for all projections. Returns ------- An array of values on the detector corresponding to the :attr:`coefficients` given. If :attr:`indices` contains exactly one index, the shape is ``(coefficients.shape[:-1], J)`` where ``J`` is the number of detector segments. If :attr:`indices` is ``None`` or contains several indices, the shape is ``(N, coefficients.shape[1:-1], J)`` where ``N`` is the number of tomographic projections for which the computation is performed. Notes ----- The assumption is made in this implementation that computations over several indices act on sets of images from different projections. For special usage where multiple projections of entire fields are desired, it may be better to use :attr:`projection_matrix` directly. This also applies to :meth:`gradient`. """ assert coefficients.shape[-1] == len(self) self._update() output = np.zeros(coefficients.shape[:-1] + (self._projection_matrix.shape[1],), coefficients.dtype) if indices is None: framewise_contraction_transpose(self._projection_matrix, coefficients, output) elif indices.size == 1: np.einsum('ijk, ...k -> ...j', self._projection_matrix[indices], coefficients, out=output, optimize='greedy', casting='unsafe') else: framewise_contraction_transpose(self._projection_matrix[indices], coefficients, output) return output
[docs] def gradient(self, coefficients: NDArray, indices: NDArray = None) -> NDArray: """ Carries out a gradient computation of projections from Gaussian kernel space into detector space for one or several tomographic projections. Parameters ---------- coefficients An array of coefficients (or residuals) of arbitrary shape so long as the last axis has the same size as the number of detector segments. indices Optional. Indices of the tomographic projections for which the gradient computation is to be performed. If ``None``, the gradient computation will be performed for all projections. Returns ------- An array of gradient values based on the :attr:`coefficients` given. If :attr:`indices` contains exactly one index, the shape is ``(coefficients.shape[:-1], J)`` where ``J`` is the number of detector segments. If indices is ``None`` or contains several indices, the shape is ``(N, coefficients.shape[1:-1], J)`` where ``N`` is the number of tomographic projections for which the computation is performed. Notes ----- When solving an inverse problem, one should not attempt to optimize the coefficients directly using the gradient one obtains by applying this method to the data. Instead, one must either take the gradient of the residual between the :meth:`~.GaussianKernels.forward` computation of the coefficients and the data. Alternatively one can apply both the forward and the gradient computation to the coefficients to be optimized, and the gradient computation to the data, and treat the residual of the two as the gradient of the optimization coefficients. The approaches are algebraically equivalent, but one may be more efficient than the other in some circumstances. However, normally, the projection between detector and ``GaussianKernel`` space is only a small part of the overall computation, so there is typically not much to be gained from optimizing it. """ self._update() output = np.zeros(coefficients.shape[:-1] + (self._projection_matrix.shape[2],), coefficients.dtype) if indices is None: framewise_contraction(self._projection_matrix, coefficients, output) elif indices.size == 1: np.einsum('ikj, ...k -> ...j', self._projection_matrix[indices], coefficients, out=output, optimize='greedy', casting='unsafe') else: framewise_contraction(self._projection_matrix[indices], coefficients, output) return output
[docs] def get_amplitudes(self, coefficients: NDArray[float], probed_coordinates: ProbedCoordinates = None) -> NDArray[float]: """ Computes the amplitudes of the spherical function represented by the provided :attr:`coefficients` at the :attr:`probed_coordinates`. Parameters ---------- coefficients An array of coefficients of arbitrary shape, provided that the last dimension contains the coefficients for one spherical function. probed_coordinates An instance of :class:`mumott.core.ProbedCoordinates` with its :attr:`vector` attribute indicating the points of the sphere for which to evaluate the amplitudes. """ if probed_coordinates is None: probed_coordinates = self._probed_coordinates matrix = self._get_projection_matrix(probed_coordinates) self._make_projection_matrix_sparse(matrix) return np.einsum('ij, ...j', matrix.squeeze(), coefficients, optimize='greedy')
[docs] def get_spherical_harmonic_coefficients(self, coefficients: NDArray, ell_max: int = None): """ Computes the spherical harmonic coefficients of the spherical function represented by the provided :attr:`coefficients` using a Driscoll-Healy grid. For details on the Driscoll-Healy grid, see `the SHTools page <https://shtools.github.io/SHTOOLS/grid-formats.html>`_ for a comprehensive overview. Parameters ---------- coefficients An array of coefficients of arbitrary shape, provided that the last dimension contains the coefficients for one function. ell_max The bandlimit of the spherical harmonic expansion. By default, it is ``2 * grid_scale``. """ if ell_max is None: ell_max = 2 * self._grid_scale dh_grid_size = 4 * (self._grid_scale + 1) mapper = SphericalHarmonicMapper(ell_max=ell_max, polar_resolution=dh_grid_size, azimuthal_resolution=dh_grid_size, enforce_friedel_symmetry=self._enforce_friedel_symmetry) coordinates = ProbedCoordinates(mapper.unit_vectors.reshape((1, -1, 1, 3))) amplitudes = self.get_amplitudes(coefficients, coordinates)\ .reshape(coefficients.shape[:-1] + (dh_grid_size, dh_grid_size)) spherical_harmonics_coefficients = mapper.get_harmonic_coefficients(amplitudes) return spherical_harmonics_coefficients
[docs] def get_output(self, coefficients: NDArray) -> Dict[str, Any]: r""" Returns a dictionary of output data for a given array of basis set coefficients. Parameters ---------- coefficients An array of coefficients of arbitrary shape and dimensions, except its last dimension must be the same length as the :attr:`len` of this instance. Computations only operate over the last axis of :attr:`coefficients`, so derived properties in the output will have the shape ``(*coefficients.shape[:-1], ...)``. Returns ------- A dictionary containing two sub-dictionaries, ``basis_set`` and ``spherical_harmonic_analysis``. ``basis_set`` contains information particular to :class:`GaussianKernels`, whereas ``spherical_harmonic_analysis`` contains an analysis of the spherical function using a spherical harmonic transform. Notes ----- In detail, the two sub-dictionaries ``basis_set`` and ``spherical_harmonic_analysis`` have the following members: basis_set name The name of the basis set, i.e., ``'GaussianKernels'`` coefficients A copy of :attr:`coefficients`. grid_scale A copy of :attr:`~.GaussianKernels.grid_scale`. kernel_Scale_paramter A copy of :attr:`~.GaussianKernels.kernel_scale_parameter`. enforce_friedel_symmetry A copy of :attr:`~.GaussianKernels.enforce_friedel_symmetry`. projection_matrix A copy of :attr:`~.GaussianKernels.projection_matrix`. spherical_harmonic_analysis An analysis of the spherical function in terms of spherical harmonics. See :meth:`SphericalHarmonics.get_output <SphericalHarmonics.get_output()>` for details. """ assert coefficients.shape[-1] == len(self) # Update to ensure non-dirty output state. self._update() sph_coefficients = self.get_spherical_harmonic_coefficients(coefficients) spherical_harmonics_set = SphericalHarmonics(ell_max=2 * self._grid_scale, enforce_friedel_symmetry=self._enforce_friedel_symmetry) sph_output = spherical_harmonics_set.get_output(sph_coefficients) output_dictionary = {} # basis set-specific information basis_set = {} output_dictionary['basis_set'] = basis_set basis_set['name'] = type(self).__name__ basis_set['coefficients'] = coefficients.copy() basis_set['grid_scale'] = self._grid_scale basis_set['grid'] = self.grid basis_set['kernel_scale_parameter'] = self._kernel_scale_parameter basis_set['enforce_friedel_symmetry'] = self._enforce_friedel_symmetry basis_set['projection_matrix'] = self._projection_matrix.copy() basis_set['hash'] = hex(hash(self)) # Analysis is more easily done in spherical harmonic space. output_dictionary['spherical_harmonic_analysis'] = sph_output return output_dictionary
def __iter__(self) -> Iterator[Tuple[str, Any]]: """ Allows class to be iterated over and in particular be cast as a dictionary. """ yield 'name', type(self).__name__ yield 'grid_scale', self._grid_scale yield 'kernel_scale_parameter', self._kernel_scale_parameter yield 'enforce_friedel_symmetry', self._enforce_friedel_symmetry yield 'projection_matrix', self._projection_matrix yield 'hash', hex(hash(self))[2:] def __len__(self) -> int: return self._projection_matrix.shape[-1] def __hash__(self) -> int: """Returns a hash reflecting the internal state of the instance. Returns ------- A hash of the internal state of the instance, cast as an ``int``. """ to_hash = [self._grid_scale, self.grid, self._kernel_scale_parameter, self._enforce_friedel_symmetry, self._projection_matrix, self._probed_coordinates_hash] return int(list_to_hash(to_hash), 16) def _update(self) -> None: # We only run updates if the hashes do not match. if self.is_dirty: self._projection_matrix = self._get_integrated_projection_matrix() self._probed_coordinates_hash = hash(self._probed_coordinates) @property def is_dirty(self) -> bool: return hash(self._probed_coordinates) != self._probed_coordinates_hash @property def projection_matrix(self) -> NDArray: """ The matrix used to project spherical functions from the unit sphere onto the detector. If ``v`` is a vector of gaussian kernel coefficients, and ``M`` is the ``projection_matrix``, then ``M @ v`` gives the corresponding values on the detector segments associated with each projection. ``M[i] @ v`` gives the values on the detector segments associated with projection ``i``. If ``r`` is a residual between a projection from Gaussian kernel to detector space and data from projection ``i``, then ``M[i].T @ r`` gives the associated gradient in Gaussian kernel space. """ self._update() return self._projection_matrix @property def grid_scale(self) -> int: """ The number of azimuthal rings from each pole to the equator in the spherical grid. """ return self._grid_scale @grid_scale.setter def grid_scale(self, val: int) -> None: if val < 0 or val != round(val): raise ValueError('grid_scale must be a non-negative integer,' f' but a value of {val} was given!') self._grid_scale = val self._projection_matrix = self._get_integrated_projection_matrix() @property def kernel_scale_parameter(self) -> float: """ The scale parameter for each kernel. """ return self._kernel_scale_parameter @kernel_scale_parameter.setter def kernel_scale_parameter(self, val: float) -> float: self._kernel_scale_parameter = val self._projection_matrix = self._get_integrated_projection_matrix() @property def enforce_friedel_symmetry(self) -> bool: """ If ``True``, Friedel symmetry is enforced, i.e., the point :math:`-r` is treated as equivalent to :math:`r`. """ return self._enforce_friedel_symmetry @property def grid(self) -> Tuple[NDArray['float'], NDArray['float']]: r""" Returns the polar and azimuthal angles of the grid used by the basis. Returns ------- A ``Tuple`` with contents ``(polar_angle, azimuthal_angle)``, where the polar angle is defined as :math:`\arccos(z)`. """ return self._get_kurihara_mesh(self._grid_scale) @property def grid_hash(self) -> str: """ Returns a hash of :attr:`grid`. """ return list_to_hash([self.grid]) @property def projection_matrix_hash(self) -> str: """ Returns a hash of :attr:`projection_matrix`. """ return list_to_hash([self.projection_matrix]) def __str__(self) -> str: wdt = 74 s = [] s += ['-' * wdt] s += ['GaussianKernels'.center(wdt)] s += ['-' * wdt] with np.printoptions(threshold=4, edgeitems=2, precision=5, linewidth=60): s += ['{:18} : {}'.format('grid_scale', self.grid_scale)] s += ['{:18} : {}'.format('grid_hash', self.grid_hash)] s += ['{:18} : {}'.format('enforce_friedel_symmetry', self.enforce_friedel_symmetry)] s += ['{:18} : {}'.format('kernel_scale_parameter', self.kernel_scale_parameter)] s += ['{:18} : {}'.format('projection_matrix_hash', self.projection_matrix_hash)] s += ['{:18} : {}'.format('hash', hex(hash(self))[2:8])] s += ['-' * wdt] return '\n'.join(s) def _repr_html_(self) -> str: s = [] s += [f'<h3>{self.__class__.__name__}</h3>'] s += ['<table border="1" class="dataframe">'] s += ['<thead><tr><th style="text-align: left;">Field</th><th>Size</th><th>Data</th></tr></thead>'] s += ['<tbody>'] with np.printoptions(threshold=4, edgeitems=2, precision=2, linewidth=40): s += ['<tr><td style="text-align: left;">grid_scale</td>'] s += [f'<td>1</td><td>{self.grid_scale}</td></tr>'] s += ['<tr><td style="text-align: left;">grid_hash</td>'] s += [f'<td>{len(self.grid_hash)}</td><td>{self.grid_hash[:6]}</td></tr>'] s += ['<tr><td style="text-align: left;">kernel_scale_parameter</td>'] s += [f'<td>1</td><td>{self.kernel_scale_parameter}</td></tr>'] s += ['<tr><td style="text-align: left;">enforce_friedel_symmetry</td>'] s += [f'<td>1</td>' f'<td>{self.enforce_friedel_symmetry}</td></tr>'] s += ['<tr><td style="text-align: left;">projection_matrix</td>'] s += [f'<td>{len(self.projection_matrix_hash)}</td>' f'<td>{self.projection_matrix_hash[:6]}</td></tr>'] s += ['<tr><td style="text-align: left;">hash</td>'] s += [f'<td>{len(hex(hash(self)))}</td><td>{hex(hash(self))[2:8]}</td></tr>'] s += ['</tbody>'] s += ['</table>'] return '\n'.join(s)