# Source code for ewoksid02.tasks.id02processingtask

import os
import gc
import threading
import socket
import time
from importlib.metadata import version
from contextlib import ExitStack
from ewokscore import Task
import psutil
from pathlib import Path
import h5py
import numpy
import logging
from ewokscore import missing_data
from ewoksid02.id02_format import (
    open_id02_file,
    ID02ProcessedFileHS32,
    ID02BlissMasterFile,
    ID02BlissLimaFile,
    update_nexus_detector_group,
)
from ewoksid02.headers import HS32
from ewoksid02.utils.io import (
    get_isotime,
    refactor_stream_name_raw,
    refactor_stream_name_interpreted,
    parse_titleextension_template,
    match_stream,
)

from ewoksid02.utils.blissdata import (
    LIMA_URL_TEMPLATE_ID02,
    load_scan,
    _slice_dataset_online,
    _slice_dataset_offline,
    _get_new_slice_limits,
    get_stream_names_subscan1,
    get_stream_names_subscan2,
)

lock = threading.Lock()

# Processing types, grouped by family. ``run`` only records an entry in the
# ewoks history for the types listed in PYFAI_PROCESSES.
PYFAI_PROCESSES = ["norm", "gaps", "2scat", "cave", "azim", "ave", "caving"]
TRUSAXS_PROCESSES = ["scalers", "dispatch", "debug"]
ALL_PROCESSES = PYFAI_PROCESSES + TRUSAXS_PROCESSES

# Registry: processing type -> fully qualified name of the task class.
# ``ID02ProcessingTask.run`` looks its own class name up in the values, so a
# task class must be listed here before it can be executed.
# Each key appears once (the original literal declared "gaps" twice with the
# same value); insertion order is unchanged.
PROCESSING_TYPE_TASK = {
    "norm": "ewoksid02.tasks.normalizationtask.NormalizationTask",
    "gaps": "ewoksid02.tasks.cavingtask.CavingGapsTask",
    "2scat": "ewoksid02.tasks.secondaryscatteringtask.SecondaryScatteringTask",
    "caving": "ewoksid02.tasks.cavingtask.CavingTask",
    "cave": "ewoksid02.tasks.cavingtask.CavingBeamstopTask",
    "azim": "ewoksid02.tasks.azimuthaltask.AzimuthalTask",
    "ave": "ewoksid02.tasks.averagetask.AverageTask",
    "scalers": "ewoksid02.tasks.scalerstask.ScalersTask",
}

# Defaults used when the corresponding task input is not provided.
MAX_SLICE_SIZE = 50
LOG_LEVEL_DEFAULT = "info"
LIMA_INDEX_NUMBER_FORMAT_ID02 = "%02d"
# Memory used by the process (GB) the first time it is logged; later log
# entries report the increase relative to this value.
MEM_USAGE_START = None

# Package-level logger ("ewoksid02"), shared by the code in this module.
logger = logging.getLogger("ewoksid02")
logger.propagate = True


class Benchmark:
    """Context manager that times a block of work and derives a per-frame cost.

    After the ``with`` block exits, ``bench_total_s`` holds the elapsed
    wall-clock time in seconds and ``bench_per_frame_ms`` the elapsed time
    divided by ``nb_frames``, in milliseconds (``0.0`` when ``nb_frames`` is
    not positive). Exceptions raised inside the block are not suppressed.

    :param nb_frames: number of frames handled inside the block.
    :param benchmark_name: label of the timed step, used in the log messages.
    :param processing_type: label of the processing, used in the log messages.
    """

    def __init__(
        self, nb_frames, benchmark_name="processing", processing_type: str = "unknown"
    ):
        self.nb_frames = nb_frames
        self.benchmark_name = benchmark_name
        self.processing_type = processing_type
        # Clock readings; only meaningful once the context was entered/exited.
        self.start = None
        self.end = None
        self.bench_total_s = 0.0
        self.bench_per_frame_ms = 0.0

    def __enter__(self):
        logger.info(f"Start {self.benchmark_name} {self.processing_type}...")
        # Start the clock after logging so the log call is not measured.
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Stop the clock before logging so the log call is not measured.
        self.end = time.perf_counter()
        logger.info(f"Finished {self.benchmark_name} {self.processing_type}.")
        self.bench_total_s = self.end - self.start
        if self.nb_frames > 0:
            self.bench_per_frame_ms = self.bench_total_s / self.nb_frames * 1000
        else:
            self.bench_per_frame_ms = 0.0
