Source code for ewoksid02.normalization

from __future__ import annotations

import logging
import time
from functools import lru_cache
from typing import Optional

import numexpr
import numpy

from .utils.gpu import cupy, cupy_available, log_allocated_gpu_memory
from .utils.io import get_array_dark, get_array_flat, get_array_mask
from .utils.mathutils import calculate_dataset_variance
from .utils.pyfai import AzimuthalIntegrator, get_azimuthal_integrator

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def _normalize_dataset_cupy(
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask=None,
    array_dark=None,
    array_normalization=None,
    dummy: float = -10,
    datatype: str = "float32",
    **kwargs,
):
    log_allocated_gpu_memory()
    t1 = time.perf_counter()
    dataset_normalized_signal = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalization = numpy.zeros_like(dataset_signal, dtype=datatype)

    # One by one
    nb_frames = len(dataset_signal)
    if monitor_values is None:
        monitor_values = numpy.ones(nb_frames, dtype=datatype)
    elif isinstance(monitor_values, (int, float)):
        monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)

    for index_frame, data_signal, monitor_value in zip(
        range(nb_frames), dataset_signal, monitor_values
    ):
        data_signal_cu = cupy.asarray(data_signal, dtype=datatype)

        # Remove dark from raw (not from norm)
        if array_dark is not None:
            data_signal_cu -= array_dark

        if dataset_variance is not None:
            data_variance_cu = cupy.asarray(
                dataset_variance[index_frame], dtype=datatype
            )
        else:
            data_variance_cu = None

        # Get final normalization array
        normalization_array_cu_frame = cupy.where(
            array_normalization == dummy, dummy, array_normalization * monitor_value
        )
        # Replace zeros in normalization array with dummy to avoid division by zero
        cupy.copyto(
            normalization_array_cu_frame,
            dummy,
            where=array_normalization == 0.0,
        )

        # Normalize the signal
        data_signal_normalized_cu = cupy.where(
            normalization_array_cu_frame == dummy,
            normalization_array_cu_frame,
            data_signal_cu / normalization_array_cu_frame,
        )
        data_variance_normalized_cu = None
        data_sigma_normalized_cu = None

        # Normalize the variance and sigma
        if data_variance_cu is not None:
            data_variance_normalized_cu = cupy.where(
                normalization_array_cu_frame == dummy,
                dummy,
                data_variance_cu
                / (normalization_array_cu_frame * normalization_array_cu_frame),
            )
            data_sigma_normalized_cu = cupy.where(
                normalization_array_cu_frame == dummy,
                dummy,
                cupy.sqrt(data_variance_cu) / cupy.abs(normalization_array_cu_frame),
            )

        data_signal_normalized_cu = cupy.where(
            data_signal_cu == dummy, dummy, data_signal_normalized_cu
        )

        # Allocate numpy arrays in place
        dataset_normalized_signal[index_frame] = data_signal_normalized_cu.get()
        if data_variance_normalized_cu is not None:
            dataset_normalized_variance[index_frame] = data_variance_normalized_cu.get()
        if data_sigma_normalized_cu is not None:
            dataset_normalized_sigma[index_frame] = data_sigma_normalized_cu.get()
        dataset_normalization[index_frame] = normalization_array_cu_frame.get()

    t3 = time.perf_counter()
    logger.debug(f"Total time for normalization: {t3 - t1:.2f} seconds")
    return (
        dataset_normalized_signal,
        dataset_normalized_variance,
        dataset_normalized_sigma,
        dataset_normalization,
    )


