import logging
from typing import List, Optional, Tuple
import numpy
logger = logging.getLogger(__name__)
[docs]
def get_centrosymmetric_limits(azimuth_range: List[float]) -> Tuple[List[float], ...]:
    """
    Calculate the centrosymmetric limits for a given azimuth range.

    The returned intervals are symmetric about 0 and every bound is clamped
    to the [-180, 180] interval.

    Inputs:
        azimuth_range (List[float]): The azimuth range as a list of two values.
    Outputs:
        Tuple[List[float], ...]: A tuple containing one or two lists of centrosymmetric limits.
            - ([-180, 180],) when both bounds are equal.
            - ([-m, m],) when the range touches or straddles 0, where m is the
              largest absolute bound.
            - ([-a, -b], [b, a]) otherwise, where b <= a are the absolute bounds.
    """
    if azimuth_range[0] == azimuth_range[1]:
        return ([-180, 180],)

    lim = numpy.array(azimuth_range)
    magnitudes = numpy.abs(lim)
    outer = min(magnitudes.max(), 180)

    # A non-positive product means the bounds have opposite signs (or one of
    # them is 0): the mirrored intervals overlap and merge into a single one.
    if lim.prod() <= 0:
        return ([-outer, outer],)

    # Absolute values are never negative, so the inner bound only needs the
    # upper clamp; without it, inputs beyond 180 produced inverted intervals.
    inner = min(magnitudes.min(), 180)
    return ([-outer, -inner], [inner, outer])
[docs]
def get_array_limit(
    unit_range: List[float], unit_array: numpy.ndarray
) -> Tuple[int, int]:
    """
    Locate the array positions whose values are closest to the range bounds.

    Inputs:
        unit_range (List[float]): The unit range as a list of two values.
        unit_array (numpy.ndarray): The array of unit values.
    Outputs:
        Tuple[int, int]: Index of the element nearest to unit_range[0] and index
            of the element nearest to unit_range[1], in that order. On ties the
            first (lowest) index wins.
    """

    def closest_index(target):
        # Smallest absolute distance to the target marks the nearest element.
        distance = numpy.absolute(unit_array - target)
        return numpy.argmin(distance)

    return (closest_index(unit_range[0]), closest_index(unit_range[1]))
[docs]
def create_mask_average(
    data_shape: tuple,
    data_ranges: List[List[Tuple[int, int]]],
):
    """
    Build a boolean mask of shape data_shape from per-dimension index ranges.

    data_ranges holds one list of (start, stop) pairs per dimension of
    data_shape. Along each dimension the union of its [start, stop) slices is
    kept; the final mask is the intersection over all dimensions.
    The function itself does not skip any dimension: the caller is expected to
    pass the shape without the frame dimension.

    If the number of range lists differs from the number of dimensions, an
    error is logged and an all-True mask is returned. Ranges that are not a
    two-element list/tuple are logged and ignored.

    True -> to keep
    False -> to mask
    """
    n_dims = len(data_shape)
    combined = numpy.ones(data_shape, dtype=bool)

    if len(data_ranges) != n_dims:
        logger.error(
            f"Number of dimensions in dataset_ranges ({len(data_ranges)}) does not match with data_shape"
        )
        return combined

    for axis, axis_ranges in enumerate(data_ranges):
        # 1-D selection along this axis; it is broadcast over the others below.
        keep_along_axis = numpy.zeros(data_shape[axis], dtype=bool)

        for bounds in axis_ranges:
            if not isinstance(bounds, (list, tuple)):
                logger.error(f"Array range {bounds} is not a list or tuple.")
                continue
            if len(bounds) != 2:
                logger.error(
                    f"Array range {bounds} is not valid. It should contain two elements."
                )
                continue
            start, stop = int(bounds[0]), int(bounds[1])
            keep_along_axis[start:stop] = True

        # Shape (1, ..., n, ..., 1) so the in-place AND broadcasts correctly.
        broadcast_shape = [1] * n_dims
        broadcast_shape[axis] = data_shape[axis]
        combined &= keep_along_axis.reshape(broadcast_shape)

    return combined