[docs] class ID02ProcessingTask( Task, optional_input_names=[ "detector_name", "scan_memory_url", "beacon_host", "reading_node", "filename_data", # Bliss master file for a dataset "filename_lima", "scan_nb", "headers", "dataset_signal", "dataset_variance", "dataset_sigma", "datatype", "lima_url_template", "lima_url_template_args", "log_level", "processing_filename", "processing_subtitle", "subtitle", "do_process", "do_save", "save_variance", "save_sigma", "save_metadata", "index_range", # Global range, do not propagate "index_range_last", # Dynamic range, propagate and change every loop "max_slice_size", "loop_nb", "info", "info_history", "gc_collect", "lima_index_number_format", "streams_subscan1", "streams_subscan2", ], output_names=[ "filename_data", "filename_lima", "detector_name", "scan_nb", "headers", "index_range_last", "loop_nb", "dataset_signal", "dataset_variance", "dataset_sigma", "continue_pipeline", "info_history", "streams_subscan1", "streams_subscan2", ], ): """This class contains processing support methods and saving methods in the ID02 SAXS pipeline. It extends the `ID02LoopTask` class and provides additional functionality for handling metadata, processing flags, and saving processed data to HDF5 files.This class is designed to be used as part of the ID02 pipeline.It does not contain a process method, that has to be implemented in the child class. Optional Inputs: - detector_name (str): Name of the detector used for data acquisition. This is the only mandatory input. - scan_memory_url (str): URL for accessing scan memory in online processing. - beacon_host (str): Host and port to plug blissdata to the correct beacon server. Only for online processing. - reading_node (bool): Flag to indicate if the task should read data from the node. - filename_data (str): Path to the dataset file (Master file, Nexus writer) for offline processing. - filename_lima (str): Path to the first Lima file, the only place where some detector metadata can be found. 
- scan_nb (int): Scan number for identifying the dataset. - headers (dict): Only for Online processing. Dictionary containing headers information. - max_slice_size (int): Maximum number of frames to process in one iteration. Default is `20`. - dataset_signal (numpy.ndarray): Signal dataset to be processed. - dataset_variance (numpy.ndarray): Variance dataset to be processed. - dataset_sigma (numpy.ndarray): Sigma dataset to be processed. - datatype (str): Datatype to be used to save the 2D data. Default and recommended is float32. - lima_url_template (str): Format string to locate the Lima file and the path to the data inside that file. - lima_url_template_args (dict): Dictionary to format the lima_url_template. - log_level (str): Logging level for the task. Default is `"warning"`. - processing_filename (str): Full path to the (new) output file. - processing_subtitle (str): Additional subtitle for the processing task. - subtitle (str): Subtitle for the processing task to be added to the output filename. - do_process (bool): Flag to enable or disable processing. Default is `True`. - do_save (bool): Flag to enable or disable saving of processed data. Default is `True`. - save_variance (bool): Flag to enable or disable saving of variance dataset. Default is `False`. - save_sigma (bool): Flag to enable or disable saving of sigma dataset. Default is `True`. - save_metadata (bool): Flag to enable or disable saving of metadata. Default is `True`. - last_index_read (int): Index of the last frame read in the dataset. Default is `0`. - range_index_read (list): Range of indices to read from the dataset. This parameter is not propagated to the next task. - loop_nb (int): Current loop iteration number. Default is `0`. - info (dict): Additional metadata to save. - info_history (dict): Additional metadata to propagate and save, creating a history of processing. - gc_collect (bool): Manually collect garbage at the end of every task. 
- lima_index_number_format (str): format to find the first Lima file (02%d by default) Outputs: - last_index_read (int): Updated index of the last frame read. - loop_nb (int): Updated loop iteration number. - dataset_signal (numpy.ndarray): Processed signal dataset. - dataset_variance (numpy.ndarray): Processed variance dataset. - dataset_sigma (numpy.ndarray): Processed sigma dataset. - continue_pipeline (bool): Flag to indicate whether the pipeline should continue. - info_history (dict): Additional metadata to propagate and save, creating a history of processing. """
[docs] def is_new_cycle(self): new_cycle = self.get_input_value("reading_node", False) if new_cycle is True: return new_cycle if self.get_input_value("loop_nb", 0) == 0: new_cycle = True return new_cycle
[docs] def is_new_workflow(self): if self.get_input_value("loop_nb", 0) == 0: return True return False
[docs] def is_first_loop(self): if self.get_input_value("loop_nb", 0) in (0, 1): return True return False
[docs] def open_id02_file(self, filename: str, mode: str = "r"): return open_id02_file( filename=filename, mode=mode, detector_name=self.detector_name, scan_nb=self.scan_nb, processing_type=self.processing_type, )
[docs] def create_id02_processed_file(self, filename: str): return open_id02_file( filename=filename, mode="w", detector_name=self.detector_name, scan_nb=self.scan_nb, processing_type=self.processing_type, headers=self.headers, )
[docs] def run(self): self.processing_type = next( ( k for k, v in PROCESSING_TYPE_TASK.items() if self.__class__.__name__ in v ), None, ) if not self.processing_type: raise RuntimeError( f"Please register the class {self.__class__.__name__} before using it" ) self._pid = os.getpid() self._process = psutil.Process() logger.setLevel(self.get_input_value("log_level", LOG_LEVEL_DEFAULT).upper()) self.processing_params = {} self.detector_name = self.get_input_value("detector_name", None) self.scan_memory_url = self.get_input_value("scan_memory_url", None) self.beacon_host = self.get_input_value( "beacon_host", os.environ.get("BEACON_HOST") ) self.filename_data = self.get_input_value("filename_data", None) self.filename_lima = self.get_input_value("filename_lima", None) self.scan_nb = self.get_input_value("scan_nb", None) self.max_slice_size = self.get_input_value("max_slice_size", MAX_SLICE_SIZE) self.index_range = self.get_input_value("index_range", None) self.index_range_last = self.get_input_value("index_range_last", None) self.processing_filename = self.get_input_value("processing_filename", None) self.info_history = self.get_input_value("info_history", []).copy() self.headers = self.get_input_value("headers", {}) self.outputs.continue_pipeline = True do_process = self.get_input_value("do_process", True) do_save = self.get_input_value("do_save", True) # Mark the beginning of a new cycle self.loop_nb = self.get_input_value("loop_nb", 0) if not isinstance(self.loop_nb, int) or ( isinstance(self.loop_nb, int) and self.loop_nb < 0 ): raise RuntimeError(f"Loop format is not compatible: {self.loop_nb}") new_cycle = self.is_new_cycle() new_workflow = self.is_new_workflow() first_loop = self.is_first_loop() if new_cycle: self.loop_nb += 1 self.log_info(f"New cycle: new loop index {self.loop_nb}") self.outputs.loop_nb = self.loop_nb # Check-point to avoid data overwritting if ( self.loop_nb == 1 and do_save and self.processing_filename and 
Path(self.processing_filename).is_file() ): self.log_error( f"The workflow was cancelled. {self.processing_filename} already exist. Choose another name!" ) self.outputs.continue_pipeline = False return # Check and load some parameters only if it's the first loop (and only in the first reading node) if new_workflow: # This will be check only during the execution of the first node in the first cycle if self.scan_memory_url: # We trust this is an online processing self.log_info( f"Beginning of the online workflow with key {self.scan_memory_url}. Loading some parameters..." ) if self.detector_name is None: raise ValueError("Online processing requires a detector_name") if not self.beacon_host: raise ValueError("Online processing requires a beacon_host") if not self.filename_data or not self.filename_lima or not self.scan_nb: scan = self.load_scan() scan_info = scan.info self.filename_data = self.filename_data or scan_info["filename"] self.scan_nb = self.scan_nb or scan_info["scan_nb"] lima_index_number_format = self.get_input_value( "lima_index_number_format", LIMA_INDEX_NUMBER_FORMAT_ID02 ) self.filename_lima = ( self.filename_lima or f"{scan_info['images_path'].format(img_acq_device=self.detector_name)}{lima_index_number_format % 0}.h5" ) elif self.filename_data: # We trusts this is an offline processing, either from RAW_DATA or PROCESSED_DATA self.log_info( f"Beginning of the offline workflow with file {self.filename_data}. Loading some parameters..." 
) self.scan_memory_url = None self.beacon_host = None with self.open_id02_file( filename=self.filename_data, ) as file: if isinstance(file, ID02BlissLimaFile): raise RuntimeError( f"Offline processing cannot be run with a Lima format file {self.filename_data}" ) elif isinstance(file, ID02BlissMasterFile): # Offline processing from RAW_DATA master file if not self.scan_nb: raise RuntimeError( "Offline processing from RAW_DATA needs a scan number (scan_nb)" ) if not self.filename_lima and not self.detector_name: raise RuntimeError( "Offline processing from RAW_DATA needs either detector_name or filename_lima" ) if self.detector_name and not self.filename_lima: self.filename_lima = ( self.filename_lima or file.filename_lima ) elif self.filename_lima and not self.detector_name: with ID02BlissLimaFile( name=self.filename_lima, mode="r" ) as flima: self.detector_name = flima.detector_name elif isinstance(file, ID02ProcessedFileHS32): # Offline processing from PROCESSED_DATA file self.detector_name = self.detector_name or file.detector_name else: raise RuntimeError( "Processing needs either scan_memory_url or filename_data" ) # Retrieve ewoks history either from input or from the filename_data if it's PROCESSED_DATA if not self.info_history: # Try to read from self.filename_data is it's processed file with self.open_id02_file(filename=self.filename_data) as file: if isinstance(file, ID02ProcessedFileHS32): self.info_history = file.read_ewoks_history() if first_loop and do_process: # Every node contributes to ewoks history, only in the first loop if self.info_history: index = self.info_history[-1]["index"] + 1 else: index = 0 if self.processing_type in PYFAI_PROCESSES: # Avoid including scalers self.info_history = self.info_history + [ self._get_ewoks_info(index=index) ] # Load the headers into a HeadersHS32 instance if self.headers: # headers comes initially as an input only for online processing (from blissoda) self.outputs.headers = self.headers else: # Load the headers 
from a file (either Lima or processed file) filename_headers = ( self.filename_lima if self.filename_lima else self.filename_data ) with self.open_id02_file(filename=filename_headers) as file: if file.headers: # self.headers = HeadersHS32(headers_dict=file.headers) self.headers = file.headers self.outputs.headers = file.headers else: self.log_error(f"No headers can be loaded from {filename_headers}") self.outputs.filename_data = self.filename_data self.outputs.filename_lima = self.filename_lima self.outputs.detector_name = self.detector_name self.outputs.scan_nb = self.scan_nb self.outputs.info_history = self.info_history # Load and split the streams into subscan1 and 2 self.streams_subscan1 = self.get_input_value("streams_subscan1", {}) self.streams_subscan2 = self.get_input_value("streams_subscan2", {}) if new_workflow: reset_streams = True elif new_cycle and self.scan_memory_url: reset_streams = True else: reset_streams = False if reset_streams: self.log_info("Streams will be reset") if self.scan_memory_url: scan = self.load_scan() stream_names_subscan1 = get_stream_names_subscan1(scan=scan) stream_names_subscan2 = get_stream_names_subscan2(scan=scan) # Read the streams (TODO check if optimizing this is worth it) raw and interpreted names come later self.streams_subscan1 = { stream_name: scan.streams[stream_name] for stream_name in stream_names_subscan1 } self.streams_subscan2 = { stream_name: scan.streams[stream_name] for stream_name in stream_names_subscan2 } else: with self.open_id02_file( filename=self.filename_data, ) as file: if file.streams_subscan1_path not in file.root: raise RuntimeError( f"There is no group {file.streams_subscan1_path} in {self.filename_data}. No counters can be loaded." 
) measurement_subscan1_grp = file.root[file.streams_subscan1_path] for dset_name in measurement_subscan1_grp: dset = measurement_subscan1_grp[dset_name] if dset.ndim == 3: # Skip the images continue self.streams_subscan1[dset_name] = dset[:] if file.streams_subscan2_path in file.root: measurement_subscan2_grp = file.root[file.streams_subscan2_path] for dset_name in measurement_subscan2_grp: dset = measurement_subscan2_grp[dset_name] if dset.ndim == 3: # Skip the images continue self.streams_subscan2[dset_name] = dset[:] self.log_info("Streams were reset.") self.outputs.streams_subscan1 = self.streams_subscan1 self.outputs.streams_subscan2 = self.streams_subscan2 # Load the big datasets, new from memory or file, or from inputs dataset_signal = self.get_input_value("dataset_signal", None) dataset_variance = self.get_input_value("dataset_variance", None) dataset_sigma = self.get_input_value("dataset_sigma", None) reading_node = self.get_input_value("reading_node", False) if reading_node or dataset_signal is None: ptdata = self._get_new_datasets() dataset_signal = ptdata["dataset_signal"] dataset_variance = ptdata["dataset_variance"] dataset_sigma = ptdata["dataset_sigma"] index_range_new = ptdata["index_range"] if len(dataset_signal) == 0: self.outputs.continue_pipeline = False self.outputs.index_range_last = self.index_range_last return # We define here the actual index limits because we read the data from streams self.index_range_last = index_range_new if ( dataset_signal is not None and len(dataset_signal) > 0 and self.index_range_last is None ): # Only possible if the data was sent as inputs without any index_range_last self.index_range_last = [0, len(dataset_signal)] # self.outputs.loop_nb = self.loop_nb self.outputs.index_range_last = self.index_range_last self.outputs.dataset_signal = dataset_signal self.outputs.dataset_variance = dataset_variance self.outputs.dataset_sigma = dataset_sigma self.dataset_signal = dataset_signal self.dataset_variance = 
dataset_variance self.dataset_sigma = dataset_sigma # Decide if do process and save if do_process: self.processing_params = {} if self.dataset_signal.size != 0: nb_frames = len(self.dataset_signal) with Benchmark( nb_frames=nb_frames, benchmark_name="process", processing_type=self.processing_type, ) as bench_process: self.process() self._log_benchmark(bench=bench_process) else: self.log_warning( f"Skipping processing {self.processing_type} due to empty array." ) if do_save: if not self.processing_filename: raise RuntimeError( f"Saving needs a processing_filename for {self.processing_type}" ) if self.outputs.dataset_signal.size != 0: with Benchmark( nb_frames=nb_frames, benchmark_name="saving", processing_type=self.processing_type, ) as bench_save: self.save() self._log_benchmark(bench=bench_save) # Save both process and save benchmarks with self.open_id02_file( filename=self.processing_filename, mode="a", ) as processed_file: processed_file._save_benchmark(bench=bench_process) processed_file._save_benchmark(bench=bench_save) else: self.log_warning( f"Skipping saving {self.processing_type} due to empty array." ) else: self.log_warning("Save flag was set to False, data will not be saved") else: self.log_warning(msg=f"Processing {self.processing_type} will be skipped.") # Finish with logging the used memory and garbage collecting self.log_allocated_memory() if self.get_input_value("gc_collect", True): gc.collect()
def _get_ewoks_info(self, index: int = 0) -> dict: return { "task_identifier": f"{self.__module__}.{self.__class__.__name__}", "class": self.__class__.__name__, "title": f"{index:02} - {self.__class__.__name__}", "index": index, "processing_type": self.processing_type, "datetime": str(get_isotime()), "version": version("ewoksid02"), "host": socket.gethostname(), }
[docs] def log_debug(self, msg): self._log(level="debug", msg=msg)
[docs] def log_info(self, msg): self._log(level="info", msg=msg)
[docs] def log_warning(self, msg): self._log(level="warning", msg=msg)
[docs] def log_error(self, msg): self._log(level="error", msg=msg)
def _log(self, level, msg): msg = f"Loop #{self.loop_nb}: {self.__class__.__name__}: (PID: {self._pid}): {msg}" logger.__getattribute__(level)(msg)
[docs] def log_allocated_memory(self): memory_info = self.get_memory_info() mem_usage_GB = memory_info["used"] total_mem_GB = memory_info["total"] available_mem_GB = memory_info["available"] if available_mem_GB / total_mem_GB < 0.1: mem_message = "Low memory available" color_prefix = "\033[91m" elif available_mem_GB / total_mem_GB < 0.3: mem_message = "Medium memory available" color_prefix = "\033[93m" else: mem_message = "Sufficient memory available" color_prefix = "\033[92m" color_suffix = "\033[0m" global MEM_USAGE_START if MEM_USAGE_START is None: memory_delta = 0.0 MEM_USAGE_START = mem_usage_GB else: memory_delta = mem_usage_GB - MEM_USAGE_START logger.info( f"{color_prefix}Loop #{self.loop_nb}: {self.__class__.__name__}: (PID: {self._pid}): Memory: {mem_usage_GB:.2f}GB used, increased by {memory_delta:.2f}GB; {available_mem_GB:.2f}GB available. {mem_message}{color_suffix}" )
[docs] def get_memory_info(self): # Return memory info in GBs return { "used": self._process.memory_info().rss / 1e9, "total": psutil.virtual_memory().total / 1e9, "available": psutil.virtual_memory().available / 1e9, }
def _log_benchmark(self, bench): self.log_info( f"Benchmark. Total ({bench.nb_frames}). {bench.benchmark_name}: {bench.bench_total_s:.2f} s. Per frame: {bench.bench_per_frame_ms:.2f} ms" )
[docs] def load_scan(self): return load_scan( scan_memory_url=self.scan_memory_url, beacon_host=self.beacon_host, )
def _get_new_datasets(self) -> dict:
    """Read the next slice of frames, online (blissdata) or offline (file).

    :return: Dictionary with the keys ``"dataset_signal"``,
        ``"dataset_variance"``, ``"dataset_sigma"`` (empty arrays when
        nothing was read) and ``"index_range"`` (``None`` when nothing was
        read).
    """
    # TODO optimize open/close files for reading from files
    out = {
        "dataset_signal": numpy.array([]),
        "dataset_variance": numpy.array([]),
        "dataset_sigma": numpy.array([]),
        "index_range": None,
    }

    if self.scan_memory_url:
        # Reading online
        # - dataset_signal comes from the blissdata stream
        # - dataset_variance = empty
        # - dataset_sigma = empty
        if self.processing_type in PYFAI_PROCESSES:
            stream_name = f"{self.detector_name}:image"
        else:
            stream_name = "mcs:epoch"

        index_range_new = _get_new_slice_limits(
            stream_name=stream_name,
            scan_memory_url=self.scan_memory_url,
            beacon_host=self.beacon_host,
            index_range=self.index_range,
            index_range_last=self.index_range_last,
            max_slice_size=self.max_slice_size,
        )
        if index_range_new is None:
            return out

        ptdata_signal = _slice_dataset_online(
            stream_name=stream_name,
            detector_name=self.detector_name,
            scan_memory_url=self.scan_memory_url,
            beacon_host=self.beacon_host,
            lima_url_template=self.get_input_value(
                "lima_url_template", LIMA_URL_TEMPLATE_ID02
            ),
            lima_url_template_args=self.get_input_value(
                "lima_url_template_args", {}
            ),
            index_range=index_range_new,
            start_from_memory=True,
        )
        if ptdata_signal["dataset"] is not None:
            out["dataset_signal"] = ptdata_signal["dataset"]
            out["index_range"] = ptdata_signal["index_range"]
    else:
        # Reading offline
        # - dataset_signal comes from the RAW_DATA or PROCESSED_DATA
        # - dataset_variance = comes only from PROCESSED_DATA
        # - dataset_sigma = comes only from PROCESSED_DATA
        h5path_datasignal = None
        h5path_datavariance = None
        h5path_datasigma = None
        with self.open_id02_file(
            filename=self.filename_data,
        ) as f:
            if isinstance(f, ID02BlissMasterFile):
                h5path_datasignal = f.nxdata_dataset_signal_path
            elif isinstance(f, ID02ProcessedFileHS32):
                h5path_datasignal = f.nxdata_dataset_signal_path
                h5path_datavariance = f.nxdata_dataset_variance_path
                h5path_datasigma = f.nxdata_dataset_sigma_path

        if h5path_datasignal:
            index_range_new = _get_new_slice_limits(
                filename_data=self.filename_data,
                h5path=h5path_datasignal,
                index_range=self.index_range,
                index_range_last=self.index_range_last,
                max_slice_size=self.max_slice_size,
            )
            if index_range_new is None:
                return out

            ptdata_signal = _slice_dataset_offline(
                filename_data=self.filename_data,
                h5path_to_data=h5path_datasignal,
                index_range=index_range_new,
            )
            if ptdata_signal["dataset"] is not None:
                out["dataset_signal"] = ptdata_signal["dataset"]
                out["index_range"] = ptdata_signal["index_range"]

            # Variance and sigma reuse the slice computed for the signal,
            # so they are only read once that slice is known to exist.
            if h5path_datavariance:
                ptdata_variance = _slice_dataset_offline(
                    filename_data=self.filename_data,
                    h5path_to_data=h5path_datavariance,
                    index_range=index_range_new,
                )
                if ptdata_variance["dataset"] is not None:
                    out["dataset_variance"] = ptdata_variance["dataset"]

            if h5path_datasigma:
                ptdata_sigma = _slice_dataset_offline(
                    filename_data=self.filename_data,
                    h5path_to_data=h5path_datasigma,
                    index_range=index_range_new,
                )
                if ptdata_sigma["dataset"] is not None:
                    out["dataset_sigma"] = ptdata_sigma["dataset"]

    nb_frames_read = len(out["dataset_signal"])
    index_range_sliced = out["index_range"]
    if nb_frames_read > 0:
        self.log_info(f""" \n\tIncoming ({nb_frames_read} frames) in the range {index_range_sliced[0]} -> {index_range_sliced[-1]}, """)
    else:
        self.log_info(""" \n\tNo more data to read. End of the workflow. """)
    return out
def process(self):
    """Placeholder processing step: only logs a warning."""
    placeholder_message = "Nothing implemented yet..."
    self.log_warning(placeholder_message)
def get_processing_parameters(self) -> dict:
    """Return the explicit processing parameters; empty at this level."""
    return dict()
def get_parameter(self, key: str, to_integer: bool = False, default=None):
    """Look up a parameter in the task inputs, falling back to the headers.

    :param key: Name of the input; the same name is used as header key.
    :param to_integer: When true, the value found is passed through
        ``int`` before being returned.
    :param default: Value used when the key is in neither the inputs nor
        the headers.
    :return: The input value, else the header value, else ``default``.
    """
    value = self.get_input_value(key=key)
    # Detect the sentinel by type instead of ``==``: an equality test is
    # evaluated element-wise when the input is a numpy array, and the
    # resulting array cannot be used as an ``if`` condition.
    if isinstance(value, type(missing_data.MISSING_DATA)):
        # Try to get it from header
        value = self.headers.get(key, default)
    if to_integer:
        value = int(value)
    return value
def save(self):
    """Append the results of the current loop to the processed HDF5 file.

    Steps, in order:

    1. On the first loop (``loop_nb == 1``) create the processed file,
       write the ewoks history and the processing parameters.
    2. For pyFAI processing types, append signal and, depending on the
       ``save_variance`` / ``save_sigma`` inputs, variance and sigma.
    3. If the ``save_metadata`` input is true (default), append the
       subscan1/subscan2 streams, HS32 arrays, exposure and delta times,
       monitor intensities and the TitleExtension strings.
    4. For pyFAI processing types, copy the detector group from the data
       or Lima file once, if the processed file does not have it yet.

    :raises RuntimeError: If the processed file already exists on the
        first loop.
    """
    if self.loop_nb == 1:
        # Create the file only if it's loop_nb=1, do not overwrite
        if Path(self.processing_filename).is_file():
            raise RuntimeError(
                f"Data cannot be saved as the file {self.processing_filename} already exists."
            )

        # Create directories
        Path(self.processing_filename).parent.mkdir(parents=True, exist_ok=True)
        self.log_info(f"Creating new processed file: {self.processing_filename}")
        with self.create_id02_processed_file(
            filename=self.processing_filename,
        ) as processing_file:
            processing_file.write_ewoks_history(
                list_ewoks_history=self.outputs.info_history
            )
            # Write/Overwrite explicit processing parameters (not headers)
            processing_file.write_processing_parameters(
                processing_parameters=self.get_processing_parameters()
            )

    with ExitStack() as stack:
        # NOTE(review): the file is opened before the module lock is taken
        # and closed after it is released - confirm this order is intended.
        processed_file = stack.enter_context(
            self.open_id02_file(
                filename=self.processing_filename,
                mode="a",
            )
        )
        stack.enter_context(lock)

        if self.processing_type in PYFAI_PROCESSES:
            # Update all three big datasets
            if (
                processed_file.nxdata_name
                not in processed_file.root[processed_file.nxprocess_path]
            ):
                # Create the nexus data group
                processed_file.create_h5_group(
                    h5_group_path=processed_file.nxdata_path,
                    NX_class="NXdata",
                    default=processed_file.nxdata_path,
                    signal=ID02ProcessedFileHS32.DATASET_SIGNAL_NAME,
                )

            datatype = self.get_input_value("datatype", "float32")
            processed_file.update_dataset(
                added_dataset=self.outputs.dataset_signal,
                h5_dataset_path=processed_file.nxdata_dataset_signal_path,
                index_read=self.index_range_last,
                datatype=datatype,
            )
            if self.get_input_value("save_variance", False):
                processed_file.update_dataset(
                    added_dataset=self.outputs.dataset_variance,
                    h5_dataset_path=processed_file.nxdata_dataset_variance_path,
                    index_read=self.index_range_last,
                    datatype=datatype,
                )
            if self.get_input_value("save_sigma", True):
                processed_file.update_dataset(
                    added_dataset=self.outputs.dataset_sigma,
                    h5_dataset_path=processed_file.nxdata_dataset_sigma_path,
                    index_read=self.index_range_last,
                    datatype=datatype,
                )

        # Update metadata
        if self.get_input_value("save_metadata", True):
            # Update subscan1
            for stream_name, stream_array in self.streams_subscan1.items():
                stream_name_raw = refactor_stream_name_raw(stream_name=stream_name)
                stream_name_interpreted = refactor_stream_name_interpreted(
                    stream_name=stream_name
                )
                stream_values, slice_init, slice_end = self._read_from_stream(
                    stream_sliceable=stream_array,
                    stream_name=stream_name,
                    slice_init=self.index_range_last[0],
                    slice_end=self.index_range_last[-1],
                )
                processed_file.update_dataset(
                    added_dataset=stream_values,
                    h5_dataset_path=str(
                        Path(processed_file.mcs_path)
                        / processed_file.MCS_RAW_NAME
                        / processed_file.MCS_SUBSCAN1
                        / stream_name_raw
                    ),
                    index_read=[slice_init, slice_end],
                )
                processed_file.update_dataset(
                    added_dataset=stream_values,
                    h5_dataset_path=str(
                        Path(processed_file.mcs_path)
                        / processed_file.MCS_INTERPRETED
                        / stream_name_interpreted
                    ),
                    index_read=[slice_init, slice_end],
                )

            # Update subscan2 if it exists
            for stream_name, stream_array in self.streams_subscan2.items():
                stream_name_raw = refactor_stream_name_raw(stream_name=stream_name)
                stream_name_interpreted = refactor_stream_name_interpreted(
                    stream_name=stream_name
                )
                # Read without any interpolation to send to raw-subscan2
                stream_values, slice_init, slice_end = self._read_from_stream(
                    stream_sliceable=stream_array,
                    stream_name=stream_name,
                    slice_init=0,
                    slice_end=len(stream_array),
                )
                if stream_values is None:
                    continue
                processed_file.update_dataset(
                    added_dataset=stream_values,
                    h5_dataset_path=str(
                        Path(processed_file.mcs_path)
                        / processed_file.MCS_RAW_NAME
                        / processed_file.MCS_SUBSCAN2
                        / stream_name_raw
                    ),
                    index_read=None,
                )

                # Send the interpolated values to interpreted group
                interpolated_values = self.read_from_stream_interpolate(
                    stream_sliceable=stream_array,
                    stream_slice=None,
                )
                if interpolated_values is None:
                    continue
                processed_file.update_dataset(
                    added_dataset=interpolated_values,
                    h5_dataset_path=str(
                        Path(processed_file.mcs_path)
                        / processed_file.MCS_INTERPRETED
                        / stream_name_interpreted
                    ),
                )

            # Update HS32 arrays
            HS32C_array = self.get_HS32C_array()
            processed_file.update_dataset(
                added_dataset=HS32C_array,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path) / HS32.KEY_HS32_COUNTERS
                ),
                index_read=self.index_range_last,
                datatype=ID02ProcessedFileHS32.FORMAT_HS32C,
            )
            HS32V_array = self.get_HS32V_array()
            processed_file.update_dataset(
                added_dataset=HS32V_array,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path) / HS32.KEY_HS32_VALUES
                ),
                index_read=self.index_range_last,
                datatype=ID02ProcessedFileHS32.FORMAT_HS32V,
            )

            # Update exposure and delta times
            exposuretime_values, slice_init, slice_end = (
                self.get_exposuretime_values()
            )
            processed_file.update_dataset(
                added_dataset=exposuretime_values,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path) / processed_file.KEY_EXPOSURE_TIME
                ),
                index_read=[slice_init, slice_end],
            )

            stream_deltatime_name, stream_deltatime_array = (
                self.get_stream_deltatime()
            )
            deltatime, slice_init, slice_end = self._read_from_stream(
                stream_sliceable=stream_deltatime_array,
                stream_name=stream_deltatime_name,
                slice_init=self.index_range_last[0],
                slice_end=self.index_range_last[-1],
            )
            processed_file.update_dataset(
                added_dataset=deltatime,
                h5_dataset_path=str(
                    Path(processed_file.tfg_path) / processed_file.KEY_DELTA_TIME
                ),
                index_read=[slice_init, slice_end],
            )
            if self.processing_type in PYFAI_PROCESSES:
                processed_file.update_dataset(
                    added_dataset=deltatime,
                    h5_dataset_path=processed_file.nxdata_delta_time_path,
                    index_read=[slice_init, slice_end],
                    h5_dataset_name="t",
                    unit="s",
                )

            # Update intensities/monitor arrays
            # The monitors are read over the range actually obtained for
            # the delta time (slice_init/slice_end from the read above).
            stream_intensity0_name, stream_intensity0_array = (
                self.get_stream_monitor_0()
            )
            intensity0_values, monitor0_slice_init, monitor0_slice_end = (
                self._read_from_stream(
                    stream_sliceable=stream_intensity0_array,
                    stream_name=stream_intensity0_name,
                    slice_init=slice_init,
                    slice_end=slice_end,
                )
            )
            intensity0_factor = float(self.headers.get(HS32.KEY_MONITOR_0_FACTOR))
            intensity0uncor = intensity0_values * intensity0_factor
            sot = float(self.headers.get(HS32.KEY_SOT, 0.0))
            sct = float(self.headers.get(HS32.KEY_SCT, 0.0))
            intensity0shutcor = (
                intensity0uncor
                * (exposuretime_values - sot + sct)
                / (exposuretime_values - sot)
            )
            processed_file.update_dataset(
                added_dataset=intensity0shutcor,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path)
                    / ID02ProcessedFileHS32.KEY_I0_SHUTCOR
                ),
                index_read=[monitor0_slice_init, monitor0_slice_end],
            )
            processed_file.update_dataset(
                added_dataset=intensity0uncor,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path) / ID02ProcessedFileHS32.KEY_I0_UNCOR
                ),
                index_read=[monitor0_slice_init, monitor0_slice_end],
            )

            stream_intensity1_name, stream_intensity1_array = (
                self.get_stream_monitor_1()
            )
            intensity1_values, monitor1_slice_init, monitor1_slice_end = (
                self._read_from_stream(
                    stream_sliceable=stream_intensity1_array,
                    stream_name=stream_intensity1_name,
                    slice_init=slice_init,
                    slice_end=slice_end,
                )
            )
            intensity1_factor = float(self.headers.get(HS32.KEY_MONITOR_1_FACTOR))
            intensity1uncor = intensity1_values * intensity1_factor
            intensity1shutcor = (
                intensity1uncor
                * (exposuretime_values - sot + sct)
                / (exposuretime_values - sot)
            )
            processed_file.update_dataset(
                added_dataset=intensity1shutcor,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path)
                    / ID02ProcessedFileHS32.KEY_I1_SHUTCOR
                ),
                index_read=[monitor1_slice_init, monitor1_slice_end],
            )
            processed_file.update_dataset(
                added_dataset=intensity1uncor,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path) / ID02ProcessedFileHS32.KEY_I1_UNCOR
                ),
                index_read=[monitor1_slice_init, monitor1_slice_end],
            )

            # Save the processing params (only once except for normalization values)
            # TODO
            # if self.processing_type in PYFAI_PROCESSES:
            #     self._save_processing_params(root_group=root_group_destination)

            # Update the TitleExtension
            titleextension_template = self.headers.get(
                HS32.KEY_TITLEEXTENSION_TEMPLATE
            )
            if not titleextension_template:
                self.log_warning(
                    "There is no TitleExtensionTemplate in the header."
                )
            else:
                title_extension_parsed, title_extension_formats = (
                    parse_titleextension_template(titleextension_template)
                )
                title_extension_values = {}

                # We want to slice all the streams at once, and then build the array of strings
                for titleextension_dict in title_extension_formats:
                    stream_name_from_titleextension = titleextension_dict[
                        "stream_name"
                    ]
                    stream_values = None
                    # NaN placeholder, stored under the template's own
                    # stream name so that a stream that cannot be found or
                    # read still has an entry when the strings are built.
                    title_extension_values[stream_name_from_titleextension] = (
                        numpy.full(
                            shape=(self.index_range_last[1],), fill_value=numpy.nan
                        )
                    )

                    # Try first from subscan2
                    stream_name, stream_array = self.get_stream(
                        name=stream_name_from_titleextension, subscan_2=True
                    )
                    if stream_name:
                        stream_values = self.read_from_stream_interpolate(
                            stream_sliceable=stream_array,
                            stream_slice=None,
                        )
                    else:
                        stream_name, stream_array = self.get_stream(
                            name=stream_name_from_titleextension, subscan_2=False
                        )
                        if stream_name is None:
                            self.log_error(
                                f"{stream_name_from_titleextension} stream from TitleExtension could not be found"
                            )
                            continue
                        stream_values, slice_init, slice_end = (
                            self._read_from_stream(
                                stream_sliceable=stream_array,
                                stream_name=stream_name,
                                slice_init=0,
                                slice_end=self.index_range_last[-1],
                            )
                        )

                    if stream_values is None:
                        self.log_error(f"Stream {stream_name} could not be read")
                        continue
                    title_extension_values[stream_name_from_titleextension] = (
                        stream_values
                    )

                # Now we go index by index, building the string cells
                # The title extensions will always be the size of last index (from 0 -> last_frame)
                new_title_extensions = numpy.full(
                    shape=(self.index_range_last[1],),
                    fill_value="",
                    dtype=h5py.string_dtype(encoding="utf-8"),
                )
                for index in range(len(new_title_extensions)):
                    format_index = {
                        key: values[index]
                        for key, values in title_extension_values.items()
                    }
                    try:
                        new_title_extensions[index] = title_extension_parsed.format(
                            **format_index
                        )
                    except KeyError as e:
                        self.log_error(
                            f"{e} : {title_extension_parsed=}, {format_index=}"
                        )

                processed_file.update_dataset(
                    added_dataset=new_title_extensions,
                    h5_dataset_path=str(
                        Path(processed_file.parameters_path)
                        / ID02ProcessedFileHS32.KEY_TITLEEXTENSION
                    ),
                    index_read=[0, self.index_range_last[1]],
                )

        # Update NexusDetector with metadata from the RAW_DATA file (has to be accessible).
        # To be done only once and do not wait as the file will be accessible eventually.
        if (
            self.processing_type in PYFAI_PROCESSES
            and processed_file.nxdetector_path not in processed_file
        ):
            for file in (self.filename_data, self.filename_lima):
                filedata = stack.enter_context(
                    self.open_id02_file(
                        filename=file,
                    )
                )
                if filedata is None:
                    continue
                if filedata.nxdetector_path is not None:
                    nxdetector_grp = filedata[filedata.nxdetector_path]
                    if len(nxdetector_grp) > 0:
                        metadata_detector_output = (
                            processed_file.create_h5_group(
                                h5_group_path=processed_file.nxdetector_path,
                                NX_class="NXdetector",
                            )
                        )
                        update_nexus_detector_group(
                            nxdetector_group_destination=metadata_detector_output,
                            nxdetector_group_source=nxdetector_grp,
                        )
                        break
def get_streams_HS32_scalers(self) -> list:
    """Collect the scaler stream of every HS32 pin named in the headers.

    :return: List of ``(stream_name, stream_array)`` tuples, one per HS32
        name, as returned by ``get_stream``.
    """
    streams_scalers = []
    for key_name in HS32.get_HS32_names(headers=self.headers).values():
        # In blissdata streams, they appear as scalers:{name}; in the file,
        # they are saved with the raw refactored name (without scalers).
        requested_name = f"scalers:{key_name}" if self.scan_memory_url else key_name
        stream_name, stream_array = self.get_stream(name=requested_name)
        streams_scalers.append((stream_name, stream_array))
    return streams_scalers
def get_streams_HS32_raw(self) -> list:
    """Collect the raw stream of every HS32 pin named in the headers.

    :return: List of ``(stream_name, stream_array)`` tuples, one per HS32
        name, as returned by ``get_stream``.
    """
    streams_raw = []
    for key_name in HS32.get_HS32_names(headers=self.headers).values():
        # In blissdata streams, they appear as mcs:{name}_raw; in the file,
        # they are saved with the raw refactored name (without mcs).
        if self.scan_memory_url:
            requested_name = f"mcs:{key_name}_raw"
        else:
            requested_name = f"{key_name}_raw"
        stream_name, stream_array = self.get_stream(name=requested_name)
        streams_raw.append((stream_name, stream_array))
    return streams_raw
def get_stream_slow_timer(self) -> tuple:
    """Return the first subscan2 stream whose name contains ``"epoch"``.

    :return: ``(stream_name, stream_array)``, or ``(None, None)`` when no
        subscan2 stream name contains ``"epoch"``.
    """
    epoch_streams = (
        (name, array)
        for name, array in self.streams_subscan2.items()
        if "epoch" in name
    )
    return next(epoch_streams, (None, None))
def get_stream_fast_timer(self) -> tuple:
    """Return the ``epoch`` stream looked up with the ``mcs`` prefix.

    :return: The result of ``get_stream(name="epoch", prefix="mcs")``.
    """
    lookup = {"name": "epoch", "prefix": "mcs"}
    return self.get_stream(**lookup)
def get_stream_exposuretime(self) -> tuple:
    """
    Return the exposure time stream from subscan1.

    In blissdata, this stream is (normally) called scalers:time (in subscan1)
    In the header, there is key "HSTime" pointing to the string 'time'
    """
    pin_name = self.headers.get(HS32.KEY_EXPOSURE_TIME)
    return self.get_stream(name=f"scalers:{pin_name}", subscan_2=False)
def get_stream_exposuretime_raw(self) -> tuple:
    """
    Return the raw exposure time stream from subscan1.

    In blissdata, this stream is (normally) called mcs:time_raw (in subscan1)
    In the header, there is key "HSTime" pointing to the string 'time'
    """
    pin_name = self.headers.get(HS32.KEY_EXPOSURE_TIME)
    return self.get_stream(name=f"mcs:{pin_name}_raw", subscan_2=False)
def get_stream_deltatime(self) -> tuple:
    """Return the elapsed-time stream.

    The stream is requested as ``mcs:elapsed_time`` when a scan memory URL
    is set and as ``elapsed_time`` otherwise; if that lookup yields no
    name, ``mcs_elapsed_time`` is tried.

    :return: ``(stream_name, stream_array)`` from the last lookup made.
    """
    primary_name = "mcs:elapsed_time" if self.scan_memory_url else "elapsed_time"
    stream_name, stream_array = self.get_stream(name=primary_name)
    if stream_name:
        return (stream_name, stream_array)

    stream_name, stream_array = self.get_stream(name="mcs_elapsed_time")
    return (stream_name, stream_array)
def get_stream(
    self,
    index_pin: int = None,
    name: str = None,
    header_key_pin: str = None,
    prefix: str = "",
    suffix: str = "",
    subscan_2: bool = False,
) -> tuple:
    """Find a stream by name, by header key or by HS32 pin index.

    :param index_pin: Zero-based pin index; the name is read from the
        header key made of ``HS32.KEY_HS32_NAME`` and the one-based,
        two-digit pin number. Only used when ``name`` and
        ``header_key_pin`` are both ``None``.
    :param name: Stream name; takes precedence over the other selectors.
    :param header_key_pin: Header key whose value is the stream name.
    :param prefix: If non-empty, prepended to the name as ``prefix:name``.
    :param suffix: If non-empty, appended to the name as ``name_suffix``.
    :param subscan_2: Search the subscan2 streams instead of subscan1.
    :return: ``(stream_name, stream_array)`` from ``match_stream``, or
        ``(None, None)`` when no name could be resolved.
    """
    if name is None:
        if header_key_pin is not None:
            name = self.headers.get(header_key_pin)
        elif index_pin is not None:
            name = self.headers.get(f"{HS32.KEY_HS32_NAME}{(index_pin + 1):02}")

    if not name:
        return (None, None)

    if prefix:
        name = f"{prefix}:{name}"
    if suffix:
        name = f"{name}_{suffix}"

    streams = self.streams_subscan2 if subscan_2 else self.streams_subscan1
    stream_out_name, stream_out_array = match_stream(name=name, streams=streams)
    return (stream_out_name, stream_out_array)
def _read_from_stream( self, stream_sliceable, slice_init: int, slice_end: int, stream_name: str = "unknown", datatype: str = "float32", timeout: float = 0.0, ) -> tuple: stream_values_out = None if stream_sliceable is not None: nb_available_frames = len(stream_sliceable) if slice_end > nb_available_frames and timeout > 0.0: t_ = time.perf_counter() while ( slice_end > nb_available_frames and (time.perf_counter() - t_) < timeout ): time.sleep(0.1) nb_available_frames = len(stream_sliceable) if slice_init > nb_available_frames: self.log_error( f"Not enough frames in stream {stream_name} {stream_sliceable} ({nb_available_frames}). Requested init: {slice_init}" ) if slice_end > nb_available_frames: self.log_warning( f"Not enough frames in stream {stream_name} {stream_sliceable} ({nb_available_frames}). Requested end: {slice_end}" ) slice_end = nb_available_frames try: stream_values_out = stream_sliceable[slice_init:slice_end].astype( datatype ) except Exception as e: self.log_error(f"{stream_sliceable} could not be sliced: {e}") return stream_values_out, slice_init, slice_end
def read_from_stream_interpolate(
    self,
    stream_sliceable,
    stream_slice=None,
    datatype: str = "float32",
) -> numpy.ndarray:
    """
    Read a stream and interpolate it onto the fast epoch counter.

    stream_sliceable is a sliceable object (numpy.ndarray or blissdata stream object)
    Valid for all streams, from subscan1 and subscan2

    :param stream_sliceable: Stream to read.
    :param stream_slice: ``(init, end)`` range to read; the whole stream
        when ``None``.
    :param datatype: Type the stream values are cast to.
    :return: The interpolated values. The non-interpolated values are
        returned instead when the slow epoch stream is missing or
        unreadable, or when the fast epoch stream is missing. ``None`` is
        returned when the fast epoch stream cannot be read, is empty, or
        the interpolation fails.
    """
    # 1) Read the values of the requested stream
    if stream_slice is None:
        slice_init = 0
        slice_end = len(stream_sliceable)
    else:
        slice_init, slice_end = stream_slice
    stream_values = stream_sliceable[slice_init:slice_end]
    stream_values = stream_values.astype(datatype)

    # 2) Read all the values in the slow epoch counter
    slow_epoch_stream_name, slow_epoch_stream_array = self.get_stream_slow_timer()
    if slow_epoch_stream_name is None:
        self.log_debug("Slow epoch stream could not be found.")
        return stream_values

    # Both epoch must be read with double precision
    slow_epoch_values, _, _ = self._read_from_stream(
        stream_sliceable=slow_epoch_stream_array,
        stream_name=slow_epoch_stream_name,
        slice_init=0,
        slice_end=len(slow_epoch_stream_array),
        datatype="float64",
    )
    if slow_epoch_values is None:
        self.log_error(f"Stream {slow_epoch_stream_name} could not be read.")
        return stream_values

    # 3) Match the slow streams
    if len(slow_epoch_values) != len(stream_values):
        nb_slow_frames = min(len(slow_epoch_values), len(stream_values))
        slow_epoch_values = slow_epoch_values[0:nb_slow_frames]
        stream_values = stream_values[0:nb_slow_frames]

    # 4) Read the fast epoch counter
    fast_epoch_stream_name, fast_epoch_stream_array = self.get_stream_fast_timer()
    if fast_epoch_stream_name is None:
        self.log_error("Fast epoch stream could not be found.")
        return stream_values

    fast_epoch_values, _, _ = self._read_from_stream(
        stream_sliceable=fast_epoch_stream_array,
        stream_name=fast_epoch_stream_name,
        slice_init=0,
        slice_end=self.index_range_last[-1],
        datatype="float64",
    )
    if fast_epoch_values is None:
        # Stop here: without the fast epoch there is nothing to
        # interpolate onto, and calling numpy.interp would only fail.
        self.log_error(f"Stream {fast_epoch_stream_name} could not be read.")
        return None
    if fast_epoch_values.size == 0:
        self.log_error(f"Stream {fast_epoch_stream_name} is empty.")
        return None

    # 5) Interpolate data, len(interpdata) = len(fast_epoch_values)
    try:
        return numpy.interp(fast_epoch_values, slow_epoch_values, stream_values)
    except Exception as e:
        self.log_error(f"Error during numpy interpolation: {e}")
        return None
def get_HS32C_array(self):
    """Build the HS32 counters array for the last index range.

    :return: ``float64`` array with one row per frame of
        ``index_range_last`` and one column per HS32 pin, filled with the
        raw HS32 streams; cells that could not be read keep the value -1.
    """
    nb_hs32_pins = HS32.get_HS32_number_pins(headers=self.headers)
    nb_frames = self.index_range_last[1] - self.index_range_last[0]
    counters = numpy.full(
        (nb_frames, nb_hs32_pins),
        fill_value=-1,
        dtype="float64",
    )

    for index_pin, (stream_name, stream_array) in enumerate(
        self.get_streams_HS32_raw()
    ):
        if stream_array is None:
            continue
        stream_values, _, _ = self._read_from_stream(
            stream_sliceable=stream_array,
            stream_name=stream_name,
            slice_init=self.index_range_last[0],
            slice_end=self.index_range_last[-1],
        )
        if stream_values is None:
            continue
        counters[0 : len(stream_values), index_pin] = stream_values
    return counters
def get_HS32V_array(
    self,
):
    """Build the HS32 values array for the last index range.

    :return: ``float64`` array with one row per frame of
        ``index_range_last`` and one column per HS32 pin, filled with the
        HS32 scaler streams; cells that could not be read stay at 0.
    """
    nb_hs32_pins = HS32.get_HS32_number_pins(headers=self.headers)
    nb_frames = self.index_range_last[1] - self.index_range_last[0]
    scaler_values = numpy.zeros((nb_frames, nb_hs32_pins), dtype="float64")

    for index_pin, (stream_name, stream_array) in enumerate(
        self.get_streams_HS32_scalers()
    ):
        if stream_array is None:
            continue
        stream_values, _, _ = self._read_from_stream(
            stream_sliceable=stream_array,
            stream_name=stream_name,
            slice_init=self.index_range_last[0],
            slice_end=self.index_range_last[-1],
        )
        if stream_values is None:
            continue
        scaler_values[0 : len(stream_values), index_pin] = stream_values
    return scaler_values
def get_exposuretime_values(
    self,
) -> tuple:
    """Read the exposure times of the last index range.

    The already normalized stream is tried first. If it cannot be read,
    the raw stream is read and multiplied by the factor of the HS32 pin
    whose name matches the exposure time entry of the headers (1.0 when no
    pin matches).

    :return: ``(exposuretime, slice_init, slice_end)``, or
        ``(None, None, None)`` when neither stream could be read.
    """
    # Go first for the already normalized time values
    stream_name, stream_array = self.get_stream_exposuretime()
    exposuretime, slice_init, slice_end = self._read_from_stream(
        stream_sliceable=stream_array,
        stream_name=stream_name,
        slice_init=self.index_range_last[0],
        slice_end=self.index_range_last[-1],
    )
    if exposuretime is not None:
        return exposuretime, slice_init, slice_end

    # Try to go to the raw time
    stream_name, stream_array = self.get_stream_exposuretime_raw()
    exposuretime, slice_init, slice_end = self._read_from_stream(
        stream_sliceable=stream_array,
        stream_name=stream_name,
        slice_init=self.index_range_last[0],
        slice_end=self.index_range_last[-1],
    )
    if exposuretime is None:
        self.log_error("Exposure time could not be read.")
        return None, None, None

    # This is a way to get the factor
    nb_pins = HS32.get_HS32_number_pins(headers=self.headers)
    pin_name = self.headers.get(HS32.KEY_EXPOSURE_TIME)
    factor_exposuretime = None
    for pin_number in range(1, nb_pins + 1):
        if self.headers.get(f"{HS32.KEY_HS32_NAME}{pin_number:02}") != pin_name:
            continue
        factor_exposuretime = float(
            self.headers.get(f"{HS32.KEY_HS32_FACTOR}{pin_number:02}")
        )
        break

    if factor_exposuretime is None:
        self.log_error("Default exposure time factor 1.0 will be used")
        factor_exposuretime = 1.0

    # Scale in place, as the raw values are not normalized yet.
    exposuretime *= factor_exposuretime
    return exposuretime, slice_init, slice_end
def get_stream_monitor_0(self) -> tuple:
    """Return the scaler stream named by the ``HS32.KEY_MONITOR_0`` header.

    :return: The result of ``get_stream`` for ``scalers:{name}``.
    """
    monitor_name = self.headers.get(HS32.KEY_MONITOR_0)
    return self.get_stream(name=f"scalers:{monitor_name}")
def get_stream_monitor_1(self) -> tuple:
    """Return the scaler stream named by the ``HS32.KEY_MONITOR_1`` header.

    :return: The result of ``get_stream`` for ``scalers:{name}``.
    """
    monitor_name = self.headers.get(HS32.KEY_MONITOR_1)
    return self.get_stream(name=f"scalers:{monitor_name}")