Source code for csaxs_bec.devices.omny.rt.rt_ophyd

"""
This module contains base signals for RT devices. Controller and motors are implemented in the 
bespoke modules such as `rt_flomni_ophyd.py` or `rt_lamni_ophyd.py`.
"""

import functools
import time

from bec_lib import bec_logger
from ophyd.utils import ReadOnlyError
from ophyd_devices.utils.controller import ControllerCommunicationError, threadlocked
from ophyd_devices.utils.socket import SocketSignal

logger = bec_logger.logger


[docs] class RtCommunicationError(ControllerCommunicationError): pass
[docs] class RtError(ControllerCommunicationError): pass
[docs] class BECConfigError(Exception): pass
[docs] def retry_once(fcn): """Decorator to rerun a function in case a CommunicationError was raised. This may happen if the buffer was not empty.""" @functools.wraps(fcn) def wrapper(self, *args, **kwargs): try: val = fcn(self, *args, **kwargs) except (RtCommunicationError, RtError): val = fcn(self, *args, **kwargs) return val return wrapper
[docs] class RtSignalBase(SocketSignal): def __init__(self, signal_name, **kwargs): self.signal_name = signal_name super().__init__(**kwargs) self.controller = self.parent.controller self.sock = self.parent.controller.sock
[docs] class RtSignalRO(RtSignalBase): def __init__(self, signal_name, **kwargs): super().__init__(signal_name, **kwargs) self._metadata["write_access"] = False def _socket_set(self, val): raise ReadOnlyError("Read-only signals cannot be set")
[docs] class RtReadbackSignal(RtSignalRO): @retry_once @threadlocked def _socket_get(self) -> float: """Get command for the readback signal Returns: float: Readback value after adjusting for sign and motor resolution. """ return_table = (self.controller.socket_put_and_receive("J4")).split(",") if self.parent.axis_Id_numeric == 0: readback_index = 2 elif self.parent.axis_Id_numeric == 1: readback_index = 1 else: raise RtError("Currently, only two axes are supported.") current_pos = float(return_table[readback_index]) current_pos *= self.parent.sign return current_pos
[docs] class RtSetpointSignal(RtSignalBase): def __init__(self, signal_name, **kwargs): super().__init__(signal_name, **kwargs) self.setpoint = 0.0 def _socket_get(self) -> float: """Get command for receiving the setpoint / target value. The value is not pulled from the controller but instead just the last setpoint used. Returns: float: setpoint / target value """ return self.setpoint @retry_once @threadlocked def _socket_set(self, val: float) -> None: """Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign. Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted. Args: val (float): Target value / setpoint value Raises: RtError: Raised if interferometer feedback is disabled. """ interferometer_feedback_not_running = int( (self.controller.socket_put_and_receive("J2")).split(",")[0] ) if interferometer_feedback_not_running != 0: raise RtError( "The interferometer feedback is not running. Either it is turned off or and interferometer error occured." ) self.set_with_feedback_disabled(val) def set_with_feedback_disabled(self, val): target_val = val * self.parent.sign self.setpoint = target_val self.controller.socket_put(f"pa{self.parent.axis_Id_numeric},{target_val:.4f}")
[docs] class RtMotorIsMoving(RtSignalRO): def _socket_get(self): return self.controller.is_axis_moving(self.parent.axis_Id_numeric)
[docs] def get(self): val = super().get() if val is not None: self._run_subs(sub_type=self.SUB_VALUE, value=val, timestamp=time.time()) return val
[docs] class RtFeedbackRunning(RtSignalRO): @threadlocked def _socket_get(self): if int((self.controller.socket_put_and_receive("J2")).split(",")[0]) == 0: return 1 else: return 0