from __future__ import annotations
import logging
import time
from functools import lru_cache
from typing import Optional
import numexpr
import numpy
from .utils.gpu import cupy, cupy_available, log_allocated_gpu_memory
from .utils.io import get_array_dark, get_array_flat, get_array_mask
from .utils.mathutils import calculate_dataset_variance
from .utils.pyfai import AzimuthalIntegrator, get_azimuthal_integrator
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def _normalize_dataset_cupy(
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask=None,
    array_dark=None,
    array_normalization=None,
    dummy: float = -10,
    datatype: str = "float32",
    **kwargs,
):
    """
    Normalize a stack of frames with CuPy, uploading one frame at a time.
    params:
    - dataset_signal (numpy.ndarray): stack of frames, iterated along the first axis
    - dataset_variance (numpy.ndarray): optional variance stack, indexed per frame
    - monitor_values: per-frame monitor; None means 1 for every frame, an int/float is repeated for every frame
    - array_mask: accepted for signature compatibility, not used in this function
    - array_dark: if not None, subtracted from every frame before normalization
    - array_normalization: normalization array, multiplied by the monitor of each frame
    - dummy (float): value written where the normalization is dummy or zero
    - datatype (str): dtype of the GPU copies and of the returned arrays
    - kwargs: ignored
    returns:
    - tuple of four numpy arrays: normalized signal, normalized variance, normalized sigma, per-frame normalization.
      Variance and sigma stay filled with zeros when dataset_variance is None.
    """
    log_allocated_gpu_memory()
    time_start = time.perf_counter()

    # Host-side result buffers; each frame is copied back into them after processing
    out_signal, out_variance, out_sigma, out_normalization = (
        numpy.zeros_like(dataset_signal, dtype=datatype) for _ in range(4)
    )

    frame_count = len(dataset_signal)
    if monitor_values is None:
        monitor_values = numpy.ones(frame_count, dtype=datatype)
    elif isinstance(monitor_values, (int, float)):
        monitor_values = numpy.full(frame_count, monitor_values, dtype=datatype)

    for frame_index, (frame, monitor) in enumerate(
        zip(dataset_signal, monitor_values)
    ):
        gpu_signal = cupy.asarray(frame, dtype=datatype)

        # The dark is removed from the raw counts only, never from the normalization
        if array_dark is not None:
            gpu_signal -= array_dark

        gpu_variance = None
        if dataset_variance is not None:
            gpu_variance = cupy.asarray(dataset_variance[frame_index], dtype=datatype)

        # Per-frame normalization: dummy pixels stay dummy, the rest is scaled by the monitor
        gpu_normalization = cupy.where(
            array_normalization == dummy, dummy, array_normalization * monitor
        )
        # Zero entries would divide by zero, flag them with dummy as well
        cupy.copyto(gpu_normalization, dummy, where=array_normalization == 0.0)

        gpu_signal_normalized = cupy.where(
            gpu_normalization == dummy,
            gpu_normalization,
            gpu_signal / gpu_normalization,
        )

        gpu_variance_normalized = None
        gpu_sigma_normalized = None
        if gpu_variance is not None:
            gpu_variance_normalized = cupy.where(
                gpu_normalization == dummy,
                dummy,
                gpu_variance / (gpu_normalization * gpu_normalization),
            )
            gpu_sigma_normalized = cupy.where(
                gpu_normalization == dummy,
                dummy,
                cupy.sqrt(gpu_variance) / cupy.abs(gpu_normalization),
            )

        # NOTE(review): this comparison runs on the dark-subtracted signal, so raw
        # pixels equal to dummy are only caught when no dark was applied - confirm intent
        gpu_signal_normalized = cupy.where(
            gpu_signal == dummy, dummy, gpu_signal_normalized
        )

        # Download the results of this frame into the host buffers
        out_signal[frame_index] = gpu_signal_normalized.get()
        if gpu_variance_normalized is not None:
            out_variance[frame_index] = gpu_variance_normalized.get()
        if gpu_sigma_normalized is not None:
            out_sigma[frame_index] = gpu_sigma_normalized.get()
        out_normalization[frame_index] = gpu_normalization.get()

    time_end = time.perf_counter()
    logger.debug(f"Total time for normalization: {time_end - time_start:.2f} seconds")
    return (
        out_signal,
        out_variance,
        out_sigma,
        out_normalization,
    )
