import json
import logging
import threading
from contextlib import contextmanager
from importlib.metadata import version
from pathlib import Path
import h5py
import hdf5plugin # noqa
import numpy
from pyFAI import version as pyFAIVersion
from silx.io.h5py_utils import open_item as open_item_silx
from silx.utils.retry import RetryTimeoutError
from ewoksid02.headers import HS32, read_headers
from ewoksid02.utils.io import deserialize_h5py_task, get_isotime, serialize_h5py_task
lock = threading.Lock()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
KEYS_FLOAT = [
"Center_1",
"Center_2",
"Dummy",
"DDummy",
"PSize_1",
"PSize_2",
"SampleDistance",
"WaveLength",
]
KEYS_INT = [
"BSize_1",
"BSize_2",
"Offset_1",
"Offset_2",
"RasterOrientation",
]
PYFAI_PROCESSES = ["norm", "gaps", "2scat", "cave", "azim", "ave", "caving"]
TRUSAXS_PROCESSES = ["scalers", "dispatch", "debug"]
ALL_PROCESSES = PYFAI_PROCESSES + TRUSAXS_PROCESSES
MAP_DETECTORS_LIMA = {
"eiger2": "ESRF-ID02",
"waxs": "instrument",
"default": "ESRF-ID02",
"eiger500k": "ESRF-ID02",
}
DETECTOR_LIMA_DEFAULT = MAP_DETECTORS_LIMA.get("default")
[docs]
@contextmanager
def open_id02_file(
filename: str,
mode: str = "r",
detector_name: str = None,
scan_nb: int = None,
processing_type: str = None,
retry_timeout: float = 0.1,
headers: dict = None,
**kwargs,
):
if mode == "w" and Path(filename).is_file():
raise RuntimeError(f"Attempt to overwrite {filename}. This is not allowed.")
try:
with open_item_silx(
filename=filename,
name="/",
retry_timeout=retry_timeout,
mode=mode,
**kwargs,
) as root:
creator = (root.attrs.get("creator") or "").lower()
kwargs = {
"root": root,
"scan_nb": scan_nb,
"detector_name": detector_name,
"mode": mode,
"processing_type": processing_type,
"headers": headers,
}
if "ewoksid02" in creator or mode == "w":
yield ID02ProcessedFileHS32(**kwargs)
elif "lima" in creator:
yield ID02BlissLimaFile(**kwargs)
elif "bliss" in creator:
yield ID02BlissMasterFile(**kwargs)
else:
logger.error(f"Unknown creator: {root.attrs.get('creator')!r}")
yield None
except RetryTimeoutError:
logger.warning(f"File {filename} is not available yet.")
yield None
[docs]
class ID02BlissMasterFile(ID02FormatFile):
@property
def nxdata_dataset_signal_path(self) -> str:
return str(
Path(f"{self.scan_nb}.1") / "instrument" / self.detector_name / "data"
)
@property
def streams_subscan1_path(self) -> str:
return str(Path(f"{self.scan_nb}.1") / "measurement")
@property
def streams_subscan2_path(self) -> str:
return str(Path(f"{self.scan_nb}.2") / "measurement")
@property
def collection_name(self) -> str:
return self[f"{self.scan_nb}.1/sample/name"][()].decode()
@property
def filename_lima(self) -> str:
if self.detector_name:
return str(
Path(self.filename).parent
/ f"{self.collection_name}_{self.detector_name}_{self.scan_nb:05}_00.h5"
)
[docs]
class ID02BlissLimaFile(ID02FormatFile):
ENTRY_NAME = "entry_0000"
def __init__(
self,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
entry_grp = self[self.ENTRY_NAME]
self.nxinstrument_name = next(
(
n
for n in entry_grp
if entry_grp[n].attrs.get("NX_class") == "NXinstrument"
),
None,
)
if not self.detector_name:
if self.nxinstrument_name:
nxinstrument_grp = entry_grp[self.nxinstrument_name]
nxdetector_name = next(
(
n
for n in nxinstrument_grp
if nxinstrument_grp[n].attrs.get("NX_class") == "NXdetector"
),
None,
)
if nxdetector_name:
self.detector_name = nxdetector_name
@property
def nxinstrument_path(self) -> str:
return str(Path(self.ENTRY_NAME) / self.nxinstrument_name)
@property
def nxdetector_path(self) -> str:
return str(Path(self.nxinstrument_path) / self.detector_name)
@property
def headers_path(self) -> str:
return str(Path(self.nxdetector_path) / "header")
@property
def headers_group(self) -> h5py.Group:
return self[self.headers_path]
@property
def headers(self) -> dict:
return read_headers(h5_group=self.headers_group)
[docs]
class ID02ProcessedFileHS32(ID02FormatFile):
ENTRY_NAME = "/entry_0000"
NXPROCESS_PYFAI_NAME = "PyFAI"
NXPROCESS_TRUSAXS_NAME = "TRUSAXS"
NXDATA_NAME_TEMPLATE = "result_{processing_type}"
NXNOTE_CONFIGURATION_NAME = "configuration"
NXCOLLECTION_MCS_NAME = "MCS"
NXCOLLECTION_PARAMETERS_NAME = "parameters"
NXCOLLECTION_TFG_NAME = "TFG"
NXCOLLECTION_EWOKS_NAME = "ewoks"
NXCOLLECTION_EWOKS_HISTORY_NAME = "history"
NXCOLLECTION_EWOKS_LASTPROCESS_NAME = "last_process"
FORMAT_HS32C = "int64"
FORMAT_HS32F = "float64"
FORMAT_HS32V = "float64"
FORMAT_STRING = h5py.string_dtype(encoding="utf-8")
FORMAT_HS32N = FORMAT_STRING
FORMAT_HS32Z = "float64"
FORMAT_EWOKS_PROCESS_TITLE = "{index:02} - {class_name}"
DATASET_SIGNAL_NAME = "data"
DATASET_VARIANCE_NAME = "data_variance"
DATASET_SIGMA_NAME = "data_errors"
MCS_RAW_NAME = "raw"
MCS_SUBSCAN1 = "subscan_1"
MCS_SUBSCAN2 = "subscan_2"
MCS_INTERPRETED = "interpreted"
KEY_EXPOSURE_TIME = "ExposureTime"
KEY_DELTA_TIME = "delta_time"
KEY_DELTA_TIME_NXDATA = "t"
KEY_I0_SHUTCOR = "Intensity0ShutCor"
KEY_I0_UNCOR = "Intensity0UnCor"
KEY_I1_SHUTCOR = "Intensity1ShutCor"
KEY_I1_UNCOR = "Intensity1UnCor"
KEY_TITLEEXTENSION = "TitleExtension"
COMPRESSION_ALGORITHM = hdf5plugin.Bitshuffle(cname="lz4")
CHUNK_SIZE_3D = (1, 200, 200)
def __init__(
self,
headers: dict = None, # Only to create the file ('w')
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self._headers = headers
if self._mode == "w":
if not self.processing_type:
raise ValueError("New ID02 processing file needs a processing_type")
if self.processing_type in PYFAI_PROCESSES and not self.detector_name:
raise ValueError(
f"PyFAI process {self.processing_type} needs a detector_name"
)
self._complete_root_group()
self._build_entry_group()
self._build_configuration_group(headers=headers)
self._build_nxprocess_group()
self._build_MCS_group(headers=headers)
self._build_parameters_group(processing_parameters=headers)
self._build_TFG_group(headers=headers)
self._build_ewoks_groups()
else:
self.processing_type = self._read_processing_type()
self.detector_name = self._read_detector_name()
[docs]
def is_id02_processed_file(self):
if self.ENTRY_NAME in self:
if self.root.attrs.get("creator") == "ewoksid02":
return True
return False
@property
def entry_group(self) -> h5py.Group:
return self[self.ENTRY_NAME]
@property
def configuration_path(self) -> str:
return str(Path(self.ENTRY_NAME) / self.NXNOTE_CONFIGURATION_NAME)
@property
def configuration_group(self) -> h5py.Group:
return self[self.configuration_path]
@property
def nxprocess_name(self) -> str:
if self.processing_type in PYFAI_PROCESSES:
return self.NXPROCESS_PYFAI_NAME
elif self.processing_type in TRUSAXS_PROCESSES:
return self.NXPROCESS_TRUSAXS_NAME
@property
def nxprocess_path(self) -> str:
return str(Path(self.ENTRY_NAME) / self.nxprocess_name)
@property
def nxprocess_group(self) -> h5py.Group:
return self[self.nxprocess_path]
@property
def nxdata_name(self) -> str:
if self.processing_type in PYFAI_PROCESSES:
return self.NXDATA_NAME_TEMPLATE.format(
processing_type=self.processing_type
)
@property
def mcs_path(self) -> str:
return str(Path(self.nxprocess_path) / self.NXCOLLECTION_MCS_NAME)
@property
def mcs_group(self) -> h5py.Group:
return self[self.mcs_path]
@property
def parameters_path(self) -> str:
return str(Path(self.nxprocess_path) / self.NXCOLLECTION_PARAMETERS_NAME)
@property
def parameters_group(self) -> h5py.Group:
return self[self.parameters_path]
@property
def headers_path(self) -> str:
return self.parameters_path
@property
def headers_group(self) -> h5py.Group:
return self.parameters_group
@property
def headers(self) -> dict:
if not self._headers:
self._headers = read_headers(h5_group=self.headers_group)
return self._headers
@property
def tfg_path(self) -> str:
return str(Path(self.nxprocess_path) / self.NXCOLLECTION_TFG_NAME)
@property
def tfg_group(self) -> h5py.Group:
return self[self.tfg_path]
@property
def ewoks_path(self) -> str:
return str(Path(self.ENTRY_NAME) / self.NXCOLLECTION_EWOKS_NAME)
@property
def ewoks_group(self) -> h5py.Group:
return self[self.ewoks_path]
@property
def ewoks_history_path(self) -> str:
return str(Path(self.ewoks_path) / self.NXCOLLECTION_EWOKS_HISTORY_NAME)
@property
def ewoks_history_group(self) -> h5py.Group:
return self[self.ewoks_history_path]
@property
def ewoks_last_process_path(self) -> str:
return str(Path(self.ewoks_path) / self.NXCOLLECTION_EWOKS_LASTPROCESS_NAME)
@property
def nxdata_path(self) -> str:
return str(Path(self.nxprocess_path) / self.nxdata_name)
@property
def nxdata_dataset_signal_path(self) -> str:
return str(Path(self.nxdata_path) / self.DATASET_SIGNAL_NAME)
@property
def nxdata_dataset_variance_path(self) -> str:
return str(Path(self.nxdata_path) / self.DATASET_VARIANCE_NAME)
@property
def nxdata_dataset_sigma_path(self) -> str:
return str(Path(self.nxdata_path) / self.DATASET_SIGMA_NAME)
@property
def nxdata_delta_time_path(self) -> str:
return str(Path(self.nxdata_path) / self.KEY_DELTA_TIME_NXDATA)
@property
def nxdetector_path(self) -> str:
return str(Path(self.nxprocess_path) / self.detector_name)
@property
def streams_subscan1_path(self) -> str:
return str(Path(self.mcs_path) / self.MCS_RAW_NAME / self.MCS_SUBSCAN1)
@property
def streams_subscan2_path(self) -> str:
return str(Path(self.mcs_path) / self.MCS_RAW_NAME / self.MCS_SUBSCAN2)
def _read_processing_type(self) -> str:
"""
Blind reading of processing type
"""
entry_group = self[self.ENTRY_NAME]
nxprocess_name = None
if self.NXPROCESS_PYFAI_NAME in entry_group:
nxprocess_name = self.NXPROCESS_PYFAI_NAME
elif self.NXPROCESS_TRUSAXS_NAME in entry_group:
nxprocess_name = self.NXPROCESS_TRUSAXS_NAME
if nxprocess_name:
return entry_group[nxprocess_name]["processing_type"][()].decode()
def _read_detector_name(self) -> str:
"""
Blind reading of detector_name
"""
if "detector_name" in self[self.ENTRY_NAME]:
return self[self.ENTRY_NAME]["detector_name"][()].decode()
def _complete_root_group(self) -> None:
self.root.attrs.update(
{
"HDF5_Version": h5py.version.hdf5_version,
"NX_class": "NXroot",
"creator": "ewoksid02",
"creator_version": version("ewoksid02"),
"file_name": str(self.filename),
"file_time": get_isotime(),
"default": self.ENTRY_NAME,
"processing_type": self.processing_type,
"detector_name": self.detector_name,
}
)
def _build_entry_group(self) -> None:
if self.processing_type in PYFAI_PROCESSES:
title = f"{self.filename}::{self.nxdata_path}"
elif self.processing_type in TRUSAXS_PROCESSES:
title = "TFG metadata collection"
else:
title = ""
entry_group = self.create_h5_group(
h5_group_path=self.ENTRY_NAME,
title=title,
NX_class="NXentry",
)
entry_group["start_time"] = str(get_isotime())
if self.processing_type in PYFAI_PROCESSES:
entry_group.attrs["default"] = self.nxdata_path
entry_group["detector_name"] = self.detector_name
def _build_configuration_group(self, headers: dict) -> None:
configuration_group = self.create_h5_group(
h5_group_path=self.configuration_path,
NX_class="NXnote",
)
configuration_group["type"] = "text/json"
headers_to_serialize = headers.copy()
for k, v in headers.items():
if isinstance(v, numpy.ndarray):
headers_to_serialize[k] = v.tolist()
configuration_group["data"] = json.dumps(
headers_to_serialize, indent=2, separators=(",\r\n", ": ")
)
def _build_nxprocess_group(self) -> None:
if self.processing_type in PYFAI_PROCESSES:
process_group = self.create_h5_group(
h5_group_path=self.nxprocess_path,
NX_class="NXprocess",
default=self.nxdata_path,
)
process_group["date"] = str(get_isotime())
process_group["processing_type"] = self.processing_type
process_group["program"] = "pyFAI"
process_group["version"] = pyFAIVersion
elif self.processing_type in TRUSAXS_PROCESSES:
process_group = self.create_h5_group(
h5_group_path=self.nxprocess_path,
NX_class="NXinstrument",
)
process_group["date"] = str(get_isotime())
process_group["processing_type"] = self.processing_type
process_group["program"] = "TruSAXS"
def _build_MCS_group(self, headers: dict) -> None:
mcs_group = self.create_h5_group(
h5_group_path=self.mcs_path,
NX_class="NXcollection",
)
mcs_group["device"] = "bliss"
if not headers:
logger.error(f"There is no headers to initialize MCS on {self.filename}")
return
# HS32 N(name), Z(zero), F(factor) arrays
nb_pins = HS32.get_HS32_number_pins(headers=self.headers)
HS32N_array = numpy.array(
[
self.headers.get(f"{HS32.KEY_HS32_NAME}{(index_pin + 1):02}")
for index_pin in range(nb_pins)
],
dtype=self.FORMAT_HS32N,
)
HS32Z_array = numpy.array(
[
(self.headers.get(f"{HS32.KEY_HS32_ZERO_VALUE}{(index_pin + 1):02}"))
for index_pin in range(nb_pins)
],
dtype=self.FORMAT_HS32Z,
)
HS32F_array = numpy.array(
[
self.headers.get(f"{HS32.KEY_HS32_FACTOR}{(index_pin + 1):02}")
for index_pin in range(nb_pins)
],
dtype=self.FORMAT_HS32F,
)
mcs_group.create_dataset(
name=HS32.KEY_HS32_NAME,
data=HS32N_array,
dtype=self.FORMAT_HS32N,
)
mcs_group.create_dataset(
name=HS32.KEY_HS32_ZERO_VALUE,
data=HS32Z_array,
dtype=self.FORMAT_HS32Z,
)
mcs_group.create_dataset(
name=HS32.KEY_HS32_FACTOR,
data=HS32F_array,
dtype=self.FORMAT_HS32F,
)
# HSI0Factor, HSI1Factor
for key in [
HS32.KEY_MONITOR_0_FACTOR,
HS32.KEY_MONITOR_1_FACTOR,
]:
value = headers.get(
key,
)
if value is not None:
mcs_group.create_dataset(name=key, data=value, dtype="float64")
# HSI0, HSI1, HSTime
for key in [
HS32.KEY_MONITOR_0,
HS32.KEY_MONITOR_1,
HS32.KEY_EXPOSURE_TIME,
]:
pin_name = headers.get(key)
for index_pin in range(nb_pins):
if (
self.headers.get(f"{HS32.KEY_HS32_NAME}{(index_pin + 1):02}")
== pin_name
):
mcs_group.create_dataset(
name=key, data=index_pin + 1, dtype="int64"
)
break
# ShutterTime
for key in [HS32.KEY_SOT, HS32.KEY_SCT]:
value = headers.get(key)
if value is None:
value = 0.0
mcs_group.create_dataset(name=key, data=value, dtype="float64")
def _build_parameters_group(self, processing_parameters: dict = None) -> None:
self.create_h5_group(
h5_group_path=self.parameters_path,
NX_class="NXcollection",
)
if processing_parameters:
self.write_processing_parameters(
processing_parameters=processing_parameters
)
[docs]
def write_processing_parameters(self, processing_parameters: dict) -> None:
# After version 0.2.0, the processing parameters will be overwritten by the ones explicitly used (not the header ones)
for key, value in processing_parameters.items():
if isinstance(value, numpy.ndarray):
if key in self.parameters_group:
dset = self.parameters_group[key]
nb_written_frames = len(dset)
dset.resize((nb_written_frames + len(value),))
dset[nb_written_frames:] = value
else:
dset = self.parameters_group.create_dataset(
name=key,
maxshape=(None,),
data=value,
dtype=value.dtype,
)
else:
if key in KEYS_FLOAT:
dtype = "float64"
elif key in KEYS_INT:
dtype = "int64"
elif key == HS32.KEY_TITLEEXTENSION:
continue
else:
dtype = self.FORMAT_STRING
value = str(value)
if key in self.parameters_group:
dset = self.parameters_group[key]
else:
dset = self.parameters_group.create_dataset(
name=key,
data=value,
dtype=dtype,
)
dset[()] = value
[docs]
def update_dataset(
self,
added_dataset: numpy.ndarray,
h5_dataset_path: str,
index_read: tuple = None,
datatype: str = None,
**kwargs,
) -> None:
"""
Update a dataset in a HDF5 file with new data.
It will create the dataset if it does not exist.
added_dataset: numpy.ndarray - Array with the new data. It has to contain an additional dimension to the data
h5_group: h5py.Group - Group in the HDF5 file where the dataset is located
h5_dataset_name: str - Name of the dataset in h5_group
kwargs: dict - Additional arguments to add as attributes in the dataset
"""
if not isinstance(added_dataset, numpy.ndarray):
logger.error(f"Added dataset is not a numpy array. {type(added_dataset)}")
return
nb_new_frames = len(added_dataset)
if nb_new_frames == 0:
return
if index_read is None:
slice_init = 0
slice_end = nb_new_frames
else:
slice_init, slice_end = index_read
logger.debug(
f"Updating dataset {h5_dataset_path} with {nb_new_frames} new frames"
)
ndim = added_dataset.ndim
if h5_dataset_path not in self.root:
if ndim == 3:
interpretation = "image"
dtype = datatype
compression = self.COMPRESSION_ALGORITHM
chunks = self.CHUNK_SIZE_3D
elif ndim == 2:
interpretation = "spectrum"
dtype = datatype or "float64"
compression = None
chunks = None
elif ndim == 1:
interpretation = "scalar"
dtype = datatype or added_dataset.dtype
compression = None
chunks = None
if added_dataset.dtype.kind == "U":
dtype = self.FORMAT_STRING
added_dataset = added_dataset.astype(self.FORMAT_STRING)
dset = self.root.create_dataset(
name=h5_dataset_path,
shape=(0,) + added_dataset.shape[1:ndim],
maxshape=(None,) + added_dataset.shape[1:ndim],
chunks=chunks,
dtype=dtype,
compression=compression,
)
dset.attrs["interpretation"] = interpretation
for key, value in kwargs.items():
dset.attrs[key] = value
else:
dset = self[h5_dataset_path]
# The added dataset can be:
# - New frames to append between index_range_last limits (normal loop procedure)
# - Rewrite the whole dataset, posibly with more frames (subscan2 interpolation)
dset_current_nb_frames = len(dset)
if slice_end <= dset_current_nb_frames:
# No resize is needed
...
else:
dset_new_shape = (slice_end, *dset.shape[1:ndim])
dset.resize(dset_new_shape)
try:
dset[slice_init:slice_end] = added_dataset
logger.debug(
f"Dataset {h5_dataset_path} updated with {len(added_dataset)} frames"
)
except Exception as e:
logger.error(f"""{e}: Failed while saving {h5_dataset_path}. \
Incoming dataset shape: {added_dataset.shape}, \
hdf5 dset shape={dset.shape}, {added_dataset.shape=} \
limits: {slice_init} -> {slice_end}
""")
def _build_TFG_group(self, headers: dict) -> None:
tfg_group = self.create_h5_group(
h5_group_path=self.tfg_path,
NX_class="NXcollection",
)
tfg_group["device"] = "bliss"
if not headers:
logger.error(f"There is no headers to initialize MCS on {self.filename}")
return
for key in [HS32.KEY_STARTEPOCH, HS32.KEY_STARTTIME]:
value = headers.get(key)
if value is not None:
tfg_group.create_dataset(
name=key,
data=str(value),
dtype=self.FORMAT_STRING,
)
else:
logger.warning(f"Key {key} not found in headers")
def _build_ewoks_groups(self) -> None:
self.create_h5_group(
h5_group_path=self.ewoks_path,
NX_class="NXcollection",
)
self.create_h5_group(
h5_group_path=self.ewoks_history_path,
NX_class="NXcollection",
)
def _write_ewoks_info_last_process(self, ewoks_info: dict):
# This methods appens the current info the history and links it to last_process
# Index and title have been already fixed
self._append_ewoks_info(ewoks_info=ewoks_info)
self.ewoks_group[self.NXCOLLECTION_EWOKS_LASTPROCESS_NAME] = h5py.SoftLink(
f"{self.NXCOLLECTION_EWOKS_HISTORY_NAME}/{ewoks_info['title']}"
)
[docs]
def write_ewoks_history(self, list_ewoks_history: list):
# This method only writes the previous history
for ewoks_info in list_ewoks_history:
self._append_ewoks_info(ewoks_info=ewoks_info)
if list_ewoks_history:
self[self.ewoks_last_process_path] = h5py.SoftLink(
f"{self.ewoks_history_path}/{ewoks_info['title']}"
)
def _append_ewoks_info(self, ewoks_info: dict):
# Only add a new group, index has been already fixed
ewoks_info_group = self.create_h5_group(
h5_group_path=str(Path(self.ewoks_history_path) / ewoks_info["title"]),
NX_class="NXcollection",
)
deserialize_h5py_task(
h5dict=ewoks_info,
h5py_parent=ewoks_info_group,
)
[docs]
def read_ewoks_history(self) -> list:
info_history_complete = []
nb_previous_steps = [_ for _ in self.ewoks_history_group]
if nb_previous_steps == 0:
return info_history_complete
for process_name in self.ewoks_history_group:
historic_task_serialized = serialize_h5py_task(
h5py_group=self.ewoks_history_group[process_name]
)
info_history_complete.append(historic_task_serialized)
return info_history_complete
[docs]
def create_h5_group(
self,
h5_group_path: str,
title: str = None,
**kwargs,
) -> h5py.Group:
"""
Unified method to create a group in a HDF5 file with additional attributes.
h5_parent_group: h5py.Group - Parent group where the new group will be created
h5_group_name: str - Name of the new group
title: str - Title of the group
kwargs: dict - Additional arguments to add as attributes in the group
"""
if h5_group_path in self.root:
logger.warning(f"Group {h5_group_path} already exists")
return self.root[h5_group_path]
h5_group = self.root.create_group(h5_group_path)
if title:
h5_group["title"] = title
for key, value in kwargs.items():
h5_group.attrs[key] = value
return h5_group
def _save_benchmark(
self,
bench,
):
benchmark_name = bench.benchmark_name
total_time = bench.bench_total_s
time_per_frame = bench.bench_per_frame_ms
nb_frames = bench.nb_frames
if self.ewoks_last_process_path not in self.root:
return
title = self.root[self.ewoks_last_process_path]["title"][()].decode()
h5path_ewoks_benchmark = str(
Path(self.ewoks_history_path) / title / "benchmark"
)
if h5path_ewoks_benchmark not in self:
bench_grp = self.create_h5_group(
h5_group_path=h5path_ewoks_benchmark,
title="Benchmark",
NX_class="NXdata",
default=h5path_ewoks_benchmark,
signal="data",
# axes=["steps"],
axes=["index"],
)
steps = bench_grp.create_dataset(
name="steps",
shape=(0,),
maxshape=(None,),
dtype=self.FORMAT_STRING,
)
indexes = bench_grp.create_dataset(
name="index",
shape=(0,),
maxshape=(None,),
dtype="int32",
)
data = bench_grp.create_dataset(
name="data",
shape=(0,),
maxshape=(None,),
dtype="float32",
)
data.attrs.update(
{
"interpretation": "spectrum",
}
)
data_errors = bench_grp.create_dataset(
name="data_errors",
shape=(0,),
maxshape=(None,),
dtype="float32",
)
data_errors.attrs.update(
{
"interpretation": "spectrum",
}
)
else:
bench_grp = self[h5path_ewoks_benchmark]
steps = bench_grp["steps"]
indexes = bench_grp["index"]
data = bench_grp["data"]
data_errors = bench_grp["data_errors"]
h5path_ewoks_benchmark_step = str(
Path(self.ewoks_history_path) / title / "benchmark" / benchmark_name
)
if h5path_ewoks_benchmark_step not in self:
bench_grp_step = self.create_h5_group(
h5_group_path=h5path_ewoks_benchmark_step,
title=benchmark_name,
NX_class="NXdata",
default=h5path_ewoks_benchmark,
)
else:
bench_grp_step = self[h5path_ewoks_benchmark_step]
if f"{benchmark_name}_loop_nb" not in bench_grp_step:
loop_dset = bench_grp_step.create_dataset(
name=f"{benchmark_name}_loop_nb",
dtype="int32",
shape=(0,),
maxshape=(None,),
)
perframe_dset = bench_grp_step.create_dataset(
name=f"{benchmark_name}_per_frame",
dtype="float32",
shape=(0,),
maxshape=(None,),
)
perframe_mean_dset = bench_grp_step.create_dataset(
name=f"{benchmark_name}_per_frame_mean",
dtype="float32",
data=time_per_frame,
)
perframe_std_dset = bench_grp_step.create_dataset(
name=f"{benchmark_name}_per_frame_std",
dtype="float32",
data=0.0,
)
nbframes_dset = bench_grp_step.create_dataset(
name=f"{benchmark_name}_nb_frames",
dtype="int32",
shape=(0,),
maxshape=(None,),
)
total_dset = bench_grp_step.create_dataset(
name=f"{benchmark_name}_total",
dtype="float32",
shape=(0,),
maxshape=(None,),
)
accumulated_dset = bench_grp_step.create_dataset(
name=f"{benchmark_name}_accumulated",
dtype="float32",
shape=(0,),
maxshape=(None,),
)
steps.resize((steps.shape[0] + 1,))
steps[-1] = benchmark_name
indexes.resize((indexes.shape[0] + 1,))
indexes[-1] = len(indexes)
data.resize((data.shape[0] + 1,))
data_errors.resize((data_errors.shape[0] + 1,))
else:
loop_dset = bench_grp_step[f"{benchmark_name}_loop_nb"]
perframe_dset = bench_grp_step[f"{benchmark_name}_per_frame"]
perframe_mean_dset = bench_grp_step[f"{benchmark_name}_per_frame_mean"]
perframe_std_dset = bench_grp_step[f"{benchmark_name}_per_frame_std"]
nbframes_dset = bench_grp_step[f"{benchmark_name}_nb_frames"]
total_dset = bench_grp_step[f"{benchmark_name}_total"]
accumulated_dset = bench_grp_step[f"{benchmark_name}_accumulated"]
# Append new data
loop_dset.resize((loop_dset.shape[0] + 1,))
perframe_dset.resize((perframe_dset.shape[0] + 1,))
nbframes_dset.resize((nbframes_dset.shape[0] + 1,))
total_dset.resize((total_dset.shape[0] + 1,))
accumulated_dset.resize((accumulated_dset.shape[0] + 1,))
loop_dset[-1] = len(loop_dset)
perframe_dset[-1] = time_per_frame
nbframes_dset[-1] = nb_frames
total_dset[-1] = total_time
if len(accumulated_dset) == 1:
accumulated_dset[-1] = total_time
else:
accumulated_dset[-1] = accumulated_dset[-2] + total_time
mean_value = perframe_dset[:].mean()
std_value = perframe_dset[:].std()
perframe_mean_dset[()] = mean_value
perframe_std_dset[()] = std_value
step_names = [s.decode() for s in steps]
index_benchmark = next(
(i for i, s in enumerate(step_names) if s == benchmark_name), None
)
if index_benchmark is not None:
data[index_benchmark] = mean_value
data_errors[index_benchmark] = std_value
[docs]
def update_nexus_detector_group(
nxdetector_group_destination: h5py.Group = None,
nxdetector_group_source: h5py.Group = None,
):
for name, item in nxdetector_group_source.items():
if isinstance(item, h5py.Group):
# Recursively copy subgroups
new_subgroup = nxdetector_group_destination.create_group(name)
copy_group_excluding_dataset(item, new_subgroup, "data")
elif isinstance(item, h5py.Dataset):
if name != "data": # Skip the excluded dataset
nxdetector_group_source.copy(
name, nxdetector_group_destination, name=name
)
[docs]
def copy_group_excluding_dataset(src_group, dest_group, exclude_dataset):
for attr_name, attr_value in src_group.attrs.items():
dest_group.attrs[attr_name] = attr_value
for name, item in src_group.items():
if isinstance(item, h5py.Group):
# Recursively copy subgroups
new_subgroup = dest_group.create_group(name)
copy_group_excluding_dataset(item, new_subgroup, exclude_dataset)
elif isinstance(item, h5py.Dataset):
if name != exclude_dataset: # Skip the excluded dataset
src_group.copy(name, dest_group, name=name)