import h5py
import numpy
from ewoks import execute_graph
from silx.io.url import DataUrl
import logging
logger = logging.getLogger(__name__)
[docs]
def check_h5groups_equivalent(
    url_reference,
    url_test,
):
    """Assert that the HDF5 item at ``url_test`` matches the one at ``url_reference``.

    Both arguments are parsed with :class:`silx.io.url.DataUrl`; the file path,
    the data path and the optional slice of each URL are used.

    * If both items are datasets, they are compared directly.
    * If both items are groups, every item below the reference group is
      visited and must exist under the same relative name in the test group;
      datasets found this way are compared.
    * Any other combination fails, since the two items cannot be equivalent.

    Dataset comparison rules:

    * numpy arrays are restricted to ``[slice[0]:slice[1]]`` when the
      corresponding URL carries a slice (assumes the slice has at least two
      components used as start and stop -- TODO confirm against callers),
      then compared with ``numpy.allclose(..., equal_nan=True)``;
    * for datasets whose name contains ``HS32C`` or ``HS32V``, columns 10 and
      11 are dropped from both arrays before the comparison;
    * for datasets whose name contains ``HS32N``, the elements are decoded and
      compared one by one as strings;
    * ``bytes`` scalars are decoded and compared as floats when both parse as
      floats (two NaN are considered equal), otherwise as strings;
    * anything else is compared with ``==``.

    :param url_reference: silx URL of the reference group or dataset.
    :param url_test: silx URL of the group or dataset to be checked.
    :raises AssertionError: if an item is missing or not equivalent.
    """
    dataurl_reference = DataUrl(url_reference)
    dataurl_test = DataUrl(url_test)

    # The slices do not change during the comparison: look them up once.
    slice_reference = dataurl_reference.data_slice()
    slice_test = dataurl_test.data_slice()

    with h5py.File(dataurl_reference.file_path(), "r") as file_reference:
        with h5py.File(dataurl_test.file_path(), "r") as file_test:
            reference_group = file_reference[dataurl_reference.data_path()]
            test_group = file_test[dataurl_test.data_path()]

            def check_equivalence_datasets(
                dataset_test: h5py.Dataset, dataset_reference: h5py.Dataset
            ):
                """Assert that two datasets hold equivalent values."""
                data_reference = dataset_reference[()]
                data_test = dataset_test[()]

                if isinstance(data_reference, numpy.ndarray):
                    if slice_reference:
                        data_reference = data_reference[
                            slice_reference[0] : slice_reference[1]
                        ]
                    if slice_test:
                        data_test = data_test[slice_test[0] : slice_test[1]]

                    if any(tag in dataset_test.name for tag in ("HS32C", "HS32V")):
                        # Because in ewoksid02, we save also aux1, aux2 counters:
                        # columns 10 and 11 are left out of the comparison.
                        data_test = numpy.concatenate(
                            (data_test[:, 0:10], data_test[:, 12:]), axis=1
                        )
                        data_reference = numpy.concatenate(
                            (data_reference[:, 0:10], data_reference[:, 12:]),
                            axis=1,
                        )

                    if "HS32N" in dataset_test.name:
                        # HS32N is the only array (so far) made of string elements
                        names_test = [item.decode() for item in data_test]
                        names_reference = [item.decode() for item in data_reference]
                        # zip() stops at the shortest input, so a missing or
                        # extra entry has to be detected explicitly.
                        assert len(names_test) == len(names_reference), (
                            f"{dataset_reference.name} is not equivalent: "
                            f"{len(names_reference)} entries in the reference, "
                            f"{len(names_test)} in the test"
                        )
                        for name_test, name_reference in zip(
                            names_test, names_reference
                        ):
                            assert name_test == name_reference, (
                                f"{dataset_reference.name} is not equivalent: "
                                f"{name_test!r} != {name_reference!r}"
                            )
                        return

                    assert numpy.allclose(
                        data_test, data_reference, equal_nan=True
                    ), f"{dataset_reference.name} is not equivalent"
                elif isinstance(data_reference, bytes):
                    data_reference = data_reference.decode()
                    data_test = data_test.decode()
                    try:
                        value_test = float(data_test)
                        value_reference = float(data_reference)
                    except ValueError:
                        # At least one value is not a number: compare the text.
                        pass
                    else:
                        if numpy.isnan(value_reference) and numpy.isnan(value_test):
                            # NaN != NaN, but two NaN are equivalent here.
                            return
                        data_test, data_reference = value_test, value_reference
                    assert (
                        data_reference == data_test
                    ), f"{dataset_reference.name} is not equivalent"
                else:
                    assert (
                        data_reference == data_test
                    ), f"{dataset_reference.name} is not equivalent"
                logger.debug(f"{dataset_reference.name} are equivalent")

            def check_equivalence(name_reference_item, h5item_reference):
                """Callback for ``visititems``: check one item of the reference group.

                Always returns ``None`` so that the visit continues.
                """
                if "TitleExtension" in name_reference_item:
                    # Only the ewoks files before ~October25 do not contain TitleExtension
                    # After that, the header includes TitleExtensionTemplate
                    # TODO: add the option TitleExtensionTemplate
                    return

                if "interpreted" in h5item_reference.name:
                    if name_reference_item == "epoch":
                        # ewoksid02 does not generate epoch group, it's artificial
                        return
                    # When it's an offline processing, the counter names in the
                    # interpreted group cannot be rebuilt fully because this
                    # information is not in the RAW_DATA file (only in blissdata)
                    if name_reference_item not in test_group:
                        logger.warning(
                            f"{name_reference_item} not in {dataurl_test.path()}, but could have a different name"
                        )
                        return

                assert (
                    name_reference_item in test_group
                ), f"{name_reference_item} not in {dataurl_test.path()}"
                logger.debug(f"{name_reference_item} found in {test_group}")

                if isinstance(h5item_reference, h5py.Dataset):
                    check_equivalence_datasets(
                        dataset_test=test_group[name_reference_item],
                        dataset_reference=h5item_reference,
                    )

            if isinstance(reference_group, h5py.Dataset) and isinstance(
                test_group, h5py.Dataset
            ):
                check_equivalence_datasets(
                    dataset_test=test_group,
                    dataset_reference=reference_group,
                )
            elif isinstance(reference_group, h5py.Group) and isinstance(
                test_group, h5py.Group
            ):
                reference_group.visititems(func=check_equivalence)
            else:
                # A dataset can never be equivalent to a group: fail instead
                # of returning without having compared anything.
                raise AssertionError(
                    f"{dataurl_reference.path()} ({type(reference_group).__name__}) and "
                    f"{dataurl_test.path()} ({type(test_group).__name__}) "
                    "are not the same kind of HDF5 item"
                )
