import logging
from pathlib import Path
from typing import Dict, List, Union
from ewokstools.submit import (
_print_number_of_jobs_to_submit,
_warning_dry_run_mode,
_warning_if_too_many_jobs,
save_and_execute,
wait_to_finish_queue,
)
from ewoksutils.task_utils import task_inputs
from ._utils import (
ID02_EXECUTION_PARAMETERS,
ID02_SLURM_POST_SCRIPT,
ID02_SLURM_PRE_SCRIPT,
ID02_WORKER_MODULE,
SLURM_JOB_PARAMETERS_SAXS,
_validate_yaml_template,
get_list_scan_info_from_bliss_filenames_id02,
)
from .resources import WORKFLOW_SAXS_LOOP
from .resources.schemas import schema_saxs
# Module-level logger; handlers and levels are configured by the application.
logger = logging.getLogger(__name__)
def main_saxs(args):
    """Run the SAXS pipeline for every file listed in ``args.FILES``.

    ``.yaml``/``.yml`` files are handed to the template entry point. ``.h5``
    files are handed to the CLI entry point together with the remaining
    command-line options. Any other extension is skipped with a warning.

    :param args: parsed command-line namespace. It must expose ``FILES`` and
        support :func:`vars`. Its attributes are not modified.
    """
    # Build the CLI options once, as a filtered copy. Popping keys directly
    # from ``vars(args)`` would delete attributes from the caller's namespace,
    # because ``vars`` returns the object's own ``__dict__``.
    cli_kwargs = {
        key: value
        for key, value in vars(args).items()
        if key not in ("FILES", "command")
    }
    for file in args.FILES:
        if file.endswith((".yaml", ".yml")):
            _main_saxs_from_template(filename=file)
        elif file.endswith(".h5"):
            _main_saxs_from_cli(filename=file, **cli_kwargs)
        else:
            logger.warning(
                "File %s has an unsupported extension. Skipping it. Supported extensions are .yaml, .yml and .h5.",
                file,
            )
def _main_saxs_from_template(filename: Union[str, Path]):
    """Validate a YAML template against the SAXS schema and run it if valid.

    :param filename: path of the ``.yaml``/``.yml`` template.

    Nothing is launched when validation fails, and ``None`` is returned in
    both cases.
    """
    template_ok, validated_params = _validate_yaml_template(filename, schema_saxs)
    if not template_ok:
        return
    _main_saxs(**validated_params)
def _main_saxs_from_cli(filename: Union[str, Path], **kwargs): ...
# (task id, ewoks task identifier) for the processing chain, in workflow order.
# Each task writes its own "<processed scan><tag>_<task id>.h5" file.
_SAXS_PROCESSING_TASKS = (
    ("2scat", "ewoksid02.tasks.secondaryscatteringtask.SecondaryScatteringTask"),
    ("norm", "ewoksid02.tasks.normalizationtask.NormalizationTask"),
    ("cave", "ewoksid02.tasks.cavingtask.CavingBeamstopTask"),
    ("azim", "ewoksid02.tasks.azimuthaltask.AzimuthalTask"),
    ("ave", "ewoksid02.tasks.averagetask.AverageTask"),
)
# The gallery task reads the averaged result instead of writing a processing file.
_SAXS_GALLERY_TASK = ("save_gallery", "ewoksid02.tasks.gallerytask.SaveGalleryTask")


def _replace_h5_suffix(filename: str, ending: str) -> str:
    """Return *filename* with its last ``.h5`` occurrence replaced by *ending*.

    Only the last occurrence is replaced, so a ``.h5`` appearing earlier in the
    path (for example in a directory name) is left untouched. If ``.h5`` does
    not occur, *filename* is returned unchanged.
    """
    head, sep, tail = filename.rpartition(".h5")
    if not sep:
        return filename
    return f"{head}{ending}{tail}"


def _clean_task_parameters(params) -> Dict:
    """Return a copy of one task's user parameters with ``None`` values dropped."""
    return {key: value for key, value in (params or {}).items() if value is not None}


def _build_saxs_inputs(scan_info, tag: str, kwargs: Dict, task_parameters: Dict) -> List[Dict]:
    """Build the ewoks input list for the SAXS workflow of a single scan.

    :param scan_info: one entry returned by
        ``get_list_scan_info_from_bliss_filenames_id02``.
    :param tag: filename tag already prefixed with ``_`` (or empty).
    :param kwargs: extra options; ``datatype``, ``log_level`` and
        ``max_slice_size`` are read from it.
    :param task_parameters: cleaned user parameters keyed by task id.
    """
    common_values = (
        ("filename_data", scan_info.filename_raw_dataset),
        ("filename_lima", scan_info.filename_raw_scan),
        ("lima_url_template", scan_info.lima_url_template),
        ("lima_url_template_args", scan_info.lima_url_template_args),
        ("detector_name", scan_info.detector_name),
        ("scan_nb", scan_info.scan_nb),
        ("datatype", kwargs.get("datatype", "float32")),
        ("log_level", kwargs.get("log_level", "info")),
        ("max_slice_size", kwargs.get("max_slice_size", 100)),
    )
    inputs = [{"name": name, "value": value, "all": True} for name, value in common_values]

    processed = scan_info.filename_processed_scan
    for task_id, task_identifier in _SAXS_PROCESSING_TASKS:
        inputs += task_inputs(
            id=task_id,
            task_identifier=task_identifier,
            inputs={
                # The computed output filename always wins over user input.
                **task_parameters[task_id],
                "processing_filename": _replace_h5_suffix(processed, f"{tag}_{task_id}.h5"),
            },
        )

    gallery_id, gallery_identifier = _SAXS_GALLERY_TASK
    filename_ave = _replace_h5_suffix(processed, f"{tag}_ave.h5")
    inputs += task_inputs(
        id=gallery_id,
        task_identifier=gallery_identifier,
        inputs={
            **task_parameters[gallery_id],
            "nxdata_url": f"{filename_ave}::/entry_0000/PyFAI/result_ave",
        },
    )
    return inputs


