Source code for csaxs_bec.devices.omny.galil.galil_ophyd

"""
This module contains the base class for Galil controllers as well as the signals used for Galil devices.
"""

import functools
import time
from typing import Any

from bec_lib import bec_logger
from ophyd.utils import ReadOnlyError
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketSignal
from prettytable import PrettyTable

logger = bec_logger.logger


[docs] class GalilCommunicationError(Exception): pass
[docs] class GalilError(Exception): pass
[docs] class BECConfigError(Exception): pass
[docs] def retry_once(fcn): """Decorator to rerun a function in case a Galil communication error was raised. This may happen if the buffer was not empty.""" @functools.wraps(fcn) def wrapper(self, *args, **kwargs): try: val = fcn(self, *args, **kwargs) except (GalilCommunicationError, GalilError): val = fcn(self, *args, **kwargs) return val return wrapper
[docs] class GalilController(Controller): """ Base class for Galil controllers. This class provides the basic functionality for Galil controllers and should be subclassed for specific devices. """ _axes_per_controller = 8 USER_ACCESS = [ "describe", "show_running_threads", "galil_show_all", "socket_put_and_receive", "socket_put_confirmed", "drive_axis_to_limit", "find_reference", "get_motor_limit_switch", "is_motor_on", "is_thread_active", "all_axes_referenced", ] OKBLUE = "\033[94m" OKCYAN = "\033[96m" OKGREEN = "\033[92m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m"
[docs] @threadlocked def socket_put(self, val: str) -> None: self.sock.put(f"{val}\r".encode())
[docs] @retry_once def socket_put_confirmed(self, val: str) -> None: """Send message to controller and ensure that it is received by checking that the socket receives a colon. Args: val (str): Message that should be sent to the socket Raises: GalilCommunicationError: Raised if the return value is not a colon. """ return_val = self.socket_put_and_receive(val) if return_val != ":": raise GalilCommunicationError( f"Sent {val} and expected return value of ':' but instead received {return_val}" )
def is_axis_moving(self, axis_Id, axis_Id_numeric) -> bool: if axis_Id is None and axis_Id_numeric is not None: axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) is_moving = bool(float(self.socket_put_and_receive(f"MG_BG{axis_Id}")) != 0) backlash_is_active = bool( float(self.socket_put_and_receive(f"MGbcklact[{axis_Id_numeric}]")) != 0 ) return bool( is_moving or backlash_is_active or self.is_thread_active(0) or self.is_thread_active(2) ) def is_thread_active(self, thread_id: int) -> bool: val = float(self.socket_put_and_receive(f"MG_XQ{thread_id}")) if val == -1: return False return True def stop_all_axes(self) -> str: if not self.is_thread_active(1): return self.socket_put_and_receive("XQ#STOP,1") else: return ":" def get_digital_input(self, channel): return bool(float(self.socket_put_and_receive(f"MG @IN[{channel}]").strip())) def axis_is_referenced(self, axis_Id_numeric) -> bool: return bool(float(self.socket_put_and_receive(f"MG axisref[{axis_Id_numeric}]").strip())) def folerr_status(self, axis_Id_numeric) -> bool: return bool(float(self.socket_put_and_receive(f"MG folaxerr[{axis_Id_numeric}]").strip())) def motor_temperature(self, axis_Id_numeric) -> float: # this is only valid for omny. consider moving to ogalil voltage = float(self.socket_put_and_receive(f"MG @AN[{axis_Id_numeric+1}]").strip()) voltage2 = float(self.socket_put_and_receive(f"MG @AN[{axis_Id_numeric+1}]").strip()) if voltage2 < voltage: voltage = voltage2 # convert from [-10,10]V to [0,300]degC temperature_degC = round((voltage + 10.0) / 20.0 * 300.0, 1) # the motors of the parking station have a different offset # the range is reduced, so if at the limit, we show an extreme value if self.sock.port == 8082: # controller 2 if axis_Id_numeric == 6: temperature_degC = round((voltage + 10.0 - 11.4) / 20.0 * 300.0, 1) if voltage > 9.9: temperature_degC = 300 if axis_Id_numeric == 7: temperature_degC = round((voltage + 0.0 - 12) / 20.0 * 300.0, 1) if voltage > 9.9: temperature_degC = 300 return temperature_degC
[docs] def all_axes_referenced(self) -> bool: """ Check if all axes are referenced. """ return bool(float(self.socket_put_and_receive("MG allaxref").strip()))
def _omny_get_microstep_position(self, axis_Id): return float(self.socket_put_and_receive(f"MG _TD{axis_Id}").strip()) def _omny_get_reference_limit(self, axis_Id): get_axis_no = float(self.socket_put_and_receive(f"MG frmmv").strip()) if get_axis_no > 0: reference_is_before = float(self.socket_put_and_receive(f"MG _FL{axis_Id}").strip()) elif get_axis_no < 0: reference_is_before = float(self.socket_put_and_receive(f"MG _BL{axis_Id}").strip()) else: reference_is_before = 0 return reference_is_before
[docs] def drive_axis_to_limit(self, axis_Id_numeric: int, direction: str, verbose=0) -> None: """ Drive an axis to the limit in a specified direction. Args: axis_Id_numeric (int): Axis number direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'. """ if direction == "forward": direction_flag = 1 elif direction == "reverse": direction_flag = -1 else: raise ValueError(f"Invalid direction {direction}") self.socket_put_confirmed(f"naxis={axis_Id_numeric}") self.socket_put_confirmed(f"ndir={direction_flag}") self.socket_put_confirmed("XQ#NEWPAR") time.sleep(0.1) self.socket_put_confirmed("XQ#FES") time.sleep(0.1) axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) while self.is_axis_moving(None, axis_Id_numeric): time.sleep(0.01) if verbose: self.device_manager.connector.send_client_info( f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f}", scope="drive axis to limit", show_asap=True, ) time.sleep(0.5) # check if we actually hit the limit if direction == "forward": limit = self.get_motor_limit_switch(axis_Id)[1] elif direction == "reverse": limit = self.get_motor_limit_switch(axis_Id)[0] if not limit: raise GalilError(f"Failed to drive axis {axis_Id}/{axis_Id_numeric} to limit.") else: print("Limit reached.")
[docs] def find_reference(self, axis_Id_numeric: int, verbose=0, raise_error=1) -> None: """ Find the reference of an axis. Args: axis_Id_numeric (int): Axis number """ time.sleep(0.1) self.socket_put_confirmed(f"naxis={axis_Id_numeric}") self.socket_put_and_receive("XQ#NEWPAR") time.sleep(0.1) self.socket_put_confirmed("XQ#FRM") time.sleep(0.1) axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) while self.is_axis_moving(None, axis_Id_numeric): time.sleep(0.1) if verbose: self.device_manager.connector.send_client_info( f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f} reference is before {self._omny_get_reference_limit(axis_Id)}", scope="find axis reference", show_asap=True, ) time.sleep(0.5) if not self.axis_is_referenced(axis_Id_numeric): if raise_error: raise GalilError(f"Failed to find reference of axis {axis_Id_numeric}.") else: print(f"Failed to find reference of axis {axis_Id_numeric}.") else: logger.info(f"Successfully found reference of axis {axis_Id_numeric}.") print(f"Successfully found reference of axis {axis_Id_numeric}.")
def show_running_threads(self) -> None: t = PrettyTable() t.title = f"Threads on {self.sock.host}:{self.sock.port}" t.field_names = [str(ax) for ax in range(self._axes_per_controller)] t.add_row( [ "active" if self.is_thread_active(t) else "inactive" for t in range(self._axes_per_controller) ] ) print(t) def is_motor_on(self, axis_Id) -> bool: return not bool(float(self.socket_put_and_receive(f"MG _MO{axis_Id}").strip()))
[docs] def get_motor_limit_switch(self, axis_Id) -> list: """ Get the status of the motor limit switches. Args: axis_Id (str): Axis identifier (e.g. 'A', 'B', 'C', ...) Returns: list: List of two booleans indicating if the low and high limit switch is active, respectively. """ ret = self.socket_put_and_receive(f"MG _LR{axis_Id}, _LF{axis_Id}") low, high = ret.strip().split(" ") return [not bool(float(low)), not bool(float(high))]
def describe(self) -> None: t = PrettyTable() t.title = f"{self.__class__.__name__} on {self.sock.host}:{self.sock.port}" field_names = ["Axis", "Name", "Referenced", "Motor On", "Limits", "Position"] # in case of OMNY if self.sock.host == "mpc3217.psi.ch": field_names.append("Temperature") field_names.append("FolErr") t.field_names = field_names for ax in range(self._axes_per_controller): axis = self._axis[ax] if axis is not None: if self.sock.host == "mpc3217.psi.ch": # case of omny. possibly consider moving to ogalil motor_on = self.is_motor_on(axis.axis_Id) if motor_on == True: motor_on = self.WARNING + "ON" + self.ENDC else: motor_on = "OFF" folerr_status = self.folerr_status(axis.axis_Id_numeric) if folerr_status == True: folerr_status = self.WARNING + "True" + self.ENDC else: folerr_status = "False" position = axis.readback.read().get(axis.name).get("value") position = f"{position:.3f}" t.add_row( [ f"{axis.axis_Id_numeric}/{axis.axis_Id}", axis.name, self.axis_is_referenced(axis.axis_Id_numeric), motor_on, self.get_motor_limit_switch(axis.axis_Id), position, self.motor_temperature(axis.axis_Id_numeric), self.folerr_status(axis.axis_Id_numeric), ] ) else: t.add_row( [ f"{axis.axis_Id_numeric}/{axis.axis_Id}", axis.name, self.axis_is_referenced(axis.axis_Id_numeric), self.is_motor_on(axis.axis_Id), self.get_motor_limit_switch(axis.axis_Id), axis.readback.read().get(axis.name).get("value"), ] ) else: t.add_row([None for t in t.field_names]) print(t) self.show_running_threads() self.show_status_other()
[docs] def show_status_other(self) -> None: """ Show additional device-specific status information. Override in subclasses. """
def galil_show_all(self) -> None: for controller in self._controller_instances.values(): if isinstance(controller, GalilController): controller.describe() @staticmethod def axis_Id_to_numeric(axis_Id: str) -> int: return ord(axis_Id.lower()) - 97 @staticmethod def axis_Id_numeric_to_alpha(axis_Id_numeric: int) -> str: return (chr(axis_Id_numeric + 97)).capitalize()
[docs] class GalilSignalBase(SocketSignal): def __init__(self, signal_name, **kwargs): self.signal_name = signal_name super().__init__(**kwargs) self.controller = self.root.controller if hasattr(self.root, "controller") else None
[docs] class GalilSignalRO(GalilSignalBase): def __init__(self, signal_name, **kwargs): super().__init__(signal_name, **kwargs) self._metadata["write_access"] = False def _socket_set(self, val): raise ReadOnlyError("Read-only signals cannot be set")
[docs] class GalilReadbackSignal(GalilSignalRO): @retry_once @threadlocked def _socket_get(self) -> float: """Get command for the readback signal Returns: float: Readback value after adjusting for sign and motor resolution. """ current_pos = float(self.controller.socket_put_and_receive(f"TD{self.parent.axis_Id}")) current_pos *= self.parent.sign step_mm = self.parent.motor_resolution.get() return current_pos / step_mm
[docs] def read(self): self._metadata["timestamp"] = time.time() val = super().read() return val
[docs] class GalilSetpointSignal(GalilSignalBase): def __init__(self, signal_name, **kwargs): super().__init__(signal_name, **kwargs) self.setpoint = 0.0 def _socket_get(self) -> float: """Get command for receiving the setpoint / target value. The value is not pulled from the controller but instead just the last setpoint used. Returns: float: setpoint / target value """ return self.setpoint * self.parent.sign @retry_once @threadlocked def _socket_set(self, val: float) -> None: """Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign. Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted. Args: val (float): Target value / setpoint value Raises: GalilError: Raised if not all axes are referenced. """ target_val = val * self.parent.sign self.setpoint = target_val axes_referenced = self.controller.all_axes_referenced() if axes_referenced: while self.controller.is_thread_active(0): time.sleep(0.1) # in the case of lamni, consider moving to lgalil if self.parent.axis_Id_numeric == 2 and self.controller.sock.host == "mpc2680.psi.ch": try: rt = self.parent.device_manager.devices[self.parent.rt] if rt.enabled: angle_status = self.parent.device_manager.devices[ self.parent.rt ].obj.controller.feedback_status_angle_lamni() if angle_status: self.controller.socket_put_confirmed("angintf=1") except KeyError: logger.warning( "RT is disabled. Failed to update RT angle interferometer status to galil." ) self.controller.socket_put_confirmed(f"naxis={self.parent.axis_Id_numeric}") self.controller.socket_put_confirmed(f"ntarget={target_val:.3f}") self.controller.socket_put_confirmed("movereq=1") self.controller.socket_put_confirmed("XQ#NEWPAR") while self.controller.is_thread_active(0): time.sleep(0.005) else: raise GalilError("Not all axes are referenced.")
[docs] class GalilMotorResolution(GalilSignalRO): @retry_once @threadlocked def _socket_get(self): return float( self.controller.socket_put_and_receive(f"MG stppermm[{self.parent.axis_Id_numeric}]") )
[docs] class GalilMotorIsMoving(GalilSignalRO): @threadlocked def _socket_get(self): return self.controller.is_axis_moving(self.parent.axis_Id, self.parent.axis_Id_numeric)
[docs] def get(self): val = super().get() if val is not None: self._run_subs(sub_type=self.SUB_VALUE, value=val, timestamp=time.time()) return val
[docs] class GalilAxesReferenced(GalilSignalRO): @threadlocked def _socket_get(self): return self.controller.all_axes_referenced()