Source code for ewoksid02.utils.caving

import numpy

from .io import get_persistent_array_mask

try:
    import cupy
    from .cupyutils import log_allocated_gpu_memory
except ImportError:
    CUPY_AVAILABLE = False
    CUPY_MEM_POOL = None
    cupy = numpy
else:
    CUPY_AVAILABLE = True
    CUPY_MEM_POOL = cupy.get_default_memory_pool()
import logging

# Not referenced anywhere in the visible part of this module.
# NOTE(review): presumably a placeholder for a cached mask - confirm or remove.
MASK_PIXELS_AVAILABLE = None

# Module-level logger; its level is forced to INFO when the module is imported.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_index_shifted(
    data_shape: tuple,
    center_x: int,
    center_y: int,
):
    """Return the mirrored (centro-symmetric) x/y index vectors of a 2D frame.

    Every index ``i`` is mapped to ``2 * int(center) - i + 1``.

    Inputs:
        - data_shape (tuple): shape of the data array (2 dimensional)
        - center_x (int): beam center applied to the x index, which runs over
          ``data_shape[1]``; truncated with ``int()``
        - center_y (int): beam center applied to the y index, which runs over
          ``data_shape[0]``; truncated with ``int()``
    Outputs:
        - Tuple: (x, y) shifted index vectors, of shape ``(1, data_shape[1])``
          and ``(data_shape[0], 1)`` respectively
    """

    def _mirror(index_vector, center):
        # NOTE(review): the "+ 1" offset was flagged "Think it's correct" by
        # the original author and has not been verified independently.
        return 2 * int(center) - index_vector + 1

    row_index = numpy.arange(data_shape[0])[:, numpy.newaxis]
    column_index = numpy.arange(data_shape[1])[numpy.newaxis, :]
    return _mirror(column_index, center_x), _mirror(row_index, center_y)


def get_position_vectors(
    data_shape: tuple,
    use_cupy: bool = False,
):
    """Build the sparse x/y index vectors of a 2-dimensional data array.

    Inputs:
        - data_shape (tuple): shape of the data array (2 dimensional)
        - use_cupy (bool): if True, both vectors are converted with
          ``cupy.asarray`` before being returned
    Outputs:
        - Tuple: (x, y) index vectors, of shape ``(1, data_shape[1])`` and
          ``(data_shape[0], 1)`` respectively
    """
    row_index = numpy.arange(data_shape[0])[:, numpy.newaxis]
    column_index = numpy.arange(data_shape[1])[numpy.newaxis, :]
    if not use_cupy:
        return column_index, row_index
    return cupy.asarray(column_index), cupy.asarray(row_index)


def shift_position_vectors(
    x_vector: numpy.ndarray,
    y_vector: numpy.ndarray,
    center_x: int,
    center_y: int,
    use_cupy: bool = False,
):
    """Mirror a pair of index vectors around the beam center.

    Every index ``i`` is mapped to ``2 * int(center) - i + 1`` so that it
    points to its centro-symmetric brother.

    Inputs:
        - x_vector (numpy.ndarray): x index vector, mirrored around center_x
        - y_vector (numpy.ndarray): y index vector, mirrored around center_y
        - center_x (int): beam center for the x index (truncated with ``int()``)
        - center_y (int): beam center for the y index (truncated with ``int()``)
        - use_cupy (bool): if True, the results are converted with
          ``cupy.asarray`` before being returned
    Outputs:
        - Tuple: pair of shifted (x, y) index vectors
    """
    integer_center_x, integer_center_y = int(center_x), int(center_y)

    def _mirror(index_vector, center):
        return 2 * center - index_vector + 1

    mirrored = (
        _mirror(x_vector, integer_center_x),
        _mirror(y_vector, integer_center_y),
    )
    if not use_cupy:
        return mirrored
    return cupy.asarray(mirrored[0]), cupy.asarray(mirrored[1])


