Source code for ewoksid02.tests.test_cave_node

from ewoksid02.tasks.cavingtask import CavingBeamstopTask

from .utils import check_h5groups_common, check_h5groups_equivalent, execute_ewoks


def test_eiger2_cave_numpy(
    inputs_task_generic,
    inputs_task_cave,
    tmp_path,
    dataset_signal_2scat,
    dataset_sigma_2scat,
    filename_processed_cave_full,
):
    """Run ``CavingBeamstopTask`` directly and check its output file.

    The task inputs are the generic inputs overlaid with the cave-specific
    ones, plus the output filename (inside ``tmp_path``) and the signal/sigma
    datasets. After running the task, the ``result_cave`` group of the written
    file and of ``filename_processed_cave_full`` are handed to
    ``check_h5groups_equivalent``, and both files to ``check_h5groups_common``.
    """
    output_file = str(tmp_path / "id02test_eiger2_cave.h5")

    # Later updates win on key clashes, exactly like stacked ``**`` unpacking.
    task_inputs = dict(inputs_task_generic)
    task_inputs.update(inputs_task_cave)
    task_inputs.update(
        processing_filename=output_file,
        dataset_signal=dataset_signal_2scat,
        dataset_sigma=dataset_sigma_2scat,
    )

    CavingBeamstopTask(task_inputs).run()

    reference_url = (
        f"silx://{filename_processed_cave_full}?path=/entry_0000/PyFAI/result_cave/"
    )
    produced_url = f"silx://{output_file}?path=/entry_0000/PyFAI/result_cave/"
    check_h5groups_equivalent(url_reference=reference_url, url_test=produced_url)
    check_h5groups_common(
        filename_reference=filename_processed_cave_full,
        filename_test=output_file,
    )
def test_eiger2_cave_cupy(
    cupy_available,
    inputs_task_generic,
    inputs_task_cave,
    tmp_path,
    dataset_signal_2scat,
    dataset_sigma_2scat,
    filename_processed_cave_full,
):
    """Run ``CavingBeamstopTask`` with ``algorithm="cupy"`` and check its output.

    Does nothing when ``cupy_available`` is falsy. Otherwise the task inputs
    are the generic inputs overlaid with the cave-specific ones, plus the
    output filename (inside ``tmp_path``), the signal/sigma datasets and the
    ``"cupy"`` algorithm selector. The ``result_cave`` group of the written
    file and of ``filename_processed_cave_full`` are handed to
    ``check_h5groups_equivalent``, and both files to ``check_h5groups_common``.
    """
    # NOTE(review): returning here makes the test end normally (reported as
    # passed, not skipped) when cupy is unavailable.
    if not cupy_available:
        return

    output_file = str(tmp_path / "id02test_eiger2_cave.h5")

    # Later updates win on key clashes, exactly like stacked ``**`` unpacking
    # followed by an item assignment for "algorithm".
    task_inputs = dict(inputs_task_generic)
    task_inputs.update(inputs_task_cave)
    task_inputs.update(
        processing_filename=output_file,
        dataset_signal=dataset_signal_2scat,
        dataset_sigma=dataset_sigma_2scat,
        algorithm="cupy",
    )

    CavingBeamstopTask(task_inputs).run()

    reference_url = (
        f"silx://{filename_processed_cave_full}?path=/entry_0000/PyFAI/result_cave/"
    )
    produced_url = f"silx://{output_file}?path=/entry_0000/PyFAI/result_cave/"
    check_h5groups_equivalent(url_reference=reference_url, url_test=produced_url)
    check_h5groups_common(
        filename_reference=filename_processed_cave_full,
        filename_test=output_file,
    )
def test_eiger2_cave_workflow(
    workflow_norm_2scat_cave,
    inputs_task_generic,
    inputs_task_norm,
    inputs_task_2scat,
    inputs_task_cave,
    tmp_path,
    filename_processed_norm_reference,
    filename_processed_2scat_full,
    filename_processed_cave_full,
):
    """Execute the norm -> 2scat -> cave workflow and check every output file.

    The ewoks input list is assembled from the generic inputs (flagged
    ``"all": True``), the per-node inputs (tagged with the node ids ``"norm"``,
    ``"2scat"`` and ``"cave"``) and one ``processing_filename`` per node
    pointing inside ``tmp_path``. After ``execute_ewoks`` has run, each
    produced file is passed together with its reference file to
    ``check_h5groups_equivalent`` (on the ``result_<node id>`` group) and to
    ``check_h5groups_common``.
    """
    # Output and reference files keyed by node id; dict order fixes the
    # order of both the appended inputs and the final checks.
    produced_files = {
        "norm": str(tmp_path / "id02test_eiger2_norm.h5"),
        "2scat": str(tmp_path / "id02test_eiger2_2scat.h5"),
        "cave": str(tmp_path / "id02test_eiger2_cave.h5"),
    }
    reference_files = {
        "norm": filename_processed_norm_reference,
        "2scat": filename_processed_2scat_full,
        "cave": filename_processed_cave_full,
    }
    node_specific_inputs = (
        ("norm", inputs_task_norm),
        ("2scat", inputs_task_2scat),
        ("cave", inputs_task_cave),
    )

    # Generic inputs first, then per-node inputs, then the output filenames.
    inputs = [
        {"name": key, "value": value, "all": True}
        for key, value in inputs_task_generic.items()
    ]
    for node_id, node_inputs in node_specific_inputs:
        inputs.extend(
            {"name": key, "value": value, "id": node_id}
            for key, value in node_inputs.items()
        )
    inputs.extend(
        {"name": "processing_filename", "value": path, "id": node_id}
        for node_id, path in produced_files.items()
    )

    execute_ewoks(graph=workflow_norm_2scat_cave, inputs=inputs)

    for proc_type, test_file in produced_files.items():
        ref_file = reference_files[proc_type]
        check_h5groups_equivalent(
            url_reference=f"silx://{ref_file}?path=/entry_0000/PyFAI/result_{proc_type}",
            url_test=f"silx://{test_file}?path=/entry_0000/PyFAI/result_{proc_type}",
        )
        check_h5groups_common(
            filename_reference=ref_file,
            filename_test=test_file,
        )