# Source code for ewoksid02.utils.blissdata

import logging
import os
import time
from importlib.metadata import version
from packaging.version import Version
from typing import Any, Dict, Optional, Tuple
import h5py
import numpy
import numpy as np
from blissdata.beacon.data import BeaconData
from blissdata.h5api import dynamic_hdf5

if Version(version("blissdata")) >= Version("2.3.0"):
    from blissdata.exceptions import (
        IndexNoMoreThereError,
        IndexNotYetThereError,
        IndexWontBeThereError,
    )
else:
    from blissdata.redis_engine.exceptions import (
        IndexNoMoreThereError,
        IndexNotYetThereError,
        IndexWontBeThereError,
    )
from blissdata.redis_engine.scan import Scan
from blissdata.redis_engine.store import DataStore
from silx.io.h5py_utils import open_item as open_item_silx

# NOTE(review): basicConfig() configures the *root* logger as a side effect of
# importing this module - confirm this is intended for a library module.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Key names; presumably entries of the ID02 detector header metadata - TODO confirm.
HEADERS_KEY_MONITOR = "HSI1"
HEADERS_KEY_EXPOSURE_TIME = "HSTime"

# URL template of the Lima image dataset. ``{dirname}`` and ``{images_prefix}``
# are substituted by a first ``str.format`` call; the doubled braces survive
# that call as a literal ``{file_index}`` placeholder to be filled later.
LIMA_URL_TEMPLATE_ID02 = (
    "{dirname}/{images_prefix}{{file_index}}.h5::/entry_0000/measurement/data"
)
# Template of the ``images_prefix`` value (see get_lima_url_template_args_id02).
IMAGE_PREFIX_TEMPLATE_ID02 = "{collection_name}_{img_acq_device}_{scan_number}_"
# Naming pattern combining collection/dataset, a 5-digit zero-padded scan
# number and the detector name.
FILE_SCAN_FORMAT_ID02 = "{COLLECTION_DATASET}_{SCAN:05}_{DETECTOR}"


def get_datastore(beacon_host: str = None) -> DataStore:
    """Return the blissdata ``DataStore`` of the Redis data database.

    Inputs:
        - beacon_host (str) : hostname and beacon port. When given, it is
          exported as the ``BEACON_HOST`` environment variable before Beacon
          is queried. When omitted, the environment is left untouched so an
          already exported ``BEACON_HOST`` is used.

    Outputs:
        - DataStore: the data store, or None if it could not be created.
    """
    try:
        if beacon_host is not None:
            # os.environ only accepts strings: assigning None raised a
            # TypeError that was silently swallowed, so the default argument
            # could never produce a datastore.
            os.environ["BEACON_HOST"] = beacon_host
        return DataStore(url=BeaconData().get_redis_data_db())
    except Exception as e:
        # Best effort: callers test the result for None, but leave a trace.
        logger.warning(f"Could not get the blissdata datastore: {e}")
        return None
def load_scan(
    scan_memory_url: str, wait_until_start: bool = True, beacon_host: str = None
) -> Scan:
    """
    Fetch a scan from the blissdata data store.

    Inputs:
        - scan_memory_url (str): Key (URL) of the scan in the scan memory.
        - wait_until_start (bool, optional): When True (default), keep
          refreshing the scan until its state is at least 2.
        - beacon_host (str) : hostname and beacon port

    Outputs:
        - Scan: The loaded scan, or None when no datastore is available.
    """
    store = get_datastore(beacon_host=beacon_host)
    if not store:
        return None

    # blissdata < 2.0.0 needs the scan class to be passed explicitly
    load_kwargs = {"key": scan_memory_url}
    if Version(version("blissdata")) < Version("2.0.0"):
        load_kwargs["scan_cls"] = Scan
    loaded_scan = store.load_scan(**load_kwargs)

    # Non-blocking polling until the scan has started
    while wait_until_start and loaded_scan.state < 2:
        loaded_scan.update(block=False)
    return loaded_scan
def get_stream(
    stream_name: str = None,
    detector_name: str = None,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
):
    """
    Look up one stream of a scan, by default the Lima image stream of a detector.

    Inputs:
        - stream_name (str) : Full name of the stream. When empty, it is
          built as "<detector_name>:image".
        - detector_name (str): Name of the detector (used only to build the
          default stream name).
        - scan (Scan): Already loaded scan. When missing, the scan is loaded
          from scan_memory_url (waiting until it starts).
        - scan_memory_url (str): The URL of the scan memory.
        - beacon_host (str) : hostname and beacon port

    Outputs:
        - The stream found in ``scan.streams`` (wrapped in a LimaStream for
          image streams with blissdata < 2.0.0), or None if it does not exist.
    """
    if not stream_name and detector_name:
        stream_name = f"{detector_name}:image"

    if not scan:
        scan = load_scan(
            scan_memory_url=scan_memory_url,
            wait_until_start=True,
            beacon_host=beacon_host,
        )

    is_image_stream = stream_name.split(":")[-1] == "image"
    needs_lima_wrapper = is_image_stream and Version(
        version("blissdata")
    ) < Version("2.0.0")
    if not needs_lima_wrapper:
        return scan.streams.get(stream_name)

    # Old blissdata: image streams are references to be wrapped explicitly
    from blissdata.stream import LimaStream

    raw_stream = scan.streams.get(stream_name)
    if raw_stream is None:
        return None
    return LimaStream(stream=raw_stream)