def get_mask_limits(
    data_shape: tuple = None,
    center_x: int = None,
    center_y: int = None,
    x_shifted: numpy.ndarray = None,
    y_shifted: numpy.ndarray = None,
    only_x_dim: bool = False,
    only_y_dim: bool = False,
    use_cupy: bool = False,
):
    """Compute the primitive caving mask: pixels whose mirrored brother exists.

    A pixel is kept when its shifted index is strictly greater than 0 and
    strictly lower than the axis length (a shifted index of exactly 0 is
    treated as outside). With the beam at the detector center nearly the whole
    frame can be caved; with the beam on an edge, no pixel can.

    Inputs:
        - data_shape (tuple): shape of the data array (2 dimensional); only
          used when the shifted vectors are not provided
        - center_x (int): beam center for the x index; only used when the
          shifted vectors are not provided
        - center_y (int): beam center for the y index; only used when the
          shifted vectors are not provided
        - x_shifted (numpy.ndarray): shifted x index vector; its last axis
          length is the upper bound for x
        - y_shifted (numpy.ndarray): shifted y index vector; its first axis
          length is the upper bound for y
        - only_x_dim (bool): if True, return only the x-limits mask (takes
          precedence over only_y_dim)
        - only_y_dim (bool): if True, return only the y-limits mask
        - use_cupy (bool): evaluate with cupy instead of numpy
    Outputs:
        - numpy.ndarray / cupy.ndarray: boolean mask, True where the
          symmetric brother falls inside the detector area
    """
    if x_shifted is None or y_shifted is None:
        plain_x, plain_y = get_position_vectors(
            data_shape=data_shape,
            use_cupy=use_cupy,
        )
        x_shifted, y_shifted = shift_position_vectors(
            x_vector=plain_x,
            y_vector=plain_y,
            center_x=center_x,
            center_y=center_y,
            use_cupy=use_cupy,
        )

    # Same expressions for both backends; only the array module differs.
    xp = cupy if use_cupy else numpy
    n_columns = x_shifted.shape[-1]
    n_rows = y_shifted.shape[0]

    inside_x = xp.logical_and(x_shifted > 0, x_shifted < n_columns)
    if only_x_dim:
        return inside_x

    inside_y = xp.logical_and(y_shifted > 0, y_shifted < n_rows)
    if only_y_dim:
        return inside_y

    return inside_x & inside_y  # boolean


def _index_vectors(data_shape: tuple, Center_1: int, Center_2: int):
    """Build the plain and the mirrored sparse index vectors of a 2D frame.

    Inputs:
        - data_shape (tuple): shape of one frame (2 dimensional)
        - Center_1 (int): beam center applied to the x index (last axis)
        - Center_2 (int): beam center applied to the y index (first axis)
    Outputs:
        - Tuple (x_vector, y_vector, x_shifted, y_shifted): the x vectors have
          shape ``(1, data_shape[1])``, the y vectors ``(data_shape[0], 1)``.
          Shifted vectors hold ``2 * int(center) - index + 1`` and may contain
          values outside ``[0, length)``.
    """
    y_vector, x_vector = numpy.meshgrid(
        numpy.arange(data_shape[0]),
        numpy.arange(data_shape[1]),
        indexing="ij",
        sparse=True,
    )
    x_shifted = 2 * int(Center_1) - x_vector + 1
    y_shifted = 2 * int(Center_2) - y_vector + 1
    return x_vector, y_vector, x_shifted, y_shifted


def _as_frame_stack(dataset):
    """Return ``(3D dataset, frame shape)``; a 2D frame gets a leading axis.

    Raises:
        - ValueError: if the dataset is neither 2- nor 3-dimensional
    """
    if dataset.ndim == 2:
        return dataset[numpy.newaxis, ...], dataset.shape
    if dataset.ndim == 3:
        return dataset, dataset.shape[1:]
    raise ValueError(
        f"Caving needs a 2- or 3-dimensional dataset, got {dataset.ndim} dimension(s)"
    )


