from __future__ import annotations
import logging
import numpy
from .utils.gpu import cupy, cupy_available, log_allocated_gpu_memory
from .utils.io import get_array_mask
# Module-level placeholder initialised to None; nothing in the visible part of this module reads or writes it.
# NOTE(review): presumably meant as a cache for a pre-computed "available pixels" mask - confirm before relying on it.
MASK_PIXELS_AVAILABLE = None
# Logger named after this module.
logger = logging.getLogger(__name__)
# The level of this module logger is forced to INFO at import time.
logger.setLevel(logging.INFO)
[docs]
def get_index_shifted(
    data_shape: tuple,
    center_x: int,
    center_y: int,
):
    """Build the pair of index vectors that address the centro-symmetric pixel of each pixel.

    Inputs:
        - data_shape (tuple): shape of the 2-dimensional data array, read as (data_shape[0], data_shape[1])
        - center_x (int): beam center along x, the index that runs over data_shape[1] (truncated with int())
        - center_y (int): beam center along y, the index that runs over data_shape[0] (truncated with int())
    Outputs:
        - Tuple: (x_, y_); x_ has shape (1, data_shape[1]) and y_ has shape (data_shape[0], 1).
          Every entry equals ``2 * center - index + 1``. The values are not limited to the array
          bounds, so they may be negative or larger than the last valid index.
    """
    # Open (sparse) grid: a column vector of row indices and a row vector of column indices
    row_index = numpy.arange(data_shape[0])[:, numpy.newaxis]
    col_index = numpy.arange(data_shape[1])[numpy.newaxis, :]

    mirrored_cols = 2 * int(center_x) - col_index + 1
    mirrored_rows = 2 * int(center_y) - row_index + 1
    return mirrored_cols, mirrored_rows
[docs]
def get_position_vectors(
    data_shape: tuple,
    use_cupy: bool = False,
):
    """Create the (unshifted) x and y index vectors of a 2-dimensional data array.

    Inputs:
        - data_shape (tuple): shape of the data array, read as (data_shape[0], data_shape[1])
        - use_cupy (bool): if True, both vectors are passed through cupy.asarray before being returned
    Outputs:
        - Tuple: (x_vector, y_vector); x_vector has shape (1, data_shape[1]) and counts 0..data_shape[1]-1,
          y_vector has shape (data_shape[0], 1) and counts 0..data_shape[0]-1
    """
    y_vector = numpy.arange(data_shape[0])[:, numpy.newaxis]
    x_vector = numpy.arange(data_shape[1])[numpy.newaxis, :]

    if not use_cupy:
        return x_vector, y_vector
    return cupy.asarray(x_vector), cupy.asarray(y_vector)
[docs]
def shift_position_vectors(
    x_vector: numpy.ndarray,
    y_vector: numpy.ndarray,
    center_x: int,
    center_y: int,
    use_cupy: bool = False,
):
    """Mirror a pair of index vectors around the beam center so they point to the centro-symmetric brothers.

    Inputs:
        - x_vector (numpy.ndarray): index vector mirrored around center_x
        - y_vector (numpy.ndarray): index vector mirrored around center_y
        - center_x (int): beam center paired with x_vector (truncated with int())
        - center_y (int): beam center paired with y_vector (truncated with int())
        - use_cupy (bool): if True, both results are passed through cupy.asarray before being returned
    Outputs:
        - Tuple: (x_shifted, y_shifted), each computed as ``2 * center - vector + 1``.
          No bounds check is applied to the resulting indices.
    """
    cx, cy = int(center_x), int(center_y)
    mirrored_x = 2 * cx - x_vector + 1
    mirrored_y = 2 * cy - y_vector + 1

    if not use_cupy:
        return mirrored_x, mirrored_y
    return cupy.asarray(mirrored_x), cupy.asarray(mirrored_y)
