Source code for csaxs_bec.devices.omny.galil.lgalil_ophyd

import threading
import time

import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices.utils.controller import threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected

from csaxs_bec.devices.omny.galil.galil_ophyd import (
    BECConfigError,
    GalilAxesReferenced,
    GalilController,
    GalilMotorIsMoving,
    GalilSetpointSignal,
    GalilSignalRO,
    retry_once,
)

logger = bec_logger.logger


class GalilMotorResolution(GalilSignalRO):
    """Read-only signal returning the resolution factor of the parent axis.

    The value is read from a controller-side array: ``encpermm`` for axes
    whose numeric id is below 6 and ``stppermm`` for all other axes.
    NOTE(review): the array names suggest encoder counts per mm and motor
    steps per mm respectively -- confirm against the controller program.
    """

    @retry_once
    @threadlocked
    def _socket_get(self):
        """Query the controller and return the resolution as a float."""
        axis_nr = self.parent.axis_Id_numeric
        array_name = "encpermm" if axis_nr < 6 else "stppermm"
        reply = self.controller.socket_put_and_receive(f"MG {array_name}[{axis_nr}]")
        return float(reply)
class LamniGalilController(GalilController):
    """Galil controller with LamNI-specific status decoding and helpers.

    Adds lookup tables for the ``caperr`` bitmask and the ``allaxrer`` error
    code reported by the controller, a human-readable status printout, and
    a few convenience commands (lights, air/orchestra check).
    """

    # ============================================================
    # Error status
    # ============================================================
    # Bit value -> meaning for the ``caperr`` bitmask.
    caperr_bits = {
        0x01: "Cap1 outside expected left-stop range (early check)",
        0x02: "Cap2 outside expected left-stop range (early check)",
        0x04: "Cap1 too low during pressure-off check (near right boundary)",
        0x08: "Cap2 too low during pressure-off check (near right boundary)",
        0x10: "Cap1 exceeded allowed left-stop boundary during movement",
        0x20: "Cap2 exceeded allowed left-stop boundary during movement (disabled in code)",
        0x40: "Cap1 did not respond to test movement",
        0x80: "Cap2 did not respond to test movement",
    }

    # Error code -> meaning for the ``allaxrer`` variable.
    allaxrer_table = {
        1: "Not all axes referenced after reference search",
        2: "Pressure-loss emergency stop (pressure 14/15 active while motor C off)",
        3: "Unexpected pressure OFF while soft-limits not yet set",
        4: "Pressure valve mismatch (OUT13=0 but IN13=1)",
        5: "Capacitive sensor boundary violations (caperr > 0)",
        6: "Emergency Stop triggered (IN[5]=0)",
        7: "Following error detected on one or more axes",
    }

    USER_ACCESS = [
        "describe",
        "show_running_threads",
        "galil_show_all",
        "socket_put_and_receive",
        "socket_put_confirmed",
        "lgalil_is_air_off_and_orchestra_enabled",
        "drive_axis_to_limit",
        "find_reference",
        "get_motor_limit_switch",
        "is_motor_on",
        "all_axes_referenced",
        "lamni_lights_off",
        "lamni_lights_on",
    ]

    def show_status_other(self):
        """Print emergency-stop, driver, air-pressure, firmware and error status."""
        if self.get_digital_input(5):
            print("Emergency stop is not pushed.")
        else:
            print("Emergency stop is pushed.")

        if self.get_digital_input(6):
            print("Driver axis 2 error.")

        # A set input means the pressure is missing at the respective location.
        pressure_inputs = (
            (13, "inner rotation"),
            (14, "outer rotation axial"),
            (15, "outer rotation radial"),
        )
        for channel, location in pressure_inputs:
            if self.get_digital_input(channel):
                print(f"No air pressure at {location}.")
            else:
                print(f"There is air pressure at the {location}.")

        swver = float(self.socket_put_and_receive("MGswver"))
        print(f"Lgalil LAMNI firmware version {swver:2.0f}.")

        # The controller replies with a float-formatted string, hence int(float(...)).
        allaxref = int(float(self.socket_put_and_receive("MGallaxref")))
        print("Error statuts:")
        if allaxref == 1:
            print("Allaxref = 1, all OK.")
        else:
            print(
                f"Allaxref = {allaxref}. Not all axes are referenced or error introduced preventing motion."
            )

        # NOTE(review): the error decoding below is executed unconditionally;
        # confirm whether it was meant to run only when allaxref != 1.
        allaxrer = int(float(self.socket_put_and_receive("MGallaxrer")))
        print("\nallaxrer =", allaxrer)
        print(self.decode_allaxrer(allaxrer))

        caperr = int(float(self.socket_put_and_receive("MGcaperr")))
        print("\nDecoding caperr =", caperr)
        self.visualize_caperr(caperr)

    def decode_allaxrer(self, code: int) -> str:
        """Return human-readable meaning of allaxrer code.

        Args:
            code (int): Value of the controller variable ``allaxrer``.

        Returns:
            str: The matching entry of ``allaxrer_table`` or
            ``"Unknown allaxrer code"`` if the code is not listed.
        """
        return self.allaxrer_table.get(code, "Unknown allaxrer code")

    def visualize_caperr(self, mask: int):
        """Pretty-print a bitmask visualization for caperr.

        Args:
            mask (int): Value of the controller variable ``caperr``; every bit
                listed in ``caperr_bits`` is reported as active or inactive.
        """
        print("\n=== CAPERR BITMASK VISUALIZER ===")
        print(f"Raw value: {mask} (0x{mask:02X})")
        print("----------------------------------\n")
        print("Bit | Hex | Active | Meaning")
        print("----------------------------------")

        active_flags = []
        for bit, meaning in self.caperr_bits.items():
            is_set = bool(mask & bit)
            if is_set:
                active_flags.append(meaning)
            active = "YES" if is_set else "no"
            print(f"{bit:3d} | 0x{bit:02X} | {active:6} | {meaning}")

        print("\nActive flags:")
        if active_flags:
            for flag in active_flags:
                print(" ✓", flag)
        else:
            print(" (none)")
        print("\n==================================\n")

    def lamni_lights_off(self):
        """Switch the LamNI lights off by sending ``SB1`` to the controller."""
        # NOTE(review): "SB1" presumably sets output bit 1, i.e. the lights
        # would be wired active-low -- confirm against the hardware.
        self.socket_put_confirmed("SB1")

    def lamni_lights_on(self):
        """Switch the LamNI lights on by sending ``CB1`` to the controller."""
        self.socket_put_confirmed("CB1")

    def lgalil_is_air_off_and_orchestra_enabled(self) -> bool:
        """Check digital outputs 9 and 13 of the controller.

        Returns:
            bool: True only if both ``@OUT[9]`` (RT not blocked by the Galil)
            and ``@OUT[13]`` (air off) read back as non-zero.
        """
        # TODO: move this to the LamNI-specific controller
        # The reply is a numeric string such as "0.0000". Calling bool() on the
        # raw string is True for any non-empty reply, so the value must be
        # parsed as a number before it is interpreted as a flag.
        rt_not_blocked_by_galil = bool(float(self.socket_put_and_receive("MG@OUT[9]")))
        air_off = bool(float(self.socket_put_and_receive("MG@OUT[13]")))
        return rt_not_blocked_by_galil and air_off