def _resolve_available_masks(
    data_shape, Center_1, Center_2, mask_reference, flip_caving, precomputed
):
    """Take the 'pixels available' masks from ``precomputed`` or compute them.

    The horizontal and vertical masks are only computed when ``flip_caving``
    is True; otherwise they are returned as found in ``precomputed``
    (None when absent).
    """
    centrosymmetric = precomputed.get("mask_pixels_available_centrosymmetric")
    horizontal = precomputed.get("mask_pixels_available_horizontal")
    vertical = precomputed.get("mask_pixels_available_vertical")

    if centrosymmetric is None:
        centrosymmetric = _mask_caving(
            data_shape,
            Center_1,
            Center_2,
            mask_reference,
            vertical_symmetry=True,
            horizontal_symmetry=True,
        )
    if flip_caving and horizontal is None:
        horizontal = _mask_caving(
            data_shape,
            Center_1,
            Center_2,
            mask_reference,
            vertical_symmetry=False,
            horizontal_symmetry=True,
        )
    if flip_caving and vertical is None:
        vertical = _mask_caving(
            data_shape,
            Center_1,
            Center_2,
            mask_reference,
            vertical_symmetry=True,
            horizontal_symmetry=False,
        )
    return centrosymmetric, horizontal, vertical


def _flip_caving(
    data,
    Dummy,
    flip_horizontally_preference,
    mask_pixels_available_horizontal,
    mask_pixels_available_vertical,
    x_vector,
    y_vector,
    x_shifted,
    y_shifted,
    use_cupy,
):
    """Run the horizontal and the vertical flip caving, one after the other.

    Before each pass, the pixels still equal to ``Dummy`` are the ones to
    cave. ``flip_horizontally_preference`` selects which pass runs first.
    """
    horizontal_pass = (mask_pixels_available_horizontal, x_shifted, y_vector)
    vertical_pass = (mask_pixels_available_vertical, x_vector, y_shifted)
    if flip_horizontally_preference:
        passes = (horizontal_pass, vertical_pass)
    else:
        passes = (vertical_pass, horizontal_pass)

    for pixels_available, x_source, y_source in passes:
        data = _process_data_caving(
            data=data,
            Dummy=Dummy,
            mask_dummy_pixels=False,
            pixels_to_cave=data == Dummy,
            pixels_to_substitute=pixels_available,
            x_vector=x_source,
            y_vector=y_source,
            use_cupy=use_cupy,
        )
    return data


def _mask_caving(
    data_shape: tuple,
    Center_1: int,
    Center_2: int,
    mask_reference: numpy.ndarray = None,
    vertical_symmetry: bool = True,
    horizontal_symmetry: bool = True,
    use_cupy=False,
) -> numpy.ndarray:
    """Data-free method, generates the mask with the pixels available.

    A pixel is available when its symmetric brother (centro-symmetric,
    horizontally mirrored or vertically mirrored, depending on the flags)
    lies inside the frame and is not flagged in ``mask_reference``.

    Inputs:
        - data_shape (tuple): shape of the data array (2 dimensional)
        - Center_1 (int): beam center applied to the x index (last axis)
        - Center_2 (int): beam center applied to the y index (first axis)
        - mask_reference (numpy.ndarray): mask with the pixels whose intensity
          should not be used, that means that their symmetric brother should
          not be caved
        - vertical_symmetry (bool): mirror the y index around the beam center
        - horizontal_symmetry (bool): mirror the x index around the beam center
        - use_cupy (bool): returns a cupy array
    Outputs:
        - numpy.ndarray or cupy.ndarray (bool) with the available pixels.
          Without a reference and with a single symmetry the mask keeps the
          sparse shape of the index vector and relies on broadcasting.
    """
    x_vector, y_vector, x_shifted, y_shifted = _index_vectors(
        data_shape, Center_1, Center_2
    )
    x_max = x_shifted.shape[-1]
    y_max = y_shifted.shape[0]

    def _inside(index_vector, upper):
        # Strict on both sides: a source index of exactly 0 is rejected.
        return numpy.logical_and(index_vector > 0, index_vector < upper)

    if vertical_symmetry and horizontal_symmetry:
        # Centrosymmetric caving (standard)
        x_source, y_source = x_shifted, y_shifted
        mask_limits = _inside(x_shifted, x_max) & _inside(y_shifted, y_max)
    elif horizontal_symmetry:
        # Horizontal flipping around the beam
        x_source, y_source = x_shifted, y_vector
        mask_limits = _inside(x_shifted, x_max)
    elif vertical_symmetry:
        # Vertical flipping around the beam
        x_source, y_source = x_vector, y_shifted
        mask_limits = _inside(y_shifted, y_max)
    else:
        x_source, y_source = x_vector, y_vector
        mask_limits = _inside(x_vector, x_max) & _inside(y_vector, y_max)

    if mask_reference is None:
        mask_caving = mask_limits
    else:
        # Mirrored indices exceed the axis length as soon as the beam center
        # is past the middle of the frame, and numpy raises IndexError on
        # out-of-range integer-array indices. Wrapping keeps the lookup valid;
        # the wrapped entries never matter because mask_limits is False there.
        reference_at_source = mask_reference[
            y_source % mask_reference.shape[0],
            x_source % mask_reference.shape[1],
        ]
        mask_caving = mask_limits ^ (mask_limits & reference_at_source)

    if use_cupy:
        return cupy.asarray(mask_caving).astype(bool)
    return mask_caving.astype(bool)