def _normalize_dataset_numpy(
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask: numpy.ndarray = None,
    array_dark: numpy.ndarray = None,
    array_normalization: numpy.ndarray = None,
    dummy: float = -10,
    datatype: str = "float32",
    **kwargs,
):
    """
    Normalize a whole stack of frames at once with vectorized numpy operations.
    params:
    - dataset_signal (numpy.ndarray): stack of frames; it is never modified in place
    - dataset_variance (numpy.ndarray): optional variance stack with the shape of dataset_signal
    - monitor_values: per-frame monitor; None means 1 for every frame, a scalar is repeated for
      every frame, a longer sequence is truncated to the number of frames
    - array_mask (numpy.ndarray): if not None, truthy pixels are set to dummy in every result
    - array_dark (numpy.ndarray): if not None, subtracted from the signal before normalization
    - array_normalization (numpy.ndarray): normalization array, broadcast to the stack and
      multiplied by the monitor of each frame
    - dummy (float): value written where the normalization is zero or the mask is set
    - datatype (str): dtype used for the monitor array built from None or from a scalar
    - kwargs: ignored
    returns:
    - tuple (signal, variance, sigma, normalization); variance and sigma are None when
      dataset_variance is None
    """
    nb_frames = len(dataset_signal)

    if monitor_values is None:
        monitor_values = numpy.ones(nb_frames, dtype=datatype)
    else:
        # asarray also accepts lists and numpy scalars, which are not int/float instances
        monitor_values = numpy.asarray(monitor_values)
        if monitor_values.ndim == 0:
            monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)
    if len(monitor_values) != nb_frames:
        monitor_values = monitor_values[0:nb_frames]

    # Remove dark from raw (not from norm). Out-of-place on purpose: an in-place
    # subtraction would alter the caller's data and fails on integer frames.
    if array_dark is not None:
        dataset_signal = dataset_signal - array_dark

    # Final normalization with one entry per frame
    dataset_normalization = (
        numpy.broadcast_to(array_normalization, dataset_signal.shape)
        * monitor_values[:, None, None]
    )
    is_zero_normalization = dataset_normalization == 0.0

    dataset_variance_normalized = None
    dataset_sigma_normalized = None
    # numpy.where evaluates the division for every pixel, including the ones that are
    # replaced by dummy afterwards, so the expected warnings are silenced here
    with numpy.errstate(divide="ignore", invalid="ignore"):
        dataset_signal_normalized = numpy.where(
            is_zero_normalization,
            dummy,
            dataset_signal / dataset_normalization,
        )
        if dataset_variance is not None:
            dataset_variance_normalized = numpy.where(
                is_zero_normalization,
                dummy,
                dataset_variance / (dataset_normalization * dataset_normalization),
            )
            dataset_sigma_normalized = numpy.where(
                is_zero_normalization,
                dummy,
                numpy.sqrt(dataset_variance) / numpy.abs(dataset_normalization),
            )

    # Mask all results
    if array_mask is not None:
        dataset_signal_normalized = numpy.where(
            array_mask, dummy, dataset_signal_normalized
        )
        if dataset_variance_normalized is not None:
            dataset_variance_normalized = numpy.where(
                array_mask, dummy, dataset_variance_normalized
            )
        if dataset_sigma_normalized is not None:
            dataset_sigma_normalized = numpy.where(
                array_mask, dummy, dataset_sigma_normalized
            )

    return (
        dataset_signal_normalized,
        dataset_variance_normalized,
        dataset_sigma_normalized,
        dataset_normalization,
    )
def _normalize_dataset_opencl(
    detector_distortion,
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask: numpy.ndarray = None,
    array_flat: numpy.ndarray = None,
    array_dark: numpy.ndarray = None,
    array_polarization: numpy.ndarray = None,
    array_solidangle: numpy.ndarray = None,
    normalization_factor: float = 1.0,
    Dummy: float = -10,
    DDummy: float = 0.01,
    datatype: str = "float32",
    **kwargs,
):
    """
    Normalize a stack of frames through the integrator of a pyFAI Distortion object.
    params:
    - detector_distortion: detector handed to pyFAI.distortion.Distortion
    - dataset_signal (numpy.ndarray): stack of frames, iterated along the first axis
    - dataset_variance (numpy.ndarray): optional variance stack, indexed per frame
    - monitor_values: per-frame monitor; None means 1 for every frame, an int/float is repeated for every frame
    - array_mask (numpy.ndarray): mask given to the Distortion object
    - array_flat, array_dark, array_polarization, array_solidangle: correction arrays forwarded to integrate_ng
    - normalization_factor (float): each monitor value is divided by it before being forwarded
    - Dummy (float): dummy value, also used as the "empty" value of the Distortion object
    - DDummy (float): tolerance around the dummy value
    - datatype (str): dtype of the returned arrays
    - kwargs: ignored
    returns:
    - tuple of four numpy arrays: normalized signal, normalized variance, normalized sigma, normalization.
      The normalization array is not filled by this back-end and stays at zero.
    """
    from pyFAI.distortion import Distortion

    dataset_normalized_signal = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)
    dataset_normalization = numpy.zeros_like(dataset_signal, dtype=datatype)  # noqa

    nb_frames = len(dataset_signal)
    if monitor_values is None:
        monitor_values = numpy.ones(nb_frames, dtype=datatype)
    elif isinstance(monitor_values, (int, float)):
        monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)

    distortion = Distortion(
        detector_distortion, method="csr", device="gpu", mask=array_mask, empty=Dummy
    )
    distortion.reset(prepare=True)  # enforce initialization
    if distortion.integrator is None:
        distortion.calc_init()

    for index_frame, (data_signal, monitor_value) in enumerate(
        zip(dataset_signal, monitor_values)
    ):
        variance = None
        if dataset_variance is not None:
            variance = dataset_variance[index_frame]

        result = distortion.integrator.integrate_ng(
            data=data_signal,
            variance=variance,
            flat=array_flat,
            dark=array_dark,
            solidangle=array_solidangle,
            polarization=array_polarization,
            dummy=Dummy,
            delta_dummy=DDummy,
            normalization_factor=monitor_value / normalization_factor,
            out_merged=False,
        )

        frame_shape = data_signal.shape
        dataset_normalized_signal[index_frame] = result.intensity.reshape(frame_shape)
        if result.variance is not None:
            # Reshaped like intensity and sigma so that the three results are stored consistently
            dataset_normalized_variance[index_frame] = result.variance.reshape(
                frame_shape
            )
        if result.sigma is not None:
            dataset_normalized_sigma[index_frame] = result.sigma.reshape(frame_shape)
        # TODO get normalization array from distortion if possible

    return (
        dataset_normalized_signal,
        dataset_normalized_variance,
        dataset_normalized_sigma,
        dataset_normalization,
    )