def _normalize_dataset_numpy(
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask: numpy.ndarray = None,
    array_dark: numpy.ndarray = None,
    array_normalization: numpy.ndarray = None,
    dummy: float = -10,
    datatype: str = "float32",
    **kwargs,
):
    dataset_signal_normalized = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_variance_normalized = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_sigma_normalized = numpy.zeros_like(dataset_signal, dtype=datatype)

    nb_frames = len(dataset_signal)
    if monitor_values is None:
        monitor_values = numpy.ones(nb_frames, dtype=datatype)
    elif isinstance(monitor_values, (int, float)):
        monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)
    if len(monitor_values) != nb_frames:
        monitor_values = monitor_values[0:nb_frames]

    # Remove dark from raw (not from norm)
    if array_dark is not None:
        dataset_signal -= array_dark

    # Get final normalization dataset with nb frames
    dataset_normalization = (
        numpy.broadcast_to(array_normalization, dataset_signal.shape).copy()
        * monitor_values[:, None, None]
    )

    # Normalize the signal
    dataset_signal_normalized = numpy.where(
        dataset_normalization == 0.0,
        dummy,
        dataset_signal / dataset_normalization,
    )

    # Normalize the variance and sigma
    if dataset_variance is not None:
        dataset_variance_normalized = numpy.where(
            dataset_normalization == 0.0,
            dummy,
            dataset_variance / (dataset_normalization * dataset_normalization),
        )
        dataset_sigma_normalized = numpy.where(
            dataset_normalization == 0.0,
            dummy,
            numpy.sqrt(dataset_variance) / numpy.abs(dataset_normalization),
        )
    else:
        dataset_variance_normalized = None
        dataset_sigma_normalized = None

    # Mask all results
    if array_mask is not None:
        dataset_signal_normalized = numpy.where(
            array_mask, dummy, dataset_signal_normalized
        )
        if dataset_variance_normalized is not None:
            dataset_variance_normalized = numpy.where(
                array_mask, dummy, dataset_variance_normalized
            )
        if dataset_sigma_normalized is not None:
            dataset_sigma_normalized = numpy.where(
                array_mask, dummy, dataset_sigma_normalized
            )

    return (
        dataset_signal_normalized,
        dataset_variance_normalized,
        dataset_sigma_normalized,
        dataset_normalization,
    )


def _normalize_dataset_opencl(
    detector_distortion,
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask: numpy.ndarray = None,
    array_flat: numpy.ndarray = None,
    array_dark: numpy.ndarray = None,
    array_polarization: numpy.ndarray = None,
    array_solidangle: numpy.ndarray = None,
    normalization_factor: float = 1.0,
    Dummy: float = -10,
    DDummy: float = 0.01,
    datatype: str = "float32",
    **kwargs,
):
    from pyFAI.distortion import Distortion

    dataset_normalized_signal = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalization = numpy.zeros_like(dataset_signal, dtype=datatype)  # noqa

    nb_frames = len(dataset_signal)
    if monitor_values is None:
        monitor_values = numpy.ones(nb_frames, dtype=datatype)
    elif isinstance(monitor_values, (int, float)):
        monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)

    distortion = Distortion(
        detector_distortion, method="csr", device="gpu", mask=array_mask, empty=Dummy
    )
    distortion.reset(prepare=True)  # enforce initialization

    if distortion.integrator is None:
        distortion.calc_init()

    for index_frame, data_signal, monitor_value in zip(
        range(nb_frames), dataset_signal, monitor_values
    ):
        variance = None
        if dataset_variance is not None:
            variance = dataset_variance[index_frame]
            result = distortion.integrator.integrate_ng(
                data=data_signal,
                variance=variance,
                flat=array_flat,
                dark=array_dark,
                solidangle=array_solidangle,
                polarization=array_polarization,
                dummy=Dummy,
                delta_dummy=DDummy,
                normalization_factor=monitor_value / normalization_factor,
                out_merged=False,
            )
            dataset_normalized_signal[index_frame] = result.intensity.reshape(
                data_signal.shape
            )
            if result.variance is not None:
                dataset_normalized_variance[index_frame] = result.variance
            if result.sigma is not None:
                dataset_normalized_sigma[index_frame] = result.sigma.reshape(
                    data_signal.shape
                )
            # TODO get normalization array from distortion if possible

    return (
        dataset_normalized_signal,
        dataset_normalized_variance,
        dataset_normalized_sigma,
        dataset_normalization,
    )


