import logging
import os
import time
from importlib.metadata import version
from typing import Any, Dict, Optional, Tuple
import h5py
import numpy
import numpy as np
from blissdata.beacon.data import BeaconData
from blissdata.h5api import dynamic_hdf5
from packaging.version import Version
if Version(version("blissdata")) >= Version("2.3.0"):
from blissdata.exceptions import (
IndexNoMoreThereError,
IndexNotYetThereError,
IndexWontBeThereError,
)
else:
from blissdata.redis_engine.exceptions import (
IndexNoMoreThereError,
IndexNotYetThereError,
IndexWontBeThereError,
)
from blissdata.redis_engine.scan import Scan
from blissdata.redis_engine.store import DataStore
from silx.io.h5py_utils import open_item as open_item_silx
# Configure logging and create the module-level logger.
# NOTE(review): basicConfig runs at import time and therefore touches the
# root logger of whatever application imports this module - confirm intended.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Header key names; presumably the monitor and exposure-time header entries.
# They are not used by the code visible in this file section - TODO confirm.
HEADERS_KEY_MONITOR = "HSI1"
HEADERS_KEY_EXPOSURE_TIME = "HSTime"
# URL template pointing at the Lima frames on disk. The doubled braces around
# "file_index" keep a literal "{file_index}" placeholder after a str.format
# pass fills "dirname" and "images_prefix".
LIMA_URL_TEMPLATE_ID02 = (
    "{dirname}/{images_prefix}{{file_index}}.h5::/entry_0000/measurement/data"
)
# Template of the Lima file-name prefix (default of
# get_lima_url_template_args_id02).
IMAGE_PREFIX_TEMPLATE_ID02 = "{collection_name}_{img_acq_device}_{scan_number}_"
# Scan file-name format; "{SCAN:05}" zero-pads the scan number to 5 digits.
FILE_SCAN_FORMAT_ID02 = "{COLLECTION}_{DETECTOR}_{SCAN:05}"
[docs]
def get_datastore(beacon_host: str = None) -> DataStore:
    """Return the blissdata ``DataStore`` reachable through Beacon.

    Args:
        beacon_host (str, optional): Beacon "hostname:port". When given it is
            exported as the ``BEACON_HOST`` environment variable before the
            connection is attempted. When ``None`` the environment is left
            untouched, so an already exported ``BEACON_HOST`` is used.

    Returns:
        DataStore | None: The datastore, or ``None`` if it could not be created.
    """
    try:
        # Assigning None to os.environ raises TypeError, which used to make the
        # default call (beacon_host=None) silently return None every time.
        if beacon_host is not None:
            os.environ["BEACON_HOST"] = beacon_host
        return DataStore(url=BeaconData().get_redis_data_db())
    except Exception as e:
        # Best effort: callers test the result for None, but leave a trace.
        logger.warning(f"Could not get the blissdata datastore: {e}")
        return None
[docs]
def load_scan(
    scan_memory_url: str, wait_until_start: bool = True, beacon_host: str = None
) -> Scan:
    """Load a scan from the blissdata datastore.

    Args:
        scan_memory_url (str): Key (URL) of the scan in the scan memory.
        wait_until_start (bool, optional): If True, poll the scan (non-blocking
            updates) until its state is no longer below 2. Defaults to True.
        beacon_host (str, optional): Beacon "hostname:port".

    Returns:
        Scan | None: The loaded scan, or ``None`` when no datastore is available.
    """
    store = get_datastore(beacon_host=beacon_host)
    if not store:
        return None

    load_kwargs = {"key": scan_memory_url}
    if Version(version("blissdata")) < Version("2.0.0"):
        # For blissdata < 2.0.0 the scan class is passed explicitly.
        load_kwargs["scan_cls"] = Scan
    loaded = store.load_scan(**load_kwargs)

    while wait_until_start and loaded.state < 2:
        loaded.update(block=False)
    return loaded
[docs]
def get_stream(
    stream_name: str = None,
    detector_name: str = None,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
):
    """Retrieve a stream of a scan from the scan memory.

    Usually this is the Lima image stream of a detector, but any other stream
    name can be requested (e.g. for scalers processing).

    Args:
        stream_name (str, optional): Full name of the stream. When omitted it
            is built as ``"<detector_name>:image"``.
        detector_name (str, optional): Name of the detector.
        scan (Scan, optional): Already loaded scan. When omitted the scan is
            loaded from ``scan_memory_url``.
        scan_memory_url (str, optional): URL of the scan memory.
        beacon_host (str, optional): Beacon "hostname:port".

    Returns:
        The stream object, or ``None`` when the scan could not be loaded or
        does not contain the requested stream.

    Raises:
        ValueError: If neither ``stream_name`` nor ``detector_name`` is given.
    """
    if not stream_name:
        if not detector_name:
            # Previously this ended in an obscure AttributeError on None.split
            raise ValueError("Either stream_name or detector_name must be provided")
        stream_name = f"{detector_name}:image"

    if not scan:
        scan = load_scan(
            scan_memory_url=scan_memory_url,
            wait_until_start=True,
            beacon_host=beacon_host,
        )
        if not scan:
            logger.error(f"Scan {scan_memory_url} could not be loaded")
            return None

    stream = scan.streams.get(stream_name)
    is_image_stream = stream_name.split(":")[-1] == "image"
    if (
        is_image_stream
        and stream is not None
        and Version(version("blissdata")) < Version("2.0.0")
    ):
        # For blissdata < 2.0.0 the raw stream is wrapped into a LimaStream.
        from blissdata.stream import LimaStream

        stream = LimaStream(stream=stream)
    return stream
