Source code for csaxs_bec.scans.flomni_fermat_scan

"""
SCAN PLUGINS

All new scans should be derived from ScanBase. ScanBase provides various methods that can be customized and overriden
but they are executed in a specific order:

- self.initialize                        # initialize the class if needed
- self.read_scan_motors                  # used to retrieve the start position (and the relative position shift if needed)
- self.prepare_positions                 # prepare the positions for the scan. The preparation is split into multiple sub fuctions:
    - self._calculate_positions          # calculate the positions
    - self._set_positions_offset         # apply the previously retrieved scan position shift (if needed)
    - self._check_limits                 # tests to ensure the limits won't be reached
- self.open_scan                         # send an open_scan message including the scan name, the number of points and the scan motor names
- self.stage                             # stage all devices for the upcoming acquisiton
- self.run_baseline_readings             # read all devices to get a baseline for the upcoming scan
- self.scan_core                         # run a loop over all position
    - self._at_each_point(ind, pos)      # called at each position with the current index and the target positions as arguments
- self.finalize                          # clean up the scan, e.g. move back to the start position; wait everything to finish
- self.unstage                           # unstage all devices that have been staged before
- self.cleanup                           # send a close scan message and perform additional cleanups if needed
"""

import time

import numpy as np
from bec_lib import bec_logger, messages
from bec_lib.alarm_handler import Alarms
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import AsyncFlyScanBase

from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import TRIGGERSOURCE

logger = bec_logger.logger


