Source code for csaxs_bec.devices.omny.rt.rt_lamni_ophyd

import functools
import threading
import time

import numpy as np
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, DeviceStatus, PositionerBase, Signal
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.bec_signals import AsyncMultiSignal, ProgressSignal
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable

from csaxs_bec.devices.omny.rt.rt_ophyd import RtCommunicationError, RtError

logger = bec_logger.logger


[docs] class RtLamniCommunicationError(Exception): pass
[docs] class RtLamniError(Exception): pass
[docs] class BECConfigError(Exception): pass
[docs] def retry_once(fcn): """Decorator to rerun a function in case a CommunicationError was raised. This may happen if the buffer was not empty.""" @functools.wraps(fcn) def wrapper(self, *args, **kwargs): try: val = fcn(self, *args, **kwargs) except (RtLamniCommunicationError, RtLamniError): val = fcn(self, *args, **kwargs) return val return wrapper
[docs] class RtLamniController(Controller): """ RT-Lamni controller class for all rt devices. """ _axes_per_controller = 3 USER_ACCESS = [ "socket_put_and_receive", "socket_put", "set_rotation_angle", "feedback_disable", "feedback_enable_without_reset", "feedback_disable_and_even_reset_lamni_angle_interferometer", "feedback_enable_with_reset", "add_pos_to_scan", "clear_trajectory_generator", "_set_axis_velocity", "_set_axis_velocity_maximum_speed", "_position_sampling_single_read", "_position_sampling_single_reset_and_start_sampling", "show_signal_strength_interferometer", "show_analog_signals", "show_feedback_status", ] def __init__( self, *, name="RtLamniController", socket_cls=None, socket_host=None, socket_port=None, device_manager=None, attr_name="", parent=None, labels=None, kind=None, ): super().__init__( name=name, socket_cls=socket_cls, socket_host=socket_host, socket_port=socket_port, device_manager=device_manager, attr_name=attr_name, parent=parent, labels=labels, kind=kind, ) self._min_scan_buffer_reached = False @threadlocked def feedback_disable(self): self.socket_put("J0") logger.info("LamNI Feedback disabled.") self.set_device_read_write("lsamx", True) self.set_device_read_write("lsamy", True) self.set_device_read_write("loptx", True) self.set_device_read_write("lopty", True) self.set_device_read_write("loptz", True) def is_axis_moving(self, axis_Id) -> bool: # this checks that axis is on target axis_is_on_target = bool(float(self.socket_put_and_receive("o"))) return not axis_is_on_target @threadlocked def stop_all_axes(self): self.socket_put("sc") @threadlocked def set_rotation_angle(self, val: float): self.socket_put(f"a{(val-300+30.538)/180*np.pi}") @threadlocked def _set_axis_velocity(self, um_per_s): self.socket_put(f"V{um_per_s}") @threadlocked def _set_axis_velocity_maximum_speed(self): self.socket_put(f"V0") # for developement of soft continuous scanning @threadlocked def _position_sampling_single_reset_and_start_sampling(self): self.socket_put(f"Ss") @threadlocked def _position_sampling_single_read(self): (number_of_samples, sum0, sum0_2, sum1, sum1_2, sum2, sum2_2) = self.socket_put_and_receive( f"Sr" ).split(",") avg_x = float(sum1) / int(number_of_samples) avg_y = float(sum0) / int(number_of_samples) stdev_x = np.sqrt( float(sum1_2) / int(number_of_samples) - np.power(float(sum1) / int(number_of_samples), 2) ) stdev_y = np.sqrt( float(sum0_2) / int(number_of_samples) - np.power(float(sum0) / int(number_of_samples), 2) ) return (avg_x, avg_y, stdev_x, stdev_y) @threadlocked def feedback_enable_without_reset(self): # read current interferometer position return_table = (self.socket_put_and_receive(f"J4")).split(",") x_curr = float(return_table[2]) y_curr = float(return_table[1]) # set these as closed loop target position self.socket_put(f"pa0,{x_curr:.4f}") self.socket_put(f"pa1,{y_curr:.4f}") self.device_manager.devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr) self.device_manager.devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr) self.socket_put("J5") logger.info("LamNI Feedback enabled (without reset).") self.set_device_read_write("lsamx", False) self.set_device_read_write("lsamy", False) self.set_device_read_write("loptx", False) self.set_device_read_write("lopty", False) self.set_device_read_write("loptz", False) @threadlocked def feedback_disable_and_even_reset_lamni_angle_interferometer(self): self.socket_put("J6") logger.info("LamNI Feedback disabled including the angular interferometer.") self.set_device_read_write("lsamx", True) self.set_device_read_write("lsamy", True) self.set_device_read_write("loptx", True) self.set_device_read_write("lopty", True) self.set_device_read_write("loptz", True) @threadlocked def clear_trajectory_generator(self): self.socket_put("sc") logger.info("LamNI scan stopped and deleted, moving to start position") def add_pos_to_scan(self, positions) -> None: def send_positions(parent, positions): parent._min_scan_buffer_reached = False for pos_index, pos in enumerate(positions): cmd = f"s{pos[0]:.05f},{pos[1]:.05f},0" parent.socket_put_and_receive(cmd) if pos_index > 100: parent._min_scan_buffer_reached = True parent._min_scan_buffer_reached = True threading.Thread(target=send_positions, args=(self, positions), daemon=True).start() @retry_once @threadlocked def get_scan_status(self): return_table = (self.socket_put_and_receive(f"sr")).split(",") if len(return_table) != 3: raise RtCommunicationError( f"Expected to receive 3 return values. Instead received {return_table}" ) mode = int(return_table[0]) # mode 0: direct positioning # mode 1: running internal timer (not tested/used anymore) # mode 2: rt point scan running # mode 3: rt point scan starting # mode 5/6: rt continuous scanning (not available in LamNI) number_of_positions_planned = int(return_table[1]) current_position_in_scan = int(return_table[2]) return (mode, number_of_positions_planned, current_position_in_scan) @threadlocked def start_scan(self): # interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0]) # if interferometer_feedback_not_running == 1: if not self.feedback_is_running(): logger.error( "Cannot start scan because feedback loop is not running or there is an interferometer error." ) raise RtError( "Cannot start scan because feedback loop is not running or there is an interferometer error." ) # here exception (mode, number_of_positions_planned, current_position_in_scan) = self.get_scan_status() if number_of_positions_planned == 0: logger.error("Cannot start scan because no target positions are planned.") raise RtError("Cannot start scan because no target positions are planned.") # hier exception # start a point-by-point scan (for cont scan in flomni it would be "sa") self.socket_put_and_receive("sd") def feedback_is_running(self) -> bool: status = int(float((self.socket_put_and_receive("J2")).split(",")[0])) return status == 0 # 0 means running, 1 means error/disabled def show_feedback_status(self): if self.feedback_is_running(): print("Loop is running, no error on interferometer.") else: print("Loop is not running, either it is turned off or an interferometer error occurred.") def show_analog_signals(self) -> dict: self.socket_put("As") # start sampling time.sleep(0.01) return_table = (self.socket_put_and_receive("Ar")).split(",") number_of_samples = int(float(return_table[0])) signals = { "number_of_samples": number_of_samples, "piezo_0": float(return_table[1]), "piezo_1": float(return_table[2]), "cap_0": float(return_table[3]), "cap_1": float(return_table[4]), "cap_2": float(return_table[5]), "cap_3": float(return_table[6]), "cap_4": float(return_table[7]), } t = PrettyTable() t.title = f"LamNI Analog Signals ({number_of_samples} samples)" t.field_names = ["Signal", "Value"] for key, val in signals.items(): if key != "number_of_samples": t.add_row([key, f"{val:.4f}"]) print(t) return def feedback_status_angle_lamni(self) -> bool: return_table = (self.socket_put_and_receive("J7")).split(",") logger.debug( f"LamNI angle interferomter status {bool(return_table[0])}, position {float(return_table[1])}, signal {float(return_table[2])}" ) return bool(return_table[0]) def show_signal_strength_interferometer(self): # trigger SSI averaging before reading self.socket_put("J3") time.sleep(0.05) return_table = (self.socket_put_and_receive("J2")).split(",") ssi_0 = float(return_table[1]) ssi_1 = float(return_table[2]) return_table_angle = (self.socket_put_and_receive("J7")).split(",") angle_running = bool(int(float(return_table_angle[0]))) angle_position = float(return_table_angle[1]) angle_signal = float(return_table_angle[2]) t = PrettyTable() t.title = "Interferometer signal strength" t.field_names = ["Axis", "Description", "Value", "Running"] t.add_row([0, "ST FZP horizontal", ssi_0, "-"]) t.add_row([1, "ST FZP vertical", ssi_1, "-"]) t.add_row([2, "Angle interferometer", angle_signal, angle_running]) print(t) if angle_running: print(f"Angle interferometer position: {angle_position:.4f} um") else: print("Warning: angle interferometer is not running.") def show_interferometer_positions(self) -> dict: return_table = (self.socket_put_and_receive("J4")).split(",") loop_status = bool(int(float(return_table[0]))) pos_y = float(return_table[1]) pos_x = float(return_table[2]) t = PrettyTable() t.title = "LamNI Interferometer Positions" t.field_names = ["Axis", "Description", "Position (um)"] t.add_row([0, "X", f"{pos_x:.4f}"]) t.add_row([1, "Y", f"{pos_y:.4f}"]) print(t) print(f"Feedback loop running: {loop_status}") return {"x": pos_x, "y": pos_y, "loop_running": loop_status} def feedback_enable_with_reset(self): if not self.feedback_status_angle_lamni(): self.feedback_disable_and_even_reset_lamni_angle_interferometer() logger.info("LamNI resetting interferometer inclusive angular interferomter.") else: self.feedback_disable() logger.info( "LamNI resetting interferomter except angular interferometer which is already running." ) # set these as closed loop target position self.socket_put("pa0,0") self.get_axis_by_name("rtx").user_setpoint.setpoint = 0 self.socket_put("pa1,0") self.get_axis_by_name("rty").user_setpoint.setpoint = 0 self.socket_put( "pa2,0" ) # we set all three outputs of the traj. gen. although in LamNI case only 0,1 are used self.clear_trajectory_generator() self.device_manager.devices.lsamrot.obj.move(0, wait=True) galil_controller_rt_status = ( self.device_manager.devices.lsamx.obj.controller.lgalil_is_air_off_and_orchestra_enabled() ) if galil_controller_rt_status == 0: logger.error( "Cannot enable feedback. The small rotation air is on and/or orchestra disabled by the motor controller." ) raise RtError( "Cannot enable feedback. The small rotation air is on and/or orchestra disabled by the motor controller." ) time.sleep(0.03) lsamx_user_params = self.device_manager.devices.lsamx.user_parameter if lsamx_user_params is None or lsamx_user_params.get("center") is None: raise RuntimeError("lsamx center is not defined") lsamy_user_params = self.device_manager.devices.lsamy.user_parameter if lsamy_user_params is None or lsamy_user_params.get("center") is None: raise RuntimeError("lsamy center is not defined") lsamx_center = lsamx_user_params.get("center") lsamy_center = lsamy_user_params.get("center") self.device_manager.devices.lsamx.obj.move(lsamx_center, wait=True) self.device_manager.devices.lsamy.obj.move(lsamy_center, wait=True) self.socket_put("J1") _waitforfeedbackctr = 0 interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0]) while interferometer_feedback_not_running == 1 and _waitforfeedbackctr < 100: time.sleep(0.01) _waitforfeedbackctr = _waitforfeedbackctr + 1 interferometer_feedback_not_running = int( (self.socket_put_and_receive("J2")).split(",")[0] ) self.set_device_read_write("lsamx", False) self.set_device_read_write("lsamy", False) self.set_device_read_write("loptx", False) self.set_device_read_write("lopty", False) self.set_device_read_write("loptz", False) if interferometer_feedback_not_running == 1: logger.error( "Cannot start scan because feedback loop is not running or there is an interferometer error." ) raise RtError( "Cannot start scan because feedback loop is not running or there is an interferometer error." ) time.sleep(0.01)
# ptychography_alignment_done = 0
[docs] class RtLamniSignalBase(SocketSignal): def __init__(self, signal_name, **kwargs): self.signal_name = signal_name super().__init__(**kwargs) self.controller = self.parent.controller self.sock = self.parent.controller.sock
[docs] class RtLamniSignalRO(RtLamniSignalBase): def __init__(self, signal_name, **kwargs): super().__init__(signal_name, **kwargs) self._metadata["write_access"] = False def _socket_set(self, val): raise ReadOnlyError("Read-only signals cannot be set")
[docs] class RtLamniReadbackSignal(RtLamniSignalRO): @retry_once @threadlocked def _socket_get(self) -> float: """Get command for the readback signal Returns: float: Readback value after adjusting for sign and motor resolution. """ return_table = (self.controller.socket_put_and_receive(f"J4")).split(",") print(return_table) if self.parent.axis_Id_numeric == 0: readback_index = 2 elif self.parent.axis_Id_numeric == 1: readback_index = 1 else: raise RtLamniError("Currently, only two axes are supported.") print(return_table) current_pos = float(return_table[readback_index]) current_pos *= self.parent.sign return current_pos
[docs] class RtLamniSetpointSignal(RtLamniSignalBase): def __init__(self, signal_name, **kwargs): super().__init__(signal_name, **kwargs) self.setpoint = 0.0 def _socket_get(self) -> float: """Get command for receiving the setpoint / target value. The value is not pulled from the controller but instead just the last setpoint used. Returns: float: setpoint / target value """ return self.setpoint @retry_once @threadlocked def _socket_set(self, val: float) -> None: """Set a new target value / setpoint value. Before submission, the target value is adjusted for the axis' sign. Furthermore, it is ensured that all axes are referenced before a new setpoint is submitted. Args: val (float): Target value / setpoint value Raises: RtLamniError: Raised if interferometer feedback is disabled. """ interferometer_feedback_not_running = int( (self.controller.socket_put_and_receive("J2")).split(",")[0] ) if interferometer_feedback_not_running != 0: raise RtLamniError( "The interferometer feedback is not running. Either it is turned off or and interferometer error occured." ) self.set_with_feedback_disabled(val) def set_with_feedback_disabled(self, val): target_val = val * self.parent.sign self.setpoint = target_val self.controller.socket_put(f"pa{self.parent.axis_Id_numeric},{target_val:.4f}")
[docs] class RtLamniMotorIsMoving(RtLamniSignalRO): def _socket_get(self): return self.controller.is_axis_moving(self.parent.axis_Id_numeric)
[docs] def get(self): val = super().get() if val is not None: self._run_subs(sub_type=self.SUB_VALUE, value=val, timestamp=time.time()) return val
[docs] class RtLamniFeedbackRunning(RtLamniSignalRO): @threadlocked def _socket_get(self): if int((self.controller.socket_put_and_receive("J2")).split(",")[0]) == 0: return 1 else: return 0
[docs] class RtLamniMotor(Device, PositionerBase): USER_ACCESS = ["controller"] readback = Cpt(RtLamniReadbackSignal, signal_name="readback", kind="hinted") user_setpoint = Cpt(RtLamniSetpointSignal, signal_name="setpoint") motor_is_moving = Cpt(RtLamniMotorIsMoving, signal_name="motor_is_moving", kind="normal") high_limit_travel = Cpt(Signal, value=0, kind="omitted") low_limit_travel = Cpt(Signal, value=0, kind="omitted") SUB_READBACK = "readback" SUB_CONNECTION_CHANGE = "connection_change" _default_sub = SUB_READBACK def __init__( self, axis_Id, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, host="mpc2680.psi.ch", port=3333, sign=1, socket_cls=SocketIO, device_manager=None, limits=None, **kwargs, ): self.axis_Id = axis_Id self.sign = sign self.controller = RtLamniController( socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.device_manager = device_manager self.tolerance = kwargs.pop("tolerance", 0.5) super().__init__( prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs, ) self.readback.name = self.name self.controller.subscribe( self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE ) self._update_connection_state() # self.readback.subscribe(self._forward_readback, event_type=self.readback.SUB_VALUE) if limits is not None: assert len(limits) == 2 self.low_limit_travel.put(limits[0]) self.high_limit_travel.put(limits[1])
[docs] def wait_for_connection(self, timeout: float = 30.0) -> bool: self.controller.on(timeout=timeout) self._update_setpoint_from_readback()
def _update_setpoint_from_readback(self): """ The setpoint is only stored locally. After a restart, we need to update it to match the current readback value. """ self.user_setpoint.setpoint = self.readback.get()
[docs] def destroy(self): """Make sure to turn off the controller socket on destroy.""" self.controller.off(update_config=False) return super().destroy()
@property def limits(self): return (self.low_limit_travel.get(), self.high_limit_travel.get()) @property def low_limit(self): return self.limits[0] @property def high_limit(self): return self.limits[1]
[docs] def check_value(self, value, **kwargs): """Check that the position is within the soft limits""" low_limit, high_limit = self.limits if low_limit < high_limit and not (low_limit <= value <= high_limit): raise LimitError(f"position={value} not within limits {self.limits}")
def _update_connection_state(self, **kwargs): for walk in self.walk_signals(): # pylint: disable=protected-access walk.item._metadata["connected"] = self.controller.connected def _forward_readback(self, **kwargs): kwargs.pop("sub_type") self._run_subs(sub_type="readback", **kwargs)
[docs] @raise_if_disconnected def move(self, position, wait=True, **kwargs): """Move to a specified position, optionally waiting for motion to complete. Parameters ---------- position Position to move to moved_cb : callable Call this callback when movement has finished. This callback must accept one keyword argument: 'obj' which will be set to this positioner instance. timeout : float, optional Maximum time to wait for the motion. If None, the default timeout for this positioner is used. Returns ------- status : MoveStatus Raises ------ TimeoutError When motion takes longer than `timeout` ValueError On invalid positions RuntimeError If motion fails other than timing out """ self._started_moving = False timeout = kwargs.pop("timeout", 100) status = super().move(position, timeout=timeout, **kwargs) self.user_setpoint.put(position, wait=False) def move_and_finish(): while self.motor_is_moving.get(): print("motor is moving") val = self.readback.read() self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) time.sleep(0.01) print("Move finished") self._done_moving() threading.Thread(target=move_and_finish, daemon=True).start() try: if wait: status_wait(status) except KeyboardInterrupt: self.stop() raise return status
@property def axis_Id(self): return self._axis_Id_alpha @axis_Id.setter def axis_Id(self, val): if isinstance(val, str): if len(val) != 1: raise ValueError("Only single-character axis_Ids are supported.") self._axis_Id_alpha = val self._axis_Id_numeric = ord(val.lower()) - 97 else: raise TypeError(f"Expected value of type str but received {type(val)}") @property def axis_Id_numeric(self): return self._axis_Id_numeric @axis_Id_numeric.setter def axis_Id_numeric(self, val): if isinstance(val, int): if val > 26: raise ValueError("Numeric value exceeds supported range.") self._axis_Id_alpha = val self._axis_Id_numeric = (chr(val + 97)).capitalize() else: raise TypeError(f"Expected value of type int but received {type(val)}") @property def egu(self): """The engineering units (EGU) for positions""" return "um"
[docs] def stop(self, *, success=False): self.controller.stop_all_axes() return super().stop(success=success)
[docs] class RtLamniFlyer(Device): USER_ACCESS = ["controller"] data = Cpt( AsyncMultiSignal, name="data", signals=[ "target_x", "average_x_st_fzp", "stdev_x_st_fzp", "target_y", "average_y_st_fzp", "stdev_y_st_fzp", "average_cap1", "stdev_cap1", "average_cap2", "stdev_cap2", "average_cap3", "stdev_cap3", "average_cap4", "stdev_cap4", "average_cap5", "stdev_cap5", "average_angle_interf_ST", "stdev_angle_interf_ST", "average_stdeviations_x_st_fzp", "average_stdeviations_y_st_fzp", "average_lamni_angle", ], ndim=1, async_update={"type": "add", "max_shape": [None]}, max_size=1000, ) progress = Cpt(ProgressSignal, doc="ProgressSignal indicating of the device during the scan.") def __init__( self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, host="mpc2680.psi.ch", port=3333, socket_cls=SocketIO, device_manager=None, **kwargs, ): super().__init__(prefix=prefix, name=name, parent=parent, **kwargs) self.shutdown_event = threading.Event() self.controller = RtLamniController( socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager ) self.average_stdeviations_x_st_fzp = 0 self.average_stdeviations_y_st_fzp = 0 self.average_lamni_angle = 0 self.readout_thread = None self.scan_done_event = threading.Event() self.scan_done_event.set()
[docs] def stage(self): self.shutdown_event.clear() self.scan_done_event.set() return super().stage()
def start_readout(self, status: DeviceStatus): self.readout_thread = threading.Thread( target=self.read_positions_from_sampler, args=(status,) ) self.readout_thread.start() def kickoff(self) -> DeviceStatus: self.shutdown_event.clear() self.scan_done_event.clear() while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001): ... self.controller.start_scan() self.shutdown_event.wait(0.1) status = DeviceStatus(self) status.set_finished() return status
[docs] def complete(self) -> DeviceStatus: """Wait until the flyer is done.""" if self.scan_done_event.is_set(): # if the scan_done_event is already set, we can return a finished status immediately status = DeviceStatus(self) status.set_finished() return status status = DeviceStatus(self) self.start_readout(status) status.add_callback(lambda *args, **kwargs: self.scan_done_event.set()) return status
[docs] def stop(self, *, success=False): self.shutdown_event.set() self.scan_done_event.set() if self.readout_thread is not None: self.readout_thread.join() return super().stop(success=success)
[docs] def read_positions_from_sampler(self, status: DeviceStatus): """ Read the positions from the sampler and update the data signal. This function runs in a separate thread and continuously checks the scan status. Args: status (DeviceStatus): The status object to update when the readout is complete. """ read_counter = 0 self.average_stdeviations_x_st_fzp = 0 self.average_stdeviations_y_st_fzp = 0 self.average_lamni_angle = 0 mode, number_of_positions_planned, current_position_in_scan = ( self.controller.get_scan_status() ) # while scan is running while mode > 0 and not self.shutdown_event.wait(0.01): # logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}") mode, number_of_positions_planned, current_position_in_scan = ( self.controller.get_scan_status() ) if current_position_in_scan > 5: while current_position_in_scan > read_counter + 1: return_table = ( self.controller.socket_put_and_receive(f"r{read_counter}") ).split(",") logger.info(f"Read {read_counter} out of {number_of_positions_planned}") self.progress.put( value=read_counter, max_value=number_of_positions_planned, done=False ) read_counter = read_counter + 1 signals = self._get_signals_from_table(return_table) self.data.set(signals) if self.shutdown_event.wait(0.05): logger.info("Shutdown event set, stopping readout.") # if we are here, the shutdown_event is set. We can exit the readout loop. status.set_finished() return # read the last samples even though scan is finished already while number_of_positions_planned > read_counter and not self.shutdown_event.is_set(): return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",") logger.info(f"Read {read_counter} out of {number_of_positions_planned}") self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False) read_counter = read_counter + 1 signals = self._get_signals_from_table(return_table) self.data.set(signals) # NOTE: No need to set the status to failed if the shutdown_event is set. # The stop() method will take care of that. status.set_finished() self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True) logger.info( f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp}, y {self.average_stdeviations_y_st_fzp}, angle {self.average_lamni_angle}." )
def _get_signals_from_table(self, return_table) -> dict: self.average_stdeviations_x_st_fzp += float(return_table[5]) self.average_stdeviations_y_st_fzp += float(return_table[8]) self.average_lamni_angle += float(return_table[19]) signals = { "target_x": {"value": float(return_table[3])}, "average_x_st_fzp": {"value": float(return_table[4])}, "stdev_x_st_fzp": {"value": float(return_table[5])}, "target_y": {"value": float(return_table[6])}, "average_y_st_fzp": {"value": float(return_table[7])}, "stdev_y_st_fzp": {"value": float(return_table[8])}, "average_cap1": {"value": float(return_table[9])}, "stdev_cap1": {"value": float(return_table[10])}, "average_cap2": {"value": float(return_table[11])}, "stdev_cap2": {"value": float(return_table[12])}, "average_cap3": {"value": float(return_table[13])}, "stdev_cap3": {"value": float(return_table[14])}, "average_cap4": {"value": float(return_table[15])}, "stdev_cap4": {"value": float(return_table[16])}, "average_cap5": {"value": float(return_table[17])}, "stdev_cap5": {"value": float(return_table[18])}, "average_angle_interf_ST": {"value": float(return_table[19])}, "stdev_angle_interf_ST": {"value": float(return_table[20])}, "average_stdeviations_x_st_fzp": { "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) }, "average_stdeviations_y_st_fzp": { "value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1) }, "average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)}, } return signals
if __name__ == "__main__": # pragma: no cover mock = False if not mock: rty = RtLamniMotor("B", name="rty", host="mpc2680.psi.ch", port=3333, sign=1) rty.stage() status = rty.move(0, wait=True) status = rty.move(10, wait=True) rty.read() rty.get() rty.describe() rty.unstage() else: from ophyd_devices.utils.socket import SocketMock rtx = RtLamniMotor("A", name="rtx", host="mpc2680.psi.ch", port=3333, socket_cls=SocketMock) rty = RtLamniMotor("B", name="rty", host="mpc2680.psi.ch", port=3333, socket_cls=SocketMock) rtx.stage() # rty.stage()