[docs]
def get_lima_url_template_args_id02(
    scan_number: int,
    detector_name: str,
    collection_name: str = None,
    scan_number_format: str = "%05d",
    image_prefix_template: str = IMAGE_PREFIX_TEMPLATE_ID02,
) -> Optional[Dict[str, str]]:
    """Build the arguments used to fill the Lima URL template.

    Args:
        scan_number (int): Scan number; ``None`` yields ``None``.
        detector_name (str): Name of the detector (``img_acq_device`` field).
        collection_name (str, optional): Collection name.
        scan_number_format (str, optional): %-format applied to the scan number.
        image_prefix_template (str, optional): ``str.format`` template of the
            image prefix.

    Returns:
        dict | None: ``{"images_prefix": <formatted prefix>}``.
    """
    if scan_number is None:
        return None

    formatted_scan_number = scan_number_format % scan_number
    images_prefix = image_prefix_template.format(
        collection_name=collection_name,
        img_acq_device=detector_name,
        scan_number=formatted_scan_number,
    )
    return {"images_prefix": images_prefix}
[docs]
def get_length_dataset_dynamic_file(
    filename_data: str,
    scan_nb: int,
    detector_name: str,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
):
    """Return the length of a detector dataset read through ``dynamic_hdf5``.

    Args:
        filename_data (str): File opened with ``dynamic_hdf5.File``.
        scan_nb (int): Scan number.
        detector_name (str): Name of the (Lima) detector.
        lima_url_template (str, optional): Template for the Lima file URLs.
        lima_url_template_args (dict, optional): Arguments of that template.
        subscan (int, optional): Subscan number. Defaults to 1.

    Returns:
        int: ``len`` of ``<scan_nb>.<subscan>/instrument/<detector_name>/data``.
    """
    dataset_path = f"{scan_nb}.{subscan}/instrument/{detector_name}/data"
    with dynamic_hdf5.File(
        file=filename_data,
        lima_names=[detector_name],
        lima_url_template=lima_url_template,
        lima_url_template_args=lima_url_template_args,
    ) as h5root:
        return len(h5root[dataset_path])
[docs]
def get_length_dataset_static_file(
    filename_data: str,
    data_path: str,
):
    """Return the length of an item stored in a (static) HDF5 file.

    Args:
        filename_data (str): Path of the HDF5 file, opened read-only.
        data_path (str): HDF5 path of the item inside the file.

    Returns:
        int | None: ``len`` of the item, or ``None`` (after logging an error)
        when ``data_path`` is not in the file.
    """
    with h5py.File(filename_data, "r") as h5file:
        if data_path not in h5file:
            logger.error(f"{data_path} not found in {filename_data}")
            return None
        return len(h5file[data_path])
[docs]
def track_length_dataset_dynamic_file(
    lima_name,
    scan_memory_url,
    beacon_host=None,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
    **kwargs,
):
    """Yield the growing length of a Lima dataset read through ``dynamic_hdf5``.

    The dataset is polled once per second until its length equals
    ``scan.info["npoints"]`` or the scan state is 4. The length is
    re-read one last time afterwards and yielded only if it changed.

    Args:
        lima_name (str): Name of the Lima detector.
        scan_memory_url (str): URL of the scan memory.
        beacon_host (str, optional): Beacon "hostname:port".
        lima_url_template (str, optional): Template for the Lima file URLs.
        lima_url_template_args (dict, optional): Arguments of that template.
        subscan (int, optional): Subscan number. Defaults to 1.
        **kwargs: Extra keyword arguments forwarded to ``dynamic_hdf5.File``.

    Yields:
        int: Current length of the dataset.
    """
    # load_scan waits by default until the scan state is no longer below 2.
    scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)
    if scan is None:
        logger.error(f"Scan {scan_memory_url} could not be loaded")
        return

    master_filename = scan.info["filename"]
    scan_nb = scan.info["scan_nb"]
    nb_points = scan.info["npoints"]
    params_dynamic_file = {
        "file": master_filename,
        "lima_names": [lima_name],
        "lima_url_template": lima_url_template,
        "lima_url_template_args": lima_url_template_args,
        **kwargs,
    }
    data_path = f"{scan_nb}.{subscan}/instrument/{lima_name}/data"

    length = None
    finished = False
    while not finished:
        with dynamic_hdf5.File(**params_dynamic_file) as root:
            length = len(root[data_path])
        finished = length == nb_points or scan.state == 4
        if not finished:
            scan.update(block=False)
            time.sleep(1)
        yield length

    # The helper takes the file name and scan number (not the scan memory URL
    # nor **kwargs); the former call always raised TypeError.
    final_length = get_length_dataset_dynamic_file(
        filename_data=master_filename,
        scan_nb=scan_nb,
        detector_name=lima_name,
        lima_url_template=lima_url_template,
        lima_url_template_args=lima_url_template_args,
        subscan=subscan,
    )
    if final_length != length:
        # A value "returned" from a generator is invisible to for-loops.
        yield final_length
[docs]
def get_length_lima_stream(
    detector_name,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
):
    """Return the number of frames of a detector, from memory or from file.

    The length of the image stream is tried first; if taking it raises, the
    length is read from the scan file through ``dynamic_hdf5`` instead.

    Args:
        detector_name (str): Name of the Lima detector.
        scan (Scan, optional): Already loaded scan.
        scan_memory_url (str, optional): URL of the scan memory.
        beacon_host (str, optional): Beacon "hostname:port".
        lima_url_template (str, optional): Template for the Lima file URLs.
        lima_url_template_args (dict, optional): Arguments of that template.
        subscan (int, optional): Subscan number. Defaults to 1.

    Returns:
        int: Number of frames currently available.
    """
    image_stream = get_stream(
        scan=scan,
        scan_memory_url=scan_memory_url,
        detector_name=detector_name,
        beacon_host=beacon_host,
    )
    # Workaround for a Lima memory bug when mask processing is active:
    # only the length is requested (no frame is read) and failures fall back
    # to the file.
    try:
        return len(image_stream)
    except Exception:
        logger.warning("Data is no more available from Lima memory")

    if not scan:
        scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)
    scan_info = scan.info
    return get_length_dataset_dynamic_file(
        filename_data=scan_info["filename"],
        scan_nb=scan_info["scan_nb"],
        detector_name=detector_name,
        lima_url_template=lima_url_template,
        lima_url_template_args=lima_url_template_args,
        subscan=subscan,
    )
