Source code for ewoksid02.utils.pyfai

import logging
import os
from functools import lru_cache
from typing import Optional

import numexpr
import numpy
from pyFAI import detector_factory
from pyFAI.method_registry import IntegrationMethod, Method
from pyFAI.units import to_unit  # noqa
from pyFAI.version import MAJOR
from pyFAI.worker import Worker

if MAJOR >= 2025:
    from pyFAI.integrator.azimuthal import AzimuthalIntegrator
else:
    from pyFAI.azimuthalIntegrator import AzimuthalIntegrator

from ewoksid02.utils.io import get_array_dark, get_array_mask

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


[docs] def get_detector_instance( data_signal_shape: tuple, PSize_1: float, PSize_2: float, persistent: bool = True, ) -> Optional[object]: """ Create a detector instance based on the dataset and pixel sizes. Parameters: data_signal_shape (tuple): Shape of the 2D data signal frame. PSize_1 (float): Pixel size in the first dimension. PSize_2 (float): Pixel size in the second dimension. Returns: Optional[object]: A detector instance or None if the dataset is None. """ if not persistent: return _get_detector_instance( data_signal_shape=data_signal_shape, PSize_1=PSize_1, PSize_2=PSize_2, ) return _get_persistent_detector_instance( data_signal_shape=data_signal_shape, PSize_1=PSize_1, PSize_2=PSize_2, )
@lru_cache(maxsize=5) def _get_persistent_detector_instance( data_signal_shape: tuple, PSize_1: float, PSize_2: float, ) -> Optional[object]: logger.info( f"No cache hit. Creating new detector instance with {data_signal_shape=}, {PSize_1=}, {PSize_2=}" ) return _get_detector_instance( data_signal_shape=data_signal_shape, PSize_1=PSize_1, PSize_2=PSize_2, ) def _get_detector_instance( data_signal_shape: tuple, PSize_1: float, PSize_2: float, ) -> Optional[object]: if data_signal_shape is None: return None detector_config = { "pixel1": PSize_2, "pixel2": PSize_1, } detector_instance = detector_factory(name="detector", config=detector_config) detector_instance.shape = data_signal_shape return detector_instance
[docs] def get_azimuthal_integrator( data_signal_shape: tuple, SampleDistance: float, WaveLength: float, PSize_1: float = None, PSize_2: float = None, Center_1: float = None, Center_2: float = None, poni1: float = None, poni2: float = None, DetectorRotation_1: float = 0.0, DetectorRotation_2: float = 0.0, DetectorRotation_3: float = 0.0, persistent: bool = True, ) -> AzimuthalIntegrator: if poni1 is None: poni1 = Center_2 * PSize_2 if poni2 is None: poni2 = Center_1 * PSize_1 if not persistent: return _get_azimuthal_integrator( data_signal_shape=data_signal_shape, dist=SampleDistance, wavelength=WaveLength, PSize_1=PSize_1, PSize_2=PSize_2, poni1=poni1, poni2=poni2, rot1=DetectorRotation_2, rot2=DetectorRotation_1, rot3=DetectorRotation_3, ) return _get_persistent_azimuthal_integrator( data_signal_shape=data_signal_shape, dist=SampleDistance, wavelength=WaveLength, psize_1=PSize_1, psize_2=PSize_2, poni1=poni1, poni2=poni2, rot1=DetectorRotation_2, rot2=DetectorRotation_1, rot3=DetectorRotation_3, )
@lru_cache(maxsize=5) def _get_persistent_azimuthal_integrator( data_signal_shape: tuple, dist: float, wavelength: float, psize_1: float, psize_2: float, poni1: float = None, poni2: float = None, rot1: float = 0.0, rot2: float = 0.0, rot3: float = 0.0, ): logger.info( f"No cache hit. Creating new azimuthal integrator with {data_signal_shape=}, {dist=}, {wavelength=}, {psize_1=}, {psize_2=}, {poni1=}, {poni2=}, {rot1=}, {rot2=}, {rot3=}" ) return _get_azimuthal_integrator( data_signal_shape=data_signal_shape, dist=dist, wavelength=wavelength, PSize_1=psize_1, PSize_2=psize_2, poni1=poni1, poni2=poni2, rot1=rot1, rot2=rot2, rot3=rot3, ) def _get_azimuthal_integrator( data_signal_shape: tuple, dist: float, wavelength: float, PSize_1: float, PSize_2: float, poni1: float = None, poni2: float = None, rot1: float = 0.0, rot2: float = 0.0, rot3: float = 0.0, ): detector = get_detector_instance( data_signal_shape=data_signal_shape, PSize_1=PSize_1, PSize_2=PSize_2, persistent=True, ) ai_kwargs = { "dist": dist, "poni1": poni1, "poni2": poni2, "rot1": rot1, "rot2": rot2, "rot3": rot3, "wavelength": wavelength, "detector": detector, } ai = AzimuthalIntegrator(**ai_kwargs) ai.solidAngleArray(absolute=False) return ai
[docs] def get_gpu_method(): if ( IntegrationMethod.select_method(dim=1, split="full", algo="csr", impl="opencl")[ 0 ].impl == "OpenCL" ): return Method(dim=2, split="full", algo="csr", impl="opencl", target=(0, 0)) return Method(dim=2, split="full", algo="csr", impl="cython", target=None)
[docs] def guess_npt2_rad(azimuthal_integrator: AzimuthalIntegrator) -> int: qmax = azimuthal_integrator.array_from_unit( typ="center", unit="q_nm^-1", scale=True ).max() dqmin = ( azimuthal_integrator.array_from_unit( typ="delta", unit="q_nm^-1", scale=True ).min() * 2.0 ) return int(qmax / dqmin)
[docs] def get_persistent_pyfai_worker( azimuthal_integrator: AzimuthalIntegrator, filename_mask: str = None, integration_options: dict = {}, npt2_rad: int = None, npt2_azim: int = None, unit: str = None, Dummy: int = None, DDummy: float = None, method: str = get_gpu_method(), binning: tuple = (1, 1), **kwargs, ): if filename_mask and os.path.exists(filename_mask): mtime_mask = os.path.getmtime(filename_mask) else: mtime_mask = None filename_mask = None return _get_persistent_pyfai_worker( data_signal_shape=azimuthal_integrator.detector.shape, dist=azimuthal_integrator.dist, wavelength=azimuthal_integrator.wavelength, psize_1=azimuthal_integrator.pixel1, psize_2=azimuthal_integrator.pixel2, poni1=azimuthal_integrator.poni1, poni2=azimuthal_integrator.poni2, rot1=azimuthal_integrator.rot1, rot2=azimuthal_integrator.rot2, rot3=azimuthal_integrator.rot3, filename_mask=filename_mask, mtime_mask=mtime_mask, npt2_rad=npt2_rad, npt2_azim=npt2_azim, unit=unit, Dummy=Dummy, DDummy=DDummy, method=method, binning=binning, **integration_options, )
@lru_cache(maxsize=5) def _get_persistent_pyfai_worker( data_signal_shape: tuple, dist: float, wavelength: float, psize_1: float, psize_2: float, center_1: float = None, center_2: float = None, poni1: float = None, poni2: float = None, rot1: float = 0.0, rot2: float = 0.0, rot3: float = 0.0, filename_mask: str = None, mtime_mask: float = None, npt2_rad: int = None, npt2_azim: int = None, unit: str = None, Dummy: int = None, DDummy: float = None, method: str = get_gpu_method(), binning: tuple = (1, 1), **integration_options, ): logger.info("No cache hit. Creating new pyFAI worker.") azimuthal_integrator = _get_persistent_azimuthal_integrator( data_signal_shape=data_signal_shape, dist=dist, poni1=poni1, poni2=poni2, psize_1=psize_1, psize_2=psize_2, rot1=rot1, rot2=rot2, rot3=rot3, wavelength=wavelength, ) # integration options applied_integration_options = { "npt2_rad": int(npt2_rad), "npt2_azim": int(npt2_azim), "unit": unit, "dummy": Dummy, "delta_dummy": DDummy, "method": ( (method.split, method.algo, method.impl) if isinstance(method, (IntegrationMethod, Method)) else method ), } applied_integration_options.update(**integration_options) worker = Worker( azimuthalIntegrator=azimuthal_integrator, shapeIn=azimuthal_integrator.detector.shape, shapeOut=( applied_integration_options.pop("npt2_azim"), applied_integration_options.pop("npt2_rad"), ), unit=applied_integration_options.pop("unit"), dummy=applied_integration_options.pop("dummy"), delta_dummy=applied_integration_options.pop("delta_dummy"), method=applied_integration_options.pop("method", get_gpu_method()), extra_options=applied_integration_options, ) worker.correct_solid_angle = False worker.output = "raw" worker.error_model = "azimuthal" worker.ai._empty = worker.dummy array_mask = get_array_mask( filename_mask=filename_mask, data_signal_shape=data_signal_shape, binning=binning, use_cupy=False, persistent=True, ) if array_mask is not None: worker.mask_image = array_mask worker.ai.mask = array_mask return worker
[docs] def process_dataset_azim( dataset_signal: numpy.ndarray, azimuthal_integrator: AzimuthalIntegrator, dataset_variance: numpy.ndarray = None, filename_dark_azimuthal: str = None, filename_mask_azimuthal: str = None, npt2_rad: int = None, npt2_azim: int = None, unit: str = None, Dummy: int = None, DDummy: int = None, method: str = None, datatype: str = "float32", **kwargs, ): dataset_signal_azim = None dataset_variance_azim = None dataset_sigma_azim = None dataset_sumsignal_azim = None dataset_sumnorm_azim = None dataset_sumvariance_azim = None do_variance_formula = kwargs.get("do_variance_formula") variance_formula = kwargs.get("variance_formula") if dataset_variance is not None: ... elif dataset_variance is None and do_variance_formula and variance_formula: variance_function = numexpr.NumExpr( variance_formula, [("data", datatype), ("dark", datatype)] ) dark = None if filename_dark_azimuthal: dark = get_array_dark( filename_dark=filename_dark_azimuthal, data_signal_shape=dataset_signal[0].shape, dummy=Dummy, delta_dummy=DDummy, **kwargs, ) dataset_variance = numpy.zeros_like(dataset_signal, dtype=datatype) for frame_index, data_signal in enumerate(dataset_signal): dataset_variance[frame_index] = variance_function( data_signal, 0 if dark is None else dark ) worker = get_persistent_pyfai_worker( azimuthal_integrator=azimuthal_integrator, filename_mask=filename_mask_azimuthal, npt2_rad=npt2_rad, npt2_azim=npt2_azim, unit=unit, Dummy=Dummy, DDummy=DDummy, method=method, **kwargs, ) output_dataset_shape = ( len(dataset_signal), worker.nbpt_azim, worker.nbpt_rad, ) dataset_signal_azim = numpy.zeros(output_dataset_shape, dtype=datatype) dataset_sumsignal_azim = numpy.zeros(output_dataset_shape, dtype=datatype) dataset_sumnorm_azim = numpy.zeros(output_dataset_shape, dtype=datatype) if dataset_variance is not None: dataset_variance_azim = numpy.zeros(output_dataset_shape, dtype=datatype) dataset_sigma_azim = numpy.zeros(output_dataset_shape, dtype=datatype) dataset_sumvariance_azim = numpy.zeros(output_dataset_shape, dtype=datatype) logger.debug(f"PyFAI worker used method: {worker.method}") for frame_index, data_signal in enumerate(dataset_signal): if dataset_variance is not None: data_variance = dataset_variance[frame_index] else: data_variance = None result2d_azim = worker.process( data=data_signal, variance=data_variance, ) dataset_signal_azim[frame_index] = numpy.ascontiguousarray( result2d_azim.intensity, dtype=datatype ) # (sum_signal / sum_normalization) dataset_sumsignal_azim[frame_index] = numpy.ascontiguousarray( result2d_azim.sum_signal, dtype=datatype ) # The way we do, result.sum_normalization == result.count, as the only normalization left is the number of pixels contributing to each bin dataset_sumnorm_azim[frame_index] = numpy.ascontiguousarray( result2d_azim.sum_normalization, dtype=datatype ) if data_variance is not None: sum_var = numpy.ascontiguousarray( # noqa result2d_azim.sum_variance, dtype=datatype ) sum_norm = numpy.ascontiguousarray( # noqa result2d_azim.sum_normalization, dtype=datatype ) dataset_variance_azim[frame_index] = numexpr.evaluate( f"where(sum_norm <= 0.0, {worker.dummy}, sum_var / (sum_norm * sum_norm))" ) dataset_sigma_azim[frame_index] = numpy.ascontiguousarray( result2d_azim.sigma, dtype=datatype ) dataset_sumvariance_azim[frame_index] = numpy.ascontiguousarray( result2d_azim.sum_variance, dtype=datatype ) radial_array = result2d_azim.radial if radial_array is not None: radial_array = radial_array.astype(datatype) azimuthal_array = result2d_azim.azimuthal if azimuthal_array is not None: azimuthal_array = azimuthal_array.astype(datatype) return ( dataset_signal_azim, dataset_variance_azim, dataset_sigma_azim, dataset_sumsignal_azim, dataset_sumnorm_azim, dataset_sumvariance_azim, radial_array, azimuthal_array, )