def _mask_to_cave(
    data: numpy.ndarray, Dummy: int = None, mask_static: numpy.ndarray = None
) -> numpy.ndarray:
    """Generate the mask with the pixels that we want to substitute.

    ``Dummy`` is tested for truthiness, so a value of 0 behaves like None
    (no dummy value).

    Inputs:
        - data (numpy.ndarray): data signal, 2 or 3 dimensional
        - Dummy (int): pixels in data with dummy value will be replaced
        - mask_static (numpy.ndarray): pixels that will be replaced
    Outputs:
        - numpy.ndarray: mask whose pixels we want to replace
    Raises:
        - ValueError: if neither a (truthy) Dummy nor a static mask is given
    """
    if Dummy and mask_static is not None:
        return (data == Dummy) + mask_static
    if Dummy:
        return data == Dummy
    if mask_static is not None:
        return mask_static
    raise ValueError(
        "Caving has to be determined by a dummy value and/or a static mask"
    )


def _process_data_caving(
    data: numpy.ndarray,
    pixels_to_cave: numpy.ndarray,
    pixels_to_substitute: numpy.ndarray,
    x_vector: numpy.ndarray,
    y_vector: numpy.ndarray,
    Dummy: int = None,
    mask_dummy_pixels: bool = True,
    use_cupy: bool = False,
):
    """Replace the pixels in data according to a mask.

    The replacement values are read from the positions given by the index
    vectors. Index values outside the frame are wrapped (modulo the axis
    length), so the numpy path no longer raises IndexError and behaves like
    cupy, which wraps out-of-range indices; such positions are expected to be
    excluded by ``pixels_to_substitute``.

    Inputs:
        - data (numpy.ndarray): data signal, 2 or 3 dimensional
        - pixels_to_cave (numpy.ndarray): pixels that will be masked with
          Dummy and/or caved (substituted by a symmetric brother)
        - pixels_to_substitute (numpy.ndarray): pixels for which a usable
          symmetric brother exists; only pixels in both masks are replaced
        - x_vector (numpy.ndarray): source index along the last axis of data
        - y_vector (numpy.ndarray): source index along the second-to-last axis
        - Dummy (int): value written into pixels_to_cave before caving, when
          mask_dummy_pixels is True and Dummy is truthy
        - mask_dummy_pixels (bool): enables the Dummy pre-fill
        - use_cupy (bool): to use cupy, all the inputs must be already in the GPU
    Outputs:
        - numpy.ndarray / cupy.ndarray: copy of data with replaced pixels (caved)
    """
    # Same algorithm for both backends; only the array module differs.
    xp = cupy if use_cupy else numpy
    mask_complete = pixels_to_cave & pixels_to_substitute

    data_caved = xp.copy(data)
    if mask_dummy_pixels and Dummy:
        xp.copyto(data_caved, Dummy, where=pixels_to_cave)

    if data.ndim == 3:
        y_source = y_vector % data_caved.shape[-2]
        x_source = x_vector % data_caved.shape[-1]
        xp.copyto(
            data_caved, data_caved[:, y_source, x_source], where=mask_complete
        )
    elif data.ndim == 2:
        y_source = y_vector % data_caved.shape[-2]
        x_source = x_vector % data_caved.shape[-1]
        xp.copyto(data_caved, data_caved[y_source, x_source], where=mask_complete)
    return data_caved