class LamniGalilReadbackSignal(GalilSignalRO):
    """Read-only position signal of a LamNI Galil axis."""

    @retry_once
    @threadlocked
    def _socket_get(self) -> float:
        """Query the controller for the current axis position.

        Axes with a numeric id below 6 are read with the ``TP`` command, all
        other axes with ``TD``. The raw value is multiplied by the sign of the
        parent motor and divided by its ``motor_resolution``.

        Returns:
            float: Readback value after adjusting for sign and motor resolution.
        """
        motor = self.parent
        reads_encoder = motor.axis_Id_numeric < 6
        command = "TP" if reads_encoder else "TD"

        raw_position = float(self.controller.socket_put_and_receive(f"{command}{motor.axis_Id}"))
        signed_position = raw_position * motor.sign
        resolution = motor.motor_resolution.get()

        # Only the TP branch reports the value it has read.
        if reads_encoder:
            logger.info(f"Read galil encoder position of axis {motor.axis_Id_numeric} to be TP {signed_position} with resolution {resolution}")
        return signed_position / resolution

    def read(self):
        """Read the signal and, for axis 2, forward the value to the RT device.

        The timestamp in the signal metadata is refreshed before reading. When
        the parent axis has the numeric id 2 and the device named by
        ``parent.rt`` is enabled, the value just read is passed to that
        device's ``controller.set_rotation_angle``. A ``KeyError`` raised while
        doing so is logged as a warning and otherwise ignored.

        Returns:
            The reading returned by the parent class ``read``.
        """
        self._metadata["timestamp"] = time.time()
        reading = super().read()

        motor = self.parent
        if motor.axis_Id_numeric != 2:
            return reading

        try:
            rt_device = motor.device_manager.devices[motor.rt]
            if rt_device.enabled:
                rt_device.obj.controller.set_rotation_angle(reading[motor.name]["value"])
        except KeyError:
            logger.warning("Failed to set RT value during readback.")
        return reading