def get_lima_url_template_args_id02(
    scan_number: int,
    detector_name: str,
    collection_name: str = None,
    scan_number_format: str = "%05d",
    image_prefix_template: str = IMAGE_PREFIX_TEMPLATE_ID02,
) -> Optional[Dict[str, str]]:
    """
    Build the arguments needed to fill a Lima URL template.

    Inputs:
        - scan_number (int): Scan number; None disables the computation.
        - detector_name (str): Used as ``img_acq_device`` in the prefix.
        - collection_name (str): Used as ``collection_name`` in the prefix.
        - scan_number_format (str): printf-style format applied to scan_number.
        - image_prefix_template (str): Template of the images prefix.

    Outputs:
        - dict with the single key "images_prefix", or None when scan_number
          is None.
    """
    if scan_number is None:
        return None

    formatted_scan_number = scan_number_format % scan_number
    images_prefix = image_prefix_template.format(
        collection_name=collection_name,
        img_acq_device=detector_name,
        scan_number=formatted_scan_number,
    )
    return {"images_prefix": images_prefix}
def get_length_dataset_dynamic_file(
    filename_data: str,
    scan_nb: int,
    detector_name: str,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
):
    """
    Return the current length of a detector dataset read through the
    blissdata dynamic HDF5 API.

    Inputs:
        - filename_data (str): File opened with ``dynamic_hdf5.File``.
        - scan_nb (int): Scan number inside the file.
        - detector_name (str): Detector whose "data" dataset is measured.
        - lima_url_template (str): Forwarded to ``dynamic_hdf5.File``.
        - lima_url_template_args (dict): Forwarded to ``dynamic_hdf5.File``.
        - subscan (int): Subscan index used in the HDF5 path.

    Outputs:
        - int: ``len()`` of "<scan_nb>.<subscan>/instrument/<detector_name>/data".
    """
    h5path = f"{scan_nb}.{subscan}/instrument/{detector_name}/data"
    with dynamic_hdf5.File(
        file=filename_data,
        lima_names=[detector_name],
        lima_url_template=lima_url_template,
        lima_url_template_args=lima_url_template_args,
    ) as h5root:
        nb_frames = len(h5root[h5path])
    return nb_frames
def get_length_dataset_static_file(
    filename_data: str,
    data_path: str,
):
    """
    Return the length of an item stored in a regular (static) HDF5 file.

    Inputs:
        - filename_data (str): HDF5 file, opened read-only with h5py.
        - data_path (str): Path of the item inside the file.

    Outputs:
        - int: ``len()`` of the item, or None (after logging an error) when
          data_path is not in the file.
    """
    with h5py.File(filename_data, "r") as h5file:
        if data_path not in h5file:
            logger.error(f"{data_path} not found in {filename_data}")
            return None
        return len(h5file[data_path])
def track_length_dataset_dynamic_file(
    lima_name,
    scan_memory_url,
    beacon_host=None,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
    **kwargs,
):
    """
    Generator following the growth of a detector dataset through the
    blissdata dynamic HDF5 API while a scan is running.

    Inputs:
        - lima_name (str): Detector name.
        - scan_memory_url (str): The URL of the scan memory.
        - beacon_host (str) : hostname and beacon port
        - lima_url_template (str): Forwarded to ``dynamic_hdf5.File``.
        - lima_url_template_args (dict): Forwarded to ``dynamic_hdf5.File``.
        - subscan (int): Subscan index used in the HDF5 path.
        - **kwargs: Extra arguments forwarded to ``dynamic_hdf5.File``.

    Yields:
        - int: Current dataset length, about once per second, until it
          equals ``scan.info["npoints"]`` or the scan state is 4. One extra
          value is yielded if a last read of the file gives a different
          length.
    """
    scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)
    while scan.state < 2:
        scan.update(block=False)

    scan_nb = scan.info["scan_nb"]
    nb_points = scan.info["npoints"]
    params_dynamic_file = {
        "file": scan.info["filename"],
        "lima_names": [lima_name],
        "lima_url_template": lima_url_template,
        "lima_url_template_args": lima_url_template_args,
        **kwargs,
    }
    h5path = f"{scan_nb}.{subscan}/instrument/{lima_name}/data"

    def _read_length():
        # The file is re-opened for every read to see the latest frames
        with dynamic_hdf5.File(**params_dynamic_file) as root:
            return len(root[h5path])

    while True:
        length = _read_length()
        finished = length == nb_points or scan.state == 4
        if not finished:
            scan.update(block=False)
            time.sleep(1)
        yield length
        if finished:
            break

    # Last check. This used to call get_length_dataset_dynamic_file() with
    # keywords it does not accept (always a TypeError) and to ``return`` the
    # value, which is invisible to a ``for`` loop over the generator.
    final_length = _read_length()
    if final_length != length:
        yield final_length
def get_length_lima_stream(
    detector_name,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
):
    """
    Return the number of frames available for a detector, asking the Lima
    stream first and the dynamic HDF5 file as a fallback.

    Inputs:
        - detector_name (str): Detector name.
        - scan (Scan): Already loaded scan (optional).
        - scan_memory_url (str): The URL of the scan memory.
        - beacon_host (str) : hostname and beacon port
        - lima_url_template (str): Used only by the file fallback.
        - lima_url_template_args (dict): Used only by the file fallback.
        - subscan (int): Subscan index, used only by the file fallback.

    Outputs:
        - int: Length of the stream, or of the dataset in the file when the
          stream length cannot be obtained.
    """
    image_stream = get_stream(
        scan=scan,
        scan_memory_url=scan_memory_url,
        detector_name=detector_name,
        beacon_host=beacon_host,
    )
    try:
        # Only len() is requested, no frame is read: workaround for a Lima
        # memory bug when mask processing is active (per the original note)
        return len(image_stream)
    except Exception:
        logger.warning("Data is no more available from Lima memory")
        scan = scan or load_scan(
            scan_memory_url=scan_memory_url, beacon_host=beacon_host
        )
        return get_length_dataset_dynamic_file(
            filename_data=scan.info["filename"],
            scan_nb=scan.info["scan_nb"],
            detector_name=detector_name,
            lima_url_template=lima_url_template,
            lima_url_template_args=lima_url_template_args,
            subscan=subscan,
        )