def _process_dataset_caving_numpy(
    dataset: numpy.ndarray,
    Center_1: int,
    Center_2: int,
    Dummy: int = None,
    mask_static: numpy.ndarray = None,
    mask_reference: numpy.ndarray = None,
    flip_caving: bool = False,
    flip_horizontally_preference: bool = True,
    **kwargs,
) -> numpy.ndarray:
    """Perform the caving on a whole dataset at once using numpy methods.

    Inputs:
        - dataset (numpy.ndarray): data signal, 2 or 3-dimensional (a 2D frame
          is processed as a stack of one frame)
        - Center_1 (int): beam center applied to the x index (last axis)
        - Center_2 (int): beam center applied to the y index
        - Dummy (int): pixels in data with dummy value will be replaced
        - mask_static (numpy.ndarray): pixels that will be replaced
        - mask_reference (numpy.ndarray): mask with the pixels whose intensity
          should not be used, that means that their symmetric brother should
          not be caved
        - flip_caving (bool): if True (and Dummy is truthy), adds
          horizontal-symmetric and vertical-symmetric caving after the
          centro-symmetric caving, which is always performed
        - flip_horizontally_preference (bool): if flip_caving is activated,
          horizontal caving is done before vertical caving
        - **kwargs: optional precomputed masks under the keys
          "mask_pixels_available_centrosymmetric",
          "mask_pixels_available_horizontal", "mask_pixels_available_vertical"
    Outputs:
        - numpy.ndarray: 3-dimensional data with replaced pixels (caved)
    Raises:
        - ValueError: if the dataset is neither 2- nor 3-dimensional
    """
    dataset, data_shape = _as_frame_stack(dataset)
    x_vector, y_vector, x_shifted, y_shifted = _index_vectors(
        data_shape, Center_1, Center_2
    )
    (
        mask_pixels_available_centrosymmetric,
        mask_pixels_available_horizontal,
        mask_pixels_available_vertical,
    ) = _resolve_available_masks(
        data_shape, Center_1, Center_2, mask_reference, flip_caving, kwargs
    )

    # This is the mask with the pixels that will be, either masked with Dummy or caved
    mask_to_cave = _mask_to_cave(data=dataset, Dummy=Dummy, mask_static=mask_static)

    data_caved = _process_data_caving(
        data=dataset,
        Dummy=Dummy,
        mask_dummy_pixels=True,
        pixels_to_cave=mask_to_cave,
        pixels_to_substitute=mask_pixels_available_centrosymmetric,
        x_vector=x_shifted,
        y_vector=y_shifted,
        use_cupy=False,
    )

    if flip_caving and Dummy:
        data_caved = _flip_caving(
            data=data_caved,
            Dummy=Dummy,
            flip_horizontally_preference=flip_horizontally_preference,
            mask_pixels_available_horizontal=mask_pixels_available_horizontal,
            mask_pixels_available_vertical=mask_pixels_available_vertical,
            x_vector=x_vector,
            y_vector=y_vector,
            x_shifted=x_shifted,
            y_shifted=y_shifted,
            use_cupy=False,
        )
    return data_caved