def _normalize_dataset_cython(
    dataset_signal: numpy.ndarray,
    dataset_variance: Optional[numpy.ndarray] = None,
    monitor_values: numpy.ndarray = None,
    array_mask: numpy.ndarray = None,
    array_flat: numpy.ndarray = None,
    array_dark: numpy.ndarray = None,
    array_polarization: numpy.ndarray = None,
    array_solidangle: numpy.ndarray = None,
    normalization_factor: float = 1.0,
    Dummy: float = -10,
    DDummy: float = 0.01,
    datatype: str = "float32",
    **kwargs,
):
    """
    Normalize a stack of frames, one by one, with the cython preproc routine of pyFAI.
    params:
    - dataset_signal (numpy.ndarray): stack of frames, iterated along the first axis
    - dataset_variance (numpy.ndarray): optional variance stack, indexed per frame
    - monitor_values: per-frame monitor; None means 1 for every frame, an int/float is repeated for every frame
    - array_mask, array_flat, array_dark, array_polarization, array_solidangle: correction arrays forwarded to preproc
    - normalization_factor (float): each monitor value is divided by it before being forwarded
    - Dummy (float): dummy value forwarded to preproc and written where the normalization is zero
    - DDummy (float): tolerance around the dummy value
    - datatype (str): dtype of the returned arrays
    - kwargs: ignored
    returns:
    - tuple of four numpy arrays: normalized signal, normalized variance, normalized sigma, normalization.
      Variance, sigma and normalization stay at zero when dataset_variance is None.
    """
    from pyFAI.ext.preproc import preproc as preproc_cy

    (
        dataset_normalized_signal,
        dataset_normalized_variance,
        dataset_normalized_sigma,
        dataset_normalization,
    ) = (numpy.zeros_like(dataset_signal, dtype=datatype) for _ in range(4))

    # Corrections shared by every frame
    preproc_params = dict(
        dark=array_dark,
        flat=array_flat,
        mask=array_mask,
        solidangle=array_solidangle,
        polarization=array_polarization,
        dummy=Dummy,
        delta_dummy=DDummy,
        empty=None,
    )

    nb_frames = len(dataset_signal)
    if monitor_values is None:
        monitor_values = numpy.ones(nb_frames, dtype=datatype)
    elif isinstance(monitor_values, (int, float)):
        monitor_values = numpy.full(nb_frames, monitor_values, dtype=datatype)

    for index_frame, (data_signal, monitor_value) in enumerate(
        zip(dataset_signal, monitor_values)
    ):
        if dataset_variance is None:
            # Without variance the output of preproc is stored directly as the signal
            dataset_normalized_signal[index_frame] = preproc_cy(
                raw=data_signal,
                variance=None,
                normalization_factor=monitor_value / normalization_factor,
                **preproc_params,
            )
            continue

        proc_data = preproc_cy(
            raw=data_signal,
            variance=dataset_variance[index_frame],
            normalization_factor=monitor_value / normalization_factor,
            **preproc_params,
        )
        # The three planes of the last axis are read as signal, variance and normalization.
        # These local names are referenced by name inside the numexpr expressions below.
        pp_signal = proc_data[:, :, 0]  # noqa
        pp_variance = proc_data[:, :, 1]  # noqa
        pp_normalisation = proc_data[:, :, 2]  # noqa

        dataset_normalized_signal[index_frame] = numexpr.evaluate(
            f"where(pp_normalisation == 0.0, {Dummy}, pp_signal / pp_normalisation)"
        )
        dataset_normalized_variance[index_frame] = numexpr.evaluate(
            f"where(pp_normalisation == 0.0, {Dummy}, pp_variance / (pp_normalisation * pp_normalisation))"
        )
        dataset_normalized_sigma[index_frame] = numexpr.evaluate(
            f"where(pp_normalisation == 0.0, {Dummy}, sqrt(pp_variance) / abs(pp_normalisation))"
        )
        dataset_normalization[index_frame] = pp_normalisation

    return (
        dataset_normalized_signal,
        dataset_normalized_variance,
        dataset_normalized_sigma,
        dataset_normalization,
    )
