Source code for csaxs_bec.devices.omny.rt.rt_flomni_ophyd

import threading
import time

import numpy as np
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices import AsyncMultiSignal, DeviceStatus, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from prettytable import PrettyTable

from csaxs_bec.devices.omny.rt.rt_ophyd import (
    RtCommunicationError,
    RtError,
    RtReadbackSignal,
    RtSetpointSignal,
    RtSignalRO,
    retry_once,
)

logger = bec_logger.logger


[docs] class RtFlomniController(Controller): _axes_per_controller = 3 USER_ACCESS = [ "socket_put_and_receive", "set_rotation_angle", "feedback_disable", "feedback_enable_without_reset", "feedback_enable_with_reset", "feedback_is_running", "add_pos_to_scan", "get_pid_x", "move_samx_to_scan_region", "clear_trajectory_generator", "show_cyclic_error_compensation", "laser_tracker_on", "laser_tracker_off", "laser_tracker_show_all", "show_signal_strength_interferometer", "read_ssi_interferometer", "laser_tracker_check_signalstrength", "laser_tracker_check_enabled", "is_axis_moving", ] def __init__( self, *, name="RtFlomniController", socket_cls=None, socket_host=None, socket_port=None, device_manager=None, attr_name="", parent=None, labels=None, kind=None, ): super().__init__( name=name, socket_cls=socket_cls, socket_host=socket_host, socket_port=socket_port, device_manager=device_manager, attr_name=attr_name, parent=parent, labels=labels, kind=kind, ) self.tracker_info = {} self._min_scan_buffer_reached = False self.rt_pid_voltage = None def is_axis_moving(self, axis_Id) -> bool: # this checks that axis is on target axis_is_on_target = bool(float(self.socket_put_and_receive("o"))) return not axis_is_on_target @threadlocked def stop_all_axes(self): self.socket_put("sc") def add_pos_to_scan(self, positions) -> None: def send_positions(parent, positions): parent._min_scan_buffer_reached = False start_time = time.time() for pos_index, pos in enumerate(positions): cmd = f"s{pos[0]:.05f},{pos[1]:.05f},{pos[2]:.05f}" parent.socket_put_and_receive(cmd) if pos_index > 100: parent._min_scan_buffer_reached = True parent._min_scan_buffer_reached = True logger.info( f"Sending {len(positions)} positions took {time.time()-start_time} seconds." ) threading.Thread(target=send_positions, args=(self, positions), daemon=True).start() def move_to_zero(self): self.socket_put("pa0,0") self.get_axis_by_name("rtx").user_setpoint.setpoint = 0 self.socket_put("pa1,0") self.get_axis_by_name("rty").user_setpoint.setpoint = 0 self.socket_put("pa2,0") self.get_axis_by_name("rtz").user_setpoint.setpoint = 0 time.sleep(0.05) def feedback_is_running(self) -> bool: status = int(float(self.socket_put_and_receive("l2").strip())) if status == 1: return False return True def feedback_enable_with_reset(self): self.socket_put("l0") # disable feedback self.move_to_zero() if not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1: print("Please wait, slew rate limiters not on target.") logger.info("Please wait, slew rate limiters not on target.") while not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1: time.sleep(0.05) self.device_manager.devices.rty.update_user_parameter({"tomo_additional_offsety": 0}) self.clear_trajectory_generator() self.laser_tracker_on() # move to 0. FUPR will set the rotation angle during readout self.device_manager.devices.fsamroy.obj.move(0, wait=True) fsamx = self.device_manager.devices.fsamx fsamx.obj.pid_x_correction = 0 fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]") fsamx_in = fsamx.user_parameter.get("in") if not np.isclose(fsamx.obj.readback.get(), fsamx_in, atol=0.3): print( "Something is wrong. fsamx is very far from the samx_in position. Don't dare correct automatically." ) raise RtError( "Something is wrong. fsamx is very far from the samx_in position. Don't dare correct automatically." ) if not np.isclose(fsamx.obj.readback.get(), fsamx_in, atol=0.01): fsamx.read_only = False fsamx.obj.move(fsamx_in, wait=True) fsamx.read_only = True time.sleep(1) self.socket_put("l1") time.sleep(0.4) if not self.feedback_is_running(): print("Feedback is not running; likely an error in the interferometer.") raise RtError("Feedback is not running; likely an error in the interferometer.") time.sleep(1.5) self.show_cyclic_error_compensation() self.rt_pid_voltage = self.get_pid_x() rtx = self.device_manager.devices.rtx rtx.update_user_parameter({"rt_pid_voltage": self.rt_pid_voltage}) self.set_device_read_write("fsamx", False) self.set_device_read_write("fsamy", False) self.set_device_read_write("foptx", False) self.set_device_read_write("fopty", False) def move_samx_to_scan_region(self, cenx: float, move_in_this_routine: bool = False): # attention. a movement will clear all positions in the rt trajectory generator! if move_in_this_routine == True: self.device_manager.devices.rtx.obj.move(cenx, wait=True) time.sleep(0.05) # at cenx we expect the PID to be close to zero for a good fsamx position if self.rt_pid_voltage is None: rtx = self.device_manager.devices.rtx self.rt_pid_voltage = rtx.user_parameter.get("rt_pid_voltage") if self.rt_pid_voltage is None: raise RtError( "rt_pid_voltage not set in rtx user parameters. Please run feedback_enable_with_reset first." ) logger.info(f"Using PID voltage from rtx user parameter: {self.rt_pid_voltage}") expected_voltage = self.rt_pid_voltage # logger.info(f"Expected PID voltage: {expected_voltage}") logger.info(f"Current PID voltage: {self.get_pid_x()}") wait_on_exit = False # we allow 2V range from center, this corresponds to 30 microns if np.abs(self.get_pid_x() - expected_voltage) < 2: logger.info("No correction of fsamx needed") else: fsamx = self.device_manager.devices.fsamx fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]") while True: # when we correct, then to 1 V, within 15 microns if np.abs(self.get_pid_x() - expected_voltage) < 1: logger.info("No further correction needed") break wait_on_exit = True # disable FZP piezo feedback self.socket_put("v0") fsamx.read_only = False logger.info(f"Current PID voltage: {self.get_pid_x()}") # here we accumulate the correction fsamx.obj.pid_x_correction -= (self.get_pid_x() - expected_voltage) * 0.006 fsamx_in = fsamx.user_parameter.get("in") logger.info( f"Moving fsamx to {cenx / 1000 * 0.7 + fsamx.obj.pid_x_correction}, PID portion of that {fsamx.obj.pid_x_correction}" ) fsamx.obj.move(fsamx_in + cenx / 1000 * 0.7 + fsamx.obj.pid_x_correction, wait=True) fsamx.read_only = True time.sleep(0.1) self.laser_tracker_on() time.sleep(0.01) if wait_on_exit: time.sleep(1) # enable fast FZP feedback again self.socket_put("v1") @threadlocked def clear_trajectory_generator(self): self.socket_put("sc") logger.info("flomni scan stopped and deleted, moving to start position") def feedback_enable_without_reset(self): self.laser_tracker_on() self.socket_put("l3") time.sleep(0.01) if not self.feedback_is_running(): print("Feedback is not running; likely an error in the interferometer.") raise RtError("Feedback is not running; likely an error in the interferometer.") self.set_device_read_write("fsamx", False) self.set_device_read_write("fsamy", False) self.set_device_read_write("foptx", False) self.set_device_read_write("fopty", False) def feedback_disable(self): self.clear_trajectory_generator() self.move_to_zero() self.socket_put("l0") self.set_device_read_write("fsamx", True) self.set_device_read_write("fsamy", True) self.set_device_read_write("foptx", True) self.set_device_read_write("fopty", True) fsamx = self.device_manager.devices.fsamx fsamx.obj.controller.socket_put_confirmed("axspeed[4]=025*stppermm[4]") print("rt feedback is now disalbed.") def get_pid_x(self) -> float: voltage = float(self.socket_put_and_receive("g").strip()) return voltage def show_cyclic_error_compensation(self): cec0 = int(float(self.socket_put_and_receive("w0").strip())) cec1 = int(float(self.socket_put_and_receive("w1").strip())) if cec0 == 32: logger.info("Cyclic Error Compensation: y-axis is initialized") else: logger.info("Cyclic Error Compensation: y-axis is NOT initialized") print("Cyclic Error Compensation: y-axis is NOT initialized") if cec1 == 32: logger.info("Cyclic Error Compensation: x-axis is initialized") else: logger.info("Cyclic Error Compensation: x-axis is NOT initialized") print("Cyclic Error Compensation: x-axis is NOT initialized") def set_rotation_angle(self, val: float) -> None: self.socket_put(f"a{val/180*np.pi}") def laser_tracker_check_enabled(self) -> bool: self.laser_update_tracker_info() if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]: return True else: return False def laser_tracker_on(self): if not self.laser_tracker_check_enabled(): logger.info("Enabling the laser tracker. Please wait...") print("Enabling the laser tracker. Please wait...") tracker_intensity = self.tracker_info["tracker_intensity"] if ( tracker_intensity < self.tracker_info["threshold_intensity_y"] or tracker_intensity < self.tracker_info["threshold_intensity_z"] ): logger.info(self.tracker_info) print("The tracker cannot be enabled because the beam intensity it low.") raise RtError("The tracker cannot be enabled because the beam intensity it low.") self.move_to_zero() self.socket_put("T1") time.sleep(0.5) self.device_manager.devices.ftrackz.obj.controller.socket_put_confirmed("trackyct=0") self.device_manager.devices.ftrackz.obj.controller.socket_put_confirmed("trackzct=0") self.laser_tracker_wait_on_target() logger.info("Laser tracker running!") print("Laser tracker running!") def laser_tracker_off(self): if self.feedback_is_running(): print( "Interferometer feedback is running. Cannot disable the tracker. First disable the feedback using rt_feedback_disable()" ) else: self.socket_put("T0") logger.info("Disabled the laser tracker") print("Disabled the laser tracker") def laser_tracker_show_all(self): self.laser_update_tracker_info() t = PrettyTable() t.title = f"Laser Tracker Info" t.field_names = ["Name", "Value"] for key, val in self.tracker_info.items(): t.add_row([key, val]) print(t) def laser_update_tracker_info(self): ret = self.socket_put_and_receive("Ts") # remove trailing \n ret = ret.split("\n")[0] tracker_values = [float(val) for val in ret.split(",")] self.tracker_info = { "tracker_intensity": tracker_values[2], "threshold_intensity_y": tracker_values[8], "enabled_y": bool(tracker_values[10]), "beampos_y": tracker_values[5], "target_y": tracker_values[6], "piezo_voltage_y": tracker_values[9], "threshold_intensity_z": tracker_values[3], "enabled_z": bool(tracker_values[10]), "beampos_z": tracker_values[0], "target_z": tracker_values[1], "piezo_voltage_z": tracker_values[4], } def laser_tracker_galil_enable(self): ftrackz_con = self.device_manager.devices.ftrackz.obj.controller if not ftrackz_con.is_thread_active(5): ftrackz_con.socket_put_confirmed("tracken=1") ftrackz_con.socket_put_confirmed("trackyct=0") ftrackz_con.socket_put_confirmed("trackzct=0") ftrackz_con.socket_put_confirmed("XQ#Tracker") while not ftrackz_con.is_thread_active(5): time.sleep(0.1) def laser_tracker_on_target(self) -> bool: self.laser_update_tracker_info() if np.isclose( self.tracker_info["beampos_y"], self.tracker_info["target_y"], atol=0.02 ) and np.isclose(self.tracker_info["beampos_z"], self.tracker_info["target_z"], atol=0.02): return True return False def laser_tracker_wait_on_target(self): max_repeat = 25 count = 0 self.laser_tracker_galil_enable() while not self.laser_tracker_on_target(): logger.info("Waiting for laser tracker to reach target.") time.sleep(0.5) count += 1 if count == 10: ftrackz_con = self.device_manager.devices.ftrackz.obj.controller ftrackz_con.socket_put_confirmed("tracken=1") ftrackz_con.socket_put_confirmed("trackyct=0") ftrackz_con.socket_put_confirmed("trackzct=0") if count > max_repeat: print("Failed to reach laser target position.") raise RtError("Failed to reach laser target position.") def slew_rate_limiters_on_target(self) -> bool: ret = int(float(self.socket_put_and_receive("y").strip())) if ret == 3: return True return False def pid_y(self) -> float: ret = float(self.socket_put_and_receive("G").strip()) return ret def read_ssi_interferometer(self, axis_number): val = float(self.socket_put_and_receive(f"j{axis_number}").strip()) return val def laser_tracker_check_signalstrength(self, verbose=True): if not self.laser_tracker_check_enabled(): returnval = "disabled" else: returnval = "ok" self.laser_tracker_wait_on_target() signal = self.read_ssi_interferometer(1) rtx = self.device_manager.devices.rtx min_signal = rtx.user_parameter.get("min_signal") low_signal = rtx.user_parameter.get("low_signal") if verbose: print(f"low signal: {low_signal}") print(f"min signal: {min_signal}") print(f"signal: {signal}") if signal < min_signal: time.sleep(1) if signal < min_signal: print( f"\x1b[91mThe signal of the tracker {signal} is below the minimum required signal of {min_signal}. Readjustment requred!\x1b[0m" ) returnval = "toolow" # raise RtError("The interferometer signal of tracker is too low.") elif signal < low_signal: print( f"\x1b[91mThe signal of the tracker {signal} is below the warning limit of {low_signal}. Readjustment recommended!\x1b[0m" ) returnval = "low" return returnval def show_signal_strength_interferometer(self): t = PrettyTable() t.title = f"Interferometer signal strength" t.field_names = ["Axis", "Value"] for i in range(4): t.add_row([i, self.read_ssi_interferometer(i)]) print(t) @threadlocked def start_scan(self): if not self.feedback_is_running(): logger.error( "Cannot start scan because feedback loop is not running or there is an" " interferometer error." ) raise RtError( "Cannot start scan because feedback loop is not running or there is an" " interferometer error." ) # here exception mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status() if number_of_positions_planned == 0: logger.error("Cannot start scan because no target positions are planned.") raise RtError("Cannot start scan because no target positions are planned.") # hier exception # start a point-by-point scan (for cont scan in flomni it would be "sa") self.socket_put_and_receive("sd") @retry_once @threadlocked def get_scan_status(self): return_table = (self.socket_put_and_receive("sr")).split(",") if len(return_table) != 3: raise RtCommunicationError( f"Expected to receive 3 return values. Instead received {return_table}" ) mode = int(float(return_table[0])) # mode 0: direct positioning # mode 1: running internal timer (not tested/used anymore) # mode 2: rt point scan running # mode 3: rt point scan starting # mode 5/6: rt continuous scanning (not available in LamNI) number_of_positions_planned = int(float(return_table[1])) current_position_in_scan = int(float(return_table[2])) return (mode, number_of_positions_planned, current_position_in_scan)
[docs] class RtFlomniReadbackSignal(RtReadbackSignal): @retry_once @threadlocked def _socket_get(self) -> float: """Get command for the readback signal Returns: float: Readback value after adjusting for sign and motor resolution. """ time.sleep(0.1) return_table = (self.controller.socket_put_and_receive(f"pr")).split(",") current_pos = float(return_table[self.parent.axis_Id_numeric]) current_pos *= self.parent.sign self.parent.user_setpoint.setpoint = current_pos return current_pos
[docs] class RtFlomniSetpointSignal(RtSetpointSignal): @retry_once @threadlocked def _socket_set(self, val: float) -> None: """Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign. Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted. Args: val (float): Target value / setpoint value Raises: RtError: Raised if interferometer feedback is disabled. """ if not self.parent.controller.feedback_is_running(): print( "The interferometer feedback is not running. Either it is turned off or and" " interferometer error occured." ) raise RtError( "The interferometer feedback is not running. Either it is turned off or and" " interferometer error occured." ) tracker_status = self.parent.controller.laser_tracker_check_signalstrength() if tracker_status == "toolow": print("The interferometer signal is too low for movements. Realignment required.") raise RtError( "The interferometer signal is too low for movements. Realignment required." ) self.set_with_feedback_disabled(val) def set_with_feedback_disabled(self, val): target_val = val * self.parent.sign self.setpoint = target_val self.controller.socket_put(f"pa{self.parent.axis_Id_numeric},{target_val:.4f}")
[docs] class RtFlomniFeedbackRunning(RtSignalRO): @threadlocked def _socket_get(self): return int(self.parent.controller.feedback_is_running())
[docs] class RtFlomniMotor(Device, PositionerBase): USER_ACCESS = ["controller"] readback = Cpt(RtFlomniReadbackSignal, signal_name="readback", kind="hinted") user_setpoint = Cpt(RtFlomniSetpointSignal, signal_name="setpoint") high_limit_travel = Cpt(Signal, value=0, kind="omitted") low_limit_travel = Cpt(Signal, value=0, kind="omitted") SUB_READBACK = "readback" SUB_CONNECTION_CHANGE = "connection_change" _default_sub = SUB_READBACK connectionTimeout = 20 def __init__( self, axis_Id, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, host="mpc2844.psi.ch", port=2222, sign=1, socket_cls=SocketIO, device_manager=None, limits=None, **kwargs, ): self.axis_Id = axis_Id self.sign = sign self.controller = RtFlomniController( socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.device_manager = device_manager self.tolerance = kwargs.pop("tolerance", 0.5) super().__init__( prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs, ) self.readback.name = self.name self.controller.subscribe( self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE ) self._update_connection_state() self._stopped = False # self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE) if limits is not None: assert len(limits) == 2 self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1])
[docs] def wait_for_connection(self, timeout: float = 30.0) -> bool: self.controller.on(timeout=timeout) self._update_setpoint_from_readback()
def _update_setpoint_from_readback(self): """ The setpoint is only stored locally. After a restart, we need to update it to match the current readback value. """ self.user_setpoint.setpoint = self.readback.get()
[docs] def destroy(self): """Make sure to turn off the controller socket on destroy.""" self.controller.off(update_config=False) return super().destroy()
@property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) @property def low_limit(self): return self.limits[0] @property def high_limit(self): return self.limits[1]
[docs] def check_value(self, value, **kwargs): """Check that the position is within the soft limits""" low_limit, high_limit = self.limits if low_limit < high_limit and not (low_limit <= value <= high_limit): raise LimitError(f"position={value} not within limits {self.limits}")
def _update_connection_state(self, **kwargs): for walk in self.walk_signals(): # pylint: disable=protected-access walk.item._metadata["connected"] = self.controller.connected def _forward_readback(self, **kwargs): kwargs.pop("sub_type") self._run_subs(sub_type="readback", **kwargs)
[docs] @raise_if_disconnected def move(self, position, wait=True, **kwargs): """Move to a specified position, optionally waiting for motion to complete. Parameters ---------- position Position to move to moved_cb : callable Call this callback when movement has finished. This callback must accept one keyword argument: 'obj' which will be set to this positioner instance. timeout : float, optional Maximum time to wait for the motion. If None, the default timeout for this positioner is used. Returns ------- status : MoveStatus Raises ------ TimeoutError When motion takes longer than `timeout` ValueError On invalid positions RuntimeError If motion fails other than timing out """ self._started_moving = False timeout = kwargs.pop("timeout", 100) status = super().move(position, timeout=timeout, **kwargs) self.user_setpoint.put(position, wait=False) def move_and_finish(): while not self.controller.slew_rate_limiters_on_target() and not self._stopped: print("motor is moving") val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) time.sleep(0.01) print("Move finished") self._done_moving(success=(not self._stopped)) self._stopped = False threading.Thread(target=move_and_finish, daemon=True).start() try: if wait: status_wait(status) except KeyboardInterrupt: self.stop() raise return status
@property def axis_Id(self): return self._axis_Id_alpha @axis_Id.setter def axis_Id(self, val): if isinstance(val, str): if len(val) != 1: raise ValueError("Only single-character axis_Ids are supported.") self._axis_Id_alpha = val self._axis_Id_numeric = ord(val.lower()) - 97 else: raise TypeError(f"Expected value of type str but received {type(val)}") @property def axis_Id_numeric(self): return self._axis_Id_numeric @axis_Id_numeric.setter def axis_Id_numeric(self, val): if isinstance(val, int): if val > 26: raise ValueError("Numeric value exceeds supported range.") self._axis_Id_alpha = val self._axis_Id_numeric = (chr(val + 97)).capitalize() else: raise TypeError(f"Expected value of type int but received {type(val)}") def kickoff(self, metadata, **kwargs) -> None: self.controller.kickoff(metadata) @property def egu(self): """The engineering units (EGU) for positions""" return "um"
[docs] def stop(self, *, success=False): self.controller.stop_all_axes() self._stopped = True return super().stop(success=success)
[docs] class RtFlomniFlyer(Device): USER_ACCESS = ["controller"] data = Cpt( AsyncMultiSignal, name="data", signals=[ "target_x", "average_x_st_fzp", "stdev_x_st_fzp", "target_y", "average_y_st_fzp", "stdev_y_st_fzp", "average_rotz", "stdev_rotz", "average_stdeviations_x_st_fzp", "average_stdeviations_y_st_fzp", ], ndim=1, async_update={"type": "add", "max_shape": [None]}, max_size=1000, ) progress = Cpt( ProgressSignal, doc="ProgressSignal indicating the progress of the device during a scan." ) def __init__( self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, host="mpc2844.psi.ch", port=2222, socket_cls=SocketIO, device_manager=None, **kwargs, ): super().__init__(prefix=prefix, name=name, parent=parent, **kwargs) self.shutdown_event = threading.Event() self.controller = RtFlomniController( socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.average_stdeviations_x_st_fzp = 0 self.average_stdeviations_y_st_fzp = 0 self.readout_thread = None self.scan_done_event = threading.Event() self.scan_done_event.set()
[docs] def read_positions_from_sampler(self, status: DeviceStatus): """ Read the positions from the sampler and update the data signal. This function runs in a separate thread and continuously checks the scan status. Args: status (DeviceStatus): The status object to update when the readout is complete. """ read_counter = 0 self.average_stdeviations_x_st_fzp = 0 self.average_stdeviations_y_st_fzp = 0 mode, number_of_positions_planned, current_position_in_scan = ( self.controller.get_scan_status() ) # while scan is running while mode > 0 and not self.shutdown_event.wait(0.01): # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") mode, number_of_positions_planned, current_position_in_scan = ( self.controller.get_scan_status() ) if current_position_in_scan > 5: while current_position_in_scan > read_counter + 1: return_table = ( self.controller.socket_put_and_receive(f"r{read_counter}") ).split(",") logger.info(f"Read {read_counter} out of {number_of_positions_planned}") self.progress.put( value=read_counter, max_value=number_of_positions_planned, done=False ) read_counter = read_counter + 1 signals = self._get_signals_from_table(return_table) self.data.set(signals) if self.shutdown_event.wait(0.05): logger.info("Shutdown event set, stopping readout.") # if we are here, the shutdown_event is set. We can exit the readout loop. status.set_finished() return # read the last samples even though scan is finished already while number_of_positions_planned > read_counter and not self.shutdown_event.is_set(): return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",") logger.info(f"Read {read_counter} out of {number_of_positions_planned}") self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False) read_counter = read_counter + 1 signals = self._get_signals_from_table(return_table) self.data.set(signals) # NOTE: No need to set the status to failed if the shutdown_event is set. # The stop() method will take care of that. status.set_finished() self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True) logger.info( "Flomni statistics: Average of all standard deviations: x" f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y" f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}" )
def _get_signals_from_table(self, return_table) -> dict: self.average_stdeviations_x_st_fzp += float(return_table[4]) self.average_stdeviations_y_st_fzp += float(return_table[7]) signals = { "target_x": {"value": float(return_table[2])}, "average_x_st_fzp": {"value": float(return_table[3])}, "stdev_x_st_fzp": {"value": float(return_table[4])}, "target_y": {"value": float(return_table[5])}, "average_y_st_fzp": {"value": float(return_table[6])}, "stdev_y_st_fzp": {"value": float(return_table[7])}, "average_rotz": {"value": float(return_table[8])}, "stdev_rotz": {"value": float(return_table[9])}, "average_stdeviations_x_st_fzp": { "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) }, "average_stdeviations_y_st_fzp": { "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) }, } return signals
[docs] def stage(self): self.shutdown_event.clear() self.scan_done_event.set() return super().stage()
def start_readout(self, status: DeviceStatus): self.readout_thread = threading.Thread( target=self.read_positions_from_sampler, args=(status,) ) self.readout_thread.start() def kickoff(self) -> DeviceStatus: self.shutdown_event.clear() self.scan_done_event.clear() while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001): ... self.controller.start_scan() self.shutdown_event.wait(0.1) status = DeviceStatus(self) status.set_finished() return status
[docs] def complete(self) -> DeviceStatus: """Wait until the flyer is done.""" if self.scan_done_event.is_set(): # if the scan_done_event is already set, we can return a finished status immediately status = DeviceStatus(self) status.set_finished() return status status = DeviceStatus(self) self.start_readout(status) status.add_callback(lambda *args, **kwargs: self.scan_done_event.set()) return status
[docs] def stop(self, *, success=False): self.shutdown_event.set() self.scan_done_event.set() if self.readout_thread is not None: self.readout_thread.join() return super().stop(success=success)
if __name__ == "__main__": rtcontroller = RtFlomniController( socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None ) rtcontroller.on() rtcontroller.laser_tracker_on()