Source code for csaxs_bec.devices.omny.rt.rt_ophyd
"""
This module contains base signals for RT devices. Controller and motors are implemented in the
bespoke modules such as `rt_flomni_ophyd.py` or `rt_lamni_ophyd.py`.
"""
import functools
import time
from bec_lib import bec_logger
from ophyd.utils import ReadOnlyError
from ophyd_devices.utils.controller import ControllerCommunicationError, threadlocked
from ophyd_devices.utils.socket import SocketSignal
logger = bec_logger.logger
[docs]
class RtCommunicationError(ControllerCommunicationError):
pass
[docs]
class RtError(ControllerCommunicationError):
pass
[docs]
class BECConfigError(Exception):
pass
[docs]
def retry_once(fcn):
"""Decorator to rerun a function in case a CommunicationError was raised. This may happen if the buffer was not empty."""
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
try:
val = fcn(self, *args, **kwargs)
except (RtCommunicationError, RtError):
val = fcn(self, *args, **kwargs)
return val
return wrapper
[docs]
class RtSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller = self.parent.controller
self.sock = self.parent.controller.sock
[docs]
class RtSignalRO(RtSignalBase):
def __init__(self, signal_name, **kwargs):
super().__init__(signal_name, **kwargs)
self._metadata["write_access"] = False
def _socket_set(self, val):
raise ReadOnlyError("Read-only signals cannot be set")
[docs]
class RtReadbackSignal(RtSignalRO):
@retry_once
@threadlocked
def _socket_get(self) -> float:
"""Get command for the readback signal
Returns:
float: Readback value after adjusting for sign and motor resolution.
"""
return_table = (self.controller.socket_put_and_receive("J4")).split(",")
if self.parent.axis_Id_numeric == 0:
readback_index = 2
elif self.parent.axis_Id_numeric == 1:
readback_index = 1
else:
raise RtError("Currently, only two axes are supported.")
current_pos = float(return_table[readback_index])
current_pos *= self.parent.sign
return current_pos
[docs]
class RtSetpointSignal(RtSignalBase):
def __init__(self, signal_name, **kwargs):
super().__init__(signal_name, **kwargs)
self.setpoint = 0.0
def _socket_get(self) -> float:
"""Get command for receiving the setpoint / target value.
The value is not pulled from the controller but instead just the last setpoint used.
Returns:
float: setpoint / target value
"""
return self.setpoint
@retry_once
@threadlocked
def _socket_set(self, val: float) -> None:
"""Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign.
Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted.
Args:
val (float): Target value / setpoint value
Raises:
RtError: Raised if interferometer feedback is disabled.
"""
interferometer_feedback_not_running = int(
(self.controller.socket_put_and_receive("J2")).split(",")[0]
)
if interferometer_feedback_not_running != 0:
raise RtError(
"The interferometer feedback is not running. Either it is turned off or and interferometer error occured."
)
self.set_with_feedback_disabled(val)
def set_with_feedback_disabled(self, val):
target_val = val * self.parent.sign
self.setpoint = target_val
self.controller.socket_put(f"pa{self.parent.axis_Id_numeric},{target_val:.4f}")
[docs]
class RtMotorIsMoving(RtSignalRO):
def _socket_get(self):
return self.controller.is_axis_moving(self.parent.axis_Id_numeric)
[docs]
def get(self):
val = super().get()
if val is not None:
self._run_subs(sub_type=self.SUB_VALUE, value=val, timestamp=time.time())
return val
[docs]
class RtFeedbackRunning(RtSignalRO):
@threadlocked
def _socket_get(self):
if int((self.controller.socket_put_and_receive("J2")).split(",")[0]) == 0:
return 1
else:
return 0