import logging
import os
import time
from functools import lru_cache
from typing import Dict, List, Optional, Union
from silx.io.h5py_utils import open_item as open_item_silx
from silx.utils.retry import RetryTimeoutError
from pyFAI.utils.mathutil import binning as binning_tool
from pyFAI.average import average_dark
import fabio
import h5py
import hdf5plugin # noqa
import numpy
import re
try:
import cupy
except ImportError:
CUPY_AVAILABLE = False
else:
CUPY_AVAILABLE = True
# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)

# String constants naming metadata entries.
# NOTE(review): presumably these are header keys of the processed files; the
# commented-out helpers further down look some of them up through
# `get_from_headers` - confirm against the callers.

# Detector geometry: pixel size and binning along both detector dimensions.
KEY_PIXEL_SIZE_1 = "PSize_1"
KEY_PIXEL_SIZE_2 = "PSize_2"
KEY_BINNING_1 = "BSize_1"
KEY_BINNING_2 = "BSize_2"

# Experiment geometry and normalization.
KEY_WAVELENGTH = "WaveLength"
KEY_SAMPLEDETECTOR_DISTANCE = "SampleDistance"
KEY_NORMALIZATION_FACTOR = "NormalizationFactor"

# Mask files (folder + file name pairs).
KEY_DETECTOR_MASK_FOLDER = "DetectorMaskFilePath"
KEY_DETECTOR_MASK_FILE = "DetectorMaskFileName"
KEY_BEAMSTOP_MASK_FOLDER = "MaskFilePath"
KEY_BEAMSTOP_MASK_FILE = "MaskFileName"

# Polarization and variance settings.
KEY_POLARIZATION_FACTOR = "polarization_factor"
KEY_POLARIZATION_AXIS = "polarization_axis_offset"
KEY_VARIANCE_FORMULA = "variance_formula"
KEY_WINDOW_ROI_SIZE = "WindowRoiSize"

# Correction files: dark current, flat field and window (folder + file name pairs).
KEY_DARK_FOLDER = "DarkFilePath"
KEY_DARK_FILE = "DarkFileName"
KEY_FLAT_FOLDER = "FlatfieldFilePath"
KEY_FLAT_FILE = "FlatfieldFileName"
KEY_WINDOW_FOLDER = "WindowFilePath"
KEY_WINDOW_FILE = "WindowFileName"

# Dummy (invalid pixel) value and its tolerance.
KEY_DUMMY = "Dummy"
KEY_DELTA_DUMMY = "DDummy"

# Beam center along both detector dimensions.
KEY_CENTER_1 = "Center_1"
KEY_CENTER_2 = "Center_2"

# Integration settings.
# NOTE(review): names suggest radial/azimuthal bin counts and the radial unit - confirm.
KEY_NPT2_RAD = "npt2_rad"
KEY_NPT2_AZIM = "npt2_azim"
KEY_UNIT = "unit"