[docs]
def get_mask_limits(
    data_shape: tuple = None,
    center_x: int = None,
    center_y: int = None,
    x_shifted: numpy.ndarray = None,
    y_shifted: numpy.ndarray = None,
    only_x_dim: bool = False,
    only_y_dim: bool = False,
    use_cupy: bool = False,
):
    """
    Compute the primitive "limits" mask: the pixels whose mirrored index stays on the detector.

    A pixel is kept when its shifted index is strictly greater than 0 and strictly smaller than
    the size of the corresponding axis (x size is taken from x_shifted.shape[-1], y size from
    y_shifted.shape[0]).

    Inputs:
        - data_shape (tuple): shape of the data array (2 dimensional); only used when the shifted vectors are not given
        - center_x (int): beam center paired with the x vector; only used when the shifted vectors are not given
        - center_y (int): beam center paired with the y vector; only used when the shifted vectors are not given
        - x_shifted (numpy.ndarray): shifted x index vector; if it or y_shifted is None both are rebuilt
          with get_position_vectors + shift_position_vectors
        - y_shifted (numpy.ndarray): shifted y index vector
        - only_x_dim (bool): if True, return only the x-limits mask (checked before only_y_dim)
        - only_y_dim (bool): if True, return only the y-limits mask
        - use_cupy (bool): evaluate the comparisons with cupy instead of numpy
    Outputs:
        - numpy.ndarray / cupy.ndarray: boolean mask, True where the symmetric brother falls inside the limits
    """
    if x_shifted is None or y_shifted is None:
        x_vector, y_vector = get_position_vectors(
            data_shape=data_shape,
            use_cupy=use_cupy,
        )
        x_shifted, y_shifted = shift_position_vectors(
            x_vector=x_vector,
            y_vector=y_vector,
            center_x=center_x,
            center_y=center_y,
            use_cupy=use_cupy,
        )

    # Same code path for both back-ends; only the array module differs
    xp = cupy if use_cupy else numpy
    x_max = x_shifted.shape[-1]
    y_max = y_shifted.shape[0]

    inside_x = xp.logical_and(x_shifted > 0, x_shifted < x_max)
    if only_x_dim:
        return inside_x

    inside_y = xp.logical_and(y_shifted > 0, y_shifted < y_max)
    if only_y_dim:
        return inside_y

    return inside_x & inside_y