def _process_dataset_caving_cupy(
    dataset: numpy.ndarray,
    Center_1: int,
    Center_2: int,
    Dummy: int = None,
    mask_static: numpy.ndarray = None,
    mask_reference: numpy.ndarray = None,
    flip_caving: bool = False,
    flip_horizontally_preference: bool = True,
    **kwargs,
) -> numpy.ndarray:
    """Perform the caving on a dataset using cupy methods (frame by frame).

    Index vectors and masks are converted once with ``cupy.asarray``; every
    frame is then converted, caved with ``_process_data_caving_cupy`` and
    copied back into a numpy array.

    Inputs:
        - dataset (numpy.ndarray): data signal, 2 or 3-dimensional (a 2D frame
          is processed as a stack of one frame)
        - Center_1 (int): beam center applied to the x index (last axis)
        - Center_2 (int): beam center applied to the y index
        - Dummy (int): pixels in data with dummy value will be replaced
        - mask_static (numpy.ndarray): pixels that will be replaced
        - mask_reference (numpy.ndarray): mask with the pixels whose intensity
          should not be used, that means that their symmetric brother should
          not be caved
        - flip_caving (bool): if True (and Dummy is truthy), adds
          horizontal-symmetric and vertical-symmetric caving after the
          centro-symmetric caving, which is always performed
        - flip_horizontally_preference (bool): if flip_caving is activated,
          horizontal caving is done before vertical caving
        - **kwargs: optional precomputed masks under the keys
          "mask_pixels_available_centrosymmetric",
          "mask_pixels_available_horizontal", "mask_pixels_available_vertical"
    Outputs:
        - numpy.ndarray: 3-dimensional data with replaced pixels (caved),
          computed with cupy and returned as a numpy array
    Raises:
        - ValueError: if the dataset is neither 2- nor 3-dimensional
    """
    log_allocated_gpu_memory()

    dataset, data_shape = _as_frame_stack(dataset)
    x_vector, y_vector, x_shifted, y_shifted = _index_vectors(
        data_shape, Center_1, Center_2
    )
    x_shifted_cupy = cupy.asarray(x_shifted)
    y_shifted_cupy = cupy.asarray(y_shifted)
    x_vector_cupy = cupy.asarray(x_vector)
    y_vector_cupy = cupy.asarray(y_vector)

    (
        mask_pixels_available_centrosymmetric,
        mask_pixels_available_horizontal,
        mask_pixels_available_vertical,
    ) = _resolve_available_masks(
        data_shape, Center_1, Center_2, mask_reference, flip_caving, kwargs
    )

    mask_static_cupy = None if mask_static is None else cupy.asarray(mask_static)
    mask_pixels_available_centrosymmetric_cupy = cupy.asarray(
        mask_pixels_available_centrosymmetric
    )
    if flip_caving:
        mask_pixels_available_horizontal_cupy = cupy.asarray(
            mask_pixels_available_horizontal
        )
        mask_pixels_available_vertical_cupy = cupy.asarray(
            mask_pixels_available_vertical
        )
    else:
        mask_pixels_available_horizontal_cupy = None
        mask_pixels_available_vertical_cupy = None

    dataset_caved = numpy.zeros_like(dataset)
    for index_frame, data in enumerate(dataset):
        data_cupy = _process_data_caving_cupy(
            data_cupy=cupy.asarray(data),
            mask_pixels_available_centrosymmetric_cupy=mask_pixels_available_centrosymmetric_cupy,
            x_shifted_cupy=x_shifted_cupy,
            y_shifted_cupy=y_shifted_cupy,
            Dummy=Dummy,
            mask_static_cupy=mask_static_cupy,
            flip_caving=flip_caving,
            flip_horizontally_preference=flip_horizontally_preference,
            mask_pixels_available_horizontal_cupy=mask_pixels_available_horizontal_cupy,
            mask_pixels_available_vertical_cupy=mask_pixels_available_vertical_cupy,
            x_vector_cupy=x_vector_cupy,
            y_vector_cupy=y_vector_cupy,
        )
        dataset_caved[index_frame] = data_cupy.get()
    return dataset_caved


