from ewoksid02.tasks.cavingtask import CavingBeamstopTask
from .utils import check_h5groups_common, check_h5groups_equivalent, execute_ewoks
[docs]
def test_eiger2_cave_numpy(
    inputs_task_generic,
    inputs_task_cave,
    tmp_path,
    dataset_signal_2scat,
    dataset_sigma_2scat,
    filename_processed_cave_full,
):
    """Run ``CavingBeamstopTask`` standalone and compare its output to the reference file.

    The task inputs are the generic inputs overlaid with the cave-specific
    inputs, plus the output filename (inside ``tmp_path``) and the signal and
    sigma datasets. After running the task, the ``result_cave`` group of the
    produced file is checked against the same group of
    ``filename_processed_cave_full`` with ``check_h5groups_equivalent``, and
    both files are passed to ``check_h5groups_common``.

    No ``"algorithm"`` key is set here, so the value comes from the fixtures
    or the task itself (presumably numpy, going by the test name -- confirm).
    """
    output_file = str(tmp_path / "id02test_eiger2_cave.h5")
    group_path = "/entry_0000/PyFAI/result_cave/"

    # Later updates win on duplicate keys, same as chained ``**`` unpacking.
    task_inputs = {}
    task_inputs.update(inputs_task_generic)
    task_inputs.update(inputs_task_cave)
    task_inputs.update(
        {
            "processing_filename": output_file,
            "dataset_signal": dataset_signal_2scat,
            "dataset_sigma": dataset_sigma_2scat,
        }
    )

    task = CavingBeamstopTask(task_inputs)
    task.run()

    check_h5groups_equivalent(
        url_reference=f"silx://{filename_processed_cave_full}?path={group_path}",
        url_test=f"silx://{output_file}?path={group_path}",
    )
    check_h5groups_common(
        filename_reference=filename_processed_cave_full,
        filename_test=output_file,
    )
[docs]
def test_eiger2_cave_cupy(
    cupy_available,
    inputs_task_generic,
    inputs_task_cave,
    tmp_path,
    dataset_signal_2scat,
    dataset_sigma_2scat,
    filename_processed_cave_full,
):
    """Run ``CavingBeamstopTask`` with ``algorithm="cupy"`` and compare to the reference file.

    Does nothing when ``cupy_available`` is falsy. Otherwise the task inputs
    are the generic inputs overlaid with the cave-specific inputs, plus the
    output filename (inside ``tmp_path``), the signal and sigma datasets and
    the ``"algorithm"`` override. The ``result_cave`` group of the produced
    file is then checked against the same group of
    ``filename_processed_cave_full`` with ``check_h5groups_equivalent``, and
    both files are passed to ``check_h5groups_common``.
    """
    if not cupy_available:
        # NOTE(review): a plain return makes the test count as passed rather
        # than skipped when cupy is missing.
        return

    output_file = str(tmp_path / "id02test_eiger2_cave.h5")
    group_path = "/entry_0000/PyFAI/result_cave/"

    # Later updates win on duplicate keys, same as chained ``**`` unpacking
    # followed by an item assignment for "algorithm".
    task_inputs = {}
    task_inputs.update(inputs_task_generic)
    task_inputs.update(inputs_task_cave)
    task_inputs.update(
        {
            "processing_filename": output_file,
            "dataset_signal": dataset_signal_2scat,
            "dataset_sigma": dataset_sigma_2scat,
            "algorithm": "cupy",
        }
    )

    task = CavingBeamstopTask(task_inputs)
    task.run()

    check_h5groups_equivalent(
        url_reference=f"silx://{filename_processed_cave_full}?path={group_path}",
        url_test=f"silx://{output_file}?path={group_path}",
    )
    check_h5groups_common(
        filename_reference=filename_processed_cave_full,
        filename_test=output_file,
    )
[docs]
def test_eiger2_cave_workflow(
    workflow_norm_2scat_cave,
    inputs_task_generic,
    inputs_task_norm,
    inputs_task_2scat,
    inputs_task_cave,
    tmp_path,
    filename_processed_norm_reference,
    filename_processed_2scat_full,
    filename_processed_cave_full,
):
    """Execute the norm -> 2scat -> cave workflow and compare each stage to its reference.

    The ewoks input list is assembled in this order: generic inputs (flagged
    ``"all": True``), then the norm, 2scat and cave inputs (each tagged with
    the matching ``"id"``), then one ``processing_filename`` per stage
    pointing inside ``tmp_path``. After ``execute_ewoks`` runs the graph, the
    ``result_<stage>`` group of every produced file is checked against the
    corresponding reference file with ``check_h5groups_equivalent``, and both
    files are passed to ``check_h5groups_common``.
    """
    # Inputs without a node id; presumably delivered to every node -- confirm.
    inputs = [
        {"name": key, "value": value, "all": True}
        for key, value in inputs_task_generic.items()
    ]

    # Node-specific inputs, kept in norm / 2scat / cave order.
    per_node_inputs = (
        ("norm", inputs_task_norm),
        ("2scat", inputs_task_2scat),
        ("cave", inputs_task_cave),
    )
    for node_id, node_inputs in per_node_inputs:
        inputs.extend(
            {"name": key, "value": value, "id": node_id}
            for key, value in node_inputs.items()
        )

    # One output file per stage, keyed by the stage/node id.
    output_files = {
        "norm": str(tmp_path / "id02test_eiger2_norm.h5"),
        "2scat": str(tmp_path / "id02test_eiger2_2scat.h5"),
        "cave": str(tmp_path / "id02test_eiger2_cave.h5"),
    }
    inputs.extend(
        {"name": "processing_filename", "value": output_file, "id": node_id}
        for node_id, output_file in output_files.items()
    )

    execute_ewoks(
        graph=workflow_norm_2scat_cave,
        inputs=inputs,
    )

    reference_files = {
        "norm": filename_processed_norm_reference,
        "2scat": filename_processed_2scat_full,
        "cave": filename_processed_cave_full,
    }
    for proc_type, ref_file in reference_files.items():
        test_file = output_files[proc_type]
        check_h5groups_equivalent(
            url_reference=f"silx://{ref_file}?path=/entry_0000/PyFAI/result_{proc_type}",
            url_test=f"silx://{test_file}?path=/entry_0000/PyFAI/result_{proc_type}",
        )
        check_h5groups_common(
            filename_reference=ref_file,
            filename_test=test_file,
        )