import os
import gc
import threading
import socket
import time
from importlib.metadata import version
from contextlib import ExitStack
from ewokscore import Task
import psutil
from pathlib import Path
import h5py
import numpy
import logging
from ewokscore import missing_data
from ewoksid02.id02_format import (
open_id02_file,
ID02ProcessedFileHS32,
ID02BlissMasterFile,
ID02BlissLimaFile,
update_nexus_detector_group,
)
from ewoksid02.headers import HS32
from ewoksid02.utils.io import (
get_isotime,
refactor_stream_name_raw,
refactor_stream_name_interpreted,
parse_titleextension_template,
match_stream,
)
from ewoksid02.utils.blissdata import (
LIMA_URL_TEMPLATE_ID02,
load_scan,
_slice_dataset_online,
_slice_dataset_offline,
_get_new_slice_limits,
get_stream_names_subscan1,
get_stream_names_subscan2,
)
# Module-wide lock taken by ``ID02ProcessingTask.save`` while it updates the
# processed file.
lock = threading.Lock()

# Processing types, grouped by family.
PYFAI_PROCESSES = ["norm", "gaps", "2scat", "cave", "azim", "ave", "caving"]
TRUSAXS_PROCESSES = ["scalers", "dispatch", "debug"]
ALL_PROCESSES = PYFAI_PROCESSES + TRUSAXS_PROCESSES

# Processing type -> dotted path of the task class implementing it.
# ``ID02ProcessingTask.run`` searches its own class name in these values to
# find its processing type, so every key must be listed exactly once.
PROCESSING_TYPE_TASK = {
    "norm": "ewoksid02.tasks.normalizationtask.NormalizationTask",
    "gaps": "ewoksid02.tasks.cavingtask.CavingGapsTask",
    "2scat": "ewoksid02.tasks.secondaryscatteringtask.SecondaryScatteringTask",
    "caving": "ewoksid02.tasks.cavingtask.CavingTask",
    "cave": "ewoksid02.tasks.cavingtask.CavingBeamstopTask",
    "azim": "ewoksid02.tasks.azimuthaltask.AzimuthalTask",
    "ave": "ewoksid02.tasks.averagetask.AverageTask",
    "scalers": "ewoksid02.tasks.scalerstask.ScalersTask",
}

# Defaults for the "max_slice_size", "log_level" and
# "lima_index_number_format" task inputs.
MAX_SLICE_SIZE = 50
LOG_LEVEL_DEFAULT = "info"
LIMA_INDEX_NUMBER_FORMAT_ID02 = "%02d"

# Process memory (GB) stored by the first call to ``log_allocated_memory``;
# later calls report the increase relative to it.
MEM_USAGE_START = None

# Package-level logger ("ewoksid02") shared by the processing tasks.
logger = logging.getLogger("ewoksid02")
logger.propagate = True
[docs]
class Benchmark:
    """Context manager that times a block and derives the cost per frame.

    After the ``with`` block exits, ``bench_total_s`` holds the elapsed time in
    seconds and ``bench_per_frame_ms`` the elapsed time per frame in
    milliseconds (0 when ``nb_frames`` is not a positive number).

    Args:
        nb_frames: Number of frames handled inside the block.
        benchmark_name: Label used in the log messages (e.g. "process").
        processing_type: Processing type reported in the log messages.
    """

    def __init__(
        self, nb_frames, benchmark_name="processing", processing_type: str = "unknown"
    ):
        self.nb_frames = nb_frames
        self.benchmark_name = benchmark_name
        self.processing_type = processing_type
        # Timestamps are only meaningful once the context has been entered/left.
        self.start = None
        self.end = None
        self.bench_total_s = 0.0
        self.bench_per_frame_ms = 0.0

    def __enter__(self):
        logger.info(f"Start {self.benchmark_name} {self.processing_type}...")
        # Start the clock after logging so the log call is not measured.
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Stop the clock before logging, for the same reason.
        self.end = time.perf_counter()
        self.bench_total_s = self.end - self.start
        if self.nb_frames and self.nb_frames > 0:
            self.bench_per_frame_ms = self.bench_total_s / self.nb_frames * 1000
        else:
            self.bench_per_frame_ms = 0.0
        logger.info(f"Finished {self.benchmark_name} {self.processing_type}.")
[docs]
class ID02ProcessingTask(
Task,
optional_input_names=[
"detector_name",
"scan_memory_url",
"beacon_host",
"reading_node",
"filename_data", # Bliss master file for a dataset
"filename_lima",
"scan_nb",
"headers",
"dataset_signal",
"dataset_variance",
"dataset_sigma",
"datatype",
"lima_url_template",
"lima_url_template_args",
"log_level",
"processing_filename",
"processing_subtitle",
"subtitle",
"do_process",
"do_save",
"save_variance",
"save_sigma",
"save_metadata",
"index_range", # Global range, do not propagate
"index_range_last", # Dynamic range, propagate and change every loop
"max_slice_size",
"loop_nb",
"info",
"info_history",
"gc_collect",
"lima_index_number_format",
"streams_subscan1",
"streams_subscan2",
],
output_names=[
"filename_data",
"filename_lima",
"detector_name",
"scan_nb",
"headers",
"index_range_last",
"loop_nb",
"dataset_signal",
"dataset_variance",
"dataset_sigma",
"continue_pipeline",
"info_history",
"streams_subscan1",
"streams_subscan2",
],
):
"""This class contains processing support methods and saving methods in the ID02 SAXS pipeline.
It extends the ewokscore `Task` class and provides the loop handling, the reading of data (online from
scan memory or offline from files), the handling of metadata and processing flags, and the saving of
processed data to HDF5 files. This class is designed to be used as part of the ID02 pipeline.
The `process` method defined here only logs a warning; it has to be implemented in the child class.
Optional Inputs:
- detector_name (str): Name of the detector used for data acquisition. Required for online processing; for offline processing it can be read from filename_lima or from a processed file.
- scan_memory_url (str): URL for accessing scan memory in online processing.
- beacon_host (str): Host and port to plug blissdata to the correct beacon server. Only for online processing. Defaults to the BEACON_HOST environment variable.
- reading_node (bool): Flag to indicate if the task should read data from the node. Default is `False`.
- filename_data (str): Path to the dataset file (Master file, Nexus writer) for offline processing.
- filename_lima (str): Path to the first Lima file, the only place where some detector metadata can be found.
- scan_nb (int): Scan number for identifying the dataset.
- headers (dict): Only for Online processing. Dictionary containing headers information.
- max_slice_size (int): Maximum number of frames to process in one iteration. Default is `50`.
- dataset_signal (numpy.ndarray): Signal dataset to be processed.
- dataset_variance (numpy.ndarray): Variance dataset to be processed.
- dataset_sigma (numpy.ndarray): Sigma dataset to be processed.
- datatype (str): Datatype to be used to save the 2D data. Default and recommended is float32.
- lima_url_template (str): Format string to locate the Lima file and the path to the data inside that file.
- lima_url_template_args (dict): Dictionary to format the lima_url_template.
- log_level (str): Logging level for the task. Default is `"info"`.
- processing_filename (str): Full path to the (new) output file.
- processing_subtitle (str): Additional subtitle for the processing task.
- subtitle (str): Subtitle for the processing task to be added to the output filename.
- do_process (bool): Flag to enable or disable processing. Default is `True`.
- do_save (bool): Flag to enable or disable saving of processed data. Default is `True`.
- save_variance (bool): Flag to enable or disable saving of variance dataset. Default is `False`.
- save_sigma (bool): Flag to enable or disable saving of sigma dataset. Default is `True`.
- save_metadata (bool): Flag to enable or disable saving of metadata. Default is `True`.
- index_range (list): Global range of frame indices to read. This parameter is not propagated to the next task.
- index_range_last (list): Range of frame indices read in the last loop. It is propagated and changes every loop.
- loop_nb (int): Current loop iteration number. Default is `0`.
- info (dict): Additional metadata to save.
- info_history (list): Additional metadata to propagate and save, creating a history of processing.
- gc_collect (bool): Manually collect garbage at the end of every task. Default is `True`.
- lima_index_number_format (str): format to find the first Lima file (`%02d` by default)
- streams_subscan1 (dict): Streams of the first subscan, indexed by stream name.
- streams_subscan2 (dict): Streams of the second subscan, indexed by stream name.
Outputs:
- filename_data (str): Path to the dataset file actually used.
- filename_lima (str): Path to the first Lima file actually used.
- detector_name (str): Name of the detector actually used.
- scan_nb (int): Scan number actually used.
- headers (dict): Headers, either received as input or loaded from a file.
- index_range_last (list): Updated range of frame indices read in this loop.
- loop_nb (int): Updated loop iteration number.
- dataset_signal (numpy.ndarray): Processed signal dataset.
- dataset_variance (numpy.ndarray): Processed variance dataset.
- dataset_sigma (numpy.ndarray): Processed sigma dataset.
- continue_pipeline (bool): Flag to indicate whether the pipeline should continue.
- info_history (list): Additional metadata to propagate and save, creating a history of processing.
- streams_subscan1 (dict): Streams of the first subscan.
- streams_subscan2 (dict): Streams of the second subscan.
"""
[docs]
def is_new_cycle(self):
    """Tell whether this execution starts a new cycle of the loop.

    Returns ``True`` when the ``reading_node`` input is ``True`` or when
    ``loop_nb`` is still 0; otherwise the ``reading_node`` value is returned
    as it was given.
    """
    reading_node = self.get_input_value("reading_node", False)
    if reading_node is True or self.get_input_value("loop_nb", 0) == 0:
        return True
    return reading_node