def track_length_dataset(
    lima_name,
    scan_memory_url,
    beacon_host=None,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
    **kwargs,
):
    """
    Generator following the number of frames available for a detector,
    first from the Lima memory and then from the files.

    Inputs:
        - lima_name (str): Detector name.
        - scan_memory_url (str): The URL of the scan memory.
        - beacon_host (str) : hostname and beacon port
        - lima_url_template (str): Forwarded to ``dynamic_hdf5.File``.
        - lima_url_template_args (dict): Forwarded to ``dynamic_hdf5.File``.
        - subscan (int): Subscan index used in the HDF5 path.
        - **kwargs: Extra arguments forwarded to ``dynamic_hdf5.File``.

    Yields:
        - int: Number of frames available, until it equals
          ``scan.info["npoints"]`` or the scan state is 4.
    """
    scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)
    nb_points = scan.info["npoints"]
    # The loaded scan and the beacon host are handed over to get_stream().
    # Without them the scan was reloaded at every iteration with no host.
    limastream_params = {
        "scan": scan,
        "scan_memory_url": scan_memory_url,
        "detector_name": lima_name,
        "beacon_host": beacon_host,
    }
    wait = True
    memory_available = True
    last_index_available = 0
    while wait:
        if memory_available:
            limastream = get_stream(**limastream_params)
            try:
                # Reading the first frame probes whether memory is still there
                _ = limastream[0]
                last_index_available = len(limastream)
                logger.info("Data is available from Lima memory")
            except IndexNotYetThereError:
                pass
            except IndexNoMoreThereError:
                logger.warning("Data is no more available from Lima memory")
                memory_available = False
                # Switch to the files right away instead of asking the
                # length to a stream whose memory is gone
                continue
            except RuntimeError:
                pass

            if last_index_available == nb_points:
                wait = False
            elif scan.state == 4:
                last_index_available = len(limastream)
                wait = False
            else:
                scan.update(block=False)
                time.sleep(1)
        else:
            logger.info("Data is only available in the files")
            params_dynamic_file = {
                "file": scan.info["filename"],
                "lima_names": [lima_name],
                "lima_url_template": lima_url_template,
                "lima_url_template_args": lima_url_template_args,
                **kwargs,
            }
            with dynamic_hdf5.File(**params_dynamic_file) as root:
                scan_nb = scan.info["scan_nb"]
                lima_dataset = root[f"{scan_nb}.{subscan}/instrument/{lima_name}/data"]
                last_index_available = len(lima_dataset)
                if last_index_available == nb_points or scan.state == 4:
                    wait = False
                else:
                    scan.update(block=False)
                    time.sleep(1)
        yield last_index_available
def track_dataset(
    lima_name,
    scan_memory_url,
    beacon_host=None,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
    max_slice_size=10,
    start_from_memory=True,
    **kwargs,
):
    """
    Generator reading the frames of a detector slice by slice, from the Lima
    memory while possible and from the files (h5api) afterwards.

    Inputs:
        - lima_name (str): Detector name.
        - scan_memory_url (str): The URL of the scan memory.
        - beacon_host (str) : hostname and beacon port
        - lima_url_template (str): Forwarded to ``dynamic_hdf5.File``.
        - lima_url_template_args (dict): Forwarded to ``dynamic_hdf5.File``.
        - subscan (int): Subscan index used in the HDF5 path.
        - max_slice_size (int): Maximum number of frames per slice.
        - start_from_memory (bool): Try the Lima memory first.
        - **kwargs: Extra arguments forwarded to ``dynamic_hdf5.File``.

    Yields:
        - The frames read since the previous iteration, or None when there
          was nothing new. The generator ends when ``scan.info["npoints"]``
          frames were read, or when the scan state is 4 and everything
          available was read.
    """
    scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)
    nb_points = scan.info["npoints"]
    # The loaded scan and the beacon host are handed over to get_stream().
    # Without them the scan was reloaded at every iteration with no host.
    limastream_params = {
        "scan": scan,
        "scan_memory_url": scan_memory_url,
        "detector_name": lima_name,
        "beacon_host": beacon_host,
    }
    wait = True
    memory_available = start_from_memory
    last_index_read = 0
    while wait:
        dataset = None
        if memory_available:
            try:
                scan.update(block=False)
                limastream = get_stream(**limastream_params)
            except Exception as e:
                logger.warning(f"Scan could not be updated, stop tracking: {e}")
                wait = False
                continue

            try:
                # Reading the first frame probes whether memory is still there
                _ = limastream[0]
                last_index_available = len(limastream)
                slice_end = min(last_index_read + max_slice_size, last_index_available)
                dataset = limastream[last_index_read:slice_end]
                if dataset is not None and len(dataset) == 0:
                    # Nothing new. ``dataset and ...`` raised ValueError on
                    # multi-frame arrays. No ``continue`` here, so that the
                    # end-of-scan checks below are still evaluated.
                    dataset = None
                else:
                    logger.info(
                        f"Data retrieved from Lima memory: {slice_end - last_index_read} frames:"
                    )
                    last_index_read = slice_end
            except IndexNotYetThereError:
                continue
            except IndexNoMoreThereError:
                logger.warning(
                    "Data is no more available from Lima memory. Switching to h5api..."
                )
                memory_available = False
                continue
            except RuntimeError:
                continue

            if last_index_read == nb_points:
                wait = False
            elif scan.state == 4:
                limastream = get_stream(**limastream_params)
                try:
                    length = len(limastream)
                    if length == last_index_read:
                        wait = False
                except Exception:
                    memory_available = False
                    continue

        if not memory_available:
            params_dynamic_file = {
                "file": scan.info["filename"],
                "lima_names": [lima_name],
                "lima_url_template": lima_url_template,
                "lima_url_template_args": lima_url_template_args,
                **kwargs,
            }
            with dynamic_hdf5.File(**params_dynamic_file) as root:
                scan_nb = scan.info["scan_nb"]
                lima_dataset = root[f"{scan_nb}.{subscan}/instrument/{lima_name}/data"]
                length = len(lima_dataset)
                if length > last_index_read:
                    slice_end = min(last_index_read + max_slice_size, length)
                    dataset = lima_dataset[last_index_read:slice_end]
                    logger.info(f"Data retrieved from hdf5 file: {len(dataset)} frames")
                    last_index_read = slice_end

                if last_index_read == nb_points:
                    wait = False
                elif scan.state == 4 and last_index_read == len(lima_dataset):
                    wait = False
                else:
                    scan.update(block=False)
        yield dataset