def _normalize_dataset_cython(
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask: numpy.ndarray = None,
    array_flat: numpy.ndarray = None,
    array_dark: numpy.ndarray = None,
    array_polarization: numpy.ndarray = None,
    array_solidangle: numpy.ndarray = None,
    normalization_factor: float = 1.0,
    Dummy: float = -10,
    DDummy: float = 0.01,
    datatype: str = "float32",
    **kwargs,
):
    from pyFAI.ext.preproc import preproc as preproc_cy

    dataset_normalized_signal = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalization = numpy.zeros_like(dataset_signal, dtype=datatype)

    preproc_params = {
        "dark": array_dark,
        "flat": array_flat,
        "mask": array_mask,
        "solidangle": array_solidangle,
        "polarization": array_polarization,
        "dummy": Dummy,
        "delta_dummy": DDummy,
        "empty": None,
    }

    nb_frames = len(dataset_signal)

    if monitor_values is None:
        monitor_values = numpy.ones(nb_frames, dtype=datatype)
    elif isinstance(monitor_values, (int, float)):
        monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)

    for index_frame, data_signal, monitor_value in zip(
        range(nb_frames), dataset_signal, monitor_values
    ):
        if dataset_variance is None:
            dataset_normalized_signal[index_frame] = preproc_cy(
                raw=data_signal,
                variance=None,
                normalization_factor=monitor_value / normalization_factor,
                **preproc_params,
            )
        else:
            proc_data = preproc_cy(
                raw=data_signal,
                variance=dataset_variance[index_frame],
                normalization_factor=monitor_value / normalization_factor,
                **preproc_params,
            )
            pp_signal = proc_data[:, :, 0]  # noqa
            pp_variance = proc_data[:, :, 1]  # noqa
            pp_normalisation = proc_data[:, :, 2]  # noqa

            dataset_normalized_signal[index_frame] = numexpr.evaluate(
                f"where(pp_normalisation == 0.0, {Dummy}, pp_signal / pp_normalisation)"
            )
            dataset_normalized_variance[index_frame] = numexpr.evaluate(
                f"where(pp_normalisation == 0.0, {Dummy}, pp_variance / (pp_normalisation * pp_normalisation))"
            )
            dataset_normalized_sigma[index_frame] = numexpr.evaluate(
                f"where(pp_normalisation == 0.0, {Dummy}, sqrt(pp_variance) / abs(pp_normalisation))"
            )
            dataset_normalization[index_frame] = pp_normalisation

    return (
        dataset_normalized_signal,
        dataset_normalized_variance,
        dataset_normalized_sigma,
        dataset_normalization,
    )


ALGORITHMS_NORMALIZATION = {
    "cython": {
        "method": _normalize_dataset_cython,
        "use_cupy": False,
        "name": "cython",
    },
    "opencl": {
        "method": _normalize_dataset_opencl,
        "use_cupy": False,
        "name": "opencl",
    },
    "numpy": {"method": _normalize_dataset_numpy, "use_cupy": False, "name": "numpy"},
    "cupy": {"method": _normalize_dataset_cupy, "use_cupy": True, "name": "cupy"},
}
DEFAULT_ALGORITHM_NORMALIZATION = "numpy"


def _parse_norm_algorithm(algorithm: str) -> dict:
    if algorithm not in ALGORITHMS_NORMALIZATION:
        logger.warning(
            f"Algorithm '{algorithm}' is not available. Using '{DEFAULT_ALGORITHM_NORMALIZATION}' instead."
        )
        algorithm = DEFAULT_ALGORITHM_NORMALIZATION
    elif algorithm == "cupy" and not cupy_available():
        logger.warning(
            f"CuPy is not available. Using {DEFAULT_ALGORITHM_NORMALIZATION} instead."
        )
        algorithm = DEFAULT_ALGORITHM_NORMALIZATION
    logger.debug(f"Performing normalization with algorithm: {algorithm}")
    return ALGORITHMS_NORMALIZATION[algorithm]