# Registry of the normalization back-ends, keyed by algorithm name. Each entry holds
# the callable ("method"), whether the correction arrays are requested as CuPy arrays
# ("use_cupy") and the algorithm name again ("name").
ALGORITHMS_NORMALIZATION = {
    backend_name: {"method": backend, "use_cupy": needs_cupy, "name": backend_name}
    for backend_name, backend, needs_cupy in (
        ("cython", _normalize_dataset_cython, False),
        ("opencl", _normalize_dataset_opencl, False),
        ("numpy", _normalize_dataset_numpy, False),
        ("cupy", _normalize_dataset_cupy, True),
    )
}
# Algorithm used when the requested one is unknown or cannot run
DEFAULT_ALGORITHM_NORMALIZATION = "numpy"
def _parse_norm_algorithm(algorithm: str) -> dict:
    """
    Resolve an algorithm name into its entry of ALGORITHMS_NORMALIZATION.
    params:
    - algorithm (str): requested algorithm name
    returns:
    - dict: the registry entry of the requested algorithm, or of the default algorithm when
      the name is unknown or when "cupy" is requested but CuPy is not available
    """

    def _fall_back(reason: str) -> str:
        # Report why the request cannot be honoured and switch to the default algorithm
        logger.warning(reason)
        return DEFAULT_ALGORITHM_NORMALIZATION

    if algorithm not in ALGORITHMS_NORMALIZATION:
        algorithm = _fall_back(
            f"Algorithm '{algorithm}' is not available. Using '{DEFAULT_ALGORITHM_NORMALIZATION}' instead."
        )
    elif algorithm == "cupy" and not cupy_available():
        algorithm = _fall_back(
            f"CuPy is not available. Using {DEFAULT_ALGORITHM_NORMALIZATION} instead."
        )

    logger.debug(f"Performing normalization with algorithm: {algorithm}")
    return ALGORITHMS_NORMALIZATION[algorithm]
[docs]
def normalize_dataset(
    azimuthal_integrator: AzimuthalIntegrator,
    dataset_signal: numpy.ndarray,
    monitor_values: numpy.ndarray = None,
    NormalizationFactor: float = 1.0,
    filename_mask: str = None,
    filename_dark: str = None,
    filename_flat: str = None,
    absolute_solidangle: bool = True,
    correct_solidangle: bool = True,
    correct_polarization: bool = True,
    binning: tuple = (1, 1),
    Dummy=None,
    DDummy=None,
    variance_formula=None,
    polarization_factor=None,
    polarization_axis_offset=0,
    datatype="float32",
    algorithm: str = "cython",
    dark_filter: str = "quantil",
    dark_filter_quantil_lower: float = 0.1,
    dark_filter_quantil_upper: float = 0.9,
    **kwargs,
):
    """
    Normalize a stack of detector frames with the selected back-end.

    The mask, flat, dark, polarization, solid-angle and combined normalization arrays are
    prepared first, the variance of the dataset is computed, and everything is handed to the
    back-end registered under `algorithm` in ALGORITHMS_NORMALIZATION.
    params:
    - azimuthal_integrator (AzimuthalIntegrator): provides the detector shape and the geometry of the correction arrays
    - dataset_signal (numpy.ndarray): stack of frames to normalize
    - monitor_values: per-frame monitor values, forwarded to the back-end
    - NormalizationFactor (float): normalization factor, forwarded to the normalization array and to the back-end
    - filename_mask (str), filename_dark (str), filename_flat (str): files of the mask, dark and flat-field arrays
    - absolute_solidangle (bool), correct_solidangle (bool): options of the solid-angle array
    - correct_polarization (bool): if True, the polarization correction is applied
    - binning (tuple): binning forwarded to the array loaders
    - Dummy, DDummy: dummy value and its tolerance; when None the defaults of the back-end are used
    - variance_formula: formula forwarded to calculate_dataset_variance
    - polarization_factor, polarization_axis_offset: options of the polarization array
    - datatype: "float32"/"float64" (or the numpy scalar types), converted to a numpy dtype
    - algorithm (str): name of the back-end; falls back to the default when unknown or unusable
    - dark_filter, dark_filter_quantil_lower, dark_filter_quantil_upper: options forwarded to get_array_dark
    - kwargs: ignored
    returns:
    - tuple of four arrays: normalized signal, normalized variance, normalized sigma, normalization.
      Variance and sigma are replaced by zero arrays when the back-end returns None for them.
    """
    t0 = time.perf_counter()
    datatype = _parse_datatype(datatype=datatype)
    data_signal_shape = azimuthal_integrator.detector.shape
    algorithm = _parse_norm_algorithm(algorithm=algorithm)
    use_cupy = algorithm["use_cupy"]

    array_mask = get_array_mask(
        filename_mask=filename_mask,
        data_signal_shape=data_signal_shape,
        binning=binning,
        use_cupy=use_cupy,
        persistent=True,
    )
    array_flat = get_array_flat(
        filename_flat=filename_flat,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=Dummy,
        delta_dummy=DDummy,
        filename_mask=filename_mask,
        use_cupy=use_cupy,
        persistent=True,
    )
    array_dark = get_array_dark(
        filename_dark=filename_dark,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=Dummy,
        delta_dummy=DDummy,
        filename_mask=filename_mask,
        dark_filter=dark_filter,
        dark_filter_quantil_lower=dark_filter_quantil_lower,
        dark_filter_quantil_upper=dark_filter_quantil_upper,
        use_cupy=use_cupy,
        persistent=True,
    )
    array_polarization = get_array_polarization(
        azimuthal_integrator=azimuthal_integrator,
        polarization_factor=polarization_factor,
        polarization_axis=polarization_axis_offset,
        use_cupy=use_cupy,
        correct=correct_polarization,
        persistent=True,
    )
    array_solidangle = get_array_solidangle(
        azimuthal_integrator=azimuthal_integrator,
        absolute=absolute_solidangle,
        correct=correct_solidangle,
        use_cupy=use_cupy,
        persistent=True,
    )

    # Patch due to inconsistent chiArray and position_array methods in pyFAI
    if "chi_center" in azimuthal_integrator._cached_array:
        azimuthal_integrator._cached_array.pop("chi_center")

    array_normalization = get_array_normalization(
        azimuthal_integrator=azimuthal_integrator,
        normalization_factor=NormalizationFactor,
        polarization_factor=polarization_factor,
        polarization_axis=polarization_axis_offset,
        correct_polarization=correct_polarization,
        absolute_solidangle=absolute_solidangle,
        correct_solidangle=correct_solidangle,
        filename_flat=filename_flat,
        filename_mask=filename_mask,
        dummy=Dummy,
        delta_dummy=DDummy,
        binning=binning,
        datatype=datatype,
        use_cupy=use_cupy,
        persistent=True,
    )

    # The variance is computed on the host, so a GPU dark has to be downloaded first
    if variance_formula and array_dark is not None:
        if use_cupy and cupy_available():
            dark = array_dark.get()
        else:
            dark = array_dark
    else:
        dark = None
    dataset_variance = calculate_dataset_variance(
        dataset_signal=dataset_signal,
        variance_formula=variance_formula,
        dark=dark,
    )

    params_normalization = {
        "dataset_signal": dataset_signal,
        "dataset_variance": dataset_variance,
        "monitor_values": monitor_values,
        "array_flat": array_flat,
        "array_dark": array_dark,
        "array_mask": array_mask,
        "array_solidangle": array_solidangle,
        "array_polarization": array_polarization,
        "array_normalization": array_normalization,
        "datatype": datatype,
        "normalization_factor": NormalizationFactor,
        "detector_distortion": azimuthal_integrator.detector,
    }
    # The numpy/cupy back-ends take "dummy" while the cython/opencl ones take
    # "Dummy"/"DDummy", so both spellings are forwarded (each back-end drops the
    # unknown ones through **kwargs). None is never forwarded, otherwise it would
    # replace the numeric default of the back-end.
    if Dummy is not None:
        params_normalization["dummy"] = Dummy
        params_normalization["Dummy"] = Dummy
    if DDummy is not None:
        params_normalization["delta_dummy"] = DDummy
        params_normalization["DDummy"] = DDummy

    (
        dataset_normalized_signal,
        dataset_normalized_variance,
        dataset_normalized_sigma,
        dataset_normalization,
    ) = algorithm["method"](
        **params_normalization,
    )
    t1 = time.perf_counter()
    logger.debug(
        f"Normalization completed in {t1 - t0:.2f} seconds using {algorithm['name']} algorithm."
    )

    # Some back-ends return None when no variance is available: always hand back arrays
    if dataset_normalized_variance is None:
        dataset_normalized_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
    if dataset_normalized_sigma is None:
        dataset_normalized_sigma = numpy.zeros_like(dataset_signal, dtype=datatype)
    return (
        dataset_normalized_signal,
        dataset_normalized_variance,
        dataset_normalized_sigma,
        dataset_normalization,
    )
