import logging
import os
import re
import time
from functools import lru_cache
from typing import Dict, List, Optional, Union
import fabio
import h5py
import hdf5plugin # noqa
import numpy
from pyFAI.average import average_dark
from pyFAI.utils.mathutil import binning as binning_tool
from silx.io.h5py_utils import open_item as open_item_silx
from silx.utils.retry import RetryTimeoutError
from .gpu import cupy, cupy_available
logger = logging.getLogger(__name__)
# String-key constants.
# NOTE(review): none of these names is referenced in the visible part of this
# module; presumably they are header / processing-parameter keys looked up by
# callers elsewhere — confirm against the consumers before renaming any value.

# Detector geometry and beam description
KEY_PIXEL_SIZE_1 = "PSize_1"
KEY_PIXEL_SIZE_2 = "PSize_2"
KEY_BINNING_1 = "BSize_1"
KEY_BINNING_2 = "BSize_2"
KEY_WAVELENGTH = "WaveLength"
KEY_SAMPLEDETECTOR_DISTANCE = "SampleDistance"
KEY_NORMALIZATION_FACTOR = "NormalizationFactor"
# Mask files (folder + file name pairs)
KEY_DETECTOR_MASK_FOLDER = "DetectorMaskFilePath"
KEY_DETECTOR_MASK_FILE = "DetectorMaskFileName"
KEY_BEAMSTOP_MASK_FOLDER = "MaskFilePath"
KEY_BEAMSTOP_MASK_FILE = "MaskFileName"
# Correction options
KEY_POLARIZATION_FACTOR = "polarization_factor"
KEY_POLARIZATION_AXIS = "polarization_axis_offset"
KEY_VARIANCE_FORMULA = "variance_formula"
KEY_WINDOW_ROI_SIZE = "WindowRoiSize"
# Dark / flat-field / window files (folder + file name pairs)
KEY_DARK_FOLDER = "DarkFilePath"
KEY_DARK_FILE = "DarkFileName"
KEY_FLAT_FOLDER = "FlatfieldFilePath"
KEY_FLAT_FILE = "FlatfieldFileName"
KEY_WINDOW_FOLDER = "WindowFilePath"
KEY_WINDOW_FILE = "WindowFileName"
# Invalid-pixel marker and its tolerance
KEY_DUMMY = "Dummy"
KEY_DELTA_DUMMY = "DDummy"
# Beam center
KEY_CENTER_1 = "Center_1"
KEY_CENTER_2 = "Center_2"
# Output grid and miscellaneous options
KEY_NPT2_RAD = "npt2_rad"
KEY_NPT2_AZIM = "npt2_azim"
KEY_UNIT = "unit"
KEY_TITLEEXTENSION = "TitleExtension"
KEY_ALGORITHM_NORMALIZATION = "NormAlgorithm"
[docs]
def get_isotime(forceTime: Optional[float] = None) -> str:
    """
    Get a time as an ISO8601 string carrying the local UTC offset.

    Inputs:
        - forceTime (Optional[float], optional): Enforce a given time, in seconds
          since the epoch (current time by default). Defaults to None.
    Outputs:
        - str: The local time formatted as "YYYY-MM-DDTHH:MM:SS+HH:MM"
          (or "-HH:MM" for zones west of UTC).
    """
    if forceTime is None:
        forceTime = time.time()
    localtime = time.localtime(forceTime)
    # tm_gmtoff is the signed offset east of UTC, in seconds. Subtracting the
    # hour and minute fields of localtime and gmtime independently gives wrong
    # results whenever both fall on different calendar days (e.g. 01:00 local
    # vs 12:00 UTC would yield -11 instead of +13) or when the offset is not a
    # whole number of hours.
    offset_seconds = localtime.tm_gmtoff
    sign = "-" if offset_seconds < 0 else "+"
    tz_h, tz_m = divmod(abs(offset_seconds) // 60, 60)
    stamp = time.strftime("%Y-%m-%dT%H:%M:%S", localtime)
    return "%s%s%02i:%02i" % (stamp, sign, tz_h, tz_m)
[docs]
def refactor_stream_name_raw(stream_name: str, cut_name: bool = False):
    """
    Turn a colon-separated stream name into an underscore-separated one.

    Inputs:
        - stream_name (str): the stream name, with ":" separating its parts
        - cut_name (bool): if True, keep only the part after the last ":"
          (ignored for names containing "roi_counters")
    Outputs:
        - str: for names containing "roi_counters", the first and last
          ":"-separated parts joined by "_"; otherwise the (optionally cut)
          name with every ":" replaced by "_"
    """
    if "roi_counters" in stream_name:
        segments = stream_name.split(":")
        return "_".join((segments[0], segments[-1]))
    target = stream_name.rsplit(":", 1)[-1] if cut_name else stream_name
    # Splitting on ":" or "_" and re-joining with "_" only rewrites the colons
    return re.sub(":", "_", target)
[docs]
def refactor_stream_name_interpreted(stream_name: str):
    """
    Replace every ":" of a stream name by "_".

    Inputs:
        - stream_name (str): the stream name
    Outputs:
        - str: the name with colons turned into underscores
    """
    # Splitting on ":" or "_" and re-joining with "_" only rewrites the colons
    return re.sub(":", "_", stream_name)
[docs]
def match_stream(name: str, streams: dict):
    """
    Find the entry of `streams` whose key corresponds to `name`.

    The lookup goes from the strictest to the loosest spelling; the first
    spelling that equals a key wins:
        1. `name` itself
        2. the raw refactored name (cut_name=False)
        3. the raw refactored name (cut_name=True)
        4. the interpreted refactored name
        5. the raw refactored name against each key with ":" replaced by "_"

    Inputs:
        - name (str): the stream name to look for
        - streams (dict): mapping of stream name -> stream array
    Outputs:
        - tuple: (matching key, its value), or (None, None) when nothing matches
    """
    candidates = (
        name,
        refactor_stream_name_raw(stream_name=name, cut_name=False),
        refactor_stream_name_raw(stream_name=name, cut_name=True),
        refactor_stream_name_interpreted(stream_name=name),
    )
    # Each spelling is tried against every key before moving to the next one
    for candidate in candidates:
        for key, array in streams.items():
            if candidate == key:
                return (key, array)

    # Last resort: compare against the keys with their colons normalised
    raw_name = candidates[1]
    for key, array in streams.items():
        if raw_name == key.replace(":", "_"):
            return (key, array)
    return (None, None)
[docs]
def parse_titleextension_template(template: str):
    """
    Extract the "{stream_name:format_spec}" fields of a TitleExtension template.

    Inputs:
        - template (str): text containing fields enclosed in curly brackets
    Outputs:
        - tuple: (template with the spaces removed from every valid field,
          list of {"stream_name": ..., "format_spec": ...} dicts, one per
          valid field, in order of appearance)

    A field is valid when, once its spaces are removed, it splits on ":" into
    exactly two parts. Invalid fields are logged and left untouched.
    """
    parsed_template = template
    fields = []
    # Non-greedy match of whatever sits between a "{" and the next "}"
    for raw_field in re.findall(r"\{(.*?)\}", template):
        compact_field = raw_field.replace(" ", "")
        parts = compact_field.split(":")
        if len(parts) != 2:
            logger.error(
                f"{raw_field} in {template} is not a valid format for TitleExtension"
            )
            continue
        stream_name, format_spec = parts
        fields.append({"stream_name": stream_name, "format_spec": format_spec})
        # Every occurrence of the raw field text is swapped for its compact form
        parsed_template = parsed_template.replace(
            raw_field, f"{stream_name}:{format_spec}"
        )
    return parsed_template, fields
[docs]
def get_value_from_file(filename: str, h5path: str, key: str, to_integer: bool = False):
    """
    Read one value stored under `key` inside the item `h5path` of a file.

    Inputs:
        - filename (str): the file to open
        - h5path (str): name of the item to open inside the file
        - key (str): name of the entry to read within that item
        - to_integer (bool): if True, a numeric value is returned as int
    Outputs:
        - the value as float (or int when to_integer is True) if it can be
          converted, otherwise the value itself (bytes are decoded as UTF-8).
          None when the item cannot be opened before the retry timeout, when
          the item is None or when `key` is not in it.
    """
    try:
        # Short retry timeout: give up quickly rather than block the caller
        with open_item_silx(
            filename=filename, name=h5path, retry_timeout=0.1
        ) as group:
            if group is None or key not in group:
                return None
            result = group[key][()]
    except RetryTimeoutError:
        return None

    if isinstance(result, bytes):
        result = result.decode("UTF-8")
    try:
        result = float(result)
        return int(result) if to_integer else result
    except Exception:
        # Not convertible: hand back whatever was obtained so far
        return result
[docs]
def load_data(
    filename: Union[str, List[str]],
    binning: tuple = (1, 1),
    data_signal_shape: tuple = None,
    use_cupy: bool = False,
    datatype: str = None,
    dark_filter: str = "median",
    dark_filter_quantil_lower: int = 0,
    dark_filter_quantil_upper: int = 1,
    **kwargs,
) -> Optional[numpy.ndarray]:
    """
    Load data from a file or a list of files.

    Inputs:
        - filename (Union[str, List[str]]): The filename or list of filenames.
          The arrays of a list are summed (files that cannot be loaded are skipped).
        - binning (tuple): binning of the data signal
        - data_signal_shape (tuple): shape of the data array (2-dimensional)
        - use_cupy (bool): if True (and cupy is available), returns a cupy.asarray
        - datatype (str): format of the imported array, if None, datatype is respected
        - dark_filter (str): center method handed to average_dark when the loaded
          array has more than 2 dimensions; None falls back to "median"
        - dark_filter_quantil_lower / dark_filter_quantil_upper: quantiles handed
          to average_dark when dark_filter starts with "quantil"
        - **kwargs: accepted and ignored
    Outputs:
        - Optional[numpy.ndarray]: The loaded data or None if nothing could be loaded.
    Raises:
        - ValueError: if the data has to be re-binned to match data_signal_shape
          but the binning of the file cannot be read, or if the shape still does
          not match after re-binning.
    """
    if filename is None:
        return None

    # Import data
    data = None
    binning_source = None  # file whose header describes the binning of `data`
    if isinstance(filename, (tuple, list)):
        for file in filename:
            frame = _load_data(file)
            if frame is None:
                continue
            if data is None:
                data = frame
                binning_source = file
            else:
                data += frame
    elif isinstance(filename, str):
        data = _load_data(filename)
        binning_source = filename
    if data is None:
        return None

    # Set datatype
    if datatype is not None and data.dtype != datatype:
        data = data.astype(datatype, copy=False)

    # Dataset filter
    if data.ndim > 2:
        # _get_array_dark forwards dark_filter=None explicitly, which used to
        # crash on `.startswith`; fall back to the default method instead.
        center_method = dark_filter or "median"
        if center_method.startswith("quantil"):
            data = average_dark(
                data,
                center_method=center_method,
                quantiles=(dark_filter_quantil_lower, dark_filter_quantil_upper),
            )
        else:
            data = average_dark(data, center_method=center_method)

    # Binning unification
    if data_signal_shape and data.shape != data_signal_shape:
        # Always query a single file: _get_data_binning cannot handle a list
        binning_additional_data = _get_data_binning(filename=binning_source)
        if binning_additional_data is None:
            raise ValueError(
                f"Binning of {binning_source} could not be read: unable to match the expected shape {data_signal_shape}"
            )
        binning_relative = (
            int(binning[0] / binning_additional_data[0]),
            int(binning[1] / binning_additional_data[1]),
        )
        data_binned = binning_tool(data, binning_relative, norm=False)
        if data_binned.shape != data_signal_shape:
            raise ValueError(
                f"Data shape after binning {binning} from {filename} does not match the expected shape: {data_binned.shape} != {data_signal_shape}"
            )
        data = data_binned

    if use_cupy and cupy_available():
        return cupy.asarray(data)
    return data
def _load_data(filename: str) -> Optional[numpy.ndarray]:
"""
Load data from a single file.
Inputs:
- filename (str): The filename.
Outputs:
- Optional[numpy.ndarray]: The loaded data or None if the file does not exist.
"""
if not os.path.exists(filename):
return
data = get_data_with_fabio(filename=filename)
if data is None:
data = get_data_from_h5py_defaults(filename=filename)
return data
[docs]
def get_data_with_fabio(filename: str) -> Optional[numpy.ndarray]:
    """
    Read the data of a file through fabio.

    Inputs:
        - filename (str): The filename. For names ending in ".h5" the internal
          path "::/entry_0000/measurement/data" is appended before opening.
    Outputs:
        - Optional[numpy.ndarray]: `data` of the opened fabio image (a warning
          is logged when the file holds more than one frame), or None if
          anything fails while opening/reading.
    """
    target = (
        f"{filename}::/entry_0000/measurement/data"
        if filename.endswith(".h5")
        else filename
    )
    try:
        with fabio.open(target) as image:
            if image.nframes != 1:
                logger.warning(
                    f"{target} is a multiframe .edf file. Getting the first frame..."
                )
                return image.data[:]
            return image.data
    except Exception as e:
        # Best effort: the caller falls back to another reader on None
        logger.warning(f"File {target} could not be open with fabio: {e}")
        return None
[docs]
def get_data_from_h5py_defaults(filename: str) -> Optional[numpy.ndarray]:
    """
    Read the dataset reached from the root of an HDF5 file by following the
    "signal" / "default" attributes.

    Inputs:
        - filename (str): The HDF5 filename.
    Outputs:
        - Optional[numpy.ndarray]: The data found, or None if the file could
          not be read (the error is logged) or no dataset was reached.
    """
    found = None
    try:
        with h5py.File(filename, "r") as root:
            found = _get_data_from_h5py_defaults(h5group=root)
    except Exception as e:
        logger.error(f"File {filename} could not be open with h5py + defaults: {e}")
    return found
def _get_data_from_h5py_defaults(h5group: h5py.Group) -> Optional[numpy.ndarray]:
    """
    Recursively follow the "signal" / "default" attributes until a dataset is reached.

    Inputs:
        - h5group (h5py.Group or h5py.Dataset): the item to start from
    Outputs:
        - Optional[numpy.ndarray]: the full content of the dataset reached, or
          None if the item is neither a dataset nor a group, or if the group
          carries neither attribute.

    The "signal" attribute takes precedence over "default"; only the first
    attribute present is followed.
    """
    if isinstance(h5group, h5py.Dataset):
        return h5group[()]
    if isinstance(h5group, h5py.Group):
        for attr_name in ("signal", "default"):
            target = h5group.attrs.get(attr_name)
            if target is None:
                continue
            # Depending on how the attribute was written (and on the h5py
            # version) it comes back as bytes or as str; calling .decode()
            # unconditionally fails on str.
            if isinstance(target, bytes):
                target = target.decode()
            return _get_data_from_h5py_defaults(h5group=h5group[target])
    return None
def _get_data_binning(filename: str):
"""
Load data from a single file.
Inputs:
- filename (str): The filename.
Outputs:
- Optional[numpy.ndarray]: The loaded data or None if the file does not exist.
"""
if not os.path.exists(filename):
return
if filename.endswith(".h5"):
filename += "::/entry_0000/measurement/data"
try:
with fabio.open(filename) as f:
b1 = f.header.get("Bsize_1")
b1_ = f.header.get("BSize_1")
b2 = f.header.get("Bsize_2")
b2_ = f.header.get("BSize_2")
b1 = b1 or b1_
b2 = b2 or b2_
return (int(b1), int(b2))
except Exception as e:
logger.error(f"File {filename} could not be open with fabio: {e}")
[docs]
def serialize_h5py_task(h5py_group: h5py.Group) -> dict:
    """
    Recursively convert an h5py Group or File into a nested Python dictionary.

    Inputs:
        - h5py_group (h5py.Group): the group to convert
    Outputs:
        - dict: one entry per child. Datasets are read entirely (bytes are
          decoded, 0-d arrays become scalars), sub-groups become nested
          dictionaries and anything else is stored as its str() representation.
    """

    def _convert(item):
        if isinstance(item, h5py.Dataset):
            content = item[()]
            if isinstance(content, bytes):
                content = content.decode()
            # Convert 0-d arrays to scalars for readability
            if isinstance(content, numpy.ndarray) and content.shape == ():
                content = content.item()
            return content
        if isinstance(item, h5py.Group):
            return serialize_h5py_task(item)
        return str(item)

    return {key: _convert(item) for key, item in h5py_group.items()}
[docs]
def deserialize_h5py_task(h5dict: dict, h5py_parent: h5py.Group) -> None:
    """
    Recursively write a nested dictionary below an h5py group.

    Inputs:
        - h5dict (dict): the content to write; nested dictionaries become
          sub-groups tagged with NX_class="NXcollection", every other value
          becomes a dataset
        - h5py_parent (h5py.Group): the group to write into
    Outputs:
        - None

    Writing is best effort: a value that cannot be stored as a dataset is
    reported through the module logger and skipped.
    """
    for key, value in h5dict.items():
        if isinstance(value, dict):
            child_group = h5py_parent.create_group(name=key)
            child_group.attrs["NX_class"] = "NXcollection"
            deserialize_h5py_task(h5dict=value, h5py_parent=child_group)
            continue
        try:
            h5py_parent.create_dataset(name=key, data=value)
        except Exception as e:
            # Report through the logger like the rest of the module (not print)
            logger.warning(
                f"Dataset {key!r} could not be created from {value!r}: {e}"
            )
[docs]
def get_array_mask(
    filename_mask: str,
    data_signal_shape: tuple,
    datatype: str = "bool",
    binning: tuple = (1, 1),
    use_cupy: bool = False,
    persistent: bool = True,
) -> Optional[Union[numpy.ndarray]]:
    """
    Generate the array to mask (gaps or beamstop normally)

    params:
        - filename_mask (str): the filename of the mask
        - data_signal_shape (tuple): the shape of the data signal array, used for binning unification
        - datatype (str): format of the imported array, if None, datatype is respected
        - binning (tuple): binning of the data signal, used for binning unification
        - use_cupy (bool): if True, returns a cupy.asarray
        - persistent (bool): if True, the array is cached in memory, keyed on
          the arguments plus the file modification time
    returns:
        - Optional[Union[numpy.ndarray, cupy.ndarray]]: the array mask or None if the file does not exist
    """
    if persistent:
        # A missing/empty filename is normalised to (None, None) so that all
        # such calls share a single cache entry.
        mask_found = bool(filename_mask) and os.path.exists(filename_mask)
        mtime_mask = os.path.getmtime(filename_mask) if mask_found else None
        return _get_persistent_array_mask(
            filename_mask=filename_mask if mask_found else None,
            mtime_mask=mtime_mask,
            data_signal_shape=data_signal_shape,
            datatype=datatype,
            binning=binning,
            use_cupy=use_cupy,
        )
    return _get_array_mask(
        filename_mask=filename_mask,
        datatype=datatype,
        data_signal_shape=data_signal_shape,
        binning=binning,
        use_cupy=use_cupy,
    )
@lru_cache(maxsize=5)
def _get_persistent_array_mask(
    filename_mask: str,
    mtime_mask: float,
    data_signal_shape: tuple,
    datatype: str = "bool",
    binning: tuple = (1, 1),
    use_cupy: bool = False,
):
    """
    Cached variant of _get_array_mask (at most 5 entries are kept).

    mtime_mask is not used to build the array: it only takes part in the
    cache key. All arguments must be hashable.
    """
    logger.info(
        f"No cache hit. Creating new mask array from {filename_mask=}, {mtime_mask=}, "
        f"{data_signal_shape=}, {datatype=}, {binning=}, {use_cupy=}"
    )
    loader_arguments = {
        "filename_mask": filename_mask,
        "datatype": datatype,
        "data_signal_shape": data_signal_shape,
        "binning": binning,
        "use_cupy": use_cupy,
    }
    return _get_array_mask(**loader_arguments)
def _get_array_mask(
    filename_mask: str,
    datatype: str = "bool",
    data_signal_shape: tuple = None,
    binning: tuple = (1, 1),
    use_cupy: bool = False,
):
    """
    Load the mask array from file (no caching); thin wrapper around load_data.

    Returns whatever load_data returns for the given arguments.
    """
    mask = load_data(
        filename=filename_mask,
        binning=binning,
        data_signal_shape=data_signal_shape,
        use_cupy=use_cupy,
        datatype=datatype,
    )
    return mask
[docs]
def get_array_flat(
    filename_flat: str,
    data_signal_shape: tuple,
    datatype: str = "float32",
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    use_cupy: bool = False,
    persistent: bool = True,
):
    """
    Generate the array of the flat field correction.

    params:
        - filename_flat (str): the filename of the flat field correction
        - data_signal_shape (tuple): the shape of the data signal array, used for binning unification
        - datatype (str): format of the imported array, if None, datatype is respected
        - binning (tuple): binning of the data signal, used for binning unification
        - dummy (int): if not None, the value of the dummy pixels to filter
        - delta_dummy (float): if dummy is not None, the tolerance around the dummy value
        - filename_mask (str): the filename of the mask
        - use_cupy (bool): if True, returns a cupy.asarray
        - persistent (bool): if True, the array is cached in memory, keyed on
          the arguments plus the modification times of both files
    returns:
        - Optional[Union[numpy.ndarray, cupy.ndarray]]: the array flat-field correction or None if the file does not exist
    """
    if not persistent:
        return _get_array_flat(
            filename_flat=filename_flat,
            data_signal_shape=data_signal_shape,
            datatype=datatype,
            binning=binning,
            dummy=dummy,
            delta_dummy=delta_dummy,
            filename_mask=filename_mask,
            use_cupy=use_cupy,
        )

    # Missing/empty filenames are normalised to (None, None) so that all such
    # calls share the same cache entry.
    flat_found = bool(filename_flat) and os.path.exists(filename_flat)
    mtime_flat = os.path.getmtime(filename_flat) if flat_found else None
    mask_found = bool(filename_mask) and os.path.exists(filename_mask)
    mtime_mask = os.path.getmtime(filename_mask) if mask_found else None

    return _get_persistent_array_flat(
        filename_flat=filename_flat if flat_found else None,
        mtime_flat=mtime_flat,
        filename_mask=filename_mask if mask_found else None,
        mtime_mask=mtime_mask,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=dummy,
        delta_dummy=delta_dummy,
        use_cupy=use_cupy,
    )
@lru_cache(maxsize=5)
def _get_persistent_array_flat(
    filename_flat: str,
    mtime_flat: float,
    data_signal_shape: tuple,
    datatype: str = "float32",
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    mtime_mask: float = None,
    use_cupy: bool = False,
):
    """
    Cached variant of _get_array_flat (at most 5 entries are kept).

    mtime_flat and mtime_mask are not used to build the array: they only take
    part in the cache key. All arguments must be hashable.
    """
    logger.info(
        f"No cache hit. Creating new flat-field array from {filename_flat=}, {mtime_flat=}, "
        f"{data_signal_shape=}, {datatype=}, {binning=}, {dummy=}, {delta_dummy=}, {filename_mask=}, {mtime_mask=}, {use_cupy=}"
    )
    builder_arguments = {
        "filename_flat": filename_flat,
        "data_signal_shape": data_signal_shape,
        "datatype": datatype,
        "binning": binning,
        "dummy": dummy,
        "delta_dummy": delta_dummy,
        "filename_mask": filename_mask,
        "use_cupy": use_cupy,
    }
    return _get_array_flat(**builder_arguments)
def _get_array_flat(
    filename_flat: str,
    data_signal_shape: tuple,
    datatype: str = "float32",
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    use_cupy: bool = False,
):
    """
    Load the flat-field array from file (no caching of the flat itself).

    Returns the array given by load_data, or None if it could not be loaded.
    When a mask is available and both dummy and delta_dummy are given, the
    pixels of the flat matching the dummy value are OR-ed into the mask array;
    the returned flat array itself is not modified.
    """
    flat = load_data(
        filename=filename_flat,
        datatype=datatype,
        data_signal_shape=data_signal_shape,
        binning=binning,
        use_cupy=use_cupy,
    )
    if flat is None:
        return None

    mask = get_array_mask(
        filename_mask=filename_mask,
        data_signal_shape=data_signal_shape,
        datatype="bool",
        binning=binning,
        use_cupy=use_cupy,
        persistent=True,
    )
    if mask is None or dummy is None or delta_dummy is None:
        return flat

    # NOTE(review): `mask` comes from the persistent (lru_cache'd) mask getter,
    # so this in-place update alters the cached mask object seen by later
    # callers, while `flat` is returned untouched — confirm this is intended.
    if delta_dummy == 0:
        mask |= flat == dummy
    else:
        mask |= abs(flat - dummy) < delta_dummy
    return flat
[docs]
def get_array_dark(
    filename_dark: str,
    data_signal_shape: tuple,
    datatype: str = None,
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    dark_filter: str = None,
    dark_filter_quantil_lower: float = 0.1,
    dark_filter_quantil_upper: float = 0.9,
    use_cupy: bool = False,
    persistent: bool = True,
) -> Optional[numpy.ndarray]:
    """
    Generate the array of the dark current correction.

    params:
        - filename_dark (str): the filename of the dark current correction
        - data_signal_shape (tuple): the shape of the data signal array, used for binning unification
        - datatype (str): format of the imported array, if None, datatype is respected
        - binning (tuple): binning of the data signal, used for binning unification
        - dummy (int): if not None, the value of the dummy pixels to filter
        - delta_dummy (float): if dummy is not None, the tolerance around the dummy value
        - filename_mask (str): the filename of the mask
        - dark_filter (str): if not None, the method to use for filtering a stack of darks into a single dark (e.g. "median" or "quantil")
        - dark_filter_quantil_lower (float): if dark_filter is "quantil", the lower quantile to use for filtering
        - dark_filter_quantil_upper (float): if dark_filter is "quantil", the upper quantile to use for filtering
        - use_cupy (bool): if True, returns a cupy.asarray
        - persistent (bool): if True, the array is cached in memory, keyed on
          the arguments plus the modification times of both files
    returns:
        - Optional[Union[numpy.ndarray, cupy.ndarray]]: the array dark current correction or None if the file does not exist
    """
    if not persistent:
        return _get_array_dark(
            filename_dark=filename_dark,
            data_signal_shape=data_signal_shape,
            datatype=datatype,
            binning=binning,
            dummy=dummy,
            delta_dummy=delta_dummy,
            filename_mask=filename_mask,
            dark_filter=dark_filter,
            dark_filter_quantil_lower=dark_filter_quantil_lower,
            dark_filter_quantil_upper=dark_filter_quantil_upper,
            use_cupy=use_cupy,
        )

    # Missing/empty filenames are normalised to (None, None) so that all such
    # calls share the same cache entry.
    dark_found = bool(filename_dark) and os.path.exists(filename_dark)
    mtime_dark = os.path.getmtime(filename_dark) if dark_found else None
    mask_found = bool(filename_mask) and os.path.exists(filename_mask)
    mtime_mask = os.path.getmtime(filename_mask) if mask_found else None

    return _get_persistent_array_dark(
        filename_dark=filename_dark if dark_found else None,
        mtime_dark=mtime_dark,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=dummy,
        delta_dummy=delta_dummy,
        filename_mask=filename_mask if mask_found else None,
        mtime_mask=mtime_mask,
        dark_filter=dark_filter,
        dark_filter_quantil_lower=dark_filter_quantil_lower,
        dark_filter_quantil_upper=dark_filter_quantil_upper,
        use_cupy=use_cupy,
    )
@lru_cache(maxsize=5)
def _get_persistent_array_dark(
    filename_dark: str,
    mtime_dark: float,
    data_signal_shape: tuple,
    datatype: str = None,
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    mtime_mask: float = None,
    dark_filter: str = None,
    dark_filter_quantil_lower: float = 0.1,
    dark_filter_quantil_upper: float = 0.9,
    use_cupy: bool = False,
):
    """
    Cached variant of _get_array_dark (at most 5 entries are kept).

    mtime_dark and mtime_mask are not used to build the array: they only take
    part in the cache key, so a file whose modification time changed produces
    a new entry. All arguments must be hashable.
    """
    logger.info(
        f"No cache hit. Creating new dark-current array from {filename_dark=}, {mtime_dark=}, {data_signal_shape=}, "
        f"{datatype=}, {binning=}, {dummy=}, {delta_dummy=}, {filename_mask=}, {mtime_mask=}, {dark_filter=},"
        f"{dark_filter_quantil_lower=}, {dark_filter_quantil_upper=}, {use_cupy=}"
    )
    # The files are deliberately not stat-ed again here: the modification
    # times are only meaningful as cache keys, and _get_array_dark already
    # returns None for a dark file that is missing and resolves a missing
    # mask file to "no mask" through get_array_mask.
    return _get_array_dark(
        filename_dark=filename_dark,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=dummy,
        delta_dummy=delta_dummy,
        filename_mask=filename_mask,
        dark_filter=dark_filter,
        dark_filter_quantil_lower=dark_filter_quantil_lower,
        dark_filter_quantil_upper=dark_filter_quantil_upper,
        use_cupy=use_cupy,
    )
def _get_array_dark(
filename_dark: str,
data_signal_shape: tuple,
datatype: str = None,
binning: tuple = (1, 1),
dummy: int = None,
delta_dummy: float = None,
filename_mask: str = None,
dark_filter: str = None,
dark_filter_quantil_lower: float = 0.1,
dark_filter_quantil_upper: float = 0.9,
use_cupy: bool = False,
) -> Optional[numpy.ndarray]:
if not filename_dark:
return
array_dark = load_data(
filename=filename_dark,
datatype=datatype,
data_signal_shape=data_signal_shape,
binning=binning,
use_cupy=use_cupy,
dark_filter=dark_filter,
dark_filter_quantil_lower=dark_filter_quantil_lower,
dark_filter_quantil_upper=dark_filter_quantil_upper,
)
if array_dark is None:
return
array_mask = get_array_mask(
filename_mask=filename_mask,
data_signal_shape=data_signal_shape,
datatype="bool",
binning=binning,
use_cupy=use_cupy,
persistent=True,
)
if array_mask is not None and dummy is not None and delta_dummy is not None:
if delta_dummy == 0:
array_mask |= array_dark == dummy
else:
array_mask |= abs(array_dark - dummy) < delta_dummy
return array_dark
[docs]
def get_dataset_signal_from_processed_file(
    filename: str,
    index_range: tuple = None,
):
    """
    Read the "data" dataset of the first "result_*" group found under
    "entry_0000/PyFAI" in a processed file.

    Inputs:
        - filename (str): the processed HDF5 file
        - index_range (tuple): if not None, only the slice
          [index_range[0]:index_range[-1]] of the dataset is read
    Outputs:
        - the data read, or None if the file is missing or does not have the
          expected layout (the latter is logged as an error)
    """
    if not filename or not os.path.exists(filename):
        return None

    nxprocess_path = "entry_0000/PyFAI"
    with h5py.File(filename, "r") as h5file:
        if nxprocess_path not in h5file:
            logger.error(f"{filename} does not contain the {nxprocess_path} format")
            return None
        process_group = h5file[nxprocess_path]

        # First child whose name contains "result_", in iteration order
        nxdata_name = None
        for child_name in process_group:
            if "result_" in child_name:
                nxdata_name = child_name
                break
        if nxdata_name is None:
            logger.error(f"There is no result_xxx group in {nxprocess_path}")
            return None

        data_group = process_group[nxdata_name]
        if "data" not in data_group:
            logger.error(
                f"There is no data dataset in {nxprocess_path}/{nxdata_name}"
            )
            return None

        dataset = data_group["data"]
        if index_range is None:
            return dataset[:]
        return dataset[index_range[0] : index_range[-1]]
@lru_cache(maxsize=10)
def _get_persistent_array(
filename: str,
mtime: float,
**kwargs,
):
if not filename:
return
logger.info(
f"No cache hit. Creating new array from {filename=}, {mtime=}, {kwargs}"
)
return load_data(
filename=filename,
**kwargs,
)