[docs]
def track_length_dataset(
    lima_name,
    scan_memory_url,
    beacon_host=None,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
    **kwargs,
):
    """Yield the number of available frames while a scan is running.

    Frames are counted from the Lima memory stream as long as it is
    readable; once it reports ``IndexNoMoreThereError`` the count is taken from
    the scan file through ``dynamic_hdf5``. Polling (one second period) stops
    when the count equals ``scan.info["npoints"]`` or the scan state is 4.

    Args:
        lima_name (str): Name of the Lima detector.
        scan_memory_url (str): URL of the scan memory.
        beacon_host (str, optional): Beacon "hostname:port".
        lima_url_template (str, optional): Template for the Lima file URLs.
        lima_url_template_args (dict, optional): Arguments of that template.
        subscan (int, optional): Subscan number. Defaults to 1.
        **kwargs: Extra keyword arguments forwarded to ``dynamic_hdf5.File``.

    Yields:
        int: Number of frames available at each poll.
    """
    scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)
    nb_points = scan.info["npoints"]
    # Reuse the scan that this loop keeps updating (and the beacon host)
    # instead of reloading the scan from the URL at every poll.
    limastream_params = {
        "scan": scan,
        "scan_memory_url": scan_memory_url,
        "detector_name": lima_name,
        "beacon_host": beacon_host,
    }
    wait = True
    memory_available = True
    last_index_available = 0
    while wait:
        if memory_available:
            limastream = get_stream(**limastream_params)
            try:
                # Reading frame 0 checks that the Lima memory is still readable
                _ = limastream[0]
                last_index_available = len(limastream)
                logger.info("Data is available from Lima memory")
            except IndexNotYetThereError:
                pass
            except IndexNoMoreThereError:
                logger.warning("Data is no more available from Lima memory")
                memory_available = False
            except RuntimeError:
                pass
            if last_index_available == nb_points:
                wait = False
            elif scan.state == 4:
                last_index_available = len(limastream)
                wait = False
            else:
                scan.update(block=False)
                time.sleep(1)
        else:
            logger.info("Data is only available in the files")
            params_dynamic_file = {
                "file": scan.info["filename"],
                "lima_names": [lima_name],
                "lima_url_template": lima_url_template,
                "lima_url_template_args": lima_url_template_args,
                **kwargs,
            }
            with dynamic_hdf5.File(**params_dynamic_file) as root:
                scan_nb = scan.info["scan_nb"]
                lima_dataset = root[f"{scan_nb}.{subscan}/instrument/{lima_name}/data"]
                last_index_available = len(lima_dataset)
                if last_index_available == nb_points:
                    wait = False
                elif scan.state == 4:
                    last_index_available = len(lima_dataset)
                    wait = False
                else:
                    scan.update(block=False)
                    time.sleep(1)
        yield last_index_available
[docs]
def track_dataset(
    lima_name,
    scan_memory_url,
    beacon_host=None,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
    max_slice_size=10,
    start_from_memory=True,
    **kwargs,
):
    """Yield successive slices of frames of a detector while a scan is running.

    Frames are read from the Lima memory stream first (unless
    ``start_from_memory`` is False); when the memory reports
    ``IndexNoMoreThereError`` the reading continues from the scan file through
    ``dynamic_hdf5``. The generator ends when ``scan.info["npoints"]`` frames
    were read, or when the scan state is 4 and everything available was read.

    Args:
        lima_name (str): Name of the Lima detector.
        scan_memory_url (str): URL of the scan memory.
        beacon_host (str, optional): Beacon "hostname:port".
        lima_url_template (str, optional): Template for the Lima file URLs.
        lima_url_template_args (dict, optional): Arguments of that template.
        subscan (int, optional): Subscan number. Defaults to 1.
        max_slice_size (int, optional): Maximum number of frames per slice.
        start_from_memory (bool, optional): Whether to try the Lima memory first.
        **kwargs: Extra keyword arguments forwarded to ``dynamic_hdf5.File``.

    Yields:
        The newly read frames, or ``None`` when a poll brought no new frame.
    """
    scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)
    nb_points = scan.info["npoints"]
    # Reuse the scan updated by this loop (and the beacon host) instead of
    # reloading the scan from its URL at every iteration.
    limastream_params = {
        "scan": scan,
        "scan_memory_url": scan_memory_url,
        "detector_name": lima_name,
        "beacon_host": beacon_host,
    }
    wait = True
    memory_available = start_from_memory
    last_index_read = 0
    while wait:
        dataset = None
        if memory_available:
            try:
                scan.update(block=False)
                limastream = get_stream(**limastream_params)
            except Exception:
                wait = False
                continue
            try:
                # Reading frame 0 checks that the Lima memory is still readable
                _ = limastream[0]
                last_index_available = len(limastream)
                slice_end = min(last_index_read + max_slice_size, last_index_available)
                frames = limastream[last_index_read:slice_end]
            except IndexNotYetThereError:
                continue
            except IndexNoMoreThereError:
                logger.warning(
                    "Data is no more available from Lima memory. Switching to h5api..."
                )
                memory_available = False
                continue
            except RuntimeError:
                continue
            # Explicit None/length checks: the truth value of a multi-element
            # array is ambiguous and raises ValueError.
            if frames is not None and len(frames) > 0:
                dataset = frames
                logger.info(
                    f"Data retrieved from Lima memory: {slice_end - last_index_read} frames:"
                )
                last_index_read = slice_end
            if last_index_read == nb_points:
                wait = False
            elif scan.state == 4:
                try:
                    limastream = get_stream(**limastream_params)
                    if len(limastream) == last_index_read:
                        wait = False
                except Exception:
                    # Keep the frames already read (they are yielded below) and
                    # continue from the file on the next iteration.
                    memory_available = False
        else:
            params_dynamic_file = {
                "file": scan.info["filename"],
                "lima_names": [lima_name],
                "lima_url_template": lima_url_template,
                "lima_url_template_args": lima_url_template_args,
                **kwargs,
            }
            with dynamic_hdf5.File(**params_dynamic_file) as root:
                scan_nb = scan.info["scan_nb"]
                lima_dataset = root[f"{scan_nb}.{subscan}/instrument/{lima_name}/data"]
                length = len(lima_dataset)
                if length > last_index_read:
                    slice_end = min(last_index_read + max_slice_size, length)
                    dataset = lima_dataset[last_index_read:slice_end]
                    logger.info(f"Data retrieved from hdf5 file: {len(dataset)} frames")
                    last_index_read = slice_end
                if last_index_read == nb_points:
                    wait = False
                elif scan.state == 4 and last_index_read == len(lima_dataset):
                    wait = False
                else:
                    scan.update(block=False)
        yield dataset
