Coverage for local_installation_linux/mumott/methods/basis_sets/base_basis_set.py: 92%

160 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2025-05-05 21:21 +0000

1 

2import logging 

3import numpy as np 

4 

5from abc import ABC, abstractmethod 

6from mumott import ProbedCoordinates 

7from mumott.output_handling.reconstruction_derived_quantities import ReconstructionDerivedQuantities 

8from scipy.integrate import simpson, romb, trapezoid 

9from scipy.sparse import csr_array 

10 

11from numpy.typing import NDArray 

12 

13logger = logging.getLogger(__name__) 

14 

15 

16class BasisSet(ABC): 

17 

18 """This is the base class from which specific basis sets are being derived. 

19 """ 

20 

21 def __init__(self, 

22 probed_coordinates: ProbedCoordinates = None, 

23 **kwargs): 

24 if probed_coordinates is None: 

25 probed_coordinates = ProbedCoordinates() 

26 self.probed_coordinates = probed_coordinates 

27 self._integration_mode = kwargs.get('integration_mode', 'simpson') 

28 if self._integration_mode not in ('simpson', 'romberg', 'trapezoid', 'midpoint'): 28 ↛ 29line 28 didn't jump to line 29, because the condition on line 28 was never true

29 raise ValueError('integration_mode must be "simpson" (for integration with Simpson\'s rule), ' 

30 ' "midpoint" (for center-of-segment approximation), "romberg", or "trapezoid"!') 

31 self._integration_tolerance = kwargs.get('integration_tolerance', 1e-5) 

32 self._integration_maxiter = kwargs.get('integration_maxiter', 10) 

33 self._n_integration_starting_points = kwargs.get('n_integration_starting_points', 3) 

34 self._enforce_sparsity = kwargs.get('enforce_sparsity', False) 

35 self._sparsity_count = kwargs.get('sparsity_count', 3) 

36 

37 @property 

38 def probed_coordinates(self) -> ProbedCoordinates: 

39 return self._probed_coordinates 

40 

41 @probed_coordinates.setter 

42 def probed_coordinates(self, pc: ProbedCoordinates) -> None: 

43 self._probed_coordinates = pc 

44 

45 @abstractmethod 

46 def forward(self, 

47 coefficients: NDArray, 

48 indices: NDArray = None) -> NDArray: 

49 pass 

50 

51 @abstractmethod 

52 def gradient(self, 

53 coefficients: NDArray, 

54 indices: NDArray = None) -> NDArray: 

55 pass 

56 

57 @abstractmethod 

58 def get_spherical_harmonic_coefficients(self, 

59 coefficients: NDArray, 

60 ell_max: int = None,) -> NDArray: 

61 pass 

62 

63 @abstractmethod 

64 def _get_projection_matrix(self, probed_coordinates: ProbedCoordinates): 

65 pass 

66 

67 def generate_map(self, 

68 coefficients: NDArray[float], 

69 resolution_in_degrees: int = 5, 

70 map_half_sphere: bool = True) -> tuple[NDArray]: 

71 """ Generate a (theta, phi) map of the function modeled by the input coefficients. 

72 If :attr:`map_half_sphere=True` (default) a map of only the z>0 half sphere is returned. 

73 

74 Parameters 

75 ---------- 

76 coefficients 

77 One dimensional numpy array with length ``len(self)`` containing the coefficients 

78 of the function to be plotted. 

79 resolution_in_degrees 

80 The resoution of the map in degrees. The map uses eqidistant lines in longitude 

81 and latitude. 

82 map_half_sphere 

83 If `True` returns a map of the z>0 half sphere. 

84 

85 Returns 

86 ------- 

87 map_intensity 

88 Intensity values of the map. 

89 map_theta 

90 Polar cooridnates of the map. 

91 map_phi 

92 Azimuthal coordinates of the map. 

93 """ 

94 # Generate coordinates. 

95 if map_half_sphere: 

96 steps = int(np.ceil(90/resolution_in_degrees)) 

97 map_theta = np.linspace(0, np.pi/2, steps + 1) 

98 else: 

99 steps = int(np.ceil(180/resolution_in_degrees)) 

100 map_theta = np.linspace(0, np.pi, steps + 1) 

