Source code for ewoksid02.utils.normalization

import logging
import time
from typing import Optional
from functools import lru_cache

import numexpr
import numpy
from pyFAI.version import MAJOR

if MAJOR >= 2025:
    from pyFAI.integrator.azimuthal import AzimuthalIntegrator
else:
    from pyFAI.azimuthalIntegrator import AzimuthalIntegrator
try:
    import cupy

    from .cupyutils import log_allocated_gpu_memory
except ImportError:
    cupy = None
    CUPY_AVAILABLE = False
    CUPY_MEM_POOL = None
else:
    CUPY_AVAILABLE = True
    CUPY_MEM_POOL = cupy.get_default_memory_pool()

from ewoksid02.utils.pyfai import _get_persistent_azimuthal_integrator

from .io import (
    get_persistent_array_mask,
    get_persistent_array_dark,
    get_persistent_array_flat,
)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def _normalize_dataset_cupy(
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask=None,
    array_dark=None,
    array_normalization=None,
    dummy: float = -10,
    datatype: str = "float32",
    **kwargs,
):
    log_allocated_gpu_memory()
    t1 = time.perf_counter()
    dataset_normalized_signal = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)
    # logger.info(f"Time to initialize numpy output arrays: {t2 - t1:.2f} seconds")

    # One by one
    nb_frames = len(dataset_signal)
    if monitor_values is None:
        monitor_values = numpy.ones(nb_frames, dtype=datatype)
    elif isinstance(monitor_values, (int, float)):
        monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)

    for index_frame, data_signal, monitor_value in zip(
        range(nb_frames), dataset_signal, monitor_values
    ):
        data_signal_cu = cupy.asarray(data_signal, dtype=datatype)

        # Remove dark from raw (not from norm)
        if array_dark is not None:
            data_signal_cu -= array_dark

        if dataset_variance is not None:
            data_variance_cu = cupy.asarray(
                dataset_variance[index_frame], dtype=datatype
            )
        else:
            data_variance_cu = None

        # Get final normalization array
        normalization_array_cu_frame = array_normalization * monitor_value

        # Normalize the signal
        data_signal_normalized_cu = cupy.where(
            normalization_array_cu_frame == 0.0,
            dummy,
            data_signal_cu / normalization_array_cu_frame,
        )
        data_variance_normalized_cu = None
        data_sigma_normalized_cu = None

        # Normalize the variance and sigma
        if data_variance_cu is not None:
            data_variance_normalized_cu = cupy.where(
                normalization_array_cu_frame == 0.0,
                dummy,
                data_variance_cu
                / (normalization_array_cu_frame * normalization_array_cu_frame),
            )
            data_sigma_normalized_cu = cupy.where(
                normalization_array_cu_frame == 0.0,
                dummy,
                cupy.sqrt(data_variance_cu) / cupy.abs(normalization_array_cu_frame),
            )

        # Mask all results
        if array_mask is not None:
            data_signal_normalized_cu = cupy.where(
                array_mask, dummy, data_signal_normalized_cu
            )
            if data_variance_cu is not None:
                data_variance_normalized_cu = cupy.where(
                    array_mask, dummy, data_variance_normalized_cu
                )
                data_sigma_normalized_cu = cupy.where(
                    array_mask, dummy, data_sigma_normalized_cu
                )

        # Allocate numpy arrays in place
        dataset_normalized_signal[index_frame] = data_signal_normalized_cu.get()
        if data_variance_normalized_cu is not None:
            dataset_normalized_variance[index_frame] = data_variance_normalized_cu.get()
        if data_sigma_normalized_cu is not None:
            dataset_normalized_sigma[index_frame] = data_sigma_normalized_cu.get()

    t3 = time.perf_counter()
    logger.debug(f"Total time for normalization: {t3 - t1:.2f} seconds")
    return (
        dataset_normalized_signal,
        dataset_normalized_variance,
        dataset_normalized_sigma,
    )