[docs]
def get_available_dataset(
    lima_name: str,
    scan_memory_url: str = "",
    lima_url_template: str = "",
    lima_url_template_args: Dict[str, Any] = {},
    scan_nb: int = None,
    subscan: int = 1,
    last_index_read: int = 0,
    max_slice_size: int = 10,
    start_from_memory: bool = True,
    range_index_read: Optional[Tuple[int, int]] = None,
    data_filename: str = None,
) -> Optional[np.ndarray]:
    """
    Retrieves the available dataset from either Lima memory or an HDF5 file.

    The slice starts at ``last_index_read`` and ends at
    ``last_index_read + max_slice_size``, or at ``range_index_read[-1]`` when a
    range is given (same convention as ``read_dataset_offline``).

    Args:
        lima_name (str): Name of the detector.
        scan_memory_url (str): URL to the scan memory. When empty, the data is
            read offline from ``data_filename``.
        lima_url_template (str, optional): Template for Lima file URLs. Defaults to "".
        lima_url_template_args (dict, optional): Arguments for the Lima URL template. Defaults to {}.
        scan_nb (int, optional): Scan number (needed for the offline reading).
        subscan (int, optional): Subscan number. Defaults to 1.
        last_index_read (int, optional): Last index read from the dataset. Defaults to 0.
        max_slice_size (int, optional): Maximum number of frames to read in one slice. Defaults to 10.
        start_from_memory (bool, optional): Whether to start reading from memory. Defaults to True.
        range_index_read (tuple, optional): Range of indices; only its last
            value is used, as the upper limit of the slice.
        data_filename (str, optional): HDF5 file used for the offline reading.

    Returns:
        Optional[np.ndarray]: The retrieved dataset, or None if no data is available.
    """
    if range_index_read is None:
        slice_end = last_index_read + max_slice_size
    else:
        slice_end = range_index_read[-1]
    index_range = (last_index_read, slice_end)

    # The slicing helpers take an ``index_range`` (and the offline one an HDF5
    # path); the previous keyword arguments did not exist in their signatures
    # and made every call raise TypeError.
    if scan_memory_url:
        result = _slice_dataset_online(
            scan_memory_url=scan_memory_url,
            detector_name=lima_name,
            lima_url_template=lima_url_template,
            lima_url_template_args=lima_url_template_args,
            subscan=subscan,
            index_range=index_range,
            start_from_memory=start_from_memory,
        )
    else:
        if not data_filename or scan_nb is None:
            logger.error(
                "Offline reading needs both data_filename and scan_nb. No data read."
            )
            return None
        result = _slice_dataset_offline(
            filename_data=data_filename,
            h5path_to_data=f"{scan_nb}.{subscan}/instrument/{lima_name}/data",
            index_range=index_range,
        )
    if not result:
        return None
    # Both helpers return {"dataset": ..., "index_range": ...}
    return result.get("dataset")
def _get_nb_frames_available_from_stream(
    detector_name: str = None,
    stream_name: str = None,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    index_range_last: tuple = None,
):
    """Wait for new frames in a stream and return the stream length.

    The scan is polled until the stream holds more frames than the last index
    already read (``index_range_last[-1]``, or 0), until the scan is finished
    (state 4), or until updating the scan fails.

    Args:
        detector_name (str, optional): Name of the detector.
        stream_name (str, optional): Name of the stream.
        scan (Scan, optional): Already loaded scan.
        scan_memory_url (str, optional): URL of the scan memory.
        beacon_host (str, optional): Beacon "hostname:port".
        index_range_last (tuple, optional): Last range of indices read.

    Returns:
        int | None: Length of the stream, or ``None`` if it was never read.
    """
    if not scan:
        scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)
    last_index = 0 if index_range_last is None else index_range_last[-1]

    # Defined up-front: a failure of the very first update used to end in an
    # UnboundLocalError at the return statement.
    nb_available_frames = None
    while True:
        try:
            scan.update(block=False)
        except Exception as e:
            # Handle canceled or non-existent scans
            logger.warning(f"Scan canceled or does not exist. Exiting. {e}")
            break
        # Read the state before the length, so that a finished scan is only
        # acted upon with a length taken after it finished.
        scan_is_over = scan.state == 4
        stream = get_stream(
            stream_name=stream_name,
            detector_name=detector_name,
            scan=scan,
            scan_memory_url=scan_memory_url,
            beacon_host=beacon_host,
        )
        nb_available_frames = len(stream)
        if nb_available_frames > last_index and nb_available_frames > 0:
            break
        if scan_is_over:
            # No more frames will come: do not spin forever.
            break
    return nb_available_frames
