from ewoksid02.tasks.averagetask import AverageTask
from .utils import check_h5groups_common, check_h5groups_equivalent, execute_ewoks
[docs]
def test_eiger2_ave_full(
inputs_task_generic,
inputs_task_ave,
tmp_path,
dataset_signal_azim_new,
dataset_sigma_azim_new,
dataset_sumsignal_azim_new,
dataset_sumnorm_azim_new,
dataset_sumvariance_azim_new,
dataset_radial_array,
dataset_azimuthal_array,
filename_processed_ave_full,
):
processing_filename = str(tmp_path / "id02test_eiger2_ave.h5")
inputs = {
**inputs_task_generic,
**inputs_task_ave,
"processing_filename": processing_filename,
"dataset_signal": dataset_signal_azim_new,
"dataset_sigma": dataset_sigma_azim_new,
"dataset_sum_signal": dataset_sumsignal_azim_new,
"dataset_sum_normalization": dataset_sumnorm_azim_new,
"dataset_sum_variance": dataset_sumvariance_azim_new,
"radial_array": dataset_radial_array,
"azimuth_array": dataset_azimuthal_array,
}
task_azim = AverageTask(inputs)
task_azim.run()
check_h5groups_equivalent(
url_reference=f"silx://{filename_processed_ave_full}?path=/entry_0000/PyFAI/result_ave/",
url_test=f"silx://{processing_filename}?path=/entry_0000/PyFAI/result_ave/",
)
check_h5groups_common(
filename_reference=filename_processed_ave_full,
filename_test=processing_filename,
)
[docs]
def test_eiger2_ave_workflow(
workflow_norm_2scat_cave_azim_ave,
inputs_task_generic,
inputs_task_norm,
inputs_task_2scat,
inputs_task_cave,
inputs_task_azim,
inputs_task_ave,
tmp_path,
filename_processed_norm_reference,
filename_processed_2scat_full,
filename_processed_cave_full,
filename_processed_azim_full,
filename_processed_ave_full,
):
inputs = []
for key, value in inputs_task_generic.items():
inputs.append({"name": key, "value": value, "all": True})
for key, value in inputs_task_norm.items():
inputs.append({"name": key, "value": value, "id": "norm"})
for key, value in inputs_task_2scat.items():
inputs.append({"name": key, "value": value, "id": "2scat"})
for key, value in inputs_task_cave.items():
inputs.append({"name": key, "value": value, "id": "cave"})
for key, value in inputs_task_azim.items():
inputs.append({"name": key, "value": value, "id": "azim"})
for key, value in inputs_task_ave.items():
inputs.append({"name": key, "value": value, "id": "ave"})
filename_processing_norm = str(tmp_path / "id02test_eiger2_norm.h5")
filename_processing_2scat = str(tmp_path / "id02test_eiger2_2scat.h5")
filename_processing_cave = str(tmp_path / "id02test_eiger2_cave.h5")
filename_processing_azim = str(tmp_path / "id02test_eiger2_azim.h5")
filename_processing_ave = str(tmp_path / "id02test_eiger2_ave.h5")
for name, value, id_ in [
("processing_filename", filename_processing_norm, "norm"),
("processing_filename", filename_processing_2scat, "2scat"),
("processing_filename", filename_processing_cave, "cave"),
("processing_filename", filename_processing_azim, "azim"),
("processing_filename", filename_processing_ave, "ave"),
]:
inputs.append({"name": name, "value": value, "id": id_})
_ = execute_ewoks(
graph=workflow_norm_2scat_cave_azim_ave,
inputs=inputs,
)
for ref_file, test_file, proc_type in [
(filename_processed_norm_reference, filename_processing_norm, "norm"),
(filename_processed_2scat_full, filename_processing_2scat, "2scat"),
(filename_processed_cave_full, filename_processing_cave, "cave"),
(filename_processed_azim_full, filename_processing_azim, "azim"),
(filename_processed_ave_full, filename_processing_ave, "ave"),
]:
check_h5groups_equivalent(
url_reference=f"silx://{ref_file}?path=/entry_0000/PyFAI/result_{proc_type}",
url_test=f"silx://{test_file}?path=/entry_0000/PyFAI/result_{proc_type}",
)
check_h5groups_common(
filename_reference=ref_file,
filename_test=test_file,
)