def _parse_datatype(datatype):
    """
    Convert the float32/float64 spellings of a datatype into a numpy dtype.
    params:
    - datatype: "float32", "float64" or the matching numpy scalar type; anything else is passed through
    returns:
    - numpy.dtype for the float32/float64 cases, otherwise the input unchanged
    """
    known_types = (
        ("float32", numpy.float32),
        ("float64", numpy.float64),
    )
    for type_name, scalar_type in known_types:
        if datatype in (type_name, scalar_type):
            return numpy.dtype(type_name)
    return datatype
[docs]
def get_array_polarization(
    azimuthal_integrator: AzimuthalIntegrator = None,
    polarization_factor: float = None,
    polarization_axis: int = 0,
    use_cupy: bool = False,
    correct: bool = True,
    persistent: bool = True,
) -> numpy.ndarray:
    """
    Return the polarization correction array of an azimuthal integrator.
    params:
    - azimuthal_integrator (AzimuthalIntegrator): the azimuthal integrator object
    - polarization_factor (float): the polarization factor
    - polarization_axis (int): the polarization axis
    - use_cupy (bool): if True, a cupy array is requested
    - correct (bool): if True, the polarization correction is applied
    - persistent (bool): if True, the array comes from the lru-cached builder, looked up by
      the geometry values of the integrator; if False it is computed from the integrator directly
    returns:
    - numpy.ndarray or cupy.ndarray: the array of the polarization correction
    """
    if persistent:
        # Only plain geometry values are passed on, the cached builder takes these
        # instead of the integrator object
        detector = azimuthal_integrator.detector
        return _get_persistent_array_polarization(
            data_signal_shape=detector.shape,
            dist=azimuthal_integrator.dist,
            wavelength=azimuthal_integrator.wavelength,
            psize_1=detector.pixel1,
            psize_2=detector.pixel2,
            poni1=azimuthal_integrator.poni1,
            poni2=azimuthal_integrator.poni2,
            rot1=azimuthal_integrator.rot1,
            rot2=azimuthal_integrator.rot2,
            rot3=azimuthal_integrator.rot3,
            polarization_factor=polarization_factor,
            polarization_axis=polarization_axis,
            correct=correct,
            use_cupy=use_cupy,
        )

    return _get_array_polarization(
        azimuthal_integrator=azimuthal_integrator,
        polarization_factor=polarization_factor,
        polarization_axis=polarization_axis,
        correct=correct,
        use_cupy=use_cupy,
    )
