Source code for ewoksid02.tasks.secondaryscatteringtask

from ewoksid02.id02_format import Path
from ewoksid02.tasks.id02processingtask import ID02ProcessingTask
from ewoksid02.utils.secondaryscattering import process_dataset_2scat
from ewoksid02.utils.mathutils import (
    is_dataset,
    get_variance_from_sigma,
)
from ewoksid02.headers import HS32

DEFAULT_WINDOW_ROI_SIZE = 120
DEFAULT_ALGORITHM_2SCAT = "cupy"


[docs] class SecondaryScatteringTask( ID02ProcessingTask, optional_input_names=[ "filename_window_wagon", "WindowRoiSize", "Dummy", "Center_1", "Center_2", "algorithm_2scat", "pre_caving", "filename_mask_static", "filename_mask_reference", "flip_caving", "save_secondary_scattering", "BSize_1", "BSize_2", ], output_names=[ "secondary_scattering", ], ): """The `SecondaryScatteringTask` class is responsible for calculating and correcting the scattering coming from the window that separates the wagon from the flying tube. It can be used to correct any scattering source that is close to the detector. Optional Inputs: - filename_window_wagon (str): Path to the mask file used for defining the scattering window WAXS pattern. - WindowRoiSize (float): Distance parameter for subdata extraction during secondary scattering correction. - Dummy (float): Value to perform a pre-caving step (to mask the detector gaps) - Center_1 (float): Beam center in the first dimension. - Center_2 (float): Beam center in the second dimension. - algorithm_2scat (str): Implementation to perform the secondary scattering correction and pre-caving (numpy or cupy). - pre_caving (bool): To perform a caving step before the correction. - filename_mask_static (str): Path to the mask file used for the caving operation. - filename_mask_reference (str): Path to the reference mask file (kind of a negative mask). - flip_caving (bool): Cave the image with its flipped projection, both horizontal and vertical. WARNING: it is physically not correct! - save_secondary_scattering (bool): Flag to save the secondary scattering dataset. Default is `False`. - BSize_1 (float): Pixel binning 1. - BSize_2 (float): Pixel binning 2. Outputs: - secondary_scattering (numpy.ndarray): Dataset with the calculated secondary scattering. """
[docs] def get_processing_parameters(self) -> dict: if not self.processing_params: processing_params = { "filename_window_wagon": self.get_input_value( "filename_window_wagon", HS32.get_mask_window(headers=self.headers), ), "WindowRoiSize": self.get_parameter( "WindowRoiSize", ), "Dummy": self.get_parameter("Dummy"), "Center_1": self.get_parameter("Center_1"), "Center_2": self.get_parameter("Center_2"), "binning": ( int(self.get_parameter("BSize_1")), int(self.get_parameter("BSize_2")), ), "algorithm_2scat": self.get_input_value( "algorithm_2scat", DEFAULT_ALGORITHM_2SCAT ), "pre_caving": self.get_input_value("pre_caving", True), "filename_mask_static": self.get_input_value( "filename_mask_static", HS32.get_mask_gaps_filename(headers=self.headers), ), "filename_mask_reference": self.get_input_value( "filename_mask_reference", HS32.get_mask_beamstop_filename(headers=self.headers), ), "flip_caving": self.get_input_value( "flip_caving", bool(self.headers.get("nw_cave_flip")), ), } self.processing_params = processing_params return self.processing_params
[docs] def process(self) -> None: processing_params = self.get_processing_parameters() dataset_variance = None if is_dataset(self.dataset_variance): dataset_variance = self.dataset_variance elif is_dataset(self.dataset_sigma): dataset_variance = get_variance_from_sigma( dataset_sigma=self.dataset_sigma, Dummy=processing_params.get("Dummy", 0.0), ) ( corrected_dataset_signal, corrected_dataset_variance, corrected_dataset_sigma, secondary_scattering, ) = process_dataset_2scat( dataset_signal=self.dataset_signal, dataset_variance=dataset_variance, **processing_params, ) if corrected_dataset_signal is None: self.outputs.dataset_signal = self.dataset_signal else: self.outputs.dataset_signal = corrected_dataset_signal if corrected_dataset_variance is None: self.outputs.dataset_variance = dataset_variance else: self.outputs.dataset_variance = corrected_dataset_variance if corrected_dataset_sigma is None: self.outputs.dataset_sigma = self.dataset_sigma else: self.outputs.dataset_sigma = corrected_dataset_sigma self.outputs.secondary_scattering = secondary_scattering
[docs] def save(self) -> None: super().save() if self.get_input_value("save_secondary_scattering", False): with self.open_id02_file( filename=self.processing_filename, mode="a", ) as processed_file: processed_file.update_dataset( added_dataset=self.outputs.secondary_scattering, h5_dataset_path=str( Path(processed_file.nxdata_path) / "secondary_scattering" ), index_read=self.index_range_last, datatype=self.get_input_value("datatype", "float32"), )