def _mask_caving(
data_shape: tuple,
Center_1: int,
Center_2: int,
mask_reference: numpy.ndarray = None,
vertical_symmetry: bool = True,
horizontal_symmetry: bool = True,
use_cupy=False,
) -> numpy.ndarray:
"""Data-free method, generates the mask with the pixels available. Those pixels whose intensity can be substitued by its centrosymmetric, horz-symmetric or vert-symmetric brother.
It's determined by the position of the beam center, (the closer to the array center, the more pixels available), and the mask_reference, that determines which pixels will not be caved.
Inputs:
- data_shape (tuple): shape of the data array (2 dimensional)
- Center_1 (int): beam center in the first data dimension
- Center_2 (int): beam center in the second data dimension
- mask_reference (numpy.ndarray): mask with the pixels whose intensity should not be used, that means that their symmetric brother should not be caved
- vertical_symmetry (bool): the caving performs a vertical flip around the beam center
- horizontal_symmetry (bool) : the caving performs a horizontal flip around the beam center
- use_cupy (bool): returns a cupy array
Outputs:
- numpy.ndarray, or cupy.ndarray with the available pixels, taken into account the detector limits and the mask reference
"""
# Dataset-free method
# With a reference, this method takes ~20 ms for all operations, < 1ms without reference
y_vector, x_vector = numpy.meshgrid(
numpy.arange(data_shape[0]),
numpy.arange(data_shape[1]),
indexing="ij",
sparse=True,
)
x_shifted = 2 * int(Center_1) - x_vector + 1
y_shifted = 2 * int(Center_2) - y_vector + 1
x_max = x_shifted.shape[-1]
y_max = y_shifted.shape[0]
if vertical_symmetry and horizontal_symmetry:
# Centrosymmetric caving (standard)
mask_limits = numpy.logical_and(
x_shifted > 0, x_shifted < x_max
) & numpy.logical_and(y_shifted > 0, y_shifted < y_max)
if mask_reference is None:
mask_caving = mask_limits
else:
mask_caving = mask_limits ^ (
mask_limits & mask_reference[y_shifted, x_shifted]
)
elif horizontal_symmetry:
# Horizontal flipping around the beam
mask_limits = numpy.logical_and(x_shifted > 0, x_shifted < x_max)
if mask_reference is None:
mask_caving = mask_limits
else:
mask_caving = mask_limits ^ (
mask_limits & mask_reference[y_vector, x_shifted]
)
elif vertical_symmetry:
# Vertical flippinf around the beam
mask_limits = numpy.logical_and(y_shifted > 0, y_shifted < y_max)
if mask_reference is None:
mask_caving = mask_limits
else:
mask_caving = mask_limits ^ (
mask_limits & mask_reference[y_shifted, x_vector]
)
else:
mask_limits = numpy.logical_and(
x_vector > 0, x_vector < x_max
) & numpy.logical_and(y_vector > 0, y_vector < y_max)
if mask_reference is None:
mask_caving = mask_limits
else:
mask_caving = mask_limits ^ (
mask_limits & mask_reference[y_vector, x_vector]
)
if use_cupy:
return cupy.asarray(mask_caving).astype(bool)
return mask_caving.astype(bool)
def _mask_to_cave(
data: numpy.ndarray, Dummy: int = None, mask_static: numpy.ndarray = None
) -> numpy.ndarray:
"""
Generates the mask with the pixels that we want to substitute. It's determined by a dummy value and/or by a mask.
Inputs:
- data (numpy.ndarray): data signal, 2 or 3 dimensional
- Dummy (int): pixels in data with dummy value will be replaced
- mask_static (numpy.ndarray): pixels that will be replaced
Outputs:
- numpy.ndarray: mask whose pixels we want to replace
"""
# ~ 1 ms per frame if only Dummy
# ~ 2 ms per frame if both dummy and mask_static
# ~0 if only mask_static
if Dummy and mask_static is not None:
mask_to_cave = (data == Dummy) + mask_static
elif Dummy:
mask_to_cave = data == Dummy
elif mask_static is not None:
mask_to_cave = mask_static
else:
raise ValueError(
"Caving has to be determined by a dummy value and/or a static mask"
)
return mask_to_cave
def _process_data_caving(
data: numpy.ndarray,
pixels_to_cave: numpy.ndarray,
pixels_to_substitute: numpy.ndarray,
x_vector: numpy.ndarray,
y_vector: numpy.ndarray,
Dummy: int = None,
mask_dummy_pixels: bool = True,
use_cupy: bool = False,
):
"""Replace the pixels in data according to a mask. The replaced data correspond to a specific index-transformation, determined by input vectors.
Inputs:
- data (numpy.ndarray): data signal, 2 or 3 dimensional
- mask_to_cave (numpy.ndarray): pixels that will be masked with Dummy or caved (substituted by a symmetric brother)
- x_vector (numpy.ndarray): index vector used to shift the data along the first dimension of data array
- y_vector (numpy.ndarray): index vector used to shift the data along the second dimension of data array
- Dummy (int): in this method, the dummy value is used to replace the pixels whose replacement falls also into the mask_caving
- use_cupy (bool): to use cupy, all the inputs must be already in the GPU
Outputs:
- numpy.ndarray: data with replaced pixels (caved)
"""
# Works for both numpy, cupy, 2 or 3 dimensional, for all cavings
mask_complete = pixels_to_cave & pixels_to_substitute
if use_cupy:
data_caved = cupy.copy(data)
if mask_dummy_pixels and Dummy:
cupy.copyto(data_caved, Dummy, where=pixels_to_cave)
if data.ndim == 3:
cupy.copyto(
data_caved, data_caved[:, y_vector, x_vector], where=mask_complete
)
elif data.ndim == 2:
cupy.copyto(data_caved, data_caved[y_vector, x_vector], where=mask_complete)
else:
data_caved = numpy.copy(data)
if mask_dummy_pixels and Dummy:
numpy.copyto(data_caved, Dummy, where=pixels_to_cave)
if data.ndim == 3:
numpy.copyto(
data_caved, data_caved[:, y_vector, x_vector], where=mask_complete
)
elif data.ndim == 2:
numpy.copyto(
data_caved, data_caved[y_vector, x_vector], where=mask_complete
)
return data_caved
def _process_dataset_caving_numpy(
    dataset: numpy.ndarray,
    Center_1: int,
    Center_2: int,
    Dummy: int = None,
    mask_static: numpy.ndarray = None,
    mask_reference: numpy.ndarray = None,
    flip_caving: bool = False,
    flip_horizontally_preference: bool = True,
    **kwargs,
) -> numpy.ndarray:
    """Cave a whole dataset at once using numpy.

    A centro-symmetric pass is always done. If flip_caving is set and Dummy is truthy, the pixels
    that are still equal to Dummy go through a horizontal and a vertical pass.

    Inputs:
        - dataset (numpy.ndarray): data signal, 2 or 3 dimensional (a 2-dimensional input gets a leading axis of length 1)
        - Center_1 (int): beam center paired with the x index (runs over the last axis)
        - Center_2 (int): beam center paired with the y index (runs over the second-to-last axis)
        - Dummy (int): pixels in data with dummy value will be replaced
        - mask_static (numpy.ndarray): pixels that will be replaced
        - mask_reference (numpy.ndarray): mask with the pixels whose intensity should not be used, that means that their symmetric brother should not be caved
        - flip_caving (bool): if True, adds horizontal-symmetric and vertical-symmetric caving
        - flip_horizontally_preference (bool): if True the horizontal pass runs before the vertical one, otherwise after
        - **kwargs: optional pre-computed masks under the keys "mask_pixels_available_centrosymmetric",
          "mask_pixels_available_horizontal" and "mask_pixels_available_vertical"; missing ones are built with _mask_caving
    Outputs:
        - numpy.ndarray: caved copy of the (3-dimensional) dataset
    """
    if dataset.ndim == 2:
        data_shape = dataset.shape
        dataset = dataset[numpy.newaxis, ...]
    elif dataset.ndim == 3:
        data_shape = dataset.shape[1:]

    # Plain and mirrored index vectors (open grid: column vector for y, row vector for x)
    rows = numpy.arange(data_shape[0])[:, numpy.newaxis]
    cols = numpy.arange(data_shape[1])[numpy.newaxis, :]
    cols_mirrored = 2 * int(Center_1) - cols + 1
    rows_mirrored = 2 * int(Center_2) - rows + 1

    # Masks of available pixels: take the ones handed in, compute the missing ones
    available_centro = kwargs.get("mask_pixels_available_centrosymmetric")
    available_horizontal = kwargs.get("mask_pixels_available_horizontal")
    available_vertical = kwargs.get("mask_pixels_available_vertical")
    if available_centro is None:
        available_centro = _mask_caving(
            data_shape,
            Center_1,
            Center_2,
            mask_reference,
            vertical_symmetry=True,
            horizontal_symmetry=True,
        )
    if flip_caving and available_horizontal is None:
        available_horizontal = _mask_caving(
            data_shape,
            Center_1,
            Center_2,
            mask_reference,
            vertical_symmetry=False,
            horizontal_symmetry=True,
        )
    if flip_caving and available_vertical is None:
        available_vertical = _mask_caving(
            data_shape,
            Center_1,
            Center_2,
            mask_reference,
            vertical_symmetry=True,
            horizontal_symmetry=False,
        )

    # Centro-symmetric pass: pixels are masked with Dummy and then caved where possible
    data_caved = _process_data_caving(
        data=dataset,
        pixels_to_cave=_mask_to_cave(
            data=dataset, Dummy=Dummy, mask_static=mask_static
        ),
        pixels_to_substitute=available_centro,
        x_vector=cols_mirrored,
        y_vector=rows_mirrored,
        Dummy=Dummy,
        mask_dummy_pixels=True,
        use_cupy=False,
    )

    if not (flip_caving and Dummy):
        return data_caved

    # Each extra pass: (available mask, x index vector, y index vector)
    horizontal_pass = (available_horizontal, cols_mirrored, rows)
    vertical_pass = (available_vertical, cols, rows_mirrored)
    if flip_horizontally_preference:
        ordered_passes = (horizontal_pass, vertical_pass)
    else:
        ordered_passes = (vertical_pass, horizontal_pass)

    for available, x_index, y_index in ordered_passes:
        # Only the pixels still holding the dummy value are candidates for this pass
        data_caved = _process_data_caving(
            data=data_caved,
            pixels_to_cave=data_caved == Dummy,
            pixels_to_substitute=available,
            x_vector=x_index,
            y_vector=y_index,
            Dummy=Dummy,
            mask_dummy_pixels=False,
            use_cupy=False,
        )
    return data_caved