def _process_data_caving_cupy(
    data_cupy: cupy.ndarray,
    mask_pixels_available_centrosymmetric_cupy: cupy.ndarray,
    x_shifted_cupy: cupy.ndarray,
    y_shifted_cupy: cupy.ndarray,
    Dummy: int = None,
    mask_static_cupy: cupy.ndarray = None,
    flip_caving: bool = False,
    flip_horizontally_preference: bool = True,
    mask_pixels_available_horizontal_cupy: cupy.ndarray = None,
    mask_pixels_available_vertical_cupy: cupy.ndarray = None,
    x_vector_cupy: cupy.ndarray = None,
    y_vector_cupy: cupy.ndarray = None,
) -> cupy.ndarray:
    """Perform the caving on data whose inputs are already cupy arrays.

    Inputs:
        - data_cupy (cupy.ndarray): data signal, already on GPU
        - mask_pixels_available_centrosymmetric_cupy (cupy.ndarray): mask with
          the available pixels to do centrosymmetric caving (already on GPU)
        - x_shifted_cupy (cupy.ndarray): shifted x index vector, already on GPU
        - y_shifted_cupy (cupy.ndarray): shifted y index vector, already on GPU
        - Dummy (int): pixels in data with dummy value will be replaced
        - mask_static_cupy (cupy.ndarray): mask with the pixels that will be
          replaced, already on GPU
        - flip_caving (bool): if True (and Dummy is truthy), adds
          horizontal-symmetric and vertical-symmetric caving after the
          centro-symmetric caving, which is always performed
        - flip_horizontally_preference (bool): if flip_caving is activated,
          horizontal caving is done before vertical caving
        - mask_pixels_available_horizontal_cupy (cupy.ndarray): mask with the
          available pixels to do horizontal-symmetric caving (already on GPU);
          needed when flip_caving is used
        - mask_pixels_available_vertical_cupy (cupy.ndarray): mask with the
          available pixels to do vertical-symmetric caving (already on GPU);
          needed when flip_caving is used
        - x_vector_cupy (cupy.ndarray): plain x index vector, already on GPU;
          needed when flip_caving is used
        - y_vector_cupy (cupy.ndarray): plain y index vector, already on GPU;
          needed when flip_caving is used
    Outputs:
        - cupy.ndarray: data with replaced pixels (caved), still on GPU
    """
    mask_to_cave = _mask_to_cave(data_cupy, Dummy, mask_static_cupy)

    data_cupy = _process_data_caving(
        data=data_cupy,
        Dummy=Dummy,
        mask_dummy_pixels=True,
        pixels_to_cave=mask_to_cave,
        pixels_to_substitute=mask_pixels_available_centrosymmetric_cupy,
        x_vector=x_shifted_cupy,
        y_vector=y_shifted_cupy,
        use_cupy=True,
    )

    if flip_caving and Dummy:
        data_cupy = _flip_caving(
            data=data_cupy,
            Dummy=Dummy,
            flip_horizontally_preference=flip_horizontally_preference,
            mask_pixels_available_horizontal=mask_pixels_available_horizontal_cupy,
            mask_pixels_available_vertical=mask_pixels_available_vertical_cupy,
            x_vector=x_vector_cupy,
            y_vector=y_vector_cupy,
            x_shifted=x_shifted_cupy,
            y_shifted=y_shifted_cupy,
            use_cupy=True,
        )
    return data_cupy