def get_available_dataset(
    lima_name: str,
    scan_memory_url: str = "",
    lima_url_template: str = "",
    lima_url_template_args: Dict[str, Any] = {},
    scan_nb: int = None,
    subscan: int = 1,
    last_index_read: int = 0,
    max_slice_size: int = 10,
    start_from_memory: bool = True,
    range_index_read: Optional[Tuple[int, int]] = None,
    data_filename: str = None,
    beacon_host: str = None,
) -> Optional[np.ndarray]:
    """
    Retrieves the available dataset from either Lima memory or an HDF5 file.

    The slice limits are computed with ``_get_new_slice_limits`` and the data
    is read with ``_slice_dataset_online`` (when scan_memory_url is given) or
    ``_slice_dataset_offline``. This function used to call those helpers with
    keywords they do not accept and failed with a TypeError on every call.

    Args:
        lima_name (str): Name of the detector.
        scan_memory_url (str): URL to the scan memory. Empty for offline reading.
        lima_url_template (str, optional): Template for Lima file URLs. Defaults to "".
        lima_url_template_args (dict, optional): Arguments for the Lima URL template. Defaults to {}.
        scan_nb (int, optional): Scan number, used to build the HDF5 path offline.
        subscan (int, optional): Subscan number. Defaults to 1.
        last_index_read (int, optional): Last index read from the dataset. Defaults to 0.
        max_slice_size (int, optional): Maximum number of frames to read in one slice. Defaults to 10.
        start_from_memory (bool, optional): Whether to start reading from memory. Defaults to True.
        range_index_read (tuple, optional): Requested (first, last) indices.
        data_filename (str, optional): HDF5 file used for offline reading.
        beacon_host (str, optional): hostname and beacon port, for online reading.

    Returns:
        Optional[np.ndarray]: The retrieved dataset, or None if no data is available.
    """
    # Path of the detector data in the master file (offline reading only)
    h5path = f"{scan_nb}.{subscan}/instrument/{lima_name}/data"

    # A positive last_index_read means "continue from there"; otherwise the
    # start comes from range_index_read (or 0).
    index_range_last = (last_index_read, last_index_read) if last_index_read else None
    limits = _get_new_slice_limits(
        detector_name=lima_name,
        scan_memory_url=scan_memory_url or None,
        beacon_host=beacon_host,
        filename_data=data_filename,
        h5path=h5path,
        index_range=range_index_read,
        index_range_last=index_range_last,
        max_slice_size=max_slice_size,
    )
    if limits is None:
        return None

    if scan_memory_url:
        result = _slice_dataset_online(
            detector_name=lima_name,
            scan_memory_url=scan_memory_url,
            beacon_host=beacon_host,
            lima_url_template=lima_url_template,
            lima_url_template_args=lima_url_template_args,
            subscan=subscan,
            index_range=tuple(limits),
            start_from_memory=start_from_memory,
        )
    else:
        result = _slice_dataset_offline(
            filename_data=data_filename,
            h5path_to_data=h5path,
            index_range=tuple(limits),
        )
    # Both helpers return {"dataset": ..., "index_range": ...}
    return result["dataset"]
def _get_nb_frames_available_from_stream(
    detector_name: str = None,
    stream_name: str = None,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    index_range_last: tuple = None,
):
    """
    Wait until a stream holds more frames than already read and return its length.

    Inputs:
        - detector_name (str): Detector name (default stream "<detector>:image").
        - stream_name (str): Explicit stream name.
        - scan (Scan): Already loaded scan (optional).
        - scan_memory_url (str): The URL of the scan memory.
        - beacon_host (str) : hostname and beacon port
        - index_range_last (tuple): Last range read; its last item is the
          number of frames already consumed (0 when None).

    Outputs:
        - int: Number of frames in the stream. Returned as soon as it exceeds
          the last index read, or when the scan state is 4 (no more frames
          will come). The last known value (None if there is none) is
          returned when the scan cannot be updated.
    """
    if not scan:
        scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)

    last_index = 0 if index_range_last is None else index_range_last[-1]

    # Initialised so that a failure of the very first update returns None
    # instead of raising UnboundLocalError
    nb_available_frames = None
    while True:
        try:
            scan.update(block=False)
        except Exception as e:
            # Handle canceled or non-existent scans
            logger.warning(f"Scan canceled or does not exist. Exiting. {e}")
            return nb_available_frames

        stream = get_stream(
            stream_name=stream_name,
            detector_name=detector_name,
            scan=scan,
            scan_memory_url=scan_memory_url,
            beacon_host=beacon_host,
        )
        nb_available_frames = len(stream)
        if last_index < nb_available_frames and nb_available_frames > 0:
            return nb_available_frames
        if scan.state == 4:
            # The scan is over: waiting longer would loop forever
            return nb_available_frames