[docs]
def is_new_workflow(self):
    """Return ``True`` when the ``loop_nb`` input is 0 (or not provided)."""
    return bool(self.get_input_value("loop_nb", 0) == 0)
[docs]
def is_first_loop(self):
    """Return ``True`` when the ``loop_nb`` input is 0 or 1 (0 if not provided)."""
    loop_nb = self.get_input_value("loop_nb", 0)
    return loop_nb in (0, 1)
[docs]
def open_id02_file(self, filename: str, mode: str = "r"):
    """Open *filename* through the module-level ``open_id02_file`` helper.

    The detector name, scan number and processing type of this task are
    forwarded to the helper together with *filename* and *mode*.
    """
    task_context = {
        "detector_name": self.detector_name,
        "scan_nb": self.scan_nb,
        "processing_type": self.processing_type,
    }
    return open_id02_file(filename=filename, mode=mode, **task_context)
[docs]
def create_id02_processed_file(self, filename: str):
    """Open *filename* in write mode ("w") through ``open_id02_file``.

    On top of the detector name, scan number and processing type, the task
    headers are forwarded to the helper.
    """
    creation_kwargs = dict(
        filename=filename,
        mode="w",
        detector_name=self.detector_name,
        scan_nb=self.scan_nb,
        processing_type=self.processing_type,
        headers=self.headers,
    )
    return open_id02_file(**creation_kwargs)