[docs]
def calculate_average(
    dataset_intensity: Optional[numpy.ndarray],
    dataset_variance: Optional[numpy.ndarray] = None,
    dataset_sum_signal: Optional[numpy.ndarray] = None,
    dataset_sum_norm: Optional[numpy.ndarray] = None,
    dataset_sum_variance: Optional[numpy.ndarray] = None,
    axis_average: int = 1,
    array1_range: Optional[List[Tuple[int, int]]] = None,
    array2_range: Optional[List[Tuple[int, int]]] = None,
    Dummy: int = -10,
    calculate_variance: bool = False,
    **kwargs,
) -> Tuple[
    Optional[numpy.ndarray],
    Optional[numpy.ndarray],
    Optional[numpy.ndarray],
    Optional[numpy.ndarray],
]:
    """
    Average a dataset along one axis, ignoring masked and Dummy-valued pixels.

    A selection mask is built for dataset_intensity.shape[1:] from the two range
    lists, so the dataset is expected to be 3-D (frame, axis 1, axis 2). Pixels
    outside the selection are replaced by Dummy, and every pixel equal to Dummy
    is excluded from the sums and from the pixel count.

    Inputs:
        dataset_intensity (Optional[numpy.ndarray]): The intensity dataset. If None,
            an error is logged and four None values are returned.
        dataset_variance (Optional[numpy.ndarray], optional): The variance dataset,
            same shape as dataset_intensity. Defaults to None (no variance/sigma output).
        dataset_sum_signal (Optional[numpy.ndarray], optional): Currently unused. Defaults to None.
        dataset_sum_norm (Optional[numpy.ndarray], optional): Currently unused. Defaults to None.
        dataset_sum_variance (Optional[numpy.ndarray], optional): Currently unused. Defaults to None.
        axis_average (int, optional): The axis along which to average. Defaults to 1. 1 is for azimuth axis, 2 is for radial axis.
        array1_range (Optional[List[Tuple[int, int]]], optional): (start, stop) index ranges to keep
            along axis 1 (stop is exclusive). Defaults to None, which keeps the whole axis.
        array2_range (Optional[List[Tuple[int, int]]], optional): (start, stop) index ranges to keep
            along axis 2 (stop is exclusive). Defaults to None, which keeps the whole axis.
        Dummy (int, optional): The value marking invalid/masked pixels. Defaults to -10.
        calculate_variance (bool, optional): Currently unused. Defaults to False.
        **kwargs: Ignored.
    Outputs:
        Tuple[Optional[numpy.ndarray], Optional[numpy.ndarray], Optional[numpy.ndarray], Optional[numpy.ndarray]]:
            The average intensity, the variance of the average, the sigma of the average
            (the last two are None when dataset_variance is None) and the masked intensity
            dataset. Bins without any valid pixel are NaN in the averaged outputs.
    Raises:
        ValueError: If axis_average is not smaller than dataset_intensity.ndim.
    """
    if dataset_intensity is None:
        logger.error("No intensity dataset provided, nothing to average.")
        return (None, None, None, None)

    if axis_average >= dataset_intensity.ndim:
        raise ValueError(
            f"Axis {axis_average} is out of bounds for dataset with {dataset_intensity.ndim} dimensions."
        )

    frame_shape = dataset_intensity.shape[1:]

    def _full_extent(dim):
        # The stop index of a range is exclusive, so the axis length (and not
        # -1, which would drop the last bin) is needed to select every bin.
        if dim < len(frame_shape):
            return [(0, frame_shape[dim])]
        # Dimension missing: create_mask_average will report the mismatch and
        # keep everything, so the exact value is irrelevant here.
        return [(0, -1)]

    array1_range = array1_range or _full_extent(0)
    array2_range = array2_range or _full_extent(1)

    # Create the mask and apply to all dataset
    mask_slice = create_mask_average(
        data_shape=frame_shape,
        data_ranges=[array1_range, array2_range],
    )
    dataset_intensity_masked = numpy.where(mask_slice, dataset_intensity, Dummy)
    if dataset_variance is not None:
        dataset_variance_masked = numpy.where(mask_slice, dataset_variance, Dummy)
    else:
        dataset_variance_masked = None

    # Route 1: average = sum(dataset_intensity) / counts; var(average) = sum(variance) / (counts * counts)
    valid_pixels = dataset_intensity_masked != Dummy
    sum_signal = numpy.sum(
        dataset_intensity_masked,
        axis=axis_average,
        where=valid_pixels,
        dtype=dataset_intensity.dtype,
    )
    counts = numpy.sum(
        valid_pixels,
        axis=axis_average,
        dtype=dataset_intensity.dtype,
    )
    empty_bins = counts == 0

    # Empty bins are overwritten with NaN right after the division, so the
    # divide-by-zero warnings raised for them carry no information.
    with numpy.errstate(divide="ignore", invalid="ignore"):
        dataset_average_intensity = numpy.where(
            empty_bins, numpy.nan, sum_signal / counts
        )

    if dataset_variance_masked is None:
        return (
            dataset_average_intensity,
            None,
            None,
            dataset_intensity_masked,
        )

    sum_variance = numpy.sum(
        dataset_variance_masked,
        axis=axis_average,
        where=(dataset_variance_masked != Dummy),
        dtype=dataset_intensity.dtype,
    )
    # Kept outside the errstate block: a warning here means negative variances.
    sqrt_sum_variance = numpy.sqrt(sum_variance)
    with numpy.errstate(divide="ignore", invalid="ignore"):
        dataset_average_variance = numpy.where(
            empty_bins, numpy.nan, sum_variance / (counts * counts)
        )
        dataset_average_sigma = numpy.where(
            empty_bins, numpy.nan, sqrt_sum_variance / counts
        )

    return (
        dataset_average_intensity,
        dataset_average_variance,
        dataset_average_sigma,
        dataset_intensity_masked,
    )