def _get_nb_frames_available_from_file(
    filename_data: str,
    h5path: str,
):
    """
    Return the number of frames stored under an HDF5 path.

    Inputs:
        - filename_data (str): HDF5 file (opened through silx ``open_item``).
        - h5path (str): Path of a dataset or of a group of datasets.

    Outputs:
        - int: ``len()`` of the dataset; for a group, the smallest length
          among its datasets (0 if it holds none). None, after logging an
          error, when the path is missing or is neither dataset nor group.
    """
    with open_item_silx(filename=filename_data, name="/") as root:
        if h5path not in root:
            logger.error(
                f"Attempt to read data offline but {h5path} not in {filename_data}"
            )
            return None

        h5item = root[h5path]
        if isinstance(h5item, h5py.Dataset):
            # Normally is a lima dataset
            return len(h5item)
        if isinstance(h5item, h5py.Group):
            # In the case of scalers, the limit is given by the shortest counter
            return min(
                (
                    len(h5item[name])
                    for name in h5item
                    if isinstance(h5item[name], h5py.Dataset)
                ),
                default=0,
            )
        logger.error(f"{h5item} not valid to read data")
        return None


def _get_new_slice_limits(
    detector_name: str = None,
    stream_name: str = None,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    filename_data: str = None,
    h5path: str = None,
    index_range=None,
    index_range_last=None,
    max_slice_size: int = 50,
):
    """
    Compute the limits of the next slice of frames to read.

    Online (scan or scan_memory_url given) the function waits until the
    stream holds frames beyond the start of the slice or the scan state is 4.
    Offline the number of frames is read from filename_data / h5path.

    Inputs:
        - detector_name, stream_name, scan, scan_memory_url, beacon_host:
          identify the stream (online mode).
        - filename_data (str), h5path (str): identify the data (offline mode).
        - index_range: Optional requested range; its first item is the
          initial start, its last item caps the end.
        - index_range_last: Limits returned by the previous call; the new
          slice starts at its last item.
        - max_slice_size (int): Maximum length of the slice.

    Outputs:
        - list: [slice_init, slice_end], or None when there is nothing to
          read or the source is not available.
    """
    # Get the minimum index to read
    if index_range_last is not None:
        # Continue from the previous last index
        slice_init = index_range_last[-1]
    elif index_range is not None:
        # Nothing has been read yet, but there is a requested range
        slice_init = index_range[0]
    else:
        slice_init = 0
    slice_end = slice_init + max_slice_size

    if scan_memory_url or scan:
        if not scan:
            scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)
        stream_params = {
            "stream_name": stream_name,
            "detector_name": detector_name,
            "scan": scan,
            "scan_memory_url": scan_memory_url,
            "beacon_host": beacon_host,
        }
        logger.info("Waiting for data...")
        while True:
            try:
                scan.update(block=False)
            except Exception as e:
                # Handle canceled or non-existent scans
                logger.warning(f"Scan canceled or does not exist. Exiting. {e}")
                return None

            stream = get_stream(**stream_params)
            if scan.state == 4:
                # The scan is over: take what is there and leave the loop
                if not stream:
                    logger.error(f"No detected stream for {stream_name}. Aborting.")
                    return None
                nb_available_frames = len(stream)
                break
            if not stream:
                continue
            nb_available_frames = len(stream)
            if nb_available_frames > slice_init:
                # There are new frames
                break
    else:
        nb_available_frames = _get_nb_frames_available_from_file(
            filename_data=filename_data, h5path=h5path
        )
        if nb_available_frames is None:
            return None

    slice_end = min(slice_end, nb_available_frames)
    if index_range is not None:
        slice_end = min(slice_end, index_range[-1])
    if slice_init >= slice_end:
        return None
    return [slice_init, slice_end]
def track_slice_limits(
    detector_name: str = None,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    filename_data: str = None,
    h5path: str = None,
    index_range=None,
    max_slice_size: int = 50,
):
    """
    Generator yielding the limits of consecutive slices of frames.

    Each iteration calls ``_get_new_slice_limits`` with the limits obtained
    in the previous one and stops as soon as it returns None.

    Inputs:
        - detector_name, scan, scan_memory_url, beacon_host: online source.
        - filename_data, h5path: offline source.
        - index_range: Optional requested range of indices.
        - max_slice_size (int): Maximum length of each slice.

    Yields:
        - list: [slice_init, slice_end] of the next slice.
    """
    previous_limits = None
    while (
        previous_limits := _get_new_slice_limits(
            detector_name=detector_name,
            scan=scan,
            scan_memory_url=scan_memory_url,
            beacon_host=beacon_host,
            filename_data=filename_data,
            h5path=h5path,
            max_slice_size=max_slice_size,
            index_range=index_range,
            index_range_last=previous_limits,
        )
    ) is not None:
        yield previous_limits