[docs]
def run(self):
"""Execute one pass of the task: resolve the inputs, read a slice of frames, process it and save it.

Steps visible in this method:
- find ``processing_type`` from ``PROCESSING_TYPE_TASK`` using the class name;
- copy the inputs into attributes and update the loop counter;
- on a new workflow, complete the missing parameters from the scan (online) or from ``filename_data`` (offline);
- append this task to ``info_history``, load the headers and the streams;
- read new datasets when this is a reading node or when no ``dataset_signal`` was received;
- call ``process`` and ``save`` inside ``Benchmark`` contexts, depending on ``do_process`` and ``do_save``;
- log the memory usage and optionally run the garbage collector.

``outputs.continue_pipeline`` is set to ``False`` and the method returns early when the output file
already exists at loop 1, or when no frame could be read.

Raises:
RuntimeError: the class is not registered, ``loop_nb`` is invalid, the offline inputs are inconsistent,
or saving was requested without ``processing_filename``.
ValueError: online processing without ``detector_name`` or ``beacon_host``.
"""
# The processing type is the first key whose registered class path contains this class name
self.processing_type = next(
(
k
for k, v in PROCESSING_TYPE_TASK.items()
if self.__class__.__name__ in v
),
None,
)
if not self.processing_type:
raise RuntimeError(
f"Please register the class {self.__class__.__name__} before using it"
)
# PID and process handle are used by the log prefix and by get_memory_info
self._pid = os.getpid()
self._process = psutil.Process()
# Note: this changes the level of the shared "ewoksid02" logger
logger.setLevel(self.get_input_value("log_level", LOG_LEVEL_DEFAULT).upper())
self.processing_params = {}
# Copy the inputs into attributes, applying the defaults
self.detector_name = self.get_input_value("detector_name", None)
self.scan_memory_url = self.get_input_value("scan_memory_url", None)
self.beacon_host = self.get_input_value(
"beacon_host", os.environ.get("BEACON_HOST")
)
self.filename_data = self.get_input_value("filename_data", None)
self.filename_lima = self.get_input_value("filename_lima", None)
self.scan_nb = self.get_input_value("scan_nb", None)
self.max_slice_size = self.get_input_value("max_slice_size", MAX_SLICE_SIZE)
self.index_range = self.get_input_value("index_range", None)
self.index_range_last = self.get_input_value("index_range_last", None)
self.processing_filename = self.get_input_value("processing_filename", None)
# Work on a copy of the incoming history list
self.info_history = self.get_input_value("info_history", []).copy()
self.headers = self.get_input_value("headers", {})
self.outputs.continue_pipeline = True
do_process = self.get_input_value("do_process", True)
do_save = self.get_input_value("do_save", True)
# Mark the beginning of a new cycle
self.loop_nb = self.get_input_value("loop_nb", 0)
# loop_nb has to be a non-negative integer
if not isinstance(self.loop_nb, int) or (
isinstance(self.loop_nb, int) and self.loop_nb < 0
):
raise RuntimeError(f"Loop format is not compatible: {self.loop_nb}")
# The three flags are evaluated from the *input* loop_nb, before it is incremented
new_cycle = self.is_new_cycle()
new_workflow = self.is_new_workflow()
first_loop = self.is_first_loop()
if new_cycle:
self.loop_nb += 1
self.log_info(f"New cycle: new loop index {self.loop_nb}")
self.outputs.loop_nb = self.loop_nb
# Check-point to avoid data overwritting
if (
self.loop_nb == 1
and do_save
and self.processing_filename
and Path(self.processing_filename).is_file()
):
self.log_error(
f"The workflow was cancelled. {self.processing_filename} already exist. Choose another name!"
)
self.outputs.continue_pipeline = False
return
# Check and load some parameters only if it's the first loop (and only in the first reading node)
if new_workflow:
# This will be check only during the execution of the first node in the first cycle
if self.scan_memory_url:
# We trust this is an online processing
self.log_info(
f"Beginning of the online workflow with key {self.scan_memory_url}. Loading some parameters..."
)
if self.detector_name is None:
raise ValueError("Online processing requires a detector_name")
if not self.beacon_host:
raise ValueError("Online processing requires a beacon_host")
# Anything not given as input is taken from the scan info
if not self.filename_data or not self.filename_lima or not self.scan_nb:
scan = self.load_scan()
scan_info = scan.info
self.filename_data = self.filename_data or scan_info["filename"]
self.scan_nb = self.scan_nb or scan_info["scan_nb"]
lima_index_number_format = self.get_input_value(
"lima_index_number_format", LIMA_INDEX_NUMBER_FORMAT_ID02
)
# First Lima file: images path formatted with the detector name, followed by index 0
self.filename_lima = (
self.filename_lima
or f"{scan_info['images_path'].format(img_acq_device=self.detector_name)}{lima_index_number_format % 0}.h5"
)
elif self.filename_data:
# We trusts this is an offline processing, either from RAW_DATA or PROCESSED_DATA
self.log_info(
f"Beginning of the offline workflow with file {self.filename_data}. Loading some parameters..."
)
self.scan_memory_url = None
self.beacon_host = None
with self.open_id02_file(
filename=self.filename_data,
) as file:
if isinstance(file, ID02BlissLimaFile):
raise RuntimeError(
f"Offline processing cannot be run with a Lima format file {self.filename_data}"
)
elif isinstance(file, ID02BlissMasterFile):
# Offline processing from RAW_DATA master file
if not self.scan_nb:
raise RuntimeError(
"Offline processing from RAW_DATA needs a scan number (scan_nb)"
)
if not self.filename_lima and not self.detector_name:
raise RuntimeError(
"Offline processing from RAW_DATA needs either detector_name or filename_lima"
)
# Derive whichever of filename_lima / detector_name is missing from the other one
if self.detector_name and not self.filename_lima:
self.filename_lima = (
self.filename_lima or file.filename_lima
)
elif self.filename_lima and not self.detector_name:
with ID02BlissLimaFile(
name=self.filename_lima, mode="r"
) as flima:
self.detector_name = flima.detector_name
elif isinstance(file, ID02ProcessedFileHS32):
# Offline processing from PROCESSED_DATA file
self.detector_name = self.detector_name or file.detector_name
else:
raise RuntimeError(
"Processing needs either scan_memory_url or filename_data"
)
# Retrieve ewoks history either from input or from the filename_data if it's PROCESSED_DATA
if not self.info_history:
# Try to read from self.filename_data is it's processed file
with self.open_id02_file(filename=self.filename_data) as file:
if isinstance(file, ID02ProcessedFileHS32):
self.info_history = file.read_ewoks_history()
if first_loop and do_process:
# Every node contributes to ewoks history, only in the first loop
if self.info_history:
index = self.info_history[-1]["index"] + 1
else:
index = 0
if self.processing_type in PYFAI_PROCESSES: # Avoid including scalers
self.info_history = self.info_history + [
self._get_ewoks_info(index=index)
]
# Load the headers into a HeadersHS32 instance
if self.headers:
# headers comes initially as an input only for online processing (from blissoda)
self.outputs.headers = self.headers
else:
# Load the headers from a file (either Lima or processed file)
filename_headers = (
self.filename_lima if self.filename_lima else self.filename_data
)
with self.open_id02_file(filename=filename_headers) as file:
if file.headers:
# self.headers = HeadersHS32(headers_dict=file.headers)
self.headers = file.headers
self.outputs.headers = file.headers
else:
self.log_error(f"No headers can be loaded from {filename_headers}")
# Propagate the resolved parameters to the next task
self.outputs.filename_data = self.filename_data
self.outputs.filename_lima = self.filename_lima
self.outputs.detector_name = self.detector_name
self.outputs.scan_nb = self.scan_nb
self.outputs.info_history = self.info_history
# Load and split the streams into subscan1 and 2
self.streams_subscan1 = self.get_input_value("streams_subscan1", {})
self.streams_subscan2 = self.get_input_value("streams_subscan2", {})
# Streams are reloaded on a new workflow, and on every new cycle when processing online
if new_workflow:
reset_streams = True
elif new_cycle and self.scan_memory_url:
reset_streams = True
else:
reset_streams = False
if reset_streams:
self.log_info("Streams will be reset")
if self.scan_memory_url:
scan = self.load_scan()
stream_names_subscan1 = get_stream_names_subscan1(scan=scan)
stream_names_subscan2 = get_stream_names_subscan2(scan=scan)
# Read the streams (TODO check if optimizing this is worth it) raw and interpreted names come later
self.streams_subscan1 = {
stream_name: scan.streams[stream_name]
for stream_name in stream_names_subscan1
}
self.streams_subscan2 = {
stream_name: scan.streams[stream_name]
for stream_name in stream_names_subscan2
}
else:
# Offline: the counters are read from the measurement groups of filename_data
with self.open_id02_file(
filename=self.filename_data,
) as file:
if file.streams_subscan1_path not in file.root:
raise RuntimeError(
f"There is no group {file.streams_subscan1_path} in {self.filename_data}. No counters can be loaded."
)
measurement_subscan1_grp = file.root[file.streams_subscan1_path]
for dset_name in measurement_subscan1_grp:
dset = measurement_subscan1_grp[dset_name]
if dset.ndim == 3: # Skip the images
continue
self.streams_subscan1[dset_name] = dset[:]
if file.streams_subscan2_path in file.root:
measurement_subscan2_grp = file.root[file.streams_subscan2_path]
for dset_name in measurement_subscan2_grp:
dset = measurement_subscan2_grp[dset_name]
if dset.ndim == 3: # Skip the images
continue
self.streams_subscan2[dset_name] = dset[:]
self.log_info("Streams were reset.")
self.outputs.streams_subscan1 = self.streams_subscan1
self.outputs.streams_subscan2 = self.streams_subscan2
# Load the big datasets, new from memory or file, or from inputs
dataset_signal = self.get_input_value("dataset_signal", None)
dataset_variance = self.get_input_value("dataset_variance", None)
dataset_sigma = self.get_input_value("dataset_sigma", None)
reading_node = self.get_input_value("reading_node", False)
if reading_node or dataset_signal is None:
ptdata = self._get_new_datasets()
dataset_signal = ptdata["dataset_signal"]
dataset_variance = ptdata["dataset_variance"]
dataset_sigma = ptdata["dataset_sigma"]
index_range_new = ptdata["index_range"]
# Nothing could be read: stop the pipeline and keep the previous range
if len(dataset_signal) == 0:
self.outputs.continue_pipeline = False
self.outputs.index_range_last = self.index_range_last
return
# We define here the actual index limits because we read the data from streams
self.index_range_last = index_range_new
if (
dataset_signal is not None
and len(dataset_signal) > 0
and self.index_range_last is None
):
# Only possible if the data was sent as inputs without any index_range_last
self.index_range_last = [0, len(dataset_signal)]
# self.outputs.loop_nb = self.loop_nb
self.outputs.index_range_last = self.index_range_last
self.outputs.dataset_signal = dataset_signal
self.outputs.dataset_variance = dataset_variance
self.outputs.dataset_sigma = dataset_sigma
self.dataset_signal = dataset_signal
self.dataset_variance = dataset_variance
self.dataset_sigma = dataset_sigma
# Decide if do process and save
if do_process:
self.processing_params = {}
if self.dataset_signal.size != 0:
nb_frames = len(self.dataset_signal)
with Benchmark(
nb_frames=nb_frames,
benchmark_name="process",
processing_type=self.processing_type,
) as bench_process:
self.process()
self._log_benchmark(bench=bench_process)
else:
self.log_warning(
f"Skipping processing {self.processing_type} due to empty array."
)
if do_save:
if not self.processing_filename:
raise RuntimeError(
f"Saving needs a processing_filename for {self.processing_type}"
)
# NOTE(review): nb_frames and bench_process are only assigned in the processing branch above;
# this presumably relies on outputs.dataset_signal being empty whenever processing was skipped - confirm.
if self.outputs.dataset_signal.size != 0:
with Benchmark(
nb_frames=nb_frames,
benchmark_name="saving",
processing_type=self.processing_type,
) as bench_save:
self.save()
self._log_benchmark(bench=bench_save)
# Save both process and save benchmarks
with self.open_id02_file(
filename=self.processing_filename,
mode="a",
) as processed_file:
processed_file._save_benchmark(bench=bench_process)
processed_file._save_benchmark(bench=bench_save)
else:
self.log_warning(
f"Skipping saving {self.processing_type} due to empty array."
)
else:
self.log_warning("Save flag was set to False, data will not be saved")
else:
self.log_warning(msg=f"Processing {self.processing_type} will be skipped.")
# Finish with logging the used memory and garbage collecting
self.log_allocated_memory()
if self.get_input_value("gc_collect", True):
gc.collect()
def _get_ewoks_info(self, index: int = 0) -> dict:
    """Build the history entry describing this task.

    The entry holds the task identifier (module and class name), a title
    made of the zero-padded *index* and the class name, the processing type,
    the current ISO time, the installed ``ewoksid02`` version and the host name.
    """
    class_name = self.__class__.__name__
    entry = {
        "task_identifier": f"{self.__module__}.{class_name}",
        "class": class_name,
        "title": f"{index:02} - {class_name}",
        "index": index,
        "processing_type": self.processing_type,
    }
    entry["datetime"] = str(get_isotime())
    entry["version"] = version("ewoksid02")
    entry["host"] = socket.gethostname()
    return entry
