Source code for ewoksid02.id02_format

import json
import logging
import threading
from contextlib import contextmanager
from importlib.metadata import version
from pathlib import Path

import h5py
import hdf5plugin  # noqa
import numpy
from pyFAI import version as pyFAIVersion
from silx.io.h5py_utils import open_item as open_item_silx
from silx.utils.retry import RetryTimeoutError

from ewoksid02.headers import HS32, read_headers
from ewoksid02.utils.io import deserialize_h5py_task, get_isotime, serialize_h5py_task

# Module-level lock. No definition in the code reviewed here acquires it.
# NOTE(review): presumably used by importers of this module -- confirm.
lock = threading.Lock()

# NOTE(review): basicConfig() runs at import time and configures the root
# logger of the whole process -- confirm this is wanted for a library module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Parameter keys that write_processing_parameters() stores as float64 scalars.
KEYS_FLOAT = [
    "Center_1",
    "Center_2",
    "Dummy",
    "DDummy",
    "PSize_1",
    "PSize_2",
    "SampleDistance",
    "WaveLength",
]

# Parameter keys that write_processing_parameters() stores as int64 scalars.
KEYS_INT = [
    "BSize_1",
    "BSize_2",
    "Offset_1",
    "Offset_2",
    "RasterOrientation",
]

# Processing types that ID02ProcessedFileHS32 places under its "PyFAI" group;
# creating a file for one of them requires a detector name.
PYFAI_PROCESSES = ["norm", "gaps", "2scat", "cave", "azim", "ave", "caving"]
# Processing types that ID02ProcessedFileHS32 places under its "TRUSAXS" group.
TRUSAXS_PROCESSES = ["scalers", "dispatch", "debug"]
ALL_PROCESSES = PYFAI_PROCESSES + TRUSAXS_PROCESSES

# Detector name -> string, with a "default" fallback entry.
# NOTE(review): the values look like NXinstrument group names of LIMA files;
# nothing in the code reviewed here reads this mapping -- confirm.
MAP_DETECTORS_LIMA = {
    "eiger2": "ESRF-ID02",
    "waxs": "instrument",
    "default": "ESRF-ID02",
    "eiger500k": "ESRF-ID02",
}
DETECTOR_LIMA_DEFAULT = MAP_DETECTORS_LIMA.get("default")


@contextmanager
def open_id02_file(
    filename: str,
    mode: str = "r",
    detector_name: str = None,
    scan_nb: int = None,
    processing_type: str = None,
    retry_timeout: float = 0.1,
    headers: dict = None,
    **kwargs,
):
    """Open an ID02 HDF5 file and yield the wrapper matching its creator.

    The root attribute ``creator`` selects the wrapper class:

    * contains ``"ewoksid02"`` (or ``mode == "w"``): ``ID02ProcessedFileHS32``
    * contains ``"lima"``: ``ID02BlissLimaFile``
    * contains ``"bliss"``: ``ID02BlissMasterFile``

    ``None`` is yielded when the creator is unknown or when the file cannot
    be opened within ``retry_timeout``.

    :param filename: path of the HDF5 file.
    :param mode: HDF5 open mode; ``"w"`` refuses to overwrite an existing file.
    :param detector_name: forwarded to the wrapper.
    :param scan_nb: forwarded to the wrapper.
    :param processing_type: forwarded to the wrapper.
    :param retry_timeout: forwarded to silx ``open_item``.
    :param headers: forwarded to the wrapper.
    :param kwargs: extra options forwarded to silx ``open_item``.
    :raises RuntimeError: if ``mode == "w"`` and ``filename`` already exists.
    """
    # Local import so this block does not depend on the file-level import list.
    from contextlib import ExitStack

    if mode == "w" and Path(filename).is_file():
        raise RuntimeError(f"Attempt to overwrite {filename}. This is not allowed.")

    with ExitStack() as stack:
        # Only *entering* the silx context is guarded. Catching
        # RetryTimeoutError around the ``yield`` would also intercept errors
        # raised by the caller's ``with`` body and make the generator yield a
        # second time ("generator didn't stop after throw()").
        try:
            root = stack.enter_context(
                open_item_silx(
                    filename=filename,
                    name="/",
                    retry_timeout=retry_timeout,
                    mode=mode,
                    **kwargs,
                )
            )
        except RetryTimeoutError:
            file_available = False
        else:
            file_available = True

        if not file_available:
            logger.warning(f"File {filename} is not available yet.")
            yield None
            return

        creator = root.attrs.get("creator") or ""
        if isinstance(creator, bytes):
            # Fixed-length HDF5 string attributes may be read back as bytes.
            creator = creator.decode(errors="replace")
        creator = creator.lower()

        wrapper_kwargs = {
            "root": root,
            "scan_nb": scan_nb,
            "detector_name": detector_name,
            "mode": mode,
            "processing_type": processing_type,
            "headers": headers,
        }
        if "ewoksid02" in creator or mode == "w":
            yield ID02ProcessedFileHS32(**wrapper_kwargs)
        elif "lima" in creator:
            yield ID02BlissLimaFile(**wrapper_kwargs)
        elif "bliss" in creator:
            yield ID02BlissMasterFile(**wrapper_kwargs)
        else:
            logger.error(f"Unknown creator: {root.attrs.get('creator')!r}")
            yield None