# Title-extension template (see `parse_titleextension_template`) and normalization algorithm.
KEY_TITLEEXTENSION = "TitleExtension"
KEY_ALGORITHM_NORMALIZATION = "NormAlgorithm"
[docs]
def get_isotime(forceTime: Optional[float] = None) -> str:
    """
    Get a time as an ISO8601 string carrying the local UTC offset.

    Inputs:
        - forceTime (Optional[float], optional): Seconds since the epoch to format
          instead of the current time. Defaults to None (current time).
    Outputs:
        - str: The local time formatted as "YYYY-MM-DDTHH:MM:SS+HH:MM".
    """
    if forceTime is None:
        forceTime = time.time()
    localtime = time.localtime(forceTime)
    # Use the signed UTC offset (in seconds) reported for this very instant.
    # Subtracting tm_hour/tm_min of gmtime from localtime breaks whenever local
    # time and UTC fall on different calendar days (e.g. 01:00 local vs 23:00
    # UTC gives -22 instead of +02) and can yield negative minutes.
    offset_seconds = localtime.tm_gmtoff
    sign = "-" if offset_seconds < 0 else "+"
    tz_h, tz_m = divmod(abs(offset_seconds) // 60, 60)
    stamp = time.strftime("%Y-%m-%dT%H:%M:%S", localtime)
    return f"{stamp}{sign}{tz_h:02d}:{tz_m:02d}"
[docs]
def refactor_stream_name_raw(stream_name: str, cut_name: bool = False):
    """
    Turn a raw stream name into an underscore-separated name.

    Inputs:
        - stream_name (str): Stream name, possibly containing ":" separators.
        - cut_name (bool): If True, only the part after the last ":" is kept.
    Outputs:
        - str: For names containing "roi_counters", the first and last
          ":"-separated parts joined by "_". Otherwise the (optionally cut)
          name with every ":" turned into "_".
    """
    if "roi_counters" in stream_name:
        pieces = stream_name.split(":")
        return "_".join((pieces[0], pieces[-1]))
    target = stream_name.split(":")[-1] if cut_name else stream_name
    return "_".join(re.split(":|_", target))
[docs]
def refactor_stream_name_interpreted(stream_name: str):
    """Return `stream_name` split on ":" and "_" and re-joined with "_"."""
    tokens = re.split(":|_", stream_name)
    return "_".join(tokens)
[docs]
def match_stream(name: str, streams: dict):
    """
    Find the entry of `streams` that corresponds to `name`.

    The candidates are tried in priority order: the name itself, its raw
    refactored form, its raw refactored form cut at the last ":", and its
    interpreted form. As a last resort, the raw form is compared against each
    stream name with ":" replaced by "_".

    Inputs:
        - name (str): Name to look for.
        - streams (dict): Mapping of stream name to stream array.
    Outputs:
        - tuple: (stream_name, stream_array) of the first match, or
          (None, None) if nothing matches.
    """
    name_raw = refactor_stream_name_raw(stream_name=name, cut_name=False)
    name_raw_cut = refactor_stream_name_raw(stream_name=name, cut_name=True)
    name_interpreted = refactor_stream_name_interpreted(stream_name=name)

    # Exact comparisons, one full pass over the streams per candidate.
    for candidate in (name, name_raw, name_raw_cut, name_interpreted):
        for key, array in streams.items():
            if candidate == key:
                return (key, array)

    # Fallback: compare against the stream names with ":" turned into "_".
    for key, array in streams.items():
        if name_raw == key.replace(":", "_"):
            return (key, array)
    return (None, None)
[docs]
def parse_titleextension_template(template: str):
    """
    Parse the "{stream_name:format_spec}" fields of a TitleExtension template.

    Inputs:
        - template (str): Template containing fields wrapped in curly brackets.
    Outputs:
        - tuple: (template_parsed, template_info). `template_parsed` is the
          template with blanks removed from every valid field;
          `template_info` is a list of dicts with the keys "stream_name" and
          "format_spec", one per valid field. Fields that do not split into
          exactly two parts on ":" are logged and skipped.
    """
    cleaned_template = template
    fields = []
    # Non-greedy match of whatever sits between curly brackets.
    for template_element in re.findall(r"\{(.*?)\}", template):
        compact = template_element.replace(" ", "")
        try:
            stream_name, format_spec = compact.split(":")
            fields.append(dict(stream_name=stream_name, format_spec=format_spec))
            cleaned_template = cleaned_template.replace(
                template_element, f"{stream_name}:{format_spec}"
            )
        except Exception:
            logger.error(
                f"{template_element} in {template} is not a valid format for TitleExtension"
            )
    return cleaned_template, fields
[docs]
def get_value_from_file(filename: str, h5path: str, key: str, to_integer: bool = False):
    """
    Read the value stored under `key` inside the item `h5path` of a file.

    Inputs:
        - filename (str): File to open.
        - h5path (str): Name of the item to open inside the file.
        - key (str): Entry to read from that item.
        - to_integer (bool): If True, a numeric value is returned as int.
    Outputs:
        - The value as float (or int when `to_integer`) if it can be
          converted, otherwise the value itself (bytes are decoded as UTF-8).
          None if the item cannot be opened within the retry timeout, the
          item is None, or `key` is missing.
    """
    try:
        with open_item_silx(filename=filename, name=h5path, retry_timeout=0.1) as grp:
            if grp is None or key not in grp:
                return None
            value = grp[key][()]
    except RetryTimeoutError:
        return None

    if isinstance(value, bytes):
        value = value.decode("UTF-8")

    # Best-effort numeric conversion; anything non-numeric is returned untouched.
    try:
        value = float(value)
        return int(value) if to_integer else value
    except Exception:
        return value
# def get_flat_filename(**kwargs):
# """Returns the whole filename for the flat field correction from the headers."""
# flat_folder = get_from_headers(key=KEY_FLAT_FOLDER, **kwargs)
# flat_file = get_from_headers(key=KEY_FLAT_FILE, **kwargs)
# if flat_folder is None or flat_file is None:
# return
# flat_filename = os.path.join(flat_folder, flat_file)
# if not os.path.exists(flat_filename):
# return
# return flat_filename
# def get_mask_detector_filename(**kwargs):
# """Returns the whole filename for the detector gaps mask from the headers."""
# mask_folder = get_from_headers(key=KEY_DETECTOR_MASK_FOLDER, **kwargs)
# mask_file = get_from_headers(key=KEY_DETECTOR_MASK_FILE, **kwargs)
# if mask_folder is None or mask_file is None:
# return
# mask_filename = os.path.join(mask_folder, mask_file)
# if not os.path.exists(mask_filename):
# return
# return mask_filename
# def get_mask_beamstop_filename(**kwargs):
# """Returns the whole filename for the beamstop mask from the headers."""
# mask_folder = get_from_headers(key=KEY_BEAMSTOP_MASK_FOLDER, **kwargs)
# mask_file = get_from_headers(key=KEY_BEAMSTOP_MASK_FILE, **kwargs)
# if mask_folder is None or mask_file is None:
# return
# mask_filename = os.path.join(mask_folder, mask_file)
# if not os.path.exists(mask_filename):
# return
# return mask_filename
# def get_dark_filename(**kwargs):
# """Returns the whole filename for the dark current correction from the headers."""
# dark_folder = get_from_headers(key=KEY_DARK_FOLDER, **kwargs)
# dark_file = get_from_headers(key=KEY_DARK_FILE, **kwargs)
# if dark_folder is None or dark_file is None:
# return
# dark_filename = os.path.join(dark_folder, dark_file)
# if not os.path.exists(dark_filename):
# return
# return dark_filename
[docs]
def load_data(
    filename: Union[str, List[str]],
    binning: tuple = (1, 1),
    data_signal_shape: tuple = None,
    use_cupy: bool = False,
    datatype: str = None,
    dark_filter: str = "median",
    dark_filter_quantil_lower: int = 0,
    dark_filter_quantil_upper: int = 1,
    **kwargs,
) -> Optional[numpy.ndarray]:
    """
    Load data from a file or a list of files.

    When several files are given, the arrays that could be loaded are summed.

    Inputs:
        - filename (Union[str, List[str]]): The filename or list of filenames.
        - binning (tuple): binning of the data signal
        - data_signal_shape (tuple): shape of the data array (2-dimensional)
        - use_cupy (bool): if True (and cupy is available), returns a cupy array
        - datatype (str): format of the imported array, if None, datatype is respected
        - dark_filter (str): center method handed to `average_dark` when the
          loaded data has more than 2 dimensions. None falls back to "median".
        - dark_filter_quantil_lower / dark_filter_quantil_upper: quantiles
          handed to `average_dark` when `dark_filter` starts with "quantil".
        - **kwargs: accepted and ignored, so callers can forward extra settings.
    Outputs:
        - Optional[numpy.ndarray]: The loaded data or None if nothing could be loaded.
    Raises:
        - ValueError: if the data cannot be rebinned to `data_signal_shape`.
    """
    if filename is None:
        return None

    if isinstance(filename, (tuple, list)):
        filenames = list(filename)
    elif isinstance(filename, str):
        filenames = [filename]
    else:
        return None

    # Import data: sum whatever could be loaded.
    data = None
    for file in filenames:
        frame = _load_data(file)
        if frame is None:
            continue
        # Out-of-place sum: does not modify the array handed out by the reader
        # and lets numpy promote mixed dtypes instead of failing on the cast.
        data = frame if data is None else data + frame
    if data is None:
        return None

    # Set datatype
    if datatype is not None and data.dtype != datatype:
        data = data.astype(datatype, copy=False)

    # Dataset filter: collapse a stack of frames into one 2D image.
    if data.ndim > 2:
        # Callers may forward dark_filter=None; fall back to this function's default.
        center_method = dark_filter or "median"
        if center_method.startswith("quantil"):
            data = average_dark(
                data,
                center_method=center_method,
                quantiles=(dark_filter_quantil_lower, dark_filter_quantil_upper),
            )
        else:
            data = average_dark(data, center_method=center_method)

    # Binning unification
    if data_signal_shape and data.shape != tuple(data_signal_shape):
        # The binning is read per file (the helper only accepts one path);
        # the first file that provides it is used.
        binning_additional_data = None
        for file in filenames:
            binning_additional_data = _get_data_binning(filename=file)
            if binning_additional_data is not None:
                break
        if binning_additional_data is None:
            # The shape check below still guards against a wrong assumption.
            logger.warning(
                f"No binning information found in {filename}. Assuming unbinned data (1, 1)."
            )
            binning_additional_data = (1, 1)
        binning_relative = (
            int(binning[0] / binning_additional_data[0]),
            int(binning[1] / binning_additional_data[1]),
        )
        data_binned = binning_tool(data, binning_relative, norm=False)
        if data_binned.shape != tuple(data_signal_shape):
            raise ValueError(
                f"Data shape after binning {binning} from {filename} does not match the expected shape: {data_binned.shape} != {data_signal_shape}"
            )
        data = data_binned

    if use_cupy and CUPY_AVAILABLE:
        return cupy.asarray(data)
    return data
def _load_data(filename: str) -> Optional[numpy.ndarray]:
"""
Load data from a single file.
Inputs:
- filename (str): The filename.
Outputs:
- Optional[numpy.ndarray]: The loaded data or None if the file does not exist.
"""
if not os.path.exists(filename):
return
if filename.endswith(".h5"):
filename += "::/entry_0000/measurement/data"
try:
with fabio.open(filename) as f:
if f.nframes == 1:
return f.data
else:
# TODO, what is the philosophy in case there's a multiframe edf?
logger.error(
f"{filename} is a multiframe .edf file. Getting the first frame..."
)
return f.data[:]
except Exception as e:
logger.error(f"File {filename} could not be open with fabio: {e}")
def _get_data_binning(filename: str):
"""
Load data from a single file.
Inputs:
- filename (str): The filename.
Outputs:
- Optional[numpy.ndarray]: The loaded data or None if the file does not exist.
"""
if not os.path.exists(filename):
return
if filename.endswith(".h5"):
filename += "::/entry_0000/measurement/data"
try:
with fabio.open(filename) as f:
b1 = f.header.get("Bsize_1")
b1_ = f.header.get("BSize_1")
b2 = f.header.get("Bsize_2")
b2_ = f.header.get("BSize_2")
b1 = b1 or b1_
b2 = b2 or b2_
return (int(b1), int(b2))
except Exception as e:
logger.error(f"File {filename} could not be open with fabio: {e}")
[docs]
def get_free_memory(device_id):
    """
    Retrieve the free memory reported for a GPU device.

    Inputs:
        - device_id: Identifier handed to `cupy.cuda.Device`.
    Outputs:
        - The first value returned by `cupy.cuda.runtime.memGetInfo()` (free
          memory), or None if cupy is not available.
    """
    if CUPY_AVAILABLE:
        # The query is issued with the requested device selected.
        with cupy.cuda.Device(device_id):
            free_bytes, _total_bytes = cupy.cuda.runtime.memGetInfo()
        return free_bytes
    logger.warning("Cupy is not available.")
    return None
[docs]
def get_best_gpu():
    """
    Pick the GPU with the most free memory.

    Outputs:
        - The device index with the largest free memory (the first one wins on
          ties), or None if cupy is not available or no device reports more
          than zero free memory.
    """
    if not CUPY_AVAILABLE:
        logger.warning("Cupy is not available.")
        return None

    chosen_device, chosen_free = None, 0
    device_count = cupy.cuda.runtime.getDeviceCount()
    for index in range(device_count):
        available = get_free_memory(index)
        # Only a strictly larger amount of free memory replaces the current pick.
        if available is None or available <= chosen_free:
            continue
        chosen_device, chosen_free = index, available
    return chosen_device
[docs]
def use_best_gpu():
    """
    Make the GPU returned by `get_best_gpu` the current cupy device.

    A warning is logged instead when no device is returned.
    """
    device = get_best_gpu()
    if device is None:
        logger.warning("No suitable GPU found or cupy is not available.")
        return
    cupy.cuda.Device(device).use()
    logger.info(f"Using GPU {device} with the most free memory.")
[docs]
def serialize_h5py_task(h5py_group: h5py.Group) -> dict:
    """
    Recursively convert an h5py Group or File into a nested Python dictionary.

    Datasets are read entirely: bytes are decoded to str and 0-d arrays are
    turned into Python scalars. Sub-groups become nested dictionaries; any
    other kind of item is stored as its string representation.
    """
    tree = {}
    for name, node in h5py_group.items():
        if isinstance(node, h5py.Group):
            tree[name] = serialize_h5py_task(node)
        elif isinstance(node, h5py.Dataset):
            content = node[()]
            if isinstance(content, bytes):
                content = content.decode()
            # Convert 0-d arrays to scalars for readability
            if isinstance(content, numpy.ndarray) and content.shape == ():
                content = content.item()
            tree[name] = content
        else:
            tree[name] = str(node)
    return tree
[docs]
def deserialize_h5py_task(h5dict: dict, h5py_parent: h5py.Group) -> None:
    """
    Recursively write a nested dictionary into an h5py group.

    Nested dictionaries become child groups tagged with the attribute
    NX_class="NXcollection"; every other value is written as a dataset.
    A value that cannot be written is logged and skipped, so the remaining
    entries are still written.

    Inputs:
        - h5dict (dict): Nested dictionary to write.
        - h5py_parent (h5py.Group): Group receiving the entries.
    """
    for key, value in h5dict.items():
        if isinstance(value, dict):
            child_group = h5py_parent.create_group(name=key)
            child_group.attrs["NX_class"] = "NXcollection"
            deserialize_h5py_task(h5dict=value, h5py_parent=child_group)
            continue
        try:
            h5py_parent.create_dataset(name=key, data=value)
        except Exception as e:
            # Best effort: report through the module logger (not stdout) and go on.
            logger.error(f"Could not create dataset {key!r} with value {value!r}: {e}")
[docs]
def get_persistent_array_mask(
    filename_mask: str,
    data_signal_shape: tuple,
    datatype: str = "bool",
    binning: tuple = (1, 1),
    use_cupy: bool = False,
    **kwargs,
):
    """
    Return the mask array of `filename_mask` through the cached loader.

    The modification time of the file is part of the cache request, so a
    changed file is loaded again. A missing file is requested as None.
    Extra keyword arguments are accepted and ignored.
    """
    if not filename_mask or not os.path.exists(filename_mask):
        filename_mask, mtime_mask = None, None
    else:
        mtime_mask = os.path.getmtime(filename_mask)

    # Keyword order is kept as is: it is part of the lru_cache key.
    return _get_persistent_array(
        filename=filename_mask,
        mtime=mtime_mask,
        datatype=datatype,
        data_signal_shape=data_signal_shape,
        binning=binning,
        use_cupy=use_cupy,
    )
[docs]
def get_persistent_array_flat(
    filename_flat: str,
    data_signal_shape: tuple,
    datatype: str = None,
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    use_cupy: bool = False,
    **kwargs,
):
    """
    Return the flat-field array of `filename_flat` through the cached loader.

    The modification times of the flat-field and mask files are part of the
    cache request, so changed files are loaded again. Missing files are
    requested as None. Extra keyword arguments are accepted and ignored.
    """

    def _existing_with_mtime(path):
        # (path, mtime) for an existing file, (None, None) otherwise.
        if path and os.path.exists(path):
            return path, os.path.getmtime(path)
        return None, None

    filename_flat, mtime_flat = _existing_with_mtime(filename_flat)
    filename_mask, mtime_mask = _existing_with_mtime(filename_mask)

    # Keyword order is kept as is: it is part of the lru_cache key.
    return _get_persistent_array_flat(
        filename_flat=filename_flat,
        mtime_flat=mtime_flat,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=dummy,
        delta_dummy=delta_dummy,
        filename_mask=filename_mask,
        mtime_mask=mtime_mask,
        use_cupy=use_cupy,
    )
@lru_cache(maxsize=5)
def _get_persistent_array_flat(
filename_flat: str,
mtime_flat: float,
**kwargs,
):
if not filename_flat:
return
logger.info(
f"No cache hit. Creating new flat-field array from {filename_flat=}, {mtime_flat=}, {kwargs}"
)
array_flat = load_data(
filename=filename_flat,
**kwargs,
)
if array_flat is None:
return
array_mask = _get_persistent_array(
filename=kwargs.get("filename_mask"),
mtime=kwargs.get("mtime_mask"),
datatype="bool",
data_signal_shape=kwargs.get("data_signal_shape"),
binning=kwargs.get("binning"),
use_cupy=kwargs.get("use_cupy"),
)
dummy = kwargs.get("dummy")
delta_dummy = kwargs.get("delta_dummy")
if array_mask is not None and dummy is not None and delta_dummy is not None:
if delta_dummy == 0:
array_mask |= array_flat == dummy
else:
array_mask |= abs(array_flat - dummy) < delta_dummy
return array_flat
[docs]
def get_persistent_array_dark(
    filename_dark: str,
    data_signal_shape: tuple,
    datatype: str = None,
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    dark_filter: str = None,
    dark_filter_quantil_lower: float = 0.1,
    dark_filter_quantil_upper: float = 0.9,
    use_cupy: bool = False,
    **kwargs,
):
    """
    Return the dark-current array of `filename_dark` through the cached loader.

    The modification times of the dark and mask files are part of the cache
    request, so changed files are loaded again. Missing files are requested
    as None. Extra keyword arguments are accepted and ignored.
    """

    def _existing_with_mtime(path):
        # (path, mtime) for an existing file, (None, None) otherwise.
        if path and os.path.exists(path):
            return path, os.path.getmtime(path)
        return None, None

    filename_dark, mtime_dark = _existing_with_mtime(filename_dark)
    filename_mask, mtime_mask = _existing_with_mtime(filename_mask)

    # Keyword order is kept as is: it is part of the lru_cache key.
    return _get_persistent_array_dark(
        filename_dark=filename_dark,
        mtime_dark=mtime_dark,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=dummy,
        delta_dummy=delta_dummy,
        filename_mask=filename_mask,
        mtime_mask=mtime_mask,
        dark_filter=dark_filter,
        dark_filter_quantil_lower=dark_filter_quantil_lower,
        dark_filter_quantil_upper=dark_filter_quantil_upper,
        use_cupy=use_cupy,
    )
@lru_cache(maxsize=5)
def _get_persistent_array_dark(
filename_dark: str,
mtime_dark: float,
**kwargs,
):
if not filename_dark:
return
logger.info(
f"No cache hit. Creating new dark-current array from {filename_dark=}, {mtime_dark}, {kwargs}"
)
array_dark = load_data(
filename=filename_dark,
**kwargs,
)
if array_dark is None:
return
array_mask = _get_persistent_array(
filename=kwargs.get("filename_mask"),
mtime=kwargs.get("mtime_mask"),
datatype="bool",
data_signal_shape=kwargs.get("data_signal_shape"),
binning=kwargs.get("binning"),
use_cupy=kwargs.get("use_cupy"),
)
dummy = kwargs.get("dummy")
delta_dummy = kwargs.get("delta_dummy")
if array_mask is not None and dummy is not None and delta_dummy is not None:
if delta_dummy == 0:
array_mask |= array_dark == dummy
else:
array_mask |= abs(array_dark - dummy) < delta_dummy
return array_dark
[docs]
def get_persistent_array_window_wagon(
    filename_window_wagon: str,
    data_signal_shape: tuple,
    datatype: str = None,
    binning: tuple = (1, 1),
    use_cupy: bool = False,
):
    """
    Return the array of `filename_window_wagon` through the cached loader.

    The modification time of the file is part of the cache request, so a
    changed file is loaded again. A missing file is requested as None.
    """
    if not filename_window_wagon or not os.path.exists(filename_window_wagon):
        filename_window_wagon, window_mtime = None, None
    else:
        window_mtime = os.path.getmtime(filename_window_wagon)

    # Keyword order is kept as is: it is part of the lru_cache key.
    return _get_persistent_array(
        filename=filename_window_wagon,
        mtime=window_mtime,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        use_cupy=use_cupy,
    )
[docs]
def get_dataset_signal_from_processed_file(
    filename: str,
    index_range: tuple = None,
):
    """
    Read the "data" dataset of the first "result_" group in "entry_0000/PyFAI".

    Inputs:
        - filename (str): Processed HDF5 file.
        - index_range (tuple): If given, only the slice from its first element
          up to (excluding) its last element is read.
    Outputs:
        - The dataset content, or None if the file does not exist or does not
          have the expected layout (the reason is logged).
    """
    if not filename or not os.path.exists(filename):
        return None

    nxprocess_path = "entry_0000/PyFAI"
    with h5py.File(filename, "r") as h5file:
        if nxprocess_path not in h5file:
            logger.error(f"{filename} does not contain the {nxprocess_path} format")
            return None
        nxprocess_grp = h5file[nxprocess_path]

        # First member whose name contains "result_".
        nxdata_name = None
        for name in nxprocess_grp:
            if "result_" in name:
                nxdata_name = name
                break
        if nxdata_name is None:
            logger.error(f"There is no result_xxx group in {nxprocess_path}")
            return None

        nxdata_grp = nxprocess_grp[nxdata_name]
        if "data" not in nxdata_grp:
            logger.error(
                f"There is no data dataset in {nxprocess_path}/{nxdata_name}"
            )
            return None

        dataset = nxdata_grp["data"]
        if index_range is None:
            return dataset[:]
        return dataset[index_range[0] : index_range[-1]]
@lru_cache(maxsize=10)
def _get_persistent_array(
filename: str,
mtime: float,
**kwargs,
):
if not filename:
return
logger.info(
f"No cache hit. Creating new array from {filename=}, {mtime=}, {kwargs}"
)
return load_data(
filename=filename,
**kwargs,
)