[docs]
def log_debug(self, msg):
    """Log *msg* at "debug" level through ``_log``."""
    self._log(msg=msg, level="debug")
[docs]
def log_info(self, msg):
    """Log *msg* at "info" level through ``_log``."""
    self._log(msg=msg, level="info")
[docs]
def log_warning(self, msg):
    """Log *msg* at "warning" level through ``_log``."""
    self._log(msg=msg, level="warning")
[docs]
def log_error(self, msg):
    """Log *msg* at "error" level through ``_log``."""
    self._log(msg=msg, level="error")
def _log(self, level, msg):
    """Send *msg* to the module logger, prefixed with loop, class and PID.

    Args:
        level: Name of the logger method to call ("debug", "info",
            "warning" or "error").
        msg: Text of the message.
    """
    msg = f"Loop #{self.loop_nb}: {self.__class__.__name__}: (PID: {self._pid}): {msg}"
    # Look the logging method up by name with getattr() instead of calling
    # the __getattribute__ dunder directly.
    getattr(logger, level)(msg)
[docs]
def log_allocated_memory(self):
    """Log the memory used by the process and the memory still available.

    The message is coloured with ANSI codes according to the fraction of
    available memory (below 10 %, below 30 %, otherwise). The first call
    stores the used memory in ``MEM_USAGE_START``; later calls report the
    increase relative to that value.
    """
    global MEM_USAGE_START

    snapshot = self.get_memory_info()
    used_gb = snapshot["used"]
    total_gb = snapshot["total"]
    available_gb = snapshot["available"]

    available_fraction = available_gb / total_gb
    if available_fraction < 0.1:
        status, ansi_start = "Low memory available", "\033[91m"
    elif available_fraction < 0.3:
        status, ansi_start = "Medium memory available", "\033[93m"
    else:
        status, ansi_start = "Sufficient memory available", "\033[92m"
    ansi_reset = "\033[0m"

    if MEM_USAGE_START is None:
        # First measurement: it becomes the reference for later calls
        increase_gb = 0.0
        MEM_USAGE_START = used_gb
    else:
        increase_gb = used_gb - MEM_USAGE_START

    logger.info(
        f"{ansi_start}Loop #{self.loop_nb}: {self.__class__.__name__}: (PID: {self._pid}): Memory: {used_gb:.2f}GB used, increased by {increase_gb:.2f}GB; {available_gb:.2f}GB available. {status}{ansi_reset}"
    )
[docs]
def get_memory_info(self):
    """Return memory figures in GB (bytes divided by 1e9).

    Returns:
        dict: ``"used"`` is the resident set size of this process,
        ``"total"`` and ``"available"`` are the system-wide values.
    """
    # One snapshot for both system values: they stay consistent with each
    # other and the system is queried only once.
    system_memory = psutil.virtual_memory()
    return {
        "used": self._process.memory_info().rss / 1e9,
        "total": system_memory.total / 1e9,
        "available": system_memory.available / 1e9,
    }
def _log_benchmark(self, bench):
    """Log the total and per-frame timings stored in *bench* at info level."""
    summary = (
        f"Benchmark. Total ({bench.nb_frames}). "
        f"{bench.benchmark_name}: {bench.bench_total_s:.2f} s. "
        f"Per frame: {bench.bench_per_frame_ms:.2f} ms"
    )
    self.log_info(summary)
[docs]
def load_scan(self):
    """Load the scan through the module-level ``load_scan`` helper.

    The scan memory URL and the beacon host of this task are forwarded.
    """
    connection = {
        "scan_memory_url": self.scan_memory_url,
        "beacon_host": self.beacon_host,
    }
    return load_scan(**connection)
def _get_new_datasets(self) -> dict:
    """Read the next slice of frames, online or offline.

    Online (``scan_memory_url`` set) only the signal is read, from the
    detector image stream for pyFAI processes or from "mcs:epoch" otherwise.
    Offline the signal is read from ``filename_data``; variance and sigma
    are read as well when the file is a processed file.

    Returns:
        dict: keys ``dataset_signal``, ``dataset_variance``,
        ``dataset_sigma`` (empty arrays when nothing was read) and
        ``index_range`` (``None`` when nothing was read).
    """
    # TODO optimize open/close files for reading from files
    result = {
        "dataset_signal": numpy.array([]),
        "dataset_variance": numpy.array([]),
        "dataset_sigma": numpy.array([]),
        "index_range": None,
    }

    if self.scan_memory_url:
        # Reading online
        # - dataset_signal comes from the blissdata stream
        # - dataset_variance = empty
        # - dataset_sigma = empty
        stream_name = (
            f"{self.detector_name}:image"
            if self.processing_type in PYFAI_PROCESSES
            else "mcs:epoch"
        )
        new_range = _get_new_slice_limits(
            stream_name=stream_name,
            scan_memory_url=self.scan_memory_url,
            beacon_host=self.beacon_host,
            index_range=self.index_range,
            index_range_last=self.index_range_last,
            max_slice_size=self.max_slice_size,
        )
        if new_range is None:
            return result
        sliced_signal = _slice_dataset_online(
            stream_name=stream_name,
            detector_name=self.detector_name,
            scan_memory_url=self.scan_memory_url,
            beacon_host=self.beacon_host,
            lima_url_template=self.get_input_value(
                "lima_url_template", LIMA_URL_TEMPLATE_ID02
            ),
            lima_url_template_args=self.get_input_value(
                "lima_url_template_args", {}
            ),
            index_range=new_range,
            start_from_memory=True,
        )
        if sliced_signal["dataset"] is not None:
            result["dataset_signal"] = sliced_signal["dataset"]
            result["index_range"] = sliced_signal["index_range"]
    else:
        # Reading offline
        # - dataset_signal comes from the RAW_DATA or PROCESSED_DATA
        # - dataset_variance = comes only from PROCESSED_DATA
        # - dataset_sigma = comes only from PROCESSED_DATA
        signal_path = variance_path = sigma_path = None
        with self.open_id02_file(
            filename=self.filename_data,
        ) as h5file:
            if isinstance(h5file, ID02BlissMasterFile):
                signal_path = h5file.nxdata_dataset_signal_path
            elif isinstance(h5file, ID02ProcessedFileHS32):
                signal_path = h5file.nxdata_dataset_signal_path
                variance_path = h5file.nxdata_dataset_variance_path
                sigma_path = h5file.nxdata_dataset_sigma_path

        if signal_path:
            new_range = _get_new_slice_limits(
                filename_data=self.filename_data,
                h5path=signal_path,
                index_range=self.index_range,
                index_range_last=self.index_range_last,
                max_slice_size=self.max_slice_size,
            )
            if new_range is None:
                return result
            sliced_signal = _slice_dataset_offline(
                filename_data=self.filename_data,
                h5path_to_data=signal_path,
                index_range=new_range,
            )
            if sliced_signal["dataset"] is not None:
                result["dataset_signal"] = sliced_signal["dataset"]
                result["index_range"] = sliced_signal["index_range"]

            # Variance and sigma use the same frame range as the signal
            for result_key, h5path in (
                ("dataset_variance", variance_path),
                ("dataset_sigma", sigma_path),
            ):
                if not h5path:
                    continue
                sliced_extra = _slice_dataset_offline(
                    filename_data=self.filename_data,
                    h5path_to_data=h5path,
                    index_range=new_range,
                )
                if sliced_extra["dataset"] is not None:
                    result[result_key] = sliced_extra["dataset"]

    nb_frames_read = len(result["dataset_signal"])
    index_range_sliced = result["index_range"]
    if nb_frames_read > 0:
        self.log_info(f"""
\n\tIncoming ({nb_frames_read} frames) in the range {index_range_sliced[0]} -> {index_range_sliced[-1]},
""")
    else:
        self.log_info("""
\n\tNo more data to read. End of the workflow.
""")
    return result
[docs]
def process(self):
    """Placeholder: only logs a warning that nothing is implemented."""
    notice = "Nothing implemented yet..."
    self.log_warning(notice)