class ID02FormatFile:
    """Base wrapper around the root group of an ID02 HDF5 file.

    It offers dict-like access to the wrapped group (``in``, iteration,
    ``[]`` get/set) and a set of path/header properties that all return
    ``None`` here; subclasses override the ones that apply to their layout.
    """

    def __init__(
        self,
        root: h5py.Group,
        scan_nb: int = None,
        detector_name: str = None,
        mode: str = None,
        processing_type: str = None,
        headers: dict = None,
    ):
        """Store the root group and the scan/detector/processing identifiers.

        :param root: HDF5 group used as the root of every lookup.
        :param scan_nb: scan number.
        :param detector_name: detector name.
        :param mode: mode the file was opened with.
        :param processing_type: processing type.
        :param headers: accepted for signature compatibility; not used here.
        """
        self._mode = mode
        self.root = root
        self.scan_nb = scan_nb
        self.detector_name = detector_name
        self.filename = root.file.filename
        self.processing_type = processing_type

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Closing is best effort. The previous ``finally: return False``
        # discarded *every* exception, including KeyboardInterrupt; only
        # ordinary errors are swallowed (and logged) now.
        # NOTE(review): h5py.Group has no close() method, so for a plain
        # group this call fails and is only logged -- confirm whether
        # closing the underlying file is intended here.
        try:
            self.root.close()
        except Exception:
            logger.debug(f"Could not close root of {self.filename}", exc_info=True)
        return False  # don't suppress exceptions raised in the ``with`` body

    def __contains__(self, item):
        return item in self.root

    def __iter__(self):
        return iter(self.root)

    def __getitem__(self, item):
        return self.root[item]

    def __setitem__(self, item, value):
        self.root[item] = value

    def create_group(self, *args, **kwargs):
        """Create a group below the root; arguments are passed through."""
        return self.root.create_group(*args, **kwargs)

    @property
    def nxdetector_path(self) -> str:
        """Path of the detector group; ``None`` in the base class."""
        return None

    @property
    def headers_path(self) -> str:
        """Path of the headers group; ``None`` in the base class."""
        return None

    @property
    def headers_group(self) -> h5py.Group:
        """Headers group; ``None`` in the base class."""
        return None

    @property
    def headers(self) -> dict:
        """Headers dictionary; ``None`` in the base class."""
        return None
class ID02BlissMasterFile(ID02FormatFile):
    """Wrapper for a file whose ``creator`` attribute mentions "bliss".

    All paths are built from ``scan_nb`` with the ``<scan_nb>.<subscan>``
    entry naming used by the properties below.
    """

    def _subscan_entry(self, subscan: int) -> Path:
        """Return the entry path ``<scan_nb>.<subscan>``."""
        return Path(f"{self.scan_nb}.{subscan}")

    @property
    def nxdata_dataset_signal_path(self) -> str:
        """Path of the detector ``data`` dataset in subscan 1."""
        signal = self._subscan_entry(1) / "instrument" / self.detector_name / "data"
        return str(signal)

    @property
    def streams_subscan1_path(self) -> str:
        """Path of the ``measurement`` group of subscan 1."""
        return str(self._subscan_entry(1) / "measurement")

    @property
    def streams_subscan2_path(self) -> str:
        """Path of the ``measurement`` group of subscan 2."""
        return str(self._subscan_entry(2) / "measurement")

    @property
    def collection_name(self) -> str:
        """Decoded value of ``<scan_nb>.1/sample/name``."""
        raw_name = self[f"{self.scan_nb}.1/sample/name"][()]
        return raw_name.decode()

    @property
    def filename_lima(self) -> str:
        """Expected LIMA file next to this file, or ``None`` without detector."""
        if not self.detector_name:
            return None
        lima_basename = (
            f"{self.collection_name}_{self.detector_name}_{self.scan_nb:05}_00.h5"
        )
        return str(Path(self.filename).parent / lima_basename)
class ID02BlissLimaFile(ID02FormatFile):
    """Wrapper for a file whose ``creator`` attribute mentions "lima".

    On construction the first child of ``entry_0000`` tagged
    ``NX_class == "NXinstrument"`` is recorded. When no detector name was
    given, the first ``NXdetector`` child of that instrument group (if any)
    becomes the detector name.
    """

    ENTRY_NAME = "entry_0000"

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        entry = self[self.ENTRY_NAME]
        self.nxinstrument_name = self._first_child_of_class(entry, "NXinstrument")

        # A detector name passed by the caller always wins; the lookup also
        # needs an instrument group to search in.
        if self.detector_name or not self.nxinstrument_name:
            return
        detected = self._first_child_of_class(
            entry[self.nxinstrument_name], "NXdetector"
        )
        if detected:
            self.detector_name = detected

    @staticmethod
    def _first_child_of_class(group, nx_class: str):
        """Return the first child name whose ``NX_class`` attr matches, else None."""
        for child_name in group:
            if group[child_name].attrs.get("NX_class") == nx_class:
                return child_name
        return None

    @property
    def nxinstrument_path(self) -> str:
        """Path of the NXinstrument group inside the entry."""
        return str(Path(self.ENTRY_NAME) / self.nxinstrument_name)

    @property
    def nxdetector_path(self) -> str:
        """Path of the detector group inside the instrument group."""
        return str(Path(self.nxinstrument_path) / self.detector_name)

    @property
    def headers_path(self) -> str:
        """Path of the ``header`` group inside the detector group."""
        return str(Path(self.nxdetector_path) / "header")

    @property
    def headers_group(self) -> h5py.Group:
        """The ``header`` group itself."""
        return self[self.headers_path]

    @property
    def headers(self) -> dict:
        """Headers as returned by ``read_headers`` for the header group."""
        return read_headers(h5_group=self.headers_group)
