"""
Module for the MCSCard CSAXS implementation at cSAXS.
Please respect the comments regarding timing and procedures of the MCS card. These
are highlighted with NOTE comments directly in the code, indicating requirements
for stable device operation. Most of these constraints were identified
empirically through extensive testing with the SIS3820 MCS card IOC and are intended
to prevent unexpected hardware or IOC behavior.
"""
from __future__ import annotations
import threading
import time
import traceback
from contextlib import contextmanager
from functools import partial
from threading import RLock
from typing import TYPE_CHECKING, Callable, Literal
import numpy as np
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind
from ophyd.utils.errors import WaitTimeoutError
from ophyd_devices import (
AsyncMultiSignal,
CompareStatus,
ProgressSignal,
StatusBase,
TransitionStatus,
)
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.mcs_card.mcs_card import (
ACQUIREMODE,
ACQUIRING,
CHANNEL1SOURCE,
CHANNELADVANCE,
INPUTMODE,
OUTPUTMODE,
POLARITY,
READMODE,
MCSCard,
)
from csaxs_bec.devices.utils.utils import fetch_scan_info
[docs]
@contextmanager
def suppress_mca_callbacks(mcs_card: MCSCard, restore_after_timeout: None | float = None):
    """
    Context manager that silences the MCA channel callbacks of an MCS card.

    Writing to the 'erase_all' PV triggers a callback on every MCA channel, which
    can interfere with data acquisition depending on timing. While the
    '_omit_mca_callbacks' event of the card is set, '_on_counter_update' drops
    incoming updates.

    NOTE: The previous state of the event is not restored. Without
    'restore_after_timeout' the event stays set after leaving the context and has
    to be cleared manually. Any change to this behaviour must be coordinated with
    the logic in '_on_counter_update'.

    Args:
        mcs_card (MCSCard): The MCSCard instance to suppress callbacks for.
        restore_after_timeout (float | None): If given, sleep for this many seconds
            when leaving the context and then clear the suppression. If None, the
            suppression is left in place.
    """
    # Set the flag under the card's lock so that a callback that is currently
    # being processed finishes before suppression takes effect.
    with mcs_card._rlock:  # pylint: disable=protected-access
        omit_flag = mcs_card._omit_mca_callbacks  # pylint: disable=protected-access
        omit_flag.set()
    auto_clear = restore_after_timeout is not None
    try:
        yield
    finally:
        if auto_clear:
            # Give the IOC time to flush the callbacks before accepting data again.
            time.sleep(restore_after_timeout)
            omit_flag.clear()
# These names are only used in annotations, so they are imported for type checkers only.
if TYPE_CHECKING:  # pragma: no cover
    from bec_lib.devicemanager import DeviceManagerBase, ScanInfo

# Module-level logger used by the device class below.
logger = bec_logger.logger
[docs]
class MCSCardCSAXS(PSIDeviceBase, MCSCard):
    """
    Implementation of the MCSCard SIS3820 for CSAXS, prefix 'X12SA-MCS:'.

    The basic functionality is inherited from the MCSCard class.
    Please note that the number of channels is fixed to 32, so there will be data for all
    32 channels. In addition, the logic of the card is linked to the timing system (DDG)
    and therefore changes have to be coordinated with the logic on the DDG side.

    Args:
        name (str): Name of the device.
        prefix (str, optional): Prefix for the EPICS PVs. Defaults to "".
    """

    USER_ACCESS = ["mcs_recovery", "get_transition_status", "get_compare_status"]

    # NOTE The number of MCA channels is fixed to 32 for the CSAXS MCS card.
    # On the IOC, we receive a 'warning' or 'error' once we set this channel for the
    # envisioned input/output mode settings of the card. However, we need to know the
    # channels set as callback timing relies on the channels to be set.
    # For the future, we may consider adding an initialization parameter to set
    # the number of channels, which in return limits the number of subscriptions
    # on the channels. However, mux_output should still be set to 32 on the IOC side.
    # If this limits performance, this should be investigated with Controls engineers and
    # the IOC.
    NUM_MCA_CHANNELS: int = 32

    # MCA counters for the card. Channels 1-32 will be sent to BEC.
    mca = Cpt(
        AsyncMultiSignal,
        name="counters",
        # NOTE Channels 1-NUM_MCA_CHANNELS, they need to be in sync with the 'counters'
        # component (DynamicDeviceComponent) of the MCSCard. The range is derived from
        # NUM_MCA_CHANNELS so the signal list and the channel count cannot drift apart.
        signals=[f"mca{i}" for i in range(1, NUM_MCA_CHANNELS + 1)],
        ndim=1,
        async_update={"type": "add", "max_shape": [None]},
        max_size=1000,
        kind=Kind.normal,
        doc=(
            "AsyncMultiSignal for MCA card channels 1-32. "
            "Cabling of the MCS card determines which channel corresponds to which input."
        ),
    )
    progress = Cpt(ProgressSignal, doc="ProgressSignal indicating the progress of the device")