def _normalize_dataset_numpy(
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask: numpy.ndarray = None,
    array_dark: numpy.ndarray = None,
    array_normalization: numpy.ndarray = None,
    dummy: float = -10,
    datatype: str = "float32",
    **kwargs,
):
    dataset_signal_normalized = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_variance_normalized = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_sigma_normalized = numpy.zeros_like(dataset_signal, dtype=datatype)

    nb_frames = len(dataset_signal)
    if monitor_values is None:
        monitor_values = numpy.ones(nb_frames, dtype=datatype)
    elif isinstance(monitor_values, (int, float)):
        monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)
    if len(monitor_values) != nb_frames:
        monitor_values = monitor_values[0:nb_frames]

    # Remove dark from raw (not from norm)
    if array_dark is not None:
        dataset_signal -= array_dark

    # Get final normalization dataset with nb frames
    dataset_normalization = (
        numpy.broadcast_to(array_normalization, dataset_signal.shape).copy()
        * monitor_values[:, None, None]
    )

    # Normalize the signal
    dataset_signal_normalized = numpy.where(
        dataset_normalization == 0.0,
        dummy,
        dataset_signal / dataset_normalization,
    )

    # Normalize the variance and sigma
    if dataset_variance is not None:
        dataset_variance_normalized = numpy.where(
            dataset_normalization == 0.0,
            dummy,
            dataset_variance / (dataset_normalization * dataset_normalization),
        )
        dataset_sigma_normalized = numpy.where(
            dataset_normalization == 0.0,
            dummy,
            numpy.sqrt(dataset_variance) / numpy.abs(dataset_normalization),
        )
    else:
        dataset_variance_normalized = None
        dataset_sigma_normalized = None

    # Mask all results
    if array_mask is not None:
        dataset_signal_normalized = numpy.where(
            array_mask, dummy, dataset_signal_normalized
        )
        if dataset_variance_normalized is not None:
            dataset_variance_normalized = numpy.where(
                array_mask, dummy, dataset_variance_normalized
            )
        if dataset_sigma_normalized is not None:
            dataset_sigma_normalized = numpy.where(
                array_mask, dummy, dataset_sigma_normalized
            )

    return (
        dataset_signal_normalized,
        dataset_variance_normalized,
        dataset_sigma_normalized,
    )


def _normalize_dataset_opencl(
    detector_distortion,
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask: numpy.ndarray = None,
    array_flat: numpy.ndarray = None,
    array_dark: numpy.ndarray = None,
    array_polarization: numpy.ndarray = None,
    array_solidangle: numpy.ndarray = None,
    normalization_factor: float = 1.0,
    Dummy: float = -10,
    DDummy: float = 0.01,
    datatype: str = "float32",
    **kwargs,
):
    from pyFAI.distortion import Distortion

    dataset_normalized_signal = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)

    nb_frames = len(dataset_signal)
    if monitor_values is None:
        monitor_values = numpy.ones(nb_frames, dtype=datatype)
    elif isinstance(monitor_values, (int, float)):
        monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)

    distortion = Distortion(
        detector_distortion, method="csr", device="gpu", mask=array_mask, empty=Dummy
    )
    distortion.reset(prepare=True)  # enforce initialization

    if distortion.integrator is None:
        distortion.calc_init()

    for index_frame, data_signal, monitor_value in zip(
        range(nb_frames), dataset_signal, monitor_values
    ):
        variance = None
        if dataset_variance is not None:
            variance = dataset_variance[index_frame]
            result = distortion.integrator.integrate_ng(
                data=data_signal,
                variance=variance,
                flat=array_flat,
                dark=array_dark,
                solidangle=array_solidangle,
                polarization=array_polarization,
                dummy=Dummy,
                delta_dummy=DDummy,
                normalization_factor=monitor_value / normalization_factor,
                out_merged=False,
            )
            dataset_normalized_signal[index_frame] = result.intensity.reshape(
                data_signal.shape
            )
            if result.variance is not None:
                dataset_normalized_variance[index_frame] = result.variance
            if result.sigma is not None:
                dataset_normalized_sigma[index_frame] = result.sigma.reshape(
                    data_signal.shape
                )

    return (
        dataset_normalized_signal,
        dataset_normalized_variance,
        dataset_normalized_sigma,
    )