[docs]
def get_processing_parameters(self) -> dict:
    """Return the explicit processing parameters; an empty dict here."""
    return dict()
[docs]
def get_parameter(self, key: str, to_integer: bool = False, default=None):
    """Return a parameter from the task inputs, falling back to the headers.

    Args:
        key: Name of the input, also used as header key.
        to_integer: Convert the value with ``int()`` before returning it.
        default: Value used when *key* is neither an input nor in the headers.

    Returns:
        The input value if it was provided, else ``self.headers.get(key, default)``.

    Raises:
        TypeError, ValueError: from ``int()`` when *to_integer* is set and the
            value cannot be converted.
    """
    value = self.get_input_value(key=key)
    # Identity test against the sentinel: ``==`` would be evaluated
    # element-wise (ambiguous truth value) if the input were a numpy array.
    if value is missing_data.MISSING_DATA:
        # Try to get it from header
        value = self.headers.get(key, default)
    if to_integer:
        value = int(value)
    return value
[docs]
def save(self):
    """Append the current slice of data and metadata to the processed file.

    At loop 1 the file is created (with the ewoks history and the processing
    parameters). Then, with the file opened in append mode and the module
    lock held:

    - for pyFAI processes, the signal dataset is updated, plus variance and
      sigma according to ``save_variance`` / ``save_sigma``;
    - if ``save_metadata`` is set, the subscan streams, the HS32 arrays, the
      exposure and delta times, the monitor intensities and the title
      extensions are updated, and the NXdetector group is copied once from
      the raw files.

    Raises:
        RuntimeError: at loop 1, if ``processing_filename`` already exists.
    """
    if self.loop_nb == 1:
        # Create the file only if it's loop_nb=1, do not overwrite
        if Path(self.processing_filename).is_file():
            raise RuntimeError(
                f"Data cannot be saved as the file {self.processing_filename} already exists."
            )
        # Create directories
        Path(self.processing_filename).parent.mkdir(parents=True, exist_ok=True)
        self.log_info(f"Creating new processed file: {self.processing_filename}")
        with self.create_id02_processed_file(
            filename=self.processing_filename,
        ) as processing_file:
            processing_file.write_ewoks_history(
                list_ewoks_history=self.outputs.info_history
            )
            # Write/Overwrite explicit processing parameters (not headers)
            processing_file.write_processing_parameters(
                processing_parameters=self.get_processing_parameters()
            )

    with ExitStack() as stack:
        processed_file = stack.enter_context(
            self.open_id02_file(
                filename=self.processing_filename,
                mode="a",
            )
        )
        stack.enter_context(lock)

        if self.processing_type in PYFAI_PROCESSES:
            # Update all three big datasets
            if (
                processed_file.nxdata_name
                not in processed_file.root[processed_file.nxprocess_path]
            ):
                # Create the nexus data group
                processed_file.create_h5_group(
                    h5_group_path=processed_file.nxdata_path,
                    NX_class="NXdata",
                    default=processed_file.nxdata_path,
                    signal=ID02ProcessedFileHS32.DATASET_SIGNAL_NAME,
                )
            datatype = self.get_input_value("datatype", "float32")
            processed_file.update_dataset(
                added_dataset=self.outputs.dataset_signal,
                h5_dataset_path=processed_file.nxdata_dataset_signal_path,
                index_read=self.index_range_last,
                datatype=datatype,
            )
            if self.get_input_value("save_variance", False):
                processed_file.update_dataset(
                    added_dataset=self.outputs.dataset_variance,
                    h5_dataset_path=processed_file.nxdata_dataset_variance_path,
                    index_read=self.index_range_last,
                    datatype=datatype,
                )
            if self.get_input_value("save_sigma", True):
                processed_file.update_dataset(
                    added_dataset=self.outputs.dataset_sigma,
                    h5_dataset_path=processed_file.nxdata_dataset_sigma_path,
                    index_read=self.index_range_last,
                    datatype=datatype,
                )

        # Update metadata
        if self.get_input_value("save_metadata", True):
            # Update subscan1
            for stream_name, stream_array in self.streams_subscan1.items():
                stream_name_raw = refactor_stream_name_raw(stream_name=stream_name)
                stream_name_interpreted = refactor_stream_name_interpreted(
                    stream_name=stream_name
                )
                stream_values, slice_init, slice_end = self._read_from_stream(
                    stream_sliceable=stream_array,
                    stream_name=stream_name,
                    slice_init=self.index_range_last[0],
                    slice_end=self.index_range_last[-1],
                )
                # The same values go to the raw and to the interpreted group
                processed_file.update_dataset(
                    added_dataset=stream_values,
                    h5_dataset_path=str(
                        Path(processed_file.mcs_path)
                        / processed_file.MCS_RAW_NAME
                        / processed_file.MCS_SUBSCAN1
                        / stream_name_raw
                    ),
                    index_read=[slice_init, slice_end],
                )
                processed_file.update_dataset(
                    added_dataset=stream_values,
                    h5_dataset_path=str(
                        Path(processed_file.mcs_path)
                        / processed_file.MCS_INTERPRETED
                        / stream_name_interpreted
                    ),
                    index_read=[slice_init, slice_end],
                )

            # Update subscan2 if it exists
            for stream_name, stream_array in self.streams_subscan2.items():
                stream_name_raw = refactor_stream_name_raw(stream_name=stream_name)
                stream_name_interpreted = refactor_stream_name_interpreted(
                    stream_name=stream_name
                )
                # Read without any interpolation to send to raw-subscan2
                stream_values, slice_init, slice_end = self._read_from_stream(
                    stream_sliceable=stream_array,
                    stream_name=stream_name,
                    slice_init=0,
                    slice_end=len(stream_array),
                )
                if stream_values is None:
                    continue
                processed_file.update_dataset(
                    added_dataset=stream_values,
                    h5_dataset_path=str(
                        Path(processed_file.mcs_path)
                        / processed_file.MCS_RAW_NAME
                        / processed_file.MCS_SUBSCAN2
                        / stream_name_raw
                    ),
                    index_read=None,
                )
                # Send the interpolated values to interpreted group
                interpolated_values = self.read_from_stream_interpolate(
                    stream_sliceable=stream_array,
                    stream_slice=None,
                )
                if interpolated_values is None:
                    continue
                processed_file.update_dataset(
                    added_dataset=interpolated_values,
                    h5_dataset_path=str(
                        Path(processed_file.mcs_path)
                        / processed_file.MCS_INTERPRETED
                        / stream_name_interpreted
                    ),
                )

            # Update HS32 arrays
            HS32C_array = self.get_HS32C_array()
            processed_file.update_dataset(
                added_dataset=HS32C_array,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path) / HS32.KEY_HS32_COUNTERS
                ),
                index_read=self.index_range_last,
                datatype=ID02ProcessedFileHS32.FORMAT_HS32C,
            )
            HS32V_array = self.get_HS32V_array()
            processed_file.update_dataset(
                added_dataset=HS32V_array,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path) / HS32.KEY_HS32_VALUES
                ),
                index_read=self.index_range_last,
                datatype=ID02ProcessedFileHS32.FORMAT_HS32V,
            )

            # Update exposure and delta times
            exposuretime_values, slice_init, slice_end = (
                self.get_exposuretime_values()
            )
            processed_file.update_dataset(
                added_dataset=exposuretime_values,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path) / processed_file.KEY_EXPOSURE_TIME
                ),
                index_read=[slice_init, slice_end],
            )
            stream_deltatime_name, stream_deltatime_array = (
                self.get_stream_deltatime()
            )
            deltatime, slice_init, slice_end = self._read_from_stream(
                stream_sliceable=stream_deltatime_array,
                stream_name=stream_deltatime_name,
                slice_init=self.index_range_last[0],
                slice_end=self.index_range_last[-1],
            )
            processed_file.update_dataset(
                added_dataset=deltatime,
                h5_dataset_path=str(
                    Path(processed_file.tfg_path) / processed_file.KEY_DELTA_TIME
                ),
                index_read=[slice_init, slice_end],
            )
            if self.processing_type in PYFAI_PROCESSES:
                processed_file.update_dataset(
                    added_dataset=deltatime,
                    h5_dataset_path=processed_file.nxdata_delta_time_path,
                    index_read=[slice_init, slice_end],
                    h5_dataset_name="t",
                    unit="s",
                )

            # Update intensities/monitor arrays
            # (read over the range actually returned for the delta time)
            stream_intensity0_name, stream_intensity0_array = (
                self.get_stream_monitor_0()
            )
            intensity0_values, monitor0_slice_init, monitor0_slice_end = (
                self._read_from_stream(
                    stream_sliceable=stream_intensity0_array,
                    stream_name=stream_intensity0_name,
                    slice_init=slice_init,
                    slice_end=slice_end,
                )
            )
            intensity0_factor = float(self.headers.get(HS32.KEY_MONITOR_0_FACTOR))
            intensity0uncor = intensity0_values * intensity0_factor
            sot = float(self.headers.get(HS32.KEY_SOT, 0.0))
            sct = float(self.headers.get(HS32.KEY_SCT, 0.0))
            intensity0shutcor = (
                intensity0uncor
                * (exposuretime_values - sot + sct)
                / (exposuretime_values - sot)
            )
            processed_file.update_dataset(
                added_dataset=intensity0shutcor,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path)
                    / ID02ProcessedFileHS32.KEY_I0_SHUTCOR
                ),
                index_read=[monitor0_slice_init, monitor0_slice_end],
            )
            processed_file.update_dataset(
                added_dataset=intensity0uncor,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path)
                    / ID02ProcessedFileHS32.KEY_I0_UNCOR
                ),
                index_read=[monitor0_slice_init, monitor0_slice_end],
            )
            stream_intensity1_name, stream_intensity1_array = (
                self.get_stream_monitor_1()
            )
            intensity1_values, monitor1_slice_init, monitor1_slice_end = (
                self._read_from_stream(
                    stream_sliceable=stream_intensity1_array,
                    stream_name=stream_intensity1_name,
                    slice_init=slice_init,
                    slice_end=slice_end,
                )
            )
            intensity1_factor = float(self.headers.get(HS32.KEY_MONITOR_1_FACTOR))
            intensity1uncor = intensity1_values * intensity1_factor
            intensity1shutcor = (
                intensity1uncor
                * (exposuretime_values - sot + sct)
                / (exposuretime_values - sot)
            )
            processed_file.update_dataset(
                added_dataset=intensity1shutcor,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path)
                    / ID02ProcessedFileHS32.KEY_I1_SHUTCOR
                ),
                index_read=[monitor1_slice_init, monitor1_slice_end],
            )
            processed_file.update_dataset(
                added_dataset=intensity1uncor,
                h5_dataset_path=str(
                    Path(processed_file.mcs_path)
                    / ID02ProcessedFileHS32.KEY_I1_UNCOR
                ),
                index_read=[monitor1_slice_init, monitor1_slice_end],
            )

            # Save the processing params (only once except for normalization values)
            # TODO
            # if self.processing_type in PYFAI_PROCESSES:
            #     self._save_processing_params(root_group=root_group_destination)

            # Update the TitleExtension
            titleextension_template = self.headers.get(
                HS32.KEY_TITLEEXTENSION_TEMPLATE
            )
            if not titleextension_template:
                self.log_warning(
                    "There is no TitleExtensionTemplate in the header."
                )
            else:
                title_extension_parsed, title_extension_formats = (
                    parse_titleextension_template(titleextension_template)
                )
                title_extension_values = {}
                # We want to slice all the streams at once, and then build the array of strings
                for titleextension_dict in title_extension_formats:
                    stream_name_from_titleextension = titleextension_dict[
                        "stream_name"
                    ]
                    stream_values = None
                    # NaN default under the template key, so the template can
                    # still be formatted when the stream cannot be found or read.
                    title_extension_values[stream_name_from_titleextension] = (
                        numpy.full(
                            shape=(self.index_range_last[1],),
                            fill_value=numpy.nan,
                        )
                    )
                    # Try first from subscan2
                    stream_name, stream_array = self.get_stream(
                        name=stream_name_from_titleextension, subscan_2=True
                    )
                    if stream_name:
                        stream_values = self.read_from_stream_interpolate(
                            stream_sliceable=stream_array,
                            stream_slice=None,
                        )
                    else:
                        stream_name, stream_array = self.get_stream(
                            name=stream_name_from_titleextension, subscan_2=False
                        )
                        if stream_name is None:
                            self.log_error(
                                f"{stream_name_from_titleextension} stream from TitleExtension could not be found"
                            )
                            continue
                        stream_values, slice_init, slice_end = (
                            self._read_from_stream(
                                stream_sliceable=stream_array,
                                stream_name=stream_name,
                                slice_init=0,
                                slice_end=self.index_range_last[-1],
                            )
                        )
                    if stream_values is None:
                        self.log_error(f"Stream {stream_name} could not be read")
                        continue
                    title_extension_values[stream_name_from_titleextension] = (
                        stream_values
                    )

                # Now we go index by index, building the string cells
                # The title extensions will always be the size of last index (from 0 -> last_frame)
                new_title_extensions = numpy.full(
                    shape=(self.index_range_last[1],),
                    fill_value="",
                    dtype=h5py.string_dtype(encoding="utf-8"),
                )
                for index in range(len(new_title_extensions)):
                    format_index = {
                        stream_name: title_extension_values[stream_name][index]
                        for stream_name in title_extension_values
                    }
                    try:
                        new_title_extensions[index] = title_extension_parsed.format(
                            **format_index
                        )
                    except KeyError as e:
                        self.log_error(
                            f"{e} : {title_extension_parsed=}, {format_index=}"
                        )
                processed_file.update_dataset(
                    added_dataset=new_title_extensions,
                    h5_dataset_path=str(
                        Path(processed_file.parameters_path)
                        / ID02ProcessedFileHS32.KEY_TITLEEXTENSION
                    ),
                    index_read=[0, self.index_range_last[1]],
                )

            # Update NexusDetector with metadata from the RAW_DATA file (has to be accesible).
            # To be done only once and do not wait as the file will be accessible eventually.
            # NOTE(review): assumed to belong to the save_metadata step - confirm.
            if (
                self.processing_type in PYFAI_PROCESSES
                and processed_file.nxdetector_path not in processed_file
            ):
                for file in (self.filename_data, self.filename_lima):
                    filedata = stack.enter_context(
                        self.open_id02_file(
                            filename=file,
                        )
                    )
                    if filedata is None:
                        continue
                    if filedata.nxdetector_path is not None:
                        nxdetector_grp = filedata[filedata.nxdetector_path]
                        if len(nxdetector_grp) > 0:
                            metadata_detector_output = (
                                processed_file.create_h5_group(
                                    h5_group_path=processed_file.nxdetector_path,
                                    NX_class="NXdetector",
                                )
                            )
                            update_nexus_detector_group(
                                nxdetector_group_destination=metadata_detector_output,
                                nxdetector_group_source=nxdetector_grp,
                            )
                            # The first file providing a non-empty group is enough
                            break
