import builtins
import socket
import threading
import time
import numpy as np
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices import AsyncMultiSignal, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from prettytable import PrettyTable
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
from csaxs_bec.devices.omny.rt.rt_ophyd import (
RtCommunicationError,
RtError,
RtReadbackSignal,
RtSetpointSignal,
RtSignalRO,
retry_once,
)
logger = bec_logger.logger
[docs]
class RtOMNY_mirror_switchbox_Error(Exception):
pass
[docs]
class RtOMNY_Error(Exception):
pass
[docs]
class RtOMNYController(Controller):
_axes_per_controller = 3
red = "\x1b[91m"
white = "\x1b[0m"
USER_ACCESS = [
"socket_put",
"socket_put_and_receive",
"set_rotation_angle",
"feedback_disable",
"feedback_enable_with_reset",
"feedback_is_running",
"add_pos_to_scan",
"get_pid_x",
"move_samx_to_scan_region",
"clear_trajectory_generator",
"show_cyclic_error_compensation",
"laser_tracker_on",
"laser_tracker_off",
"laser_tracker_show_all",
"show_signal_strength_interferometer",
"read_ssi_interferometer",
"laser_tracker_check_signalstrength",
"laser_tracker_check_enabled",
"_omny_interferometer_switch_open_socket",
"_omny_interferometer_switch_channel",
"_omny_interferometer_switch_LED_on",
"_omny_interferometer_switch_alloff",
"_omny_interferometer_get_signalsample",
"laser_tracker_galil_enable",
"laser_tracker_galil_disable",
"laser_tracker_print_intensity_for_otrack_tweaking",
"laser_tracker_check_on_target",
"laser_tracker_wait_on_target",
"get_mirror_parameters",
"laser_tracker_check_and_wait_for_signalstrength",
"omny_interferometer_align_tracking",
"_omny_interferometer_openloop_steps",
"_omny_interferometer_optimize",
]
def __init__(
self,
*,
name="RtOMNYController",
socket_cls=None,
socket_host=None,
socket_port=None,
device_manager=None,
attr_name="",
parent=None,
labels=None,
kind=None,
):
super().__init__(
name=name,
socket_cls=socket_cls,
socket_host=socket_host,
socket_port=socket_port,
device_manager=device_manager,
attr_name=attr_name,
parent=parent,
labels=labels,
kind=kind,
)
self.tracker_info = {}
self.ssi = {}
self._min_scan_buffer_reached = False
self.switchbox_socket = socket.socket()
self.mirror_parameters = {
1: {
"opt_signalchannel": "ssi_5",
"opt_mirrorname": "Inc Angle",
"opt_averaging_time": 0.1,
"opt_steps1_pos": 1,
"opt_steps1_neg": 1,
"opt_steps2_pos": 10,
"opt_steps2_neg": 10,
"opt_signal_stop": 2400,
"opt_signal_min_begin": 1000,
"opt_step_divider": 1,
"opt_amplitude1_pos": 2000,
"opt_amplitude1_neg": 2000,
"opt_amplitude2_pos": 2000,
"opt_amplitude2_neg": 2000,
},
2: {
"opt_signalchannel": "ssi_2",
"opt_mirrorname": "Inc ST OSA Y",
"opt_averaging_time": 0.1,
"opt_steps1_pos": 1,
"opt_steps1_neg": 1,
"opt_steps2_pos": 1,
"opt_steps2_neg": 1,
"opt_signal_stop": 5000,
"opt_signal_min_begin": 2000,
"opt_step_divider": 1,
"opt_amplitude1_pos": 2000,
"opt_amplitude1_neg": 2000,
"opt_amplitude2_pos": 2000,
"opt_amplitude2_neg": 2000,
},
3: {
"opt_signalchannel": "ssi_1",
"opt_mirrorname": "Inc OSA FZP Y",
"opt_averaging_time": 0.25,
"opt_steps1_pos": 1,
"opt_steps1_neg": 2,
"opt_steps2_pos": 1,
"opt_steps2_neg": 3,
"opt_signal_stop": 7200,
"opt_signal_min_begin": 3000,
"opt_step_divider": 1,
"opt_amplitude1_pos": 2500,
"opt_amplitude1_neg": 2500,
"opt_amplitude2_pos": 2500,
"opt_amplitude2_neg": 2500,
},
4: {
"opt_signalchannel": "ssi_2",
"opt_mirrorname": "OSA Y",
"opt_averaging_time": 0.1,
"opt_steps1_pos": 1,
"opt_steps1_neg": 1,
"opt_steps2_pos": 1,
"opt_steps2_neg": 1,
"opt_signal_stop": 5000,
"opt_signal_min_begin": 2000,
"opt_step_divider": 1,
"opt_amplitude1_pos": 3000,
"opt_amplitude1_neg": 3000,
"opt_amplitude2_pos": 3000,
"opt_amplitude2_neg": 3000,
},
5: {
"opt_signalchannel": "ssi_1",
"opt_mirrorname": "FZP Y",
"opt_averaging_time": 0.2,
"opt_steps1_pos": 6,
"opt_steps1_neg": 6,
"opt_steps2_pos": 8,
"opt_steps2_neg": 4,
"opt_signal_stop": 8000,
"opt_signal_min_begin": 3000,
"opt_step_divider": 1,
"opt_amplitude1_pos": 3000,
"opt_amplitude1_neg": 3000,
"opt_amplitude2_pos": 3000,
"opt_amplitude2_neg": 3000,
},
6: {
"opt_signalchannel": "ssi_4",
"opt_mirrorname": "OSA X",
"opt_averaging_time": 0.2,
"opt_steps1_pos": 2,
"opt_steps1_neg": 1,
"opt_steps2_pos": 1,
"opt_steps2_neg": 1,
"opt_signal_stop": 8850,
"opt_signal_min_begin": 3000,
"opt_step_divider": 1,
"opt_amplitude1_pos": 3000,
"opt_amplitude1_neg": 3000,
"opt_amplitude2_pos": 3700,
"opt_amplitude2_neg": 2500,
},
7: {
"opt_signalchannel": "ssi_3",
"opt_mirrorname": "FZP X",
"opt_averaging_time": 0.2,
"opt_steps1_pos": 4,
"opt_steps1_neg": 4,
"opt_steps2_pos": 4,
"opt_steps2_neg": 4,
"opt_signal_stop": 8000,
"opt_signal_min_begin": 4000,
"opt_step_divider": 1,
"opt_amplitude1_pos": 2500,
"opt_amplitude1_neg": 2500,
"opt_amplitude2_pos": 2500,
"opt_amplitude2_neg": 2500,
},
8: {
"opt_signalchannel": "ssi_1",
"opt_mirrorname": "OSA Y USING SIGNAL OSA FZP",
"opt_averaging_time": 0.2,
"opt_steps1_pos": 1,
"opt_steps1_neg": 1,
"opt_steps2_pos": 1,
"opt_steps2_neg": 1,
"opt_signal_stop": 5000,
"opt_signal_min_begin": 2000,
"opt_step_divider": 1,
"opt_amplitude1_pos": 3000,
"opt_amplitude1_neg": 3000,
"opt_amplitude2_pos": 3000,
"opt_amplitude2_neg": 3000,
},
}
# def is_axis_moving(self, axis_Id) -> bool:
# # this checks that axis is on target
# axis_is_on_target = bool(float(self.socket_put_and_receive("o")))
# return not axis_is_on_target
@threadlocked
def stop_all_axes(self):
self.socket_put("sc")
def add_pos_to_scan(self, positions) -> None:
def send_positions(parent, positions):
parent._min_scan_buffer_reached = False
start_time = time.time()
for pos_index, pos in enumerate(positions):
cmd = f"s{pos[0]:.05f},{pos[1]:.05f},{pos[2]:.05f}"
parent.socket_put_and_receive(cmd)
if pos_index > 100:
parent._min_scan_buffer_reached = True
parent._min_scan_buffer_reached = True
logger.info(
f"Sending {len(positions)} positions took {time.time()-start_time} seconds."
)
threading.Thread(target=send_positions, args=(self, positions), daemon=True).start()
def get_mirror_parameters(self, channel):
return self.mirror_parameters[channel]
def laser_tracker_check_and_wait_for_signalstrength(self):
self.device_manager.connector.send_client_info(
"Checking laser tracker...", scope="", show_asap=True
)
if not self.laser_tracker_check_enabled():
print(
"laser_tracker_check_and_wait_for_signalstrength: The laser tracker is not even enabled."
)
return
# first check on target
self.laser_tracker_wait_on_target()
# when on target, check interferometer signal
signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1)
rtx = self.device_manager.devices.rtx
min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal")
wait_counter = 0
while signal < min_signal and wait_counter < 10:
self.device_manager.connector.send_client_info(
f"The signal of the tracker {signal} is below the minimum required signal of {min_signal}. Waiting...",
scope="laser_tracker_check_and_wait_for_signalstrength",
show_asap=True,
)
wait_counter += 1
time.sleep(0.2)
signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1)
if signal < low_signal:
self.device_manager.connector.send_client_info(
f"\x1b[91mThe signal of the tracker {signal} is below the low limit of {low_signal}. Auto readjustment...\x1b[0m",
scope="laser_tracker_check_and_wait_for_signalstrength",
show_asap=True,
)
self.omny_interferometer_align_tracking()
self.device_manager.connector.send_client_info(
"Checking laser tracker completed.", scope="", show_asap=True
)
def omny_interferometer_align_tracking(self):
mirror_channel = 6
signal = self._omny_interferometer_get_signalsample(
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
print(
f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed."
)
else:
self._omny_interferometer_switch_channel(mirror_channel)
time.sleep(0.1)
self._omny_interferometer_optimize(mirror_channel, 3)
self._omny_interferometer_optimize(mirror_channel, 4)
self._omny_interferometer_optimize(mirror_channel, 3)
self._omny_interferometer_optimize(mirror_channel, 4)
self._omny_interferometer_switch_alloff()
self.show_signal_strength_interferometer()
mirror_channel = -1
def omny_interferometer_align_incoupling_angle(self):
mirror_channel = 1
signal = self._omny_interferometer_get_signalsample(
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
print(
f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed."
)
else:
self._omny_interferometer_switch_channel(mirror_channel)
time.sleep(0.1)
self._omny_interferometer_optimize(mirror_channel, 3)
self._omny_interferometer_optimize(mirror_channel, 4)
self._omny_interferometer_optimize(mirror_channel, 3)
self._omny_interferometer_optimize(mirror_channel, 4)
self._omny_interferometer_switch_alloff()
self.show_signal_strength_interferometer()
mirror_channel = -1
def _omny_interferometer_openloop_steps(self, channel, steps, amplitude):
if channel not in range(3, 5):
raise RtOMNY_Error(f"invalid channel number {channel}.")
if amplitude > 4090:
amplitude = 4090
elif amplitude < 10:
amplitude = 10
oshield = self.device_manager.devices.oshield
oshield.obj.controller.move_open_loop_steps(
channel, steps, amplitude=amplitude, frequency=500
)
time.sleep(0.01)
while oshield.obj.controller.is_axis_moving(channel):
time.sleep(0.002)
def _omny_interferometer_optimize(self, mirror_channel, channel):
if mirror_channel == -1:
raise RtOMNY_Error("no mirror channel selected")
# mirror channel is mirror number and channel is smaract channel, i.e. axis of the mirror
if channel == 3:
steps_pos = self.mirror_parameters[mirror_channel]["opt_steps1_pos"]
steps_neg = self.mirror_parameters[mirror_channel]["opt_steps1_neg"]
opt_amplitude_pos = self.mirror_parameters[mirror_channel]["opt_amplitude1_pos"]
opt_amplitude_neg = self.mirror_parameters[mirror_channel]["opt_amplitude1_neg"]
elif channel == 4:
steps_pos = self.mirror_parameters[mirror_channel]["opt_steps2_pos"]
steps_neg = self.mirror_parameters[mirror_channel]["opt_steps2_neg"]
opt_amplitude_pos = self.mirror_parameters[mirror_channel]["opt_amplitude2_pos"]
opt_amplitude_neg = self.mirror_parameters[mirror_channel]["opt_amplitude2_neg"]
else:
raise RtOMNY_Error(f"invalid channel number {channel}.")
previous_signal = self._omny_interferometer_get_signalsample(
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
min_begin = self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]
if previous_signal < min_begin:
# raise OMNY_rt_clientError("error1") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.")
print(f"\rMinimum signal for auto alignment {min_begin} not reached.")
return
elif previous_signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
print(
f"\rInterferometer signal of axis is good"
) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} is good.")
return
else:
direction = 1
cycle_counter = 0
cycle_max = 20
reversal_counter = 0
reversal_max = 4
self.mirror_amplitutde_increase = 0
current_sample = self._omny_interferometer_get_signalsample(
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
max = current_sample
while (
current_sample < self.mirror_parameters[mirror_channel]["opt_signal_stop"]
and cycle_counter < cycle_max
and reversal_counter < reversal_max
):
# if current_sample < self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]:
# raise OMNY_rt_clientError("error2") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.")
if direction > 0:
self._omny_interferometer_openloop_steps(channel, steps_pos, opt_amplitude_pos)
verbose_str = f"channel {channel}, steps {steps_pos}"
else:
self._omny_interferometer_openloop_steps(channel, -steps_neg, opt_amplitude_neg)
verbose_str = f"auto action {channel}, steps {-steps_pos}"
# print(f"Aligning axis ") #{self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
current_sample = self._omny_interferometer_get_signalsample(
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
opt_mirrorname = self.mirror_parameters[mirror_channel]["opt_mirrorname"]
info_str = f"\rAuto aligning Channel {mirror_channel}, {opt_mirrorname}, Current signal: {current_sample:.0f}"
message = info_str + " (q)uit \r"
self.device_manager.connector.send_client_info(
message, scope="_omny_interferometer_optimize", show_asap=True
)
if previous_signal > current_sample:
if direction < 0:
steps_pos = int(steps_pos / 2)
steps_neg = int(steps_neg / 2)
if steps_pos < 1:
steps_pos = 1
if steps_neg < 1:
steps_neg = 1
direction = direction * (-1)
reversal_counter += 1
previous_signal = current_sample
cycle_counter += 1
print(
f"\r\nFinished aligning channel {channel} of mirror {mirror_channel}\n\r"
) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
def move_to_zero(self):
self.socket_put("pa0,0")
self.get_axis_by_name("rtx").user_setpoint.setpoint = 0
self.socket_put("pa1,0")
self.get_axis_by_name("rty").user_setpoint.setpoint = 0
self.socket_put("pa2,0")
self.get_axis_by_name("rtz").user_setpoint.setpoint = 0
time.sleep(0.05)
def feedback_get_status_and_ssi(self):
ret = self.socket_put_and_receive("J2")
# remove trailing \n
ret = ret.split("\n")[0]
status_and_ssi_values = [float(val) for val in ret.split(",")]
self.ssi = {
"feedback_error": status_and_ssi_values[0],
"ssi_1": status_and_ssi_values[1],
"ssi_2": status_and_ssi_values[2],
"ssi_3": status_and_ssi_values[3],
"ssi_4": status_and_ssi_values[4],
"ssi_5": status_and_ssi_values[5],
}
def slew_rate_limiters_on_target(self) -> bool:
ret = int(float(self.socket_put_and_receive("y").strip()))
if ret == 1:
return True
return False
def feedback_is_running(self) -> bool:
self.feedback_get_status_and_ssi()
interferometer_feedback_not_running = int(self.ssi["feedback_error"])
if interferometer_feedback_not_running == 1:
return False
return True
def feedback_enable_with_reset(self):
self.device_manager.connector.send_client_info(
f"Enabling the feedback...", scope="", show_asap=True
)
self.socket_put("J0") # disable feedback
time.sleep(0.01)
self.move_to_zero()
time.sleep(0.01)
if not self.slew_rate_limiters_on_target():
print("Please wait, slew rate limiters not on target.")
logger.info("Please wait, slew rate limiters not on target.")
while not self.slew_rate_limiters_on_target():
time.sleep(0.05)
self.clear_trajectory_generator()
time.sleep(0.01)
self.laser_tracker_on()
time.sleep(0.01)
osamroy = self.device_manager.devices.osamroy
# the following read will also upate the angle in rt during readout
readback = osamroy.obj.readback.get()
if np.fabs(readback) > 0.1:
self.device_manager.connector.send_client_info(
f"Rotating to zero", scope="feedback enable", show_asap=True
)
osamroy.obj.move(0, wait=True)
osamx = self.device_manager.devices.osamx
osamx_in = osamx.user_parameter.get("in")
if not np.isclose(osamx.obj.readback.get(), osamx_in, atol=0.01):
osamx.read_only = False
osamx.obj.move(osamx_in, wait=True)
osamx.read_only = True
time.sleep(0.3)
self.laser_tracker_wait_on_target()
self.omny_interferometer_align_tracking()
time.sleep(0.01)
self.socket_put("J1")
time.sleep(0.5)
if not self.feedback_is_running():
print("Feedback is not running; likely an error in the interferometer.")
raise RtError("Feedback is not running; likely an error in the interferometer.")
time.sleep(1.5)
self.set_device_read_write("osamx", False)
self.set_device_read_write("osamy", False)
self.set_device_read_write("ofzpx", False)
self.set_device_read_write("ofzpy", False)
self.set_device_read_write("oosax", False)
self.set_device_read_write("oosax", False)
print("Feedback is running.")
@threadlocked
def clear_trajectory_generator(self):
self.socket_put("sc")
logger.info("omny scan stopped and deleted, moving to start position")
def feedback_disable(self):
self.clear_trajectory_generator()
self.move_to_zero()
self.socket_put("J0")
self.set_device_read_write("osamx", True)
self.set_device_read_write("osamy", True)
self.set_device_read_write("ofzpx", True)
self.set_device_read_write("ofzpy", True)
self.set_device_read_write("oosax", True)
self.set_device_read_write("oosax", True)
print("rt feedback is now disabled.")
def set_rotation_angle(self, val: float) -> None:
self.socket_put(f"a{val/180*np.pi}")
def laser_tracker_off(self):
if self.feedback_is_running():
print(
"Interferometer feedback is running. Cannot disable the tracker. First disable the feedback using omny.feedback_disable()"
)
else:
self.socket_put("T0")
logger.info("Disabled the laser tracker")
print("Disabled the laser tracker")
def laser_update_tracker_info(self):
ret = self.socket_put_and_receive("Ts")
# remove trailing \n
ret = ret.split("\n")[0]
tracker_values = [float(val) for val in ret.split(",")]
self.tracker_info = {
"beampos_y": tracker_values[5],
"beampos_z": tracker_values[0],
"tracker_intensity": tracker_values[2],
"target_y": tracker_values[6],
"threshold_intensity_y": tracker_values[8],
"piezo_voltage_y": tracker_values[9],
"enabled_y": tracker_values[10],
"target_z": tracker_values[1],
"threshold_intensity_z": tracker_values[3],
"piezo_voltage_z": tracker_values[4],
"enabled_z": bool(tracker_values[10]),
}
def laser_tracker_on(self):
# update variables and enable if not yet active
if not self.laser_tracker_check_enabled():
logger.info("Enabling the laser tracker. Please wait...")
self.device_manager.connector.send_client_info(
f"Enabling the laser tracker. Please wait...", scope="", show_asap=True
)
tracker_intensity = self.tracker_info["tracker_intensity"]
if (
tracker_intensity < self.tracker_info["threshold_intensity_y"]
or tracker_intensity < self.tracker_info["threshold_intensity_z"]
):
logger.info(self.tracker_info)
print("The tracker cannot be enabled because the beam intensity it low.")
raise RtError("The tracker cannot be enabled because the beam intensity it low.")
self.move_to_zero()
self.socket_put("T1")
time.sleep(0.5)
self.device_manager.devices.otracky.obj.controller.socket_put_confirmed("trackyct=0")
self.device_manager.devices.otracky.obj.controller.socket_put_confirmed("trackzct=0")
self.laser_tracker_wait_on_target()
logger.info("Laser tracker running!")
print("Laser tracker running!")
def laser_tracker_check_enabled(self) -> bool:
self.laser_update_tracker_info()
if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]:
return True
else:
return False
def laser_tracker_check_on_target(self) -> bool:
if not self.laser_tracker_check_enabled():
logger.info("The tracker is not even enabled")
print("The tracker is not even enabled")
return False
if np.isclose(
self.tracker_info["beampos_y"], self.tracker_info["target_y"], atol=0.03
) and np.isclose(self.tracker_info["beampos_z"], self.tracker_info["target_z"], atol=0.03):
return True
return False
def laser_tracker_wait_on_target(self):
max_repeat = 15
count = 0
while not self.laser_tracker_check_on_target() and count < max_repeat:
logger.info("Waiting for laser tracker to reach target position.")
time.sleep(0.5)
count += 1
if count > max_repeat:
print("Failed to reach laser target position.")
raise RtError("Failed to reach laser target position.")
def laser_tracker_print_intensity_for_otrack_tweaking(self):
# self.laser_update_tracker_info()
# _laser_tracker_intensity = self.tracker_info["tracker_intensity"]
# print(f"\r PSD beam intensity: {_laser_tracker_intensity:.2f}\r")
self.laser_tracker_show_all(extra_endline="\r")
def laser_tracker_show_all(self, extra_endline=""):
self.laser_update_tracker_info()
enabled_y = self.tracker_info["enabled_y"]
print(extra_endline + f"Tracker enabled: {bool(enabled_y)}" + extra_endline)
if self.tracker_info["tracker_intensity"] < self.tracker_info["threshold_intensity_y"]:
print(self.red + " LOW INTENSITY" + self.white + extra_endline)
_laser_tracker_intensity = self.tracker_info["tracker_intensity"]
print(f" PSD beam intensity: {_laser_tracker_intensity:.2f}" + extra_endline)
_laser_tracker_y_beampos = self.tracker_info["beampos_y"]
print(f" Y beam position: {_laser_tracker_y_beampos:.2f}" + extra_endline)
_laser_tracker_y_target = self.tracker_info["target_y"]
print(f" target position: {_laser_tracker_y_target:.2f}" + extra_endline)
_laser_tracker_y_threshold_intensity = self.tracker_info["threshold_intensity_y"]
print(
f" threshold intensity: {_laser_tracker_y_threshold_intensity:.2f}" + extra_endline
)
_laser_tracker_y_piezo_voltage = self.tracker_info["piezo_voltage_y"]
print(f" Piezo voltage: {_laser_tracker_y_piezo_voltage:.2f}" + extra_endline)
_laser_tracker_z_beampos = self.tracker_info["beampos_z"]
print(f" Z beam position: {_laser_tracker_z_beampos:.2f}" + extra_endline)
_laser_tracker_z_target = self.tracker_info["target_z"]
print(f" target position: {_laser_tracker_z_target:.2f}" + extra_endline)
_laser_tracker_z_threshold_intensity = self.tracker_info["threshold_intensity_z"]
print(
f" threshold intensity: {_laser_tracker_z_threshold_intensity:.2f}" + extra_endline
)
_laser_tracker_z_piezo_voltage = self.tracker_info["piezo_voltage_z"]
print(f" Piezo voltage: {_laser_tracker_z_piezo_voltage:.2f}" + extra_endline)
print(" Reminder - there is also an upper threshold active in rt\n" + extra_endline)
if extra_endline == "":
self.laser_tracker_galil_status()
def laser_tracker_galil_enable(self):
otracky_con = self.device_manager.devices.otracky.obj.controller
otracky_con.socket_put_confirmed("tracken=1")
otracky_con.socket_put_confirmed("trackyct=0")
otracky_con.socket_put_confirmed("trackzct=0")
def laser_tracker_galil_disable(self):
otracky_con = self.device_manager.devices.otracky.obj.controller
otracky_con.socket_put_confirmed("tracken=0")
def laser_tracker_galil_status(self):
otracky_con = self.device_manager.devices.otracky.obj.controller
if bool(float(otracky_con.socket_put_and_receive("MGtracken").strip())) == 0:
print(self.red + "Tracking in the Galil Controller is disabled." + self.white)
print("Use dev.rtx.controller.laser_tracker_galil_enable to enable.\n")
return 0
else:
print("Tracking in the Galil Controller is enabled.")
trackyct = int(float(otracky_con.socket_put_and_receive("MGtrackyct").strip()))
trackzct = int(float(otracky_con.socket_put_and_receive("MGtrackzct").strip()))
print(f"Galil Trackcounters y={trackyct}, z={trackzct}")
def show_signal_strength_interferometer(self):
channelnames = {1: "OSA FZP Y", 2: "ST OSA Y", 3: "OSA FZP X", 4: "ST OSA X", 5: "Angle"}
self.feedback_get_status_and_ssi()
t = PrettyTable()
t.title = f"Interferometer signal strength"
t.field_names = ["Channel", "Name", "Value"]
for i in range(1, 6):
ssi = self.ssi[f"ssi_{i}"]
t.add_row([i, channelnames[i], ssi])
print(t)
def _omny_interferometer_switch_open_socket(self):
# open a socket connection to the laser interferometer server
host = "mpc3217.psi.ch" # as both code is running on same pc
port = 3335 # socket server port number
self.switchbox_socket.connect((host, port)) # connect to the server
self._omny_interferometer_switch_put_and_receive("?000\r")
time.sleep(1)
def _omny_interferometer_switch_channel(self, channel):
self._omny_interferometer_switch_alloff()
time.sleep(0.1)
if channel == 1: # Relais 1 and 2
rback = self._omny_interferometer_switch_put_and_receive("!0020003\r")
# if "|0003\r" != self._omny_interferometer_switch_put_and_receive("!0020003\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 2: # Relais 3 and 4
rback = self._omny_interferometer_switch_put_and_receive("!002000C\r")
# if "|000C\r" != self._omny_interferometer_switch_put_and_receive("!002000C\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 3: # Relais 5 and 6
rback = self._omny_interferometer_switch_put_and_receive("!0020030\r")
# if "|0030\r" != self._omny_interferometer_switch_put_and_receive("!0020030\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 4: # Relais 7 and 8
rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r")
# if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 5: # Relais 9 and 10
rback = self._omny_interferometer_switch_put_and_receive("!0020300\r")
# if "|0300\r" != self._omny_interferometer_switch_put_and_receive("!0020300\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 6: # Relais 11 and 12
rback = self._omny_interferometer_switch_put_and_receive("!0020C00\r")
# if "|0C00\r" != self._omny_interferometer_switch_put_and_receive("!0020C00\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 7: # Relais 13 and 14
rback = self._omny_interferometer_switch_put_and_receive("!0023000\r")
# if "|3000\r" != self._omny_interferometer_switch_put_and_receive("!0023000\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 8: # Relais 7 and 8 SPECIAL CASE use osafzp y signal to align osa y
rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r")
# if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 9: # Relais 15 and 16
rback = self._omny_interferometer_switch_put_and_receive("!002C000\r")
# if "|C000\r" != self._omny_interferometer_switch_put_and_receive("!002C000\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
if "0000" not in rback:
raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
def _omny_interferometer_switch_put_and_receive(self, command_str: str) -> str:
try:
self.switchbox_socket.send(command_str.encode())
except:
self._omny_interferometer_switch_open_socket()
time.sleep(0.1)
self.switchbox_socket.send(command_str.encode())
recv = self.switchbox_socket.recv(1024).decode()
# print("sent")
# print(command_str)
# print("\nreceived")
# print(recv)
# print("")
return recv
def _omny_interferometer_switch_LED_on(self):
self._omny_interferometer_switch_put_and_receive("!00S01\r")
def _omny_interferometer_switch_alloff(self):
# switch all off
self._omny_interferometer_switch_put_and_receive("!0020000\r")
# LED OFF
time.sleep(0.1)
self._omny_interferometer_switch_put_and_receive("!00S00\r")
time.sleep(0.1)
alloff = self._omny_interferometer_switch_put_and_receive("?002\r")
# check all off
if "00" not in alloff:
raise RtOMNY_mirror_switchbox_Error("Not all channels switched off.")
def _rt_start_averaging_SSI(self):
self.socket_put("J3")
def _omny_interferometer_get_signalsample(self, channel, averaging_duration):
# channel is string, eg ssi_1
# ensure no averaging running currently
self.feedback_is_running()
# measure first sample
self._rt_start_averaging_SSI()
time.sleep(averaging_duration)
self.feedback_is_running()
return self.ssi[channel]
@threadlocked
def start_scan(self):
if not self.feedback_is_running():
logger.error(
"Cannot start scan because feedback loop is not running or there is an"
" interferometer error."
)
raise RtError(
"Cannot start scan because feedback loop is not running or there is an"
" interferometer error."
)
# here exception
(mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status()
if number_of_positions_planned == 0:
logger.error("Cannot start scan because no target positions are planned.")
raise RtError("Cannot start scan because no target positions are planned.")
# hier exception
# start a point-by-point scan (for cont scan in flomni it would be "sa")
self.socket_put_and_receive("sd")
@retry_once
@threadlocked
def get_scan_status(self):
return_table = (self.socket_put_and_receive("sr")).split(",")
if len(return_table) != 3:
raise RtCommunicationError(
f"Expected to receive 3 return values. Instead received {return_table}"
)
mode = int(float(return_table[0]))
# mode 0: direct positioning
# mode 1: running internal timer (not tested/used anymore)
# mode 2: rt point scan running
# mode 3: rt point scan starting
# mode 5/6: rt continuous scanning (not available in LamNI)
number_of_positions_planned = int(float(return_table[1]))
current_position_in_scan = int(float(return_table[2]))
return (mode, number_of_positions_planned, current_position_in_scan)
[docs]
class RtOMNYReadbackSignal(RtReadbackSignal):
@retry_once
@threadlocked
def _socket_get(self) -> float:
"""Get command for the readback signal
Returns:
float: Readback value after adjusting for sign and motor resolution.
"""
time.sleep(0.1)
return_table = (self.controller.socket_put_and_receive(f"pr")).split(",")
current_pos = float(return_table[self.parent.axis_Id_numeric])
current_pos *= self.parent.sign
self.parent.user_setpoint.setpoint = current_pos
return current_pos
[docs]
class RtOMNYSetpointSignal(RtSetpointSignal):
@retry_once
@threadlocked
def _socket_set(self, val: float) -> None:
"""Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign.
Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted.
Args:
val (float): Target value / setpoint value
Raises:
RtError: Raised if interferometer feedback is disabled.
"""
if not self.parent.controller.feedback_is_running():
print(
"The interferometer feedback is not running. Either it is turned off or and"
" interferometer error occured."
)
raise RtError(
"The interferometer feedback is not running. Either it is turned off or and"
" interferometer error occured."
)
self.set_with_feedback_disabled(val)
def set_with_feedback_disabled(self, val):
target_val = val * self.parent.sign
self.setpoint = target_val
self.controller.socket_put(f"pa{self.parent.axis_Id_numeric},{target_val:.4f}")
[docs]
class RtOMNYFeedbackRunning(RtSignalRO):
@threadlocked
def _socket_get(self):
return int(self.parent.controller.feedback_is_running())
[docs]
class RtOMNYMotor(Device, PositionerBase):
USER_ACCESS = ["controller"]
readback = Cpt(RtOMNYReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(RtOMNYSetpointSignal, signal_name="setpoint")
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
SUB_READBACK = "readback"
SUB_CONNECTION_CHANGE = "connection_change"
_default_sub = SUB_READBACK
def __init__(
self,
axis_Id,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc3217.psi.ch",
port=2222,
sign=1,
socket_cls=SocketIO,
device_manager=None,
limits=None,
**kwargs,
):
self.axis_Id = axis_Id
self.sign = sign
self.controller = RtOMNYController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.device_manager = device_manager
self.tolerance = kwargs.pop("tolerance", 0.5)
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.readback.name = self.name
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
self._update_connection_state()
self._stopped = False
# self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE)
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
[docs]
def wait_for_connection(self, timeout: float = 30.0) -> bool:
self.controller.on(timeout=timeout)
self._update_setpoint_from_readback()
def _update_setpoint_from_readback(self):
"""
The setpoint is only stored locally. After a restart,
we need to update it to match the current readback value.
"""
self.user_setpoint.setpoint = self.readback.get()
[docs]
def destroy(self):
"""Make sure to turn off the controller socket on destroy."""
self.controller.off(update_config=False)
return super().destroy()
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
def low_limit(self):
return self.limits[0]
@property
def high_limit(self):
return self.limits[1]
[docs]
def check_value(self, value, **kwargs):
"""Check that the position is within the soft limits"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not (low_limit <= value <= high_limit):
raise LimitError(f"position={value} not within limits {self.limits}")
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
# pylint: disable=protected-access
walk.item._metadata["connected"] = self.controller.connected
def _forward_readback(self, **kwargs):
kwargs.pop("sub_type")
self._run_subs(sub_type="readback", **kwargs)
[docs]
@raise_if_disconnected
def move(self, position, wait=True, **kwargs):
"""Move to a specified position, optionally waiting for motion to
complete.
Parameters
----------
position
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than `timeout`
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out
"""
self._started_moving = False
timeout = kwargs.pop("timeout", 100)
status = super().move(position, timeout=timeout, **kwargs)
self.user_setpoint.put(position, wait=False)
def move_and_finish():
self._stopped = False
try:
while not self.controller.slew_rate_limiters_on_target() and not self._stopped:
print("motor is moving")
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
time.sleep(0.01)
print("Move finished")
self._done_moving(success=(not self._stopped))
finally:
self._stopped = False
threading.Thread(target=move_and_finish, daemon=True).start()
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
return status
@property
def axis_Id(self):
return self._axis_Id_alpha
@axis_Id.setter
def axis_Id(self, val):
if isinstance(val, str):
if len(val) != 1:
raise ValueError("Only single-character axis_Ids are supported.")
self._axis_Id_alpha = val
self._axis_Id_numeric = ord(val.lower()) - 97
else:
raise TypeError(f"Expected value of type str but received {type(val)}")
@property
def axis_Id_numeric(self):
return self._axis_Id_numeric
@axis_Id_numeric.setter
def axis_Id_numeric(self, val):
if isinstance(val, int):
if val > 26:
raise ValueError("Numeric value exceeds supported range.")
self._axis_Id_alpha = val
self._axis_Id_numeric = (chr(val + 97)).capitalize()
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
@property
def egu(self):
"""The engineering units (EGU) for positions"""
return "um"
[docs]
def stop(self, *, success=False):
self.controller.stop_all_axes()
self._stopped = True
return super().stop(success=success)
[docs]
class RtOMNYFlyer(Device):
USER_ACCESS = ["controller"]
data = Cpt(
AsyncMultiSignal,
name="data",
signals=[
"target_x",
"average_x_st_osa",
"stdev_x_st_osa",
"target_y",
"average_y_st_osa",
"stdev_y_st_osa",
"average_x_osa_fzp",
"stdev_x_osa_fzp",
"average_y_osa_fzp",
"stdev_y_osa_fzp",
"average_rotz_st",
"stdev_rotz_st",
"average_x_st_fzp",
"stdev_x_st_fzp",
"average_y_st_fzp",
"stdev_y_st_fzp",
"average_stdeviations_x_st_fzp",
"average_stdeviations_y_st_fzp",
],
ndim=1,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
)
progress = Cpt(ProgressSignal, doc="Signal to track the progress of the device during a scan")
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc3217.psi.ch",
port=2222,
socket_cls=SocketIO,
device_manager=None,
**kwargs,
):
super().__init__(prefix=prefix, name=name, parent=parent, **kwargs)
self.shutdown_event = threading.Event()
self.controller = RtOMNYController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.readout_thread = None
self.scan_done_event = threading.Event()
self.scan_done_event.set()
[docs]
def stage(self):
self.shutdown_event.clear()
self.scan_done_event.set()
return super().stage()
def start_readout(self, status: DeviceStatus):
self.readout_thread = threading.Thread(
target=self.read_positions_from_sampler, args=(status,)
)
self.readout_thread.start()
def kickoff(self) -> DeviceStatus:
self.shutdown_event.clear()
self.scan_done_event.clear()
while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001):
...
self.controller.start_scan()
self.shutdown_event.wait(0.1)
status = DeviceStatus(self)
status.set_finished()
return status
[docs]
def complete(self) -> DeviceStatus:
"""Wait until the flyer is done."""
if self.scan_done_event.is_set():
# if the scan_done_event is already set, we can return a finished status immediately
status = DeviceStatus(self)
status.set_finished()
return status
status = DeviceStatus(self)
self.start_readout(status)
status.add_callback(lambda *args, **kwargs: self.scan_done_event.set())
return status
[docs]
def stop(self, *, success=False):
self.shutdown_event.set()
self.scan_done_event.set()
if self.readout_thread is not None:
self.readout_thread.join()
return super().stop(success=success)
[docs]
def read_positions_from_sampler(self, status: DeviceStatus):
"""
Read the positions from the sampler and update the data signal.
This function runs in a separate thread and continuously checks the
scan status.
Args:
status (DeviceStatus): The status object to update when the readout is complete.
"""
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
while mode > 0 and not self.shutdown_event.wait(0.01):
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (
self.controller.socket_put_and_receive(f"r{read_counter}")
).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
self.progress.put(
value=read_counter, max_value=number_of_positions_planned, done=False
)
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
if self.shutdown_event.wait(0.05):
logger.info("Shutdown event set, stopping readout.")
# if we are here, the shutdown_event is set. We can exit the readout loop.
status.set_finished()
return
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter and not self.shutdown_event.is_set():
return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False)
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
# NOTE: No need to set the status to failed if the shutdown_event is set.
# The stop() method will take care of that.
status.set_finished()
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True)
logger.info(
"OMNY statistics: Average of all standard deviations: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[16])
self.average_stdeviations_y_st_fzp += float(return_table[18])
signals = {
"target_x": {"value": float(return_table[3])},
"average_x_st_osa": {"value": float(return_table[4])},
"stdev_x_st_osa": {"value": float(return_table[5])},
"target_y": {"value": float(return_table[6])},
"average_y_st_osa": {"value": float(return_table[7])},
"stdev_y_st_osa": {"value": float(return_table[8])},
"average_x_osa_fzp": {"value": float(return_table[9])},
"stdev_x_osa_fzp": {"value": float(return_table[10])},
"average_y_osa_fzp": {"value": float(return_table[11])},
"stdev_y_osa_fzp": {"value": float(return_table[12])},
"average_rotz_st": {"value": float(return_table[13])},
"stdev_rotz_st": {"value": float(return_table[14])},
"average_x_st_fzp": {"value": float(return_table[15])},
"stdev_x_st_fzp": {"value": float(return_table[16])},
"average_y_st_fzp": {"value": float(return_table[17])},
"stdev_y_st_fzp": {"value": float(return_table[18])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
}
return signals
if __name__ == "__main__":
rtcontroller = RtOMNYController(
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None
)
rtcontroller.on()
rtcontroller.laser_tracker_on()