@lru_cache(maxsize=5)
def _get_persistent_array_polarization(
    data_signal_shape: tuple,
    dist: float,
    wavelength: float,
    psize_1: float,
    psize_2: float,
    poni1: float = None,
    poni2: float = None,
    rot1: float = 0.0,
    rot2: float = 0.0,
    rot3: float = 0.0,
    polarization_factor: float = None,
    polarization_axis: int = 0,
    correct: bool = True,
    use_cupy: bool = False,
):
    """
    Cached builder of the polarization array (the 5 most recent argument sets are kept).

    An azimuthal integrator is obtained from the geometry values and the polarization
    array is computed from it. The body only runs on a cache miss.
    """
    logger.info(
        f"No cache hit. Creating new polarization array with {data_signal_shape=}, {dist=}, {wavelength=}, {psize_1=}, {psize_2=}, {poni1=}, {poni2=}, {rot1=}, {rot2=}, {rot3=} {polarization_factor=}, {polarization_axis=}, {use_cupy=}"
    )
    # DetectorRotation_1 receives rot2 and DetectorRotation_2 receives rot1
    geometry = {
        "data_signal_shape": data_signal_shape,
        "SampleDistance": dist,
        "WaveLength": wavelength,
        "poni1": poni1,
        "poni2": poni2,
        "PSize_1": psize_1,
        "PSize_2": psize_2,
        "DetectorRotation_1": rot2,
        "DetectorRotation_2": rot1,
        "DetectorRotation_3": rot3,
        "persistent": True,
    }
    rebuilt_integrator = get_azimuthal_integrator(**geometry)
    return _get_array_polarization(
        azimuthal_integrator=rebuilt_integrator,
        polarization_factor=polarization_factor,
        polarization_axis=polarization_axis,
        correct=correct,
        use_cupy=use_cupy,
    )
def _get_array_polarization(
    azimuthal_integrator: AzimuthalIntegrator,
    polarization_factor: float = None,
    polarization_axis: int = 0,
    correct: bool = True,
    use_cupy: bool = False,
):
    """
    Compute the polarization array from the integrator, without any caching.

    When `correct` is False an array of ones with the detector shape (float32) is returned.
    The result is a cupy array only when `use_cupy` is True and CuPy is available.
    """
    if correct:
        polarization = azimuthal_integrator.polarization(
            factor=polarization_factor,
            axis_offset=polarization_axis,
        )
        if use_cupy and cupy_available():
            polarization = cupy.asarray(polarization)
        return polarization

    # No correction requested: neutral array on the requested device
    array_module = cupy if use_cupy and cupy_available() else numpy
    return array_module.ones(azimuthal_integrator.detector.shape, dtype="float32")
[docs]
def get_array_solidangle(
    azimuthal_integrator: AzimuthalIntegrator,
    absolute: bool = True,
    correct: bool = True,
    use_cupy: bool = False,
    persistent: bool = True,
) -> numpy.ndarray:
    """
    Return the solid-angle array associated to an AzimuthalIntegrator.
    params:
    - azimuthal_integrator (AzimuthalIntegrator): the azimuthal integrator object
    - absolute (bool): forwarded to the solid-angle computation (absolute or relative values)
    - correct (bool): if True, the solid-angle correction is applied
    - use_cupy (bool): if True, a cupy array is requested
    - persistent (bool): if True, the array comes from the lru-cached builder, looked up by
      the geometry values of the integrator; if False it is computed from the integrator directly
    returns:
    - solid_angle_array (numpy.ndarray or cupy.ndarray): the solid-angle array
    """
    if persistent:
        # Only plain geometry values are passed on, the cached builder takes these
        # instead of the integrator object
        detector = azimuthal_integrator.detector
        return _get_persistent_array_solidangle(
            data_signal_shape=detector.shape,
            dist=azimuthal_integrator.dist,
            wavelength=azimuthal_integrator.wavelength,
            psize_1=detector.pixel1,
            psize_2=detector.pixel2,
            poni1=azimuthal_integrator.poni1,
            poni2=azimuthal_integrator.poni2,
            rot1=azimuthal_integrator.rot1,
            rot2=azimuthal_integrator.rot2,
            rot3=azimuthal_integrator.rot3,
            absolute=absolute,
            correct=correct,
            use_cupy=use_cupy,
        )

    return _get_array_solidangle(
        azimuthal_integrator=azimuthal_integrator,
        absolute=absolute,
        correct=correct,
        use_cupy=use_cupy,
    )
@lru_cache(maxsize=5)
def _get_persistent_array_solidangle(
    data_signal_shape: tuple,
    dist: float,
    wavelength: float,
    psize_1: float,
    psize_2: float,
    poni1: float = None,
    poni2: float = None,
    rot1: float = 0.0,
    rot2: float = 0.0,
    rot3: float = 0.0,
    absolute: bool = True,
    correct: bool = True,
    use_cupy: bool = False,
) -> numpy.ndarray:
    """
    Cached builder of the solid-angle array (the 5 most recent argument sets are kept).

    An azimuthal integrator is obtained from the geometry values and the solid-angle
    array is computed from it. The body only runs on a cache miss.
    """
    logger.info(
        f"No cache hit. Creating new solid angle array with {data_signal_shape=}, {dist=}, {wavelength=}, {psize_1=}, {psize_2=}, {poni1=}, {poni2=}, {rot1=}, {rot2=}, {rot3=} {absolute=}, {correct=}, {use_cupy=}"
    )
    # DetectorRotation_1 receives rot2 and DetectorRotation_2 receives rot1
    geometry = {
        "data_signal_shape": data_signal_shape,
        "SampleDistance": dist,
        "WaveLength": wavelength,
        "poni1": poni1,
        "poni2": poni2,
        "PSize_1": psize_1,
        "PSize_2": psize_2,
        "DetectorRotation_1": rot2,
        "DetectorRotation_2": rot1,
        "DetectorRotation_3": rot3,
        "persistent": True,
    }
    rebuilt_integrator = get_azimuthal_integrator(**geometry)
    return _get_array_solidangle(
        azimuthal_integrator=rebuilt_integrator,
        absolute=absolute,
        correct=correct,
        use_cupy=use_cupy,
    )