def __init__(
    self,
    name: str,
    prefix: str = "",
    scan_info: ScanInfo | None = None,
    device_manager: DeviceManagerBase | None = None,
    **kwargs,
):
    """
    Initialise the device and all internal bookkeeping used for data emission.

    Args:
        name (str): Name of the device.
        prefix (str, optional): Prefix for the EPICS PVs. Defaults to "".
        scan_info (ScanInfo | None): Scan info object forwarded to the base class.
        device_manager (DeviceManagerBase | None): Device manager forwarded to the base class.
        **kwargs: Additional keyword arguments forwarded to the base class.
    """
    super().__init__(
        name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
    )
    # NOTE MCS Clock frequency. This is linked to the settings of the SIS3820 IOC and
    # cabling of the card. Currently, the 'output_mode' is set to MODE_2 and one of the outputs
    # 6 or 7 (both 10MHz clocks) is used on channel 5 input for the timing signal of the IOC.
    # Please adjust this comment if the cabling or IOC settings change.
    self._mcs_clock = 1e7  # 10MHz clock -> 1e7 Hz
    self._pv_timeout = 2.0  # seconds
    self._rlock = RLock()

    # NOTE This parameter will be sent with async data of the mcs counters.
    # Based on scan-parameters, e.g. frames_per_trigger, this will be either
    # 'monitored' or 'burst_group'. This means whether data from this channel
    # is in sync with monitored devices or another group. In this scenario,
    # the other group is called burst_group. Other detectors connected and
    # triggered through the same timing system should implement the same logic
    # to allow data to be properly grouped afterwards.
    self._acquisition_group: str = "monitored"  # default value, will be updated in on_stage
    self._num_total_triggers: int = 0

    # Line bookkeeping used by '_progress_update'. The values are overwritten in
    # 'on_stage'; they are initialised here because the progress callback is
    # subscribed in 'on_connected' and may therefore run before the first stage.
    self._num_lines: int = 1
    self._current_line: int = 1

    # Thread and event logic for monitoring async data emission after scan is done
    # These are mostly internal variables for which values should not be changed externally.
    # Adjusting the logic of them should also be handled with care and proper testing.
    self._scan_done_thread_kill_event: threading.Event = threading.Event()
    self._start_monitor_async_data_emission: threading.Event = threading.Event()
    self._scan_done_callbacks: list[Callable[[], None]] = []
    self._scan_done_thread: threading.Thread = threading.Thread(
        target=self._monitor_async_data_emission, daemon=True
    )
    self._current_data_index: int = 0
    self._mca_counter_index: int = 0
    self._current_data: dict[str, dict[Literal["value", "timestamp"], list[int] | float]] = {}
    self._omit_mca_callbacks: threading.Event = threading.Event()
    self.scan_parameters: ScanServerScanInfo | None = None