101 steps = int(np.ceil(360/resolution_in_degrees)) 

102 map_phi = np.linspace(0, 2*np.pi, steps + 1) 

103 map_theta, map_phi = np.meshgrid(map_theta, map_phi, indexing='ij') 

104 

105 # Create a ProbedCoordinates to pass into `_get_projection_matrix` 

106 x = np.cos(map_phi)*np.sin(map_theta) 

107 y = np.sin(map_phi)*np.sin(map_theta) 

108 z = np.cos(map_theta) 

109 vector = np.stack((x, y, z), axis=-1) 

110 probed_coordinates = ProbedCoordinates() 

111 probed_coordinates.vector = vector[:, :, np.newaxis, :] 

112 

113 # Evaluate map intensity 

114 basis_funciton_values = self._get_projection_matrix(probed_coordinates)[:, :, 0, :] 

115 map_intensity = np.einsum('m,tpm->tp', coefficients, basis_funciton_values) 

116 return map_intensity, map_theta, map_phi 

117 

118 def _make_projection_matrix_sparse(self, matrix: np.ndarray[float]) -> np.ndarray[float]: 

119 if self._enforce_sparsity: 

120 for i in range(matrix.shape[0]): 

121 for j in range(matrix.shape[1]): 

122 sorted_args = np.argsort(matrix[i, j, :]) 

123 for k in sorted_args[:-self._sparsity_count]: 

124 matrix[i, j, k] = 0. 

125 

126 def _get_integrated_projection_matrix(self, probed_coordinates: ProbedCoordinates = None): 

127 """ Uses Simpson's rule to integrate the basis set representation over 

128 each detector segment on the unit sphere.""" 

129 # use 0 and -1 for backwards compatibility 

130 if probed_coordinates is None: 

131 probed_coordinates = self.probed_coordinates 

132 start = probed_coordinates.vector[..., 0, :] 

133 # don't normalize in-place to avoid modifying probed_coordinates 

134 start = start / np.linalg.norm(start, axis=-1)[..., None] 

135 start = start[..., None, :] 

136 end = probed_coordinates.vector[..., -1, :] 

137 end = end / np.linalg.norm(end, axis=-1)[..., None] 

138 end = end[..., None, :] 

139 # if segments lie on small circle, correct using waxs offset vector before slerp 

140 offset = probed_coordinates.great_circle_offset[..., 0, :] 

141 offset = offset[..., None, :] 

142 # when run initially with probed_coordinates = None or similar 

143 if np.allclose(start, end): 

144 return self._get_projection_matrix(probed_coordinates)[:, :, 0] 

145 if self._integration_mode == 'midpoint': 

146 # Just use central point to get the projection matrix. 

147 pc = ProbedCoordinates(probed_coordinates.vector[..., 1:2, :]) 

148 return self._get_projection_matrix(pc)[..., 0, :] 

149 # segment length is subtended angle between start and end 

150 corr_start = start - offset 

151 corr_end = end - offset 

152 segment_length = np.arccos(np.einsum('...i, ...i', corr_start, corr_end)) 

153 

154 def slerp(t): 

155 omega = segment_length.reshape(segment_length.shape + (1,)) 

156 t = t.reshape(1, 1, -1, 1) 

157 # avoid division by 0, use 1st order approximation 

158 where = np.isclose(abs(omega[..., 0, 0]), 0.) 

159 sin_omega = np.sin(omega) 

160 sin_tomega = np.sin(t * omega) 

161 sin_1mtomega = np.sin((1 - t) * omega) 

162 a = np.zeros(start.shape[:2] + (t.shape[2], 1), dtype=float) 

163 b = np.zeros(start.shape[:2] + (t.shape[2], 1), dtype=float) 

164 a[~where] = sin_1mtomega[~where] / sin_omega[~where] 

165 b[~where] = sin_tomega[~where] / sin_omega[~where] 

166 # sin(ax) / sin(x) ~ a for x ~ 0 

167 a[where] = (1 - t) * np.ones_like(omega[where]) 

168 b[where] = t * np.ones_like(omega[where]) 

169 return a * corr_start + b * corr_end + offset 

170 

171 def quadrature(matrix, t): 

172 if self._integration_mode == 'simpson': 