[docs]
def get_streams_HS32_scalers(self) -> list:
    """
    Collect the scalers stream of every HS32 name found in the headers.

    The names come from HS32.get_HS32_names(headers=self.headers). When
    self.scan_memory_url is set, each name is requested as "scalers:{name}";
    otherwise the bare name is requested.

    Returns:
        A list of (stream_name, stream_array) tuples, one per HS32 name, in
        the order given by HS32.get_HS32_names, as returned by get_stream.
    """
    pin_names = HS32.get_HS32_names(headers=self.headers).values()
    # In blissdata streams, they appear as scalers:{name}.
    # In the file, they are saved with the raw refactored name (without scalers).
    requested_names = (
        f"scalers:{pin_name}" if self.scan_memory_url else pin_name
        for pin_name in pin_names
    )
    # Generators are consumed lazily, so each name is built and looked up
    # one at a time, in order.
    matched_streams = (
        self.get_stream(name=requested) for requested in requested_names
    )
    return [(found_name, found_array) for found_name, found_array in matched_streams]
[docs]
def get_streams_HS32_raw(self) -> list:
    """
    Collect the raw stream of every HS32 name found in the headers.

    The names come from HS32.get_HS32_names(headers=self.headers). When
    self.scan_memory_url is set, each name is requested as "mcs:{name}_raw";
    otherwise "{name}_raw" is requested.

    Returns:
        A list of (stream_name, stream_array) tuples, one per HS32 name, in
        the order given by HS32.get_HS32_names, as returned by get_stream.
    """
    pin_names = HS32.get_HS32_names(headers=self.headers).values()
    # In blissdata streams, they appear as mcs:{name}_raw.
    # In the file, they are saved with the raw refactored name (without mcs).
    requested_names = (
        f"mcs:{pin_name}_raw" if self.scan_memory_url else f"{pin_name}_raw"
        for pin_name in pin_names
    )
    # Generators are consumed lazily, so each name is built and looked up
    # one at a time, in order.
    matched_streams = (
        self.get_stream(name=requested) for requested in requested_names
    )
    return [(found_name, found_array) for found_name, found_array in matched_streams]
