Source code for csaxs_bec.devices.omny.galil.ogalil_ophyd

import functools
import threading
import time
import urllib.request
import xml.etree.ElementTree as ET

import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices.utils.controller import threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected

from csaxs_bec.devices.omny.galil.galil_ophyd import (
    BECConfigError,
    GalilAxesReferenced,
    GalilCommunicationError,
    GalilController,
    GalilError,
    GalilMotorIsMoving,
    GalilSetpointSignal,
    GalilSignalRO,
)

logger = bec_logger.logger


[docs] def retry_once(fcn): """Decorator to rerun a function in case a Galil communication error was raised. This may happen if the buffer was not empty.""" @functools.wraps(fcn) def wrapper(self, *args, **kwargs): try: val = fcn(self, *args, **kwargs) except (GalilCommunicationError, GalilError): val = fcn(self, *args, **kwargs) return val return wrapper
[docs] class GalilMotorResolution(GalilSignalRO): @retry_once @threadlocked def _socket_get(self): if self.controller.sock.port == 8083 and self.parent.axis_Id_numeric == 2: # rotation stage return 89565.8666667 else: return 51200
[docs] class OMNYGalilReadbackSignal(GalilSignalRO): previous_rotation_angle = 0 ignore_glitch = True @retry_once @threadlocked def _socket_get(self) -> float: """Get command for the readback signal Returns: float: Readback value after adjusting for sign and motor resolution. """ current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}")) current_pos *= self.parent.sign step_mm = self.parent.motor_resolution.get() # here we introduce an offset of 25 to the rotation axis # when setting a position this is taken into account in the controller # that way we just do tomography from 0 to 180 degrees if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083: return (current_pos / step_mm) + 25 else: return current_pos / step_mm
[docs] def read(self): self._metadata["timestamp"] = time.time() val = super().read() # if reading rotation stage angle if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083: current_readback_value = val[self.parent.name]["value"] # print (f"previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}.") if np.fabs((self.previous_rotation_angle - current_readback_value) > 10): message = f"Glitch detected in rotation stage. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}." print(message) self.parent.device_manager.connector.send_client_info( message, scope="glitch detector", show_asap=True ) val = super().read() current_readback_value = val[self.parent.name]["value"] if np.fabs((self.previous_rotation_angle - current_readback_value) > 10): message = f"Glitch detected in rotation stage second read. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}. Disabling the controller." print(message) self.parent.device_manager.connector.send_client_info( message, scope="glitch detector", show_asap=True ) self.parent.device_manager.devices["osamroy"].obj.controller.socket_put_confirmed( "allaxref=0" ) self.parent.device_manager.devices["osamroy"].obj.enabled = False return val self.previous_rotation_angle = current_readback_value try: rt = self.parent.device_manager.devices["rtx"] if rt.enabled: rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"] - 25 + 54) except KeyError: logger.warning("Failed to set RT value during ogalil readback.") return val
[docs] class OMNYGalilController(GalilController): USER_ACCESS = [ "describe", "show_running_threads", "galil_show_all", "socket_put_and_receive", "socket_put_confirmed", "get_motor_limit_switch", "is_motor_on", "all_axes_referenced", "_ogalil_switchsocket", "_ogalil_switchsocket_switch_all_on", "_ogalil_switchsocket_status", "_ogalil_switchsocket_are_all_on", "_ogalil_folerr_not_ignore", ] OKBLUE = "\033[94m" OKCYAN = "\033[96m" OKGREEN = "\033[92m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m"
[docs] def on(self, timeout: int = 10) -> None: """Open a new socket connection to the controller""" self._ogalil_switchsocket_switch_all_on() time.sleep(0.3) super().on(timeout=timeout)
def _ogalil_switchsocket(self, number: int, switch: bool): # number is socket number ranging from 1 to 4 # switch is either 0 or 1 if number not in range(1, 5): raise Exception("Socket number ranges from 1 to 4") else: contents = urllib.request.urlopen( f"http://mpc3217:8091/netio.cgi?pass=24A42C3929C5&output{number}={switch}" ).read() # print(contents) if b"OK" in contents: print(f"Controller number switchsocket {number} is now {switch}") return 1 else: print(f"Failed to switch controller number {number}") return 0 def _ogalil_switchsocket_are_all_on(self): if self._ogalil_switchsocket_status() == [1, 1, 1, 1]: return 1 else: return 0 def _ogalil_tempdisabledebug(self): # sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],"WH\r\n") < 1) { # ret_str = sock_get(_ogalil_debugging_host_and_port_str[ogalil_no][],_ogalil_prompt_str) # printf("%s _ogalil_redirect_debug_output(%d): expected \"IHx\" received \"%s\"\n",\ # cmd_str = sprintf("CF %s\r\n",substr(ret_str,3,1)) # if (sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],cmd_str) < 1) { # cmd_str = sprintf("CW 2\r\n") # if (sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],cmd_str) < 1) { print("not yet implemented") return 0 def _ogalil_folerr_reset_and_ignore(self, axis_id_numeric: int) -> None: self.socket_put_confirmed(f"folaxerr[{axis_id_numeric}]=0") self.socket_put_confirmed("folerr=0") self.socket_put_confirmed("IgNoFol=1") self.socket_put_confirmed("XQ#STOP,1") def _ogalil_set_axis_to_pos_wo_reference_search( self, axis_id_numeric, axis_id, pos_mm, motor_resolution, motor_sign ): self.socket_put_confirmed("IgNoFol=1") # pos_mm = pos_encoder / motor_resolution pos_encoder = pos_mm * motor_resolution * motor_sign # print(motor_resolution) self.socket_put_confirmed(f"DE{axis_id}={pos_encoder:.0f}") self.socket_put_confirmed(f"DP{axis_id}=_TP{axis_id}*ratio[{axis_id_numeric:.0f}]") self.socket_put_confirmed(f"folaxerr[{axis_id_numeric}]=0") self.socket_put_confirmed(f"axisref[{axis_id_numeric}]=1") self.socket_put_confirmed("folerr=0") self._ogalil_folerr_not_ignore() def _ogalil_folerr_not_ignore(self): self.socket_put_confirmed("IgNoFol=0") def _ogalil_switchsocket_switch_all_on(self): if not self._ogalil_switchsocket_are_all_on(): for j in range(1, 5): self._ogalil_switchsocket(j, 1) time.sleep(0.4) def _ogalil_switchsocket_status(self): contents = urllib.request.urlopen("http://mpc3217:8091/netio.xml").read() root = ET.fromstring(contents) returnvalue = [] for j in range(0, 4): status = int(root[1][j][2].text) returnvalue.append(int(root[1][j][2].text)) if status: print(f"Controller {j+1} is ON") else: print(f"Controller {j+1} is OFF") return returnvalue
[docs] def show_status_other(self): swver = float(self.socket_put_and_receive("MGswver")) allaxref = float(self.socket_put_and_receive("MGallaxref")) tempab = float(self.socket_put_and_receive("MGtempab")) timeab = float(self.socket_put_and_receive("MGtimeab")) IgNoFol = float(self.socket_put_and_receive("MGIgNoFol")) print( f"OMNY galil firmware {swver:2.0f}, TempAbort: {tempab:1.0f}, Allaxref: {allaxref:1.0f}, TimeAbort: {timeab:1.0f}, Ignore Folerr: {IgNoFol:1.0f}\n" ) if self.sock.port == 8083: self._ogalil_switchsocket_status()
[docs] class OMNYGalilMotor(Device, PositionerBase): USER_ACCESS = [ "controller", "find_reference", "omny_osamx_to_scan_center", "drive_axis_to_limit", "_ogalil_folerr_reset_and_ignore", "_ogalil_set_axis_to_pos_wo_reference_search", "get_motor_limit_switch", "axis_is_referenced", "get_motor_temperature", "folerr_status", ] readback = Cpt(OMNYGalilReadbackSignal, signal_name="readback", kind="hinted") user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint") motor_resolution = Cpt(GalilMotorResolution, signal_name="resolution", kind="config") motor_is_moving = Cpt(GalilMotorIsMoving, signal_name="motor_is_moving", kind="normal") all_axes_referenced = Cpt(GalilAxesReferenced, signal_name="all_axes_referenced", kind="config") high_limit_travel = Cpt(Signal, value=0, kind="omitted") low_limit_travel = Cpt(Signal, value=0, kind="omitted") SUB_READBACK = "readback" SUB_CONNECTION_CHANGE = "connection_change" _default_sub = SUB_READBACK def __init__( self, axis_Id, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, host="mpc3217.psi.ch", port=8081, limits=None, sign=1, socket_cls=SocketIO, device_manager=None, **kwargs, ): self.controller = OMNYGalilController( socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.axis_Id = axis_Id self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.sign = sign self.tolerance = kwargs.pop("tolerance", 0.5) self.device_mapping = kwargs.pop("device_mapping", {}) self.device_manager = device_manager if len(self.device_mapping) > 0 and self.device_manager is None: raise BECConfigError( "device_mapping has been specified but the device_manager cannot be accessed." ) self.rt = self.device_mapping.get("rt") super().__init__( prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs, ) self.readback.name = self.name self.controller.subscribe( self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE ) self._update_connection_state() # self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE) if limits is not None: assert len(limits) == 2 self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1])
[docs] def wait_for_connection(self, timeout: float = 30.0) -> bool: self.controller.on(timeout=timeout) self._update_setpoint_from_readback()
def _update_setpoint_from_readback(self): """ The setpoint is only stored locally. After a restart, we need to update it to match the current readback value. """ self.user_setpoint.setpoint = self.readback.get()
[docs] def destroy(self): """Make sure to turn off the controller socket on destroy.""" self.controller.off(update_config=False) return super().destroy()
@property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) @property def low_limit(self): return self.limits[0] @property def high_limit(self): return self.limits[1]
[docs] def check_value(self, pos): """Check that the position is within the soft limits""" low_limit, high_limit = self.limits if low_limit < high_limit and not (low_limit <= pos <= high_limit): raise LimitError(f"position={pos} not within limits {self.limits}")
def _update_connection_state(self, **kwargs): for walk in self.walk_signals(): walk.item._metadata["connected"] = self.controller.connected def _forward_readback(self, **kwargs): kwargs.pop("sub_type") self._run_subs(sub_type="readback", **kwargs)
[docs] @raise_if_disconnected def move(self, position, wait=True, **kwargs): """Move to a specified position, optionally waiting for motion to complete. Parameters ---------- position Position to move to moved_cb : callable Call this callback when movement has finished. This callback must accept one keyword argument: 'obj' which will be set to this positioner instance. timeout : float, optional Maximum time to wait for the motion. If None, the default timeout for this positioner is used. Returns ------- status : MoveStatus Raises ------ TimeoutError When motion takes longer than `timeout` ValueError On invalid positions RuntimeError If motion fails other than timing out """ self._started_moving = False timeout = kwargs.pop("timeout", 100) status = super().move(position, timeout=timeout, **kwargs) self.user_setpoint.put(position, wait=False) def move_and_finish(): while self.motor_is_moving.get(): logger.info("motor is moving") val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) time.sleep(0.1) val = self.readback.read() success = np.isclose(val[self.name]["value"], position, atol=self.tolerance) if not success: print(" stop") self._done_moving(success=success) logger.info("Move finished") threading.Thread(target=move_and_finish, daemon=True).start() try: if wait: status_wait(status) except KeyboardInterrupt: self.stop() raise return status
@property def axis_Id(self): return self._axis_Id_alpha @axis_Id.setter def axis_Id(self, val): if isinstance(val, str): if len(val) != 1: raise ValueError("Only single-character axis_Ids are supported.") self._axis_Id_alpha = val self._axis_Id_numeric = self.controller.axis_Id_to_numeric(val) else: raise TypeError(f"Expected value of type str but received {type(val)}") @property def axis_Id_numeric(self): return self._axis_Id_numeric @axis_Id_numeric.setter def axis_Id_numeric(self, val): if isinstance(val, int): if val > 26: raise ValueError("Numeric value exceeds supported range.") self._axis_Id_alpha = self.controller.axis_Id_numeric_to_alpha(val) self._axis_Id_numeric = val else: raise TypeError(f"Expected value of type int but received {type(val)}") @property def egu(self): """The engineering units (EGU) for positions""" return "mm" def _ogalil_folerr_reset_and_ignore(self): # pylint: disable=protected-access self.controller._ogalil_folerr_reset_and_ignore(self.axis_Id_numeric) def _ogalil_set_axis_to_pos_wo_reference_search(self, pos_mm): motor_resolution = self.motor_resolution.get() self.controller._ogalil_set_axis_to_pos_wo_reference_search( self.axis_Id_numeric, self.axis_Id, pos_mm, motor_resolution, self.sign ) # now force position read to cache val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
[docs] def find_reference(self, raise_error=1): """ Find the reference of the axis. """ verbose = 1 self.controller.find_reference(self.axis_Id_numeric, verbose, raise_error) # now force position read to cache val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
[docs] def drive_axis_to_limit(self, direction: str) -> None: """ Drive an axis to the limit in a specified direction. Args: direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'. """ self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction, verbose=1) # now force position read to cache val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
[docs] def get_motor_limit_switch(self) -> list: """ Get status of the motor limit switches """ return self.controller.get_motor_limit_switch(self.axis_Id)
[docs] def get_motor_temperature(self) -> float: """ Get motor temperature """ return self.controller.motor_temperature(self.axis_Id_numeric)
[docs] def axis_is_referenced(self) -> bool: """ check if an axis is referenced """ return self.controller.axis_is_referenced(self.axis_Id_numeric)
def _get_user_param_safe(self, device, var): param = self.device_manager.devices[device].user_parameter if not param or param.get(var) is None: raise ValueError(f"Device {device} has no user parameter definition for {var}.") return param.get(var) def omny_osamx_to_scan_center(self, cenx): if self.controller.sock.port == 8082 and self.axis_Id_numeric == 0: # get last setpoint osamx = self.device_manager.devices["osamx"] osamx_current_setpoint = osamx.obj.readback.get() omny_samx_in = self._get_user_param_safe("osamx", "in") if np.fabs(osamx_current_setpoint - (omny_samx_in + cenx / 1000)) > 0.025: message = ( f"Moving osamx to scan center. new osamx target {omny_samx_in+cenx/1000:.3f}." ) logger.info(message) osamx.read_only = False # osamx.controller.("osamx", "controller.socket_put_confirmed('axspeed[0]=1000')") osamx.set(omny_samx_in + cenx / 1000) time.sleep(0.1) while osamx.motor_is_moving.get(): time.sleep(0.05) osamx.read_only = True time.sleep(2) rt = self.device_manager.devices["rtx"] if rt.enabled: rt.obj.controller.laser_tracker_on() rt.obj.controller.laser_tracker_check_and_wait_for_signalstrength() def folerr_status(self) -> bool: return self.controller.folerr_status(self.axis_Id_numeric)
[docs] def stop(self, *, success=False): self.controller.stop_all_axes() return super().stop(success=success)