def _slice_dataset_online(
    stream_name: str = None,
    detector_name: str = None,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    lima_url_template: str = "",
    lima_url_template_args: Dict[str, Any] = {},
    subscan: int = 1,
    index_range: tuple = None,
    start_from_memory: bool = True,
) -> dict:
    """
    Reads a dataset from an online source, either from Lima memory or through dynamic hdf5 file.

    This method attempts to retrieve data from Lima memory first. If the data is no longer
    available in memory, it falls back to reading from an HDF5 file.

    Args:
        stream_name (str, optional): Name of the stream. Defaults to "<detector_name>:image".
        detector_name (str, optional): Name of the detector.
        scan (Scan, optional): Already loaded scan. Loaded from scan_memory_url when missing.
        scan_memory_url (str, optional): URL to the scan memory.
        beacon_host (str, optional): hostname and beacon port.
        lima_url_template (str, optional): Template for Lima file URLs. Defaults to "".
        lima_url_template_args (Dict[str, Any], optional): Arguments for the Lima URL template.
        subscan (int, optional): Subscan number. Defaults to 1.
        index_range (tuple, optional): (first, last) indices to read. If None, reads all
            available data.
        start_from_memory (bool, optional): Whether to start reading from Lima memory.
            Defaults to True.

    Returns:
        dict: {"dataset": <frames or None>, "index_range": (slice_init, slice_end)}.
            "index_range" is None when no slice could be attempted.
    """
    if not stream_name and detector_name:
        stream_name = f"{detector_name}:image"

    if not scan:
        scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)

    memory_available = start_from_memory
    dataset = None
    # Bound up-front: the loop may end before any slice is attempted, which
    # used to raise UnboundLocalError in the return statement
    slice_init = slice_end = None
    wait_for_data = True
    while wait_for_data:
        if memory_available:
            try:
                scan.update(block=False)
                stream = get_stream(
                    stream_name=stream_name,
                    scan=scan,
                    scan_memory_url=scan_memory_url,
                    beacon_host=beacon_host,
                )
            except Exception as e:
                # Handle canceled or non-existent scans
                logger.warning(f"Scan canceled or does not exist. Exiting. {e}")
                wait_for_data = False
                continue

            try:
                # Check if the requested frame is available in memory
                if index_range is None:
                    slice_init = 0
                    slice_end = len(stream)
                else:
                    slice_init, slice_end = index_range

                dataset = stream[slice_init:slice_end]
                if dataset is None or len(dataset) == 0:
                    dataset = None
                    if scan.state == 4:
                        # A finished scan will not deliver more frames:
                        # stop instead of polling forever
                        wait_for_data = False
                    continue

                wait_for_data = False
                logger.info(
                    f"Data retrieved from {detector_name} Lima memory: {len(dataset)} frames between {slice_init} -> {slice_end}"
                )
            except IndexNotYetThereError:
                # Frame not yet available
                continue
            except IndexNoMoreThereError:
                logger.warning(
                    f"Data is no more available from {detector_name} Lima memory. Switching to h5api..."
                )
                memory_available = False
                continue
            except IndexWontBeThereError:
                logger.warning(f"No more data can be retrieved from {detector_name}")
                wait_for_data = False
                continue
            except Exception as e:
                # Reported through the module logger like every other message
                logger.warning(
                    f"Exception! Looks like {detector_name} Lima memory is not reachable. Switching to file... {e}"
                )
                memory_available = False
                continue

        if not memory_available:
            params_dynamic_file = {
                "file": scan.info["filename"],
                "lima_names": [detector_name],
                "lima_url_template": lima_url_template,
                "lima_url_template_args": lima_url_template_args,
            }
            with dynamic_hdf5.File(**params_dynamic_file) as root:
                scan_nb = scan.info["scan_nb"]
                dset_data = root[f"{scan_nb}.{subscan}/instrument/{detector_name}/data"]
                if index_range is None:
                    slice_init = 0
                    slice_end = len(dset_data)
                else:
                    slice_init, slice_end = index_range

                dataset = dset_data[slice_init:slice_end, :, :]
                if dataset is not None:
                    if len(dataset) == 0:
                        dataset = None
                        continue
                    wait_for_data = False
                    logger.info(
                        f"Data retrieved from hdf5 file: {len(dataset)} frames between {slice_init} -> {slice_end}"
                    )

    index_range_read = None if slice_init is None else (slice_init, slice_end)
    return {"dataset": dataset, "index_range": index_range_read}


def _slice_dataset_offline(
    filename_data: str,
    h5path_to_data: str,
    index_range: tuple = None,
) -> dict:
    """
    Reads a slice of a dataset from an HDF5 file.

    Args:
        filename_data (str): Path to the HDF5 file.
        h5path_to_data (str): Path to the H5 dataset with the data.
        index_range (tuple) : Will attempt to slice between these two limits,
            clipped to the number of frames available. None reads everything.

    Returns:
        dict: {"dataset": <frames or None>, "index_range": (slice_init, slice_end) or None}.
            Both values are None when the path is missing or the slice is empty.
    """
    out = {
        "dataset": None,
        "index_range": None,
    }
    with open_item_silx(filename=filename_data, name="/") as root:
        if h5path_to_data not in root:
            logger.error(f"{h5path_to_data} not found in {filename_data}")
            return out

        dset = root[h5path_to_data]
        nb_frames_available = len(dset)
        if index_range is None:
            slice_init = 0
            slice_end = nb_frames_available
        else:
            slice_init = min(index_range[0], nb_frames_available)
            slice_end = min(index_range[-1], nb_frames_available)

        if slice_init >= slice_end:
            logger.error(
                f"{filename_data}:{h5path_to_data} cannot be sliced between {slice_init} -> {slice_end}"
            )
        else:
            logger.info(
                f"Data retrieved from hdf5 file: {slice_end - slice_init} frames between {slice_init} -> {slice_end}"
            )
            out["dataset"] = dset[slice_init:slice_end]
            out["index_range"] = (slice_init, slice_end)
    return out
