import logging
import threading
import time
import numpy as np
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.device import DEFAULT_CONNECTION_TIMEOUT
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.controller import threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from csaxs_bec.devices.smaract.smaract_controller import SmaractController
from csaxs_bec.devices.smaract.smaract_errors import SmaractCommunicationError, SmaractError
logger = bec_logger.logger
[docs]
class SmaractSignalBase(SocketSignal):
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller = self.parent.controller
self.sock = self.parent.controller.sock
[docs]
class SmaractSignalRO(SmaractSignalBase):
def __init__(self, signal_name, **kwargs):
super().__init__(signal_name, **kwargs)
self._metadata["write_access"] = False
@threadlocked
def _socket_set(self, val):
raise ReadOnlyError("Read-only signals cannot be set")
[docs]
class SmaractReadbackSignal(SmaractSignalRO):
@threadlocked
def _socket_get(self):
return self.controller.get_position(self.parent.axis_Id_numeric) * self.parent.sign
[docs]
class SmaractSetpointSignal(SmaractSignalBase):
def __init__(self, signal_name, **kwargs):
super().__init__(signal_name, **kwargs)
self.setpoint = 0
@threadlocked
def _socket_get(self):
return self.setpoint
@threadlocked
def _socket_set(self, val):
target_val = val * self.parent.sign
self.setpoint = target_val
if self.controller.axis_is_referenced(self.parent.axis_Id_numeric):
self.controller.move_axis_to_absolute_position(self.parent.axis_Id_numeric, target_val)
# parameters are axis_no,pos_mm*1e6, hold_time_sec*1e3
else:
raise SmaractError(f"Axis {self.parent.axis_Id_numeric} is not referenced.")
[docs]
class SmaractMotorIsMoving(SmaractSignalRO):
@threadlocked
def _socket_get(self):
return self.controller.is_axis_moving(self.parent.axis_Id_numeric)
[docs]
class SmaractAxisReferenced(SmaractSignalRO):
@threadlocked
def _socket_get(self):
return self.parent.controller.axis_is_referenced(self.parent.axis_Id_numeric)
[docs]
class SmaractAxisLimits(SmaractSignalBase):
@threadlocked
def _socket_get(self):
limits_msg = self.controller.socket_put_and_receive(f"GPL{self.parent.axis_Id_numeric}")
if limits_msg.startswith(":PL"):
limits = [
float(limit)
for limit in limits_msg.strip(f":PL{self.parent.axis_Id_numeric},").split(",")
]
else:
raise SmaractCommunicationError("Expected to receive message starting with :PL.")
return limits
# def _socket_set(self, val):
[docs]
class SmaractMotor(Device, PositionerBase):
USER_ACCESS = ["controller"]
readback = Cpt(SmaractReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(SmaractSetpointSignal, signal_name="setpoint")
# motor_resolution = Cpt(
# SmaractMotorResolution, signal_name="resolution", kind="config"
# )
motor_is_moving = Cpt(SmaractMotorIsMoving, signal_name="motor_is_moving", kind="normal")
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
# all_axes_referenced = Cpt(
# SmaractAxesReferenced, signal_name="all_axes_referenced", kind="config"
# )
SUB_READBACK = "readback"
SUB_CONNECTION_CHANGE = "connection_change"
_default_sub = SUB_READBACK
def __init__(
self,
axis_Id,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2680.psi.ch",
port=8085,
limits=None,
sign=1,
socket_cls=SocketIO,
device_manager=None,
**kwargs,
):
self.controller = SmaractController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.axis_Id = axis_Id
self.sign = sign
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.tolerance = kwargs.pop("tolerance", 0.5)
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.readback.name = self.name
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
self._update_connection_state()
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
[docs]
def wait_for_connection(self, timeout: float = 30.0) -> bool:
self.controller.on(timeout=timeout)
self._update_setpoint_from_readback()
def _update_setpoint_from_readback(self):
"""
The setpoint is only stored locally. After a restart,
we need to update it to match the current readback value.
"""
self.user_setpoint.setpoint = self.readback.get()
[docs]
def destroy(self):
"""Make sure to turn off the controller socket on destroy."""
self.controller.off(update_config=False)
return super().destroy()
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
def low_limit(self):
return self.limits[0]
@property
def high_limit(self):
return self.limits[1]
[docs]
def check_value(self, pos):
"""Check that the position is within the soft limits"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not (low_limit <= pos <= high_limit):
raise LimitError(f"position={pos} not within limits {self.limits}")
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
walk.item._metadata["connected"] = self.controller.connected
[docs]
@raise_if_disconnected
def move(self, position, wait=True, **kwargs):
"""Move to a specified position, optionally waiting for motion to
complete.
Parameters
----------
position
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than `timeout`
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out
"""
self._started_moving = False
timeout = kwargs.pop("timeout", 4)
status = super().move(position, timeout=timeout, **kwargs)
self.user_setpoint.put(position, wait=False)
def move_and_finish():
while self.motor_is_moving.get():
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
time.sleep(0.1)
val = self.readback.read()
success = np.isclose(val[self.name]["value"], position, atol=self.tolerance)
self._done_moving(success=success)
threading.Thread(target=move_and_finish, daemon=True).start()
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
return status
@property
def axis_Id(self):
return self._axis_Id_alpha
@axis_Id.setter
def axis_Id(self, val):
if isinstance(val, str):
if len(val) != 1:
raise ValueError(f"Only single-character axis_Ids are supported.")
self._axis_Id_alpha = val
self._axis_Id_numeric = ord(val.lower()) - 97
else:
raise TypeError(f"Expected value of type str but received {type(val)}")
@property
def axis_Id_numeric(self):
return self._axis_Id_numeric
@axis_Id_numeric.setter
def axis_Id_numeric(self, val):
if isinstance(val, int):
if val > 26:
raise ValueError(f"Numeric value exceeds supported range.")
self._axis_Id_alpha = val
self._axis_Id_numeric = (chr(val + 97)).capitalize()
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
@property
def egu(self):
"""The engineering units (EGU) for positions"""
return "mm"
[docs]
def stage(self) -> list[object]:
return super().stage()
[docs]
def unstage(self) -> list[object]:
return super().unstage()
[docs]
def stop(self, *, success=False):
self.controller.stop_all_axes()
return super().stop(success=success)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
mock = False
if not mock:
lsmarA = SmaractMotor("A", name="lsmarA", host="mpc2680.psi.ch", port=8085, sign=1)
lsmarB = SmaractMotor("B", name="lsmarB", host="mpc2680.psi.ch", port=8085, sign=1)
lsmarA.stage()
lsmarB.stage()
# status = leyey.move(2, wait=True)
# status = leyey.move(2, wait=True)
lsmarA.read()
lsmarB.read()
lsmarA.get()
lsmarB.get()
lsmarA.describe()
lsmarA.unstage()
lsmarA.controller.off()
# status = leyey.move(10, wait=False)
# print(lSmaract_controller)
else:
from ophyd_devices.utils.socket import SocketMock
lsmarA = SmaractMotor(
"A", name="lsmarA", host="mpc2680.psi.ch", port=8085, sign=1, socket_cls=SocketMock
)
lsmarB = SmaractMotor(
"B", name="lsmarB", host="mpc2680.psi.ch", port=8085, sign=1, socket_cls=SocketMock
)
lsmarA.stage()
lsmarB.stage()
lsmarA.read()
lsmarB.read()
lsmarA.get()
lsmarB.get()
lsmarA.describe()
lsmarA.unstage()
lsmarA.controller.off()
# status = leyey.move(10, wait=False)
# print(lSmaract_controller)