Source code for ewoksid02.utils.average

import logging
from typing import List, Optional, Tuple

import numexpr
import numpy

logger = logging.getLogger(__name__)


[docs] def get_centrosymmetric_limits(azimuth_range: List[float]) -> Tuple[List[float], ...]: """ Calculate the centrosymmetric limits for a given azimuth range. Inputs: azimuth_range (List[float]): The azimuth range as a list of two values. Outputs: Tuple[List[float], ...]: A tuple containing one or two lists of centrosymmetric limits. """ if azimuth_range[0] == azimuth_range[1]: return ([-180, 180],) lim = numpy.array(azimuth_range) if lim.prod() <= 0: lim = max(abs(lim)) return ([max(-lim, -180), min(lim, 180)],) else: a = min(max(abs(lim)), 180) b = max(min(abs(lim)), -180) return ([-a, -b], [b, a])
[docs] def get_array_limit( azimuth_range: List[float], azimuth_array: numpy.ndarray ) -> Tuple[int, int]: """ Get the array indices corresponding to the azimuth range. Inputs: azimuth_range (List[float]): The azimuth range as a list of two values. azimuth_array (numpy.ndarray): The array of azimuth values. Outputs: Tuple[int, int]: The indices corresponding to the azimuth range. """ arg_min = numpy.argmin(abs(azimuth_array - azimuth_range[0])) arg_max = numpy.argmin(abs(azimuth_array - azimuth_range[1])) return (arg_min, arg_max)
[docs] def calculate_average( dataset_intensity: Optional[numpy.ndarray], dataset_sum_signal: Optional[numpy.ndarray] = None, dataset_sum_norm: Optional[numpy.ndarray] = None, dataset_sum_variance: Optional[numpy.ndarray] = None, array_ranges: Optional[List[Tuple[int, int]]] = None, Dummy: int = -10, calculate_variance: bool = False, **kwargs, ) -> Tuple[ Optional[numpy.ndarray], Optional[numpy.ndarray], Optional[numpy.ndarray], Optional[numpy.ndarray], ]: """ Calculate the average intensity, signal normalization, variance, and sigma for a dataset. Inputs: dataset_intensity (Optional[numpy.ndarray]): The intensity dataset. dataset_sum_signal (Optional[numpy.ndarray], optional): The sum signal dataset. Defaults to None. dataset_sum_norm (Optional[numpy.ndarray], optional): The sum normalization dataset. Defaults to None. dataset_sum_variance (Optional[numpy.ndarray], optional): The sum variance dataset. Defaults to None. array_ranges (Optional[List[Tuple[int, int]]], optional): The ranges for averaging. Defaults to None. Dummy (int, optional): The dummy value to replace invalid data. Defaults to -10. Outputs: Tuple[Optional[numpy.ndarray], Optional[numpy.ndarray], Optional[numpy.ndarray], Optional[numpy.ndarray]]: The average intensity, signal normalization, variance, and sigma datasets. """ sum_signal = None sum_norm = None sum_variance = None dataset_average_intensity = numpy.zeros( (dataset_intensity.shape[0], dataset_intensity.shape[-1]) ) dataset_average_signal_norm = numpy.zeros_like(dataset_average_intensity) dataset_average_variance = numpy.zeros_like(dataset_average_intensity) dataset_average_sigma = numpy.zeros_like(dataset_average_intensity) if dataset_sum_signal is not None: sum_signal = numpy.zeros( (dataset_sum_signal.shape[0], dataset_sum_signal.shape[-1]) ) if dataset_sum_norm is not None: sum_norm = numpy.zeros((dataset_sum_norm.shape[0], dataset_sum_norm.shape[-1])) if dataset_sum_variance is not None: sum_variance = numpy.zeros( (dataset_sum_variance.shape[0], dataset_sum_variance.shape[-1]) ) array_ranges = array_ranges or [(0, -1)] mask_slice = numpy.zeros_like(dataset_intensity) for array_range in array_ranges: if not isinstance(array_range, (list, tuple)): logger.error(f"Array range {array_range} is not a list or tuple.") continue if len(array_range) != 2: logger.error( f"Array range {array_range} is not valid. It should contain two elements." ) continue lim_0 = int(array_range[0]) lim_1 = int(array_range[1]) mask_slice[:, lim_0:lim_1, :] = 1 dataset_average_intensity = numexpr.evaluate( "where(mask_slice == 0, Dummy, dataset_intensity)" ) dataset_average_intensity[dataset_average_intensity == Dummy] = numpy.nan dataset_average_intensity = numpy.nanmean(dataset_average_intensity, axis=1) if dataset_sum_signal is not None: sum_signal = numexpr.evaluate( "where(mask_slice == 0, Dummy, dataset_sum_signal)" ) sum_signal[sum_signal == Dummy] = numpy.nan sum_signal = numpy.nansum(sum_signal, axis=1) if dataset_sum_norm is not None: sum_norm = numexpr.evaluate("where(mask_slice == 0, Dummy, dataset_sum_norm)") sum_norm[sum_norm == Dummy] = numpy.nan sum_norm = numpy.nansum(sum_norm, axis=1) if dataset_sum_variance is not None: sum_variance = numexpr.evaluate( "where(mask_slice == 0, Dummy, dataset_sum_variance)" ) sum_variance[sum_variance == Dummy] = numpy.nan sum_variance = numpy.nansum(sum_variance, axis=1) if sum_signal is not None and sum_norm is not None: dataset_average_signal_norm = numexpr.evaluate( "where(sum_norm <= 0.0, sum_norm, sum_signal / sum_norm)" ) if sum_variance is not None and sum_norm is not None: dataset_average_variance = numexpr.evaluate( "where(sum_norm <= 0.0, sum_norm, sum_variance / (sum_norm * sum_norm))" ) dataset_average_sigma = numexpr.evaluate( "where(sum_norm <= 0.0, sum_norm, sqrt(sum_variance) / sum_norm)" ) return ( dataset_average_intensity, dataset_average_signal_norm, dataset_average_variance, dataset_average_sigma, )