[docs]
def get_stream_slow_timer(self) -> tuple:
    """
    Find the slow timer among the streams of self.streams_subscan2.

    The first stream (in iteration order) whose name contains "epoch" is
    selected.

    Returns:
        (stream_name, stream_array) of the match, or (None, None) when no
        stream name contains "epoch".
    """
    epoch_candidates = (
        (candidate_name, candidate_array)
        for candidate_name, candidate_array in self.streams_subscan2.items()
        if "epoch" in candidate_name
    )
    return next(epoch_candidates, (None, None))
[docs]
def get_stream_fast_timer(self) -> tuple:
    """
    Request the fast timer stream through get_stream.

    The stream is looked up with name "epoch" and prefix "mcs".

    Returns:
        Whatever get_stream returns for that request
        (a (stream_name, stream_array) tuple).
    """
    fast_timer_stream = self.get_stream(name="epoch", prefix="mcs")
    return fast_timer_stream
[docs]
def get_stream_exposuretime(self) -> tuple:
    """
    Request the exposure time stream.

    In blissdata, this stream is (normally) called scalers:time (in subscan1)
    In the header, there is key "HSTime" pointing to the string 'time'

    Returns:
        The result of get_stream for "scalers:{name}", where name is the
        header value stored under HS32.KEY_EXPOSURE_TIME.
    """
    header_value = self.headers.get(HS32.KEY_EXPOSURE_TIME)
    return self.get_stream(name=f"scalers:{header_value}", subscan_2=False)
[docs]
def get_stream_exposuretime_raw(self) -> tuple:
    """
    Request the raw exposure time stream.

    In blissdata, this stream is (normally) called mcs:time_raw (in subscan1)
    In the header, there is key "HSTime" pointing to the string 'time'

    Returns:
        The result of get_stream for "mcs:{name}_raw", where name is the
        header value stored under HS32.KEY_EXPOSURE_TIME.
    """
    header_value = self.headers.get(HS32.KEY_EXPOSURE_TIME)
    return self.get_stream(name=f"mcs:{header_value}_raw", subscan_2=False)
[docs]
def get_stream_deltatime(self) -> tuple:
    """
    Request the elapsed time stream.

    The stream is requested as "mcs:elapsed_time" when self.scan_memory_url
    is set and as "elapsed_time" otherwise. If that lookup yields no stream
    name, a second lookup is made with "mcs_elapsed_time".

    Returns:
        (stream_name, stream_array) from the lookup that was used last.
    """
    first_choice = "mcs:elapsed_time" if self.scan_memory_url else "elapsed_time"
    found_name, found_array = self.get_stream(name=first_choice)
    if found_name:
        return (found_name, found_array)

    # Nothing matched the first name: retry with the underscore-joined name.
    found_name, found_array = self.get_stream(name="mcs_elapsed_time")
    return (found_name, found_array)
[docs]
def get_stream(
    self,
    index_pin: int = None,
    name: str = None,
    header_key_pin: str = None,
    prefix: str = "",
    suffix: str = "",
    subscan_2: bool = False,
) -> tuple:
    """
    Resolve a stream name and look it up in the streams of one subscan.

    Args:
        index_pin: Zero-based pin index; used only when both name and
            header_key_pin are None. The name is read from the header key
            built from HS32.KEY_HS32_NAME and the two-digit value index_pin + 1.
        name: Stream name to search. Takes priority over the other lookups.
        header_key_pin: Header key whose value is used as the name when name
            is None.
        prefix: If not empty, the name becomes "{prefix}:{name}".
        suffix: If not empty, the name becomes "{name}_{suffix}".
        subscan_2: Search in self.streams_subscan2 instead of
            self.streams_subscan1.

    Returns:
        (stream_name, stream_array) as given by match_stream, or (None, None)
        when no name could be resolved.
    """
    if name is None and header_key_pin is not None:
        name = self.headers.get(header_key_pin)
    elif name is None and index_pin is not None:
        name = self.headers.get(f"{HS32.KEY_HS32_NAME}{(index_pin + 1):02}")

    # Without a usable name there is nothing to match.
    if not name:
        return (None, None)

    if prefix:
        name = f"{prefix}:{name}"
    if suffix:
        name = f"{name}_{suffix}"

    streams = self.streams_subscan2 if subscan_2 else self.streams_subscan1
    matched_name, matched_array = match_stream(name=name, streams=streams)
    return (matched_name, matched_array)
def _read_from_stream(
    self,
    stream_sliceable,
    slice_init: int,
    slice_end: int,
    stream_name: str = "unknown",
    datatype: str = "float32",
    timeout: float = 0.0,
) -> tuple:
    """
    Read stream_sliceable[slice_init:slice_end] and cast it to datatype.

    Args:
        stream_sliceable: Object supporting len() and slicing whose slices
            offer .astype(); may be None.
        slice_init: First index to read.
        slice_end: End index (exclusive). It is reduced to the number of
            available frames when fewer are available.
        stream_name: Name used only in the log messages.
        datatype: dtype passed to .astype().
        timeout: If > 0, seconds to wait (polling every 0.1 s) until
            slice_end frames are available.

    Returns:
        (values, slice_init, slice_end). values is None when
        stream_sliceable is None or when slicing/casting failed.
    """
    if stream_sliceable is None:
        return None, slice_init, slice_end

    frames_ready = len(stream_sliceable)
    if slice_end > frames_ready and timeout > 0.0:
        wait_start = time.perf_counter()
        # Poll the stream length until enough frames arrive or time runs out.
        while (
            slice_end > frames_ready
            and (time.perf_counter() - wait_start) < timeout
        ):
            time.sleep(0.1)
            frames_ready = len(stream_sliceable)

    if slice_init > frames_ready:
        self.log_error(
            f"Not enough frames in stream {stream_name} {stream_sliceable} ({frames_ready}). Requested init: {slice_init}"
        )
    if slice_end > frames_ready:
        self.log_warning(
            f"Not enough frames in stream {stream_name} {stream_sliceable} ({frames_ready}). Requested end: {slice_end}"
        )
        slice_end = frames_ready

    values = None
    try:
        values = stream_sliceable[slice_init:slice_end].astype(datatype)
    except Exception as e:
        self.log_error(f"{stream_sliceable} could not be sliced: {e}")
    return values, slice_init, slice_end