def _get_nb_frames_available_from_file(
    filename_data: str,
    h5path: str,
):
    """Return the number of frames available at ``h5path`` of an HDF5 file.

    Args:
        filename_data (str): Path of the HDF5 file.
        h5path (str): Path of a dataset or a group inside the file.

    Returns:
        int | None: ``len`` of the dataset; for a group, the smallest length
        among the datasets it directly contains; ``None`` (after logging an
        error) when the path is missing or is neither a dataset nor a group.
    """
    with open_item_silx(filename=filename_data, name="/") as root:
        if h5path not in root:
            logger.error(
                f"Attempt to read data offline but {h5path} not in {filename_data}"
            )
            return None

        h5item = root[h5path]
        if isinstance(h5item, h5py.Dataset):
            # Normally a lima dataset
            return len(h5item)
        if isinstance(h5item, h5py.Group):
            # Scalers case: the limit is given by the shortest counter
            counter_lengths = [
                len(h5item[name])
                for name in h5item
                if isinstance(h5item[name], h5py.Dataset)
            ]
            return min(counter_lengths)

        logger.error(f"{h5item} not valid to read data")
        return None
def _get_new_slice_limits(
    detector_name: str = None,
    stream_name: str = None,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    filename_data: str = None,
    h5path: str = None,
    index_range=None,
    index_range_last=None,
    max_slice_size: int = 50,
):
    """Compute the limits of the next slice of frames to read.

    The slice starts where the previous one ended (``index_range_last[-1]``),
    or at ``index_range[0]``, or at 0. It spans at most ``max_slice_size``
    frames and is capped by the number of available frames and by
    ``index_range[-1]``.

    Online (``scan`` or ``scan_memory_url`` given) the function waits until
    the stream holds frames beyond the slice start or the scan is finished
    (state 4). Offline the number of frames is read from
    ``filename_data``/``h5path``.

    Args:
        detector_name (str, optional): Name of the detector.
        stream_name (str, optional): Name of the stream.
        scan (Scan, optional): Already loaded scan.
        scan_memory_url (str, optional): URL of the scan memory.
        beacon_host (str, optional): Beacon "hostname:port".
        filename_data (str, optional): HDF5 file for the offline case.
        h5path (str, optional): HDF5 path of the data for the offline case.
        index_range (sequence, optional): Requested range of indices.
        index_range_last (sequence, optional): Limits of the previous slice.
        max_slice_size (int, optional): Maximum size of the slice.

    Returns:
        list | None: ``[slice_init, slice_end]``, or ``None`` when there is
        nothing (more) to read or the source is not available.
    """
    # First index to read
    if index_range_last is not None:
        # Continue from the previous last index
        slice_init = index_range_last[-1]
    elif index_range is not None:
        # Nothing read yet, but there is a requested range
        slice_init = index_range[0]
    else:
        slice_init = 0
    slice_end = slice_init + max_slice_size

    if scan_memory_url or scan:
        if not scan:
            scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)
        stream_params = {
            "stream_name": stream_name,
            "detector_name": detector_name,
            "scan": scan,
            "scan_memory_url": scan_memory_url,
            "beacon_host": beacon_host,
        }
        logger.info("Waiting for data...")
        while True:
            try:
                scan.update(block=False)
            except Exception as e:
                # Handle canceled or non-existent scans
                logger.warning(f"Scan canceled or does not exist. Exiting. {e}")
                return None
            scan_is_over = scan.state == 4
            stream = get_stream(**stream_params)
            if not stream:
                if scan_is_over:
                    logger.error(f"No detected stream for {stream_name}. Aborting.")
                    return None
                # The scan is not over: keep waiting for the stream
                continue
            nb_available_frames = len(stream)
            if scan_is_over or nb_available_frames > slice_init:
                # Either nothing more will come or there are new frames
                break
    else:
        # Shared helper instead of a second copy of the same file inspection
        nb_available_frames = _get_nb_frames_available_from_file(
            filename_data=filename_data,
            h5path=h5path,
        )
        if nb_available_frames is None:
            return None

    slice_end = min(slice_end, nb_available_frames)
    if index_range is not None:
        slice_end = min(slice_end, index_range[-1])
    if slice_init >= slice_end:
        return None
    return [slice_init, slice_end]
[docs]
def track_slice_limits(
    detector_name: str = None,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    filename_data: str = None,
    h5path: str = None,
    index_range=None,
    max_slice_size: int = 50,
):
    """Yield consecutive slice limits until there is nothing left to read.

    Each iteration asks ``_get_new_slice_limits`` for the next limits, passing
    the previous ones as ``index_range_last``; the generator stops at the
    first ``None``.

    Args:
        detector_name (str, optional): Name of the detector.
        scan (Scan, optional): Already loaded scan.
        scan_memory_url (str, optional): URL of the scan memory.
        beacon_host (str, optional): Beacon "hostname:port".
        filename_data (str, optional): HDF5 file for the offline case.
        h5path (str, optional): HDF5 path of the data for the offline case.
        index_range (sequence, optional): Requested range of indices.
        max_slice_size (int, optional): Maximum size of each slice.

    Yields:
        list: ``[slice_init, slice_end]`` of the next slice.
    """
    limits = None
    while (
        limits := _get_new_slice_limits(
            detector_name=detector_name,
            scan=scan,
            scan_memory_url=scan_memory_url,
            beacon_host=beacon_host,
            filename_data=filename_data,
            h5path=h5path,
            max_slice_size=max_slice_size,
            index_range=index_range,
            index_range_last=limits,
        )
    ) is not None:
        yield limits
