Source code for ewoksid02.utils.pyfai

import logging
from typing import Optional
import numexpr
import numpy
import os
from pyFAI import detector_factory
from pyFAI.method_registry import IntegrationMethod, Method
from pyFAI.worker import Worker
from functools import lru_cache
from ewoksid02.utils.io import (
    get_persistent_array_mask,
    get_persistent_array_dark,
)
from ewoksid02.utils.normalization import AzimuthalIntegrator

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


@lru_cache(maxsize=5)
def _get_persistent_detector_instance(
    data_signal_shape: tuple,
    psize_1: float,
    psize_2: float,
) -> Optional[object]:
    """
    Create a detector instance based on the dataset and pixel sizes.

    Parameters:
        data_signal_shape (tuple): Shape of the 2D data signal frame.
        psize_1 (float): Pixel size in the first dimension.
        psize_2 (float): Pixel size in the second dimension.

    Returns:
        Optional[object]: A detector instance or None if the dataset is None.
    """
    logger.info(
        f"No cache hit. Creating new detector instance with {data_signal_shape=}, {psize_1=}, {psize_2=}"
    )
    detector_config = {
        "pixel1": psize_2,
        "pixel2": psize_1,
    }
    detector_instance = detector_factory(name="detector", config=detector_config)
    detector_instance.shape = data_signal_shape
    return detector_instance


