Source code for ewoksid02.average

import logging
from typing import List, Optional, Tuple

import numpy

logger = logging.getLogger(__name__)


def get_centrosymmetric_limits(azimuth_range: List[float]) -> Tuple[List[float], ...]:
    """
    Build azimuth limits that are mirrored around zero for a given range.

    Inputs:
        azimuth_range (List[float]): The azimuth range as a list of two values.

    Outputs:
        Tuple[List[float], ...]: A single ``[low, high]`` pair when both bounds
            are equal or when their product is zero or negative; otherwise two
            pairs, ``[-a, -b]`` and ``[b, a]``, where ``a`` and ``b`` are the
            larger and smaller absolute bounds.
    """
    first, second = azimuth_range[0], azimuth_range[1]
    if first == second:
        # Degenerate range: use the whole [-180, 180] interval.
        return ([-180, 180],)

    bounds = numpy.array(azimuth_range)
    magnitudes = abs(bounds)
    largest = max(magnitudes)

    if bounds.prod() <= 0:
        # The range touches or straddles zero: one interval symmetric around
        # zero, clipped to [-180, 180].
        return ([max(-largest, -180), min(largest, 180)],)

    # Both bounds share the same sign: return the interval and its mirror image.
    outer = min(largest, 180)
    # NOTE(review): magnitudes are non-negative, so clamping from below at -180
    # never changes the value; the inner bound is effectively left unclipped.
    inner = max(min(magnitudes), -180)
    return ([-outer, -inner], [inner, outer])
def get_array_limit(
    unit_range: List[float], unit_array: numpy.ndarray
) -> Tuple[int, int]:
    """
    Locate the array positions closest to the two bounds of a unit range.

    Inputs:
        unit_range (List[float]): The unit range as a list of two values.
        unit_array (numpy.ndarray): The array of unit values.

    Outputs:
        Tuple[int, int]: For each bound, the index of the element of
            ``unit_array`` with the smallest absolute distance to it.
    """
    lower_bound, upper_bound = unit_range[0], unit_range[1]

    def nearest_index(target):
        # Index of the element with the smallest absolute distance to target.
        return numpy.argmin(numpy.abs(unit_array - target))

    return (nearest_index(lower_bound), nearest_index(upper_bound))
def create_mask_average(
    data_shape: tuple,
    data_ranges: List[List[Tuple[int, int]]],
):
    """
    Build a boolean mask of shape ``data_shape`` from per-dimension index ranges.

    ``data_ranges`` holds one list of ``(start, stop)`` pairs per dimension of
    ``data_shape``. Each pair is applied as ``slice(start, stop)`` along its
    dimension, the pairs of one dimension are combined with OR, and the
    dimensions are combined with AND.

    True -> to keep
    False -> to mask

    If the number of range lists differs from the number of dimensions, an
    error is logged and an all-True mask is returned. Ranges that are not a
    list/tuple of two elements are logged and skipped.
    """
    n_dims = len(data_shape)
    combined = numpy.ones(data_shape, dtype=bool)

    if len(data_ranges) != n_dims:
        logger.error(
            f"Number of dimensions in dataset_ranges ({len(data_ranges)}) does not match with data_shape"
        )
        return combined

    for axis, axis_ranges in enumerate(data_ranges):
        # Mark the kept positions along this axis only; the selection is the
        # same for every index of the other axes.
        keep_along_axis = numpy.zeros(data_shape[axis], dtype=bool)
        for bounds in axis_ranges:
            if not isinstance(bounds, (list, tuple)):
                logger.error(f"Array range {bounds} is not a list or tuple.")
            elif len(bounds) != 2:
                logger.error(
                    f"Array range {bounds} is not valid. It should contain two elements."
                )
            else:
                keep_along_axis[int(bounds[0]) : int(bounds[1])] = True

        # Reshape to (1, ..., size, ..., 1) so the 1-D selection broadcasts
        # over the remaining dimensions when AND-ed into the full mask.
        broadcast_shape = [1] * n_dims
        broadcast_shape[axis] = data_shape[axis]
        combined &= keep_along_axis.reshape(broadcast_shape)

    return combined