class ID02ProcessedFileHS32(ID02FormatFile):
    """HDF5 file written by ewoksid02 processing, rooted at ``/entry_0000``.

    In write mode (``mode == "w"``) the constructor builds the skeleton of
    the file: root attributes, entry, configuration note, process group and
    the MCS, parameters, TFG and ewoks collections. In any other mode the
    processing type and the detector name are read back from the file.
    """

    ENTRY_NAME = "/entry_0000"
    NXPROCESS_PYFAI_NAME = "PyFAI"
    NXPROCESS_TRUSAXS_NAME = "TRUSAXS"
    NXDATA_NAME_TEMPLATE = "result_{processing_type}"
    NXNOTE_CONFIGURATION_NAME = "configuration"
    NXCOLLECTION_MCS_NAME = "MCS"
    NXCOLLECTION_PARAMETERS_NAME = "parameters"
    NXCOLLECTION_TFG_NAME = "TFG"
    NXCOLLECTION_EWOKS_NAME = "ewoks"
    NXCOLLECTION_EWOKS_HISTORY_NAME = "history"
    NXCOLLECTION_EWOKS_LASTPROCESS_NAME = "last_process"

    FORMAT_HS32C = "int64"
    FORMAT_HS32F = "float64"
    FORMAT_HS32V = "float64"
    FORMAT_STRING = h5py.string_dtype(encoding="utf-8")
    FORMAT_HS32N = FORMAT_STRING
    FORMAT_HS32Z = "float64"
    FORMAT_EWOKS_PROCESS_TITLE = "{index:02} - {class_name}"

    DATASET_SIGNAL_NAME = "data"
    DATASET_VARIANCE_NAME = "data_variance"
    DATASET_SIGMA_NAME = "data_errors"

    MCS_RAW_NAME = "raw"
    MCS_SUBSCAN1 = "subscan_1"
    MCS_SUBSCAN2 = "subscan_2"
    MCS_INTERPRETED = "interpreted"

    KEY_EXPOSURE_TIME = "ExposureTime"
    KEY_DELTA_TIME = "delta_time"
    KEY_DELTA_TIME_NXDATA = "t"
    KEY_I0_SHUTCOR = "Intensity0ShutCor"
    KEY_I0_UNCOR = "Intensity0UnCor"
    KEY_I1_SHUTCOR = "Intensity1ShutCor"
    KEY_I1_UNCOR = "Intensity1UnCor"
    KEY_TITLEEXTENSION = "TitleExtension"

    COMPRESSION_ALGORITHM = hdf5plugin.Bitshuffle(cname="lz4")
    CHUNK_SIZE_3D = (1, 200, 200)

    def __init__(
        self,
        headers: dict = None,  # Only to create the file ('w')
        *args,
        **kwargs,
    ):
        """Wrap an existing processed file or build the skeleton of a new one.

        :param headers: header dictionary used to fill a new file.
        :param args: forwarded to ``ID02FormatFile.__init__``.
        :param kwargs: forwarded to ``ID02FormatFile.__init__``.
        :raises ValueError: in write mode, if ``processing_type`` is missing
            or unknown, or if a PyFAI process has no ``detector_name``.
        """
        super().__init__(*args, **kwargs)
        self._headers = headers

        if self._mode != "w":
            self.processing_type = self._read_processing_type()
            self.detector_name = self._read_detector_name()
            return

        if not self.processing_type:
            raise ValueError("New ID02 processing file needs a processing_type")
        if self.processing_type not in ALL_PROCESSES:
            # An unknown type has no process group name; without this check
            # the build fails later with a TypeError on a half-written file.
            raise ValueError(
                f"Unknown processing_type {self.processing_type!r}. "
                f"Expected one of {ALL_PROCESSES}"
            )
        if self.processing_type in PYFAI_PROCESSES and not self.detector_name:
            raise ValueError(
                f"PyFAI process {self.processing_type} needs a detector_name"
            )

        self._complete_root_group()
        self._build_entry_group()
        self._build_configuration_group(headers=headers)
        self._build_nxprocess_group()
        self._build_MCS_group(headers=headers)
        self._build_parameters_group(processing_parameters=headers)
        self._build_TFG_group(headers=headers)
        self._build_ewoks_groups()

    def is_id02_processed_file(self):
        """Return True if the entry exists and the creator is "ewoksid02"."""
        return bool(
            self.ENTRY_NAME in self
            and self.root.attrs.get("creator") == "ewoksid02"
        )

    # ------------------------------------------------------------------
    # Paths and groups
    # ------------------------------------------------------------------
    @property
    def entry_group(self) -> h5py.Group:
        """The ``/entry_0000`` group."""
        return self[self.ENTRY_NAME]

    @property
    def configuration_path(self) -> str:
        """Path of the configuration note."""
        return str(Path(self.ENTRY_NAME) / self.NXNOTE_CONFIGURATION_NAME)

    @property
    def configuration_group(self) -> h5py.Group:
        """The configuration note group."""
        return self[self.configuration_path]

    @property
    def nxprocess_name(self) -> str:
        """Process group name for the processing type; None if unknown."""
        if self.processing_type in PYFAI_PROCESSES:
            return self.NXPROCESS_PYFAI_NAME
        if self.processing_type in TRUSAXS_PROCESSES:
            return self.NXPROCESS_TRUSAXS_NAME
        return None

    @property
    def nxprocess_path(self) -> str:
        """Path of the process group."""
        return str(Path(self.ENTRY_NAME) / self.nxprocess_name)

    @property
    def nxprocess_group(self) -> h5py.Group:
        """The process group."""
        return self[self.nxprocess_path]

    @property
    def nxdata_name(self) -> str:
        """``result_<processing_type>`` for PyFAI processes, else None."""
        if self.processing_type in PYFAI_PROCESSES:
            return self.NXDATA_NAME_TEMPLATE.format(
                processing_type=self.processing_type
            )
        return None

    @property
    def mcs_path(self) -> str:
        """Path of the MCS collection."""
        return str(Path(self.nxprocess_path) / self.NXCOLLECTION_MCS_NAME)

    @property
    def mcs_group(self) -> h5py.Group:
        """The MCS collection."""
        return self[self.mcs_path]

    @property
    def parameters_path(self) -> str:
        """Path of the parameters collection."""
        return str(Path(self.nxprocess_path) / self.NXCOLLECTION_PARAMETERS_NAME)

    @property
    def parameters_group(self) -> h5py.Group:
        """The parameters collection."""
        return self[self.parameters_path]

    @property
    def headers_path(self) -> str:
        """Headers live in the parameters collection."""
        return self.parameters_path

    @property
    def headers_group(self) -> h5py.Group:
        """Headers live in the parameters collection."""
        return self.parameters_group

    @property
    def headers(self) -> dict:
        """Headers given at construction, else read (and cached) from the file."""
        if not self._headers:
            self._headers = read_headers(h5_group=self.headers_group)
        return self._headers

    @property
    def tfg_path(self) -> str:
        """Path of the TFG collection."""
        return str(Path(self.nxprocess_path) / self.NXCOLLECTION_TFG_NAME)

    @property
    def tfg_group(self) -> h5py.Group:
        """The TFG collection."""
        return self[self.tfg_path]

    @property
    def ewoks_path(self) -> str:
        """Path of the ewoks collection."""
        return str(Path(self.ENTRY_NAME) / self.NXCOLLECTION_EWOKS_NAME)

    @property
    def ewoks_group(self) -> h5py.Group:
        """The ewoks collection."""
        return self[self.ewoks_path]

    @property
    def ewoks_history_path(self) -> str:
        """Path of the ewoks history collection."""
        return str(Path(self.ewoks_path) / self.NXCOLLECTION_EWOKS_HISTORY_NAME)

    @property
    def ewoks_history_group(self) -> h5py.Group:
        """The ewoks history collection."""
        return self[self.ewoks_history_path]

    @property
    def ewoks_last_process_path(self) -> str:
        """Path of the ``last_process`` link."""
        return str(Path(self.ewoks_path) / self.NXCOLLECTION_EWOKS_LASTPROCESS_NAME)

    @property
    def nxdata_path(self) -> str:
        """Path of the result NXdata group (PyFAI processes only)."""
        return str(Path(self.nxprocess_path) / self.nxdata_name)

    @property
    def nxdata_dataset_signal_path(self) -> str:
        """Path of the signal dataset."""
        return str(Path(self.nxdata_path) / self.DATASET_SIGNAL_NAME)

    @property
    def nxdata_dataset_variance_path(self) -> str:
        """Path of the variance dataset."""
        return str(Path(self.nxdata_path) / self.DATASET_VARIANCE_NAME)

    @property
    def nxdata_dataset_sigma_path(self) -> str:
        """Path of the errors dataset."""
        return str(Path(self.nxdata_path) / self.DATASET_SIGMA_NAME)

    @property
    def nxdata_delta_time_path(self) -> str:
        """Path of the ``t`` dataset in the result group."""
        return str(Path(self.nxdata_path) / self.KEY_DELTA_TIME_NXDATA)

    @property
    def nxdetector_path(self) -> str:
        """Path of the detector group inside the process group."""
        return str(Path(self.nxprocess_path) / self.detector_name)

    @property
    def streams_subscan1_path(self) -> str:
        """Path ``MCS/raw/subscan_1``."""
        return str(Path(self.mcs_path) / self.MCS_RAW_NAME / self.MCS_SUBSCAN1)

    @property
    def streams_subscan2_path(self) -> str:
        """Path ``MCS/raw/subscan_2``."""
        return str(Path(self.mcs_path) / self.MCS_RAW_NAME / self.MCS_SUBSCAN2)

    # ------------------------------------------------------------------
    # Reading back identifiers
    # ------------------------------------------------------------------
    def _read_processing_type(self) -> str:
        """Read the processing type from whichever process group exists.

        Returns None when neither the PyFAI nor the TRUSAXS group is present.
        """
        entry_group = self[self.ENTRY_NAME]
        for candidate in (self.NXPROCESS_PYFAI_NAME, self.NXPROCESS_TRUSAXS_NAME):
            if candidate in entry_group:
                return entry_group[candidate]["processing_type"][()].decode()
        return None

    def _read_detector_name(self) -> str:
        """Read ``detector_name`` from the entry; None when it is absent."""
        entry_group = self[self.ENTRY_NAME]
        if "detector_name" in entry_group:
            return entry_group["detector_name"][()].decode()
        return None

    # ------------------------------------------------------------------
    # File skeleton (write mode)
    # ------------------------------------------------------------------
    def _complete_root_group(self) -> None:
        """Write the root attributes of a new file."""
        root_attrs = {
            "HDF5_Version": h5py.version.hdf5_version,
            "NX_class": "NXroot",
            "creator": "ewoksid02",
            "creator_version": version("ewoksid02"),
            "file_name": str(self.filename),
            "file_time": get_isotime(),
            "default": self.ENTRY_NAME,
            "processing_type": self.processing_type,
        }
        # None cannot be stored as an HDF5 attribute, so skip an unset name.
        if self.detector_name is not None:
            root_attrs["detector_name"] = self.detector_name
        self.root.attrs.update(root_attrs)

    def _build_entry_group(self) -> None:
        """Create the entry with title, start time and detector name."""
        if self.processing_type in PYFAI_PROCESSES:
            title = f"{self.filename}::{self.nxdata_path}"
        elif self.processing_type in TRUSAXS_PROCESSES:
            title = "TFG metadata collection"
        else:
            title = ""

        entry_group = self.create_h5_group(
            h5_group_path=self.ENTRY_NAME,
            title=title,
            NX_class="NXentry",
        )
        entry_group["start_time"] = str(get_isotime())
        if self.processing_type in PYFAI_PROCESSES:
            entry_group.attrs["default"] = self.nxdata_path
        # Written whenever a name is known, so _read_detector_name() can
        # find it again; None cannot be stored as a dataset.
        if self.detector_name is not None:
            entry_group["detector_name"] = self.detector_name

    def _build_configuration_group(self, headers: dict) -> None:
        """Store the headers as JSON text in the configuration note.

        Missing headers (``None``) are serialized as an empty dictionary
        instead of failing on ``None.copy()``.
        """
        configuration_group = self.create_h5_group(
            h5_group_path=self.configuration_path,
            NX_class="NXnote",
        )
        configuration_group["type"] = "text/json"

        # numpy arrays are not JSON serializable; turn them into lists.
        headers_to_serialize = {
            key: value.tolist() if isinstance(value, numpy.ndarray) else value
            for key, value in (headers or {}).items()
        }
        configuration_group["data"] = json.dumps(
            headers_to_serialize, indent=2, separators=(",\r\n", ": ")
        )

    def _build_nxprocess_group(self) -> None:
        """Create the PyFAI (NXprocess) or TRUSAXS (NXinstrument) group."""
        if self.processing_type in PYFAI_PROCESSES:
            process_group = self.create_h5_group(
                h5_group_path=self.nxprocess_path,
                NX_class="NXprocess",
                default=self.nxdata_path,
            )
            process_group["date"] = str(get_isotime())
            process_group["processing_type"] = self.processing_type
            process_group["program"] = "pyFAI"
            process_group["version"] = pyFAIVersion
        elif self.processing_type in TRUSAXS_PROCESSES:
            process_group = self.create_h5_group(
                h5_group_path=self.nxprocess_path,
                NX_class="NXinstrument",
            )
            process_group["date"] = str(get_isotime())
            process_group["processing_type"] = self.processing_type
            process_group["program"] = "TruSAXS"

    def _build_MCS_group(self, headers: dict) -> None:
        """Create the MCS collection and fill it from ``headers``.

        Without headers only the group and its ``device`` dataset are created.
        """
        mcs_group = self.create_h5_group(
            h5_group_path=self.mcs_path,
            NX_class="NXcollection",
        )
        mcs_group["device"] = "bliss"
        if not headers:
            logger.error(f"There is no headers to initialize MCS on {self.filename}")
            return

        # HS32 N(name), Z(zero), F(factor) arrays: one value per pin, read
        # from header keys suffixed with the two-digit, 1-based pin number.
        nb_pins = HS32.get_HS32_number_pins(headers=headers)
        pin_suffixes = [f"{(index_pin + 1):02}" for index_pin in range(nb_pins)]
        pin_names = [
            headers.get(f"{HS32.KEY_HS32_NAME}{suffix}") for suffix in pin_suffixes
        ]
        per_pin_arrays = (
            (HS32.KEY_HS32_NAME, pin_names, self.FORMAT_HS32N),
            (
                HS32.KEY_HS32_ZERO_VALUE,
                [
                    headers.get(f"{HS32.KEY_HS32_ZERO_VALUE}{suffix}")
                    for suffix in pin_suffixes
                ],
                self.FORMAT_HS32Z,
            ),
            (
                HS32.KEY_HS32_FACTOR,
                [
                    headers.get(f"{HS32.KEY_HS32_FACTOR}{suffix}")
                    for suffix in pin_suffixes
                ],
                self.FORMAT_HS32F,
            ),
        )
        for name, values, dtype in per_pin_arrays:
            mcs_group.create_dataset(
                name=name,
                data=numpy.array(values, dtype=dtype),
                dtype=dtype,
            )

        # HSI0Factor, HSI1Factor: copied only when present.
        for key in (HS32.KEY_MONITOR_0_FACTOR, HS32.KEY_MONITOR_1_FACTOR):
            value = headers.get(key)
            if value is not None:
                mcs_group.create_dataset(name=key, data=value, dtype="float64")

        # HSI0, HSI1, HSTime: the header holds a pin *name*; the 1-based
        # number of the first pin carrying that name is stored.
        for key in (HS32.KEY_MONITOR_0, HS32.KEY_MONITOR_1, HS32.KEY_EXPOSURE_TIME):
            wanted_name = headers.get(key)
            for pin_number, pin_name in enumerate(pin_names, start=1):
                if pin_name == wanted_name:
                    mcs_group.create_dataset(name=key, data=pin_number, dtype="int64")
                    break

        # ShutterTime: a missing value defaults to 0.0.
        for key in (HS32.KEY_SOT, HS32.KEY_SCT):
            value = headers.get(key)
            if value is None:
                value = 0.0
            mcs_group.create_dataset(name=key, data=value, dtype="float64")

    def _build_parameters_group(self, processing_parameters: dict = None) -> None:
        """Create the parameters collection and fill it when values are given."""
        self.create_h5_group(
            h5_group_path=self.parameters_path,
            NX_class="NXcollection",
        )
        if processing_parameters:
            self.write_processing_parameters(
                processing_parameters=processing_parameters
            )

    def write_processing_parameters(self, processing_parameters: dict) -> None:
        """Write or update parameters in the parameters collection.

        numpy arrays are appended along the first axis (the dataset is created
        resizable on first write). Other values are stored as scalars:
        float64 for ``KEYS_FLOAT``, int64 for ``KEYS_INT``, strings otherwise.
        The key ``HS32.KEY_TITLEEXTENSION`` is skipped.

        :param processing_parameters: mapping of parameter name to value.
        """
        # After version 0.2.0, the processing parameters will be overwritten
        # by the ones explicitly used (not the header ones)
        parameters_group = self.parameters_group
        for key, value in processing_parameters.items():
            if isinstance(value, numpy.ndarray):
                if key in parameters_group:
                    dset = parameters_group[key]
                    nb_written_frames = len(dset)
                    dset.resize((nb_written_frames + len(value),))
                    dset[nb_written_frames:] = value
                else:
                    parameters_group.create_dataset(
                        name=key,
                        maxshape=(None,),
                        data=value,
                        dtype=value.dtype,
                    )
                continue

            if key in KEYS_FLOAT:
                dtype = "float64"
            elif key in KEYS_INT:
                dtype = "int64"
            elif key == HS32.KEY_TITLEEXTENSION:
                continue
            else:
                dtype = self.FORMAT_STRING
                value = str(value)

            if key in parameters_group:
                parameters_group[key][()] = value
            else:
                parameters_group.create_dataset(name=key, data=value, dtype=dtype)

    def update_dataset(
        self,
        added_dataset: numpy.ndarray,
        h5_dataset_path: str,
        index_read: tuple = None,
        datatype: str = None,
        **kwargs,
    ) -> None:
        """Write frames into a resizable dataset, creating it if needed.

        Invalid input and write failures are logged, not raised.

        :param added_dataset: array whose first axis indexes the frames.
        :param h5_dataset_path: path of the dataset, relative to the root.
        :param index_read: ``(start, stop)`` frame slice to write; defaults
            to ``(0, len(added_dataset))``.
        :param datatype: dtype used when the dataset is created.
        :param kwargs: attributes added to the dataset when it is created.
        """
        if not isinstance(added_dataset, numpy.ndarray):
            logger.error(f"Added dataset is not a numpy array. {type(added_dataset)}")
            return

        ndim = added_dataset.ndim
        if ndim == 0:
            # A 0-d array has no frame axis (len() would raise).
            logger.error(f"Added dataset for {h5_dataset_path} has no frame axis")
            return

        nb_new_frames = len(added_dataset)
        if nb_new_frames == 0:
            return

        if index_read is None:
            slice_init, slice_end = 0, nb_new_frames
        else:
            slice_init, slice_end = index_read

        logger.debug(
            f"Updating dataset {h5_dataset_path} with {nb_new_frames} new frames"
        )

        # numpy unicode arrays cannot be written by h5py as they are; convert
        # them up front so that creating *and* appending both work.
        is_text = added_dataset.dtype.kind == "U"
        if is_text:
            added_dataset = added_dataset.astype(self.FORMAT_STRING)

        if h5_dataset_path in self.root:
            dset = self[h5_dataset_path]
        else:
            if ndim == 3:
                interpretation = "image"
                # NOTE(review): with datatype=None the dtype is left to
                # h5py's default rather than taken from the array -- confirm.
                dtype = datatype
                compression = self.COMPRESSION_ALGORITHM
                chunks = self.CHUNK_SIZE_3D
            elif ndim == 2:
                interpretation = "spectrum"
                dtype = datatype or "float64"
                compression = None
                chunks = None
            elif ndim == 1:
                interpretation = "scalar"
                dtype = datatype or added_dataset.dtype
                compression = None
                chunks = None
            else:
                logger.error(
                    f"Cannot create {h5_dataset_path}: unsupported number of "
                    f"dimensions ({ndim})"
                )
                return
            if is_text:
                dtype = self.FORMAT_STRING

            frame_shape = added_dataset.shape[1:ndim]
            dset = self.root.create_dataset(
                name=h5_dataset_path,
                shape=(0,) + frame_shape,
                maxshape=(None,) + frame_shape,
                chunks=chunks,
                dtype=dtype,
                compression=compression,
            )
            dset.attrs["interpretation"] = interpretation
            for key, value in kwargs.items():
                dset.attrs[key] = value

        # The added dataset can be:
        # - New frames to append (normal loop procedure)
        # - A rewrite of the whole dataset, possibly with more frames
        #   (subscan2 interpolation)
        # Grow the dataset only when the slice goes past its current end.
        if slice_end > len(dset):
            dset.resize((slice_end, *dset.shape[1:ndim]))

        try:
            dset[slice_init:slice_end] = added_dataset
            logger.debug(
                f"Dataset {h5_dataset_path} updated with {len(added_dataset)} frames"
            )
        except Exception as e:
            logger.error(
                f"{e}: Failed while saving {h5_dataset_path}. "
                f"Incoming dataset shape: {added_dataset.shape}, "
                f"hdf5 dset shape={dset.shape}, {added_dataset.shape=} "
                f"limits: {slice_init} -> {slice_end}"
            )

    def _build_TFG_group(self, headers: dict) -> None:
        """Create the TFG collection and copy start epoch/time from headers."""
        tfg_group = self.create_h5_group(
            h5_group_path=self.tfg_path,
            NX_class="NXcollection",
        )
        tfg_group["device"] = "bliss"
        if not headers:
            logger.error(f"There is no headers to initialize TFG on {self.filename}")
            return

        for key in (HS32.KEY_STARTEPOCH, HS32.KEY_STARTTIME):
            value = headers.get(key)
            if value is None:
                logger.warning(f"Key {key} not found in headers")
                continue
            tfg_group.create_dataset(
                name=key,
                data=str(value),
                dtype=self.FORMAT_STRING,
            )

    # ------------------------------------------------------------------
    # Ewoks history
    # ------------------------------------------------------------------
    def _build_ewoks_groups(self) -> None:
        """Create the ewoks collection and its history sub-collection."""
        for group_path in (self.ewoks_path, self.ewoks_history_path):
            self.create_h5_group(
                h5_group_path=group_path,
                NX_class="NXcollection",
            )

    def _set_last_process_link(self, target: str) -> None:
        """Point ``ewoks/last_process`` to ``target``, replacing an old link."""
        ewoks_group = self.ewoks_group
        link_name = self.NXCOLLECTION_EWOKS_LASTPROCESS_NAME
        # h5py refuses to create a link under a name that is already taken.
        if link_name in ewoks_group:
            del ewoks_group[link_name]
        ewoks_group[link_name] = h5py.SoftLink(target)

    def _write_ewoks_info_last_process(self, ewoks_info: dict):
        """Append ``ewoks_info`` to the history and link it as last process.

        Index and title are expected to be already set in ``ewoks_info``.
        """
        self._append_ewoks_info(ewoks_info=ewoks_info)
        self._set_last_process_link(
            f"{self.NXCOLLECTION_EWOKS_HISTORY_NAME}/{ewoks_info['title']}"
        )

    def write_ewoks_history(self, list_ewoks_history: list):
        """Write previous history entries; link the last one as last process.

        :param list_ewoks_history: dictionaries with at least a ``title`` key.
        """
        if not list_ewoks_history:
            return
        for ewoks_info in list_ewoks_history:
            self._append_ewoks_info(ewoks_info=ewoks_info)
        last_title = list_ewoks_history[-1]["title"]
        self._set_last_process_link(f"{self.ewoks_history_path}/{last_title}")

    def _append_ewoks_info(self, ewoks_info: dict):
        """Create ``history/<title>`` and write ``ewoks_info`` into it."""
        ewoks_info_group = self.create_h5_group(
            h5_group_path=str(Path(self.ewoks_history_path) / ewoks_info["title"]),
            NX_class="NXcollection",
        )
        deserialize_h5py_task(
            h5dict=ewoks_info,
            h5py_parent=ewoks_info_group,
        )

    def read_ewoks_history(self) -> list:
        """Return every history entry as serialized by ``serialize_h5py_task``.

        The list is empty when the history holds no entries.
        """
        history_group = self.ewoks_history_group
        return [
            serialize_h5py_task(h5py_group=history_group[process_name])
            for process_name in history_group
        ]

    def create_h5_group(
        self,
        h5_group_path: str,
        title: str = None,
        **kwargs,
    ) -> h5py.Group:
        """Create a group with an optional title and attributes.

        An existing group is returned untouched, with a warning.

        :param h5_group_path: path of the group, relative to the root.
        :param title: stored as a ``title`` dataset when not empty.
        :param kwargs: stored as attributes of the new group.
        :returns: the new or already existing group.
        """
        if h5_group_path in self.root:
            logger.warning(f"Group {h5_group_path} already exists")
            return self.root[h5_group_path]

        h5_group = self.root.create_group(h5_group_path)
        if title:
            h5_group["title"] = title
        for key, value in kwargs.items():
            h5_group.attrs[key] = value
        return h5_group

    # ------------------------------------------------------------------
    # Benchmark
    # ------------------------------------------------------------------
    @staticmethod
    def _create_series(group, name: str, dtype):
        """Create an empty, unlimited 1D dataset."""
        return group.create_dataset(
            name=name, shape=(0,), maxshape=(None,), dtype=dtype
        )

    @staticmethod
    def _grow_by_one(dset) -> None:
        """Extend a 1D dataset by one element."""
        dset.resize((dset.shape[0] + 1,))

    def _save_benchmark(
        self,
        bench,
    ):
        """Append one timing record under the last process of the history.

        Does nothing when the ``last_process`` link is missing. Each step
        named ``bench.benchmark_name`` gets its own group of growing series;
        the parent ``benchmark`` group holds one summary point per step
        (mean and standard deviation of the per-frame time).

        :param bench: object providing ``benchmark_name``, ``bench_total_s``,
            ``bench_per_frame_ms`` and ``nb_frames``.
        """
        benchmark_name = bench.benchmark_name
        total_time = bench.bench_total_s
        time_per_frame = bench.bench_per_frame_ms
        nb_frames = bench.nb_frames

        if self.ewoks_last_process_path not in self.root:
            return

        title = self.root[self.ewoks_last_process_path]["title"][()].decode()
        benchmark_root = Path(self.ewoks_history_path) / title / "benchmark"
        h5path_ewoks_benchmark = str(benchmark_root)

        # Summary group: one point per benchmark step.
        if h5path_ewoks_benchmark not in self:
            bench_grp = self.create_h5_group(
                h5_group_path=h5path_ewoks_benchmark,
                title="Benchmark",
                NX_class="NXdata",
                default=h5path_ewoks_benchmark,
                signal="data",
                axes=["index"],
            )
            steps = self._create_series(bench_grp, "steps", self.FORMAT_STRING)
            indexes = self._create_series(bench_grp, "index", "int32")
            data = self._create_series(bench_grp, "data", "float32")
            data.attrs.update({"interpretation": "spectrum"})
            data_errors = self._create_series(bench_grp, "data_errors", "float32")
            data_errors.attrs.update({"interpretation": "spectrum"})
        else:
            bench_grp = self[h5path_ewoks_benchmark]
            steps = bench_grp["steps"]
            indexes = bench_grp["index"]
            data = bench_grp["data"]
            data_errors = bench_grp["data_errors"]

        # Per-step group.
        h5path_ewoks_benchmark_step = str(benchmark_root / benchmark_name)
        if h5path_ewoks_benchmark_step not in self:
            bench_grp_step = self.create_h5_group(
                h5_group_path=h5path_ewoks_benchmark_step,
                title=benchmark_name,
                NX_class="NXdata",
                default=h5path_ewoks_benchmark,
            )
        else:
            bench_grp_step = self[h5path_ewoks_benchmark_step]

        if f"{benchmark_name}_loop_nb" not in bench_grp_step:
            loop_dset = self._create_series(
                bench_grp_step, f"{benchmark_name}_loop_nb", "int32"
            )
            perframe_dset = self._create_series(
                bench_grp_step, f"{benchmark_name}_per_frame", "float32"
            )
            perframe_mean_dset = bench_grp_step.create_dataset(
                name=f"{benchmark_name}_per_frame_mean",
                dtype="float32",
                data=time_per_frame,
            )
            perframe_std_dset = bench_grp_step.create_dataset(
                name=f"{benchmark_name}_per_frame_std",
                dtype="float32",
                data=0.0,
            )
            nbframes_dset = self._create_series(
                bench_grp_step, f"{benchmark_name}_nb_frames", "int32"
            )
            total_dset = self._create_series(
                bench_grp_step, f"{benchmark_name}_total", "float32"
            )
            accumulated_dset = self._create_series(
                bench_grp_step, f"{benchmark_name}_accumulated", "float32"
            )

            # First record of this step: reserve its slot in the summary.
            self._grow_by_one(steps)
            steps[-1] = benchmark_name
            self._grow_by_one(indexes)
            indexes[-1] = len(indexes)
            self._grow_by_one(data)
            self._grow_by_one(data_errors)
        else:
            loop_dset = bench_grp_step[f"{benchmark_name}_loop_nb"]
            perframe_dset = bench_grp_step[f"{benchmark_name}_per_frame"]
            perframe_mean_dset = bench_grp_step[f"{benchmark_name}_per_frame_mean"]
            perframe_std_dset = bench_grp_step[f"{benchmark_name}_per_frame_std"]
            nbframes_dset = bench_grp_step[f"{benchmark_name}_nb_frames"]
            total_dset = bench_grp_step[f"{benchmark_name}_total"]
            accumulated_dset = bench_grp_step[f"{benchmark_name}_accumulated"]

        # Append new data
        for series in (
            loop_dset,
            perframe_dset,
            nbframes_dset,
            total_dset,
            accumulated_dset,
        ):
            self._grow_by_one(series)

        loop_dset[-1] = len(loop_dset)
        perframe_dset[-1] = time_per_frame
        nbframes_dset[-1] = nb_frames
        total_dset[-1] = total_time
        if len(accumulated_dset) == 1:
            accumulated_dset[-1] = total_time
        else:
            accumulated_dset[-1] = accumulated_dset[-2] + total_time

        mean_value = perframe_dset[:].mean()
        std_value = perframe_dset[:].std()
        perframe_mean_dset[()] = mean_value
        perframe_std_dset[()] = std_value

        # Refresh the summary point of this step.
        step_names = [s.decode() for s in steps]
        index_benchmark = next(
            (i for i, s in enumerate(step_names) if s == benchmark_name), None
        )
        if index_benchmark is not None:
            data[index_benchmark] = mean_value
            data_errors[index_benchmark] = std_value