def _process_dataset_caving_cupy(
    dataset: numpy.ndarray,
    Center_1: int,
    Center_2: int,
    Dummy: int = None,
    mask_static: numpy.ndarray = None,
    mask_reference: numpy.ndarray = None,
    flip_caving: bool = False,
    flip_horizontally_preference: bool = True,
    **kwargs,
) -> numpy.ndarray:
    """Cave a dataset frame by frame using cupy.

    Index vectors and masks are moved to the GPU once; every frame is then uploaded, caved and
    copied back into a numpy array.

    Inputs:
        - dataset (numpy.ndarray): data signal, 2 or 3 dimensional (a 2-dimensional input gets a leading axis of length 1)
        - Center_1 (int): beam center paired with the x index (runs over the last axis)
        - Center_2 (int): beam center paired with the y index (runs over the second-to-last axis)
        - Dummy (int): pixels in data with dummy value will be replaced
        - mask_static (numpy.ndarray): pixels that will be replaced
        - mask_reference (numpy.ndarray): mask with the pixels whose intensity should not be used, that means that their symmetric brother should not be caved
        - flip_caving (bool): if True, adds horizontal-symmetric and vertical-symmetric caving
        - flip_horizontally_preference (bool): if True the horizontal pass runs before the vertical one, otherwise after
        - **kwargs: optional pre-computed masks under the keys "mask_pixels_available_centrosymmetric",
          "mask_pixels_available_horizontal" and "mask_pixels_available_vertical"; missing ones are built with _mask_caving
    Outputs:
        - numpy.ndarray: caved (3-dimensional) dataset, allocated with numpy.zeros_like(dataset)
    """
    log_allocated_gpu_memory()

    if dataset.ndim == 2:
        data_shape = dataset.shape
        dataset = dataset[numpy.newaxis, ...]
    elif dataset.ndim == 3:
        data_shape = dataset.shape[1:]

    # Plain and mirrored index vectors, built on the host and then uploaded
    rows = numpy.arange(data_shape[0])[:, numpy.newaxis]
    cols = numpy.arange(data_shape[1])[numpy.newaxis, :]
    cols_mirrored_gpu = cupy.asarray(2 * int(Center_1) - cols + 1)
    rows_mirrored_gpu = cupy.asarray(2 * int(Center_2) - rows + 1)
    cols_gpu = cupy.asarray(cols)
    rows_gpu = cupy.asarray(rows)

    # Masks of available pixels: take the ones handed in, compute the missing ones
    available_centro = kwargs.get("mask_pixels_available_centrosymmetric")
    available_horizontal = kwargs.get("mask_pixels_available_horizontal")
    available_vertical = kwargs.get("mask_pixels_available_vertical")
    if available_centro is None:
        available_centro = _mask_caving(
            data_shape,
            Center_1,
            Center_2,
            mask_reference,
            vertical_symmetry=True,
            horizontal_symmetry=True,
        )
    if flip_caving and available_horizontal is None:
        available_horizontal = _mask_caving(
            data_shape,
            Center_1,
            Center_2,
            mask_reference,
            vertical_symmetry=False,
            horizontal_symmetry=True,
        )
    if flip_caving and available_vertical is None:
        available_vertical = _mask_caving(
            data_shape,
            Center_1,
            Center_2,
            mask_reference,
            vertical_symmetry=True,
            horizontal_symmetry=False,
        )

    mask_static_gpu = None if mask_static is None else cupy.asarray(mask_static)
    available_centro_gpu = cupy.asarray(available_centro)
    if flip_caving:
        available_horizontal_gpu = cupy.asarray(available_horizontal)
        available_vertical_gpu = cupy.asarray(available_vertical)
    else:
        available_horizontal_gpu = None
        available_vertical_gpu = None

    # Extra passes, in execution order: (available mask, x index vector, y index vector)
    horizontal_pass = (available_horizontal_gpu, cols_mirrored_gpu, rows_gpu)
    vertical_pass = (available_vertical_gpu, cols_gpu, rows_mirrored_gpu)
    if not (flip_caving and Dummy):
        extra_passes = ()
    elif flip_horizontally_preference:
        extra_passes = (horizontal_pass, vertical_pass)
    else:
        extra_passes = (vertical_pass, horizontal_pass)

    dataset_caved = numpy.zeros_like(dataset)
    for index_frame, frame in enumerate(dataset):
        frame_gpu = cupy.asarray(frame)

        # Centro-symmetric pass: pixels are masked with Dummy and then caved where possible
        frame_gpu = _process_data_caving(
            data=frame_gpu,
            pixels_to_cave=_mask_to_cave(frame_gpu, Dummy, mask_static_gpu),
            pixels_to_substitute=available_centro_gpu,
            x_vector=cols_mirrored_gpu,
            y_vector=rows_mirrored_gpu,
            Dummy=Dummy,
            mask_dummy_pixels=True,
            use_cupy=True,
        )

        for available, x_index, y_index in extra_passes:
            # Only the pixels still holding the dummy value are candidates for this pass
            frame_gpu = _process_data_caving(
                data=frame_gpu,
                pixels_to_cave=frame_gpu == Dummy,
                pixels_to_substitute=available,
                x_vector=x_index,
                y_vector=y_index,
                Dummy=Dummy,
                mask_dummy_pixels=False,
                use_cupy=True,
            )

        # Bring the caved frame back to host memory
        dataset_caved[index_frame] = frame_gpu.get()
    return dataset_caved