[docs]
def on_connected(self):
    """
    This method is called once the device and all its PVs are connected. Any initial
    setup of PVs should be managed here. Please be aware that settings of the MCS card
    correlate with its operation mode, input/output modes, and timing. Changing single
    parameters without understanding the overall logic may lead to unexpected behavior
    of the device. Therefore, any modification of these parameters should be handled
    with care and tested.

    A brief summary of the procedure that is implemented here:
        - Stop any ongoing acquisition.
        - Set up the initial settings of the MCS card with the respective operation modes.
        - Subscribe '_progress_update' to the 'current_channel' signal.
        - Run the 'mcs_recovery' procedure to ensure that no pending acquisition data is
          scheduled to be pushed through the mcs channels.
        - Subscribe the callback '_on_counter_update' to the mcs counter PVs to forward
          data through AsyncMultiSignal to BEC.
        - Start the monitoring thread for async data emission after scan is done.
    """
    # NOTE Stop any ongoing acquisition first. This should be done before setting any PVs.
    self.stop_all.put(1)

    #########################
    ### Setup MCS Card ###
    #########################

    # Setup the MCS card settings. Please note that any runtime modification of
    # these parameters may lead to unexpected behavior of the device.
    # Therefore this has to be set up correctly.
    self.channel_advance.set(CHANNELADVANCE.EXTERNAL).wait(timeout=self._pv_timeout)
    self.channel1_source.set(CHANNEL1SOURCE.EXTERNAL).wait(timeout=self._pv_timeout)
    self.prescale.set(1).wait(timeout=self._pv_timeout)
    self.user_led.set(0).wait(timeout=self._pv_timeout)

    # NOTE The number of output channels has to be set to NUM_MCA_CHANNELS.
    # The logic to send data to BEC relies on knowing how many channels are active.
    self.mux_output.put(self.NUM_MCA_CHANNELS)

    # Set the input and output modes & polarities
    self.input_mode.set(INPUTMODE.MODE_3).wait(timeout=self._pv_timeout)
    self.input_polarity.set(POLARITY.NORMAL).wait(timeout=self._pv_timeout)
    self.output_mode.set(OUTPUTMODE.MODE_2).wait(timeout=self._pv_timeout)
    self.output_polarity.set(POLARITY.NORMAL).wait(timeout=self._pv_timeout)
    self.count_on_start.set(0).wait(timeout=self._pv_timeout)

    # NOTE Data is read out when the MCS card finishes an acquisition. The logic for this
    # is also linked to triggering on the DDG.
    # Set ReadMode to PASSIVE, the card will either wait for a readout command or
    # automatically read out once the acquisition is done.
    self.read_mode.set(READMODE.PASSIVE).wait(timeout=self._pv_timeout)

    # Set the acquire mode
    self.acquire_mode.set(ACQUIREMODE.MCS).wait(timeout=self._pv_timeout)

    # Subscribe the progress signal
    self.current_channel.subscribe(self._progress_update, run=False)

    # NOTE: Run a recovery procedure to ensure that the card has no pending data
    # that needs to be pushed through the mca channels. The procedure involves
    # stopping any ongoing acquisition and erasing all data on the card. Including
    # a short sleep to allow the IOC to process the commands.
    self.mcs_recovery(timeout=1)

    ####################################
    ### Setup MCS Subscriptions ###
    ####################################
    # Attach the data callback to every channel of the 'counters' component.
    for sig in self.counters.component_names:
        sig_obj: EpicsSignalRO = getattr(self.counters, sig)
        sig_obj.subscribe(self._on_counter_update, run=False)

    # Start monitoring thread
    self._scan_done_thread.start()