def update_nexus_detector_group(
    nxdetector_group_destination: h5py.Group = None,
    nxdetector_group_source: h5py.Group = None,
):
    """Copy the content of a detector group, leaving out ``data`` datasets.

    Sub-groups are created in the destination and filled through
    ``copy_group_excluding_dataset``; datasets other than ``data`` are copied
    as they are. Attributes of the source group itself are not copied.

    :param nxdetector_group_destination: group receiving the copy.
    :param nxdetector_group_source: group to copy from.
    """
    excluded_name = "data"
    for child_name, child in nxdetector_group_source.items():
        if isinstance(child, h5py.Group):
            # Recursively copy subgroups
            subgroup_copy = nxdetector_group_destination.create_group(child_name)
            copy_group_excluding_dataset(child, subgroup_copy, excluded_name)
        elif isinstance(child, h5py.Dataset) and child_name != excluded_name:
            nxdetector_group_source.copy(
                child_name, nxdetector_group_destination, name=child_name
            )
def copy_group_excluding_dataset(src_group, dest_group, exclude_dataset):
    """Recursively copy a group, skipping datasets named ``exclude_dataset``.

    Attributes of ``src_group`` are copied first, then every sub-group
    (recursively) and every dataset whose name is not ``exclude_dataset``.

    :param src_group: group to copy from.
    :param dest_group: existing group receiving the copy.
    :param exclude_dataset: dataset name to leave out at every level.
    """
    for attr_name in src_group.attrs:
        dest_group.attrs[attr_name] = src_group.attrs[attr_name]

    for child_name, child in src_group.items():
        if isinstance(child, h5py.Dataset):
            if child_name == exclude_dataset:
                continue  # Skip the excluded dataset
            src_group.copy(child_name, dest_group, name=child_name)
        elif isinstance(child, h5py.Group):
            # Recursively copy subgroups
            copy_group_excluding_dataset(
                child, dest_group.create_group(child_name), exclude_dataset
            )