Source code for ewoksid02.scripts.saxs

import logging
from pathlib import Path
from typing import Dict, List, Union

from ewokstools.submit import (
    _print_number_of_jobs_to_submit,
    _warning_dry_run_mode,
    _warning_if_too_many_jobs,
    save_and_execute,
    wait_to_finish_queue,
)
from ewoksutils.task_utils import task_inputs

from ._utils import (
    ID02_EXECUTION_PARAMETERS,
    ID02_SLURM_POST_SCRIPT,
    ID02_SLURM_PRE_SCRIPT,
    ID02_WORKER_MODULE,
    SLURM_JOB_PARAMETERS_SAXS,
    _validate_yaml_template,
    get_list_scan_info_from_bliss_filenames_id02,
)
from .resources import WORKFLOW_SAXS_LOOP
from .resources.schemas import schema_saxs

logger = logging.getLogger(__name__)


[docs] def main_saxs(args): for file in args.FILES: if file.endswith((".yaml", ".yml")): _main_saxs_from_template(filename=file) elif file.endswith(".h5"): kwargs = vars(args) for key in ("FILES", "command"): kwargs.pop(key, None) _main_saxs_from_cli(filename=file, **kwargs) else: logger.warning( f"File {file} has an unsupported extension. Skipping it. Supported extensions are .yaml, .yml and .h5." )
def _main_saxs_from_template(filename: Union[str, Path]): is_valid, params_validated = _validate_yaml_template(filename, schema_saxs) if is_valid: _main_saxs(**params_validated) def _main_saxs_from_cli(filename: Union[str, Path], **kwargs): ... def _main_saxs( bliss_filenames: Union[str, List[str]], detectors: Union[str, List[str]] = None, scans: Union[int, List[int]] = None, output_root_folder: str = None, tag: str = "", average_limits: str = None, submit_parameters: Dict = None, **kwargs, ): dry_run = not submit_parameters.pop("submit", True) list_scans_info = get_list_scan_info_from_bliss_filenames_id02( bliss_filenames=bliss_filenames, detectors=detectors, scans=scans, output_root_folder=output_root_folder, **kwargs, ) nb_total_jobs = len(list_scans_info) _print_number_of_jobs_to_submit(nb_total_jobs) if dry_run: _warning_dry_run_mode() else: _warning_if_too_many_jobs(nb_total_jobs) submitted_jobs = [] tag = tag or "" if tag: tag = f"_{tag}" if dry_run: tag = f"{tag}_dryrun" for index_job, scan_info in enumerate(list_scans_info): inputs_ewoks_saxs = [ { "name": "filename_data", "value": scan_info.filename_raw_dataset, "all": True, }, { "name": "filename_lima", "value": scan_info.filename_raw_scan, "all": True, }, { "name": "lima_url_template", "value": scan_info.lima_url_template, "all": True, }, { "name": "lima_url_template_args", "value": scan_info.lima_url_template_args, "all": True, }, {"name": "detector_name", "value": scan_info.detector_name, "all": True}, {"name": "scan_nb", "value": scan_info.scan_nb, "all": True}, { "name": "datatype", "value": kwargs.get("datatype", "float32"), "all": True, }, { "name": "log_level", "value": kwargs.get("log_level", "info"), "all": True, }, { "name": "max_slice_size", "value": kwargs.get("max_slice_size", 100), "all": True, }, ] inputs_ewoks_saxs += task_inputs( id="2scat", task_identifier="ewoksid02.tasks.secondaryscatteringtask.SecondaryScatteringTask", inputs={ **{k: v for k, v in kwargs.pop("2scat", {}).items() if v is not None}, "processing_filename": scan_info.filename_processed_scan.replace( ".h5", f"{tag}_2scat.h5" ), }, ) inputs_ewoks_saxs += task_inputs( id="norm", task_identifier="ewoksid02.tasks.normalizationtask.NormalizationTask", inputs={ **{k: v for k, v in kwargs.pop("norm", {}).items() if v is not None}, "processing_filename": scan_info.filename_processed_scan.replace( ".h5", f"{tag}_norm.h5" ), }, ) inputs_ewoks_saxs += task_inputs( id="cave", task_identifier="ewoksid02.tasks.cavingtask.CavingBeamstopTask", inputs={ **{k: v for k, v in kwargs.pop("cave", {}).items() if v is not None}, "processing_filename": scan_info.filename_processed_scan.replace( ".h5", f"{tag}_cave.h5" ), }, ) inputs_ewoks_saxs += task_inputs( id="azim", task_identifier="ewoksid02.tasks.azimuthaltask.AzimuthalTask", inputs={ **{k: v for k, v in kwargs.pop("azim", {}).items() if v is not None}, "processing_filename": scan_info.filename_processed_scan.replace( ".h5", f"{tag}_azim.h5" ), }, ) inputs_ewoks_saxs += task_inputs( id="ave", task_identifier="ewoksid02.tasks.averagetask.AverageTask", inputs={ **{k: v for k, v in kwargs.pop("ave", {}).items() if v is not None}, "processing_filename": scan_info.filename_processed_scan.replace( ".h5", f"{tag}_ave.h5" ), }, ) inputs_ewoks_saxs += task_inputs( id="save_gallery", task_identifier="ewoksid02.tasks.gallerytask.SaveGalleryTask", inputs={ **{ k: v for k, v in kwargs.pop("save_gallery", {}).items() if v is not None }, "nxdata_url": f"{scan_info.filename_processed_scan.replace('.h5', f'{tag}_ave.h5')}::/entry_0000/PyFAI/result_ave", }, ) submit_parameters.update( { "worker_module": submit_parameters.get( "worker_module", ID02_WORKER_MODULE ), "pre_script_python": submit_parameters.get( "pre_script_python", ID02_SLURM_PRE_SCRIPT ), "post_script_python": submit_parameters.get( "post_script_python", ID02_SLURM_POST_SCRIPT ), "execution_kwargs": ID02_EXECUTION_PARAMETERS, } ) directory_workflows = ( Path(scan_info.filename_processed_scan).parent / "workflows" ) directory_workflows.mkdir(exist_ok=True, parents=True) name = Path( scan_info.filename_processed_scan.replace(".h5", f"{tag}_saxs_ewoks.json") ).name filename_destination = str((directory_workflows / name)) submitted = save_and_execute( workflow=WORKFLOW_SAXS_LOOP, inputs=inputs_ewoks_saxs, destination_filename=filename_destination, dry_run=dry_run, execution_kwargs={ **ID02_EXECUTION_PARAMETERS, **submit_parameters.pop("execution_kwargs", {}), }, slurm_job_parameters={ **SLURM_JOB_PARAMETERS_SAXS, **submit_parameters.pop("slurm_job_parameters", {}), }, python_package="ewoksid02", # To point the logs to .ewoksid02 **submit_parameters, ) submitted_jobs.append(submitted) print(f"Submitted job {index_job} / {nb_total_jobs}.") wait_to_finish_queue(submitted_jobs)