def read_dataset_offline(
    filename_data: str,
    detector_name: str,
    scan_nb: int,
    last_index_read: int,
    max_slice_size: int = 100,
    range_index_read: Optional[Tuple[int, int]] = None,
) -> tuple:
    """
    Read the next frames of a detector from a static HDF5 file.

    Inputs:
        - filename_data (str): HDF5 file, opened read-only with h5py.
        - detector_name (str): Detector name, used in the HDF5 path.
        - scan_nb (int): Scan number; the first subscan ("<scan_nb>.1") is read.
        - last_index_read (int): First index of the slice.
        - max_slice_size (int): Maximum number of frames, used only when
          range_index_read is None.
        - range_index_read (tuple): When given, its last item is the end of
          the slice.

    Outputs:
        - The sliced data (end clipped to the dataset length), or None after
          logging an error when the dataset is not in the file.
    """
    h5path = f"{scan_nb}.1/instrument/{detector_name}/data"
    with h5py.File(filename_data, "r") as h5file:
        if h5path not in h5file:
            logger.error(f"Dataset {h5path} not found in {filename_data}")
            return None

        frames = h5file[h5path]
        nb_frames = len(frames)
        if range_index_read is not None:
            upper = min(range_index_read[-1], nb_frames)
        else:
            upper = min(last_index_read + max_slice_size, nb_frames)
        return frames[last_index_read:upper]
def do_continue_pipeline(
    detector_name: str = None,
    scan_memory_url=None,
    beacon_host: str = None,
    last_index_read=0,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
    filename_data=None,
    path_to_data_signal: str = None,  # To be used for static files
) -> bool:
    """
    Checks if there are still frames to be read from a running/complete scan or from a file

    Inputs:
        - detector_name (str): Detector name (online mode).
        - scan_memory_url (str): The URL of the scan memory. When given, the
          decision is taken by ``continue_pipeline_bliss``.
        - beacon_host (str) : hostname and beacon port
        - last_index_read (int): Number of frames already read.
        - lima_url_template, lima_url_template_args, subscan: forwarded to
          ``continue_pipeline_bliss``.
        - filename_data (str): Static file, used when scan_memory_url is not
          given (decision taken by ``continue_pipeline_offline``).
        - path_to_data_signal (str): Path of the data in the static file.

    Outputs:
        - bool: Whether the pipeline has to continue. False when neither a
          scan nor a file is given.
    """
    logger.info(
        f"Checking if there are still frames to read. Last index read: {last_index_read}"
    )
    if scan_memory_url:
        return continue_pipeline_bliss(
            scan_memory_url=scan_memory_url,
            beacon_host=beacon_host,
            detector_name=detector_name,
            last_index_read=last_index_read,
            subscan=subscan,
            lima_url_template=lima_url_template,
            lima_url_template_args=lima_url_template_args,
        )
    if filename_data:
        return continue_pipeline_offline(
            filename_data=filename_data,
            last_index_read=last_index_read,
            path_to_data_signal=path_to_data_signal,
        )
    # No data source at all: the function used to fall through and return
    # None despite its ``-> bool`` contract
    logger.error("Neither scan_memory_url nor filename_data was provided")
    return False
def continue_pipeline_bliss(
    detector_name: str,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    last_index_read: int = 0,
    subscan: int = 1,
    lima_url_template: str = None,
    lima_url_template_args: dict = None,
):
    """
    Decide from the scan state whether frames are still to be read online.

    Inputs:
        - detector_name (str): Detector whose "<detector_name>:image" stream
          is checked.
        - scan (Scan): Already loaded scan. Loaded (without waiting for its
          start) from scan_memory_url when missing.
        - scan_memory_url (str): The URL of the scan memory.
        - beacon_host (str) : hostname and beacon port
        - last_index_read (int): Number of frames already read.
        - subscan, lima_url_template, lima_url_template_args: forwarded to
          ``get_length_lima_stream``.

    Outputs:
        - True while the scan state is below 4, or when state is 4 and more
          frames are stored than read. False when the scan cannot be loaded,
          has no (image) stream, its length is unknown or nothing is left to
          read. None for any state above 4.
    """
    try:
        if not scan:
            scan = load_scan(
                scan_memory_url=scan_memory_url,
                beacon_host=beacon_host,
                wait_until_start=False,
            )
        state = scan.state
    except Exception as e:
        logger.error(f"scan {scan_memory_url} could not be loaded!: {e}")
        return False

    # Scan not finished yet: more data will come
    if state < 2:
        logger.info("Scan started but acquisition did not. Wait, data is coming")
        return True
    if state in (2, 3):
        logger.info("Scan is running. Wait for data")
        return True
    if state != 4:
        return None

    logger.info("Scan is complete")
    if not scan.streams:
        logger.warning(
            "\n\tThe scan is complete and does not contain any streams. End of the workflow.\n\t"
        )
        return False
    if f"{detector_name}:image" not in scan.streams:
        logger.error(f"There is no stream {detector_name}:image in the scan")
        return False

    nb_stored_frames = get_length_lima_stream(
        scan=scan,
        scan_memory_url=scan_memory_url,
        beacon_host=beacon_host,
        detector_name=detector_name,
        lima_url_template=lima_url_template,
        lima_url_template_args=lima_url_template_args,
        subscan=subscan,
    )
    if nb_stored_frames is None:
        return False
    logger.info(f"Current length of the dataset: {nb_stored_frames}")

    if nb_stored_frames == last_index_read:
        logger.info("\n\tNo more frames to read. End of the workflow\n\t")
        return False
    if nb_stored_frames > last_index_read:
        logger.info("There are still frames to read. Continue")
        return True
    logger.error("There are more read then stored frames. Something went wrong!")
    return False