[docs]
def read_from_stream_interpolate(
    self,
    stream_sliceable,
    stream_slice=None,
    datatype: str = "float32",
) -> numpy.ndarray:
    """
    Read a stream and interpolate its values onto the fast epoch timestamps.

    stream_object is a sliceable object (numpy.ndarray or blissdata stream object)
    Valid for all streams, from subscan1 and subscan2

    Args:
        stream_sliceable: Sliceable object whose slices offer .astype().
        stream_slice: Optional (init, end) pair; the whole stream is read
            when None.
        datatype: dtype used to cast the values read from stream_sliceable.

    Returns:
        - The interpolated values (one per fast epoch value) on success.
        - The non-interpolated values when the slow or the fast epoch stream
          cannot be found, or when the slow epoch stream cannot be read.
        - None when the fast epoch stream cannot be read or is empty, or
          when numpy.interp fails.
    """
    # 1) Read the requested values of the stream
    if stream_slice is None:
        slice_init, slice_end = 0, len(stream_sliceable)
    else:
        slice_init, slice_end = stream_slice
    stream_values = stream_sliceable[slice_init:slice_end].astype(datatype)

    # 2) Read all the values in the slow epoch counter
    slow_epoch_stream_name, slow_epoch_stream_array = self.get_stream_slow_timer()
    if slow_epoch_stream_name is None:
        self.log_debug("Slow epoch stream could not be found.")
        return stream_values

    # Both epoch must be read with double precision
    slow_epoch_values, _, _ = self._read_from_stream(
        stream_sliceable=slow_epoch_stream_array,
        stream_name=slow_epoch_stream_name,
        slice_init=0,
        slice_end=len(slow_epoch_stream_array),
        datatype="float64",
    )
    if slow_epoch_values is None:
        self.log_error(f"Stream {slow_epoch_stream_name} could not be read.")
        return stream_values

    # 3) Match the slow streams: truncate both to the common length
    if len(slow_epoch_values) != len(stream_values):
        nb_slow_frames = min(len(slow_epoch_values), len(stream_values))
        slow_epoch_values = slow_epoch_values[0:nb_slow_frames]
        stream_values = stream_values[0:nb_slow_frames]

    # 4) Read the fast epoch counter
    fast_epoch_stream_name, fast_epoch_stream_array = self.get_stream_fast_timer()
    if fast_epoch_stream_name is None:
        self.log_error("Fast epoch stream could not be found.")
        return stream_values

    fast_epoch_values, _, _ = self._read_from_stream(
        stream_sliceable=fast_epoch_stream_array,
        stream_name=fast_epoch_stream_name,
        slice_init=0,
        slice_end=self.index_range_last[-1],
        datatype="float64",
    )
    if fast_epoch_values is None:
        # Stop here: interpolating on None can only fail and would log a
        # second, misleading interpolation error for the same problem.
        self.log_error(f"Stream {fast_epoch_stream_name} could not be read.")
        return None
    if fast_epoch_values.size == 0:
        self.log_error(f"Stream {fast_epoch_stream_name} is empty.")
        return None

    # 5) Interpolate data, len(interpdata) = len(fast_epoch_values)
    try:
        return numpy.interp(fast_epoch_values, slow_epoch_values, stream_values)
    except Exception as e:
        self.log_error(f"Error during numpy interpolation: {e}")
        return None
[docs]
def get_HS32C_array(self):
    """
    Build the 2D array of raw HS32 stream values for the last index range.

    The array has one row per frame of self.index_range_last and one column
    per HS32 pin (HS32.get_HS32_number_pins). Column i is filled with the
    values read from the i-th stream of get_streams_HS32_raw; cells that
    could not be filled keep the initial value -1.

    Returns:
        numpy.ndarray of dtype float64.
    """
    nb_hs32_pins = HS32.get_HS32_number_pins(headers=self.headers)
    nb_frames = self.index_range_last[1] - self.index_range_last[0]
    counts = numpy.full((nb_frames, nb_hs32_pins), fill_value=-1, dtype="float64")

    for column, (raw_name, raw_stream) in enumerate(self.get_streams_HS32_raw()):
        if raw_stream is None:
            continue
        raw_values, _, _ = self._read_from_stream(
            stream_sliceable=raw_stream,
            stream_name=raw_name,
            slice_init=self.index_range_last[0],
            slice_end=self.index_range_last[-1],
        )
        if raw_values is None:
            continue
        # Fewer values than rows may come back; fill only what was read.
        counts[: len(raw_values), column] = raw_values
    return counts
[docs]
def get_HS32V_array(
    self,
):
    """
    Build the 2D array of HS32 scalers stream values for the last index range.

    The array has one row per frame of self.index_range_last and one column
    per HS32 pin (HS32.get_HS32_number_pins). Column i is filled with the
    values read from the i-th stream of get_streams_HS32_scalers; cells that
    could not be filled keep the initial value 0.

    Returns:
        numpy.ndarray of dtype float64.
    """
    nb_hs32_pins = HS32.get_HS32_number_pins(headers=self.headers)
    nb_frames = self.index_range_last[1] - self.index_range_last[0]
    scaler_values = numpy.zeros((nb_frames, nb_hs32_pins), dtype="float64")

    for column, (scaler_name, scaler_stream) in enumerate(
        self.get_streams_HS32_scalers()
    ):
        if scaler_stream is None:
            continue
        read_values, _, _ = self._read_from_stream(
            stream_sliceable=scaler_stream,
            stream_name=scaler_name,
            slice_init=self.index_range_last[0],
            slice_end=self.index_range_last[-1],
        )
        if read_values is None:
            continue
        # Fewer values than rows may come back; fill only what was read.
        scaler_values[: len(read_values), column] = read_values
    return scaler_values
[docs]
# Read the exposure time values of the last index range (self.index_range_last).
# Returns (exposuretime, slice_init, slice_end) as given by _read_from_stream,
# or (None, None, None) when neither the normalized nor the raw stream can be read.
def get_exposuretime_values(
self,
) -> tuple:
# Go first for the already normalized time values
stream_exposuretime_name, stream_exposuretime_array = (
self.get_stream_exposuretime()
)
exposuretime, slice_init, slice_end = self._read_from_stream(
stream_sliceable=stream_exposuretime_array,
stream_name=stream_exposuretime_name,
slice_init=self.index_range_last[0],
slice_end=self.index_range_last[-1],
)
if exposuretime is None:
# Try to go to the raw time
stream_exposuretime_name, stream_exposuretime_array = (
self.get_stream_exposuretime_raw()
)
exposuretime, slice_init, slice_end = self._read_from_stream(
stream_sliceable=stream_exposuretime_array,
stream_name=stream_exposuretime_name,
slice_init=self.index_range_last[0],
slice_end=self.index_range_last[-1],
)
if exposuretime is None:
self.log_error("Exposure time could not be read.")
return None, None, None
# This is a way to get the factor
# The factor is taken from the pin whose name header equals the exposure-time header value.
# NOTE(review): confirm whether the factor is meant to scale the already
# normalized values as well, or only the values read from the raw stream.
nb_pins = HS32.get_HS32_number_pins(headers=self.headers)
pin_name = self.headers.get(HS32.KEY_EXPOSURE_TIME)
factor_exposuretime = None
for index_pin in range(nb_pins):
if (
self.headers.get(f"{HS32.KEY_HS32_NAME}{(index_pin + 1):02}")
== pin_name
):
# NOTE(review): float() raises TypeError if the factor header of the
# matching pin is missing (headers.get returning None) — confirm it is always present.
factor_exposuretime = float(
self.headers.get(f"{HS32.KEY_HS32_FACTOR}{(index_pin + 1):02}")
)
break
# No pin name matched: fall back to a neutral factor.
if factor_exposuretime is None:
self.log_error("Default exposure time factor 1.0 will be used")
factor_exposuretime = 1.0
# In-place scaling of the values that were read.
exposuretime *= factor_exposuretime
return exposuretime, slice_init, slice_end
[docs]
def get_stream_monitor_0(self) -> tuple:
    """
    Request the scalers stream named by the HS32.KEY_MONITOR_0 header.

    Returns:
        The result of get_stream for "scalers:{name}", where name is the
        header value stored under HS32.KEY_MONITOR_0.
    """
    monitor_name = self.headers.get(HS32.KEY_MONITOR_0)
    return self.get_stream(name=f"scalers:{monitor_name}")
[docs]
def get_stream_monitor_1(self) -> tuple:
    """
    Request the scalers stream named by the HS32.KEY_MONITOR_1 header.

    Returns:
        The result of get_stream for "scalers:{name}", where name is the
        header value stored under HS32.KEY_MONITOR_1.
    """
    monitor_name = self.headers.get(HS32.KEY_MONITOR_1)
    return self.get_stream(name=f"scalers:{monitor_name}")