import logging
import time
from typing import Optional
from functools import lru_cache
import numexpr
import numpy
from pyFAI.version import MAJOR
if MAJOR >= 2025:
from pyFAI.integrator.azimuthal import AzimuthalIntegrator
else:
from pyFAI.azimuthalIntegrator import AzimuthalIntegrator
try:
import cupy
from .cupyutils import log_allocated_gpu_memory
except ImportError:
cupy = None
CUPY_AVAILABLE = False
CUPY_MEM_POOL = None
else:
CUPY_AVAILABLE = True
CUPY_MEM_POOL = cupy.get_default_memory_pool()
from ewoksid02.utils.pyfai import _get_persistent_azimuthal_integrator
from .io import (
get_persistent_array_mask,
get_persistent_array_dark,
get_persistent_array_flat,
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def _normalize_dataset_cupy(
dataset_signal: numpy.ndarray,
dataset_variance: Optional[numpy.ndarray] = None,
monitor_values: numpy.ndarray = None,
array_mask=None,
array_dark=None,
array_normalization=None,
dummy: float = -10,
datatype: str = "float32",
**kwargs,
):
log_allocated_gpu_memory()
t1 = time.perf_counter()
dataset_normalized_signal = numpy.zeros_like(dataset_signal, dtype=datatype)
dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)
# logger.info(f"Time to initialize numpy output arrays: {t2 - t1:.2f} seconds")
# One by one
nb_frames = len(dataset_signal)
if monitor_values is None:
monitor_values = numpy.ones(nb_frames, dtype=datatype)
elif isinstance(monitor_values, (int, float)):
monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)
for index_frame, data_signal, monitor_value in zip(
range(nb_frames), dataset_signal, monitor_values
):
data_signal_cu = cupy.asarray(data_signal, dtype=datatype)
# Remove dark from raw (not from norm)
if array_dark is not None:
data_signal_cu -= array_dark
if dataset_variance is not None:
data_variance_cu = cupy.asarray(
dataset_variance[index_frame], dtype=datatype
)
else:
data_variance_cu = None
# Get final normalization array
normalization_array_cu_frame = array_normalization * monitor_value
# Normalize the signal
data_signal_normalized_cu = cupy.where(
normalization_array_cu_frame == 0.0,
dummy,
data_signal_cu / normalization_array_cu_frame,
)
data_variance_normalized_cu = None
data_sigma_normalized_cu = None
# Normalize the variance and sigma
if data_variance_cu is not None:
data_variance_normalized_cu = cupy.where(
normalization_array_cu_frame == 0.0,
dummy,
data_variance_cu
/ (normalization_array_cu_frame * normalization_array_cu_frame),
)
data_sigma_normalized_cu = cupy.where(
normalization_array_cu_frame == 0.0,
dummy,
cupy.sqrt(data_variance_cu) / cupy.abs(normalization_array_cu_frame),
)
# Mask all results
if array_mask is not None:
data_signal_normalized_cu = cupy.where(
array_mask, dummy, data_signal_normalized_cu
)
if data_variance_cu is not None:
data_variance_normalized_cu = cupy.where(
array_mask, dummy, data_variance_normalized_cu
)
data_sigma_normalized_cu = cupy.where(
array_mask, dummy, data_sigma_normalized_cu
)
# Allocate numpy arrays in place
dataset_normalized_signal[index_frame] = data_signal_normalized_cu.get()
if data_variance_normalized_cu is not None:
dataset_normalized_variance[index_frame] = data_variance_normalized_cu.get()
if data_sigma_normalized_cu is not None:
dataset_normalized_sigma[index_frame] = data_sigma_normalized_cu.get()
t3 = time.perf_counter()
logger.debug(f"Total time for normalization: {t3 - t1:.2f} seconds")
return (
dataset_normalized_signal,
dataset_normalized_variance,
dataset_normalized_sigma,
)
def _normalize_dataset_numpy(
dataset_signal: numpy.ndarray,
dataset_variance: Optional[numpy.ndarray] = None,
monitor_values: numpy.ndarray = None,
array_mask: numpy.ndarray = None,
array_dark: numpy.ndarray = None,
array_normalization: numpy.ndarray = None,
dummy: float = -10,
datatype: str = "float32",
**kwargs,
):
dataset_signal_normalized = numpy.zeros_like(dataset_signal, dtype=datatype)
dataset_variance_normalized = numpy.zeros_like(dataset_signal, dtype=datatype)
dataset_sigma_normalized = numpy.zeros_like(dataset_signal, dtype=datatype)
nb_frames = len(dataset_signal)
if monitor_values is None:
monitor_values = numpy.ones(nb_frames, dtype=datatype)
elif isinstance(monitor_values, (int, float)):
monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)
if len(monitor_values) != nb_frames:
monitor_values = monitor_values[0:nb_frames]
# Remove dark from raw (not from norm)
if array_dark is not None:
dataset_signal -= array_dark
# Get final normalization dataset with nb frames
dataset_normalization = (
numpy.broadcast_to(array_normalization, dataset_signal.shape).copy()
* monitor_values[:, None, None]
)
# Normalize the signal
dataset_signal_normalized = numpy.where(
dataset_normalization == 0.0,
dummy,
dataset_signal / dataset_normalization,
)
# Normalize the variance and sigma
if dataset_variance is not None:
dataset_variance_normalized = numpy.where(
dataset_normalization == 0.0,
dummy,
dataset_variance / (dataset_normalization * dataset_normalization),
)
dataset_sigma_normalized = numpy.where(
dataset_normalization == 0.0,
dummy,
numpy.sqrt(dataset_variance) / numpy.abs(dataset_normalization),
)
else:
dataset_variance_normalized = None
dataset_sigma_normalized = None
# Mask all results
if array_mask is not None:
dataset_signal_normalized = numpy.where(
array_mask, dummy, dataset_signal_normalized
)
if dataset_variance_normalized is not None:
dataset_variance_normalized = numpy.where(
array_mask, dummy, dataset_variance_normalized
)
if dataset_sigma_normalized is not None:
dataset_sigma_normalized = numpy.where(
array_mask, dummy, dataset_sigma_normalized
)
return (
dataset_signal_normalized,
dataset_variance_normalized,
dataset_sigma_normalized,
)
def _normalize_dataset_opencl(
detector_distortion,
dataset_signal: numpy.ndarray,
dataset_variance: Optional[numpy.ndarray] = None,
monitor_values: numpy.ndarray = None,
array_mask: numpy.ndarray = None,
array_flat: numpy.ndarray = None,
array_dark: numpy.ndarray = None,
array_polarization: numpy.ndarray = None,
array_solidangle: numpy.ndarray = None,
normalization_factor: float = 1.0,
Dummy: float = -10,
DDummy: float = 0.01,
datatype: str = "float32",
**kwargs,
):
from pyFAI.distortion import Distortion
dataset_normalized_signal = numpy.zeros_like(dataset_signal, dtype=datatype)
dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)
nb_frames = len(dataset_signal)
if monitor_values is None:
monitor_values = numpy.ones(nb_frames, dtype=datatype)
elif isinstance(monitor_values, (int, float)):
monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)
distortion = Distortion(
detector_distortion, method="csr", device="gpu", mask=array_mask, empty=Dummy
)
distortion.reset(prepare=True) # enforce initialization
if distortion.integrator is None:
distortion.calc_init()
for index_frame, data_signal, monitor_value in zip(
range(nb_frames), dataset_signal, monitor_values
):
variance = None
if dataset_variance is not None:
variance = dataset_variance[index_frame]
result = distortion.integrator.integrate_ng(
data=data_signal,
variance=variance,
flat=array_flat,
dark=array_dark,
solidangle=array_solidangle,
polarization=array_polarization,
dummy=Dummy,
delta_dummy=DDummy,
normalization_factor=monitor_value / normalization_factor,
out_merged=False,
)
dataset_normalized_signal[index_frame] = result.intensity.reshape(
data_signal.shape
)
if result.variance is not None:
dataset_normalized_variance[index_frame] = result.variance
if result.sigma is not None:
dataset_normalized_sigma[index_frame] = result.sigma.reshape(
data_signal.shape
)
return (
dataset_normalized_signal,
dataset_normalized_variance,
dataset_normalized_sigma,
)
def _normalize_dataset_cython(
dataset_signal: numpy.ndarray,
dataset_variance: Optional[numpy.ndarray] = None,
monitor_values: numpy.ndarray = None,
array_mask: numpy.ndarray = None,
array_flat: numpy.ndarray = None,
array_dark: numpy.ndarray = None,
array_polarization: numpy.ndarray = None,
array_solidangle: numpy.ndarray = None,
normalization_factor: float = 1.0,
Dummy: float = -10,
DDummy: float = 0.01,
datatype: str = "float32",
**kwargs,
):
from pyFAI.ext.preproc import preproc as preproc_cy
dataset_normalized_signal = numpy.zeros_like(dataset_signal, dtype=datatype)
dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)
preproc_params = {
"dark": array_dark,
"flat": array_flat,
"mask": array_mask,
"solidangle": array_solidangle,
"polarization": array_polarization,
"dummy": Dummy,
"delta_dummy": DDummy,
"empty": None,
}
nb_frames = len(dataset_signal)
if monitor_values is None:
monitor_values = numpy.ones(nb_frames, dtype=datatype)
elif isinstance(monitor_values, (int, float)):
monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)
for index_frame, data_signal, monitor_value in zip(
range(nb_frames), dataset_signal, monitor_values
):
if dataset_variance is None:
dataset_normalized_signal[index_frame] = preproc_cy(
raw=data_signal,
variance=None,
normalization_factor=monitor_value / normalization_factor,
**preproc_params,
)
else:
proc_data = preproc_cy(
raw=data_signal,
variance=dataset_variance[index_frame],
normalization_factor=monitor_value / normalization_factor,
**preproc_params,
)
pp_signal = proc_data[:, :, 0] # noqa
pp_variance = proc_data[:, :, 1] # noqa
pp_normalisation = proc_data[:, :, 2] # noqa
dataset_normalized_signal[index_frame] = numexpr.evaluate(
f"where(pp_normalisation == 0.0, {Dummy}, pp_signal / pp_normalisation)"
)
dataset_normalized_variance[index_frame] = numexpr.evaluate(
f"where(pp_normalisation == 0.0, {Dummy}, pp_variance / (pp_normalisation * pp_normalisation))"
)
dataset_normalized_sigma[index_frame] = numexpr.evaluate(
f"where(pp_normalisation == 0.0, {Dummy}, sqrt(pp_variance) / abs(pp_normalisation))"
)
return (
dataset_normalized_signal,
dataset_normalized_variance,
dataset_normalized_sigma,
)
ALGORITHMS_NORMALIZATION = {
"cython": {
"method": _normalize_dataset_cython,
"use_cupy": False,
"name": "cython",
},
"opencl": {
"method": _normalize_dataset_opencl,
"use_cupy": False,
"name": "opencl",
},
"numpy": {"method": _normalize_dataset_numpy, "use_cupy": False, "name": "numpy"},
"cupy": {"method": _normalize_dataset_cupy, "use_cupy": True, "name": "cupy"},
}
DEFAULT_ALGORITHM_NORMALIZATION = "numpy"
def _parse_norm_algorithm(algorithm: str) -> dict:
if algorithm not in ALGORITHMS_NORMALIZATION:
logger.warning(
f"Algorithm '{algorithm}' is not available. Using '{DEFAULT_ALGORITHM_NORMALIZATION}' instead."
)
algorithm = DEFAULT_ALGORITHM_NORMALIZATION
elif algorithm == "cupy" and not CUPY_AVAILABLE:
logger.warning(
f"CuPy is not available. Using {DEFAULT_ALGORITHM_NORMALIZATION} instead."
)
algorithm = DEFAULT_ALGORITHM_NORMALIZATION
logger.debug(f"Performing normalization with algorithm: {algorithm}")
return ALGORITHMS_NORMALIZATION[algorithm]
def _parse_datatype(datatype):
if datatype in ("float32", numpy.float32):
datatype = numpy.dtype("float32")
elif datatype in ("float64", numpy.float64):
datatype = numpy.dtype("float64")
return datatype
[docs]
def get_array_polarization(
azimuthal_integrator: AzimuthalIntegrator,
polarization_factor: float = None,
polarization_axis: int = 0,
use_cupy: bool = False,
):
array_polarization = azimuthal_integrator.polarization(
factor=polarization_factor,
axis_offset=polarization_axis,
)
if use_cupy and CUPY_AVAILABLE:
return cupy.asarray(array_polarization)
return array_polarization
@lru_cache(maxsize=5)
def _get_persistent_array_polarization(
data_signal_shape: tuple,
dist: float,
wavelength: float,
psize_1: float,
psize_2: float,
poni1: float = None,
poni2: float = None,
rot1: float = 0.0,
rot2: float = 0.0,
rot3: float = 0.0,
polarization_factor: float = None,
polarization_axis: int = 0,
use_cupy: bool = False,
):
logger.info(
f"No cache hit. Creating new polarization array with {data_signal_shape=}, {dist=}, {wavelength=}, {psize_1=}, {psize_2=}, {poni1=}, {poni2=}, {rot1=}, {rot2=}, {rot3=} {polarization_factor=}, {polarization_axis=}, {use_cupy=}"
)
azimuthal_integrator = _get_persistent_azimuthal_integrator(
data_signal_shape=data_signal_shape,
dist=dist,
poni1=poni1,
poni2=poni2,
psize_1=psize_1,
psize_2=psize_2,
rot1=rot1,
rot2=rot2,
rot3=rot3,
wavelength=wavelength,
)
return get_array_polarization(
azimuthal_integrator=azimuthal_integrator,
polarization_factor=polarization_factor,
polarization_axis=polarization_axis,
use_cupy=use_cupy,
)
[docs]
def get_array_solidangle(
azimuthal_integrator: AzimuthalIntegrator,
absolute: bool = True,
use_cupy: bool = False,
):
array_solidangle = azimuthal_integrator.solidAngleArray(absolute=absolute)
if use_cupy and CUPY_AVAILABLE:
return cupy.asarray(array_solidangle)
return array_solidangle
@lru_cache(maxsize=5)
def _get_persistent_array_solidangle(
data_signal_shape: tuple,
dist: float,
wavelength: float,
psize_1: float,
psize_2: float,
poni1: float = None,
poni2: float = None,
rot1: float = 0.0,
rot2: float = 0.0,
rot3: float = 0.0,
absolute: bool = True,
use_cupy: bool = False,
):
logger.info(
f"No cache hit. Creating new solid angle array with {data_signal_shape=}, {dist=}, {wavelength=}, {psize_1=}, {psize_2=}, {poni1=}, {poni2=}, {rot1=}, {rot2=}, {rot3=} {absolute=}, {use_cupy=}"
)
azimuthal_integrator = _get_persistent_azimuthal_integrator(
data_signal_shape=data_signal_shape,
dist=dist,
poni1=poni1,
poni2=poni2,
psize_1=psize_1,
psize_2=psize_2,
rot1=rot1,
rot2=rot2,
rot3=rot3,
wavelength=wavelength,
)
return get_array_solidangle(
azimuthal_integrator=azimuthal_integrator,
absolute=absolute,
use_cupy=use_cupy,
)
@lru_cache(maxsize=5)
def _get_persistent_array_normalization(
data_signal_shape: tuple,
dist: float,
wavelength: float,
psize_1: float,
psize_2: float,
poni1: float = None,
poni2: float = None,
rot1: float = 0.0,
rot2: float = 0.0,
rot3: float = 0.0,
normalization_factor: float = None,
polarization_factor: float = None,
polarization_axis: int = 0,
filename_flat: str = None,
filename_mask: str = None,
absolute_solidangle: bool = True,
dummy: int = None,
delta_dummy: float = None,
binning: tuple = (1, 1),
use_cupy: bool = False,
datatype: str = "float32",
):
logger.info("No cache hit. Creating new normalization array.")
if use_cupy and CUPY_AVAILABLE:
normalization_array = cupy.ones(
data_signal_shape,
dtype=datatype,
)
else:
normalization_array = numpy.ones(
data_signal_shape,
dtype=datatype,
)
array_polarization = _get_persistent_array_polarization(
data_signal_shape=data_signal_shape,
dist=dist,
wavelength=wavelength,
psize_1=psize_1,
psize_2=psize_2,
poni1=poni1,
poni2=poni2,
rot1=rot1,
rot2=rot2,
rot3=rot3,
polarization_factor=polarization_factor,
polarization_axis=polarization_axis,
use_cupy=use_cupy,
)
if array_polarization is not None:
normalization_array *= array_polarization
array_flat_field = get_persistent_array_flat(
filename_flat=filename_flat,
data_signal_shape=data_signal_shape,
datatype=datatype,
binning=binning,
dummy=dummy,
delta_dummy=delta_dummy,
filename_mask=filename_mask,
use_cupy=use_cupy,
)
if array_flat_field is not None:
normalization_array *= array_flat_field
array_solidangle = _get_persistent_array_solidangle(
data_signal_shape=data_signal_shape,
dist=dist,
wavelength=wavelength,
psize_1=psize_1,
psize_2=psize_2,
poni1=poni1,
poni2=poni2,
rot1=rot1,
rot2=rot2,
rot3=rot3,
absolute=absolute_solidangle,
use_cupy=use_cupy,
)
if array_solidangle is not None:
normalization_array *= array_solidangle
if normalization_factor is not None:
normalization_array /= normalization_factor
array_mask = get_persistent_array_mask(
filename_mask=filename_mask,
data_signal_shape=data_signal_shape,
binning=binning,
use_cupy=use_cupy,
)
if array_mask is not None and dummy:
if use_cupy and CUPY_AVAILABLE:
normalization_array = cupy.where(array_mask, dummy, normalization_array)
else:
normalization_array = numpy.where(array_mask, dummy, normalization_array)
return normalization_array
[docs]
def calculate_dataset_variance(
dataset_signal: numpy.ndarray,
variance_formula: str = None,
dark: numpy.ndarray = None,
):
if not variance_formula:
return
variance_function = numexpr.NumExpr(
variance_formula, [("data", numpy.float64), ("dark", numpy.float64)]
)
dataset_variance = variance_function(dataset_signal, 0 if dark is None else dark)
return dataset_variance
[docs]
def normalize_dataset(
azimuthal_integrator: AzimuthalIntegrator,
dataset_signal: numpy.ndarray,
monitor_values: numpy.ndarray = None,
NormalizationFactor: float = 1.0,
filename_mask_normalization: str = None,
filename_dark_normalization: str = None,
filename_flat_normalization: str = None,
absolute_solidangle: bool = True,
binning: tuple = (1, 1),
Dummy=None,
DDummy=None,
variance_formula=None,
polarization_factor=None,
polarization_axis_offset=0,
datatype="float32",
algorithm_normalization: str = "cython",
dark_filter: str = "quantil",
dark_filter_quantil_lower: float = 0.1,
dark_filter_quantil_upper: float = 0.9,
**kwargs,
):
t0 = time.perf_counter()
datatype = _parse_datatype(datatype=datatype)
data_signal_shape = azimuthal_integrator.detector.shape
algorithm_normalization = _parse_norm_algorithm(algorithm=algorithm_normalization)
use_cupy = algorithm_normalization["use_cupy"]
common_params = {
"use_cupy": use_cupy,
"data_signal_shape": data_signal_shape,
"binning": binning,
"dummy": DDummy,
"delta_dummy": DDummy,
}
array_mask = get_persistent_array_mask(
filename_mask=filename_mask_normalization,
**common_params,
)
array_flat = get_persistent_array_flat(
filename_flat=filename_flat_normalization,
data_signal_shape=data_signal_shape,
datatype=datatype,
binning=binning,
dummy=Dummy,
delta_dummy=DDummy,
filename_mask=filename_mask_normalization,
use_cupy=use_cupy,
)
array_dark = get_persistent_array_dark(
filename_dark=filename_dark_normalization,
filename_mask=filename_mask_normalization,
dark_filter=dark_filter,
dark_filter_quantil_lower=dark_filter_quantil_lower,
dark_filter_quantil_upper=dark_filter_quantil_upper,
**common_params,
)
ai_params = {
"data_signal_shape": azimuthal_integrator.detector.shape,
"dist": azimuthal_integrator.dist,
"wavelength": azimuthal_integrator.wavelength,
"psize_1": azimuthal_integrator.detector.pixel1,
"psize_2": azimuthal_integrator.detector.pixel2,
"poni1": azimuthal_integrator.poni1,
"poni2": azimuthal_integrator.poni2,
"rot1": azimuthal_integrator.rot1,
"rot2": azimuthal_integrator.rot2,
"rot3": azimuthal_integrator.rot3,
}
array_polarization = _get_persistent_array_polarization(
**ai_params,
polarization_factor=polarization_factor,
polarization_axis=polarization_axis_offset,
use_cupy=use_cupy,
)
array_solidangle = _get_persistent_array_solidangle(
**ai_params,
absolute=True,
use_cupy=use_cupy,
)
# Patch due to inconsistent chiArray and position_array methods in pyFAI
if "chi_center" in azimuthal_integrator._cached_array:
azimuthal_integrator._cached_array.pop("chi_center")
array_normalization = _get_persistent_array_normalization(
normalization_factor=NormalizationFactor,
polarization_factor=polarization_factor,
polarization_axis=polarization_axis_offset,
filename_flat=filename_flat_normalization,
filename_mask=filename_mask_normalization,
absolute_solidangle=absolute_solidangle,
datatype=datatype,
use_cupy=use_cupy,
binning=binning,
dummy=Dummy,
delta_dummy=DDummy,
**ai_params,
)
if variance_formula and array_dark is not None:
if use_cupy and CUPY_AVAILABLE:
dark = array_dark.get()
else:
dark = array_dark
else:
dark = None
dataset_variance = calculate_dataset_variance(
dataset_signal=dataset_signal,
variance_formula=variance_formula,
dark=dark,
)
params_normalization = {
"dataset_signal": dataset_signal,
"dataset_variance": dataset_variance,
"monitor_values": monitor_values,
"array_flat": array_flat,
"array_dark": array_dark,
"array_mask": array_mask,
"array_solidangle": array_solidangle,
"array_polarization": array_polarization,
"array_normalization": array_normalization,
"dummy": Dummy,
"delta_dummy": DDummy,
"datatype": datatype,
"normalization_factor": NormalizationFactor,
"detector_distortion": azimuthal_integrator.detector,
}
(
dataset_normalized_signal,
dataset_normalized_variance,
dataset_normalized_sigma,
) = algorithm_normalization["method"](
**params_normalization,
)
t1 = time.perf_counter()
logger.debug(
f"Normalization completed in {t1 - t0:.2f} seconds using {algorithm_normalization['name']} algorithm."
)
if dataset_normalized_variance is None:
dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
if dataset_normalized_sigma is None:
dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)
return (
dataset_normalized_signal,
dataset_normalized_variance,
dataset_normalized_sigma,
)
[docs]
def calculate_normalization_values(
monitor_values,
azimuthal_integrator=None,
psize_1=None,
psize_2=None,
dist=None,
normalization_factor=None,
):
if normalization_factor is None:
normalization_factor = 1.0
if azimuthal_integrator is not None:
psize_1 = psize_1 or azimuthal_integrator.detector.pixel1
psize_2 = psize_2 or azimuthal_integrator.detector.pixel2
dist = dist or azimuthal_integrator.dist
if monitor_values is None or psize_1 is None or psize_2 is None or dist is None:
return normalization_factor
solid_angle = psize_1 * psize_2 / dist**2
return monitor_values * solid_angle / normalization_factor