def calculate_average(
    dataset_intensity: Optional[numpy.ndarray],
    dataset_variance: Optional[numpy.ndarray] = None,
    dataset_sum_signal: Optional[numpy.ndarray] = None,
    dataset_sum_norm: Optional[numpy.ndarray] = None,
    dataset_sum_variance: Optional[numpy.ndarray] = None,
    axis_average: int = 1,
    array1_range: Optional[List[Tuple[int, int]]] = None,
    array2_range: Optional[List[Tuple[int, int]]] = None,
    Dummy: int = -10,
    calculate_variance: bool = False,
    **kwargs,
) -> Tuple[
    Optional[numpy.ndarray],
    Optional[numpy.ndarray],
    Optional[numpy.ndarray],
    Optional[numpy.ndarray],
]:
    """
    Average a stack of frames along one axis, ignoring masked and dummy pixels.

    The average is ``sum(intensity) / counts`` and its variance is
    ``sum(variance) / counts**2``, where ``counts`` is the number of pixels
    along ``axis_average`` that lie inside the requested ranges and differ
    from ``Dummy``.

    Inputs:
        dataset_intensity (Optional[numpy.ndarray]): The intensity dataset. The
            first dimension is excluded from the range mask; the ranges apply
            to the remaining dimensions.
        dataset_variance (Optional[numpy.ndarray], optional): The variance
            dataset, same shape as the intensity. Defaults to None.
        dataset_sum_signal (Optional[numpy.ndarray], optional): Not used by
            this function; kept for interface compatibility.
        dataset_sum_norm (Optional[numpy.ndarray], optional): Not used by
            this function; kept for interface compatibility.
        dataset_sum_variance (Optional[numpy.ndarray], optional): Not used by
            this function; kept for interface compatibility.
        axis_average (int, optional): The axis along which to average.
            Defaults to 1. 1 is for azimuth axis, 2 is for radial axis.
        array1_range (Optional[List[Tuple[int, int]]], optional): Index ranges
            ``(start, stop)`` kept along dimension 1, ``stop`` exclusive.
            Defaults to the whole dimension.
        array2_range (Optional[List[Tuple[int, int]]], optional): Index ranges
            ``(start, stop)`` kept along dimension 2, ``stop`` exclusive.
            Defaults to the whole dimension.
        Dummy (int, optional): The value marking invalid data; also written
            into pixels outside the ranges. Defaults to -10.
        calculate_variance (bool, optional): Not used by this function.
        **kwargs: Ignored.

    Outputs:
        Tuple[Optional[numpy.ndarray], Optional[numpy.ndarray], Optional[numpy.ndarray], Optional[numpy.ndarray]]:
            The average intensity, the variance of the average, the sigma of
            the average and the masked intensity dataset. Variance and sigma
            are None when no variance dataset is given. Positions without any
            valid pixel are NaN.

    Raises:
        ValueError: If ``axis_average`` is not smaller than the number of
            dimensions of ``dataset_intensity``.
    """
    if axis_average >= dataset_intensity.ndim:
        raise ValueError(
            f"Axis {axis_average} is out of bounds for dataset with {dataset_intensity.ndim} dimensions."
        )

    frame_shape = dataset_intensity.shape[1:]
    if len(frame_shape) == 2:
        # Slice ends are exclusive: (0, size) keeps every bin, whereas a
        # (0, -1) default silently drops the last bin of each axis.
        full_ranges = [[(0, frame_shape[0])], [(0, frame_shape[1])]]
    else:
        # create_mask_average ignores the ranges (all-True mask) when their
        # number does not match the number of dimensions.
        full_ranges = [[(0, -1)], [(0, -1)]]
    array1_range = array1_range or full_ranges[0]
    array2_range = array2_range or full_ranges[1]

    # Create the mask and apply to all dataset
    mask_slice = create_mask_average(
        data_shape=frame_shape,
        data_ranges=[array1_range, array2_range],
    )
    dataset_intensity_masked = numpy.where(mask_slice, dataset_intensity, Dummy)

    # Pixels that take part in the average: inside the ranges and not dummy.
    valid = dataset_intensity_masked != Dummy

    sum_signal = numpy.sum(
        dataset_intensity_masked,
        axis=axis_average,
        where=valid,
        dtype=dataset_intensity.dtype,
    )
    counts = numpy.sum(valid, axis=axis_average, dtype=dataset_intensity.dtype)
    empty = counts == 0

    # numpy.where evaluates both branches, so the division is also performed
    # where counts == 0; silence the resulting warnings, those positions are
    # replaced by NaN anyway.
    with numpy.errstate(divide="ignore", invalid="ignore"):
        dataset_average_intensity = numpy.where(empty, numpy.nan, sum_signal / counts)

        if dataset_variance is None:
            dataset_average_variance = None
            dataset_average_sigma = None
        else:
            dataset_variance_masked = numpy.where(mask_slice, dataset_variance, Dummy)
            # Sum the variance over the pixels counted in `counts` only, so
            # that sum(variance) / counts**2 refers to one set of pixels.
            sum_variance = numpy.sum(
                dataset_variance_masked,
                axis=axis_average,
                where=valid & (dataset_variance_masked != Dummy),
                dtype=dataset_intensity.dtype,
            )
            dataset_average_variance = numpy.where(
                empty, numpy.nan, sum_variance / (counts * counts)
            )
            dataset_average_sigma = numpy.where(
                empty, numpy.nan, numpy.sqrt(sum_variance) / counts
            )

    return (
        dataset_average_intensity,
        dataset_average_variance,
        dataset_average_sigma,
        dataset_intensity_masked,
    )