def _on_counter_update(self, value: float | np.ndarray, **kwargs) -> None:
    """
    Subscription callback attached to every mca channel PV (1-32) of the MCS card.

    Each call stores the update of one channel in '_current_data'. As soon as an
    entry for every one of the NUM_MCA_CHANNELS channels is present, the collected
    data is pushed to BEC through the AsyncMultiSignal 'mca' and the buffer is reset
    for the next acquisition. This only works if mux_output was set to the correct
    number of channels in on_connected.

    The whole callback runs under '_rlock', because callbacks may be executed from
    different threads at the same time. While '_omit_mca_callbacks' is set (see the
    'suppress_mca_callbacks' context manager), updates are dropped without being
    processed; the event has to be cleared again before data is accepted.

    Args:
        value: The new value from the counter PV.
        **kwargs: Additional keyword arguments from the subscription, including
            'obj' (the EpicsSignalRO instance) and optionally 'timestamp'.
    """
    with self._rlock:
        # Updates caused by erasing the channels must not be treated as data.
        if self._omit_mca_callbacks.is_set():
            return

        self._mca_counter_index += 1

        channel_signal: EpicsSignalRO | None = kwargs.get("obj")
        if channel_signal is None:
            logger.error(f"Called without 'obj' in kwargs: {kwargs}")
            return

        # NOTE: This relies on the naming convention 'mca1', 'mca2', ..., 'mca32' of the
        # 'counters' DynamicDeviceComponent of the MCSCard class. Updates from channels
        # beyond NUM_MCA_CHANNELS are ignored.
        attr_name = channel_signal.attr_name
        channel_number = int(attr_name[3:])  # strip the leading 'mca'
        if channel_number > self.NUM_MCA_CHANNELS:
            return

        # NOTE Depending on the scan parameters, either a single value or a numpy array
        # is received. Both are normalised to a plain list before being stored.
        samples = value.tolist() if isinstance(value, np.ndarray) else [value]

        # Prefer the timestamp delivered with the update; fall back to the receive time.
        stamp = kwargs.get("timestamp")
        if not stamp:
            stamp = time.time()
        self._current_data[attr_name] = {"value": samples, "timestamp": stamp}

        # Keep accumulating until every channel has reported.
        if len(self._current_data) != self.NUM_MCA_CHANNELS:
            return

        logger.info(f"Current data index {self._current_data_index} complete, pushing to BEC.")
        self.mca.put(self._current_data, acquisition_group=self._acquisition_group)
        # Reset the accumulation state for the next acquisition.
        self._current_data.clear()
        self._mca_counter_index = 0
        self._current_data_index += 1
# NOTE The logic for the device progress is not yet fully refined for all scan types.
# This has to be adjusted once fly scan and step scan logic is fully implemented.
# pylint: disable=unused-argument
def _progress_update(self, *args, old_value: float, value: float, **kwargs) -> None:
    """
    Callback to update the progress signal based on the value of current_channel with
    respect to the expected total number of triggers.

    The logic for these updates needs to be extended once fly and step scan logic is
    fully implemented.

    Args:
        old_value: Previous value of the signal.
        value: New value of the signal.
    """
    # The callback is subscribed in 'on_connected' and can fire before the first
    # scan has been staged. Without scan parameters there is no progress to report.
    if self.scan_parameters is None:
        return
    try:
        # A decreasing channel value indicates that we have moved to the next line
        # in a continuous scan.
        if self._num_lines > 1 and old_value > value:
            self._current_line += 1
        # Offset the channel value by the triggers of the lines already completed.
        value = value + (self._current_line - 1) * self.scan_parameters.frames_per_trigger
        scan_done = bool(value == self._num_total_triggers)
        self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
    except Exception:  # pylint: disable=broad-except
        # Progress reporting is best effort and must never break the CA callback chain.
        content = traceback.format_exc()
        logger.info(f"Device {self.name} error: {content}")