def _normalize_dataset_cython(
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask: numpy.ndarray = None,
    array_flat: numpy.ndarray = None,
    array_dark: numpy.ndarray = None,
    array_polarization: numpy.ndarray = None,
    array_solidangle: numpy.ndarray = None,
    normalization_factor: float = 1.0,
    Dummy: float = -10,
    DDummy: float = 0.01,
    datatype: str = "float32",
    **kwargs,
):
    from pyFAI.ext.preproc import preproc as preproc_cy

    dataset_normalized_signal = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)

    preproc_params = {
        "dark": array_dark,
        "flat": array_flat,
        "mask": array_mask,
        "solidangle": array_solidangle,
        "polarization": array_polarization,
        "dummy": Dummy,
        "delta_dummy": DDummy,
        "empty": None,
    }

    nb_frames = len(dataset_signal)

    if monitor_values is None:
        monitor_values = numpy.ones(nb_frames, dtype=datatype)
    elif isinstance(monitor_values, (int, float)):
        monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)

    for index_frame, data_signal, monitor_value in zip(
        range(nb_frames), dataset_signal, monitor_values
    ):
        if dataset_variance is None:
            dataset_normalized_signal[index_frame] = preproc_cy(
                raw=data_signal,
                variance=None,
                normalization_factor=monitor_value / normalization_factor,
                **preproc_params,
            )
        else:
            proc_data = preproc_cy(
                raw=data_signal,
                variance=dataset_variance[index_frame],
                normalization_factor=monitor_value / normalization_factor,
                **preproc_params,
            )
            pp_signal = proc_data[:, :, 0]  # noqa
            pp_variance = proc_data[:, :, 1]  # noqa
            pp_normalisation = proc_data[:, :, 2]  # noqa

            dataset_normalized_signal[index_frame] = numexpr.evaluate(
                f"where(pp_normalisation == 0.0, {Dummy}, pp_signal / pp_normalisation)"
            )
            dataset_normalized_variance[index_frame] = numexpr.evaluate(
                f"where(pp_normalisation == 0.0, {Dummy}, pp_variance / (pp_normalisation * pp_normalisation))"
            )
            dataset_normalized_sigma[index_frame] = numexpr.evaluate(
                f"where(pp_normalisation == 0.0, {Dummy}, sqrt(pp_variance) / abs(pp_normalisation))"
            )

    return (
        dataset_normalized_signal,
        dataset_normalized_variance,
        dataset_normalized_sigma,
    )


ALGORITHMS_NORMALIZATION = {
    "cython": {
        "method": _normalize_dataset_cython,
        "use_cupy": False,
        "name": "cython",
    },
    "opencl": {
        "method": _normalize_dataset_opencl,
        "use_cupy": False,
        "name": "opencl",
    },
    "numpy": {"method": _normalize_dataset_numpy, "use_cupy": False, "name": "numpy"},
    "cupy": {"method": _normalize_dataset_cupy, "use_cupy": True, "name": "cupy"},
}
DEFAULT_ALGORITHM_NORMALIZATION = "numpy"


def _parse_norm_algorithm(algorithm: str) -> dict:
    if algorithm not in ALGORITHMS_NORMALIZATION:
        logger.warning(
            f"Algorithm '{algorithm}' is not available. Using '{DEFAULT_ALGORITHM_NORMALIZATION}' instead."
        )
        algorithm = DEFAULT_ALGORITHM_NORMALIZATION
    elif algorithm == "cupy" and not CUPY_AVAILABLE:
        logger.warning(
            f"CuPy is not available. Using {DEFAULT_ALGORITHM_NORMALIZATION} instead."
        )
        algorithm = DEFAULT_ALGORITHM_NORMALIZATION
    logger.debug(f"Performing normalization with algorithm: {algorithm}")
    return ALGORITHMS_NORMALIZATION[algorithm]