def _process_data_caving_cupy(
    data_cupy: cupy.ndarray,
    mask_pixels_available_centrosymmetric_cupy: cupy.ndarray,
    x_shifted_cupy: cupy.ndarray,
    y_shifted_cupy: cupy.ndarray,
    Dummy: int = None,
    mask_static_cupy: cupy.ndarray = None,
    flip_caving: bool = False,
    flip_horizontally_preference: bool = True,
    mask_pixels_available_horizontal_cupy: cupy.ndarray = None,
    mask_pixels_available_vertical_cupy: cupy.ndarray = None,
    x_vector_cupy: cupy.ndarray = None,
    y_vector_cupy: cupy.ndarray = None,
) -> cupy.ndarray:
    """Cave data that already lives on the GPU, with every helper array already on the GPU too.

    A centro-symmetric pass is always done. If flip_caving is set and Dummy is truthy, the pixels
    that are still equal to Dummy go through a horizontal and a vertical pass.

    Inputs:
        - data_cupy (cupy.ndarray): data signal, already on GPU (forwarded as-is to _process_data_caving)
        - mask_pixels_available_centrosymmetric_cupy (cupy.ndarray): mask with the available pixels to do centrosymmetric caving (already on GPU)
        - x_shifted_cupy (cupy.ndarray): shifted (mirrored) x index vector, already on GPU
        - y_shifted_cupy (cupy.ndarray): shifted (mirrored) y index vector, already on GPU
        - Dummy (int): pixels in data with dummy value will be replaced
        - mask_static_cupy (cupy.ndarray): mask with the pixels that will be replaced, already on GPU
        - flip_caving (bool): if True, adds horizontal-symmetric and vertical-symmetric caving
        - flip_horizontally_preference (bool): if True the horizontal pass runs before the vertical one, otherwise after
        - mask_pixels_available_horizontal_cupy (cupy.ndarray): mask with the available pixels to do horizontal-symmetric caving (already on GPU)
        - mask_pixels_available_vertical_cupy (cupy.ndarray): mask with the available pixels to do vertical-symmetric caving (already on GPU)
        - x_vector_cupy (cupy.ndarray): unshifted x index vector, already on GPU (used by the vertical pass)
        - y_vector_cupy (cupy.ndarray): unshifted y index vector, already on GPU (used by the horizontal pass)
    Outputs:
        - cupy.ndarray: data with replaced pixels (caved), still on GPU
    """
    # Centro-symmetric pass: pixels are masked with Dummy and then caved where possible
    data_cupy = _process_data_caving(
        data=data_cupy,
        pixels_to_cave=_mask_to_cave(data_cupy, Dummy, mask_static_cupy),
        pixels_to_substitute=mask_pixels_available_centrosymmetric_cupy,
        x_vector=x_shifted_cupy,
        y_vector=y_shifted_cupy,
        Dummy=Dummy,
        mask_dummy_pixels=True,
        use_cupy=True,
    )

    if not (flip_caving and Dummy):
        return data_cupy

    # Each extra pass: (available mask, x index vector, y index vector)
    horizontal_pass = (
        mask_pixels_available_horizontal_cupy,
        x_shifted_cupy,
        y_vector_cupy,
    )
    vertical_pass = (
        mask_pixels_available_vertical_cupy,
        x_vector_cupy,
        y_shifted_cupy,
    )
    if flip_horizontally_preference:
        ordered_passes = (horizontal_pass, vertical_pass)
    else:
        ordered_passes = (vertical_pass, horizontal_pass)

    for available, x_index, y_index in ordered_passes:
        # Only the pixels still holding the dummy value are candidates for this pass
        data_cupy = _process_data_caving(
            data=data_cupy,
            pixels_to_cave=data_cupy == Dummy,
            pixels_to_substitute=available,
            x_vector=x_index,
            y_vector=y_index,
            Dummy=Dummy,
            mask_dummy_pixels=False,
            use_cupy=True,
        )
    return data_cupy