def _get_array_solidangle(
    azimuthal_integrator: AzimuthalIntegrator,
    absolute: bool = True,
    correct: bool = True,
    use_cupy: bool = False,
) -> numpy.ndarray:
    """
    Compute the solid-angle array from the integrator, without any caching.

    When `correct` is False an array of ones with the detector shape (float32) is returned.
    The result is a cupy array only when `use_cupy` is True and CuPy is available.
    """
    if correct:
        solid_angle = azimuthal_integrator.solidAngleArray(absolute=absolute)
        if use_cupy and cupy_available():
            solid_angle = cupy.asarray(solid_angle)
        return solid_angle

    # No correction requested: neutral array on the requested device
    array_module = cupy if use_cupy and cupy_available() else numpy
    return array_module.ones(azimuthal_integrator.detector.shape, dtype="float32")
[docs]
def get_array_normalization(
    azimuthal_integrator: AzimuthalIntegrator,
    normalization_factor: float = None,
    polarization_factor: float = None,
    polarization_axis: int = 0,
    correct_polarization: bool = True,
    absolute_solidangle: bool = True,
    correct_solidangle: bool = True,
    filename_flat: str = None,
    filename_mask: str = None,
    dummy: int = None,
    delta_dummy: float = None,
    binning: tuple = (1, 1),
    use_cupy: bool = False,
    datatype: str = "float32",
    persistent: bool = True,
):
    """
    Return the normalization array, i.e. every correction except the per-frame monitor value.
    params:
    - azimuthal_integrator (AzimuthalIntegrator): the azimuthal integrator object
    - normalization_factor (float): the normalization factor
    - polarization_factor (float): the polarization factor
    - polarization_axis (int): the polarization axis
    - correct_polarization (bool): if True, applies the polarization correction
    - absolute_solidangle (bool): forwarded to the solid-angle computation
    - correct_solidangle (bool): if True, applies the solid-angle correction
    - filename_flat (str): the filename of the flat-field correction
    - filename_mask (str): the filename of the mask
    - dummy (int): value written on the masked pixels
    - delta_dummy (float): tolerance around the dummy value
    - binning (tuple): binning forwarded to the flat and mask loaders
    - use_cupy (bool): if True, a cupy array is requested
    - datatype (str): dtype of the normalization array
    - persistent (bool): if True, the array comes from the lru-cached builder, looked up by the
      geometry values of the integrator and the options; if False it is computed directly
    returns:
    - numpy.ndarray or cupy.ndarray: the array of the normalization correction
    """
    # Options shared by the cached and the direct builder, always passed in this order
    options = {
        "normalization_factor": normalization_factor,
        "polarization_factor": polarization_factor,
        "polarization_axis": polarization_axis,
        "correct_polarization": correct_polarization,
        "filename_flat": filename_flat,
        "filename_mask": filename_mask,
        "absolute_solidangle": absolute_solidangle,
        "correct_solidangle": correct_solidangle,
        "dummy": dummy,
        "delta_dummy": delta_dummy,
        "binning": binning,
        "use_cupy": use_cupy,
        "datatype": datatype,
    }

    if persistent:
        # Only plain geometry values are passed on, the cached builder takes these
        # instead of the integrator object
        detector = azimuthal_integrator.detector
        return _get_persistent_array_normalization(
            data_signal_shape=detector.shape,
            dist=azimuthal_integrator.dist,
            wavelength=azimuthal_integrator.wavelength,
            psize_1=detector.pixel1,
            psize_2=detector.pixel2,
            poni1=azimuthal_integrator.poni1,
            poni2=azimuthal_integrator.poni2,
            rot1=azimuthal_integrator.rot1,
            rot2=azimuthal_integrator.rot2,
            rot3=azimuthal_integrator.rot3,
            **options,
        )

    return _get_array_normalization(
        azimuthal_integrator=azimuthal_integrator,
        **options,
    )