class LamniGalilMotor(Device, PositionerBase):
    """Ophyd positioner for a single axis of the LamNI Galil controller.

    The axis is addressed by a single letter (``axis_Id``); the matching
    numeric id is obtained from the controller. Positions are reported in mm.
    """

    USER_ACCESS = ["controller", "drive_axis_to_limit", "find_reference"]
    readback = Cpt(LamniGalilReadbackSignal, signal_name="readback", kind="hinted")
    user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint")
    motor_resolution = Cpt(GalilMotorResolution, signal_name="resolution", kind="config")
    motor_is_moving = Cpt(GalilMotorIsMoving, signal_name="motor_is_moving", kind="normal")
    all_axes_referenced = Cpt(GalilAxesReferenced, signal_name="all_axes_referenced", kind="config")
    high_limit_travel = Cpt(Signal, value=0, kind="omitted")
    low_limit_travel = Cpt(Signal, value=0, kind="omitted")

    SUB_READBACK = "readback"
    SUB_CONNECTION_CHANGE = "connection_change"
    _default_sub = SUB_READBACK

    def __init__(
        self,
        axis_Id,
        prefix="",
        *,
        name,
        kind=None,
        read_attrs=None,
        configuration_attrs=None,
        parent=None,
        host="mpc2680.psi.ch",
        port=8081,
        limits=None,
        sign=1,
        socket_cls=SocketIO,
        device_manager=None,
        **kwargs,
    ):
        """Create the controller, register this axis and initialise the device.

        Args:
            axis_Id (str): Single-character axis identifier.
            prefix (str): Forwarded to the ophyd ``Device`` constructor.
            name (str): Device name; also assigned to the readback signal.
            kind, read_attrs, configuration_attrs, parent: Forwarded to the
                ophyd ``Device`` constructor.
            host (str): Host name used for the controller socket.
            port (int): Port used for the controller socket.
            limits (sequence, optional): ``(low, high)`` soft limits.
            sign (int): Factor applied to raw controller positions.
            socket_cls: Socket class handed to the controller.
            device_manager: Device manager handed to the controller and used
                to resolve the devices named in ``device_mapping``.
            **kwargs: ``tolerance`` (default 0.5) and ``device_mapping``
                (default ``{}``) are consumed here; everything else is
                forwarded to the ophyd ``Device`` constructor.

        Raises:
            BECConfigError: If a non-empty ``device_mapping`` is given without
                a ``device_manager``.
        """
        # The controller has to exist before axis_Id is assigned because the
        # axis_Id setter asks it to translate the letter into a number.
        self.controller = LamniGalilController(
            socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
        )
        self.axis_Id = axis_Id
        self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
        self.sign = sign
        self.tolerance = kwargs.pop("tolerance", 0.5)
        self.device_mapping = kwargs.pop("device_mapping", {})
        self.device_manager = device_manager

        if len(self.device_mapping) > 0 and self.device_manager is None:
            raise BECConfigError(
                "device_mapping has been specified but the device_manager cannot be accessed."
            )
        # Name of the RT device; falls back to "rtx" when not mapped.
        self.rt = self.device_mapping.get("rt", "rtx")

        super().__init__(
            prefix,
            name=name,
            kind=kind,
            read_attrs=read_attrs,
            configuration_attrs=configuration_attrs,
            parent=parent,
            **kwargs,
        )
        self.readback.name = self.name
        self.controller.subscribe(
            self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
        )
        self._update_connection_state()
        # self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE)

        if limits is not None:
            assert len(limits) == 2
            self.low_limit_travel.put(limits[0])
            self.high_limit_travel.put(limits[1])

    def wait_for_connection(self, timeout: float = 30.0) -> bool:
        """Switch the controller on and sync the setpoint with the readback.

        NOTE(review): the signature is annotated with ``bool`` but the method
        does not return a value.
        """
        self.controller.on(timeout=timeout)
        self._update_setpoint_from_readback()

    def _update_setpoint_from_readback(self):
        """
        The setpoint is only stored locally. After a restart, we need to update it
        to match the current readback value.
        """
        current_position = self.readback.get()
        self.user_setpoint.setpoint = current_position

    def destroy(self):
        """Make sure to turn off the controller socket on destroy."""
        self.controller.off(update_config=False)
        return super().destroy()

    @property
    def limits(self):
        """Soft limits as a ``(low, high)`` tuple."""
        return (self.low_limit_travel.get(), self.high_limit_travel.get())

    @property
    def low_limit(self):
        """Lower soft limit."""
        return self.limits[0]

    @property
    def high_limit(self):
        """Upper soft limit."""
        return self.limits[1]

    def check_value(self, pos):
        """Check that the position is within the soft limits.

        The limits are only enforced when the lower limit is strictly smaller
        than the upper limit.

        Raises:
            LimitError: If the limits are enforced and ``pos`` lies outside.
        """
        lower, upper = self.limits
        if not lower < upper:
            return
        if not (lower <= pos <= upper):
            raise LimitError(f"position={pos} not within limits {self.limits}")

    def _update_connection_state(self, **kwargs):
        """Copy the controller's connection state into every signal's metadata."""
        for signal_walk in self.walk_signals():
            signal_walk.item._metadata["connected"] = self.controller.connected

    def _forward_readback(self, **kwargs):
        """Re-emit a signal callback as this device's readback event."""
        # The incoming sub_type is replaced by the readback sub_type.
        kwargs.pop("sub_type")
        self._run_subs(sub_type="readback", **kwargs)

    @raise_if_disconnected
    def move(self, position, wait=True, **kwargs):
        """Move to ``position`` and optionally block until the motion is done.

        The setpoint is written without waiting; a daemon thread then polls
        ``motor_is_moving`` every 0.1 s, publishes readback events while the
        motor moves and finally marks the move as done. The move counts as
        successful if the final readback is close to ``position`` according to
        ``np.isclose`` with ``atol=self.tolerance``.

        Parameters
        ----------
        position
            Position to move to.
        wait : bool
            If true, block until the returned status has completed.
        moved_cb : callable, optional
            Forwarded to the parent ``move``.
        timeout : float, optional
            Forwarded to the parent ``move``; defaults to 100.

        Returns
        -------
        status
            The status object returned by the parent ``move``.

        Raises
        ------
        KeyboardInterrupt
            Re-raised after stopping the motor if the wait is interrupted.
        """
        self._started_moving = False
        move_timeout = kwargs.pop("timeout", 100)
        status = super().move(position, timeout=move_timeout, **kwargs)
        self.user_setpoint.put(position, wait=False)

        def move_and_finish():
            # Publish intermediate positions for as long as the motor moves.
            while self.motor_is_moving.get():
                logger.info("motor is moving")
                reading = self.readback.read()
                self._run_subs(sub_type=self.SUB_READBACK, value=reading, timestamp=time.time())
                time.sleep(0.1)

            final_reading = self.readback.read()
            reached = np.isclose(final_reading[self.name]["value"], position, atol=self.tolerance)
            if not reached:
                print(" stop")
            self._done_moving(success=reached)
            logger.info("Move finished")

        worker = threading.Thread(target=move_and_finish, daemon=True)
        worker.start()

        try:
            if wait:
                status_wait(status)
        except KeyboardInterrupt:
            # Halt the hardware before handing the interrupt to the caller.
            self.stop()
            raise
        return status

    @property
    def axis_Id(self):
        """Single-letter axis identifier."""
        return self._axis_Id_alpha

    @axis_Id.setter
    def axis_Id(self, val):
        if not isinstance(val, str):
            raise TypeError(f"Expected value of type str but received {type(val)}")
        if len(val) != 1:
            raise ValueError("Only single-character axis_Ids are supported.")
        self._axis_Id_alpha = val
        self._axis_Id_numeric = self.controller.axis_Id_to_numeric(val)

    @property
    def axis_Id_numeric(self):
        """Numeric axis identifier."""
        return self._axis_Id_numeric

    @axis_Id_numeric.setter
    def axis_Id_numeric(self, val):
        if not isinstance(val, int):
            raise TypeError(f"Expected value of type int but received {type(val)}")
        if val > 26:
            raise ValueError("Numeric value exceeds supported range.")
        self._axis_Id_alpha = self.controller.axis_Id_numeric_to_alpha(val)
        self._axis_Id_numeric = val

    @property
    def egu(self):
        """The engineering units (EGU) for positions"""
        return "mm"

    def find_reference(self):
        """
        Find the reference of the axis.
        """
        self.controller.find_reference(self.axis_Id_numeric)
        # Read the position afterwards and publish it to the subscribers.
        reading = self.readback.read()
        self._run_subs(sub_type=self.SUB_READBACK, value=reading, timestamp=time.time())

    def drive_axis_to_limit(self, direction: str) -> None:
        """
        Drive an axis to the limit in a specified direction.

        Args:
            direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
        """
        self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction)
        # Read the position afterwards and publish it to the subscribers.
        reading = self.readback.read()
        self._run_subs(sub_type=self.SUB_READBACK, value=reading, timestamp=time.time())

    def stop(self, *, success=False):
        """Stop all axes of the controller, then run the parent ``stop``."""
        self.controller.stop_all_axes()
        return super().stop(success=success)