[docs]
def check_h5groups_common(
    filename_reference: str,
    filename_test: str,
    detector_name: str = "eiger2",
):
    """Run the standard set of equivalence checks between two files.

    Each check calls :func:`check_h5groups_equivalent` on one item located
    under ``/entry_0000/PyFAI/`` in both files. Some reference URLs carry a
    ``slice=0,2`` argument that the matching test URL does not.

    :param filename_reference: path of the reference HDF5 file.
    :param filename_test: path of the HDF5 file to be checked.
    :param detector_name: name of the detector item under ``PyFAI``.
    :raises AssertionError: propagated from the first check that fails.
    """
    root_reference = f"silx://{filename_reference}?path=/entry_0000/PyFAI/"
    root_test = f"silx://{filename_test}?path=/entry_0000/PyFAI/"

    # (end of the reference URL, end of the test URL), in checking order.
    url_endings = [
        ("TFG/&slice=0,2", "TFG/"),
        ("parameters/&slice=0,2", "parameters/"),
        (f"{detector_name}", f"{detector_name}"),
        ("MCS/ExposureTime/&slice=0,2", "MCS/ExposureTime"),
        ("MCS/HS32C/&slice=0,2", "MCS/HS32C"),
        ("MCS/HS32F", "MCS/HS32F"),
        ("MCS/HS32N", "MCS/HS32N"),
        ("MCS/HS32V/&slice=0,2", "MCS/HS32V"),
        ("MCS/HS32Z", "MCS/HS32Z"),
        ("MCS/Intensity0ShutCor/&slice=0,2", "MCS/Intensity0ShutCor"),
        ("MCS/Intensity0UnCor/&slice=0,2", "MCS/Intensity0UnCor"),
        ("MCS/Intensity1ShutCor/&slice=0,2", "MCS/Intensity1ShutCor"),
        ("MCS/Intensity1UnCor/&slice=0,2", "MCS/Intensity1UnCor"),
        ("MCS/raw/&slice=0,2", "MCS/raw/&slice=0,2"),
        ("MCS/interpreted/&slice=0,2", "MCS/interpreted"),
    ]

    # Items compared as a whole, with the same URL ending on both sides.
    unsliced_items = (
        "HSI0",
        "HSI1",
        "HSI0Factor",
        "HSI1Factor",
        "HSTime",
        "ShutterClosingTime",
        "ShutterOpeningTime",
    )
    url_endings.extend((f"MCS/{item}", f"MCS/{item}") for item in unsliced_items)

    for ending_reference, ending_test in url_endings:
        check_h5groups_equivalent(
            url_reference=root_reference + ending_reference,
            url_test=root_test + ending_test,
        )
[docs]
def check_allclose(
    url_reference,
    url_test,
):
    """Assert that the data behind two silx URLs are numerically close.

    Both URLs are parsed with :class:`silx.io.url.DataUrl`. The data are read
    in full, restricted to ``[slice[0]:slice[1]]`` when the URL carries a
    slice, and compared with ``numpy.allclose(..., equal_nan=True)``.

    :param url_reference: silx URL of the reference data.
    :param url_test: silx URL of the data to be checked.
    :raises AssertionError: if the two arrays are not close.
    """
    ref_url = DataUrl(url_reference)
    tested_url = DataUrl(url_test)

    with h5py.File(ref_url.file_path(), "r") as h5_reference:
        with h5py.File(tested_url.file_path(), "r") as h5_tested:
            expected = h5_reference[ref_url.data_path()][()]
            actual = h5_tested[tested_url.data_path()][()]

            ref_window = ref_url.data_slice()
            if ref_window:
                expected = expected[ref_window[0] : ref_window[1]]

            tested_window = tested_url.data_slice()
            if tested_window:
                actual = actual[tested_window[0] : tested_window[1]]

            assert numpy.allclose(expected, actual, equal_nan=True)
[docs]
def execute_ewoks(
    graph,
    inputs,
    engine="ppf",
    pool_type="thread",
):
    """
    Run ``graph`` through ``ewoks.execute_graph`` and hand back its result.

    :param graph: graph forwarded as ``graph``.
    :param inputs: inputs forwarded as ``inputs``.
    :param engine: forwarded as ``engine`` (``"ppf"`` by default).
    :param pool_type: forwarded as ``pool_type`` (``"thread"`` by default).
    :returns: whatever ``execute_graph`` returns.
    """
    return execute_graph(
        graph=graph,
        inputs=inputs,
        engine=engine,
        pool_type=pool_type,
    )