def _slice_dataset_online(
    stream_name: str = None,
    detector_name: str = None,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    lima_url_template: str = "",
    lima_url_template_args: Dict[str, Any] = {},
    subscan: int = 1,
    index_range: tuple = None,
    start_from_memory: bool = True,
) -> dict:
    """
    Reads a dataset from an online source, either from Lima memory or through dynamic hdf5 file.

    This method attempts to retrieve data from Lima memory first. If the data is no longer
    available in memory, it falls back to reading from an HDF5 file. It keeps
    polling until some data is read, the scan is finished (state 4) without
    data in the requested range, or the scan cannot be updated.

    Args:
        stream_name (str, optional): Name of the stream. Defaults to "<detector_name>:image".
        detector_name (str, optional): Name of the detector.
        scan (Scan, optional): Already loaded scan.
        scan_memory_url (str, optional): URL to the scan memory.
        beacon_host (str, optional): Beacon "hostname:port".
        lima_url_template (str, optional): Template for Lima file URLs. Defaults to "".
        lima_url_template_args (Dict[str, Any], optional): Arguments for the Lima URL template. Defaults to an empty dictionary.
        subscan (int, optional): Subscan number. Defaults to 1.
        index_range (tuple, optional): (first, last) indices of the slice. If None, everything available is read.
        start_from_memory (bool, optional): Whether to start reading from Lima memory. Defaults to True.

    Returns:
        dict: ``{"dataset": <data or None>, "index_range": <(slice_init, slice_end) or None>}``.
    """
    if not stream_name and detector_name:
        stream_name = f"{detector_name}:image"
    if not scan:
        scan = load_scan(scan_memory_url=scan_memory_url, beacon_host=beacon_host)

    memory_available = start_from_memory
    dataset = None
    # Limits actually used for the last read attempt; stays None when the loop
    # ends before any limits could be computed.
    read_range = None
    wait_for_data = True
    while wait_for_data:
        if memory_available:
            try:
                scan.update(block=False)
                # State is read before the data: "finished" then implies that
                # the data read afterwards is final.
                scan_is_over = scan.state == 4
                stream = get_stream(
                    stream_name=stream_name,
                    scan=scan,
                    scan_memory_url=scan_memory_url,
                    beacon_host=beacon_host,
                )
            except Exception as e:
                # Handle canceled or non-existent scans
                logger.warning(f"Scan canceled or does not exist. Exiting. {e}")
                wait_for_data = False
                continue
            try:
                if index_range is None:
                    slice_init = 0
                    slice_end = len(stream)
                else:
                    slice_init, slice_end = index_range
                read_range = (slice_init, slice_end)
                dataset = stream[slice_init:slice_end]
                if dataset is not None and len(dataset) > 0:
                    wait_for_data = False
                    logger.info(
                        f"Data retrieved from {detector_name} Lima memory: {len(dataset)} frames between {slice_init} -> {slice_end}"
                    )
                else:
                    dataset = None
                    if scan_is_over:
                        # Nothing to read and nothing more will come
                        wait_for_data = False
                    continue
            except IndexNotYetThereError:
                # Frame not yet available
                continue
            except IndexNoMoreThereError:
                logger.warning(
                    f"Data is no more available from {detector_name} Lima memory. Switching to h5api..."
                )
                memory_available = False
                continue
            except IndexWontBeThereError:
                logger.warning(f"No more data can be retrieved from {detector_name}")
                wait_for_data = False
                continue
            except Exception as e:
                logger.warning(
                    f"Exception! Looks like {detector_name} Lima memory is not reachable. Switching to file... {e}"
                )
                memory_available = False
                continue
        if not memory_available:
            scan_is_over = scan.state == 4
            params_dynamic_file = {
                "file": scan.info["filename"],
                "lima_names": [detector_name],
                "lima_url_template": lima_url_template,
                "lima_url_template_args": lima_url_template_args,
            }
            with dynamic_hdf5.File(**params_dynamic_file) as root:
                scan_nb = scan.info["scan_nb"]
                dset_data = root[f"{scan_nb}.{subscan}/instrument/{detector_name}/data"]
                if index_range is None:
                    slice_init = 0
                    slice_end = len(dset_data)
                else:
                    slice_init, slice_end = index_range
                read_range = (slice_init, slice_end)
                # NOTE(review): indexing assumes a 3D dataset - confirm.
                dataset = dset_data[slice_init:slice_end, :, :]
            if dataset is not None and len(dataset) > 0:
                wait_for_data = False
                logger.info(
                    f"Data retrieved from hdf5 file: {len(dataset)} frames between {slice_init} -> {slice_end}"
                )
            else:
                dataset = None
                if scan_is_over:
                    wait_for_data = False
                else:
                    # Without updating the scan its state would never change
                    # and an empty file would be polled forever.
                    try:
                        scan.update(block=False)
                    except Exception as e:
                        logger.warning(
                            f"Scan canceled or does not exist. Exiting. {e}"
                        )
                        wait_for_data = False
    return {"dataset": dataset, "index_range": read_range}
def _slice_dataset_offline(
    filename_data: str,
    h5path_to_data: str,
    index_range: tuple = None,
) -> dict:
    """
    Slices a dataset stored in an HDF5 file.

    Args:
        filename_data (str): Path to the HDF5 file.
        h5path_to_data (str): Path to the H5 dataset with the data.
        index_range (tuple) : Requested limits; both are capped by the length
            of the dataset. If None, the whole dataset is read.

    Returns:
        dict: ``{"dataset": ..., "index_range": (slice_init, slice_end)}``.
        Both values are None when the path is not in the file or when the
        capped limits do not span any frame.
    """
    result = {"dataset": None, "index_range": None}
    with open_item_silx(filename=filename_data, name="/") as root:
        if h5path_to_data not in root:
            return result

        dset = root[h5path_to_data]
        total_frames = len(dset)
        if index_range is None:
            slice_init, slice_end = 0, total_frames
        else:
            slice_init = min(index_range[0], total_frames)
            slice_end = min(index_range[-1], total_frames)

        if slice_init >= slice_end:
            logger.error(
                f"{filename_data}:{h5path_to_data} cannot be sliced between {slice_init} -> {slice_end}"
            )
            return result

        logger.info(
            f"Data retrieved from hdf5 file: {slice_end - slice_init} frames between {slice_init} -> {slice_end}"
        )
        result["dataset"] = dset[slice_init:slice_end]
        result["index_range"] = (slice_init, slice_end)
    return result