[docs] def normalize_dataset( azimuthal_integrator: AzimuthalIntegrator, dataset_signal: numpy.ndarray, monitor_values: numpy.ndarray = None, NormalizationFactor: float = 1.0, filename_mask: str = None, filename_dark: str = None, filename_flat: str = None, absolute_solidangle: bool = True, correct_solidangle: bool = True, correct_polarization: bool = True, binning: tuple = (1, 1), Dummy=None, DDummy=None, variance_formula=None, polarization_factor=None, polarization_axis_offset=0, datatype="float32", algorithm: str = "cython", dark_filter: str = "quantil", dark_filter_quantil_lower: float = 0.1, dark_filter_quantil_upper: float = 0.9, **kwargs, ): t0 = time.perf_counter() datatype = _parse_datatype(datatype=datatype) data_signal_shape = azimuthal_integrator.detector.shape algorithm = _parse_norm_algorithm(algorithm=algorithm) use_cupy = algorithm["use_cupy"] array_mask = get_array_mask( filename_mask=filename_mask, data_signal_shape=data_signal_shape, binning=binning, use_cupy=use_cupy, persistent=True, ) array_flat = get_array_flat( filename_flat=filename_flat, data_signal_shape=data_signal_shape, datatype=datatype, binning=binning, dummy=Dummy, delta_dummy=DDummy, filename_mask=filename_mask, use_cupy=use_cupy, persistent=True, ) array_dark = get_array_dark( filename_dark=filename_dark, data_signal_shape=data_signal_shape, datatype=datatype, binning=binning, dummy=Dummy, delta_dummy=DDummy, filename_mask=filename_mask, dark_filter=dark_filter, dark_filter_quantil_lower=dark_filter_quantil_lower, dark_filter_quantil_upper=dark_filter_quantil_upper, use_cupy=use_cupy, persistent=True, ) array_polarization = get_array_polarization( azimuthal_integrator=azimuthal_integrator, polarization_factor=polarization_factor, polarization_axis=polarization_axis_offset, use_cupy=use_cupy, correct=correct_polarization, persistent=True, ) array_solidangle = get_array_solidangle( azimuthal_integrator=azimuthal_integrator, absolute=absolute_solidangle, correct=correct_solidangle, use_cupy=use_cupy, persistent=True, ) # Patch due to inconsistent chiArray and position_array methods in pyFAI if "chi_center" in azimuthal_integrator._cached_array: azimuthal_integrator._cached_array.pop("chi_center") array_normalization = get_array_normalization( azimuthal_integrator=azimuthal_integrator, normalization_factor=NormalizationFactor, polarization_factor=polarization_factor, polarization_axis=polarization_axis_offset, correct_polarization=correct_polarization, absolute_solidangle=absolute_solidangle, correct_solidangle=correct_solidangle, filename_flat=filename_flat, filename_mask=filename_mask, dummy=Dummy, delta_dummy=DDummy, binning=binning, datatype=datatype, use_cupy=use_cupy, persistent=True, ) if variance_formula and array_dark is not None: if use_cupy and cupy_available(): dark = array_dark.get() else: dark = array_dark else: dark = None dataset_variance = calculate_dataset_variance( dataset_signal=dataset_signal, variance_formula=variance_formula, dark=dark, ) params_normalization = { "dataset_signal": dataset_signal, "dataset_variance": dataset_variance, "monitor_values": monitor_values, "array_flat": array_flat, "array_dark": array_dark, "array_mask": array_mask, "array_solidangle": array_solidangle, "array_polarization": array_polarization, "array_normalization": array_normalization, "dummy": Dummy, "delta_dummy": DDummy, "datatype": datatype, "normalization_factor": NormalizationFactor, "detector_distortion": azimuthal_integrator.detector, } ( dataset_normalized_signal, dataset_normalized_variance, dataset_normalized_sigma, dataset_normalization, ) = algorithm["method"]( **params_normalization, ) t1 = time.perf_counter() logger.debug( f"Normalization completed in {t1 - t0:.2f} seconds using {algorithm['name']} algorithm." ) if dataset_normalized_variance is None: dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype) if dataset_normalized_sigma is None: dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype) return ( dataset_normalized_signal, dataset_normalized_variance, dataset_normalized_sigma, dataset_normalization, )
def _parse_datatype(datatype): if datatype in ("float32", numpy.float32): datatype = numpy.dtype("float32") elif datatype in ("float64", numpy.float64): datatype = numpy.dtype("float64") return datatype
[docs] def get_array_polarization( azimuthal_integrator: AzimuthalIntegrator = None, polarization_factor: float = None, polarization_axis: int = 0, use_cupy: bool = False, correct: bool = True, persistent: bool = True, ) -> numpy.ndarray: """ Generate the array of the polarization correction, eventually applying the mask and dummy pixel filtering params: - azimuthal_integrator (AzimuthalIntegrator): the azimuthal integrator object - polarization_factor (float): the polarization factor - polarization_axis (int): the polarization axis - use_cupy (bool): if True, returns a cupy.asarray - correct (bool): if True, applies the polarization correction - persistent (bool): if True, the array is cached in memory and only reloaded if the file modification time changes returns: - numpy.ndarray or cupy.ndarray: the array of the polarization correction """ if not persistent: return _get_array_polarization( azimuthal_integrator=azimuthal_integrator, polarization_factor=polarization_factor, polarization_axis=polarization_axis, correct=correct, use_cupy=use_cupy, ) return _get_persistent_array_polarization( data_signal_shape=azimuthal_integrator.detector.shape, dist=azimuthal_integrator.dist, wavelength=azimuthal_integrator.wavelength, psize_1=azimuthal_integrator.detector.pixel1, psize_2=azimuthal_integrator.detector.pixel2, poni1=azimuthal_integrator.poni1, poni2=azimuthal_integrator.poni2, rot1=azimuthal_integrator.rot1, rot2=azimuthal_integrator.rot2, rot3=azimuthal_integrator.rot3, polarization_factor=polarization_factor, polarization_axis=polarization_axis, correct=correct, use_cupy=use_cupy, )
@lru_cache(maxsize=5) def _get_persistent_array_polarization( data_signal_shape: tuple, dist: float, wavelength: float, psize_1: float, psize_2: float, poni1: float = None, poni2: float = None, rot1: float = 0.0, rot2: float = 0.0, rot3: float = 0.0, polarization_factor: float = None, polarization_axis: int = 0, correct: bool = True, use_cupy: bool = False, ): logger.info( f"No cache hit. Creating new polarization array with {data_signal_shape=}, {dist=}, {wavelength=}, {psize_1=}, {psize_2=}, {poni1=}, {poni2=}, {rot1=}, {rot2=}, {rot3=} {polarization_factor=}, {polarization_axis=}, {use_cupy=}" ) azimuthal_integrator = get_azimuthal_integrator( data_signal_shape=data_signal_shape, SampleDistance=dist, WaveLength=wavelength, poni1=poni1, poni2=poni2, PSize_1=psize_1, PSize_2=psize_2, DetectorRotation_1=rot2, DetectorRotation_2=rot1, DetectorRotation_3=rot3, persistent=True, ) return _get_array_polarization( azimuthal_integrator=azimuthal_integrator, polarization_factor=polarization_factor, polarization_axis=polarization_axis, correct=correct, use_cupy=use_cupy, ) def _get_array_polarization( azimuthal_integrator: AzimuthalIntegrator, polarization_factor: float = None, polarization_axis: int = 0, correct: bool = True, use_cupy: bool = False, ): if not correct: if use_cupy and cupy_available(): return cupy.ones(azimuthal_integrator.detector.shape, dtype="float32") return numpy.ones(azimuthal_integrator.detector.shape, dtype="float32") array_polarization = azimuthal_integrator.polarization( factor=polarization_factor, axis_offset=polarization_axis, ) if use_cupy and cupy_available(): return cupy.asarray(array_polarization) return array_polarization
[docs] def get_array_solidangle( azimuthal_integrator: AzimuthalIntegrator, absolute: bool = True, correct: bool = True, use_cupy: bool = False, persistent: bool = True, ) -> numpy.ndarray: """ Generate the solid-angle array associated to an AzimuthalIntegrator. params: - azimuthal_integrator (AzimuthalIntegrator): the azimuthal integrator object - absolute (bool): if True, the solid-angle values are absolute, otherwise they are relative - correct (bool): if True, the solid-angle array is used for correction - use_cupy (bool): if True, the returned array is a cupy array, otherwise it is a numpy array - persistent (bool): if True, the solid-angle array is cached for future use, otherwise it is generated on the fly for each call returns: - solid_angle_array (numpy.ndarray or cupy.ndarray): the solid-angle array """ ... if not persistent: return _get_array_solidangle( azimuthal_integrator=azimuthal_integrator, absolute=absolute, correct=correct, use_cupy=use_cupy, ) return _get_persistent_array_solidangle( data_signal_shape=azimuthal_integrator.detector.shape, dist=azimuthal_integrator.dist, wavelength=azimuthal_integrator.wavelength, psize_1=azimuthal_integrator.detector.pixel1, psize_2=azimuthal_integrator.detector.pixel2, poni1=azimuthal_integrator.poni1, poni2=azimuthal_integrator.poni2, rot1=azimuthal_integrator.rot1, rot2=azimuthal_integrator.rot2, rot3=azimuthal_integrator.rot3, absolute=absolute, correct=correct, use_cupy=use_cupy, )
@lru_cache(maxsize=5) def _get_persistent_array_solidangle( data_signal_shape: tuple, dist: float, wavelength: float, psize_1: float, psize_2: float, poni1: float = None, poni2: float = None, rot1: float = 0.0, rot2: float = 0.0, rot3: float = 0.0, absolute: bool = True, correct: bool = True, use_cupy: bool = False, ) -> numpy.ndarray: logger.info( f"No cache hit. Creating new solid angle array with {data_signal_shape=}, {dist=}, {wavelength=}, {psize_1=}, {psize_2=}, {poni1=}, {poni2=}, {rot1=}, {rot2=}, {rot3=} {absolute=}, {correct=}, {use_cupy=}" ) azimuthal_integrator = get_azimuthal_integrator( data_signal_shape=data_signal_shape, SampleDistance=dist, WaveLength=wavelength, poni1=poni1, poni2=poni2, PSize_1=psize_1, PSize_2=psize_2, DetectorRotation_1=rot2, DetectorRotation_2=rot1, DetectorRotation_3=rot3, persistent=True, ) return _get_array_solidangle( azimuthal_integrator=azimuthal_integrator, absolute=absolute, correct=correct, use_cupy=use_cupy, ) def _get_array_solidangle( azimuthal_integrator: AzimuthalIntegrator, absolute: bool = True, correct: bool = True, use_cupy: bool = False, ) -> numpy.ndarray: if not correct: if use_cupy and cupy_available(): return cupy.ones(azimuthal_integrator.detector.shape, dtype="float32") return numpy.ones(azimuthal_integrator.detector.shape, dtype="float32") array_solidangle = azimuthal_integrator.solidAngleArray(absolute=absolute) if use_cupy and cupy_available(): return cupy.asarray(array_solidangle) return array_solidangle
[docs] def get_array_normalization( azimuthal_integrator: AzimuthalIntegrator, normalization_factor: float = None, polarization_factor: float = None, polarization_axis: int = 0, correct_polarization: bool = True, absolute_solidangle: bool = True, correct_solidangle: bool = True, filename_flat: str = None, filename_mask: str = None, dummy: int = None, delta_dummy: float = None, binning: tuple = (1, 1), use_cupy: bool = False, datatype: str = "float32", persistent: bool = True, ): """ Generate the array of the normalization correction, except the monitor value (per frame) params: - azimuthal_integrator (AzimuthalIntegrator): the azimuthal integrator object - normalization_factor (float): the normalization factor - polarization_factor (float): the polarization factor - polarization_axis (int): the polarization axis - correct_polarization (bool): if True, applies the polarization correction - absolute_solidangle (bool): if True, applies the absolute solid-angle correction - correct_solidangle (bool): if True, applies the solid-angle correction - filename_flat (str): the filename of the flat-field correction - filename_mask (str): the filename of the mask - dummy (int): if not None, the value of the dummy pixels to filter - delta_dummy (float): if dummy is not None, the tolerance around the dummy value - binning (tuple): binning of the data signal, used for binning unification - use_cupy (bool): if True, returns a cupy.asarray - datatype (str): format of the imported array, if None, datatype is respected - persistent (bool): if True, the array is cached in memory and only reloaded if the file modification time changes returns: - numpy.ndarray or cupy.ndarray: the array of the normalization correction """ if not persistent: return _get_array_normalization( azimuthal_integrator=azimuthal_integrator, normalization_factor=normalization_factor, polarization_factor=polarization_factor, polarization_axis=polarization_axis, correct_polarization=correct_polarization, filename_flat=filename_flat, filename_mask=filename_mask, absolute_solidangle=absolute_solidangle, correct_solidangle=correct_solidangle, dummy=dummy, delta_dummy=delta_dummy, binning=binning, use_cupy=use_cupy, datatype=datatype, ) return _get_persistent_array_normalization( data_signal_shape=azimuthal_integrator.detector.shape, dist=azimuthal_integrator.dist, wavelength=azimuthal_integrator.wavelength, psize_1=azimuthal_integrator.detector.pixel1, psize_2=azimuthal_integrator.detector.pixel2, poni1=azimuthal_integrator.poni1, poni2=azimuthal_integrator.poni2, rot1=azimuthal_integrator.rot1, rot2=azimuthal_integrator.rot2, rot3=azimuthal_integrator.rot3, normalization_factor=normalization_factor, polarization_factor=polarization_factor, polarization_axis=polarization_axis, correct_polarization=correct_polarization, filename_flat=filename_flat, filename_mask=filename_mask, absolute_solidangle=absolute_solidangle, correct_solidangle=correct_solidangle, dummy=dummy, delta_dummy=delta_dummy, binning=binning, use_cupy=use_cupy, datatype=datatype, )
@lru_cache(maxsize=5) def _get_persistent_array_normalization( data_signal_shape: tuple, dist: float, wavelength: float, psize_1: float, psize_2: float, poni1: float = None, poni2: float = None, rot1: float = 0.0, rot2: float = 0.0, rot3: float = 0.0, normalization_factor: float = None, polarization_factor: float = None, polarization_axis: int = 0, correct_polarization: bool = True, filename_flat: str = None, filename_mask: str = None, absolute_solidangle: bool = True, correct_solidangle: bool = True, dummy: int = None, delta_dummy: float = None, binning: tuple = (1, 1), use_cupy: bool = False, datatype: str = "float32", ): logger.info("No cache hit. Creating new normalization array.") azimuthal_integrator = get_azimuthal_integrator( data_signal_shape=data_signal_shape, SampleDistance=dist, WaveLength=wavelength, poni1=poni1, poni2=poni2, PSize_1=psize_1, PSize_2=psize_2, DetectorRotation_1=rot2, DetectorRotation_2=rot1, DetectorRotation_3=rot3, persistent=True, ) return _get_array_normalization( azimuthal_integrator=azimuthal_integrator, normalization_factor=normalization_factor, polarization_factor=polarization_factor, polarization_axis=polarization_axis, correct_polarization=correct_polarization, absolute_solidangle=absolute_solidangle, correct_solidangle=correct_solidangle, filename_flat=filename_flat, filename_mask=filename_mask, dummy=dummy, delta_dummy=delta_dummy, binning=binning, datatype=datatype, use_cupy=use_cupy, ) def _get_array_normalization( azimuthal_integrator: AzimuthalIntegrator, normalization_factor: float = None, polarization_factor: float = None, polarization_axis: int = 0, correct_polarization: bool = True, absolute_solidangle: bool = True, correct_solidangle: bool = True, filename_flat: str = None, filename_mask: str = None, dummy: int = None, delta_dummy: float = None, binning: tuple = (1, 1), datatype: str = "float32", use_cupy: bool = False, ): data_signal_shape = azimuthal_integrator.detector.shape if use_cupy and cupy_available(): normalization_array = cupy.ones( data_signal_shape, dtype=datatype, ) else: normalization_array = numpy.ones( data_signal_shape, dtype=datatype, ) array_polarization = get_array_polarization( azimuthal_integrator=azimuthal_integrator, polarization_factor=polarization_factor, polarization_axis=polarization_axis, use_cupy=use_cupy, correct=correct_polarization, persistent=True, ) if array_polarization is not None: normalization_array *= array_polarization array_solidangle = get_array_solidangle( azimuthal_integrator=azimuthal_integrator, absolute=absolute_solidangle, correct=correct_solidangle, use_cupy=use_cupy, persistent=True, ) if array_solidangle is not None: normalization_array *= array_solidangle if normalization_factor is not None: normalization_array /= normalization_factor array_flat_field = get_array_flat( filename_flat=filename_flat, data_signal_shape=data_signal_shape, datatype=datatype, binning=binning, dummy=dummy, delta_dummy=delta_dummy, filename_mask=filename_mask, use_cupy=use_cupy, persistent=True, ) if array_flat_field is not None: normalization_array *= array_flat_field array_mask = get_array_mask( filename_mask=filename_mask, data_signal_shape=data_signal_shape, binning=binning, use_cupy=use_cupy, ) if array_mask is not None and dummy: if use_cupy and cupy_available(): cupy.copyto(normalization_array, dummy, where=array_mask) else: numpy.copyto(normalization_array, dummy, where=array_mask) return normalization_array
[docs] def calculate_normalization_values( monitor_values, azimuthal_integrator=None, psize_1=None, psize_2=None, dist=None, normalization_factor=None, ): if normalization_factor is None: normalization_factor = 1.0 if azimuthal_integrator is not None: psize_1 = psize_1 or azimuthal_integrator.detector.pixel1 psize_2 = psize_2 or azimuthal_integrator.detector.pixel2 dist = dist or azimuthal_integrator.dist if monitor_values is None or psize_1 is None or psize_2 is None or dist is None: return normalization_factor solid_angle = psize_1 * psize_2 / dist**2 return monitor_values * solid_angle / normalization_factor