def continue_pipeline_offline(
    filename_data: str,
    last_index_read: int = 0,
    path_to_data_signal: str = None,
):
    """
    Decide whether frames are still to be read from a static HDF5 file.

    Inputs:
        - filename_data (str): HDF5 file with the data.
        - last_index_read (int): Number of frames already read.
        - path_to_data_signal (str): Path of the data inside the file.

    Outputs:
        - bool: True only when the file stores more frames than already read.
          False when the length cannot be obtained, when everything was read,
          or when more frames were read than stored.
    """
    current_length = get_length_dataset_static_file(
        filename_data=filename_data,
        data_path=path_to_data_signal,
    )
    if current_length is None:
        return False

    logger.info(
        f"Current length of the dataset: {current_length}. Last index read: {last_index_read}"
    )
    if current_length == last_index_read:
        logger.info("\n\tNo more frames to read. End of the workflow\n\t")
        return False
    if current_length > last_index_read:
        logger.info("There are still frames to read. Continue")
        return True

    logger.error("There are more read then stored frames. Something went wrong!")
    # A static file will not grow: continuing would keep the pipeline running
    # forever. Same answer as continue_pipeline_bliss in this situation.
    return False
def read_blissdata_stream(stream, range_to_read: list) -> numpy.ndarray:
    """
    Centralized method to slice a blissdata stream.

    Inputs:
        - stream: Object supporting ``len()`` and slicing.
        - range_to_read (list): [first, last] indices of the slice. None, or
          a None bound, leaves that side of the slice open.

    Outputs:
        - The values ``stream[first:last]``, or None (after logging an error)
          if the slicing raises.

    When an upper bound is given and the stream is still shorter, the
    function waits up to 10 x 0.1 s for the missing values and logs an error
    if they did not arrive; the slicing is attempted in any case.
    """
    if range_to_read is None:
        range_to_read = [None, None]
    first, last = range_to_read[0], range_to_read[1]

    # An open upper bound has nothing to wait for. Comparing len(stream) with
    # None used to raise TypeError.
    if last is not None:
        attempts = 0
        while len(stream) < last and attempts < 10:
            time.sleep(0.1)
            attempts += 1
        if len(stream) < last:
            logger.error(
                f"Requested range {range_to_read} is larger than the stream length {len(stream)}. Adjusting range."
            )

    try:
        return stream[first:last]
    except Exception as e:
        logger.error(f"Error reading stream {stream} in {range_to_read=}: {e}")
        return None
def does_scan_contain_subscan2(scan: Scan) -> bool:
    """
    Tell whether a scan has a second subscan, from its acquisition chains.

    Inputs:
        - scan (Scan): Scan whose ``info["acquisition_chain"]`` is inspected.

    Outputs:
        - False with one acquisition chain (standard scan), True with two
          (subscan2), None for any other number (unknown scenario).
    """
    nb_acq_chains = len(scan.info["acquisition_chain"])
    answer_by_nb_chains = {1: False, 2: True}
    return answer_by_nb_chains.get(nb_acq_chains)
def get_stream_names_subscan1(scan: Scan) -> list:
    """
    Return the names of the scalar/spectrum streams of the first subscan.

    With two acquisition chains the "mcs" chain is used; otherwise the first
    chain found in ``scan.info["acquisition_chain"]``.

    Inputs:
        - scan (Scan): Scan to inspect.

    Outputs:
        - list: Stream names of the chain that are present in
          ``scan.streams``. Empty (after logging an error) when the scan has
          no acquisition chain or no "mcs" chain.
    """
    acquisition_chains = scan.info.get("acquisition_chain")
    if not acquisition_chains:
        logger.error(f"There is no acquisition chains in {scan}")
        # Always a list, as annotated and as in the other error path
        return []

    if does_scan_contain_subscan2(scan=scan):
        acquisition_chain1 = acquisition_chains.get("mcs")
        if not acquisition_chain1:
            logger.error(f"No mcs acq. chain in {scan}")
            return []
    else:
        acquisition_chain1 = next(iter(acquisition_chains.values()))

    return [
        stream
        for stream in _get_stream_names_from_acquisition_chain(
            acq_chain=acquisition_chain1
        )
        if stream in scan.streams
    ]
def get_stream_names_subscan2(scan: Scan) -> list:
    """
    Return the names of the scalar/spectrum streams of the second subscan.

    The second subscan is the "sampling_timer" acquisition chain of a scan
    with two acquisition chains.

    Inputs:
        - scan (Scan): Scan to inspect.

    Outputs:
        - list: Stream names of the chain that are present in
          ``scan.streams``. Empty when the scan has no acquisition chain, no
          second subscan or no "sampling_timer" chain.
    """
    acquisition_chains = scan.info.get("acquisition_chain")
    if not acquisition_chains:
        logger.error(f"There is no acquisition chains in {scan}")
        # Always a list, as annotated and as in the other exit paths
        return []

    if not does_scan_contain_subscan2(scan=scan):
        logger.debug(f"There is no subscan2 in {scan}")
        return []

    acquisition_chain2 = acquisition_chains.get("sampling_timer")
    if not acquisition_chain2:
        logger.info(f"No sampling_timer acq. chain in {scan}")
        return []

    return [
        stream
        for stream in _get_stream_names_from_acquisition_chain(
            acq_chain=acquisition_chain2
        )
        if stream in scan.streams
    ]
# def _get_streams_subscan2(scan: Scan) -> list: # acquisition_chains = scan.info.get("acquisition_chain") # acquisition_chain2 = acquisition_chains.get("sampling_timer") # streams = [ # stream # for stream in _get_stream_names_from_acquisition_chain(acq_chain=acquisition_chain2) # if stream in scan.streams # ] # return streams def _get_stream_names_from_acquisition_chain( acq_chain: dict, include_images: bool = False ) -> list: master = acq_chain.get("master", {}) if not master: return [] stream_names = [] for acq_chain_type in (acq_chain, master): for stream_type in ("scalars", "spectra"): stream_names += acq_chain_type.get(stream_type) if include_images: stream_names += acq_chain_type.get("images") return stream_names