[docs] def get_persistent_azimuthal_integrator( data_signal_shape: tuple, **kwargs, ): SampleDistance = kwargs.get("SampleDistance") Center_1 = kwargs.get("Center_1") Center_2 = kwargs.get("Center_2") PSize_1 = kwargs.get("PSize_1") PSize_2 = kwargs.get("PSize_2") BSize_1 = kwargs.get("BSize_1") BSize_2 = kwargs.get("BSize_2") wavelength = kwargs.get("WaveLength") rot1 = kwargs.get("DetectorRotation_2") rot2 = kwargs.get("DetectorRotation_1") rot3 = kwargs.get("DetectorRotation_3") poni1 = kwargs.get("poni1") poni2 = kwargs.get("poni2") if poni1 is None: poni1 = Center_2 * PSize_2 if poni2 is None: poni2 = Center_1 * PSize_1 if (BSize_1 is not None and BSize_2 is not None) and (BSize_1 != 1 or BSize_2 != 1): logger.warning(f"Binning is activated in this detector: ({BSize_1}, {BSize_2})") return _get_persistent_azimuthal_integrator( data_signal_shape=data_signal_shape, dist=SampleDistance, poni1=poni1, poni2=poni2, psize_1=PSize_1, psize_2=PSize_2, rot1=rot1, rot2=rot2, rot3=rot3, wavelength=wavelength, )
@lru_cache(maxsize=5) def _get_persistent_azimuthal_integrator( data_signal_shape: tuple, dist: float, wavelength: float, psize_1: float, psize_2: float, poni1: float = None, poni2: float = None, rot1: float = 0.0, rot2: float = 0.0, rot3: float = 0.0, ): logger.info( f"No cache hit. Creating new azimuthal integrator with {data_signal_shape=}, {dist=}, {wavelength=}, {psize_1=}, {psize_2=}, {poni1=}, {poni2=}, {rot1=}, {rot2=}, {rot3=}" ) detector = _get_persistent_detector_instance( data_signal_shape=data_signal_shape, psize_1=psize_1, psize_2=psize_2, ) ai_kwargs = { "dist": dist, "poni1": poni1, "poni2": poni2, "rot1": rot1, "rot2": rot2, "rot3": rot3, "wavelength": wavelength, "detector": detector, } ai = AzimuthalIntegrator(**ai_kwargs) ai.solidAngleArray(absolute=False) return ai
[docs] def get_gpu_method(): if ( IntegrationMethod.select_method(dim=1, split="full", algo="csr", impl="opencl")[ 0 ].impl == "OpenCL" ): return Method(dim=2, split="full", algo="csr", impl="opencl", target=(0, 0)) return Method(dim=2, split="full", algo="csr", impl="cython", target=None)
[docs] def guess_npt2_rad(azimuthal_integrator: AzimuthalIntegrator) -> int: qmax = azimuthal_integrator.array_from_unit( typ="center", unit="q_nm^-1", scale=True ).max() dqmin = ( azimuthal_integrator.array_from_unit( typ="delta", unit="q_nm^-1", scale=True ).min() * 2.0 ) return int(qmax / dqmin)
[docs] def get_persistent_pyfai_worker( azimuthal_integrator: AzimuthalIntegrator, filename_mask: str = None, integration_options: dict = {}, npt2_rad: int = None, npt2_azim: int = None, unit: str = None, Dummy: int = None, DDummy: float = None, method: str = get_gpu_method(), binning: tuple = (1, 1), **kwargs, ): if filename_mask and os.path.exists(filename_mask): mtime_mask = os.path.getmtime(filename_mask) else: mtime_mask = None filename_mask = None return _get_persistent_pyfai_worker( data_signal_shape=azimuthal_integrator.detector.shape, dist=azimuthal_integrator.dist, wavelength=azimuthal_integrator.wavelength, psize_1=azimuthal_integrator.pixel1, psize_2=azimuthal_integrator.pixel2, poni1=azimuthal_integrator.poni1, poni2=azimuthal_integrator.poni2, rot1=azimuthal_integrator.rot1, rot2=azimuthal_integrator.rot2, rot3=azimuthal_integrator.rot3, filename_mask=filename_mask, mtime_mask=mtime_mask, npt2_rad=npt2_rad, npt2_azim=npt2_azim, unit=unit, Dummy=Dummy, DDummy=DDummy, method=method, binning=binning, **integration_options, )
@lru_cache(maxsize=5) def _get_persistent_pyfai_worker( data_signal_shape: tuple, dist: float, wavelength: float, psize_1: float, psize_2: float, center_1: float = None, center_2: float = None, poni1: float = None, poni2: float = None, rot1: float = 0.0, rot2: float = 0.0, rot3: float = 0.0, filename_mask: str = None, mtime_mask: float = None, npt2_rad: int = None, npt2_azim: int = None, unit: str = None, Dummy: int = None, DDummy: float = None, method: str = get_gpu_method(), binning: tuple = (1, 1), **integration_options, ): logger.info("No cache hit. Creating new pyFAI worker.") azimuthal_integrator = _get_persistent_azimuthal_integrator( data_signal_shape=data_signal_shape, dist=dist, poni1=poni1, poni2=poni2, psize_1=psize_1, psize_2=psize_2, rot1=rot1, rot2=rot2, rot3=rot3, wavelength=wavelength, ) # integration options applied_integration_options = { "npt2_rad": int(npt2_rad), "npt2_azim": int(npt2_azim), "unit": unit, "dummy": Dummy, "delta_dummy": DDummy, "method": ( (method.split, method.algo, method.impl) if isinstance(method, (IntegrationMethod, Method)) else method ), } applied_integration_options.update(**integration_options) worker = Worker( azimuthalIntegrator=azimuthal_integrator, shapeIn=azimuthal_integrator.detector.shape, shapeOut=( applied_integration_options.pop("npt2_azim"), applied_integration_options.pop("npt2_rad"), ), unit=applied_integration_options.pop("unit"), dummy=applied_integration_options.pop("dummy"), delta_dummy=applied_integration_options.pop("delta_dummy"), method=applied_integration_options.pop("method", get_gpu_method()), extra_options=applied_integration_options, ) worker.correct_solid_angle = False worker.output = "raw" worker.error_model = "azimuthal" worker.ai._empty = worker.dummy array_mask = get_persistent_array_mask( filename_mask=filename_mask, data_signal_shape=data_signal_shape, binning=binning, use_cupy=False, ) if array_mask is not None: worker.mask_image = array_mask worker.ai.mask = array_mask return worker
[docs] def process_dataset_azim( dataset_signal: numpy.ndarray, azimuthal_integrator: AzimuthalIntegrator, dataset_variance: numpy.ndarray = None, filename_dark_azimuthal: str = None, filename_mask_azimuthal: str = None, npt2_rad: int = None, npt2_azim: int = None, unit: str = None, Dummy: int = None, DDummy: int = None, method: str = None, datatype: str = "float32", **kwargs, ): dataset_signal_azim = None dataset_variance_azim = None dataset_sigma_azim = None dataset_sumsignal_azim = None dataset_sumnorm_azim = None dataset_sumvariance_azim = None do_variance_formula = kwargs.get("do_variance_formula") variance_formula = kwargs.get("variance_formula") if dataset_variance is not None: ... elif dataset_variance is None and do_variance_formula and variance_formula: variance_function = numexpr.NumExpr( variance_formula, [("data", datatype), ("dark", datatype)] ) dark = None if filename_dark_azimuthal: dark = get_persistent_array_dark( filename_dark=filename_dark_azimuthal, data_signal_shape=dataset_signal[0].shape, dummy=Dummy, delta_dummy=DDummy, **kwargs, ) dataset_variance = numpy.zeros_like(dataset_signal, dtype=datatype) for frame_index, data_signal in enumerate(dataset_signal): dataset_variance[frame_index] = variance_function( data_signal, 0 if dark is None else dark ) worker = get_persistent_pyfai_worker( azimuthal_integrator=azimuthal_integrator, filename_mask=filename_mask_azimuthal, npt2_rad=npt2_rad, npt2_azim=npt2_azim, unit=unit, Dummy=Dummy, DDummy=DDummy, method=method, **kwargs, ) output_dataset_shape = ( len(dataset_signal), worker.nbpt_azim, worker.nbpt_rad, ) dataset_signal_azim = numpy.zeros(output_dataset_shape, dtype=datatype) dataset_sumsignal_azim = numpy.zeros(output_dataset_shape, dtype=datatype) dataset_sumnorm_azim = numpy.zeros(output_dataset_shape, dtype=datatype) if dataset_variance is not None: dataset_variance_azim = numpy.zeros(output_dataset_shape, dtype=datatype) dataset_sigma_azim = numpy.zeros(output_dataset_shape, dtype=datatype) dataset_sumvariance_azim = numpy.zeros(output_dataset_shape, dtype=datatype) logger.debug(f"PyFAI worker used method: {worker.method}") for frame_index, data_signal in enumerate(dataset_signal): if dataset_variance is not None: data_variance = dataset_variance[frame_index] else: data_variance = None result2d_azim = worker.process( data=data_signal, variance=data_variance, ) dataset_signal_azim[frame_index] = numpy.ascontiguousarray( result2d_azim.intensity, dtype=datatype ) # (sum_signal / sum_normalization) dataset_sumsignal_azim[frame_index] = numpy.ascontiguousarray( result2d_azim.sum_signal, dtype=datatype ) dataset_sumnorm_azim[frame_index] = numpy.ascontiguousarray( result2d_azim.sum_normalization, dtype=datatype ) if data_variance is not None: sum_var = numpy.ascontiguousarray( # noqa result2d_azim.sum_variance, dtype=datatype ) sum_norm = numpy.ascontiguousarray( # noqa result2d_azim.sum_normalization, dtype=datatype ) dataset_variance_azim[frame_index] = numexpr.evaluate( f"where(sum_norm <= 0.0, {worker.dummy}, sum_var / (sum_norm * sum_norm))" ) dataset_sigma_azim[frame_index] = numpy.ascontiguousarray( result2d_azim.sigma, dtype=datatype ) dataset_sumvariance_azim[frame_index] = numpy.ascontiguousarray( result2d_azim.sum_variance, dtype=datatype ) radial_array = result2d_azim.radial if radial_array is not None: radial_array = radial_array.astype(datatype) azimuthal_array = result2d_azim.azimuthal if azimuthal_array is not None: azimuthal_array = azimuthal_array.astype(datatype) return ( dataset_signal_azim, dataset_variance_azim, dataset_sigma_azim, dataset_sumsignal_azim, dataset_sumnorm_azim, dataset_sumvariance_azim, radial_array, azimuthal_array, )