[docs] class FlomniFermatScan(AsyncFlyScanBase): scan_name = "flomni_fermat_scan" scan_type = "fly" required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"] arg_input = {} arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} def __init__( self, fovx: float, fovy: float, cenx: float, ceny: float, exp_time: float, step: float, zshift: float, angle: float = None, corridor_size: float = 3, parameter: dict = None, frames_per_trigger: int = 1, **kwargs, ): """ A flomni scan following Fermat's spiral. Args: fovx(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 200 um fovy(float) [um]: Fov in the piezo plane (i.e. piezo range). Max 100 um cenx(float) [um]: center position in x. ceny(float) [um]: center position in y. exp_time(float) [s]: exposure time per burst frame frames_per_trigger(int) : Number of burst frames per point step(float) [um]: stepsize zshift(float) [um]: shift in z angle(float) [deg]: rotation angle (will rotate first) corridor_size(float) [um]: corridor size for the corridor optimization. Default 3 um Returns: Examples: >>> scans.flomni_fermat_scan(fovx=20, fovy=25, cenx=0.02, ceny=0, zshift=0, angle=0, step=0.5, exp_time=0.01, frames_per_trigger=1) """ super().__init__( parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs ) self.show_live_table = False self.axis = [] self.fovx = fovx self.fovy = fovy self.cenx = cenx self.ceny = ceny self.step = step self.zshift = zshift self.angle = angle self.optim_trajectory = "corridor" self.optim_trajectory_corridor = corridor_size if self.fovy > 100: raise ScanAbortion("The FOV in y must be smaller than 100 um.") if self.fovx > 200: raise ScanAbortion("The FOV in x must be smaller than 200 um.") if self.zshift > 100: logger.warning("The zshift is larger than 100 um. It will be limited to 100 um.") self.zshift = 100 if self.zshift < -100: logger.warning("The zshift is smaller than -100 um. It will be limited to -100 um.") self.zshift = -100 self.flomni_rotation_status = None
[docs] def scan_report_instructions(self): """Scan report instructions for the progress bar""" yield from self.stubs.scan_report_instruction({"device_progress": ["rt_positions"]})
@property def monitor_sync(self) -> str: return "rt_positions" def initialize(self): self.scan_motors = [] self.update_readout_priority() def _optimize_trajectory(self): self.positions = self.optimize_corridor( self.positions, corridor_size=self.optim_trajectory_corridor )
[docs] def reverse_trajectory(self): """ Reverse the trajectory. Every other scan should be reversed to shorten the movement time. In order to keep the last state, even if the server is restarted, the state is stored in a global variable in redis. """ msg = self.connector.get(MessageEndpoints.global_vars("reverse_flomni_trajectory")) if msg: val = msg.content.get("value", False) else: val = False self.connector.set( MessageEndpoints.global_vars("reverse_flomni_trajectory"), messages.VariableMessage(value=(not val)), ) return val
[docs] def prepare_positions(self): self._calculate_positions() self._optimize_trajectory() flip_axes = self.reverse_trajectory() if flip_axes: self.positions = np.flipud(self.positions) self.num_pos = len(self.positions) self._check_min_positions()
def _check_min_positions(self): if self.num_pos < 20: raise ScanAbortion( f"The number of positions must exceed 20. Currently: {self.num_pos}." ) def _prepare_setup(self): yield from self.stubs.send_rpc_and_wait("rtx", "controller.clear_trajectory_generator") yield from self.flomni_rotation(self.angle) yield from self.stubs.send_rpc_and_wait("rty", "set", self.positions[0][1]) def _prepare_setup_part2(self): # Prepare DDG1 to use yield from self.stubs.send_rpc_and_wait( "ddg1", "set_trigger", TRIGGERSOURCE.EXT_RISING_EDGE.value ) if self.flomni_rotation_status: self.flomni_rotation_status.wait() # rtx_status = yield from self.stubs.set(device="rtx", value=self.positions[0][0], wait=False) rtx_status = yield from self.stubs.set(device="rtx", value=self.cenx, wait=False) rtz_status = yield from self.stubs.set(device="rtz", value=self.positions[0][2], wait=False) yield from self.stubs.send_rpc_and_wait("rtx", "controller.laser_tracker_on") rtx_status.wait() rtz_status.wait() # status = yield from self.stubs.send_rpc("rtx", "move", self.cenx) # status.wait() yield from self._transfer_positions_to_flomni() tracker_signal_status = yield from self.stubs.send_rpc_and_wait( "rtx", "controller.laser_tracker_check_signalstrength" ) yield from self.stubs.send_rpc_and_wait( "rtx", "controller.move_samx_to_scan_region", self.cenx ) # self.device_manager.connector.send_client_info(tracker_signal_status) if tracker_signal_status == "low": error_info = messages.ErrorInfo( error_message="Signal strength of the laser tracker is low, but sufficient to continue. Realignment recommended!", compact_error_message="Low signal strength of the laser tracker. Realignment recommended!", exception_type="LaserTrackerSignalStrengthLow", device="rtx", ) self.device_manager.connector.raise_alarm(severity=Alarms.WARNING, info=error_info) elif tracker_signal_status == "toolow": raise ScanAbortion( "Signal strength of the laser tracker is too low for scanning. Realignment required!" ) def flomni_rotation(self, angle): # get last setpoint (cannot be based on pos get because they will deviate slightly) fsamroy_current_setpoint = yield from self.stubs.send_rpc_and_wait( "fsamroy", "user_setpoint.get" ) if angle == fsamroy_current_setpoint: logger.info("No rotation required") else: logger.info("Rotating to requested angle") yield from self.stubs.scan_report_instruction( { "readback": { "RID": self.metadata["RID"], "devices": ["fsamroy"], "start": [fsamroy_current_setpoint], "end": [angle], } } ) self.flomni_rotation_status = yield from self.stubs.set( device="fsamroy", value=angle, wait=False ) def _transfer_positions_to_flomni(self): yield from self.stubs.send_rpc_and_wait( "rtx", "controller.add_pos_to_scan", self.positions.tolist() ) def _calculate_positions(self): self.positions = self.get_flomni_fermat_spiral_pos( -np.abs(self.fovx / 2), np.abs(self.fovx / 2), -np.abs(self.fovy / 2), np.abs(self.fovy / 2), step=self.step, spiral_type=0, center=False, )
[docs] def get_flomni_fermat_spiral_pos( self, m1_start, m1_stop, m2_start, m2_stop, step=1, spiral_type=0, center=False ): """ Calculate positions for a Fermat spiral scan. Args: m1_start(float): start position in m1 m1_stop(float): stop position in m1 m2_start(float): start position in m2 m2_stop(float): stop position in m2 step(float): stepsize spiral_type(int): 0 for traditional Fermat spiral center(bool): whether to include the center position Returns: positions(array): positions """ positions = [] phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2.0) + spiral_type * np.pi start = int(not center) length_axis1 = np.abs(m1_stop - m1_start) length_axis2 = np.abs(m2_stop - m2_start) n_max = int(length_axis1 * length_axis2 * 3.2 / step / step) z_pos = self.zshift for ii in range(start, n_max): radius = step * 0.57 * np.sqrt(ii) # FOV is restructed below at check pos in range if abs(radius * np.sin(ii * phi)) > length_axis1 / 2: continue if abs(radius * np.cos(ii * phi)) > length_axis2 / 2: continue x = radius * np.sin(ii * phi) y = radius * np.cos(ii * phi) positions.append([x + self.cenx, y + self.ceny, z_pos]) left_lower_corner = [ min(m1_start, m1_stop) + self.cenx, min(m2_start, m2_stop) + self.ceny, z_pos, ] right_upper_corner = [ max(m1_start, m1_stop) + self.cenx, max(m2_start, m2_stop) + self.ceny, z_pos, ] positions.append(left_lower_corner) positions.append(right_upper_corner) return np.array(positions)
[docs] def scan_core(self): # send off the flyer yield from self.stubs.kickoff(device="rt_positions") # start the readout loop of the flyer status = yield from self.stubs.complete(device="rt_positions", wait=False) # read the monitors until the flyer is done while not status.done: yield from self.stubs.read(group="monitored", point_id=self.point_id) self.point_id += 1 time.sleep(1) logger.debug("reading monitors")
[docs] def move_to_start(self): """return to the start position""" # in flomni, we need to move to the start position of the next scan, which is the end position of the current scan # this method is called in finalize and overwrites the default move_to_start() if isinstance(self.positions, np.ndarray) and len(self.positions[-1]) == 3: # yield from self.stubs.set(device=["rtx", "rty", "rtz"], value=self.positions[-1]) # in x we move to cenx, then we avoid jumps in centering routine value = self.positions[-1] value[0] = self.cenx yield from self.stubs.set(device=["rtx", "rty", "rtz"], value=value) return logger.warning("No positions found to return to start")
[docs] def cleanup(self): yield from self.stubs.send_rpc_and_wait( "ddg1", "set_trigger", TRIGGERSOURCE.SINGLE_SHOT.value ) yield from super().cleanup()
[docs] def run(self): self.initialize() yield from self.read_scan_motors() self.prepare_positions() yield from self._prepare_setup() yield from self.scan_report_instructions() yield from self.open_scan() yield from self.stage() yield from self.run_baseline_reading() yield from self._prepare_setup_part2() yield from self.pre_scan() yield from self.scan_core() yield from self.finalize() yield from self.unstage() yield from self.cleanup()