[docs]
def process_data_caving(
    data: numpy.ndarray,
    Center_1: int,
    Center_2: int,
    Dummy: int = None,
    filename_mask_static: str = None,
    filename_mask_reference: str = None,
    algorithm: str = "numpy",
    flip_caving: bool = False,
    flip_horizontally_preference: bool = True,
    **kwargs,
) -> numpy.ndarray:
    """Entry point of the caving: load the masks, pre-compute the available pixels and dispatch.

    Inputs:
        - data (numpy.ndarray): data signal, 2 or 3-dimensional (a 2-dimensional input gets a leading axis of length 1)
        - Center_1 (int): beam center paired with the x index (runs over the last axis)
        - Center_2 (int): beam center paired with the y index (runs over the second-to-last axis)
        - Dummy (int): pixels in data with dummy value will be replaced
        - filename_mask_static (str): path to the file with the mask whose pixels we want to replace
        - filename_mask_reference (str): path to the file with the mask whose pixels should not be used, that means that their symmetric brother should not be caved
        - algorithm (str): key of ALGORITHMS_AVAILABLE. An unknown key, or "cupy" when cupy_available()
          is False, falls back to DEFAULT_ALGORITHM with a warning
        - flip_caving (bool): if True, adds horizontal-symmetric and vertical-symmetric caving (by default, centro-symmetric caving is always performed)
        - flip_horizontally_preference (bool): if flip_caving is activated, horizontal caving is done before vertical caving
        - **kwargs: only "binning" is read; it is forwarded to get_array_mask
    Outputs:
        - numpy.ndarray: whatever the selected implementation returns (data with replaced pixels)
    """
    if data.ndim == 2:
        data_signal_shape = data.shape
        data = data[numpy.newaxis, ...]
    elif data.ndim == 3:
        data_signal_shape = data.shape[1:]

    # Resolve the implementation, falling back to the default one when needed
    if algorithm not in ALGORITHMS_AVAILABLE:
        logger.warning(
            f"Algorithm '{algorithm}' is not available. Using '{DEFAULT_ALGORITHM}' instead."
        )
        algorithm = DEFAULT_ALGORITHM
    elif algorithm == "cupy" and not cupy_available():
        logger.warning(f"CuPy is not available. Using {DEFAULT_ALGORITHM} instead.")
        algorithm = DEFAULT_ALGORITHM
    use_cupy = algorithm == "cupy"

    binning = kwargs.get("binning")

    # Static mask: pixels to replace; loaded on the GPU when the cupy implementation is used
    mask_static = (
        get_array_mask(
            filename_mask=filename_mask_static,
            data_signal_shape=data_signal_shape,
            binning=binning,
            use_cupy=use_cupy,
            persistent=True,
        )
        if filename_mask_static
        else None
    )
    if mask_static is not None:
        mask_static = mask_static.astype(bool)

    # Reference mask: pixels that must not be used as replacement; always kept as a host array
    mask_reference = (
        get_array_mask(
            filename_mask=filename_mask_reference,
            data_signal_shape=data_signal_shape,
            binning=binning,
            use_cupy=False,
            persistent=True,
        )
        if filename_mask_reference
        else None
    )
    if mask_reference is not None:
        mask_reference = mask_reference.astype(bool)

    # Data-independent masks of the pixels that can be substituted, limited by the detector
    # limits and by the reference mask
    geometry = (data_signal_shape, Center_1, Center_2, mask_reference)
    available_centro = _mask_caving(
        *geometry,
        vertical_symmetry=True,
        horizontal_symmetry=True,
        use_cupy=use_cupy,
    )
    available_horizontal = None
    available_vertical = None
    if flip_caving:
        available_horizontal = _mask_caving(
            *geometry,
            vertical_symmetry=False,
            horizontal_symmetry=True,
            use_cupy=use_cupy,
        )
        available_vertical = _mask_caving(
            *geometry,
            vertical_symmetry=True,
            horizontal_symmetry=False,
            use_cupy=use_cupy,
        )

    caving_function = ALGORITHMS_AVAILABLE[algorithm]["algorithm"]
    return caving_function(
        dataset=data,
        Center_1=Center_1,
        Center_2=Center_2,
        Dummy=Dummy,
        mask_static=mask_static,
        mask_reference=mask_reference,
        flip_caving=flip_caving,
        flip_horizontally_preference=flip_horizontally_preference,
        mask_pixels_available_centrosymmetric=available_centro,
        mask_pixels_available_horizontal=available_horizontal,
        mask_pixels_available_vertical=available_vertical,
    )
# Registry of caving implementations, keyed by the name accepted in the `algorithm` argument of
# process_data_caving. "algorithm" holds the callable that process_data_caving invokes;
# "use_cupy" records whether that implementation runs on the GPU (it is not read in the visible code).
ALGORITHMS_AVAILABLE = {
"numpy": {"algorithm": _process_dataset_caving_numpy, "use_cupy": False},
"cupy": {"algorithm": _process_dataset_caving_cupy, "use_cupy": True},
}
# Implementation process_data_caving falls back to when the requested one is unknown or CuPy is unavailable.
DEFAULT_ALGORITHM = "numpy"