@lru_cache(maxsize=5)
def _get_persistent_array_normalization(
    data_signal_shape: tuple,
    dist: float,
    wavelength: float,
    psize_1: float,
    psize_2: float,
    poni1: float = None,
    poni2: float = None,
    rot1: float = 0.0,
    rot2: float = 0.0,
    rot3: float = 0.0,
    normalization_factor: float = None,
    polarization_factor: float = None,
    polarization_axis: int = 0,
    correct_polarization: bool = True,
    filename_flat: str = None,
    filename_mask: str = None,
    absolute_solidangle: bool = True,
    correct_solidangle: bool = True,
    dummy: int = None,
    delta_dummy: float = None,
    binning: tuple = (1, 1),
    use_cupy: bool = False,
    datatype: str = "float32",
):
    """
    Cached builder of the normalization array (the 5 most recent argument sets are kept).

    An azimuthal integrator is obtained from the geometry values and the normalization
    array is computed from it. The body only runs on a cache miss.
    """
    logger.info("No cache hit. Creating new normalization array.")
    # DetectorRotation_1 receives rot2 and DetectorRotation_2 receives rot1
    geometry = {
        "data_signal_shape": data_signal_shape,
        "SampleDistance": dist,
        "WaveLength": wavelength,
        "poni1": poni1,
        "poni2": poni2,
        "PSize_1": psize_1,
        "PSize_2": psize_2,
        "DetectorRotation_1": rot2,
        "DetectorRotation_2": rot1,
        "DetectorRotation_3": rot3,
        "persistent": True,
    }
    rebuilt_integrator = get_azimuthal_integrator(**geometry)
    return _get_array_normalization(
        azimuthal_integrator=rebuilt_integrator,
        normalization_factor=normalization_factor,
        polarization_factor=polarization_factor,
        polarization_axis=polarization_axis,
        correct_polarization=correct_polarization,
        absolute_solidangle=absolute_solidangle,
        correct_solidangle=correct_solidangle,
        filename_flat=filename_flat,
        filename_mask=filename_mask,
        dummy=dummy,
        delta_dummy=delta_dummy,
        binning=binning,
        datatype=datatype,
        use_cupy=use_cupy,
    )
def _get_array_normalization(
    azimuthal_integrator: AzimuthalIntegrator,
    normalization_factor: float = None,
    polarization_factor: float = None,
    polarization_axis: int = 0,
    correct_polarization: bool = True,
    absolute_solidangle: bool = True,
    correct_solidangle: bool = True,
    filename_flat: str = None,
    filename_mask: str = None,
    dummy: int = None,
    delta_dummy: float = None,
    binning: tuple = (1, 1),
    datatype: str = "float32",
    use_cupy: bool = False,
):
    """
    Build the normalization array from the individual correction arrays.

    Starting from ones, the array is multiplied by the polarization and solid-angle arrays,
    divided by the normalization factor (if not None), multiplied by the flat-field (if any),
    and finally set to `dummy` on the masked pixels when a mask exists and `dummy` is truthy.
    The array lives on the GPU only when `use_cupy` is True and CuPy is available.
    """
    data_signal_shape = azimuthal_integrator.detector.shape

    # Same calls on both devices, only the array module differs
    array_module = cupy if use_cupy and cupy_available() else numpy
    normalization_array = array_module.ones(data_signal_shape, dtype=datatype)

    polarization = get_array_polarization(
        azimuthal_integrator=azimuthal_integrator,
        polarization_factor=polarization_factor,
        polarization_axis=polarization_axis,
        use_cupy=use_cupy,
        correct=correct_polarization,
        persistent=True,
    )
    if polarization is not None:
        normalization_array *= polarization

    solid_angle = get_array_solidangle(
        azimuthal_integrator=azimuthal_integrator,
        absolute=absolute_solidangle,
        correct=correct_solidangle,
        use_cupy=use_cupy,
        persistent=True,
    )
    if solid_angle is not None:
        normalization_array *= solid_angle

    if normalization_factor is not None:
        normalization_array /= normalization_factor

    flat_field = get_array_flat(
        filename_flat=filename_flat,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=dummy,
        delta_dummy=delta_dummy,
        filename_mask=filename_mask,
        use_cupy=use_cupy,
        persistent=True,
    )
    if flat_field is not None:
        normalization_array *= flat_field

    mask = get_array_mask(
        filename_mask=filename_mask,
        data_signal_shape=data_signal_shape,
        binning=binning,
        use_cupy=use_cupy,
    )
    # A falsy dummy (None or 0) leaves the masked pixels untouched
    if mask is not None and dummy:
        array_module.copyto(normalization_array, dummy, where=mask)

    return normalization_array
[docs]
def calculate_normalization_values(
    monitor_values,
    azimuthal_integrator=None,
    psize_1=None,
    psize_2=None,
    dist=None,
    normalization_factor=None,
):
    """
    Compute monitor_values * (psize_1 * psize_2 / dist**2) / normalization_factor.
    params:
    - monitor_values: monitor value(s) to scale
    - azimuthal_integrator: optional object whose detector.pixel1, detector.pixel2 and dist
      replace the pixel sizes and distance that are missing (None or any other falsy value)
    - psize_1, psize_2: pixel sizes
    - dist: distance
    - normalization_factor: divisor of the result, 1.0 when None
    returns:
    - the scaled monitor values, or the normalization factor alone when the monitor, a pixel
      size or the distance is still None
    """
    factor = 1.0 if normalization_factor is None else normalization_factor

    if azimuthal_integrator is not None:
        # The integrator is only read for the values that were not given
        if not psize_1:
            psize_1 = azimuthal_integrator.detector.pixel1
        if not psize_2:
            psize_2 = azimuthal_integrator.detector.pixel2
        if not dist:
            dist = azimuthal_integrator.dist

    required = (monitor_values, psize_1, psize_2, dist)
    if any(value is None for value in required):
        return factor

    pixel_solid_angle = psize_1 * psize_2 / dist**2
    return monitor_values * pixel_solid_angle / factor