ewoksid02.tasks.looptask.ID02ProcessingTask#

class ewoksid02.tasks.looptask.ID02ProcessingTask(inputs=None, varinfo=None, node_id=None, node_attrs=None, execinfo=None, profile_directory=None)[source]#

Bases: Task

The ID02LoopTask class is a base task designed to handle iterative data processing in the ID02 SAXS pipeline. It provides functionality for reading datasets, managing processing loops, and controlling the pipeline flow. This class is intended to be extended by more specific tasks, such as ID02ProcessingTask. It can also be seen as a reading node.

Inputs:
  • detector_name (str): Name of the detector used for data acquisition. This is the only mandatory input.

Optional Inputs:
  • scan_memory_url (str): URL for accessing scan memory in online processing.

  • filename_data (str): Path to the dataset file (Master file, Nexus writer) for offline processing.

  • scan_nb (int): Scan number for identifying the dataset.

  • subscan (int): Subscan number for processing. Default is 1.

  • max_slice_size (int): Maximum number of frames to process in one iteration. Default is 20.

  • last_index_read (int): Index of the last frame read in the dataset. Default is 0.

  • range_index_read (list): Range of indices to read from the dataset. This parameter is not propagated to the next task.

  • loop_nb (int): Current loop iteration number. Default is 0.

  • dataset_signal (numpy.ndarray): Signal dataset to be processed.

  • dataset_variance (numpy.ndarray): Variance dataset to be processed.

  • dataset_sigma (numpy.ndarray): Sigma dataset to be processed.

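  • reading_node (bool): Flag to indicate whether the task should read the data itself (from Blissdata or from a static file) instead of propagating a dataset it receives. Default is False.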

  • lima_url_template (str): Format string to locate the Lima file and the path to the data inside that file.

  • lima_url_template_args (dict): Dictionary to format the lima_url_template.

  • beacon_host (str): Host and port used to connect blissdata to the correct beacon server. Only for online processing.

  • log_level (str): Logging level for the task. Default is “warning”.

  • info (dict): Additional metadata to save.

  • info_history (dict): Additional metadata to propagate and save, creating a history of processing.
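
As an illustration of the inputs listed above, the following is a minimal sketch of an input mapping for offline processing. The key names come from the lists above; every value (detector name, file path, scan number) is a placeholder, and the `missing_required` helper is hypothetical and not part of ewoksid02.

```python
# Key names come from the input lists above; every value is a placeholder.
offline_inputs = {
    "detector_name": "eiger2",            # assumed detector name
    "filename_data": "/path/to/scan.h5",  # assumed master file path
    "scan_nb": 1,                         # assumed scan number
    "subscan": 1,                         # documented default
    "max_slice_size": 20,                 # documented default
    "reading_node": True,                 # ask the task to read the data itself
    "log_level": "warning",               # documented default
}

# Only detector_name is documented as mandatory.
REQUIRED_INPUTS = {"detector_name"}


def missing_required(inputs):
    """Return the sorted names of mandatory inputs absent from `inputs`."""
    return sorted(REQUIRED_INPUTS - set(inputs))


print(missing_required(offline_inputs))
print(missing_required({"scan_nb": 1}))
```

A check of this kind only mirrors the statement that detector_name is the single mandatory input; the task itself performs its own validation.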

Outputs:
  • last_index_read (int): Updated index of the last frame read.

  • loop_nb (int): Updated loop iteration number.

  • dataset_signal (numpy.ndarray): Processed signal dataset.

  • dataset_variance (numpy.ndarray): Processed variance dataset.

  • dataset_sigma (numpy.ndarray): Processed sigma dataset.

  • continue_pipeline (bool): Flag to indicate whether the pipeline should continue.

  • info_history (dict): Additional metadata to propagate and save, creating a history of processing.
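
The interplay between last_index_read, max_slice_size, loop_nb and continue_pipeline can be pictured with the sketch below. It is not the task's implementation: it assumes that last_index_read counts the frames already read, that each iteration takes at most max_slice_size new frames, and that the pipeline stops once no new frame is available. The function names are hypothetical.

```python
def next_slice(last_index_read, nb_frames_available, max_slice_size=20):
    """Return (start, stop) of the next block of frames, stop exclusive."""
    start = last_index_read
    stop = min(start + max_slice_size, nb_frames_available)
    return start, stop


def simulate_loop(nb_frames, max_slice_size=20):
    """Iterate until no new frame is left; return the slices and loop count."""
    last_index_read = 0  # documented default
    loop_nb = 0          # documented default
    slices = []
    continue_pipeline = True
    while continue_pipeline:
        start, stop = next_slice(last_index_read, nb_frames, max_slice_size)
        if stop > start:
            # New frames are available: record the slice and advance.
            slices.append((start, stop))
            last_index_read = stop
            loop_nb += 1
        else:
            # Nothing left to read (assumed stop condition).
            continue_pipeline = False
    return slices, loop_nb


print(simulate_loop(45))
```

With 45 frames and the default slice size, the sketch reads three blocks: two full blocks of 20 frames and a final block of 5.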

Usage:#

This class is intended to be used as part of a larger pipeline for SAXS data processing. It handles the reading and propagation of data:
  • If reading_node is set to True, it tries to get data from Blissdata (online processing) or from a static file (offline processing).

  • If reading_node is set to False (default) and it receives a dataset, it simply propagates the dataset to the next task.

Since the SAXS pipeline is a loop, a task with reading_node set to True acts as a kind of entry-exit valve for the pipeline.
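
The reading and propagation behaviour described in this section can be summarised by the decision sketch below. It is an illustration only: the function `choose_action` and its return labels are hypothetical, and the choice of giving scan_memory_url priority over filename_data is an assumption, since the text does not say how online and offline processing are told apart.

```python
def choose_action(reading_node=False, scan_memory_url=None,
                  filename_data=None, dataset_signal=None):
    """Return a label describing what the task would do with these inputs."""
    if reading_node:
        # Assumption: a scan memory URL means online processing.
        if scan_memory_url is not None:
            return "read_from_blissdata"
        # Assumption: otherwise a file path means offline processing.
        if filename_data is not None:
            return "read_from_static_file"
        return "nothing_to_read"
    # Not a reading node: pass an incoming dataset on to the next task.
    if dataset_signal is not None:
        return "propagate"
    return "nothing_to_propagate"


print(choose_action(reading_node=True, filename_data="/path/to/scan.h5"))
print(choose_action(dataset_signal=[[1, 2], [3, 4]]))
```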

Parameters:
  • inputs (Optional[Mapping])

  • varinfo (Optional[dict])

  • node_id (Union[str, int, tuple, None])

  • node_attrs (Optional[dict])

  • execinfo (Optional[dict])

  • profile_directory (Optional[dict])

MISSING_DATA = <MISSING_DATA>#
assert_ready_to_execute()#
cancel()#

Function called when a task is cancelled. To be implemented by the derived classes

property cancelled: bool#

Return True if the task has been cancelled by the user

classmethod class_nonce()#
classmethod class_nonce_data()#
classmethod class_registry_name()#
Return type:

Optional[str]

cleanup_references()#

Removes all references to the inputs. Side effect: fixes the uhash of the task and outputs

property done: bool#

Completed (with or without exception)

property exception: Exception | None#
execute(force_rerun=False, raise_on_error=True, cleanup_references=False)#
Parameters:
  • force_rerun (Optional[bool])

  • raise_on_error (Optional[bool])

  • cleanup_references (Optional[bool])

property failed: bool#

Completed with exception

fix_uhash()#

Fix the uhash when it is derived from the uhash data.

generate_streams()[source]#
get_datasets_from_bliss()[source]#

Get the dataset from the bliss scan memory.

get_datasets_from_static_file()[source]#

Get the dataset from the static file.

get_input_uhashes()#
get_input_value(key, default=<MISSING_DATA>)#
Parameters:

default (Any)

Return type:

Any
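
The `default=<MISSING_DATA>` signature suggests a sentinel default, which lets a caller tell "no value was provided" apart from an explicit None. The sketch below shows that general pattern with a stand-in sentinel and a plain dictionary; `_MissingData` and `get_value` are hypothetical names and do not reproduce the library's own objects.

```python
class _MissingData:
    """Stand-in sentinel type; not the library's MISSING_DATA object."""

    def __repr__(self):
        return "<MISSING_DATA>"


MISSING_DATA = _MissingData()


def get_value(values, key, default=MISSING_DATA):
    """Return values[key], or `default` when the key is absent."""
    return values.get(key, default)


inputs = {"detector_name": "eiger2", "scan_nb": None}  # placeholder values

# An explicit None is returned as-is, while an absent key yields the sentinel.
print(get_value(inputs, "scan_nb"))
print(get_value(inputs, "subscan"))
print(get_value(inputs, "subscan", default=1))
```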

get_input_values()#
get_named_input_values()#
get_new_datasets()[source]#
get_output_transfer_data()#

The values are either DataUri or Variable

get_output_uhashes()#
get_output_value(key, default=<MISSING_DATA>)#
Parameters:

default (Any)

Return type:

Any

get_output_values()#
get_positional_input_values()#
get_scan()[source]#
classmethod get_subclass(registry_name, _second_attempt=False)#

Retrieving a derived class

classmethod get_subclass_names()#
Return type:

List[str]

classmethod get_subclasses()#
get_uhash_init(serialize=False)#
classmethod input_model()#
Return type:

Optional[Type[BaseInputModel]]

classmethod input_names()#
Return type:

Set[str]

property input_uhashes#
property input_values#

DEPRECATED

property input_variables: VariableContainer#
property inputs: ReadOnlyVariableContainerNamespace#
instance_nonce()#
classmethod instantiate(registry_name, **kw)#

Factory method for instantiating a derived class.

Parameters:
  • registry_name (str) – for example “tasklib.tasks.MyTask” or “MyTask”

  • **kw – Task constructor arguments

Returns:

Task
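
The idea of a factory keyed by registry name, accepting either a fully qualified name or a bare class name, can be sketched as follows. This is a generic illustration of the pattern under stated assumptions, not the library's code: `BaseTask`, its `_registry` dictionary and `MyTask` are hypothetical stand-ins.

```python
class BaseTask:
    """Toy base class that records every subclass under its qualified name."""

    _registry = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        qualified = f"{cls.__module__}.{cls.__qualname__}"
        BaseTask._registry[qualified] = cls

    def __init__(self, inputs=None):
        self.inputs = dict(inputs or {})

    @classmethod
    def instantiate(cls, registry_name, **kw):
        """Create an instance from a qualified or a bare class name."""
        if registry_name in cls._registry:
            return cls._registry[registry_name](**kw)
        # Fall back to matching the last component of the qualified name.
        matches = [
            klass
            for name, klass in cls._registry.items()
            if name.rsplit(".", 1)[-1] == registry_name
        ]
        if len(matches) != 1:
            raise KeyError(registry_name)
        return matches[0](**kw)


class MyTask(BaseTask):
    pass


task = BaseTask.instantiate("MyTask", inputs={"detector_name": "eiger2"})
print(type(task).__name__, task.inputs)
```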

property is_ready_to_execute#
property job_id: str | None#
property label: str#
log_allocated_memory()[source]#
log_benchmark(bench)[source]#
log_debug(msg)[source]#
log_error(msg)[source]#
log_info(msg)[source]#
log_warning(msg)[source]#
property missing_inputs: VariableContainerMissingNamespace#
property missing_outputs: VariableContainerMissingNamespace#
property n_positional_inputs: int#
classmethod n_required_positional_inputs()#
Return type:

int

property named_input_values#

DEPRECATED

property node_id: str | int | tuple#
property npositional_inputs#

DEPRECATED

classmethod optional_input_names()#
Return type:

Set[str]

property output_metadata: dict | None#
classmethod output_model()#
Return type:

Optional[Type[BaseOutputModel]]

classmethod output_names()#
Return type:

Set[str]

property output_transfer_data#

DEPRECATED

property output_uhashes#

DEPRECATED

property output_values#

DEPRECATED

property output_variables: VariableContainer#
property outputs: VariableContainerNamespace#
property positional_input_values#

DEPRECATED

classmethod required_input_names()#
Return type:

Set[str]

reset_state()#
run(processing_type)[source]#

To be implemented by the derived classes

set_log_level(log_level='warning')[source]#
set_uhash_init(pre_uhash=None, instance_nonce=None)#
Parameters:
  • pre_uhash (Union[str, bytes, UniversalHash, HasUhash, None])

  • instance_nonce (Optional[Any])

property succeeded: bool#

Completed without exception and with output values
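
The descriptions of done, failed and succeeded imply a simple relationship between the three flags. The toy class below encodes that relationship as read from those one-line descriptions; `TaskStatus` and its `complete` method are hypothetical and only serve to make the wording concrete.

```python
class TaskStatus:
    """Toy model of the done / failed / succeeded flags."""

    def __init__(self):
        self._finished = False
        self.exception = None
        self.outputs = {}

    def complete(self, outputs=None, exception=None):
        """Mark the task as completed, with outputs or with an exception."""
        self._finished = True
        self.exception = exception
        self.outputs = dict(outputs or {})

    @property
    def done(self):
        # Completed, with or without exception.
        return self._finished

    @property
    def failed(self):
        # Completed with exception.
        return self._finished and self.exception is not None

    @property
    def succeeded(self):
        # Completed without exception and with output values.
        return self._finished and self.exception is None and bool(self.outputs)


ok = TaskStatus()
ok.complete(outputs={"loop_nb": 1})
bad = TaskStatus()
bad.complete(exception=RuntimeError("read error"))
print(ok.done, ok.succeeded, ok.failed)
print(bad.done, bad.succeeded, bad.failed)
```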

property task_identifier: str#
property uhash: UniversalHash | None#
uhash_randomize()#
undo_fix_uhash()#
undo_randomize()#
property workflow_id: str | None#