Source code for csaxs_bec.devices.omny.rt.rt_omny_ophyd

import builtins
import socket
import threading
import time

import numpy as np
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices import AsyncMultiSignal, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from prettytable import PrettyTable

if builtins.__dict__.get("bec") is not None:
    bec = builtins.__dict__.get("bec")
    dev = builtins.__dict__.get("dev")
    umv = builtins.__dict__.get("umv")
    umvr = builtins.__dict__.get("umvr")


from csaxs_bec.devices.omny.rt.rt_ophyd import (
    RtCommunicationError,
    RtError,
    RtReadbackSignal,
    RtSetpointSignal,
    RtSignalRO,
    retry_once,
)

logger = bec_logger.logger


[docs] class RtOMNY_mirror_switchbox_Error(Exception): pass
[docs] class RtOMNY_Error(Exception): pass
[docs] class RtOMNYController(Controller): _axes_per_controller = 3 red = "\x1b[91m" white = "\x1b[0m" USER_ACCESS = [ "socket_put", "socket_put_and_receive", "set_rotation_angle", "feedback_disable", "feedback_enable_with_reset", "feedback_is_running", "add_pos_to_scan", "get_pid_x", "move_samx_to_scan_region", "clear_trajectory_generator", "show_cyclic_error_compensation", "laser_tracker_on", "laser_tracker_off", "laser_tracker_show_all", "show_signal_strength_interferometer", "read_ssi_interferometer", "laser_tracker_check_signalstrength", "laser_tracker_check_enabled", "_omny_interferometer_switch_open_socket", "_omny_interferometer_switch_channel", "_omny_interferometer_switch_LED_on", "_omny_interferometer_switch_alloff", "_omny_interferometer_get_signalsample", "laser_tracker_galil_enable", "laser_tracker_galil_disable", "laser_tracker_print_intensity_for_otrack_tweaking", "laser_tracker_check_on_target", "laser_tracker_wait_on_target", "get_mirror_parameters", "laser_tracker_check_and_wait_for_signalstrength", "omny_interferometer_align_tracking", "_omny_interferometer_openloop_steps", "_omny_interferometer_optimize", ] def __init__( self, *, name="RtOMNYController", socket_cls=None, socket_host=None, socket_port=None, device_manager=None, attr_name="", parent=None, labels=None, kind=None, ): super().__init__( name=name, socket_cls=socket_cls, socket_host=socket_host, socket_port=socket_port, device_manager=device_manager, attr_name=attr_name, parent=parent, labels=labels, kind=kind, ) self.tracker_info = {} self.ssi = {} self._min_scan_buffer_reached = False self.switchbox_socket = socket.socket() self.mirror_parameters = { 1: { "opt_signalchannel": "ssi_5", "opt_mirrorname": "Inc Angle", "opt_averaging_time": 0.1, "opt_steps1_pos": 1, "opt_steps1_neg": 1, "opt_steps2_pos": 10, "opt_steps2_neg": 10, "opt_signal_stop": 2400, "opt_signal_min_begin": 1000, "opt_step_divider": 1, "opt_amplitude1_pos": 2000, "opt_amplitude1_neg": 2000, "opt_amplitude2_pos": 2000, "opt_amplitude2_neg": 2000, }, 2: { "opt_signalchannel": "ssi_2", "opt_mirrorname": "Inc ST OSA Y", "opt_averaging_time": 0.1, "opt_steps1_pos": 1, "opt_steps1_neg": 1, "opt_steps2_pos": 1, "opt_steps2_neg": 1, "opt_signal_stop": 5000, "opt_signal_min_begin": 2000, "opt_step_divider": 1, "opt_amplitude1_pos": 2000, "opt_amplitude1_neg": 2000, "opt_amplitude2_pos": 2000, "opt_amplitude2_neg": 2000, }, 3: { "opt_signalchannel": "ssi_1", "opt_mirrorname": "Inc OSA FZP Y", "opt_averaging_time": 0.25, "opt_steps1_pos": 1, "opt_steps1_neg": 2, "opt_steps2_pos": 1, "opt_steps2_neg": 3, "opt_signal_stop": 7200, "opt_signal_min_begin": 3000, "opt_step_divider": 1, "opt_amplitude1_pos": 2500, "opt_amplitude1_neg": 2500, "opt_amplitude2_pos": 2500, "opt_amplitude2_neg": 2500, }, 4: { "opt_signalchannel": "ssi_2", "opt_mirrorname": "OSA Y", "opt_averaging_time": 0.1, "opt_steps1_pos": 1, "opt_steps1_neg": 1, "opt_steps2_pos": 1, "opt_steps2_neg": 1, "opt_signal_stop": 5000, "opt_signal_min_begin": 2000, "opt_step_divider": 1, "opt_amplitude1_pos": 3000, "opt_amplitude1_neg": 3000, "opt_amplitude2_pos": 3000, "opt_amplitude2_neg": 3000, }, 5: { "opt_signalchannel": "ssi_1", "opt_mirrorname": "FZP Y", "opt_averaging_time": 0.2, "opt_steps1_pos": 6, "opt_steps1_neg": 6, "opt_steps2_pos": 8, "opt_steps2_neg": 4, "opt_signal_stop": 8000, "opt_signal_min_begin": 3000, "opt_step_divider": 1, "opt_amplitude1_pos": 3000, "opt_amplitude1_neg": 3000, "opt_amplitude2_pos": 3000, "opt_amplitude2_neg": 3000, }, 6: { "opt_signalchannel": "ssi_4", "opt_mirrorname": "OSA X", "opt_averaging_time": 0.2, "opt_steps1_pos": 2, "opt_steps1_neg": 1, "opt_steps2_pos": 1, "opt_steps2_neg": 1, "opt_signal_stop": 8850, "opt_signal_min_begin": 3000, "opt_step_divider": 1, "opt_amplitude1_pos": 3000, "opt_amplitude1_neg": 3000, "opt_amplitude2_pos": 3700, "opt_amplitude2_neg": 2500, }, 7: { "opt_signalchannel": "ssi_3", "opt_mirrorname": "FZP X", "opt_averaging_time": 0.2, "opt_steps1_pos": 4, "opt_steps1_neg": 4, "opt_steps2_pos": 4, "opt_steps2_neg": 4, "opt_signal_stop": 8000, "opt_signal_min_begin": 4000, "opt_step_divider": 1, "opt_amplitude1_pos": 2500, "opt_amplitude1_neg": 2500, "opt_amplitude2_pos": 2500, "opt_amplitude2_neg": 2500, }, 8: { "opt_signalchannel": "ssi_1", "opt_mirrorname": "OSA Y USING SIGNAL OSA FZP", "opt_averaging_time": 0.2, "opt_steps1_pos": 1, "opt_steps1_neg": 1, "opt_steps2_pos": 1, "opt_steps2_neg": 1, "opt_signal_stop": 5000, "opt_signal_min_begin": 2000, "opt_step_divider": 1, "opt_amplitude1_pos": 3000, "opt_amplitude1_neg": 3000, "opt_amplitude2_pos": 3000, "opt_amplitude2_neg": 3000, }, } # def is_axis_moving(self, axis_Id) -> bool: # # this checks that axis is on target # axis_is_on_target = bool(float(self.socket_put_and_receive("o"))) # return not axis_is_on_target @threadlocked def stop_all_axes(self): self.socket_put("sc") def add_pos_to_scan(self, positions) -> None: def send_positions(parent, positions): parent._min_scan_buffer_reached = False start_time = time.time() for pos_index, pos in enumerate(positions): cmd = f"s{pos[0]:.05f},{pos[1]:.05f},{pos[2]:.05f}" parent.socket_put_and_receive(cmd) if pos_index > 100: parent._min_scan_buffer_reached = True parent._min_scan_buffer_reached = True logger.info( f"Sending {len(positions)} positions took {time.time()-start_time} seconds." ) threading.Thread(target=send_positions, args=(self, positions), daemon=True).start() def get_mirror_parameters(self, channel): return self.mirror_parameters[channel] def laser_tracker_check_and_wait_for_signalstrength(self): self.device_manager.connector.send_client_info( "Checking laser tracker...", scope="", show_asap=True ) if not self.laser_tracker_check_enabled(): print( "laser_tracker_check_and_wait_for_signalstrength: The laser tracker is not even enabled." ) return # first check on target self.laser_tracker_wait_on_target() # when on target, check interferometer signal signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1) rtx = self.device_manager.devices.rtx min_signal = rtx.user_parameter.get("min_signal") low_signal = rtx.user_parameter.get("low_signal") wait_counter = 0 while signal < min_signal and wait_counter < 10: self.device_manager.connector.send_client_info( f"The signal of the tracker {signal} is below the minimum required signal of {min_signal}. Waiting...", scope="laser_tracker_check_and_wait_for_signalstrength", show_asap=True, ) wait_counter += 1 time.sleep(0.2) signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1) if signal < low_signal: self.device_manager.connector.send_client_info( f"\x1b[91mThe signal of the tracker {signal} is below the low limit of {low_signal}. Auto readjustment...\x1b[0m", scope="laser_tracker_check_and_wait_for_signalstrength", show_asap=True, ) self.omny_interferometer_align_tracking() self.device_manager.connector.send_client_info( "Checking laser tracker completed.", scope="", show_asap=True ) def omny_interferometer_align_tracking(self): mirror_channel = 6 signal = self._omny_interferometer_get_signalsample( self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"], ) if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]: print( f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed." ) else: self._omny_interferometer_switch_channel(mirror_channel) time.sleep(0.1) self._omny_interferometer_optimize(mirror_channel, 3) self._omny_interferometer_optimize(mirror_channel, 4) self._omny_interferometer_optimize(mirror_channel, 3) self._omny_interferometer_optimize(mirror_channel, 4) self._omny_interferometer_switch_alloff() self.show_signal_strength_interferometer() mirror_channel = -1 def omny_interferometer_align_incoupling_angle(self): mirror_channel = 1 signal = self._omny_interferometer_get_signalsample( self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"], ) if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]: print( f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed." ) else: self._omny_interferometer_switch_channel(mirror_channel) time.sleep(0.1) self._omny_interferometer_optimize(mirror_channel, 3) self._omny_interferometer_optimize(mirror_channel, 4) self._omny_interferometer_optimize(mirror_channel, 3) self._omny_interferometer_optimize(mirror_channel, 4) self._omny_interferometer_switch_alloff() self.show_signal_strength_interferometer() mirror_channel = -1 def _omny_interferometer_openloop_steps(self, channel, steps, amplitude): if channel not in range(3, 5): raise RtOMNY_Error(f"invalid channel number {channel}.") if amplitude > 4090: amplitude = 4090 elif amplitude < 10: amplitude = 10 oshield = self.device_manager.devices.oshield oshield.obj.controller.move_open_loop_steps( channel, steps, amplitude=amplitude, frequency=500 ) time.sleep(0.01) while oshield.obj.controller.is_axis_moving(channel): time.sleep(0.002) def _omny_interferometer_optimize(self, mirror_channel, channel): if mirror_channel == -1: raise RtOMNY_Error("no mirror channel selected") # mirror channel is mirror number and channel is smaract channel, i.e. axis of the mirror if channel == 3: steps_pos = self.mirror_parameters[mirror_channel]["opt_steps1_pos"] steps_neg = self.mirror_parameters[mirror_channel]["opt_steps1_neg"] opt_amplitude_pos = self.mirror_parameters[mirror_channel]["opt_amplitude1_pos"] opt_amplitude_neg = self.mirror_parameters[mirror_channel]["opt_amplitude1_neg"] elif channel == 4: steps_pos = self.mirror_parameters[mirror_channel]["opt_steps2_pos"] steps_neg = self.mirror_parameters[mirror_channel]["opt_steps2_neg"] opt_amplitude_pos = self.mirror_parameters[mirror_channel]["opt_amplitude2_pos"] opt_amplitude_neg = self.mirror_parameters[mirror_channel]["opt_amplitude2_neg"] else: raise RtOMNY_Error(f"invalid channel number {channel}.") previous_signal = self._omny_interferometer_get_signalsample( self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"], ) min_begin = self.mirror_parameters[mirror_channel]["opt_signal_min_begin"] if previous_signal < min_begin: # raise OMNY_rt_clientError("error1") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.") print(f"\rMinimum signal for auto alignment {min_begin} not reached.") return elif previous_signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]: print( f"\rInterferometer signal of axis is good" ) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} is good.") return else: direction = 1 cycle_counter = 0 cycle_max = 20 reversal_counter = 0 reversal_max = 4 self.mirror_amplitutde_increase = 0 current_sample = self._omny_interferometer_get_signalsample( self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"], ) max = current_sample while ( current_sample < self.mirror_parameters[mirror_channel]["opt_signal_stop"] and cycle_counter < cycle_max and reversal_counter < reversal_max ): # if current_sample < self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]: # raise OMNY_rt_clientError("error2") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.") if direction > 0: self._omny_interferometer_openloop_steps(channel, steps_pos, opt_amplitude_pos) verbose_str = f"channel {channel}, steps {steps_pos}" else: self._omny_interferometer_openloop_steps(channel, -steps_neg, opt_amplitude_neg) verbose_str = f"auto action {channel}, steps {-steps_pos}" # print(f"Aligning axis ") #{self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}") current_sample = self._omny_interferometer_get_signalsample( self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"], ) opt_mirrorname = self.mirror_parameters[mirror_channel]["opt_mirrorname"] info_str = f"\rAuto aligning Channel {mirror_channel}, {opt_mirrorname}, Current signal: {current_sample:.0f}" message = info_str + " (q)uit \r" self.device_manager.connector.send_client_info( message, scope="_omny_interferometer_optimize", show_asap=True ) if previous_signal > current_sample: if direction < 0: steps_pos = int(steps_pos / 2) steps_neg = int(steps_neg / 2) if steps_pos < 1: steps_pos = 1 if steps_neg < 1: steps_neg = 1 direction = direction * (-1) reversal_counter += 1 previous_signal = current_sample cycle_counter += 1 print( f"\r\nFinished aligning channel {channel} of mirror {mirror_channel}\n\r" ) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}") def move_to_zero(self): self.socket_put("pa0,0") self.get_axis_by_name("rtx").user_setpoint.setpoint = 0 self.socket_put("pa1,0") self.get_axis_by_name("rty").user_setpoint.setpoint = 0 self.socket_put("pa2,0") self.get_axis_by_name("rtz").user_setpoint.setpoint = 0 time.sleep(0.05) def feedback_get_status_and_ssi(self): ret = self.socket_put_and_receive("J2") # remove trailing \n ret = ret.split("\n")[0] status_and_ssi_values = [float(val) for val in ret.split(",")] self.ssi = { "feedback_error": status_and_ssi_values[0], "ssi_1": status_and_ssi_values[1], "ssi_2": status_and_ssi_values[2], "ssi_3": status_and_ssi_values[3], "ssi_4": status_and_ssi_values[4], "ssi_5": status_and_ssi_values[5], } def slew_rate_limiters_on_target(self) -> bool: ret = int(float(self.socket_put_and_receive("y").strip())) if ret == 1: return True return False def feedback_is_running(self) -> bool: self.feedback_get_status_and_ssi() interferometer_feedback_not_running = int(self.ssi["feedback_error"]) if interferometer_feedback_not_running == 1: return False return True def feedback_enable_with_reset(self): self.device_manager.connector.send_client_info( f"Enabling the feedback...", scope="", show_asap=True ) self.socket_put("J0") # disable feedback time.sleep(0.01) self.move_to_zero() time.sleep(0.01) if not self.slew_rate_limiters_on_target(): print("Please wait, slew rate limiters not on target.") logger.info("Please wait, slew rate limiters not on target.") while not self.slew_rate_limiters_on_target(): time.sleep(0.05) self.clear_trajectory_generator() time.sleep(0.01) self.laser_tracker_on() time.sleep(0.01) osamroy = self.device_manager.devices.osamroy # the following read will also upate the angle in rt during readout readback = osamroy.obj.readback.get() if np.fabs(readback) > 0.1: self.device_manager.connector.send_client_info( f"Rotating to zero", scope="feedback enable", show_asap=True ) osamroy.obj.move(0, wait=True) osamx = self.device_manager.devices.osamx osamx_in = osamx.user_parameter.get("in") if not np.isclose(osamx.obj.readback.get(), osamx_in, atol=0.01): osamx.read_only = False osamx.obj.move(osamx_in, wait=True) osamx.read_only = True time.sleep(0.3) self.laser_tracker_wait_on_target() self.omny_interferometer_align_tracking() time.sleep(0.01) self.socket_put("J1") time.sleep(0.5) if not self.feedback_is_running(): print("Feedback is not running; likely an error in the interferometer.") raise RtError("Feedback is not running; likely an error in the interferometer.") time.sleep(1.5) self.set_device_read_write("osamx", False) self.set_device_read_write("osamy", False) self.set_device_read_write("ofzpx", False) self.set_device_read_write("ofzpy", False) self.set_device_read_write("oosax", False) self.set_device_read_write("oosax", False) print("Feedback is running.") @threadlocked def clear_trajectory_generator(self): self.socket_put("sc") logger.info("omny scan stopped and deleted, moving to start position") def feedback_disable(self): self.clear_trajectory_generator() self.move_to_zero() self.socket_put("J0") self.set_device_read_write("osamx", True) self.set_device_read_write("osamy", True) self.set_device_read_write("ofzpx", True) self.set_device_read_write("ofzpy", True) self.set_device_read_write("oosax", True) self.set_device_read_write("oosax", True) print("rt feedback is now disabled.") def set_rotation_angle(self, val: float) -> None: self.socket_put(f"a{val/180*np.pi}") def laser_tracker_off(self): if self.feedback_is_running(): print( "Interferometer feedback is running. Cannot disable the tracker. First disable the feedback using omny.feedback_disable()" ) else: self.socket_put("T0") logger.info("Disabled the laser tracker") print("Disabled the laser tracker") def laser_update_tracker_info(self): ret = self.socket_put_and_receive("Ts") # remove trailing \n ret = ret.split("\n")[0] tracker_values = [float(val) for val in ret.split(",")] self.tracker_info = { "beampos_y": tracker_values[5], "beampos_z": tracker_values[0], "tracker_intensity": tracker_values[2], "target_y": tracker_values[6], "threshold_intensity_y": tracker_values[8], "piezo_voltage_y": tracker_values[9], "enabled_y": tracker_values[10], "target_z": tracker_values[1], "threshold_intensity_z": tracker_values[3], "piezo_voltage_z": tracker_values[4], "enabled_z": bool(tracker_values[10]), } def laser_tracker_on(self): # update variables and enable if not yet active if not self.laser_tracker_check_enabled(): logger.info("Enabling the laser tracker. Please wait...") self.device_manager.connector.send_client_info( f"Enabling the laser tracker. Please wait...", scope="", show_asap=True ) tracker_intensity = self.tracker_info["tracker_intensity"] if ( tracker_intensity < self.tracker_info["threshold_intensity_y"] or tracker_intensity < self.tracker_info["threshold_intensity_z"] ): logger.info(self.tracker_info) print("The tracker cannot be enabled because the beam intensity it low.") raise RtError("The tracker cannot be enabled because the beam intensity it low.") self.move_to_zero() self.socket_put("T1") time.sleep(0.5) self.device_manager.devices.otracky.obj.controller.socket_put_confirmed("trackyct=0") self.device_manager.devices.otracky.obj.controller.socket_put_confirmed("trackzct=0") self.laser_tracker_wait_on_target() logger.info("Laser tracker running!") print("Laser tracker running!") def laser_tracker_check_enabled(self) -> bool: self.laser_update_tracker_info() if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]: return True else: return False def laser_tracker_check_on_target(self) -> bool: if not self.laser_tracker_check_enabled(): logger.info("The tracker is not even enabled") print("The tracker is not even enabled") return False if np.isclose( self.tracker_info["beampos_y"], self.tracker_info["target_y"], atol=0.03 ) and np.isclose(self.tracker_info["beampos_z"], self.tracker_info["target_z"], atol=0.03): return True return False def laser_tracker_wait_on_target(self): max_repeat = 15 count = 0 while not self.laser_tracker_check_on_target() and count < max_repeat: logger.info("Waiting for laser tracker to reach target position.") time.sleep(0.5) count += 1 if count > max_repeat: print("Failed to reach laser target position.") raise RtError("Failed to reach laser target position.") def laser_tracker_print_intensity_for_otrack_tweaking(self): # self.laser_update_tracker_info() # _laser_tracker_intensity = self.tracker_info["tracker_intensity"] # print(f"\r PSD beam intensity: {_laser_tracker_intensity:.2f}\r") self.laser_tracker_show_all(extra_endline="\r") def laser_tracker_show_all(self, extra_endline=""): self.laser_update_tracker_info() enabled_y = self.tracker_info["enabled_y"] print(extra_endline + f"Tracker enabled: {bool(enabled_y)}" + extra_endline) if self.tracker_info["tracker_intensity"] < self.tracker_info["threshold_intensity_y"]: print(self.red + " LOW INTENSITY" + self.white + extra_endline) _laser_tracker_intensity = self.tracker_info["tracker_intensity"] print(f" PSD beam intensity: {_laser_tracker_intensity:.2f}" + extra_endline) _laser_tracker_y_beampos = self.tracker_info["beampos_y"] print(f" Y beam position: {_laser_tracker_y_beampos:.2f}" + extra_endline) _laser_tracker_y_target = self.tracker_info["target_y"] print(f" target position: {_laser_tracker_y_target:.2f}" + extra_endline) _laser_tracker_y_threshold_intensity = self.tracker_info["threshold_intensity_y"] print( f" threshold intensity: {_laser_tracker_y_threshold_intensity:.2f}" + extra_endline ) _laser_tracker_y_piezo_voltage = self.tracker_info["piezo_voltage_y"] print(f" Piezo voltage: {_laser_tracker_y_piezo_voltage:.2f}" + extra_endline) _laser_tracker_z_beampos = self.tracker_info["beampos_z"] print(f" Z beam position: {_laser_tracker_z_beampos:.2f}" + extra_endline) _laser_tracker_z_target = self.tracker_info["target_z"] print(f" target position: {_laser_tracker_z_target:.2f}" + extra_endline) _laser_tracker_z_threshold_intensity = self.tracker_info["threshold_intensity_z"] print( f" threshold intensity: {_laser_tracker_z_threshold_intensity:.2f}" + extra_endline ) _laser_tracker_z_piezo_voltage = self.tracker_info["piezo_voltage_z"] print(f" Piezo voltage: {_laser_tracker_z_piezo_voltage:.2f}" + extra_endline) print(" Reminder - there is also an upper threshold active in rt\n" + extra_endline) if extra_endline == "": self.laser_tracker_galil_status() def laser_tracker_galil_enable(self): otracky_con = self.device_manager.devices.otracky.obj.controller otracky_con.socket_put_confirmed("tracken=1") otracky_con.socket_put_confirmed("trackyct=0") otracky_con.socket_put_confirmed("trackzct=0") def laser_tracker_galil_disable(self): otracky_con = self.device_manager.devices.otracky.obj.controller otracky_con.socket_put_confirmed("tracken=0") def laser_tracker_galil_status(self): otracky_con = self.device_manager.devices.otracky.obj.controller if bool(float(otracky_con.socket_put_and_receive("MGtracken").strip())) == 0: print(self.red + "Tracking in the Galil Controller is disabled." + self.white) print("Use dev.rtx.controller.laser_tracker_galil_enable to enable.\n") return 0 else: print("Tracking in the Galil Controller is enabled.") trackyct = int(float(otracky_con.socket_put_and_receive("MGtrackyct").strip())) trackzct = int(float(otracky_con.socket_put_and_receive("MGtrackzct").strip())) print(f"Galil Trackcounters y={trackyct}, z={trackzct}") def show_signal_strength_interferometer(self): channelnames = {1: "OSA FZP Y", 2: "ST OSA Y", 3: "OSA FZP X", 4: "ST OSA X", 5: "Angle"} self.feedback_get_status_and_ssi() t = PrettyTable() t.title = f"Interferometer signal strength" t.field_names = ["Channel", "Name", "Value"] for i in range(1, 6): ssi = self.ssi[f"ssi_{i}"] t.add_row([i, channelnames[i], ssi]) print(t) def _omny_interferometer_switch_open_socket(self): # open a socket connection to the laser interferometer server host = "mpc3217.psi.ch" # as both code is running on same pc port = 3335 # socket server port number self.switchbox_socket.connect((host, port)) # connect to the server self._omny_interferometer_switch_put_and_receive("?000\r") time.sleep(1) def _omny_interferometer_switch_channel(self, channel): self._omny_interferometer_switch_alloff() time.sleep(0.1) if channel == 1: # Relais 1 and 2 rback = self._omny_interferometer_switch_put_and_receive("!0020003\r") # if "|0003\r" != self._omny_interferometer_switch_put_and_receive("!0020003\r"): # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") elif channel == 2: # Relais 3 and 4 rback = self._omny_interferometer_switch_put_and_receive("!002000C\r") # if "|000C\r" != self._omny_interferometer_switch_put_and_receive("!002000C\r"): # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") elif channel == 3: # Relais 5 and 6 rback = self._omny_interferometer_switch_put_and_receive("!0020030\r") # if "|0030\r" != self._omny_interferometer_switch_put_and_receive("!0020030\r"): # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") elif channel == 4: # Relais 7 and 8 rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r") # if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"): # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") elif channel == 5: # Relais 9 and 10 rback = self._omny_interferometer_switch_put_and_receive("!0020300\r") # if "|0300\r" != self._omny_interferometer_switch_put_and_receive("!0020300\r"): # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") elif channel == 6: # Relais 11 and 12 rback = self._omny_interferometer_switch_put_and_receive("!0020C00\r") # if "|0C00\r" != self._omny_interferometer_switch_put_and_receive("!0020C00\r"): # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") elif channel == 7: # Relais 13 and 14 rback = self._omny_interferometer_switch_put_and_receive("!0023000\r") # if "|3000\r" != self._omny_interferometer_switch_put_and_receive("!0023000\r"): # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") elif channel == 8: # Relais 7 and 8 SPECIAL CASE use osafzp y signal to align osa y rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r") # if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"): # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") elif channel == 9: # Relais 15 and 16 rback = self._omny_interferometer_switch_put_and_receive("!002C000\r") # if "|C000\r" != self._omny_interferometer_switch_put_and_receive("!002C000\r"): # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") if "0000" not in rback: raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") def _omny_interferometer_switch_put_and_receive(self, command_str: str) -> str: try: self.switchbox_socket.send(command_str.encode()) except: self._omny_interferometer_switch_open_socket() time.sleep(0.1) self.switchbox_socket.send(command_str.encode()) recv = self.switchbox_socket.recv(1024).decode() # print("sent") # print(command_str) # print("\nreceived") # print(recv) # print("") return recv def _omny_interferometer_switch_LED_on(self): self._omny_interferometer_switch_put_and_receive("!00S01\r") def _omny_interferometer_switch_alloff(self): # switch all off self._omny_interferometer_switch_put_and_receive("!0020000\r") # LED OFF time.sleep(0.1) self._omny_interferometer_switch_put_and_receive("!00S00\r") time.sleep(0.1) alloff = self._omny_interferometer_switch_put_and_receive("?002\r") # check all off if "00" not in alloff: raise RtOMNY_mirror_switchbox_Error("Not all channels switched off.") def _rt_start_averaging_SSI(self): self.socket_put("J3") def _omny_interferometer_get_signalsample(self, channel, averaging_duration): # channel is string, eg ssi_1 # ensure no averaging running currently self.feedback_is_running() # measure first sample self._rt_start_averaging_SSI() time.sleep(averaging_duration) self.feedback_is_running() return self.ssi[channel] @threadlocked def start_scan(self): if not self.feedback_is_running(): logger.error( "Cannot start scan because feedback loop is not running or there is an" " interferometer error." ) raise RtError( "Cannot start scan because feedback loop is not running or there is an" " interferometer error." ) # here exception (mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status() if number_of_positions_planned == 0: logger.error("Cannot start scan because no target positions are planned.") raise RtError("Cannot start scan because no target positions are planned.") # hier exception # start a point-by-point scan (for cont scan in flomni it would be "sa") self.socket_put_and_receive("sd") @retry_once @threadlocked def get_scan_status(self): return_table = (self.socket_put_and_receive("sr")).split(",") if len(return_table) != 3: raise RtCommunicationError( f"Expected to receive 3 return values. Instead received {return_table}" ) mode = int(float(return_table[0])) # mode 0: direct positioning # mode 1: running internal timer (not tested/used anymore) # mode 2: rt point scan running # mode 3: rt point scan starting # mode 5/6: rt continuous scanning (not available in LamNI) number_of_positions_planned = int(float(return_table[1])) current_position_in_scan = int(float(return_table[2])) return (mode, number_of_positions_planned, current_position_in_scan)
[docs] class RtOMNYReadbackSignal(RtReadbackSignal): @retry_once @threadlocked def _socket_get(self) -> float: """Get command for the readback signal Returns: float: Readback value after adjusting for sign and motor resolution. """ time.sleep(0.1) return_table = (self.controller.socket_put_and_receive(f"pr")).split(",") current_pos = float(return_table[self.parent.axis_Id_numeric]) current_pos *= self.parent.sign self.parent.user_setpoint.setpoint = current_pos return current_pos
[docs] class RtOMNYSetpointSignal(RtSetpointSignal): @retry_once @threadlocked def _socket_set(self, val: float) -> None: """Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign. Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted. Args: val (float): Target value / setpoint value Raises: RtError: Raised if interferometer feedback is disabled. """ if not self.parent.controller.feedback_is_running(): print( "The interferometer feedback is not running. Either it is turned off or and" " interferometer error occured." ) raise RtError( "The interferometer feedback is not running. Either it is turned off or and" " interferometer error occured." ) self.set_with_feedback_disabled(val) def set_with_feedback_disabled(self, val): target_val = val * self.parent.sign self.setpoint = target_val self.controller.socket_put(f"pa{self.parent.axis_Id_numeric},{target_val:.4f}")
[docs] class RtOMNYFeedbackRunning(RtSignalRO): @threadlocked def _socket_get(self): return int(self.parent.controller.feedback_is_running())
[docs] class RtOMNYMotor(Device, PositionerBase): USER_ACCESS = ["controller"] readback = Cpt(RtOMNYReadbackSignal, signal_name="readback", kind="hinted") user_setpoint = Cpt(RtOMNYSetpointSignal, signal_name="setpoint") high_limit_travel = Cpt(Signal, value=0, kind="omitted") low_limit_travel = Cpt(Signal, value=0, kind="omitted") SUB_READBACK = "readback" SUB_CONNECTION_CHANGE = "connection_change" _default_sub = SUB_READBACK def __init__( self, axis_Id, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, host="mpc3217.psi.ch", port=2222, sign=1, socket_cls=SocketIO, device_manager=None, limits=None, **kwargs, ): self.axis_Id = axis_Id self.sign = sign self.controller = RtOMNYController( socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.device_manager = device_manager self.tolerance = kwargs.pop("tolerance", 0.5) super().__init__( prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs, ) self.readback.name = self.name self.controller.subscribe( self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE ) self._update_connection_state() self._stopped = False # self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE) if limits is not None: assert len(limits) == 2 self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1])
[docs] def wait_for_connection(self, timeout: float = 30.0) -> bool: self.controller.on(timeout=timeout) self._update_setpoint_from_readback()
def _update_setpoint_from_readback(self): """ The setpoint is only stored locally. After a restart, we need to update it to match the current readback value. """ self.user_setpoint.setpoint = self.readback.get()
[docs] def destroy(self): """Make sure to turn off the controller socket on destroy.""" self.controller.off(update_config=False) return super().destroy()
@property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) @property def low_limit(self): return self.limits[0] @property def high_limit(self): return self.limits[1]
[docs] def check_value(self, value, **kwargs): """Check that the position is within the soft limits""" low_limit, high_limit = self.limits if low_limit < high_limit and not (low_limit <= value <= high_limit): raise LimitError(f"position={value} not within limits {self.limits}")
def _update_connection_state(self, **kwargs): for walk in self.walk_signals(): # pylint: disable=protected-access walk.item._metadata["connected"] = self.controller.connected def _forward_readback(self, **kwargs): kwargs.pop("sub_type") self._run_subs(sub_type="readback", **kwargs)
[docs] @raise_if_disconnected def move(self, position, wait=True, **kwargs): """Move to a specified position, optionally waiting for motion to complete. Parameters ---------- position Position to move to moved_cb : callable Call this callback when movement has finished. This callback must accept one keyword argument: 'obj' which will be set to this positioner instance. timeout : float, optional Maximum time to wait for the motion. If None, the default timeout for this positioner is used. Returns ------- status : MoveStatus Raises ------ TimeoutError When motion takes longer than `timeout` ValueError On invalid positions RuntimeError If motion fails other than timing out """ self._started_moving = False timeout = kwargs.pop("timeout", 100) status = super().move(position, timeout=timeout, **kwargs) self.user_setpoint.put(position, wait=False) def move_and_finish(): self._stopped = False try: while not self.controller.slew_rate_limiters_on_target() and not self._stopped: print("motor is moving") val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) time.sleep(0.01) print("Move finished") self._done_moving(success=(not self._stopped)) finally: self._stopped = False threading.Thread(target=move_and_finish, daemon=True).start() try: if wait: status_wait(status) except KeyboardInterrupt: self.stop() raise return status
@property def axis_Id(self): return self._axis_Id_alpha @axis_Id.setter def axis_Id(self, val): if isinstance(val, str): if len(val) != 1: raise ValueError("Only single-character axis_Ids are supported.") self._axis_Id_alpha = val self._axis_Id_numeric = ord(val.lower()) - 97 else: raise TypeError(f"Expected value of type str but received {type(val)}") @property def axis_Id_numeric(self): return self._axis_Id_numeric @axis_Id_numeric.setter def axis_Id_numeric(self, val): if isinstance(val, int): if val > 26: raise ValueError("Numeric value exceeds supported range.") self._axis_Id_alpha = val self._axis_Id_numeric = (chr(val + 97)).capitalize() else: raise TypeError(f"Expected value of type int but received {type(val)}") @property def egu(self): """The engineering units (EGU) for positions""" return "um"
[docs] def stop(self, *, success=False): self.controller.stop_all_axes() self._stopped = True return super().stop(success=success)
[docs] class RtOMNYFlyer(Device): USER_ACCESS = ["controller"] data = Cpt( AsyncMultiSignal, name="data", signals=[ "target_x", "average_x_st_osa", "stdev_x_st_osa", "target_y", "average_y_st_osa", "stdev_y_st_osa", "average_x_osa_fzp", "stdev_x_osa_fzp", "average_y_osa_fzp", "stdev_y_osa_fzp", "average_rotz_st", "stdev_rotz_st", "average_x_st_fzp", "stdev_x_st_fzp", "average_y_st_fzp", "stdev_y_st_fzp", "average_stdeviations_x_st_fzp", "average_stdeviations_y_st_fzp", ], ndim=1, async_update={"type": "add", "max_shape": [None]}, max_size=1000, ) progress = Cpt(ProgressSignal, doc="Signal to track the progress of the device during a scan") def __init__( self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, host="mpc3217.psi.ch", port=2222, socket_cls=SocketIO, device_manager=None, **kwargs, ): super().__init__(prefix=prefix, name=name, parent=parent, **kwargs) self.shutdown_event = threading.Event() self.controller = RtOMNYController( socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.average_stdeviations_x_st_fzp = 0 self.average_stdeviations_y_st_fzp = 0 self.readout_thread = None self.scan_done_event = threading.Event() self.scan_done_event.set()
[docs] def stage(self): self.shutdown_event.clear() self.scan_done_event.set() return super().stage()
def start_readout(self, status: DeviceStatus): self.readout_thread = threading.Thread( target=self.read_positions_from_sampler, args=(status,) ) self.readout_thread.start() def kickoff(self) -> DeviceStatus: self.shutdown_event.clear() self.scan_done_event.clear() while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001): ... self.controller.start_scan() self.shutdown_event.wait(0.1) status = DeviceStatus(self) status.set_finished() return status
[docs] def complete(self) -> DeviceStatus: """Wait until the flyer is done.""" if self.scan_done_event.is_set(): # if the scan_done_event is already set, we can return a finished status immediately status = DeviceStatus(self) status.set_finished() return status status = DeviceStatus(self) self.start_readout(status) status.add_callback(lambda *args, **kwargs: self.scan_done_event.set()) return status
[docs] def stop(self, *, success=False): self.shutdown_event.set() self.scan_done_event.set() if self.readout_thread is not None: self.readout_thread.join() return super().stop(success=success)
[docs] def read_positions_from_sampler(self, status: DeviceStatus): """ Read the positions from the sampler and update the data signal. This function runs in a separate thread and continuously checks the scan status. Args: status (DeviceStatus): The status object to update when the readout is complete. """ read_counter = 0 self.average_stdeviations_x_st_fzp = 0 self.average_stdeviations_y_st_fzp = 0 mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() while mode > 0 and not self.shutdown_event.wait(0.01): # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") mode, number_of_positions_planned, current_position_in_scan = ( self.controller.get_scan_status() ) if current_position_in_scan > 5: while current_position_in_scan > read_counter + 1: return_table = ( self.controller.socket_put_and_receive(f"r{read_counter}") ).split(",") logger.info(f"Read {read_counter} out of {number_of_positions_planned}") self.progress.put( value=read_counter, max_value=number_of_positions_planned, done=False ) read_counter = read_counter + 1 signals = self._get_signals_from_table(return_table) self.data.set(signals) if self.shutdown_event.wait(0.05): logger.info("Shutdown event set, stopping readout.") # if we are here, the shutdown_event is set. We can exit the readout loop. status.set_finished() return # read the last samples even though scan is finished already while number_of_positions_planned > read_counter and not self.shutdown_event.is_set(): return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",") logger.info(f"Read {read_counter} out of {number_of_positions_planned}") self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False) read_counter = read_counter + 1 signals = self._get_signals_from_table(return_table) self.data.set(signals) # NOTE: No need to set the status to failed if the shutdown_event is set. # The stop() method will take care of that. status.set_finished() self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True) logger.info( "OMNY statistics: Average of all standard deviations: x" f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y" f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}" )
def _get_signals_from_table(self, return_table) -> dict: self.average_stdeviations_x_st_fzp += float(return_table[16]) self.average_stdeviations_y_st_fzp += float(return_table[18]) signals = { "target_x": {"value": float(return_table[3])}, "average_x_st_osa": {"value": float(return_table[4])}, "stdev_x_st_osa": {"value": float(return_table[5])}, "target_y": {"value": float(return_table[6])}, "average_y_st_osa": {"value": float(return_table[7])}, "stdev_y_st_osa": {"value": float(return_table[8])}, "average_x_osa_fzp": {"value": float(return_table[9])}, "stdev_x_osa_fzp": {"value": float(return_table[10])}, "average_y_osa_fzp": {"value": float(return_table[11])}, "stdev_y_osa_fzp": {"value": float(return_table[12])}, "average_rotz_st": {"value": float(return_table[13])}, "stdev_rotz_st": {"value": float(return_table[14])}, "average_x_st_fzp": {"value": float(return_table[15])}, "stdev_x_st_fzp": {"value": float(return_table[16])}, "average_y_st_fzp": {"value": float(return_table[17])}, "stdev_y_st_fzp": {"value": float(return_table[18])}, "average_stdeviations_x_st_fzp": { "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) }, "average_stdeviations_y_st_fzp": { "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) }, } return signals
if __name__ == "__main__": rtcontroller = RtOMNYController( socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None ) rtcontroller.on() rtcontroller.laser_tracker_on()