Source code for ewoksid02.utils.io

import logging
import os
import re
import time
from functools import lru_cache
from typing import Dict, List, Optional, Union

import fabio
import h5py
import hdf5plugin  # noqa
import numpy
from pyFAI.average import average_dark
from pyFAI.utils.mathutil import binning as binning_tool
from silx.io.h5py_utils import open_item as open_item_silx
from silx.utils.retry import RetryTimeoutError

from .gpu import cupy, cupy_available

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# String constants holding the names of header / metadata entries.
# NOTE(review): presumably these are the `key` values handed to
# get_from_headers() by the processing code; they are not referenced in the
# visible part of this module, so confirm against the callers.

# Detector geometry and acquisition
KEY_PIXEL_SIZE_1 = "PSize_1"
KEY_PIXEL_SIZE_2 = "PSize_2"
KEY_BINNING_1 = "BSize_1"
KEY_BINNING_2 = "BSize_2"
KEY_WAVELENGTH = "WaveLength"
KEY_SAMPLEDETECTOR_DISTANCE = "SampleDistance"
KEY_NORMALIZATION_FACTOR = "NormalizationFactor"
# Mask files (folder + file name pairs)
KEY_DETECTOR_MASK_FOLDER = "DetectorMaskFilePath"
KEY_DETECTOR_MASK_FILE = "DetectorMaskFileName"
KEY_BEAMSTOP_MASK_FOLDER = "MaskFilePath"
KEY_BEAMSTOP_MASK_FILE = "MaskFileName"
# Processing options
KEY_POLARIZATION_FACTOR = "polarization_factor"
KEY_POLARIZATION_AXIS = "polarization_axis_offset"
KEY_VARIANCE_FORMULA = "variance_formula"
KEY_WINDOW_ROI_SIZE = "WindowRoiSize"
# Correction files (folder + file name pairs)
KEY_DARK_FOLDER = "DarkFilePath"
KEY_DARK_FILE = "DarkFileName"
KEY_FLAT_FOLDER = "FlatfieldFilePath"
KEY_FLAT_FILE = "FlatfieldFileName"
KEY_WINDOW_FOLDER = "WindowFilePath"
KEY_WINDOW_FILE = "WindowFileName"
# Dummy (invalid pixel) value and its tolerance
KEY_DUMMY = "Dummy"
KEY_DELTA_DUMMY = "DDummy"
# Beam center
KEY_CENTER_1 = "Center_1"
KEY_CENTER_2 = "Center_2"
# Output shape / unit and miscellaneous entries
KEY_NPT2_RAD = "npt2_rad"
KEY_NPT2_AZIM = "npt2_azim"
KEY_UNIT = "unit"
KEY_TITLEEXTENSION = "TitleExtension"
KEY_ALGORITHM_NORMALIZATION = "NormAlgorithm"


