import functools
import threading
import time
import urllib.request
import xml.etree.ElementTree as ET
import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices.utils.controller import threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from csaxs_bec.devices.omny.galil.galil_ophyd import (
BECConfigError,
GalilAxesReferenced,
GalilCommunicationError,
GalilController,
GalilError,
GalilMotorIsMoving,
GalilSetpointSignal,
GalilSignalRO,
)
logger = bec_logger.logger
[docs]
def retry_once(fcn):
"""Decorator to rerun a function in case a Galil communication error was raised. This may happen if the buffer was not empty."""
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
try:
val = fcn(self, *args, **kwargs)
except (GalilCommunicationError, GalilError):
val = fcn(self, *args, **kwargs)
return val
return wrapper
[docs]
class GalilMotorResolution(GalilSignalRO):
@retry_once
@threadlocked
def _socket_get(self):
if self.controller.sock.port == 8083 and self.parent.axis_Id_numeric == 2:
# rotation stage
return 89565.8666667
else:
return 51200
[docs]
class OMNYGalilReadbackSignal(GalilSignalRO):
previous_rotation_angle = 0
ignore_glitch = True
@retry_once
@threadlocked
def _socket_get(self) -> float:
"""Get command for the readback signal
Returns:
float: Readback value after adjusting for sign and motor resolution.
"""
current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}"))
current_pos *= self.parent.sign
step_mm = self.parent.motor_resolution.get()
# here we introduce an offset of 25 to the rotation axis
# when setting a position this is taken into account in the controller
# that way we just do tomography from 0 to 180 degrees
if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083:
return (current_pos / step_mm) + 25
else:
return current_pos / step_mm
[docs]
def read(self):
self._metadata["timestamp"] = time.time()
val = super().read()
# if reading rotation stage angle
if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083:
current_readback_value = val[self.parent.name]["value"]
# print (f"previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}.")
if np.fabs((self.previous_rotation_angle - current_readback_value) > 10):
message = f"Glitch detected in rotation stage. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}."
print(message)
self.parent.device_manager.connector.send_client_info(
message, scope="glitch detector", show_asap=True
)
val = super().read()
current_readback_value = val[self.parent.name]["value"]
if np.fabs((self.previous_rotation_angle - current_readback_value) > 10):
message = f"Glitch detected in rotation stage second read. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}. Disabling the controller."
print(message)
self.parent.device_manager.connector.send_client_info(
message, scope="glitch detector", show_asap=True
)
self.parent.device_manager.devices["osamroy"].obj.controller.socket_put_confirmed(
"allaxref=0"
)
self.parent.device_manager.devices["osamroy"].obj.enabled = False
return val
self.previous_rotation_angle = current_readback_value
try:
rt = self.parent.device_manager.devices["rtx"]
if rt.enabled:
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"] - 25 + 54)
except KeyError:
logger.warning("Failed to set RT value during ogalil readback.")
return val
[docs]
class OMNYGalilController(GalilController):
USER_ACCESS = [
"describe",
"show_running_threads",
"galil_show_all",
"socket_put_and_receive",
"socket_put_confirmed",
"get_motor_limit_switch",
"is_motor_on",
"all_axes_referenced",
"_ogalil_switchsocket",
"_ogalil_switchsocket_switch_all_on",
"_ogalil_switchsocket_status",
"_ogalil_switchsocket_are_all_on",
"_ogalil_folerr_not_ignore",
]
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
[docs]
def on(self, timeout: int = 10) -> None:
"""Open a new socket connection to the controller"""
self._ogalil_switchsocket_switch_all_on()
time.sleep(0.3)
super().on(timeout=timeout)
def _ogalil_switchsocket(self, number: int, switch: bool):
# number is socket number ranging from 1 to 4
# switch is either 0 or 1
if number not in range(1, 5):
raise Exception("Socket number ranges from 1 to 4")
else:
contents = urllib.request.urlopen(
f"http://mpc3217:8091/netio.cgi?pass=24A42C3929C5&output{number}={switch}"
).read()
# print(contents)
if b"OK" in contents:
print(f"Controller number switchsocket {number} is now {switch}")
return 1
else:
print(f"Failed to switch controller number {number}")
return 0
def _ogalil_switchsocket_are_all_on(self):
if self._ogalil_switchsocket_status() == [1, 1, 1, 1]:
return 1
else:
return 0
def _ogalil_tempdisabledebug(self):
# sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],"WH\r\n") < 1) {
# ret_str = sock_get(_ogalil_debugging_host_and_port_str[ogalil_no][],_ogalil_prompt_str)
# printf("%s _ogalil_redirect_debug_output(%d): expected \"IHx\" received \"%s\"\n",\
# cmd_str = sprintf("CF %s\r\n",substr(ret_str,3,1))
# if (sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],cmd_str) < 1) {
# cmd_str = sprintf("CW 2\r\n")
# if (sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],cmd_str) < 1) {
print("not yet implemented")
return 0
def _ogalil_folerr_reset_and_ignore(self, axis_id_numeric: int) -> None:
self.socket_put_confirmed(f"folaxerr[{axis_id_numeric}]=0")
self.socket_put_confirmed("folerr=0")
self.socket_put_confirmed("IgNoFol=1")
self.socket_put_confirmed("XQ#STOP,1")
def _ogalil_set_axis_to_pos_wo_reference_search(
self, axis_id_numeric, axis_id, pos_mm, motor_resolution, motor_sign
):
self.socket_put_confirmed("IgNoFol=1")
# pos_mm = pos_encoder / motor_resolution
pos_encoder = pos_mm * motor_resolution * motor_sign
# print(motor_resolution)
self.socket_put_confirmed(f"DE{axis_id}={pos_encoder:.0f}")
self.socket_put_confirmed(f"DP{axis_id}=_TP{axis_id}*ratio[{axis_id_numeric:.0f}]")
self.socket_put_confirmed(f"folaxerr[{axis_id_numeric}]=0")
self.socket_put_confirmed(f"axisref[{axis_id_numeric}]=1")
self.socket_put_confirmed("folerr=0")
self._ogalil_folerr_not_ignore()
def _ogalil_folerr_not_ignore(self):
self.socket_put_confirmed("IgNoFol=0")
def _ogalil_switchsocket_switch_all_on(self):
if not self._ogalil_switchsocket_are_all_on():
for j in range(1, 5):
self._ogalil_switchsocket(j, 1)
time.sleep(0.4)
def _ogalil_switchsocket_status(self):
contents = urllib.request.urlopen("http://mpc3217:8091/netio.xml").read()
root = ET.fromstring(contents)
returnvalue = []
for j in range(0, 4):
status = int(root[1][j][2].text)
returnvalue.append(int(root[1][j][2].text))
if status:
print(f"Controller {j+1} is ON")
else:
print(f"Controller {j+1} is OFF")
return returnvalue
[docs]
def show_status_other(self):
swver = float(self.socket_put_and_receive("MGswver"))
allaxref = float(self.socket_put_and_receive("MGallaxref"))
tempab = float(self.socket_put_and_receive("MGtempab"))
timeab = float(self.socket_put_and_receive("MGtimeab"))
IgNoFol = float(self.socket_put_and_receive("MGIgNoFol"))
print(
f"OMNY galil firmware {swver:2.0f}, TempAbort: {tempab:1.0f}, Allaxref: {allaxref:1.0f}, TimeAbort: {timeab:1.0f}, Ignore Folerr: {IgNoFol:1.0f}\n"
)
if self.sock.port == 8083:
self._ogalil_switchsocket_status()
[docs]
class OMNYGalilMotor(Device, PositionerBase):
USER_ACCESS = [
"controller",
"find_reference",
"omny_osamx_to_scan_center",
"drive_axis_to_limit",
"_ogalil_folerr_reset_and_ignore",
"_ogalil_set_axis_to_pos_wo_reference_search",
"get_motor_limit_switch",
"axis_is_referenced",
"get_motor_temperature",
"folerr_status",
]
readback = Cpt(OMNYGalilReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint")
motor_resolution = Cpt(GalilMotorResolution, signal_name="resolution", kind="config")
motor_is_moving = Cpt(GalilMotorIsMoving, signal_name="motor_is_moving", kind="normal")
all_axes_referenced = Cpt(GalilAxesReferenced, signal_name="all_axes_referenced", kind="config")
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
SUB_READBACK = "readback"
SUB_CONNECTION_CHANGE = "connection_change"
_default_sub = SUB_READBACK
def __init__(
self,
axis_Id,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc3217.psi.ch",
port=8081,
limits=None,
sign=1,
socket_cls=SocketIO,
device_manager=None,
**kwargs,
):
self.controller = OMNYGalilController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.axis_Id = axis_Id
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.sign = sign
self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_mapping = kwargs.pop("device_mapping", {})
self.device_manager = device_manager
if len(self.device_mapping) > 0 and self.device_manager is None:
raise BECConfigError(
"device_mapping has been specified but the device_manager cannot be accessed."
)
self.rt = self.device_mapping.get("rt")
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.readback.name = self.name
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
self._update_connection_state()
# self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE)
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
[docs]
def wait_for_connection(self, timeout: float = 30.0) -> bool:
self.controller.on(timeout=timeout)
self._update_setpoint_from_readback()
def _update_setpoint_from_readback(self):
"""
The setpoint is only stored locally. After a restart,
we need to update it to match the current readback value.
"""
self.user_setpoint.setpoint = self.readback.get()
[docs]
def destroy(self):
"""Make sure to turn off the controller socket on destroy."""
self.controller.off(update_config=False)
return super().destroy()
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
def low_limit(self):
return self.limits[0]
@property
def high_limit(self):
return self.limits[1]
[docs]
def check_value(self, pos):
"""Check that the position is within the soft limits"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not (low_limit <= pos <= high_limit):
raise LimitError(f"position={pos} not within limits {self.limits}")
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
walk.item._metadata["connected"] = self.controller.connected
def _forward_readback(self, **kwargs):
kwargs.pop("sub_type")
self._run_subs(sub_type="readback", **kwargs)
[docs]
@raise_if_disconnected
def move(self, position, wait=True, **kwargs):
"""Move to a specified position, optionally waiting for motion to
complete.
Parameters
----------
position
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than `timeout`
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out
"""
self._started_moving = False
timeout = kwargs.pop("timeout", 100)
status = super().move(position, timeout=timeout, **kwargs)
self.user_setpoint.put(position, wait=False)
def move_and_finish():
while self.motor_is_moving.get():
logger.info("motor is moving")
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
time.sleep(0.1)
val = self.readback.read()
success = np.isclose(val[self.name]["value"], position, atol=self.tolerance)
if not success:
print(" stop")
self._done_moving(success=success)
logger.info("Move finished")
threading.Thread(target=move_and_finish, daemon=True).start()
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
return status
@property
def axis_Id(self):
return self._axis_Id_alpha
@axis_Id.setter
def axis_Id(self, val):
if isinstance(val, str):
if len(val) != 1:
raise ValueError("Only single-character axis_Ids are supported.")
self._axis_Id_alpha = val
self._axis_Id_numeric = self.controller.axis_Id_to_numeric(val)
else:
raise TypeError(f"Expected value of type str but received {type(val)}")
@property
def axis_Id_numeric(self):
return self._axis_Id_numeric
@axis_Id_numeric.setter
def axis_Id_numeric(self, val):
if isinstance(val, int):
if val > 26:
raise ValueError("Numeric value exceeds supported range.")
self._axis_Id_alpha = self.controller.axis_Id_numeric_to_alpha(val)
self._axis_Id_numeric = val
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
@property
def egu(self):
"""The engineering units (EGU) for positions"""
return "mm"
def _ogalil_folerr_reset_and_ignore(self):
# pylint: disable=protected-access
self.controller._ogalil_folerr_reset_and_ignore(self.axis_Id_numeric)
def _ogalil_set_axis_to_pos_wo_reference_search(self, pos_mm):
motor_resolution = self.motor_resolution.get()
self.controller._ogalil_set_axis_to_pos_wo_reference_search(
self.axis_Id_numeric, self.axis_Id, pos_mm, motor_resolution, self.sign
)
# now force position read to cache
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
[docs]
def find_reference(self, raise_error=1):
"""
Find the reference of the axis.
"""
verbose = 1
self.controller.find_reference(self.axis_Id_numeric, verbose, raise_error)
# now force position read to cache
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
[docs]
def drive_axis_to_limit(self, direction: str) -> None:
"""
Drive an axis to the limit in a specified direction.
Args:
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
"""
self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction, verbose=1)
# now force position read to cache
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
[docs]
def get_motor_limit_switch(self) -> list:
"""
Get status of the motor limit switches
"""
return self.controller.get_motor_limit_switch(self.axis_Id)
[docs]
def get_motor_temperature(self) -> float:
"""
Get motor temperature
"""
return self.controller.motor_temperature(self.axis_Id_numeric)
[docs]
def axis_is_referenced(self) -> bool:
"""
check if an axis is referenced
"""
return self.controller.axis_is_referenced(self.axis_Id_numeric)
def _get_user_param_safe(self, device, var):
param = self.device_manager.devices[device].user_parameter
if not param or param.get(var) is None:
raise ValueError(f"Device {device} has no user parameter definition for {var}.")
return param.get(var)
def omny_osamx_to_scan_center(self, cenx):
if self.controller.sock.port == 8082 and self.axis_Id_numeric == 0:
# get last setpoint
osamx = self.device_manager.devices["osamx"]
osamx_current_setpoint = osamx.obj.readback.get()
omny_samx_in = self._get_user_param_safe("osamx", "in")
if np.fabs(osamx_current_setpoint - (omny_samx_in + cenx / 1000)) > 0.025:
message = (
f"Moving osamx to scan center. new osamx target {omny_samx_in+cenx/1000:.3f}."
)
logger.info(message)
osamx.read_only = False
# osamx.controller.("osamx", "controller.socket_put_confirmed('axspeed[0]=1000')")
osamx.set(omny_samx_in + cenx / 1000)
time.sleep(0.1)
while osamx.motor_is_moving.get():
time.sleep(0.05)
osamx.read_only = True
time.sleep(2)
rt = self.device_manager.devices["rtx"]
if rt.enabled:
rt.obj.controller.laser_tracker_on()
rt.obj.controller.laser_tracker_check_and_wait_for_signalstrength()
def folerr_status(self) -> bool:
return self.controller.folerr_status(self.axis_Id_numeric)
[docs]
def stop(self, *, success=False):
self.controller.stop_all_axes()
return super().stop(success=success)