[docs]
def read_dataset_offline(
    filename_data: str,
    detector_name: str,
    scan_nb: int,
    last_index_read: int,
    max_slice_size: int = 100,
    range_index_read: Optional[Tuple[int, int]] = None,
    subscan: int = 1,
) -> Optional[np.ndarray]:
    """Read the next slice of a detector dataset from an HDF5 file.

    Args:
        filename_data (str): Path of the HDF5 file, opened read-only.
        detector_name (str): Name of the detector.
        scan_nb (int): Scan number.
        last_index_read (int): Index where the slice starts.
        max_slice_size (int, optional): Maximum size of the slice when no
            range is given. Defaults to 100.
        range_index_read (tuple, optional): When given, the slice ends at its
            last value (capped by the dataset length); its first value is not
            used.
        subscan (int, optional): Subscan number. Defaults to 1.

    Returns:
        The data of ``<scan_nb>.<subscan>/instrument/<detector_name>/data``
        between the slice limits, or ``None`` (after logging an error) when
        the dataset is not in the file.
    """
    path_to_data_signal = f"{scan_nb}.{subscan}/instrument/{detector_name}/data"
    with h5py.File(filename_data, "r") as f:
        if path_to_data_signal not in f:
            logger.error(f"Dataset {path_to_data_signal} not found in {filename_data}")
            return None
        dataset = f[path_to_data_signal]
        length = len(dataset)
        slice_init = last_index_read
        if range_index_read is None:
            slice_end = min(last_index_read + max_slice_size, length)
        else:
            slice_end = min(range_index_read[-1], length)
        # Slicing inside the context copies the data before the file is closed
        return dataset[slice_init:slice_end]
[docs]
def do_continue_pipeline(
    detector_name: str = None,
    scan_memory_url=None,
    beacon_host: str = None,
    last_index_read=0,
    lima_url_template="",
    lima_url_template_args={},
    subscan=1,
    filename_data=None,
    path_to_data_signal: str = None,  # To be used for static files
) -> bool:
    """
    Checks if there are still frames to be read from a running/complete scan or from a file

    Args:
        detector_name (str, optional): Name of the detector (online case).
        scan_memory_url (str, optional): URL of the scan memory. When given,
            the online check (``continue_pipeline_bliss``) is used.
        beacon_host (str, optional): Beacon "hostname:port".
        last_index_read (int, optional): Number of frames already read.
        lima_url_template (str, optional): Template for the Lima file URLs.
        lima_url_template_args (dict, optional): Arguments of that template.
        subscan (int, optional): Subscan number. Defaults to 1.
        filename_data (str, optional): HDF5 file; used when there is no
            ``scan_memory_url`` (``continue_pipeline_offline``).
        path_to_data_signal (str, optional): HDF5 path of the data in the file.

    Returns:
        bool: Result of the online/offline check; False when neither a scan
        memory URL nor a file is given.
    """
    logger.info(
        f"Checking if there are still frames to read. Last index read: {last_index_read}"
    )
    if scan_memory_url:
        return continue_pipeline_bliss(
            scan_memory_url=scan_memory_url,
            beacon_host=beacon_host,
            detector_name=detector_name,
            last_index_read=last_index_read,
            subscan=subscan,
            lima_url_template=lima_url_template,
            lima_url_template_args=lima_url_template_args,
        )
    if filename_data:
        return continue_pipeline_offline(
            filename_data=filename_data,
            last_index_read=last_index_read,
            path_to_data_signal=path_to_data_signal,
        )
    # No data source at all: explicit False instead of an implicit None
    logger.error("Neither scan_memory_url nor filename_data was provided")
    return False
[docs]
def continue_pipeline_bliss(
    detector_name: str,
    scan: Scan = None,
    scan_memory_url: str = None,
    beacon_host: str = None,
    last_index_read: int = 0,
    subscan: int = 1,
    lima_url_template: str = None,
    lima_url_template_args: dict = None,
):
    """Tell whether there are still frames to read from a bliss scan.

    Args:
        detector_name (str): Name of the Lima detector.
        scan (Scan, optional): Already loaded scan.
        scan_memory_url (str, optional): URL of the scan memory.
        beacon_host (str, optional): Beacon "hostname:port".
        last_index_read (int, optional): Number of frames already read.
        subscan (int, optional): Subscan number. Defaults to 1.
        lima_url_template (str, optional): Template for the Lima file URLs.
        lima_url_template_args (dict, optional): Arguments of that template.

    Returns:
        bool | None: True while the scan state is below 4, or when a complete
        scan holds more frames than ``last_index_read``. False when the scan
        cannot be loaded, has no streams or no ``<detector_name>:image``
        stream, or holds no frames beyond ``last_index_read``. None for any
        state above 4.
    """
    try:
        if not scan:
            scan = load_scan(
                scan_memory_url=scan_memory_url,
                beacon_host=beacon_host,
                wait_until_start=False,
            )
        state = scan.state
    except Exception as e:
        logger.error(f"scan {scan_memory_url} could not be loaded!: {e}")
        return False

    if state < 2:
        logger.info("Scan started but acquisition did not. Wait, data is coming")
        return True
    if state in (2, 3):
        logger.info("Scan is running. Wait for data")
        return True
    if state != 4:
        return None

    logger.info("Scan is complete")
    available_streams = scan.streams
    if not available_streams:
        logger.warning(
            "\n\tThe scan is complete and does not contain any streams. End of the workflow.\n\t"
        )
        return False
    if f"{detector_name}:image" not in available_streams:
        logger.error(f"There is no stream {detector_name}:image in the scan")
        return False

    current_length = get_length_lima_stream(
        scan=scan,
        scan_memory_url=scan_memory_url,
        beacon_host=beacon_host,
        detector_name=detector_name,
        lima_url_template=lima_url_template,
        lima_url_template_args=lima_url_template_args,
        subscan=subscan,
    )
    if current_length is None:
        return False

    logger.info(f"Current length of the dataset: {current_length}")
    if current_length == last_index_read:
        logger.info("\n\tNo more frames to read. End of the workflow\n\t")
        return False
    if current_length > last_index_read:
        logger.info("There are still frames to read. Continue")
        return True
    logger.error("There are more read then stored frames. Something went wrong!")
    return False