def _parse_datatype(datatype):
    if datatype in ("float32", numpy.float32):
        datatype = numpy.dtype("float32")
    elif datatype in ("float64", numpy.float64):
        datatype = numpy.dtype("float64")
    return datatype


[docs] def get_array_polarization( azimuthal_integrator: AzimuthalIntegrator, polarization_factor: float = None, polarization_axis: int = 0, use_cupy: bool = False, ): array_polarization = azimuthal_integrator.polarization( factor=polarization_factor, axis_offset=polarization_axis, ) if use_cupy and CUPY_AVAILABLE: return cupy.asarray(array_polarization) return array_polarization
@lru_cache(maxsize=5) def _get_persistent_array_polarization( data_signal_shape: tuple, dist: float, wavelength: float, psize_1: float, psize_2: float, poni1: float = None, poni2: float = None, rot1: float = 0.0, rot2: float = 0.0, rot3: float = 0.0, polarization_factor: float = None, polarization_axis: int = 0, use_cupy: bool = False, ): logger.info( f"No cache hit. Creating new polarization array with {data_signal_shape=}, {dist=}, {wavelength=}, {psize_1=}, {psize_2=}, {poni1=}, {poni2=}, {rot1=}, {rot2=}, {rot3=} {polarization_factor=}, {polarization_axis=}, {use_cupy=}" ) azimuthal_integrator = _get_persistent_azimuthal_integrator( data_signal_shape=data_signal_shape, dist=dist, poni1=poni1, poni2=poni2, psize_1=psize_1, psize_2=psize_2, rot1=rot1, rot2=rot2, rot3=rot3, wavelength=wavelength, ) return get_array_polarization( azimuthal_integrator=azimuthal_integrator, polarization_factor=polarization_factor, polarization_axis=polarization_axis, use_cupy=use_cupy, )
[docs] def get_array_solidangle( azimuthal_integrator: AzimuthalIntegrator, absolute: bool = True, use_cupy: bool = False, ): array_solidangle = azimuthal_integrator.solidAngleArray(absolute=absolute) if use_cupy and CUPY_AVAILABLE: return cupy.asarray(array_solidangle) return array_solidangle
@lru_cache(maxsize=5) def _get_persistent_array_solidangle( data_signal_shape: tuple, dist: float, wavelength: float, psize_1: float, psize_2: float, poni1: float = None, poni2: float = None, rot1: float = 0.0, rot2: float = 0.0, rot3: float = 0.0, absolute: bool = True, use_cupy: bool = False, ): logger.info( f"No cache hit. Creating new solid angle array with {data_signal_shape=}, {dist=}, {wavelength=}, {psize_1=}, {psize_2=}, {poni1=}, {poni2=}, {rot1=}, {rot2=}, {rot3=} {absolute=}, {use_cupy=}" ) azimuthal_integrator = _get_persistent_azimuthal_integrator( data_signal_shape=data_signal_shape, dist=dist, poni1=poni1, poni2=poni2, psize_1=psize_1, psize_2=psize_2, rot1=rot1, rot2=rot2, rot3=rot3, wavelength=wavelength, ) return get_array_solidangle( azimuthal_integrator=azimuthal_integrator, absolute=absolute, use_cupy=use_cupy, ) @lru_cache(maxsize=5) def _get_persistent_array_normalization( data_signal_shape: tuple, dist: float, wavelength: float, psize_1: float, psize_2: float, poni1: float = None, poni2: float = None, rot1: float = 0.0, rot2: float = 0.0, rot3: float = 0.0, normalization_factor: float = None, polarization_factor: float = None, polarization_axis: int = 0, filename_flat: str = None, filename_mask: str = None, absolute_solidangle: bool = True, dummy: int = None, delta_dummy: float = None, binning: tuple = (1, 1), use_cupy: bool = False, datatype: str = "float32", ): logger.info("No cache hit. Creating new normalization array.") if use_cupy and CUPY_AVAILABLE: normalization_array = cupy.ones( data_signal_shape, dtype=datatype, ) else: normalization_array = numpy.ones( data_signal_shape, dtype=datatype, ) array_polarization = _get_persistent_array_polarization( data_signal_shape=data_signal_shape, dist=dist, wavelength=wavelength, psize_1=psize_1, psize_2=psize_2, poni1=poni1, poni2=poni2, rot1=rot1, rot2=rot2, rot3=rot3, polarization_factor=polarization_factor, polarization_axis=polarization_axis, use_cupy=use_cupy, ) if array_polarization is not None: normalization_array *= array_polarization array_flat_field = get_persistent_array_flat( filename_flat=filename_flat, data_signal_shape=data_signal_shape, datatype=datatype, binning=binning, dummy=dummy, delta_dummy=delta_dummy, filename_mask=filename_mask, use_cupy=use_cupy, ) if array_flat_field is not None: normalization_array *= array_flat_field array_solidangle = _get_persistent_array_solidangle( data_signal_shape=data_signal_shape, dist=dist, wavelength=wavelength, psize_1=psize_1, psize_2=psize_2, poni1=poni1, poni2=poni2, rot1=rot1, rot2=rot2, rot3=rot3, absolute=absolute_solidangle, use_cupy=use_cupy, ) if array_solidangle is not None: normalization_array *= array_solidangle if normalization_factor is not None: normalization_array /= normalization_factor array_mask = get_persistent_array_mask( filename_mask=filename_mask, data_signal_shape=data_signal_shape, binning=binning, use_cupy=use_cupy, ) if array_mask is not None and dummy: if use_cupy and CUPY_AVAILABLE: normalization_array = cupy.where(array_mask, dummy, normalization_array) else: normalization_array = numpy.where(array_mask, dummy, normalization_array) return normalization_array
[docs] def calculate_dataset_variance( dataset_signal: numpy.ndarray, variance_formula: str = None, dark: numpy.ndarray = None, ): if not variance_formula: return variance_function = numexpr.NumExpr( variance_formula, [("data", numpy.float64), ("dark", numpy.float64)] ) dataset_variance = variance_function(dataset_signal, 0 if dark is None else dark) return dataset_variance
[docs] def normalize_dataset( azimuthal_integrator: AzimuthalIntegrator, dataset_signal: numpy.ndarray, monitor_values: numpy.ndarray = None, NormalizationFactor: float = 1.0, filename_mask_normalization: str = None, filename_dark_normalization: str = None, filename_flat_normalization: str = None, absolute_solidangle: bool = True, binning: tuple = (1, 1), Dummy=None, DDummy=None, variance_formula=None, polarization_factor=None, polarization_axis_offset=0, datatype="float32", algorithm_normalization: str = "cython", dark_filter: str = "quantil", dark_filter_quantil_lower: float = 0.1, dark_filter_quantil_upper: float = 0.9, **kwargs, ): t0 = time.perf_counter() datatype = _parse_datatype(datatype=datatype) data_signal_shape = azimuthal_integrator.detector.shape algorithm_normalization = _parse_norm_algorithm(algorithm=algorithm_normalization) use_cupy = algorithm_normalization["use_cupy"] common_params = { "use_cupy": use_cupy, "data_signal_shape": data_signal_shape, "binning": binning, "dummy": DDummy, "delta_dummy": DDummy, } array_mask = get_persistent_array_mask( filename_mask=filename_mask_normalization, **common_params, ) array_flat = get_persistent_array_flat( filename_flat=filename_flat_normalization, data_signal_shape=data_signal_shape, datatype=datatype, binning=binning, dummy=Dummy, delta_dummy=DDummy, filename_mask=filename_mask_normalization, use_cupy=use_cupy, ) array_dark = get_persistent_array_dark( filename_dark=filename_dark_normalization, filename_mask=filename_mask_normalization, dark_filter=dark_filter, dark_filter_quantil_lower=dark_filter_quantil_lower, dark_filter_quantil_upper=dark_filter_quantil_upper, **common_params, ) ai_params = { "data_signal_shape": azimuthal_integrator.detector.shape, "dist": azimuthal_integrator.dist, "wavelength": azimuthal_integrator.wavelength, "psize_1": azimuthal_integrator.detector.pixel1, "psize_2": azimuthal_integrator.detector.pixel2, "poni1": azimuthal_integrator.poni1, "poni2": azimuthal_integrator.poni2, "rot1": azimuthal_integrator.rot1, "rot2": azimuthal_integrator.rot2, "rot3": azimuthal_integrator.rot3, } array_polarization = _get_persistent_array_polarization( **ai_params, polarization_factor=polarization_factor, polarization_axis=polarization_axis_offset, use_cupy=use_cupy, ) array_solidangle = _get_persistent_array_solidangle( **ai_params, absolute=True, use_cupy=use_cupy, ) # Patch due to inconsistent chiArray and position_array methods in pyFAI if "chi_center" in azimuthal_integrator._cached_array: azimuthal_integrator._cached_array.pop("chi_center") array_normalization = _get_persistent_array_normalization( normalization_factor=NormalizationFactor, polarization_factor=polarization_factor, polarization_axis=polarization_axis_offset, filename_flat=filename_flat_normalization, filename_mask=filename_mask_normalization, absolute_solidangle=absolute_solidangle, datatype=datatype, use_cupy=use_cupy, binning=binning, dummy=Dummy, delta_dummy=DDummy, **ai_params, ) if variance_formula and array_dark is not None: if use_cupy and CUPY_AVAILABLE: dark = array_dark.get() else: dark = array_dark else: dark = None dataset_variance = calculate_dataset_variance( dataset_signal=dataset_signal, variance_formula=variance_formula, dark=dark, ) params_normalization = { "dataset_signal": dataset_signal, "dataset_variance": dataset_variance, "monitor_values": monitor_values, "array_flat": array_flat, "array_dark": array_dark, "array_mask": array_mask, "array_solidangle": array_solidangle, "array_polarization": array_polarization, "array_normalization": array_normalization, "dummy": Dummy, "delta_dummy": DDummy, "datatype": datatype, "normalization_factor": NormalizationFactor, "detector_distortion": azimuthal_integrator.detector, } ( dataset_normalized_signal, dataset_normalized_variance, dataset_normalized_sigma, ) = algorithm_normalization["method"]( **params_normalization, ) t1 = time.perf_counter() logger.debug( f"Normalization completed in {t1 - t0:.2f} seconds using {algorithm_normalization['name']} algorithm." ) if dataset_normalized_variance is None: dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype) if dataset_normalized_sigma is None: dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype) return ( dataset_normalized_signal, dataset_normalized_variance, dataset_normalized_sigma, )
[docs] def calculate_normalization_values( monitor_values, azimuthal_integrator=None, psize_1=None, psize_2=None, dist=None, normalization_factor=None, ): if normalization_factor is None: normalization_factor = 1.0 if azimuthal_integrator is not None: psize_1 = psize_1 or azimuthal_integrator.detector.pixel1 psize_2 = psize_2 or azimuthal_integrator.detector.pixel2 dist = dist or azimuthal_integrator.dist if monitor_values is None or psize_1 is None or psize_2 is None or dist is None: return normalization_factor solid_angle = psize_1 * psize_2 / dist**2 return monitor_values * solid_angle / normalization_factor