def get_isotime(forceTime: Optional[float] = None) -> str:
    """
    Get a time as an ISO8601 string (local time followed by its UTC offset).

    Inputs:
        - forceTime (Optional[float], optional): Enforce a given time, in seconds
          since the epoch (current time by default). Defaults to None.
    Outputs:
        - str: The time formatted as "YYYY-MM-DDTHH:MM:SS+HH:MM".
    """
    if forceTime is None:
        forceTime = time.time()
    localtime = time.localtime(forceTime)

    # Use the real UTC offset (seconds east of UTC) instead of subtracting the
    # hour/minute fields of localtime and gmtime: that subtraction is wrong
    # whenever both fall on different calendar days (e.g. 01:00 local vs
    # 16:00 UTC gave "-15:00" instead of "+09:00") and for negative
    # half-hour offsets.
    offset = localtime.tm_gmtoff
    sign = "-" if offset < 0 else "+"
    tz_h, tz_m = divmod(abs(offset) // 60, 60)

    stamp = time.strftime("%Y-%m-%dT%H:%M:%S", localtime)
    return f"{stamp}{sign}{tz_h:02d}:{tz_m:02d}"
def refactor_stream_name_raw(stream_name: str, cut_name: bool = False):
    """
    Turn a raw stream name into an underscore-separated name.

    Inputs:
        - stream_name (str): The stream name, possibly containing ":" separators.
        - cut_name (bool, optional): If True, only the part after the last ":"
          is kept before the conversion. Defaults to False.
    Outputs:
        - str: For names containing "roi_counters", the first and last
          ":"-separated parts joined by "_"; otherwise the (optionally cut)
          name with every ":" turned into "_".
    """
    if "roi_counters" in stream_name:
        pieces = stream_name.split(":")
        return pieces[0] + "_" + pieces[-1]

    target = stream_name.split(":")[-1] if cut_name else stream_name
    tokens = re.split(":|_", target)
    return "_".join(tokens)
def refactor_stream_name_interpreted(stream_name: str):
    """
    Return the stream name with every ":" separator replaced by "_".

    Inputs:
        - stream_name (str): The stream name.
    Outputs:
        - str: The name split on ":" or "_" and re-joined with "_".
    """
    tokens = re.split(":|_", stream_name)
    return "_".join(tokens)
def match_stream(name: str, streams: dict):
    """
    Find the entry of `streams` that corresponds to `name`.

    The candidates are tried in order of priority, each one against all the
    keys of `streams`: the name itself, its raw form, its raw cut form, its
    interpreted form and, finally, the raw form compared with each key where
    ":" has been replaced by "_".

    Inputs:
        - name (str): The stream name to look for.
        - streams (dict): Mapping of stream names to stream arrays.
    Outputs:
        - tuple: (stream_name, stream_array) of the first match, or
          (None, None) if nothing matches.
    """
    raw = refactor_stream_name_raw(stream_name=name, cut_name=False)
    raw_cut = refactor_stream_name_raw(stream_name=name, cut_name=True)
    interpreted = refactor_stream_name_interpreted(stream_name=name)

    # Direct comparisons, from the most to the least specific candidate
    for candidate in (name, raw, raw_cut, interpreted):
        for key, array in streams.items():
            if candidate == key:
                return (key, array)

    # Last resort: normalize the keys of the streams instead of the name
    for key, array in streams.items():
        if raw == key.replace(":", "_"):
            return (key, array)

    return (None, None)
def parse_titleextension_template(template: str):
    """
    Parse the "{stream_name:format_spec}" placeholders of a TitleExtension template.

    Inputs:
        - template (str): The template, with placeholders between curly brackets.
    Outputs:
        - tuple: (template_parsed, template_info) where template_parsed is the
          template with the blank spaces removed from the valid placeholders and
          template_info is a list of {"stream_name": ..., "format_spec": ...}
          dictionaries, one per valid placeholder. Placeholders that do not
          split into exactly two parts on ":" are reported with the logger
          and skipped.
    """
    placeholder_regex = r"\{(.*?)\}"
    parsed = template
    fields = []

    for raw_field in re.findall(placeholder_regex, template):
        # Blank spaces are not significant inside a placeholder
        parts = raw_field.replace(" ", "").split(":")
        if len(parts) != 2:
            logger.error(
                f"{raw_field} in {template} is not a valid format for TitleExtension"
            )
            continue

        stream_name, format_spec = parts
        fields.append({"stream_name": stream_name, "format_spec": format_spec})
        parsed = parsed.replace(raw_field, f"{stream_name}:{format_spec}")

    return parsed, fields
def get_from_headers(
    key: str,
    headers: Optional[Dict[str, Union[str, float]]] = None,
    metadata_file_group: Optional[h5py.Group] = None,
    to_integer: bool = False,
) -> Optional[Union[str, float, int]]:
    """
    Read one header value, either from the header object (online processing)
    or from an HDF5 group (offline processing).

    Inputs:
        - key (str): The key to retrieve.
        - headers (Optional[Dict[str, Union[str, float]]], optional): The header object;
          it takes precedence when it is not empty. Defaults to None.
        - metadata_file_group (Optional[h5py.Group], optional): The HDF5 group, used
          when there are no headers. Defaults to None.
        - to_integer (bool, optional): Whether to convert the value to an integer. Defaults to False.
    Outputs:
        - Optional[Union[str, float, int]]: The value as a number when it can be
          converted, the (decoded) value as it is otherwise, or None if not found.
    """
    if headers:
        # Online: straight from the header object
        if key not in headers:
            logger.warning(f"Key {key} is not in headers")
            return None
        value = headers[key]
    elif metadata_file_group:
        # Offline: from a group in the metadata file
        if key not in metadata_file_group:
            logger.warning(f"Key {key} not in {metadata_file_group}")
            return None
        value = metadata_file_group[key][()]
    else:
        return None

    if isinstance(value, bytes):
        value = value.decode("UTF-8")

    # Numbers are returned as float (or int on request); anything that cannot
    # be converted is returned unchanged
    try:
        value = float(value)
        return int(value) if to_integer else value
    except Exception:
        return value
def get_value_from_file(filename: str, h5path: str, key: str, to_integer: bool = False):
    """
    Read one value stored under `key` in the item `h5path` of an HDF5 file.

    Inputs:
        - filename (str): The HDF5 file.
        - h5path (str): Path, inside the file, of the item that contains the key.
        - key (str): Name of the entry to read.
        - to_integer (bool, optional): Whether to convert the value to an integer. Defaults to False.
    Outputs:
        - The value as a number when it can be converted, the (decoded) value as
          it is otherwise, or None if the item or the key is not available or
          the opening timed out.
    """
    try:
        # Short retry timeout: give up quickly if the item cannot be opened
        with open_item_silx(
            filename=filename, name=h5path, retry_timeout=0.1
        ) as item:
            if item is None or key not in item:
                return None
            value = item[key][()]
    except RetryTimeoutError:
        return None

    if isinstance(value, bytes):
        value = value.decode("UTF-8")

    try:
        value = float(value)
        return int(value) if to_integer else value
    except Exception:
        return value
def load_data(
    filename: Union[str, List[str]],
    binning: tuple = (1, 1),
    data_signal_shape: tuple = None,
    use_cupy: bool = False,
    datatype: str = None,
    dark_filter: str = "median",
    dark_filter_quantil_lower: int = 0,
    dark_filter_quantil_upper: int = 1,
    **kwargs,
) -> Optional[numpy.ndarray]:
    """
    Load data from a file or a list of files.

    The arrays of a list of files are summed. Data with more than two
    dimensions is reduced with `average_dark`, and the result is re-binned
    when its shape differs from `data_signal_shape`.

    Inputs:
        - filename (Union[str, List[str]]): The filename or list of filenames.
        - binning (tuple): binning of the data signal
        - data_signal_shape (tuple): shape of the data array (2-dimensional)
        - use_cupy (bool): if True (and cupy is available), returns a cupy.asarray
        - datatype (str): format of the imported array, if None, datatype is respected
        - dark_filter (str): center method handed to `average_dark` for data with more than
          two dimensions. None falls back to "median", the default of this function.
        - dark_filter_quantil_lower / dark_filter_quantil_upper: quantiles handed to
          `average_dark` when dark_filter starts with "quantil"
        - kwargs: accepted for compatibility, not used.
    Outputs:
        - Optional[numpy.ndarray]: The loaded data or None if nothing could be loaded.
    Raises:
        - ValueError: if the binning of the file cannot be read when it is needed, or if
          the shape after binning does not match data_signal_shape.
    """
    if filename is None:
        return None

    # Import data. `binning_source` is the file whose header is used later
    # on to read the binning of the loaded data.
    data = None
    binning_source = None
    if isinstance(filename, (tuple, list)):
        for file in filename:
            data_ = _load_data(file)
            if data_ is None:
                continue
            if data is None:
                data = data_
                binning_source = file
            else:
                data += data_
    elif isinstance(filename, str):
        data = _load_data(filename)
        binning_source = filename
    if data is None:
        return None

    # Set datatype
    if datatype is not None and data.dtype != datatype:
        data = data.astype(datatype, copy=False)

    # Dataset filter
    if data.ndim > 2:
        if dark_filter is None:
            # Callers such as the dark-current loader pass None explicitly;
            # without this fallback `None.startswith` raised AttributeError.
            dark_filter = "median"
        if dark_filter.startswith("quantil"):
            data = average_dark(
                data,
                center_method=dark_filter,
                quantiles=(dark_filter_quantil_lower, dark_filter_quantil_upper),
            )
        else:
            data = average_dark(data, center_method=dark_filter)

    # Binning unification
    if data_signal_shape and data.shape != data_signal_shape:
        # Always a single file name here, never the list of files
        binning_additional_data = _get_data_binning(filename=binning_source)
        if binning_additional_data is None:
            raise ValueError(
                f"Binning of {binning_source} could not be read: data shape {data.shape} cannot be matched to {data_signal_shape}"
            )
        binning_relative = (
            int(binning[0] / binning_additional_data[0]),
            int(binning[1] / binning_additional_data[1]),
        )
        data_binned = binning_tool(data, binning_relative, norm=False)
        if data_binned.shape != data_signal_shape:
            raise ValueError(
                f"Data shape after binning {binning} from {filename} does not match the expected shape: {data_binned.shape} != {data_signal_shape}"
            )
        data = data_binned

    if use_cupy and cupy_available():
        return cupy.asarray(data)
    return data
def _load_data(filename: str) -> Optional[numpy.ndarray]:
    """
    Load data from a single file, first with fabio and then, if fabio gives
    nothing, through the defaults of the HDF5 file.

    Inputs:
        - filename (str): The filename.
    Outputs:
        - Optional[numpy.ndarray]: The loaded data or None if the file does not
          exist or could not be read.
    """
    if os.path.exists(filename):
        loaded = get_data_with_fabio(filename=filename)
        if loaded is not None:
            return loaded
        return get_data_from_h5py_defaults(filename=filename)
    return None
def get_data_with_fabio(filename: str) -> Optional[numpy.ndarray]:
    """
    Read the data of a file with fabio.

    Inputs:
        - filename (str): The filename. For ".h5" files, the dataset
          "/entry_0000/measurement/data" is addressed.
    Outputs:
        - Optional[numpy.ndarray]: The `data` of the opened file (a full slice
          of it, with a warning, when the file reports more than one frame) or
          None if the file could not be opened.
    """
    if filename.endswith(".h5"):
        filename = filename + "::/entry_0000/measurement/data"

    try:
        with fabio.open(filename) as image:
            if image.nframes != 1:
                logger.warning(
                    f"{filename} is a multiframe .edf file. Getting the first frame..."
                )
                return image.data[:]
            return image.data
    except Exception as e:
        # Best effort: the caller may still try another reader
        logger.warning(f"File {filename} could not be open with fabio: {e}")
        return None
def get_data_from_h5py_defaults(filename: str) -> Optional[numpy.ndarray]:
    """
    Read the data of an HDF5 file, resolved from the root of the file by
    `_get_data_from_h5py_defaults`.

    Inputs:
        - filename (str): The HDF5 filename.
    Outputs:
        - Optional[numpy.ndarray]: The data or None if it could not be read.
    """
    try:
        h5file = h5py.File(filename, "r")
        with h5file:
            return _get_data_from_h5py_defaults(h5group=h5file)
    except Exception as e:
        logger.error(f"File {filename} could not be open with h5py + defaults: {e}")
        return None
def _get_data_from_h5py_defaults(h5group: h5py.Group) -> Optional[numpy.ndarray]:
    """
    Resolve an HDF5 item into its data by following the "signal" and
    "default" attributes recursively.

    Inputs:
        - h5group (h5py.Group): The HDF5 group (or dataset) to resolve.
    Outputs:
        - Optional[numpy.ndarray]: The content of the dataset that was reached,
          or None if the group has neither a "signal" nor a "default" attribute.
    """
    if isinstance(h5group, h5py.Dataset):
        return h5group[()]

    if isinstance(h5group, h5py.Group):
        # "signal" has priority over "default"
        for attr_name in ("signal", "default"):
            target = h5group.attrs.get(attr_name)
            if target is None:
                continue
            # The attribute can be read as bytes or as str, depending on how
            # it was written and on the h5py version; only bytes are decoded.
            if isinstance(target, bytes):
                target = target.decode()
            return _get_data_from_h5py_defaults(h5group=h5group[target])
    return None


def _get_data_binning(filename: Union[str, List[str]]):
    """
    Read the binning ("Bsize_1"/"BSize_1" and "Bsize_2"/"BSize_2") from the
    header of a file, opened with fabio.

    Inputs:
        - filename (Union[str, List[str]]): The filename. For a list of
          filenames, the first existing file is used.
    Outputs:
        - Optional[tuple]: (binning_1, binning_2) as integers, or None if the
          file does not exist or the binning could not be read.
    """
    if isinstance(filename, (list, tuple)):
        filename = next((f for f in filename if f and os.path.exists(f)), None)
        if filename is None:
            return None

    if not os.path.exists(filename):
        return None

    if filename.endswith(".h5"):
        filename += "::/entry_0000/measurement/data"

    try:
        with fabio.open(filename) as f:
            header = f.header
            # Both spellings of the keys are accepted
            b1 = header.get("Bsize_1") or header.get("BSize_1")
            b2 = header.get("Bsize_2") or header.get("BSize_2")
    except Exception as e:
        logger.error(f"File {filename} could not be open with fabio: {e}")
        return None

    if not b1 or not b2:
        logger.error(f"No binning (BSize_1, BSize_2) in the header of {filename}")
        return None

    try:
        return (int(b1), int(b2))
    except (TypeError, ValueError) as e:
        logger.error(f"Invalid binning ({b1}, {b2}) in the header of {filename}: {e}")
        return None
def get_headers(
    headers: Optional[Dict[str, Union[str, float]]] = None,
    metadata_file_group: Optional[h5py.Group] = None,
) -> Optional[Dict[str, Union[str, float]]]:
    """
    Get the headers, either the given dictionary or the content of an HDF5 group.

    Inputs:
        - headers (Optional[Dict[str, Union[str, float]]], optional): The header dictionary,
          returned as it is when it is not empty. Defaults to None.
        - metadata_file_group (Optional[h5py.Group], optional): The HDF5 group, read entry
          by entry when there are no headers. Defaults to None.
    Outputs:
        - Optional[Dict[str, Union[str, float]]]: The headers; an empty dictionary if
          neither source is available.
    """
    if headers:
        return headers
    if not metadata_file_group:
        return {}

    collected = {}
    for key in metadata_file_group:
        raw = metadata_file_group[key][()]
        # Byte strings are stored as text
        collected[key] = raw.decode("UTF-8") if isinstance(raw, bytes) else raw
    return collected
def serialize_h5py_task(h5py_group: h5py.Group) -> dict:
    """
    Recursively turn an h5py Group (or File) into a nested Python dictionary.

    Inputs:
        - h5py_group (h5py.Group): The group to convert.
    Outputs:
        - dict: One entry per item of the group: datasets become their content
          (bytes decoded, 0-d arrays converted to scalars), groups become nested
          dictionaries and any other item becomes its string representation.
    """
    tree = {}
    for name, node in h5py_group.items():
        if isinstance(node, h5py.Dataset):
            payload = node[()]
            if isinstance(payload, bytes):
                payload = payload.decode()
            # 0-d arrays are easier to read as plain scalars
            if isinstance(payload, numpy.ndarray) and payload.shape == ():
                payload = payload.item()
            tree[name] = payload
            continue

        if isinstance(node, h5py.Group):
            tree[name] = serialize_h5py_task(node)
        else:
            tree[name] = str(node)
    return tree
def deserialize_h5py_task(h5dict: dict, h5py_parent: h5py.Group) -> None:
    """
    Recursively write a nested dictionary into an h5py group.

    Inputs:
        - h5dict (dict): The dictionary to write. Nested dictionaries become
          sub-groups with the attribute NX_class="NXcollection"; any other
          value becomes a dataset.
        - h5py_parent (h5py.Group): The group where the items are created.
    Outputs:
        - None. Values that cannot be written as a dataset are reported with
          the logger and skipped (best effort).
    """
    for key, value in h5dict.items():
        if isinstance(value, dict):
            child_group = h5py_parent.create_group(name=key)
            child_group.attrs["NX_class"] = "NXcollection"
            deserialize_h5py_task(h5dict=value, h5py_parent=child_group)
            continue

        try:
            h5py_parent.create_dataset(name=key, data=value)
        except Exception as e:
            # Reported through the module logger (instead of print) so that
            # the failure reaches the logs of the application.
            logger.warning(
                f"Dataset {key} could not be created in {h5py_parent.name} from a value of type {type(value).__name__}: {e}"
            )
def get_array_mask(
    filename_mask: str,
    data_signal_shape: tuple,
    datatype: str = "bool",
    binning: tuple = (1, 1),
    use_cupy: bool = False,
    persistent: bool = True,
) -> Optional[Union[numpy.ndarray]]:
    """
    Generate the array to mask (gaps or beamstop normally)

    params:
        - filename_mask (str): the filename of the mask
        - data_signal_shape (tuple): the shape of the data signal array, used for binning unification
        - datatype (str): format of the imported array, if None, datatype is respected
        - binning (tuple): binning of the data signal, used for binning unification
        - use_cupy (bool): if True, returns a cupy.asarray
        - persistent (bool): if True, the array goes through an lru_cache whose key includes
          the modification time of the file, so a modified file is loaded again
    returns:
        - Optional[Union[numpy.ndarray, cupy.ndarray]]: the array mask or None if the file does not exist
    """
    if not persistent:
        return _get_array_mask(
            filename_mask=filename_mask,
            datatype=datatype,
            data_signal_shape=data_signal_shape,
            binning=binning,
            use_cupy=use_cupy,
        )

    # A missing file is cached under the (None, None) key
    if filename_mask and os.path.exists(filename_mask):
        cache_key = dict(
            filename_mask=filename_mask,
            mtime_mask=os.path.getmtime(filename_mask),
        )
    else:
        cache_key = dict(filename_mask=None, mtime_mask=None)

    return _get_persistent_array_mask(
        **cache_key,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        use_cupy=use_cupy,
    )
@lru_cache(maxsize=5)
def _get_persistent_array_mask(
    filename_mask: str,
    mtime_mask: float,
    data_signal_shape: tuple,
    datatype: str = "bool",
    binning: tuple = (1, 1),
    use_cupy: bool = False,
):
    """
    Cached version of `_get_array_mask`.

    `mtime_mask` is not used to load the mask: it is only part of the cache
    key, so a different modification time gives a cache miss.
    """
    logger.info(
        f"No cache hit. Creating new mask array from {filename_mask=}, {mtime_mask=}, \
        {data_signal_shape=}, {datatype=}, {binning=}, {use_cupy=}"
    )
    loader_options = dict(
        filename_mask=filename_mask,
        datatype=datatype,
        data_signal_shape=data_signal_shape,
        binning=binning,
        use_cupy=use_cupy,
    )
    return _get_array_mask(**loader_options)


def _get_array_mask(
    filename_mask: str,
    datatype: str = "bool",
    data_signal_shape: tuple = None,
    binning: tuple = (1, 1),
    use_cupy: bool = False,
):
    """
    Load the mask array with `load_data` (no caching).

    Inputs:
        - filename_mask (str): the filename of the mask
        - datatype (str): format of the imported array
        - data_signal_shape (tuple): the shape of the data signal array
        - binning (tuple): binning of the data signal
        - use_cupy (bool): if True, a cupy array is requested
    Outputs:
        - The result of `load_data` for the mask file.
    """
    mask = load_data(
        filename=filename_mask,
        datatype=datatype,
        data_signal_shape=data_signal_shape,
        binning=binning,
        use_cupy=use_cupy,
    )
    return mask
def get_array_flat(
    filename_flat: str,
    data_signal_shape: tuple,
    datatype: str = "float32",
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    use_cupy: bool = False,
    persistent: bool = True,
):
    """
    Generate the array of the flat field correction

    params:
        - filename_flat (str): the filename of the flat field correction
        - data_signal_shape (tuple): the shape of the data signal array, used for binning unification
        - datatype (str): format of the imported array, if None, datatype is respected
        - binning (tuple): binning of the data signal, used for binning unification
        - dummy (int): if not None, the value of the dummy pixels to filter
        - delta_dummy (float): if dummy is not None, the tolerance around the dummy value
        - filename_mask (str): the filename of the mask
        - use_cupy (bool): if True, returns a cupy.asarray
        - persistent (bool): if True, the array goes through an lru_cache whose key includes
          the modification times of the files, so a modified file is loaded again
    returns:
        - Optional[Union[numpy.ndarray, cupy.ndarray]]: the array flat-field correction or None if the file does not exist
    """
    # Options shared by the cached and the non-cached loaders
    common = dict(
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=dummy,
        delta_dummy=delta_dummy,
        use_cupy=use_cupy,
    )

    if not persistent:
        return _get_array_flat(
            filename_flat=filename_flat,
            filename_mask=filename_mask,
            **common,
        )

    # Missing files are represented by (None, None) in the cache key
    mtime_flat = None
    if filename_flat and os.path.exists(filename_flat):
        mtime_flat = os.path.getmtime(filename_flat)
    else:
        filename_flat = None

    mtime_mask = None
    if filename_mask and os.path.exists(filename_mask):
        mtime_mask = os.path.getmtime(filename_mask)
    else:
        filename_mask = None

    return _get_persistent_array_flat(
        filename_flat=filename_flat,
        mtime_flat=mtime_flat,
        filename_mask=filename_mask,
        mtime_mask=mtime_mask,
        **common,
    )
@lru_cache(maxsize=5)
def _get_persistent_array_flat(
    filename_flat: str,
    mtime_flat: float,
    data_signal_shape: tuple,
    datatype: str = "float32",
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    mtime_mask: float = None,
    use_cupy: bool = False,
):
    """
    Cached version of `_get_array_flat`.

    `mtime_flat` and `mtime_mask` are not used to load the arrays: they are
    only part of the cache key, so a different modification time gives a
    cache miss.
    """
    logger.info(
        f"No cache hit. Creating new flat-field array from {filename_flat=}, {mtime_flat=}, \
        {data_signal_shape=}, {datatype=}, {binning=}, {dummy=}, {delta_dummy=}, {filename_mask=}, {mtime_mask=}, {use_cupy=}"
    )
    loader_options = dict(
        filename_flat=filename_flat,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=dummy,
        delta_dummy=delta_dummy,
        filename_mask=filename_mask,
        use_cupy=use_cupy,
    )
    return _get_array_flat(**loader_options)


def _get_array_flat(
    filename_flat: str,
    data_signal_shape: tuple,
    datatype: str = "float32",
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    use_cupy: bool = False,
):
    """
    Load the flat-field array with `load_data` (no caching of the flat itself).

    Inputs:
        - filename_flat (str): the filename of the flat field correction
        - data_signal_shape (tuple): the shape of the data signal array
        - datatype (str): format of the imported array
        - binning (tuple): binning of the data signal
        - dummy (int): the value of the dummy pixels
        - delta_dummy (float): the tolerance around the dummy value
        - filename_mask (str): the filename of the mask
        - use_cupy (bool): if True, cupy arrays are requested
    Outputs:
        - The flat-field array as loaded, or None if it could not be loaded.
    """
    array_flat = load_data(
        filename=filename_flat,
        datatype=datatype,
        data_signal_shape=data_signal_shape,
        binning=binning,
        use_cupy=use_cupy,
    )
    if array_flat is None:
        return None

    array_mask = get_array_mask(
        filename_mask=filename_mask,
        data_signal_shape=data_signal_shape,
        datatype="bool",
        binning=binning,
        use_cupy=use_cupy,
        persistent=True,
    )

    # The dummy pixels of the flat are added to the mask IN PLACE; the flat
    # itself is returned untouched.
    # NOTE(review): the mask is requested with persistent=True, so this
    # presumably modifies the array kept in the cache of the mask loader, and
    # the combined mask is not used anywhere else in this function - confirm
    # that this side effect is intended.
    if array_mask is not None and dummy is not None and delta_dummy is not None:
        if delta_dummy == 0:
            dummy_pixels = array_flat == dummy
        else:
            dummy_pixels = abs(array_flat - dummy) < delta_dummy
        array_mask |= dummy_pixels

    return array_flat
def get_array_dark(
    filename_dark: str,
    data_signal_shape: tuple,
    datatype: str = None,
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    dark_filter: str = None,
    dark_filter_quantil_lower: float = 0.1,
    dark_filter_quantil_upper: float = 0.9,
    use_cupy: bool = False,
    persistent: bool = True,
) -> Optional[numpy.ndarray]:
    """
    Generate the array of the dark current correction

    params:
        - filename_dark (str): the filename of the dark current correction
        - data_signal_shape (tuple): the shape of the data signal array, used for binning unification
        - datatype (str): format of the imported array, if None, datatype is respected
        - binning (tuple): binning of the data signal, used for binning unification
        - dummy (int): if not None, the value of the dummy pixels to filter
        - delta_dummy (float): if dummy is not None, the tolerance around the dummy value
        - filename_mask (str): the filename of the mask
        - dark_filter (str): the method to use for filtering a stack of darks into a single dark (e.g. "median" or "quantil")
        - dark_filter_quantil_lower (float): if dark_filter is "quantil", the lower quantile to use for filtering
        - dark_filter_quantil_upper (float): if dark_filter is "quantil", the upper quantile to use for filtering
        - use_cupy (bool): if True, returns a cupy.asarray
        - persistent (bool): if True, the array goes through an lru_cache whose key includes
          the modification times of the files, so a modified file is loaded again
    returns:
        - Optional[Union[numpy.ndarray, cupy.ndarray]]: the array dark current correction or None if the file does not exist
    """
    if not persistent:
        return _get_array_dark(
            filename_dark=filename_dark,
            data_signal_shape=data_signal_shape,
            datatype=datatype,
            binning=binning,
            dummy=dummy,
            delta_dummy=delta_dummy,
            filename_mask=filename_mask,
            dark_filter=dark_filter,
            dark_filter_quantil_lower=dark_filter_quantil_lower,
            dark_filter_quantil_upper=dark_filter_quantil_upper,
            use_cupy=use_cupy,
        )

    # Missing files are represented by (None, None) in the cache key
    mtime_dark = None
    if filename_dark and os.path.exists(filename_dark):
        mtime_dark = os.path.getmtime(filename_dark)
    else:
        filename_dark = None

    mtime_mask = None
    if filename_mask and os.path.exists(filename_mask):
        mtime_mask = os.path.getmtime(filename_mask)
    else:
        filename_mask = None

    # Keyword order kept as it is: it is part of the lru_cache key
    cached_call = dict(
        filename_dark=filename_dark,
        mtime_dark=mtime_dark,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=dummy,
        delta_dummy=delta_dummy,
        filename_mask=filename_mask,
        mtime_mask=mtime_mask,
        dark_filter=dark_filter,
        dark_filter_quantil_lower=dark_filter_quantil_lower,
        dark_filter_quantil_upper=dark_filter_quantil_upper,
        use_cupy=use_cupy,
    )
    return _get_persistent_array_dark(**cached_call)
@lru_cache(maxsize=5)
def _get_persistent_array_dark(
    filename_dark: str,
    mtime_dark: float,
    data_signal_shape: tuple,
    datatype: str = None,
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    mtime_mask: float = None,
    dark_filter: str = None,
    dark_filter_quantil_lower: float = 0.1,
    dark_filter_quantil_upper: float = 0.9,
    use_cupy: bool = False,
):
    """
    Cached version of `_get_array_dark`.

    `mtime_dark` and `mtime_mask` are not used to load the arrays: they are
    only part of the cache key, so a different modification time gives a
    cache miss. They are therefore not computed again here; files that do
    not exist are handled by the loaders themselves, which return None.
    """
    logger.info(
        f"No cache hit. Creating new dark-current array from {filename_dark=}, {mtime_dark=}, {data_signal_shape=}, \
        {datatype=}, {binning=}, {dummy=}, {delta_dummy=}, {filename_mask=}, {mtime_mask=}, {dark_filter=},\
        {dark_filter_quantil_lower=}, {dark_filter_quantil_upper=}, {use_cupy=}"
    )
    return _get_array_dark(
        filename_dark=filename_dark,
        data_signal_shape=data_signal_shape,
        datatype=datatype,
        binning=binning,
        dummy=dummy,
        delta_dummy=delta_dummy,
        filename_mask=filename_mask,
        dark_filter=dark_filter,
        dark_filter_quantil_lower=dark_filter_quantil_lower,
        dark_filter_quantil_upper=dark_filter_quantil_upper,
        use_cupy=use_cupy,
    )


def _get_array_dark(
    filename_dark: str,
    data_signal_shape: tuple,
    datatype: str = None,
    binning: tuple = (1, 1),
    dummy: int = None,
    delta_dummy: float = None,
    filename_mask: str = None,
    dark_filter: str = None,
    dark_filter_quantil_lower: float = 0.1,
    dark_filter_quantil_upper: float = 0.9,
    use_cupy: bool = False,
) -> Optional[numpy.ndarray]:
    """
    Load the dark-current array with `load_data` (no caching of the dark itself).

    Inputs:
        - filename_dark (str): the filename of the dark current correction
        - data_signal_shape (tuple): the shape of the data signal array
        - datatype (str): format of the imported array, if None, datatype is respected
        - binning (tuple): binning of the data signal
        - dummy (int): the value of the dummy pixels
        - delta_dummy (float): the tolerance around the dummy value
        - filename_mask (str): the filename of the mask
        - dark_filter, dark_filter_quantil_lower, dark_filter_quantil_upper: handed to `load_data`
        - use_cupy (bool): if True, cupy arrays are requested
    Outputs:
        - Optional[numpy.ndarray]: The dark-current array as loaded, or None if there
          is no filename or the data could not be loaded.
    """
    if not filename_dark:
        return None

    array_dark = load_data(
        filename=filename_dark,
        datatype=datatype,
        data_signal_shape=data_signal_shape,
        binning=binning,
        use_cupy=use_cupy,
        dark_filter=dark_filter,
        dark_filter_quantil_lower=dark_filter_quantil_lower,
        dark_filter_quantil_upper=dark_filter_quantil_upper,
    )
    if array_dark is None:
        return None

    array_mask = get_array_mask(
        filename_mask=filename_mask,
        data_signal_shape=data_signal_shape,
        datatype="bool",
        binning=binning,
        use_cupy=use_cupy,
        persistent=True,
    )

    # The dummy pixels of the dark are added to the mask IN PLACE; the dark
    # itself is returned untouched.
    # NOTE(review): the mask is requested with persistent=True, so this
    # presumably modifies the array kept in the cache of the mask loader, and
    # the combined mask is not used anywhere else in this function - confirm
    # that this side effect is intended.
    if array_mask is not None and dummy is not None and delta_dummy is not None:
        if delta_dummy == 0:
            array_mask |= array_dark == dummy
        else:
            array_mask |= abs(array_dark - dummy) < delta_dummy

    return array_dark
def get_dataset_signal_from_processed_file(
    filename: str,
    index_range: tuple = None,
):
    """
    Read the "data" dataset of the first "result_" group found in
    "entry_0000/PyFAI" of a processed file.

    Inputs:
        - filename (str): The processed HDF5 file.
        - index_range (tuple, optional): If given, only the slice from its first
          to its last element (excluded) is read. Defaults to None (everything).
    Outputs:
        - The content of the dataset, or None if the file does not exist or does
          not contain the expected groups (an error is logged in the latter case).
    """
    if not filename or not os.path.exists(filename):
        return None

    with h5py.File(filename, "r") as h5file:
        nxprocess_path = "entry_0000/PyFAI"
        if nxprocess_path not in h5file:
            logger.error(f"{filename} does not contain the {nxprocess_path} format")
            return None
        nxprocess_grp = h5file[nxprocess_path]

        # First item whose name contains "result_"
        for candidate in nxprocess_grp:
            if "result_" in candidate:
                nxdata_name = candidate
                break
        else:
            nxdata_name = None
        if nxdata_name is None:
            logger.error(f"There is no result_xxx group in {nxprocess_path}")
            return None

        nxdata_grp = nxprocess_grp[nxdata_name]
        if "data" not in nxdata_grp:
            logger.error(
                f"There is no data dataset in {nxprocess_path}/{nxdata_name}"
            )
            return None

        dataset = nxdata_grp["data"]
        if index_range is None:
            return dataset[:]
        return dataset[index_range[0] : index_range[-1]]
@lru_cache(maxsize=10)
def _get_persistent_array(
    filename: str,
    mtime: float,
    **kwargs,
):
    """
    Cached version of `load_data`.

    Inputs:
        - filename (str): The filename to load.
        - mtime (float): Only part of the cache key (it is not used to load the
          data), so a different value gives a cache miss.
        - kwargs: Handed to `load_data`. They are part of the cache key as well,
          so they have to be hashable.
    Outputs:
        - The result of `load_data`, or None if there is no filename.
    """
    if filename:
        logger.info(
            f"No cache hit. Creating new array from {filename=}, {mtime=}, {kwargs}"
        )
        return load_data(filename=filename, **kwargs)
    return None