[docs]
def continue_pipeline_offline(
    filename_data: str,
    last_index_read: int = 0,
    path_to_data_signal: str = None,
):
    """Tell whether there are still frames to read from a static HDF5 file.

    Args:
        filename_data (str): Path of the HDF5 file.
        last_index_read (int, optional): Number of frames already read.
        path_to_data_signal (str, optional): HDF5 path of the data.

    Returns:
        bool: True only when the dataset holds more frames than
        ``last_index_read``. False when the dataset is missing, fully read, or
        (inconsistently) shorter than what was already read.
    """
    current_length = get_length_dataset_static_file(
        filename_data=filename_data,
        data_path=path_to_data_signal,
    )
    if current_length is None:
        return False
    logger.info(
        f"Current length of the dataset: {current_length}. Last index read: {last_index_read}"
    )
    if current_length == last_index_read:
        logger.info("\n\tNo more frames to read. End of the workflow\n\t")
        return False
    if current_length > last_index_read:
        logger.info("There are still frames to read. Continue")
        return True
    logger.error("There are more read then stored frames. Something went wrong!")
    # Same answer as continue_pipeline_bliss for this inconsistent state:
    # continuing could never read anything new from a static file.
    return False
[docs]
def read_blissdata_stream(stream, range_to_read: list) -> numpy.ndarray:
    """
    Centralized method to slice of blissdata stream

    Args:
        stream: Object supporting ``len`` and slicing.
        range_to_read (list): ``[first, last]`` indices of the slice; ``None``
            reads the whole stream. ``last`` may be ``None`` (open-ended).
            When ``last`` exceeds the stream length, the stream is given up to
            10 x 0.1 s to grow, after which ``last`` is cut to that length.

    Returns:
        The sliced values, or ``None`` when the slicing raised.
    """
    if range_to_read is None:
        slice_init, slice_end = None, None
    else:
        slice_init, slice_end = range_to_read[0], range_to_read[1]
        # An open-ended range has nothing to wait for (and cannot be compared)
        if slice_end is not None:
            attempts = 0
            while len(stream) < slice_end and attempts < 10:
                time.sleep(0.1)
                attempts += 1
            available = len(stream)
            if available < slice_end:
                logger.error(
                    f"Requested range {range_to_read} is larger than the stream length {available}. Adjusting range."
                )
                # Actually do what the message announces
                slice_end = available
    try:
        return stream[slice_init:slice_end]
    except Exception as e:
        logger.error(f"Error reading stream {stream} in {range_to_read=}: {e}")
        return None
[docs]
def does_scan_contain_subscan2(scan: Scan) -> bool:
    """Tell whether a scan has a second subscan, from its acquisition chains.

    Args:
        scan (Scan): Scan whose ``info["acquisition_chain"]`` is inspected.

    Returns:
        bool | None: False for one acquisition chain (standard scan), True for
        two chains (subscan2), None for any other number (unknown scenario).
    """
    nb_acq_chains = len(scan.info["acquisition_chain"])
    # Number of chains -> answer; anything else is an unknown scenario (None)
    answers = {1: False, 2: True}
    return answers.get(nb_acq_chains)
[docs]
def get_stream_names_subscan1(scan: Scan) -> list:
    """Return the names of the streams of the first subscan present in the scan.

    With two acquisition chains the "mcs" chain is used, otherwise the first
    chain of ``scan.info["acquisition_chain"]``.

    Args:
        scan (Scan): Scan to inspect.

    Returns:
        list | None: Stream names of the chain that are in ``scan.streams``;
        an empty list when the "mcs" chain is missing; None when the scan has
        no acquisition chain.
    """
    chains = scan.info.get("acquisition_chain")
    if not chains:
        logger.error(f"There is no acquisition chains in {scan}")
        return None

    if does_scan_contain_subscan2(scan=scan):
        first_chain = chains.get("mcs")
        if not first_chain:
            logger.error(f"No mcs acq. chain in {scan}")
            return []
    else:
        first_chain = next(iter(chains.values()))

    found = []
    for name in _get_stream_names_from_acquisition_chain(acq_chain=first_chain):
        if name in scan.streams:
            found.append(name)
    return found
[docs]
def get_stream_names_subscan2(scan: Scan) -> list:
    """Return the names of the streams of the second subscan present in the scan.

    The second subscan is the "sampling_timer" acquisition chain of a scan
    with two acquisition chains.

    Args:
        scan (Scan): Scan to inspect.

    Returns:
        list | None: Stream names of the chain that are in ``scan.streams``;
        an empty list when there is no subscan2 or no "sampling_timer" chain;
        None when the scan has no acquisition chain.
    """
    chains = scan.info.get("acquisition_chain")
    if not chains:
        logger.error(f"There is no acquisition chains in {scan}")
        return None

    if not does_scan_contain_subscan2(scan=scan):
        logger.debug(f"There is no subscan2 in {scan}")
        return []

    second_chain = chains.get("sampling_timer")
    if not second_chain:
        logger.info(f"No sampling_timer acq. chain in {scan}")
        return []

    found = []
    for name in _get_stream_names_from_acquisition_chain(acq_chain=second_chain):
        if name in scan.streams:
            found.append(name)
    return found
# def _get_streams_subscan2(scan: Scan) -> list:
# acquisition_chains = scan.info.get("acquisition_chain")
# acquisition_chain2 = acquisition_chains.get("sampling_timer")
# streams = [
# stream
# for stream in _get_stream_names_from_acquisition_chain(acq_chain=acquisition_chain2)
# if stream in scan.streams
# ]
# return streams
def _get_stream_names_from_acquisition_chain(
acq_chain: dict, include_images: bool = False
) -> list:
master = acq_chain.get("master", {})
if not master:
return []
stream_names = []
for acq_chain_type in (acq_chain, master):
for stream_type in ("scalars", "spectra"):
stream_names += acq_chain_type.get(stream_type)
if include_images:
stream_names += acq_chain_type.get("images")
return stream_names