[docs]
def on_stage(self) -> None:
    """
    This method is called when the device is staged before a scan. Any bootstrapping required
    for the scan should be handled here. We also need to handle MCS card specific logic to ensure
    that the card is properly prepared for the scan.

    The following procedure is implemented here:
        - Ensure that any ongoing acquisition is stopped (should never happen if not
          interfered with manually).
        - Erase all data on the MCS card if 'current_channel' is not 0, to ensure a clean
          start (should not be needed if unstage was called properly).
        - Set acquisition parameters based on scan parameters (frames_per_trigger,
          num_points, acquisition_group).
        - Clear any events and buffers related to async data emission. This includes
          '_omit_mca_callbacks', '_start_monitor_async_data_emission',
          '_scan_done_callbacks', and '_current_data'.
        - For hardware triggered scans, start the card through 'erase_start'.
    """
    logger.info(f"MCS Card {self.name} on_stage called.")
    start_time = time.time()
    self.scan_parameters = fetch_scan_info(self.scan_info)

    # NOTE: If for some reason, the card is still acquiring, we need to stop it first
    # This should never happen as the card is properly stopped during unstage
    # Can only happen if user manually interferes with the IOC through other means
    if self.acquiring.get() == ACQUIRING.ACQUIRING:
        logger.warning(
            f"MCS Card {self.name} was still acquiring on staging. Stopping acquisition."
        )
        self.stop_all.put(1)
        status = CompareStatus(self.acquiring, ACQUIRING.DONE)
        status.wait(timeout=10)

    # NOTE: If current_channel != 0, erase all data on the card. This
    # needs to be done with the 'suppress_mca_callbacks' context manager as erase_all will result
    # in data emission through mca callback subscriptions.
    # The buffer needs to be cleared as this will otherwise lead to missing
    # triggers during the scan. Again, this should not happen if unstage is properly called.
    # But user interference or a restart of the device_server may lead to this situation.
    if self.current_channel.get() != 0:
        with suppress_mca_callbacks(self, restore_after_timeout=1.0):
            logger.warning(
                f"MCS Card {self.name} had still data in buffer Erased all data on staging and sleeping for 1 second."
            )
            # Erase all data on the MCS card
            self.erase_all.put(1)

    #####################################
    ### Setup Acquisition Parameters ###
    #####################################
    triggers = self.scan_parameters.frames_per_trigger
    num_points = self.scan_parameters.num_points
    self._num_total_triggers = triggers * num_points
    self._num_lines = self.scan_parameters.additional_scan_parameters.get("num_lines", 1)
    self._current_line = 1
    # Single-frame scans are grouped with monitored devices, bursts get their own group.
    self._acquisition_group = "monitored" if triggers == 1 else "burst_group"

    self.preset_real.set(0).wait()  # TODO consider using put...
    # Software triggered scans acquire one burst per trigger, hardware triggered
    # scans acquire all triggers of the scan in a single acquisition.
    if self.scan_parameters.scan_type == "software_triggered":
        self.num_use_all.set(triggers).wait()
    elif self.scan_parameters.scan_type == "hardware_triggered":
        self.num_use_all.set(self._num_total_triggers).wait()

    # Clear any previous data, just to be sure
    with self._rlock:
        self._current_data.clear()
        self._mca_counter_index = 0

    # NOTE Reset events for monitoring async_data_emission thread which is
    # running during complete to wait for all data from the card
    # to be emitted to BEC.
    self._start_monitor_async_data_emission.clear()

    # Clear any previous scan done callbacks
    self._scan_done_callbacks.clear()

    # Reset counter for data index of emitted data, NOTE for fly scans, this logic may have to be adjusted.
    self._current_data_index = 0

    # NOTE Make sure that the signal that omits mca callbacks is cleared
    # DO NOT REMOVE!!
    self._omit_mca_callbacks.clear()

    # For a fly scan we need to start the mcs card ourselves
    if self.scan_parameters.scan_type == "hardware_triggered":
        self.erase_start.put(1)

    logger.info(f"MCS Card {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
[docs]
def on_prescan(self) -> None | StatusBase:
    """
    Hook that runs after on_stage and before the scan starts.

    For hardware triggered (fly) scans the card has to be acquiring before the scan
    begins, so a status that resolves once 'acquiring' reports ACQUIRING is returned.
    For all other scan types nothing has to be done here and None is returned.

    Returns:
        None | StatusBase: Status waiting for the card to be acquiring, or None.
    """
    if self.scan_parameters.scan_type != "hardware_triggered":
        return None
    acquiring_status = CompareStatus(self.acquiring, ACQUIRING.ACQUIRING)
    # Make sure the status is cancelled if the device is stopped in the meantime.
    self.cancel_on_stop(acquiring_status)
    return acquiring_status
[docs]
def on_unstage(self) -> None:
    """
    Called when the device is unstaged. This method should be idempotent and resolve fast.
    It stops any ongoing acquisition, erases all data on the MCS and clears the local
    buffer '_current_data'.

    NOTE: It is important that the logic for on_complete is solid and properly waiting for
    mca data to be emitted to BEC. Otherwise, unstage may interfere with ongoing data emission.
    Unstage is called after complete during scans.
    'suppress_mca_callbacks' is used without a timeout here, so '_omit_mca_callbacks' stays
    set afterwards. It is crucial that the device clears '_omit_mca_callbacks' in its
    on_stage method to make sure that data is emitted once the card is properly staged.
    """
    self.stop_all.put(1)
    with suppress_mca_callbacks(self):
        # Reset the local buffer under the lock shared with '_on_counter_update'.
        with self._rlock:
            self._current_data.clear()
            self._current_data_index = 0
        self.erase_all.put(1)
def _monitor_async_data_emission(self) -> None:
    """
    Monitoring loop that runs in a separate thread to check if all async data has been
    emitted to BEC.

    The loop is idle most of the time and only becomes active while
    '_start_monitor_async_data_emission' is set, which happens in 'on_complete'.
    The check compares the number of data updates '_current_data_index' received through
    the mca channel callbacks with the expected number of points of the scan. Once the
    condition is met, all callbacks in '_scan_done_callbacks' are called to indicate that
    data emission is done. Callbacks need to accept an 'exception' keyword argument to
    properly report failure.

    The loop terminates once '_scan_done_thread_kill_event' is set. To wake up an idle
    loop, '_start_monitor_async_data_emission' has to be set in addition.

    NOTE! This logic currently works for any step scan, but has to be extended for fly scans.
    """
    while not self._scan_done_thread_kill_event.is_set():
        # Idle until monitoring is requested. Event.wait() without a timeout only
        # returns once the event is set, so the kill event has to be re-checked
        # after every wake-up; otherwise the thread could never be terminated.
        self._start_monitor_async_data_emission.wait()
        if self._scan_done_thread_kill_event.is_set():
            break
        try:
            scan_parameters = self.scan_parameters
            num_points = getattr(scan_parameters, "num_points", None)
            if num_points is not None:
                if scan_parameters.scan_type == "software_triggered":
                    logger.info(
                        f"Software triggered scan: {self._current_data_index}/{num_points} points received."
                    )
                    data_complete = self._current_data_index == num_points
                else:
                    # All other scan types are done after the first data update.
                    data_complete = self._current_data_index >= 1
                if data_complete:
                    # Iterate over a copy, the list may be cleared from another thread.
                    for callback in list(self._scan_done_callbacks):
                        callback(exception=None)
            time.sleep(0.02)  # 20ms delay to avoid busy loop
        except Exception as exc:  # pylint: disable=broad-except
            content = traceback.format_exc()
            logger.error(
                f"Exception in monitoring thread of complete for {self.name}:\n{content}"
                "Running callbacks to avoid deadlock."
            )
            for callback in list(self._scan_done_callbacks):
                callback(exception=exc)
            # Stop monitoring after a failure. Without this, a persistent error with
            # no registered callbacks would turn this loop into a busy error loop.
            self._start_monitor_async_data_emission.clear()
def _status_callback(self, status: StatusBase, exception=None) -> None:
    """
    Resolve 'status' once the monitoring thread reports the end of data emission.

    Args:
        status (StatusBase): Status object to resolve.
        exception: Exception to set on the status, or None to mark it as finished.
    """
    # Monitoring is no longer needed, put the monitoring thread back to idle.
    self._start_monitor_async_data_emission.clear()

    # NOTE set_finished / set_exception must not be called on a status that is
    # already done, e.g. because it was cancelled externally.
    if status.done:
        return

    if exception is None:
        status.set_finished()
        return
    status.set_exception(exception)
def _status_failed_callback(self, status: StatusBase) -> None:
    """
    Status callback that puts the monitoring thread back to idle.

    Args:
        status (StatusBase): Status object that triggered the callback.
    """
    # Only a status that is done may stop the monitoring.
    if not status.done:
        return
    self._start_monitor_async_data_emission.clear()  # Stop monitoring
[docs]
def on_complete(self) -> StatusBase:
    """
    Method that is called at the end of scan core, but before unstage. This method is
    used to report whether the device successfully completed its data acquisition for the scan.
    The check has to be implemented asynchronously and resolve through a status (future) object
    returned by this method.

    NOTE: For the MCS card, we need to ensure that all data has been acquired
    and emitted to BEC as updates after 'on_complete' resolved will be rejected by BEC.
    Therefore, we need to ensure that all data has been emitted to BEC before
    reporting completion of the device.

    This method implements the following procedure:
        - For hardware triggered scans, wait until the card has received all but the last
          expected channel advance and issue the last advance through software.
        - Start the idle async data monitoring thread that checks if all expected data
          has been emitted to BEC through the mca channel callbacks.
        - Use a CompareStatus to monitor when the MCS card becomes DONE. Please note that this
          only indicates that the card has finished acquisition, but not that all data has been
          emitted to BEC.
        - Return the combined status object. A callback is registered on it to stop the
          monitoring thread once the status is done, e.g. after a scan abort.

    Returns:
        StatusBase: Combination of the 'acquiring is DONE' status and the status that
            resolves once all data has been emitted.

    Raises:
        TimeoutError: If, for a hardware triggered scan, the card did not receive the
            expected number of points within 5 seconds.
    """
    # NOTE For fly scans with EXT/EN enabled triggering, the MCS card needs to receive an
    # additional trigger at the end of the scan to advance the channel. This will ensure
    # that the acquisition finishes on the card and that data is emitted to BEC. If the acquisition
    # was already finished (i.e. normal step scan sends 1 extra pulse per burst cycle), this will
    # not have any effect as the card will already be in DONE state and signal.
    if self.scan_parameters.scan_type == "hardware_triggered":
        expected_points = int(
            self.scan_parameters.num_points * self.scan_parameters.frames_per_trigger
        )
        status_points = CompareStatus(
            self.current_channel, expected_points - 1, operation_success=">="
        )
        try:
            status_points.wait(timeout=5)
        except WaitTimeoutError as exc:
            text = f"Device {self.name} received num points {self.current_channel.get()} / {expected_points}. Device timed out after 5s."
            logger.error(text)
            # Keep the original timeout as the cause for easier debugging.
            raise TimeoutError(text) from exc
        # Manually set the last advance
        self.software_channel_advance.put(1)

    # Prepare and register status callback for the async monitoring loop
    status_async_data = StatusBase(obj=self, timeout=5)
    self._scan_done_callbacks.append(partial(self._status_callback, status_async_data))

    # Set the event to start monitoring async data emission
    logger.debug(f"Starting to monitor async data emission for {self.name}...")
    self._start_monitor_async_data_emission.set()

    # Add CompareStatus for Acquiring DONE
    status_acquiring_done = CompareStatus(self.acquiring, ACQUIRING.DONE, timeout=5)

    # Combine both statuses. The timeouts are passed to the individual status objects,
    # so this method must not block on them here.
    ret_status = status_acquiring_done & status_async_data

    # NOTE: Handle external stop/cancel, and stop monitoring
    ret_status.add_callback(self._status_failed_callback)
    self.cancel_on_stop(ret_status)
    return ret_status
[docs]
def on_destroy(self):
    """
    Hook that is called when the device is destroyed, which also happens on reload.

    All resources held by the device have to be released here. The kill event and the
    start event of the monitoring thread are set, and the thread is joined with a
    timeout of 2 seconds if it is alive. A warning is logged if it is still alive
    afterwards.
    """
    worker = self._scan_done_thread
    self._scan_done_thread_kill_event.set()
    # Also set the start event so that an idle monitoring thread wakes up.
    self._start_monitor_async_data_emission.set()
    if not worker.is_alive():
        return
    worker.join(timeout=2.0)
    if worker.is_alive():
        logger.warning(f"Thread for device {self.name} did not terminate properly.")
[docs]
def on_stop(self) -> None:
    """
    Hook called when the device is stopped. In addition, any status that is registered
    through cancel_on_stop will be cancelled here.

    The acquisition is stopped and all data on the card is erased while mca callbacks
    are suppressed. 'suppress_mca_callbacks' is used without a timeout, so
    '_omit_mca_callbacks' remains set afterwards until it is cleared again, which
    'on_stage' does.
    """
    with suppress_mca_callbacks(self):
        self.stop_all.put(1)
        self.erase_all.put(1)
[docs]
def mcs_recovery(self, timeout: float = 1) -> None:
    """
    Recovery procedure for the mcs card. This procedure has been empirically found and can
    be used to ensure that the MCS card is stopped and has no pending data to be emitted.

    The sequence is: 'erase_start', sleep, 'stop_all', then 'erase_all' with mca
    callbacks suppressed, followed by a second sleep after which the suppression is
    cleared. The sleeps allow the IOC to process the commands.

    Args:
        timeout (float): Total time spent sleeping during the recovery procedure, split
            evenly over the two sleeps. Defaults to 1 second.
    """
    sleep_time = timeout / 2  # 2 sleeps
    logger.debug(
        f"Running recovery procedure for MCS card {self.name} with {sleep_time}s sleep, calling stop_all and erase_all, and another {sleep_time}s sleep"
    )
    # First erase and start an acquisition.
    self.erase_start.put(1)
    time.sleep(sleep_time)
    # After a brief processing time, we stop any ongoing acquisition.
    self.stop_all.put(1)
    # Finally, we erase all data while suppressing mca callbacks to avoid interference.
    # The suppression is cleared again after 'sleep_time' to ensure proper operation afterwards.
    with suppress_mca_callbacks(self, restore_after_timeout=sleep_time):
        self.erase_all.put(1)
[docs]
def get_transition_status(self, signal_name: str, transition: list) -> TransitionStatus:
    """
    Create a TransitionStatus for one of the signals of this device.

    Args:
        signal_name: Attribute name of the signal on this device.
        transition (list): List of transitions passed on to the TransitionStatus.

    Returns:
        TransitionStatus: Status object created for the signal.

    Raises:
        ValueError: If the device has no attribute 'signal_name' or it is None.
    """
    target = getattr(self, signal_name, None)
    if target is not None:
        return TransitionStatus(target, transition)
    raise ValueError(f"Signal {signal_name} not found in device {self.name}.")
[docs]
def get_compare_status(
    self, signal_name: str, expected_value: object, operation_success: str = "=="
) -> CompareStatus:
    """
    Create a CompareStatus for one of the signals of this device.

    Args:
        signal_name: Attribute name of the signal on this device.
        expected_value: The expected value to compare the signal against.
        operation_success: The comparison operation to use. Defaults to "==".

    Returns:
        CompareStatus: Status object created for the signal.

    Raises:
        ValueError: If the device has no attribute 'signal_name' or it is None.
    """
    signal = getattr(self, signal_name, None)
    if signal is None:
        raise ValueError(f"Signal {signal_name} not found in device {self.name}.")
    return CompareStatus(signal, expected_value, operation_success=operation_success)