import threading
import time
import numpy as np
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices import AsyncMultiSignal, DeviceStatus, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from prettytable import PrettyTable
from csaxs_bec.devices.omny.rt.rt_ophyd import (
RtCommunicationError,
RtError,
RtReadbackSignal,
RtSetpointSignal,
RtSignalRO,
retry_once,
)
logger = bec_logger.logger
[docs]
class RtFlomniController(Controller):
_axes_per_controller = 3
USER_ACCESS = [
"socket_put_and_receive",
"set_rotation_angle",
"feedback_disable",
"feedback_enable_without_reset",
"feedback_enable_with_reset",
"feedback_is_running",
"add_pos_to_scan",
"get_pid_x",
"move_samx_to_scan_region",
"clear_trajectory_generator",
"show_cyclic_error_compensation",
"laser_tracker_on",
"laser_tracker_off",
"laser_tracker_show_all",
"show_signal_strength_interferometer",
"read_ssi_interferometer",
"laser_tracker_check_signalstrength",
"laser_tracker_check_enabled",
"is_axis_moving",
]
def __init__(
self,
*,
name="RtFlomniController",
socket_cls=None,
socket_host=None,
socket_port=None,
device_manager=None,
attr_name="",
parent=None,
labels=None,
kind=None,
):
super().__init__(
name=name,
socket_cls=socket_cls,
socket_host=socket_host,
socket_port=socket_port,
device_manager=device_manager,
attr_name=attr_name,
parent=parent,
labels=labels,
kind=kind,
)
self.tracker_info = {}
self._min_scan_buffer_reached = False
self.rt_pid_voltage = None
def is_axis_moving(self, axis_Id) -> bool:
# this checks that axis is on target
axis_is_on_target = bool(float(self.socket_put_and_receive("o")))
return not axis_is_on_target
@threadlocked
def stop_all_axes(self):
self.socket_put("sc")
def add_pos_to_scan(self, positions) -> None:
def send_positions(parent, positions):
parent._min_scan_buffer_reached = False
start_time = time.time()
for pos_index, pos in enumerate(positions):
cmd = f"s{pos[0]:.05f},{pos[1]:.05f},{pos[2]:.05f}"
parent.socket_put_and_receive(cmd)
if pos_index > 100:
parent._min_scan_buffer_reached = True
parent._min_scan_buffer_reached = True
logger.info(
f"Sending {len(positions)} positions took {time.time()-start_time} seconds."
)
threading.Thread(target=send_positions, args=(self, positions), daemon=True).start()
def move_to_zero(self):
self.socket_put("pa0,0")
self.get_axis_by_name("rtx").user_setpoint.setpoint = 0
self.socket_put("pa1,0")
self.get_axis_by_name("rty").user_setpoint.setpoint = 0
self.socket_put("pa2,0")
self.get_axis_by_name("rtz").user_setpoint.setpoint = 0
time.sleep(0.05)
def feedback_is_running(self) -> bool:
status = int(float(self.socket_put_and_receive("l2").strip()))
if status == 1:
return False
return True
def feedback_enable_with_reset(self):
self.socket_put("l0") # disable feedback
self.move_to_zero()
if not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1:
print("Please wait, slew rate limiters not on target.")
logger.info("Please wait, slew rate limiters not on target.")
while not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1:
time.sleep(0.05)
self.device_manager.devices.rty.update_user_parameter({"tomo_additional_offsety": 0})
self.clear_trajectory_generator()
self.laser_tracker_on()
# move to 0. FUPR will set the rotation angle during readout
self.device_manager.devices.fsamroy.obj.move(0, wait=True)
fsamx = self.device_manager.devices.fsamx
fsamx.obj.pid_x_correction = 0
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]")
fsamx_in = fsamx.user_parameter.get("in")
if not np.isclose(fsamx.obj.readback.get(), fsamx_in, atol=0.3):
print(
"Something is wrong. fsamx is very far from the samx_in position. Don't dare correct automatically."
)
raise RtError(
"Something is wrong. fsamx is very far from the samx_in position. Don't dare correct automatically."
)
if not np.isclose(fsamx.obj.readback.get(), fsamx_in, atol=0.01):
fsamx.read_only = False
fsamx.obj.move(fsamx_in, wait=True)
fsamx.read_only = True
time.sleep(1)
self.socket_put("l1")
time.sleep(0.4)
if not self.feedback_is_running():
print("Feedback is not running; likely an error in the interferometer.")
raise RtError("Feedback is not running; likely an error in the interferometer.")
time.sleep(1.5)
self.show_cyclic_error_compensation()
self.rt_pid_voltage = self.get_pid_x()
rtx = self.device_manager.devices.rtx
rtx.update_user_parameter({"rt_pid_voltage": self.rt_pid_voltage})
self.set_device_read_write("fsamx", False)
self.set_device_read_write("fsamy", False)
self.set_device_read_write("foptx", False)
self.set_device_read_write("fopty", False)
def move_samx_to_scan_region(self, cenx: float, move_in_this_routine: bool = False):
# attention. a movement will clear all positions in the rt trajectory generator!
if move_in_this_routine == True:
self.device_manager.devices.rtx.obj.move(cenx, wait=True)
time.sleep(0.05)
# at cenx we expect the PID to be close to zero for a good fsamx position
if self.rt_pid_voltage is None:
rtx = self.device_manager.devices.rtx
self.rt_pid_voltage = rtx.user_parameter.get("rt_pid_voltage")
if self.rt_pid_voltage is None:
raise RtError(
"rt_pid_voltage not set in rtx user parameters. Please run feedback_enable_with_reset first."
)
logger.info(f"Using PID voltage from rtx user parameter: {self.rt_pid_voltage}")
expected_voltage = self.rt_pid_voltage
# logger.info(f"Expected PID voltage: {expected_voltage}")
logger.info(f"Current PID voltage: {self.get_pid_x()}")
wait_on_exit = False
# we allow 2V range from center, this corresponds to 30 microns
if np.abs(self.get_pid_x() - expected_voltage) < 2:
logger.info("No correction of fsamx needed")
else:
fsamx = self.device_manager.devices.fsamx
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]")
while True:
# when we correct, then to 1 V, within 15 microns
if np.abs(self.get_pid_x() - expected_voltage) < 1:
logger.info("No further correction needed")
break
wait_on_exit = True
# disable FZP piezo feedback
self.socket_put("v0")
fsamx.read_only = False
logger.info(f"Current PID voltage: {self.get_pid_x()}")
# here we accumulate the correction
fsamx.obj.pid_x_correction -= (self.get_pid_x() - expected_voltage) * 0.006
fsamx_in = fsamx.user_parameter.get("in")
logger.info(
f"Moving fsamx to {cenx / 1000 * 0.7 + fsamx.obj.pid_x_correction}, PID portion of that {fsamx.obj.pid_x_correction}"
)
fsamx.obj.move(fsamx_in + cenx / 1000 * 0.7 + fsamx.obj.pid_x_correction, wait=True)
fsamx.read_only = True
time.sleep(0.1)
self.laser_tracker_on()
time.sleep(0.01)
if wait_on_exit:
time.sleep(1)
# enable fast FZP feedback again
self.socket_put("v1")
@threadlocked
def clear_trajectory_generator(self):
self.socket_put("sc")
logger.info("flomni scan stopped and deleted, moving to start position")
def feedback_enable_without_reset(self):
self.laser_tracker_on()
self.socket_put("l3")
time.sleep(0.01)
if not self.feedback_is_running():
print("Feedback is not running; likely an error in the interferometer.")
raise RtError("Feedback is not running; likely an error in the interferometer.")
self.set_device_read_write("fsamx", False)
self.set_device_read_write("fsamy", False)
self.set_device_read_write("foptx", False)
self.set_device_read_write("fopty", False)
def feedback_disable(self):
self.clear_trajectory_generator()
self.move_to_zero()
self.socket_put("l0")
self.set_device_read_write("fsamx", True)
self.set_device_read_write("fsamy", True)
self.set_device_read_write("foptx", True)
self.set_device_read_write("fopty", True)
fsamx = self.device_manager.devices.fsamx
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=025*stppermm[4]")
print("rt feedback is now disalbed.")
def get_pid_x(self) -> float:
voltage = float(self.socket_put_and_receive("g").strip())
return voltage
def show_cyclic_error_compensation(self):
cec0 = int(float(self.socket_put_and_receive("w0").strip()))
cec1 = int(float(self.socket_put_and_receive("w1").strip()))
if cec0 == 32:
logger.info("Cyclic Error Compensation: y-axis is initialized")
else:
logger.info("Cyclic Error Compensation: y-axis is NOT initialized")
print("Cyclic Error Compensation: y-axis is NOT initialized")
if cec1 == 32:
logger.info("Cyclic Error Compensation: x-axis is initialized")
else:
logger.info("Cyclic Error Compensation: x-axis is NOT initialized")
print("Cyclic Error Compensation: x-axis is NOT initialized")
def set_rotation_angle(self, val: float) -> None:
self.socket_put(f"a{val/180*np.pi}")
def laser_tracker_check_enabled(self) -> bool:
self.laser_update_tracker_info()
if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]:
return True
else:
return False
def laser_tracker_on(self):
if not self.laser_tracker_check_enabled():
logger.info("Enabling the laser tracker. Please wait...")
print("Enabling the laser tracker. Please wait...")
tracker_intensity = self.tracker_info["tracker_intensity"]
if (
tracker_intensity < self.tracker_info["threshold_intensity_y"]
or tracker_intensity < self.tracker_info["threshold_intensity_z"]
):
logger.info(self.tracker_info)
print("The tracker cannot be enabled because the beam intensity it low.")
raise RtError("The tracker cannot be enabled because the beam intensity it low.")
self.move_to_zero()
self.socket_put("T1")
time.sleep(0.5)
self.device_manager.devices.ftrackz.obj.controller.socket_put_confirmed("trackyct=0")
self.device_manager.devices.ftrackz.obj.controller.socket_put_confirmed("trackzct=0")
self.laser_tracker_wait_on_target()
logger.info("Laser tracker running!")
print("Laser tracker running!")
def laser_tracker_off(self):
if self.feedback_is_running():
print(
"Interferometer feedback is running. Cannot disable the tracker. First disable the feedback using rt_feedback_disable()"
)
else:
self.socket_put("T0")
logger.info("Disabled the laser tracker")
print("Disabled the laser tracker")
def laser_tracker_show_all(self):
self.laser_update_tracker_info()
t = PrettyTable()
t.title = f"Laser Tracker Info"
t.field_names = ["Name", "Value"]
for key, val in self.tracker_info.items():
t.add_row([key, val])
print(t)
def laser_update_tracker_info(self):
ret = self.socket_put_and_receive("Ts")
# remove trailing \n
ret = ret.split("\n")[0]
tracker_values = [float(val) for val in ret.split(",")]
self.tracker_info = {
"tracker_intensity": tracker_values[2],
"threshold_intensity_y": tracker_values[8],
"enabled_y": bool(tracker_values[10]),
"beampos_y": tracker_values[5],
"target_y": tracker_values[6],
"piezo_voltage_y": tracker_values[9],
"threshold_intensity_z": tracker_values[3],
"enabled_z": bool(tracker_values[10]),
"beampos_z": tracker_values[0],
"target_z": tracker_values[1],
"piezo_voltage_z": tracker_values[4],
}
def laser_tracker_galil_enable(self):
ftrackz_con = self.device_manager.devices.ftrackz.obj.controller
if not ftrackz_con.is_thread_active(5):
ftrackz_con.socket_put_confirmed("tracken=1")
ftrackz_con.socket_put_confirmed("trackyct=0")
ftrackz_con.socket_put_confirmed("trackzct=0")
ftrackz_con.socket_put_confirmed("XQ#Tracker")
while not ftrackz_con.is_thread_active(5):
time.sleep(0.1)
def laser_tracker_on_target(self) -> bool:
self.laser_update_tracker_info()
if np.isclose(
self.tracker_info["beampos_y"], self.tracker_info["target_y"], atol=0.02
) and np.isclose(self.tracker_info["beampos_z"], self.tracker_info["target_z"], atol=0.02):
return True
return False
def laser_tracker_wait_on_target(self):
max_repeat = 25
count = 0
self.laser_tracker_galil_enable()
while not self.laser_tracker_on_target():
logger.info("Waiting for laser tracker to reach target.")
time.sleep(0.5)
count += 1
if count == 10:
ftrackz_con = self.device_manager.devices.ftrackz.obj.controller
ftrackz_con.socket_put_confirmed("tracken=1")
ftrackz_con.socket_put_confirmed("trackyct=0")
ftrackz_con.socket_put_confirmed("trackzct=0")
if count > max_repeat:
print("Failed to reach laser target position.")
raise RtError("Failed to reach laser target position.")
def slew_rate_limiters_on_target(self) -> bool:
ret = int(float(self.socket_put_and_receive("y").strip()))
if ret == 3:
return True
return False
def pid_y(self) -> float:
ret = float(self.socket_put_and_receive("G").strip())
return ret
def read_ssi_interferometer(self, axis_number):
val = float(self.socket_put_and_receive(f"j{axis_number}").strip())
return val
def laser_tracker_check_signalstrength(self, verbose=True):
if not self.laser_tracker_check_enabled():
returnval = "disabled"
else:
returnval = "ok"
self.laser_tracker_wait_on_target()
signal = self.read_ssi_interferometer(1)
rtx = self.device_manager.devices.rtx
min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal")
if verbose:
print(f"low signal: {low_signal}")
print(f"min signal: {min_signal}")
print(f"signal: {signal}")
if signal < min_signal:
time.sleep(1)
if signal < min_signal:
print(
f"\x1b[91mThe signal of the tracker {signal} is below the minimum required signal of {min_signal}. Readjustment requred!\x1b[0m"
)
returnval = "toolow"
# raise RtError("The interferometer signal of tracker is too low.")
elif signal < low_signal:
print(
f"\x1b[91mThe signal of the tracker {signal} is below the warning limit of {low_signal}. Readjustment recommended!\x1b[0m"
)
returnval = "low"
return returnval
def show_signal_strength_interferometer(self):
t = PrettyTable()
t.title = f"Interferometer signal strength"
t.field_names = ["Axis", "Value"]
for i in range(4):
t.add_row([i, self.read_ssi_interferometer(i)])
print(t)
@threadlocked
def start_scan(self):
if not self.feedback_is_running():
logger.error(
"Cannot start scan because feedback loop is not running or there is an"
" interferometer error."
)
raise RtError(
"Cannot start scan because feedback loop is not running or there is an"
" interferometer error."
)
# here exception
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
if number_of_positions_planned == 0:
logger.error("Cannot start scan because no target positions are planned.")
raise RtError("Cannot start scan because no target positions are planned.")
# hier exception
# start a point-by-point scan (for cont scan in flomni it would be "sa")
self.socket_put_and_receive("sd")
@retry_once
@threadlocked
def get_scan_status(self):
return_table = (self.socket_put_and_receive("sr")).split(",")
if len(return_table) != 3:
raise RtCommunicationError(
f"Expected to receive 3 return values. Instead received {return_table}"
)
mode = int(float(return_table[0]))
# mode 0: direct positioning
# mode 1: running internal timer (not tested/used anymore)
# mode 2: rt point scan running
# mode 3: rt point scan starting
# mode 5/6: rt continuous scanning (not available in LamNI)
number_of_positions_planned = int(float(return_table[1]))
current_position_in_scan = int(float(return_table[2]))
return (mode, number_of_positions_planned, current_position_in_scan)
[docs]
class RtFlomniReadbackSignal(RtReadbackSignal):
@retry_once
@threadlocked
def _socket_get(self) -> float:
"""Get command for the readback signal
Returns:
float: Readback value after adjusting for sign and motor resolution.
"""
time.sleep(0.1)
return_table = (self.controller.socket_put_and_receive(f"pr")).split(",")
current_pos = float(return_table[self.parent.axis_Id_numeric])
current_pos *= self.parent.sign
self.parent.user_setpoint.setpoint = current_pos
return current_pos
[docs]
class RtFlomniSetpointSignal(RtSetpointSignal):
@retry_once
@threadlocked
def _socket_set(self, val: float) -> None:
"""Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign.
Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted.
Args:
val (float): Target value / setpoint value
Raises:
RtError: Raised if interferometer feedback is disabled.
"""
if not self.parent.controller.feedback_is_running():
print(
"The interferometer feedback is not running. Either it is turned off or and"
" interferometer error occured."
)
raise RtError(
"The interferometer feedback is not running. Either it is turned off or and"
" interferometer error occured."
)
tracker_status = self.parent.controller.laser_tracker_check_signalstrength()
if tracker_status == "toolow":
print("The interferometer signal is too low for movements. Realignment required.")
raise RtError(
"The interferometer signal is too low for movements. Realignment required."
)
self.set_with_feedback_disabled(val)
def set_with_feedback_disabled(self, val):
target_val = val * self.parent.sign
self.setpoint = target_val
self.controller.socket_put(f"pa{self.parent.axis_Id_numeric},{target_val:.4f}")
[docs]
class RtFlomniFeedbackRunning(RtSignalRO):
@threadlocked
def _socket_get(self):
return int(self.parent.controller.feedback_is_running())
[docs]
class RtFlomniMotor(Device, PositionerBase):
USER_ACCESS = ["controller"]
readback = Cpt(RtFlomniReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(RtFlomniSetpointSignal, signal_name="setpoint")
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
SUB_READBACK = "readback"
SUB_CONNECTION_CHANGE = "connection_change"
_default_sub = SUB_READBACK
connectionTimeout = 20
def __init__(
self,
axis_Id,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2844.psi.ch",
port=2222,
sign=1,
socket_cls=SocketIO,
device_manager=None,
limits=None,
**kwargs,
):
self.axis_Id = axis_Id
self.sign = sign
self.controller = RtFlomniController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.device_manager = device_manager
self.tolerance = kwargs.pop("tolerance", 0.5)
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.readback.name = self.name
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
self._update_connection_state()
self._stopped = False
# self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE)
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
[docs]
def wait_for_connection(self, timeout: float = 30.0) -> bool:
self.controller.on(timeout=timeout)
self._update_setpoint_from_readback()
def _update_setpoint_from_readback(self):
"""
The setpoint is only stored locally. After a restart,
we need to update it to match the current readback value.
"""
self.user_setpoint.setpoint = self.readback.get()
[docs]
def destroy(self):
"""Make sure to turn off the controller socket on destroy."""
self.controller.off(update_config=False)
return super().destroy()
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
def low_limit(self):
return self.limits[0]
@property
def high_limit(self):
return self.limits[1]
[docs]
def check_value(self, value, **kwargs):
"""Check that the position is within the soft limits"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not (low_limit <= value <= high_limit):
raise LimitError(f"position={value} not within limits {self.limits}")
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
# pylint: disable=protected-access
walk.item._metadata["connected"] = self.controller.connected
def _forward_readback(self, **kwargs):
kwargs.pop("sub_type")
self._run_subs(sub_type="readback", **kwargs)
[docs]
@raise_if_disconnected
def move(self, position, wait=True, **kwargs):
"""Move to a specified position, optionally waiting for motion to
complete.
Parameters
----------
position
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than `timeout`
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out
"""
self._started_moving = False
timeout = kwargs.pop("timeout", 100)
status = super().move(position, timeout=timeout, **kwargs)
self.user_setpoint.put(position, wait=False)
def move_and_finish():
while not self.controller.slew_rate_limiters_on_target() and not self._stopped:
print("motor is moving")
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
time.sleep(0.01)
print("Move finished")
self._done_moving(success=(not self._stopped))
self._stopped = False
threading.Thread(target=move_and_finish, daemon=True).start()
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
return status
@property
def axis_Id(self):
return self._axis_Id_alpha
@axis_Id.setter
def axis_Id(self, val):
if isinstance(val, str):
if len(val) != 1:
raise ValueError("Only single-character axis_Ids are supported.")
self._axis_Id_alpha = val
self._axis_Id_numeric = ord(val.lower()) - 97
else:
raise TypeError(f"Expected value of type str but received {type(val)}")
@property
def axis_Id_numeric(self):
return self._axis_Id_numeric
@axis_Id_numeric.setter
def axis_Id_numeric(self, val):
if isinstance(val, int):
if val > 26:
raise ValueError("Numeric value exceeds supported range.")
self._axis_Id_alpha = val
self._axis_Id_numeric = (chr(val + 97)).capitalize()
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
def kickoff(self, metadata, **kwargs) -> None:
self.controller.kickoff(metadata)
@property
def egu(self):
"""The engineering units (EGU) for positions"""
return "um"
[docs]
def stop(self, *, success=False):
self.controller.stop_all_axes()
self._stopped = True
return super().stop(success=success)
[docs]
class RtFlomniFlyer(Device):
USER_ACCESS = ["controller"]
data = Cpt(
AsyncMultiSignal,
name="data",
signals=[
"target_x",
"average_x_st_fzp",
"stdev_x_st_fzp",
"target_y",
"average_y_st_fzp",
"stdev_y_st_fzp",
"average_rotz",
"stdev_rotz",
"average_stdeviations_x_st_fzp",
"average_stdeviations_y_st_fzp",
],
ndim=1,
async_update={"type": "add", "max_shape": [None]},
max_size=1000,
)
progress = Cpt(
ProgressSignal, doc="ProgressSignal indicating the progress of the device during a scan."
)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2844.psi.ch",
port=2222,
socket_cls=SocketIO,
device_manager=None,
**kwargs,
):
super().__init__(prefix=prefix, name=name, parent=parent, **kwargs)
self.shutdown_event = threading.Event()
self.controller = RtFlomniController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
self.readout_thread = None
self.scan_done_event = threading.Event()
self.scan_done_event.set()
[docs]
def read_positions_from_sampler(self, status: DeviceStatus):
"""
Read the positions from the sampler and update the data signal.
This function runs in a separate thread and continuously checks the
scan status.
Args:
status (DeviceStatus): The status object to update when the readout is complete.
"""
read_counter = 0
self.average_stdeviations_x_st_fzp = 0
self.average_stdeviations_y_st_fzp = 0
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
# while scan is running
while mode > 0 and not self.shutdown_event.wait(0.01):
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
mode, number_of_positions_planned, current_position_in_scan = (
self.controller.get_scan_status()
)
if current_position_in_scan > 5:
while current_position_in_scan > read_counter + 1:
return_table = (
self.controller.socket_put_and_receive(f"r{read_counter}")
).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
self.progress.put(
value=read_counter, max_value=number_of_positions_planned, done=False
)
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
if self.shutdown_event.wait(0.05):
logger.info("Shutdown event set, stopping readout.")
# if we are here, the shutdown_event is set. We can exit the readout loop.
status.set_finished()
return
# read the last samples even though scan is finished already
while number_of_positions_planned > read_counter and not self.shutdown_event.is_set():
return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",")
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False)
read_counter = read_counter + 1
signals = self._get_signals_from_table(return_table)
self.data.set(signals)
# NOTE: No need to set the status to failed if the shutdown_event is set.
# The stop() method will take care of that.
status.set_finished()
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True)
logger.info(
"Flomni statistics: Average of all standard deviations: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
)
def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[4])
self.average_stdeviations_y_st_fzp += float(return_table[7])
signals = {
"target_x": {"value": float(return_table[2])},
"average_x_st_fzp": {"value": float(return_table[3])},
"stdev_x_st_fzp": {"value": float(return_table[4])},
"target_y": {"value": float(return_table[5])},
"average_y_st_fzp": {"value": float(return_table[6])},
"stdev_y_st_fzp": {"value": float(return_table[7])},
"average_rotz": {"value": float(return_table[8])},
"stdev_rotz": {"value": float(return_table[9])},
"average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
},
"average_stdeviations_y_st_fzp": {
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
},
}
return signals
[docs]
def stage(self):
self.shutdown_event.clear()
self.scan_done_event.set()
return super().stage()
def start_readout(self, status: DeviceStatus):
self.readout_thread = threading.Thread(
target=self.read_positions_from_sampler, args=(status,)
)
self.readout_thread.start()
def kickoff(self) -> DeviceStatus:
self.shutdown_event.clear()
self.scan_done_event.clear()
while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001):
...
self.controller.start_scan()
self.shutdown_event.wait(0.1)
status = DeviceStatus(self)
status.set_finished()
return status
[docs]
def complete(self) -> DeviceStatus:
"""Wait until the flyer is done."""
if self.scan_done_event.is_set():
# if the scan_done_event is already set, we can return a finished status immediately
status = DeviceStatus(self)
status.set_finished()
return status
status = DeviceStatus(self)
self.start_readout(status)
status.add_callback(lambda *args, **kwargs: self.scan_done_event.set())
return status
[docs]
def stop(self, *, success=False):
self.shutdown_event.set()
self.scan_done_event.set()
if self.readout_thread is not None:
self.readout_thread.join()
return super().stop(success=success)
if __name__ == "__main__":
rtcontroller = RtFlomniController(
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None
)
rtcontroller.on()
rtcontroller.laser_tracker_on()