import logging
from typing import Optional
import numexpr
import numpy
import os
from pyFAI import detector_factory
from pyFAI.method_registry import IntegrationMethod, Method
from pyFAI.worker import Worker
from functools import lru_cache
from ewoksid02.utils.io import (
get_persistent_array_mask,
get_persistent_array_dark,
)
from ewoksid02.utils.normalization import AzimuthalIntegrator
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@lru_cache(maxsize=5)
def _get_persistent_detector_instance(
data_signal_shape: tuple,
psize_1: float,
psize_2: float,
) -> Optional[object]:
"""
Create a detector instance based on the dataset and pixel sizes.
Parameters:
data_signal_shape (tuple): Shape of the 2D data signal frame.
psize_1 (float): Pixel size in the first dimension.
psize_2 (float): Pixel size in the second dimension.
Returns:
Optional[object]: A detector instance or None if the dataset is None.
"""
logger.info(
f"No cache hit. Creating new detector instance with {data_signal_shape=}, {psize_1=}, {psize_2=}"
)
detector_config = {
"pixel1": psize_2,
"pixel2": psize_1,
}
detector_instance = detector_factory(name="detector", config=detector_config)
detector_instance.shape = data_signal_shape
return detector_instance
[docs]
def get_persistent_azimuthal_integrator(
data_signal_shape: tuple,
**kwargs,
):
SampleDistance = kwargs.get("SampleDistance")
Center_1 = kwargs.get("Center_1")
Center_2 = kwargs.get("Center_2")
PSize_1 = kwargs.get("PSize_1")
PSize_2 = kwargs.get("PSize_2")
BSize_1 = kwargs.get("BSize_1")
BSize_2 = kwargs.get("BSize_2")
wavelength = kwargs.get("WaveLength")
rot1 = kwargs.get("DetectorRotation_2")
rot2 = kwargs.get("DetectorRotation_1")
rot3 = kwargs.get("DetectorRotation_3")
poni1 = kwargs.get("poni1")
poni2 = kwargs.get("poni2")
if poni1 is None:
poni1 = Center_2 * PSize_2
if poni2 is None:
poni2 = Center_1 * PSize_1
if (BSize_1 is not None and BSize_2 is not None) and (BSize_1 != 1 or BSize_2 != 1):
logger.warning(f"Binning is activated in this detector: ({BSize_1}, {BSize_2})")
return _get_persistent_azimuthal_integrator(
data_signal_shape=data_signal_shape,
dist=SampleDistance,
poni1=poni1,
poni2=poni2,
psize_1=PSize_1,
psize_2=PSize_2,
rot1=rot1,
rot2=rot2,
rot3=rot3,
wavelength=wavelength,
)
@lru_cache(maxsize=5)
def _get_persistent_azimuthal_integrator(
data_signal_shape: tuple,
dist: float,
wavelength: float,
psize_1: float,
psize_2: float,
poni1: float = None,
poni2: float = None,
rot1: float = 0.0,
rot2: float = 0.0,
rot3: float = 0.0,
):
logger.info(
f"No cache hit. Creating new azimuthal integrator with {data_signal_shape=}, {dist=}, {wavelength=}, {psize_1=}, {psize_2=}, {poni1=}, {poni2=}, {rot1=}, {rot2=}, {rot3=}"
)
detector = _get_persistent_detector_instance(
data_signal_shape=data_signal_shape,
psize_1=psize_1,
psize_2=psize_2,
)
ai_kwargs = {
"dist": dist,
"poni1": poni1,
"poni2": poni2,
"rot1": rot1,
"rot2": rot2,
"rot3": rot3,
"wavelength": wavelength,
"detector": detector,
}
ai = AzimuthalIntegrator(**ai_kwargs)
ai.solidAngleArray(absolute=False)
return ai
[docs]
def get_gpu_method():
if (
IntegrationMethod.select_method(dim=1, split="full", algo="csr", impl="opencl")[
0
].impl
== "OpenCL"
):
return Method(dim=2, split="full", algo="csr", impl="opencl", target=(0, 0))
return Method(dim=2, split="full", algo="csr", impl="cython", target=None)
[docs]
def guess_npt2_rad(azimuthal_integrator: AzimuthalIntegrator) -> int:
qmax = azimuthal_integrator.array_from_unit(
typ="center", unit="q_nm^-1", scale=True
).max()
dqmin = (
azimuthal_integrator.array_from_unit(
typ="delta", unit="q_nm^-1", scale=True
).min()
* 2.0
)
return int(qmax / dqmin)
[docs]
def get_persistent_pyfai_worker(
azimuthal_integrator: AzimuthalIntegrator,
filename_mask: str = None,
integration_options: dict = {},
npt2_rad: int = None,
npt2_azim: int = None,
unit: str = None,
Dummy: int = None,
DDummy: float = None,
method: str = get_gpu_method(),
binning: tuple = (1, 1),
**kwargs,
):
if filename_mask and os.path.exists(filename_mask):
mtime_mask = os.path.getmtime(filename_mask)
else:
mtime_mask = None
filename_mask = None
return _get_persistent_pyfai_worker(
data_signal_shape=azimuthal_integrator.detector.shape,
dist=azimuthal_integrator.dist,
wavelength=azimuthal_integrator.wavelength,
psize_1=azimuthal_integrator.pixel1,
psize_2=azimuthal_integrator.pixel2,
poni1=azimuthal_integrator.poni1,
poni2=azimuthal_integrator.poni2,
rot1=azimuthal_integrator.rot1,
rot2=azimuthal_integrator.rot2,
rot3=azimuthal_integrator.rot3,
filename_mask=filename_mask,
mtime_mask=mtime_mask,
npt2_rad=npt2_rad,
npt2_azim=npt2_azim,
unit=unit,
Dummy=Dummy,
DDummy=DDummy,
method=method,
binning=binning,
**integration_options,
)
@lru_cache(maxsize=5)
def _get_persistent_pyfai_worker(
data_signal_shape: tuple,
dist: float,
wavelength: float,
psize_1: float,
psize_2: float,
center_1: float = None,
center_2: float = None,
poni1: float = None,
poni2: float = None,
rot1: float = 0.0,
rot2: float = 0.0,
rot3: float = 0.0,
filename_mask: str = None,
mtime_mask: float = None,
npt2_rad: int = None,
npt2_azim: int = None,
unit: str = None,
Dummy: int = None,
DDummy: float = None,
method: str = get_gpu_method(),
binning: tuple = (1, 1),
**integration_options,
):
logger.info("No cache hit. Creating new pyFAI worker.")
azimuthal_integrator = _get_persistent_azimuthal_integrator(
data_signal_shape=data_signal_shape,
dist=dist,
poni1=poni1,
poni2=poni2,
psize_1=psize_1,
psize_2=psize_2,
rot1=rot1,
rot2=rot2,
rot3=rot3,
wavelength=wavelength,
)
# integration options
applied_integration_options = {
"npt2_rad": int(npt2_rad),
"npt2_azim": int(npt2_azim),
"unit": unit,
"dummy": Dummy,
"delta_dummy": DDummy,
"method": (
(method.split, method.algo, method.impl)
if isinstance(method, (IntegrationMethod, Method))
else method
),
}
applied_integration_options.update(**integration_options)
worker = Worker(
azimuthalIntegrator=azimuthal_integrator,
shapeIn=azimuthal_integrator.detector.shape,
shapeOut=(
applied_integration_options.pop("npt2_azim"),
applied_integration_options.pop("npt2_rad"),
),
unit=applied_integration_options.pop("unit"),
dummy=applied_integration_options.pop("dummy"),
delta_dummy=applied_integration_options.pop("delta_dummy"),
method=applied_integration_options.pop("method", get_gpu_method()),
extra_options=applied_integration_options,
)
worker.correct_solid_angle = False
worker.output = "raw"
worker.error_model = "azimuthal"
worker.ai._empty = worker.dummy
array_mask = get_persistent_array_mask(
filename_mask=filename_mask,
data_signal_shape=data_signal_shape,
binning=binning,
use_cupy=False,
)
if array_mask is not None:
worker.mask_image = array_mask
worker.ai.mask = array_mask
return worker
[docs]
def process_dataset_azim(
dataset_signal: numpy.ndarray,
azimuthal_integrator: AzimuthalIntegrator,
dataset_variance: numpy.ndarray = None,
filename_dark_azimuthal: str = None,
filename_mask_azimuthal: str = None,
npt2_rad: int = None,
npt2_azim: int = None,
unit: str = None,
Dummy: int = None,
DDummy: int = None,
method: str = None,
datatype: str = "float32",
**kwargs,
):
dataset_signal_azim = None
dataset_variance_azim = None
dataset_sigma_azim = None
dataset_sumsignal_azim = None
dataset_sumnorm_azim = None
dataset_sumvariance_azim = None
do_variance_formula = kwargs.get("do_variance_formula")
variance_formula = kwargs.get("variance_formula")
if dataset_variance is not None:
...
elif dataset_variance is None and do_variance_formula and variance_formula:
variance_function = numexpr.NumExpr(
variance_formula, [("data", datatype), ("dark", datatype)]
)
dark = None
if filename_dark_azimuthal:
dark = get_persistent_array_dark(
filename_dark=filename_dark_azimuthal,
data_signal_shape=dataset_signal[0].shape,
dummy=Dummy,
delta_dummy=DDummy,
**kwargs,
)
dataset_variance = numpy.zeros_like(dataset_signal, dtype=datatype)
for frame_index, data_signal in enumerate(dataset_signal):
dataset_variance[frame_index] = variance_function(
data_signal, 0 if dark is None else dark
)
worker = get_persistent_pyfai_worker(
azimuthal_integrator=azimuthal_integrator,
filename_mask=filename_mask_azimuthal,
npt2_rad=npt2_rad,
npt2_azim=npt2_azim,
unit=unit,
Dummy=Dummy,
DDummy=DDummy,
method=method,
**kwargs,
)
output_dataset_shape = (
len(dataset_signal),
worker.nbpt_azim,
worker.nbpt_rad,
)
dataset_signal_azim = numpy.zeros(output_dataset_shape, dtype=datatype)
dataset_sumsignal_azim = numpy.zeros(output_dataset_shape, dtype=datatype)
dataset_sumnorm_azim = numpy.zeros(output_dataset_shape, dtype=datatype)
if dataset_variance is not None:
dataset_variance_azim = numpy.zeros(output_dataset_shape, dtype=datatype)
dataset_sigma_azim = numpy.zeros(output_dataset_shape, dtype=datatype)
dataset_sumvariance_azim = numpy.zeros(output_dataset_shape, dtype=datatype)
logger.debug(f"PyFAI worker used method: {worker.method}")
for frame_index, data_signal in enumerate(dataset_signal):
if dataset_variance is not None:
data_variance = dataset_variance[frame_index]
else:
data_variance = None
result2d_azim = worker.process(
data=data_signal,
variance=data_variance,
)
dataset_signal_azim[frame_index] = numpy.ascontiguousarray(
result2d_azim.intensity, dtype=datatype
) # (sum_signal / sum_normalization)
dataset_sumsignal_azim[frame_index] = numpy.ascontiguousarray(
result2d_azim.sum_signal, dtype=datatype
)
dataset_sumnorm_azim[frame_index] = numpy.ascontiguousarray(
result2d_azim.sum_normalization, dtype=datatype
)
if data_variance is not None:
sum_var = numpy.ascontiguousarray( # noqa
result2d_azim.sum_variance, dtype=datatype
)
sum_norm = numpy.ascontiguousarray( # noqa
result2d_azim.sum_normalization, dtype=datatype
)
dataset_variance_azim[frame_index] = numexpr.evaluate(
f"where(sum_norm <= 0.0, {worker.dummy}, sum_var / (sum_norm * sum_norm))"
)
dataset_sigma_azim[frame_index] = numpy.ascontiguousarray(
result2d_azim.sigma, dtype=datatype
)
dataset_sumvariance_azim[frame_index] = numpy.ascontiguousarray(
result2d_azim.sum_variance, dtype=datatype
)
radial_array = result2d_azim.radial
if radial_array is not None:
radial_array = radial_array.astype(datatype)
azimuthal_array = result2d_azim.azimuthal
if azimuthal_array is not None:
azimuthal_array = azimuthal_array.astype(datatype)
return (
dataset_signal_azim,
dataset_variance_azim,
dataset_sigma_azim,
dataset_sumsignal_azim,
dataset_sumnorm_azim,
dataset_sumvariance_azim,
radial_array,
azimuthal_array,
)