173 return simpson(matrix, x=t, axis=-2) 

174 elif self._integration_mode == 'romberg': 

175 return romb(matrix, dx=1 / (t.size - 1), axis=-2) 

176 elif self._integration_mode == 'trapezoid': 176 ↛ exitline 176 didn't return from function 'quadrature', because the condition on line 176 was never false

177 return trapezoid(matrix, x=t, axis=-2) 

178 number_of_points = self._n_integration_starting_points 

179 t = np.linspace(0, 1, number_of_points) 

180 pc = ProbedCoordinates(slerp(t)) 

181 old_matrix = self._get_projection_matrix(pc) 

182 # get an initial matrix for comparison 

183 old_matrix = quadrature(old_matrix, t) 

184 for i in range(self._integration_maxiter): 

185 # double the number of intervals in each iteration 

186 number_of_points += max(number_of_points - 1, 1) 

187 t = np.linspace(0, 1, number_of_points) 

188 vector = slerp(t) 

189 pc = ProbedCoordinates(vector) 

190 # integrate all matrices using simpson's rule 

191 new_matrix = quadrature(self._get_projection_matrix(pc), t) 

192 norm = np.max(abs(new_matrix - old_matrix)) / np.max(abs(new_matrix)) 

193 if norm < self._integration_tolerance: 

194 break 

195 old_matrix = new_matrix 

196 else: 

197 logger.warning('Projection matrix did not converge! ' 

198 'Try increasing integration_maxiter or reducing integration_tolerance.') 

199 self._make_projection_matrix_sparse(new_matrix) 

200 return new_matrix 

201 

202 @property 

203 def integration_mode(self) -> str: 

204 """ 

205 Mode of integration for calculating projection matrix. 

206 Accepted values are ``'simpson'``, ``'romberg'``, ``'trapezoid'``, 

207 and ``'midpoint'``. 

208 """ 

209 return self._integration_mode 

210 

211 @integration_mode.setter 

212 def integration_mode(self, val) -> None: 

213 if val not in ('simpson', 'midpoint', 'romberg'): 213 ↛ 214line 213 didn't jump to line 214, because the condition on line 213 was never true

214 raise ValueError('integration_mode must have value "midpoint", ' 

215 '"romberg", "trapezoid" or "simpson", ' 

216 f'but a value of {val} was given!') 

217 self._integration_mode = val 

218 self._projection_matrix = self._get_integrated_projection_matrix() 

219 

220 @abstractmethod 

221 def get_output(self, 

222 coefficients: NDArray) -> ReconstructionDerivedQuantities: 

223 pass 

224 

225 @abstractmethod 

226 def __len__(self) -> int: 

227 pass 

228 

229 @abstractmethod 

230 def __dict__(self) -> dict: 

231 pass 

232 

233 @abstractmethod 

234 def __str__(self) -> str: 

235 pass 

236 

237 @abstractmethod 

238 def _repr_html_(self) -> str: 

239 pass 

240 

241 @property 

242 def csr_representation(self) -> tuple: 

243 """ The projection matrix as a stack of sparse matrices in 

244 CSR representation as a tuple. The information in the tuple consists of 

245 the 3 dense matrices making up the representation, 

246 in the order ``(pointers, indices, data)``.""" 

247 nnz = np.max((self._projection_matrix > 0).sum((-1, -2))) 

248 sparse_data = np.zeros((self._projection_matrix.shape[0], nnz), dtype=np.float32) 

249 sparse_indices = np.zeros((self._projection_matrix.shape[0], nnz), dtype=np.int32) 

250 sparse_pointers = np.zeros((self._projection_matrix.shape[0], 

251 self._projection_matrix.shape[1] + 1), dtype=np.int32) 

252 for i in range(self._projection_matrix.shape[0]): 

253 sparse_matrix = csr_array(self._projection_matrix[i]) 

254 sparse_matrix.eliminate_zeros() 

255 sparse_data[i, :sparse_matrix.nnz] = sparse_matrix.data 

256 sparse_indices[i, :sparse_matrix.nnz] = sparse_matrix.indices 

257 sparse_pointers[i, :len(sparse_matrix.indptr)] = sparse_matrix.indptr 

258 return sparse_pointers, sparse_indices, sparse_data