def _main_saxs(
    bliss_filenames: Union[str, List[str]],
    detectors: Union[str, List[str], None] = None,
    scans: Union[int, List[int], None] = None,
    output_root_folder: Union[str, None] = None,
    tag: str = "",
    average_limits: Union[str, None] = None,
    submit_parameters: Union[Dict, None] = None,
    **kwargs,
):
    """Save and execute (or dry-run) one SAXS ewoks workflow per scan.

    :param bliss_filenames: one or more BLISS filenames.
    :param detectors: forwarded to ``get_list_scan_info_from_bliss_filenames_id02``.
    :param scans: forwarded to ``get_list_scan_info_from_bliss_filenames_id02``.
    :param output_root_folder: forwarded to
        ``get_list_scan_info_from_bliss_filenames_id02``.
    :param tag: optional label added to every output filename as ``_<tag>``.
        ``_dryrun`` is appended in dry-run mode.
    :param average_limits: NOTE(review): accepted but not used here. Confirm
        whether it should feed the ``ave`` task.
    :param submit_parameters: options for ``save_and_execute``. The dict is
        not modified.

        * ``submit`` (default ``True``): ``False`` selects dry-run mode.
        * ``execution_kwargs`` is merged over ``ID02_EXECUTION_PARAMETERS``.
        * ``slurm_job_parameters`` is merged over ``SLURM_JOB_PARAMETERS_SAXS``.
        * ``worker_module``, ``pre_script_python`` and ``post_script_python``
          default to the ID02 values.
        * Any other key is forwarded unchanged.
    :param kwargs: forwarded to ``get_list_scan_info_from_bliss_filenames_id02``.
        It may also carry ``datatype``, ``log_level``, ``max_slice_size`` and
        per-task parameter dicts keyed ``2scat``, ``norm``, ``cave``, ``azim``,
        ``ave`` and ``save_gallery``. ``None`` values in the task dicts are
        ignored.
    """
    # Work on a copy so the caller's dict is not mutated and every job sees
    # the same settings.
    submit_parameters = dict(submit_parameters or {})
    dry_run = not submit_parameters.pop("submit", True)

    list_scans_info = get_list_scan_info_from_bliss_filenames_id02(
        bliss_filenames=bliss_filenames,
        detectors=detectors,
        scans=scans,
        output_root_folder=output_root_folder,
        **kwargs,
    )
    nb_total_jobs = len(list_scans_info)
    _print_number_of_jobs_to_submit(nb_total_jobs)
    if dry_run:
        _warning_dry_run_mode()
    else:
        _warning_if_too_many_jobs(nb_total_jobs)

    tag = f"_{tag}" if tag else ""
    if dry_run:
        tag = f"{tag}_dryrun"

    # Extract per-task and submission settings ONCE, before the loop.
    # Popping them inside the loop gave them to the first job only.
    task_parameters = {
        task_id: _clean_task_parameters(kwargs.get(task_id))
        for task_id, _ in (*_SAXS_PROCESSING_TASKS, _SAXS_GALLERY_TASK)
    }
    user_execution_kwargs = submit_parameters.pop("execution_kwargs", None) or {}
    user_slurm_job_parameters = submit_parameters.pop("slurm_job_parameters", None) or {}
    submit_parameters.setdefault("worker_module", ID02_WORKER_MODULE)
    submit_parameters.setdefault("pre_script_python", ID02_SLURM_PRE_SCRIPT)
    submit_parameters.setdefault("post_script_python", ID02_SLURM_POST_SCRIPT)

    submitted_jobs = []
    for index_job, scan_info in enumerate(list_scans_info, start=1):
        inputs_ewoks_saxs = _build_saxs_inputs(scan_info, tag, kwargs, task_parameters)

        directory_workflows = Path(scan_info.filename_processed_scan).parent / "workflows"
        directory_workflows.mkdir(exist_ok=True, parents=True)
        name = Path(
            _replace_h5_suffix(scan_info.filename_processed_scan, f"{tag}_saxs_ewoks.json")
        ).name

        submitted = save_and_execute(
            workflow=WORKFLOW_SAXS_LOOP,
            inputs=inputs_ewoks_saxs,
            destination_filename=str(directory_workflows / name),
            dry_run=dry_run,
            # Fresh dicts per job: user values override the ID02 defaults.
            execution_kwargs={**ID02_EXECUTION_PARAMETERS, **user_execution_kwargs},
            slurm_job_parameters={**SLURM_JOB_PARAMETERS_SAXS, **user_slurm_job_parameters},
            python_package="ewoksid02",  # To point the logs to .ewoksid02
            **submit_parameters,
        )
        submitted_jobs.append(submitted)
        print(f"Submitted job {index_job} / {nb_total_jobs}.")
    wait_to_finish_queue(submitted_jobs)