Source code for csaxs_bec.bec_ipython_client.plugins.omny.omny_rt

import time
import numpy as np
import sys
import termios
import tty
import fcntl
import os


from rich import box
from rich.console import Console
from rich.table import Table

from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose


[docs] class OMNY_rt_clientError(Exception): pass
[docs] class OMNY_rt_client: def __init__(self): self.mirror_channel = -1 self.mirror_amplitutde_increase = 0 self.mirror_parameters = {} for j in range(1, 9): self.mirror_parameters[j] = dev.rtx.controller.get_mirror_parameters(j) @staticmethod def _get_user_param_safe(device, var): param = dev[device].user_parameter if not param or param.get(var) is None: raise OMNY_rt_clientError( f"Device {device} has no user parameter definition for {var}." ) return param.get(var) def _omny_interferometer_openloop_steps(self, channel, steps, amplitude): dev.rtx.controller._omny_interferometer_openloop_steps(channel, steps, amplitude) def interferometer_tweaking(self): self._tweak_interferometer() def _tweak_interferometer(self): self.mirror_channel = -1 # Save the current terminal settings fd = sys.stdin.fileno() old_term = termios.tcgetattr(fd) print("Ready to tweak the interferometer. Press q to quit.") print("The arrows adjust directions.") print("Numbers select the mirror aligner.") try: # Set the terminal to raw mode to capture single key presses tty.setraw(fd) # Set stdin to non-blocking mode old_flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, old_flags | os.O_NONBLOCK) opt_mirrorname = "none" max = 0 while True: try: # Read single character input key = sys.stdin.read(1) if key == "q": self.mirror_amplitutde_increase = 0 self.mirror_channel = -1 print("\n\rExiting tweak mode\r") break elif key == "\x1b": # Escape sequences for arrow keys next1, next2 = sys.stdin.read(2) if next1 == "[": printit = True if next2 == "A": # print("up") if self.mirror_channel != -1: self._omny_interferometer_openloop_steps( 4, -self.mirror_parameters[self.mirror_channel][ "opt_steps2_neg" ], self.mirror_parameters[self.mirror_channel][ "opt_amplitude2_neg" ] + self.mirror_amplitutde_increase, ) elif next2 == "B": # print(" down") if self.mirror_channel != -1: self._omny_interferometer_openloop_steps( 4, self.mirror_parameters[self.mirror_channel][ "opt_steps2_pos" ], self.mirror_parameters[self.mirror_channel][ "opt_amplitude2_pos" ] + self.mirror_amplitutde_increase, ) elif next2 == "C": # print("right") if self.mirror_channel != -1: self._omny_interferometer_openloop_steps( 3, -self.mirror_parameters[self.mirror_channel][ "opt_steps1_neg" ], self.mirror_parameters[self.mirror_channel][ "opt_amplitude1_neg" ] + self.mirror_amplitutde_increase, ) elif next2 == "D": # print("left") if self.mirror_channel != -1: self._omny_interferometer_openloop_steps( 3, self.mirror_parameters[self.mirror_channel][ "opt_steps1_pos" ], self.mirror_parameters[self.mirror_channel][ "opt_amplitude1_pos" ] + self.mirror_amplitutde_increase, ) elif key.isdigit() and 1 <= int(key) <= 8: self.mirror_channel = int(key) opt_mirrorname = self.mirror_parameters[self.mirror_channel][ "opt_mirrorname" ] autostop = self.mirror_parameters[self.mirror_channel]["opt_signal_stop"] averaging_time = self.mirror_parameters[self.mirror_channel][ "opt_averaging_time" ] print( f"\nSelected mirror channel {self.mirror_channel}: {opt_mirrorname}. Autostop {autostop}. Signal averaging time: {averaging_time}\r" ) if int(key) == 6: dev.rtx.controller.laser_tracker_on() dev.rtx.controller._omny_interferometer_switch_channel(self.mirror_channel) max = 0 printit = True elif key == "+": print("\nIncreasing voltage amplitudes by 100.\r") self.mirror_amplitutde_increase += 100 elif key == "-": print("\nDecreasing voltage amplitudes by 100.\r") self.mirror_amplitutde_increase -= 100 elif key == "a": if self.mirror_channel != -1: dev.rtx.controller._omny_interferometer_optimize( mirror_channel=self.mirror_channel, channel=3 ) dev.rtx.controller._omny_interferometer_optimize( mirror_channel=self.mirror_channel, channel=4 ) dev.rtx.controller._omny_interferometer_optimize( mirror_channel=self.mirror_channel, channel=3 ) dev.rtx.controller._omny_interferometer_optimize( mirror_channel=self.mirror_channel, channel=4 ) if self.mirror_channel != -1 and printit: printit = False signal = dev.rtx.controller._omny_interferometer_get_signalsample( self.mirror_parameters[self.mirror_channel]["opt_signalchannel"], self.mirror_parameters[self.mirror_channel]["opt_averaging_time"], ) if signal > max: max = signal info_str = f"Channel {self.mirror_channel}, {opt_mirrorname}, Current signal: {signal:.0f}" filling = " " * (50 - len(info_str)) # Calculate the number of filled and unfilled segments length = 30 percentage = signal / max filled_length = int(length * percentage) unfilled_length = length - filled_length bar = "#" * filled_length + "-" * unfilled_length print(info_str + filling + "0 " + bar + f" {max:.0f} (q)uit\r", end="") except IOError: # No input available, keep looping pass # Sleep for a short period to avoid high CPU usage time.sleep(0.02) finally: # Restore the terminal to its original state termios.tcsetattr(fd, termios.TCSADRAIN, old_term) fcntl.fcntl(fd, fcntl.F_SETFL, old_flags) dev.rtx.controller._omny_interferometer_switch_alloff() self.mirror_channel = -1 self.mirror_amplitutde_increase = 0 dev.rtx.controller.show_signal_strength_interferometer() def show_signal_strength_interferometer(self): dev.rtx.controller.show_signal_strength_interferometer() def omny_interferometer_align_incoupling_angle(self): dev.rtx.controller.omny_interferometer_align_incoupling_angle() def interferometer_tweak_otrack(self): self.OMNYTools.tweak_cursor( dev.otrackz, 0.1, dev.otracky, 0.1, special_command=dev.rtx.controller.laser_tracker_print_intensity_for_otrack_tweaking, ) def feedback_enable_with_reset(self): dev.rtx.controller.feedback_enable_with_reset() def feedback_disable(self): dev.rtx.controller.feedback_disable() def feedback_status(self): if dev.rtx.controller.feedback_is_running(): print("Feedback is running.") else: print("Feedback is NOT running.") def laser_tracker_on(self): dev.rtx.controller.laser_tracker_on() def laser_tracker_off(self): dev.rtx.controller.laser_tracker_off() def laser_tracker_show_all(self): dev.rtx.controller.laser_tracker_show_all() def omny_interferometer_align_tracking(self): dev.rtx.controller.omny_interferometer_align_tracking() def laser_tracker_check_and_wait_for_signalstrength(self): dev.rtx.controller.laser_tracker_check_and_wait_for_signalstrength()