def process_data_caving(
    data: numpy.ndarray,
    Center_1: int,
    Center_2: int,
    Dummy: int = None,
    filename_mask_static: str = None,
    filename_mask_reference: str = None,
    algorithm_cave: str = "numpy",
    flip_caving: bool = False,
    flip_horizontally_preference: bool = True,
    **kwargs,
) -> numpy.ndarray:
    """Perform the caving on a frame or a stack of frames.

    Loads the optional masks, precomputes the data-independent masks of
    available pixels and dispatches to the implementation registered in
    ALGORITHMS_AVAILABLE under ``algorithm_cave``.

    Inputs:
        - data (numpy.ndarray): data signal, 2 or 3-dimensional (a 2D frame is
          processed as a stack of one frame)
        - Center_1 (int): beam center in the first data dimension
        - Center_2 (int): beam center in the second data dimension
        - Dummy (int): pixels in data with dummy value will be replaced
        - filename_mask_static (str): path to the file with the mask whose
          pixels we want to replace
        - filename_mask_reference (str): path to the file with the mask whose
          pixels should not be used, that means that their symmetric brother
          should not be caved
        - algorithm_cave (str): implementation to perform the caving: numpy
          or cupy. An unknown name, or "cupy" when CuPy is not available,
          falls back to DEFAULT_ALGORITHM with a warning.
        - flip_caving (bool): if True, adds horizontal-symmetric and
          vertical-symmetric caving (by default, centro-symmetric caving is
          always performed)
        - flip_horizontally_preference (bool): if flip_caving is activated,
          horizontal caving is done before vertical caving
        - **kwargs: only "binning" is read; it is forwarded to
          get_persistent_array_mask
    Outputs:
        - numpy.ndarray: data with replaced pixels (caved)
    Raises:
        - ValueError: if data is neither 2- nor 3-dimensional
    """
    if data.ndim == 2:
        data_signal_shape = data.shape
        data = data[numpy.newaxis, ...]
    elif data.ndim == 3:
        data_signal_shape = data.shape[1:]
    else:
        # Previously this fell through and failed later with UnboundLocalError.
        raise ValueError(
            f"Caving needs 2- or 3-dimensional data, got {data.ndim} dimension(s)"
        )

    if algorithm_cave not in ALGORITHMS_AVAILABLE:
        logger.warning(
            f"Algorithm '{algorithm_cave}' is not available. Using '{DEFAULT_ALGORITHM}' instead."
        )
        algorithm_cave = DEFAULT_ALGORITHM
    elif algorithm_cave == "cupy" and not CUPY_AVAILABLE:
        logger.warning(f"CuPy is not available. Using {DEFAULT_ALGORITHM} instead.")
        algorithm_cave = DEFAULT_ALGORITHM

    binning = kwargs.get("binning")
    use_cupy = algorithm_cave == "cupy"

    # Load static masks
    mask_static = None
    if filename_mask_static:
        mask_static = get_persistent_array_mask(
            filename_mask=filename_mask_static,
            data_signal_shape=data_signal_shape,
            binning=binning,
            use_cupy=use_cupy,
        )
    if mask_static is not None:
        mask_static = mask_static.astype(bool)

    mask_reference = None
    if filename_mask_reference:
        mask_reference = get_persistent_array_mask(
            filename_mask=filename_mask_reference,
            data_signal_shape=data_signal_shape,
            binning=binning,
            use_cupy=False,  # This reference mask won't be cupy
        )
    if mask_reference is not None:
        mask_reference = mask_reference.astype(bool)

    # This is the mask that contains the only pixels which can be substituted,
    # limited by the detector limits and the reference mask (it's data independent)
    mask_pixels_available_centrosymmetric = _mask_caving(
        data_signal_shape,
        Center_1,
        Center_2,
        mask_reference,
        vertical_symmetry=True,
        horizontal_symmetry=True,
        use_cupy=use_cupy,
    )

    if flip_caving:
        mask_pixels_available_horizontal = _mask_caving(
            data_signal_shape,
            Center_1,
            Center_2,
            mask_reference,
            vertical_symmetry=False,
            horizontal_symmetry=True,
            use_cupy=use_cupy,
        )
        mask_pixels_available_vertical = _mask_caving(
            data_signal_shape,
            Center_1,
            Center_2,
            mask_reference,
            vertical_symmetry=True,
            horizontal_symmetry=False,
            use_cupy=use_cupy,
        )
    else:
        mask_pixels_available_horizontal = None
        mask_pixels_available_vertical = None

    params_caving = {
        "dataset": data,
        "Center_1": Center_1,
        "Center_2": Center_2,
        "Dummy": Dummy,
        "mask_static": mask_static,
        "mask_reference": mask_reference,
        "flip_caving": flip_caving,
        "flip_horizontally_preference": flip_horizontally_preference,
        "mask_pixels_available_centrosymmetric": mask_pixels_available_centrosymmetric,
        "mask_pixels_available_horizontal": mask_pixels_available_horizontal,
        "mask_pixels_available_vertical": mask_pixels_available_vertical,
    }

    result = ALGORITHMS_AVAILABLE[algorithm_cave]["algorithm"](
        **params_caving,
    )
    return result


# Registry of the caving implementations, keyed by the name accepted by
# process_data_caving(algorithm_cave=...), which calls the "algorithm" entry.
# The "use_cupy" entry is not read by the code visible in this module.
ALGORITHMS_AVAILABLE = {
    "numpy": {"algorithm": _process_dataset_caving_numpy, "use_cupy": False},
    "cupy": {"algorithm": _process_dataset_caving_cupy, "use_cupy": True},
}
# Name used by process_data_caving when the requested algorithm is unknown,
# or when "cupy" is requested but CuPy could not be imported.
DEFAULT_ALGORITHM = "numpy"