Coverage for local_installation_linux/mumott/methods/basis_sets/base_basis_set.py: 92%
160 statements
« prev ^ index » next coverage.py v7.3.2, created at 2025-05-05 21:21 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2025-05-05 21:21 +0000
2import logging
3import numpy as np
5from abc import ABC, abstractmethod
6from mumott import ProbedCoordinates
7from mumott.output_handling.reconstruction_derived_quantities import ReconstructionDerivedQuantities
8from scipy.integrate import simpson, romb, trapezoid
9from scipy.sparse import csr_array
11from numpy.typing import NDArray
13logger = logging.getLogger(__name__)
16class BasisSet(ABC):
18 """This is the base class from which specific basis sets are being derived.
19 """
21 def __init__(self,
22 probed_coordinates: ProbedCoordinates = None,
23 **kwargs):
24 if probed_coordinates is None:
25 probed_coordinates = ProbedCoordinates()
26 self.probed_coordinates = probed_coordinates
27 self._integration_mode = kwargs.get('integration_mode', 'simpson')
28 if self._integration_mode not in ('simpson', 'romberg', 'trapezoid', 'midpoint'): 28 ↛ 29line 28 didn't jump to line 29, because the condition on line 28 was never true
29 raise ValueError('integration_mode must be "simpson" (for integration with Simpson\'s rule), '
30 ' "midpoint" (for center-of-segment approximation), "romberg", or "trapezoid"!')
31 self._integration_tolerance = kwargs.get('integration_tolerance', 1e-5)
32 self._integration_maxiter = kwargs.get('integration_maxiter', 10)
33 self._n_integration_starting_points = kwargs.get('n_integration_starting_points', 3)
34 self._enforce_sparsity = kwargs.get('enforce_sparsity', False)
35 self._sparsity_count = kwargs.get('sparsity_count', 3)
37 @property
38 def probed_coordinates(self) -> ProbedCoordinates:
39 return self._probed_coordinates
41 @probed_coordinates.setter
42 def probed_coordinates(self, pc: ProbedCoordinates) -> None:
43 self._probed_coordinates = pc
45 @abstractmethod
46 def forward(self,
47 coefficients: NDArray,
48 indices: NDArray = None) -> NDArray:
49 pass
51 @abstractmethod
52 def gradient(self,
53 coefficients: NDArray,
54 indices: NDArray = None) -> NDArray:
55 pass
57 @abstractmethod
58 def get_spherical_harmonic_coefficients(self,
59 coefficients: NDArray,
60 ell_max: int = None,) -> NDArray:
61 pass
63 @abstractmethod
64 def _get_projection_matrix(self, probed_coordinates: ProbedCoordinates):
65 pass
67 def generate_map(self,
68 coefficients: NDArray[float],
69 resolution_in_degrees: int = 5,
70 map_half_sphere: bool = True) -> tuple[NDArray]:
71 """ Generate a (theta, phi) map of the function modeled by the input coefficients.
72 If :attr:`map_half_sphere=True` (default) a map of only the z>0 half sphere is returned.
74 Parameters
75 ----------
76 coefficients
77 One dimensional numpy array with length ``len(self)`` containing the coefficients
78 of the function to be plotted.
79 resolution_in_degrees
80 The resoution of the map in degrees. The map uses eqidistant lines in longitude
81 and latitude.
82 map_half_sphere
83 If `True` returns a map of the z>0 half sphere.
85 Returns
86 -------
87 map_intensity
88 Intensity values of the map.
89 map_theta
90 Polar cooridnates of the map.
91 map_phi
92 Azimuthal coordinates of the map.
93 """
94 # Generate coordinates.
95 if map_half_sphere:
96 steps = int(np.ceil(90/resolution_in_degrees))
97 map_theta = np.linspace(0, np.pi/2, steps + 1)
98 else:
99 steps = int(np.ceil(180/resolution_in_degrees))
100 map_theta = np.linspace(0, np.pi, steps + 1)
101 steps = int(np.ceil(360/resolution_in_degrees))
102 map_phi = np.linspace(0, 2*np.pi, steps + 1)
103 map_theta, map_phi = np.meshgrid(map_theta, map_phi, indexing='ij')
105 # Create a ProbedCoordinates to pass into `_get_projection_matrix`
106 x = np.cos(map_phi)*np.sin(map_theta)
107 y = np.sin(map_phi)*np.sin(map_theta)
108 z = np.cos(map_theta)
109 vector = np.stack((x, y, z), axis=-1)
110 probed_coordinates = ProbedCoordinates()
111 probed_coordinates.vector = vector[:, :, np.newaxis, :]
113 # Evaluate map intensity
114 basis_funciton_values = self._get_projection_matrix(probed_coordinates)[:, :, 0, :]
115 map_intensity = np.einsum('m,tpm->tp', coefficients, basis_funciton_values)
116 return map_intensity, map_theta, map_phi
118 def _make_projection_matrix_sparse(self, matrix: np.ndarray[float]) -> np.ndarray[float]:
119 if self._enforce_sparsity:
120 for i in range(matrix.shape[0]):
121 for j in range(matrix.shape[1]):
122 sorted_args = np.argsort(matrix[i, j, :])
123 for k in sorted_args[:-self._sparsity_count]:
124 matrix[i, j, k] = 0.
126 def _get_integrated_projection_matrix(self, probed_coordinates: ProbedCoordinates = None):
127 """ Uses Simpson's rule to integrate the basis set representation over
128 each detector segment on the unit sphere."""
129 # use 0 and -1 for backwards compatibility
130 if probed_coordinates is None:
131 probed_coordinates = self.probed_coordinates
132 start = probed_coordinates.vector[..., 0, :]
133 # don't normalize in-place to avoid modifying probed_coordinates
134 start = start / np.linalg.norm(start, axis=-1)[..., None]
135 start = start[..., None, :]
136 end = probed_coordinates.vector[..., -1, :]
137 end = end / np.linalg.norm(end, axis=-1)[..., None]
138 end = end[..., None, :]
139 # if segments lie on small circle, correct using waxs offset vector before slerp
140 offset = probed_coordinates.great_circle_offset[..., 0, :]
141 offset = offset[..., None, :]
142 # when run initially with probed_coordinates = None or similar
143 if np.allclose(start, end):
144 return self._get_projection_matrix(probed_coordinates)[:, :, 0]
145 if self._integration_mode == 'midpoint':
146 # Just use central point to get the projection matrix.
147 pc = ProbedCoordinates(probed_coordinates.vector[..., 1:2, :])
148 return self._get_projection_matrix(pc)[..., 0, :]
149 # segment length is subtended angle between start and end
150 corr_start = start - offset
151 corr_end = end - offset
152 segment_length = np.arccos(np.einsum('...i, ...i', corr_start, corr_end))
154 def slerp(t):
155 omega = segment_length.reshape(segment_length.shape + (1,))
156 t = t.reshape(1, 1, -1, 1)
157 # avoid division by 0, use 1st order approximation
158 where = np.isclose(abs(omega[..., 0, 0]), 0.)
159 sin_omega = np.sin(omega)
160 sin_tomega = np.sin(t * omega)
161 sin_1mtomega = np.sin((1 - t) * omega)
162 a = np.zeros(start.shape[:2] + (t.shape[2], 1), dtype=float)
163 b = np.zeros(start.shape[:2] + (t.shape[2], 1), dtype=float)
164 a[~where] = sin_1mtomega[~where] / sin_omega[~where]
165 b[~where] = sin_tomega[~where] / sin_omega[~where]
166 # sin(ax) / sin(x) ~ a for x ~ 0
167 a[where] = (1 - t) * np.ones_like(omega[where])
168 b[where] = t * np.ones_like(omega[where])
169 return a * corr_start + b * corr_end + offset
171 def quadrature(matrix, t):
172 if self._integration_mode == 'simpson':
173 return simpson(matrix, x=t, axis=-2)
174 elif self._integration_mode == 'romberg':
175 return romb(matrix, dx=1 / (t.size - 1), axis=-2)
176 elif self._integration_mode == 'trapezoid': 176 ↛ exitline 176 didn't return from function 'quadrature', because the condition on line 176 was never false
177 return trapezoid(matrix, x=t, axis=-2)
178 number_of_points = self._n_integration_starting_points
179 t = np.linspace(0, 1, number_of_points)
180 pc = ProbedCoordinates(slerp(t))
181 old_matrix = self._get_projection_matrix(pc)
182 # get an initial matrix for comparison
183 old_matrix = quadrature(old_matrix, t)
184 for i in range(self._integration_maxiter):
185 # double the number of intervals in each iteration
186 number_of_points += max(number_of_points - 1, 1)
187 t = np.linspace(0, 1, number_of_points)
188 vector = slerp(t)
189 pc = ProbedCoordinates(vector)
190 # integrate all matrices using simpson's rule
191 new_matrix = quadrature(self._get_projection_matrix(pc), t)
192 norm = np.max(abs(new_matrix - old_matrix)) / np.max(abs(new_matrix))
193 if norm < self._integration_tolerance:
194 break
195 old_matrix = new_matrix
196 else:
197 logger.warning('Projection matrix did not converge! '
198 'Try increasing integration_maxiter or reducing integration_tolerance.')
199 self._make_projection_matrix_sparse(new_matrix)
200 return new_matrix
202 @property
203 def integration_mode(self) -> str:
204 """
205 Mode of integration for calculating projection matrix.
206 Accepted values are ``'simpson'``, ``'romberg'``, ``'trapezoid'``,
207 and ``'midpoint'``.
208 """
209 return self._integration_mode
211 @integration_mode.setter
212 def integration_mode(self, val) -> None:
213 if val not in ('simpson', 'midpoint', 'romberg'): 213 ↛ 214line 213 didn't jump to line 214, because the condition on line 213 was never true
214 raise ValueError('integration_mode must have value "midpoint", '
215 '"romberg", "trapezoid" or "simpson", '
216 f'but a value of {val} was given!')
217 self._integration_mode = val
218 self._projection_matrix = self._get_integrated_projection_matrix()
220 @abstractmethod
221 def get_output(self,
222 coefficients: NDArray) -> ReconstructionDerivedQuantities:
223 pass
225 @abstractmethod
226 def __len__(self) -> int:
227 pass
229 @abstractmethod
230 def __dict__(self) -> dict:
231 pass
233 @abstractmethod
234 def __str__(self) -> str:
235 pass
237 @abstractmethod
238 def _repr_html_(self) -> str:
239 pass
241 @property
242 def csr_representation(self) -> tuple:
243 """ The projection matrix as a stack of sparse matrices in
244 CSR representation as a tuple. The information in the tuple consists of
245 the 3 dense matrices making up the representation,
246 in the order ``(pointers, indices, data)``."""
247 nnz = np.max((self._projection_matrix > 0).sum((-1, -2)))
248 sparse_data = np.zeros((self._projection_matrix.shape[0], nnz), dtype=np.float32)
249 sparse_indices = np.zeros((self._projection_matrix.shape[0], nnz), dtype=np.int32)
250 sparse_pointers = np.zeros((self._projection_matrix.shape[0],
251 self._projection_matrix.shape[1] + 1), dtype=np.int32)
252 for i in range(self._projection_matrix.shape[0]):
253 sparse_matrix = csr_array(self._projection_matrix[i])
254 sparse_matrix.eliminate_zeros()
255 sparse_data[i, :sparse_matrix.nnz] = sparse_matrix.data
256 sparse_indices[i, :sparse_matrix.nnz] = sparse_matrix.indices
257 sparse_pointers[i, :len(sparse_matrix.indptr)] = sparse_matrix.indptr
258 return sparse_pointers, sparse_indices, sparse_data