Source code for csaxs_bec.scans.scans_v4.cont_grid

"""
Continuous grid scan over two axes. The scan requires the fast axis to properly implement a base velocity as well as a high velocity and a high acceleration time.

Scan procedure:
    - prepare_scan
    - open_scan
    - stage
    - pre_scan
    - scan_core
        - at_each_point (optionally called by scan_core)
    - post_scan
    - unstage
    - close_scan
    - on_exception (called if any exception is raised during the scan)
"""

from __future__ import annotations

import time
from copy import deepcopy
from typing import Annotated, TypedDict

import numpy as np
from bec_lib.device import DeviceBase
from bec_lib.logger import bec_logger
from bec_lib.scan_args import DefaultArgType, ScanArgument
from bec_server.scan_server.errors import ScanAbortion
from bec_server.scan_server.scans import position_generators
from bec_server.scan_server.scans.scan_base import ScanBase, ScanType
from bec_server.scan_server.scans.scan_modifier import scan_hook

from csaxs_bec.scans.scan_customization.scan_components import CsaxsBecScanComponents

logger = bec_logger.logger


class ContinuousMotorParameter(TypedDict, total=False):
    """
    TypedDict for the parameters related to the continuous motor, which are needed
    to properly restore the motor state after the scan.

    The dict is created empty by the scan and filled step by step while the scan
    is prepared, therefore no key is required (``total=False``). Consumers that may
    run before the parameters were fetched must use ``.get()``.
    """

    # Velocity read from the fast axis before the scan modified it.
    original_velocity: float | None
    # Acceleration setting read from the fast axis before the scan modified it.
    original_acceleration: float | None
    # Velocity used during the continuous motion (fast step size / exposure time).
    target_velocity: float | None
    # Base velocity read from the fast axis.
    base_velocity: float | None
    # Acceleration time written to the fast axis for the continuous motion.
    acc_time: float | None
    # Extra travel added in front of and behind the measurement range.
    premove_distance: float | None
    # Shutter-to-open delay as reported by the delay generator.
    shutter_open_delay: float | None
    # Number of lines of the grid, i.e. number of steps of the stepper axis.
    num_lines: int | None
class ContGrid(ScanBase):
    # Continuous ("fly") grid scan: the fast axis is moved continuously along each
    # line while the stepper axis steps from line to line.
    # NOTE: intentionally no class docstring here; the user-facing documentation
    # lives in the __init__ docstring.

    # Scan Type: Hardware triggered or software triggered?
    # If the main trigger and readout logic is done within the at_each_point method in scan_core, choose SOFTWARE_TRIGGERED.
    # If the main trigger and readout logic is implemented on a device that is simply kicked off in this scan, choose HARDWARE_TRIGGERED.
    # This primarily serves as information for devices: The device may need to react differently if a software trigger is expected
    # for every point.
    scan_type = ScanType.SOFTWARE_TRIGGERED

    # Scan name: This is the name of the scan, e.g. "line_scan". This is used for display purposes and to identify the scan type in user interfaces.
    # Choose a descriptive name that does not conflict with existing scan names.
    # It must be a valid Python identifier, that is, it can only contain letters, numbers, and underscores, and must not start with a number.
    scan_name = "cont_grid"

    gui_config = {
        "Scan Parameters": [
            "fast_axis",
            "fast_start",
            "fast_end",
            "fast_step_size",
            "stepper_axis",
            "stepper_start",
            "stepper_stop",
            "stepper_step_size",
        ],
        "Acquisition Parameters": ["exp_time", "relative", "fast_axis_always_in_pos_dir"],
    }

    def __init__(
        # fmt: off
        self,
        fast_axis: Annotated[DeviceBase, ScanArgument(display_name="Fast Axis", description="Axis with continuous motion.")],
        fast_start: Annotated[float, ScanArgument(display_name="Fast Start", description="Start position for measurement points of the fast axis.")],
        fast_end: Annotated[float, ScanArgument(display_name="Fast End", description="End position for measurement points of the fast axis.")],
        fast_step_size: Annotated[float, ScanArgument(display_name="Fast Step Size", description="Step size for points of the continuous motion axis.")],
        stepper_axis: Annotated[DeviceBase, ScanArgument(display_name="Step Axis", description="Step axis of the grid scan, stepping through the lines.")],
        stepper_start: Annotated[float, ScanArgument(display_name="Step Start", description="Start position of the step axis.")],
        stepper_stop: Annotated[float, ScanArgument(display_name="Step Stop", description="End position of the step axis.")],
        stepper_step_size: Annotated[float, ScanArgument(display_name="Step Step Size", description="Step size of the step axis in units of the motor.")],
        exp_time: DefaultArgType.ExposureTime,
        relative: DefaultArgType.Relative = False,
        fast_axis_always_in_pos_dir: bool = True,
        **kwargs,
        # fmt: on
    ):
        """
        Continuous grid scan with 2-axis. The scan requires the fast axis to properly implement base velocity as well as high velocity and high acceleration time

        Args:
            fast_axis (DeviceBase): Axis with continuous motion.
            fast_start (float): Start position for measurement points of the fast axis.
            fast_end (float): End position for measurement points of the fast axis.
            fast_step_size (float): Step size for points of the continuous motion axis.
            stepper_axis (DeviceBase): Step axis of the grid scan, stepping through the lines.
            stepper_start (float): Start position of the step axis.
            stepper_stop (float): End position of the step axis.
            stepper_step_size (float): Step size of the step axis in units of the motor.
            relative (bool): Whether the positions are relative to the current position. Default is False, i.e. absolute positions.
            exp_time (float): Exposure time in seconds
            fast_axis_always_in_pos_dir (bool): Whether to always scan in the positive direction, default is True.

        Returns:
            ScanReport
        """
        super().__init__(**kwargs)
        self.components = CsaxsBecScanComponents(self)

        # Status objects that are created in prepare_scan and awaited later on.
        # Initialised here so that hooks running after an early failure can test them.
        self._baseline_readout_status = None
        self._premove_motor_status = None

        self.fast_axis = fast_axis
        self.fast_axis_always_in_pos_dir = fast_axis_always_in_pos_dir
        if self.fast_axis_always_in_pos_dir and (fast_end < fast_start):
            # Switch if needed
            fast_start, fast_end = fast_end, fast_start
        self.fast_start = fast_start
        self.fast_end = fast_end
        self.fast_step_size = fast_step_size

        self.stepper_axis = stepper_axis
        self.stepper_start = stepper_start
        self.stepper_stop = stepper_stop
        self.stepper_step_size = stepper_step_size

        self.relative = relative
        self.exp_time = exp_time

        self.ddg1 = self.device_manager.devices["ddg1"]
        self.mcs = self.device_manager.devices["mcs"]
        self.motors = [fast_axis, stepper_axis]

        # Filled step by step in prepare_scan / _fetch_device_params.
        self._cont_motor_params: ContinuousMotorParameter = ContinuousMotorParameter()

        self.update_scan_info(relative=relative, exp_time=exp_time)

    @scan_hook
    def prepare_scan(self):
        """
        Prepare the scan. This can include any steps that need to be executed
        before the scan is opened, such as preparing the positions (if not done already)
        or setting up the devices.

        Raises:
            ScanAbortion: If the scan inputs are invalid or the requested velocity
                cannot be realised by the fast axis.
        """
        self._check_motor_inputs()

        frames_per_trigger = int(
            np.ceil(np.abs(self.fast_end - self.fast_start) / self.fast_step_size)
        )
        num_lines = int(
            np.ceil(np.abs(self.stepper_stop - self.stepper_start) / self.stepper_step_size)
        )
        self._cont_motor_params["num_lines"] = num_lines

        positions = position_generators.nd_grid_positions(
            [
                (self.fast_start, self.fast_end, frames_per_trigger),
                (self.stepper_start, self.stepper_stop, num_lines),
            ],
            snaked=False,
        )

        # Get device specific parameters
        self._fetch_device_params()

        # Adjust relative positions if needed
        if self.relative:
            self.start_positions = self.components.get_start_positions(self.motors)
            positions += self.start_positions
            self.fast_start += self.start_positions[0]
            self.fast_end += self.start_positions[0]
            self.stepper_start += self.start_positions[1]
            self.stepper_stop += self.start_positions[1]

        # Count only the end point of each line as a valid position, as the fast axis is
        # continuously moving and only triggered at the beginning of the line moving to the end point.
        self.positions = deepcopy(positions[(frames_per_trigger - 1) :: frames_per_trigger, :])

        # Adjust premove: extend the travel range on both ends *along the direction of travel*.
        # If the fast axis runs in negative direction (fast_end < fast_start), the run-up must
        # be added above fast_start and the run-out below fast_end.
        direction = 1.0 if self.fast_end >= self.fast_start else -1.0
        premove_distance = self._cont_motor_params["premove_distance"]
        self.fast_start -= direction * premove_distance
        self.fast_end += direction * premove_distance

        self.actions.set_device_readout_priority(self.motors, priority="monitored")

        self.update_scan_info(
            positions=self.positions,
            num_points=len(self.positions),
            frames_per_trigger=frames_per_trigger,
            computed_positions=positions,
            num_lines=num_lines,
        )
        self.actions.add_scan_report_instruction_device_progress(self.mcs)

        # Both are started without waiting; they are awaited in close_scan and pre_scan.
        self._baseline_readout_status = self.actions.read_baseline_devices(wait=False)
        self._premove_motor_status = self.actions.set(
            self.motors, [self.fast_start, self.stepper_start], wait=False
        )

    @scan_hook
    def open_scan(self):
        """
        Open the scan.
        This step must call self.actions.open_scan() to ensure that a new scan is
        opened. Make sure to prepare the scan metadata before, either in
        prepare_scan() or in open_scan() itself and call self.update_scan_info(...)
        to update the scan metadata if needed.
        """
        self.actions.open_scan()

    @scan_hook
    def stage(self):
        """
        Stage the devices for the upcoming scan. The stage logic is typically
        implemented on the device itself (i.e. by the device's stage method).
        However, if there are any additional steps that need to be executed before
        staging the devices, they can be implemented here.
        """
        self.actions.stage_all_devices()

    @scan_hook
    def pre_scan(self):
        """
        Pre-scan steps to be executed before the main scan logic.
        This is typically the last chance to prepare the devices before the core scan
        logic is executed. For example, this is a good place to initialize time-critical
        devices, e.g. devices that have a short timeout.
        The pre-scan logic is typically implemented on the device itself.
        """
        # Wait for the move to the first start position that was kicked off in prepare_scan.
        if self._premove_motor_status is not None:
            self._premove_motor_status.wait()
        self.actions.pre_scan_all_devices()

    @scan_hook
    def scan_core(self):
        """
        Core scan logic to be executed during the scan.
        This is where the main scan logic should be implemented.

        For every line, the delay generator is prepared, both motors are moved to
        the start of the line and at_each_point then performs the continuous move
        of the fast axis to the end of the line.
        """
        # self.positions holds one entry per line (the end point of the line); only the
        # stepper coordinate of it is used here, the fast axis always starts at fast_start.
        for line_index in range(self._cont_motor_params["num_lines"]):
            line_start = time.time()
            status_mcs = self.ddg1.prepare_mcs_on_trigger()
            status_motor = self.actions.set(
                self.motors, [self.fast_start, self.positions[line_index][1]]
            )
            status_motor.wait()
            logger.info(
                f"Overhead from motor motion for line {line_index}: {(time.time() - line_start):.02f}s"
            )
            status_mcs.wait()
            mcs_acquiring_status = self.mcs.get_transition_status(
                signal_name="acquiring", transition=[1, 0]
            )
            logger.info(
                f"Overhead before calling trigger for line {line_index}: {(time.time() - line_start):.02f}s"
            )
            self.at_each_point(
                motors=[self.fast_axis],
                positions=np.array([self.fast_end]),
                status_mcs=mcs_acquiring_status,
            )
            # at_each_point changed velocity/acceleration of the fast axis; restore them
            # so that the move back to the next line start uses the original settings.
            self._restore_motor_properties()

    @scan_hook
    def at_each_point(
        self,
        motors: list[str | DeviceBase],
        positions: np.ndarray,
        status_mcs,
        last_positions: np.ndarray | None = None,
    ):
        """
        Logic to be executed at each point of the scan. This is where the main scan
        logic should be implemented, e.g. triggering the readout devices.
        The at_each_point logic is typically implemented on the device itself.

        Here, one call corresponds to one line of the grid: the fast axis is set to
        the scan velocity, the move is started, and after the acceleration time the
        delay generator is triggered. Monitored devices are read while the motor moves.

        Args:
            motors (list[str | DeviceBase]): Motors to move continuously.
            positions (np.ndarray): Target positions of the continuous move.
            status_mcs: Status object that resolves once the MCS card finished acquiring.
            last_positions (np.ndarray | None): Unused.

        Raises:
            ScanAbortion: If the MCS status does not resolve within 3 seconds after the move.
        """
        self.fast_axis.velocity.set(self._cont_motor_params["target_velocity"]).wait(timeout=5)
        self.fast_axis.acceleration.set(self._cont_motor_params["acc_time"]).wait(timeout=5)

        move_status = self.actions.set(motors, positions, wait=False)
        # Give the motor time to ramp up before the acquisition is triggered.
        time.sleep(self._cont_motor_params["acc_time"])
        self.ddg1.trigger_shot.put(1)

        while not move_status.done:
            self.actions.read_monitored_devices(wait=True)
            try:
                move_status.wait(timeout=0.5)
            except TimeoutError:
                # Motor still moving: read the monitored devices again.
                continue

        try:
            status_mcs.wait(timeout=3)
        except TimeoutError as exc:
            raise ScanAbortion(
                "MCS card did not go back to DONE after receiving all triggers and an extra 3 seconds. "
            ) from exc

    @scan_hook
    def post_scan(self):
        """
        Post-scan steps to be executed after the main scan logic.

        Restores the fast-axis settings, completes all devices and, for relative
        scans, moves the motors back to their start positions.
        """
        self._restore_motor_properties()
        status = self.actions.complete_all_devices(wait=False)
        if self.relative:
            self.components.move_and_wait(self.motors, self.start_positions)
        status.wait()

    @scan_hook
    def unstage(self):
        """Unstage the scan by executing post-scan steps."""
        self.actions.unstage_all_devices()

    @scan_hook
    def close_scan(self):
        """Close the scan."""
        if self._baseline_readout_status is not None:
            self._baseline_readout_status.wait()
        self.actions.close_scan()
        self.actions.check_for_unchecked_statuses()

    @scan_hook
    def on_exception(self, exception: Exception):
        """
        Handle exceptions that occur during the scan.
        This is a good place to implement any cleanup logic that needs to be executed in case of an exception,
        such as returning the devices to a safe state or moving the motors back to their starting position.

        The cleanup must work even if the exception was raised before the motor
        parameters or start positions were captured in prepare_scan.
        """
        self._restore_motor_properties()
        # start_positions is only assigned in prepare_scan (relative mode); it may be
        # missing if the scan failed earlier.
        start_positions = getattr(self, "start_positions", None)
        if self.relative and start_positions is not None:
            self.components.move_and_wait(self.motors, start_positions)

    #######################################################
    ######### Helper methods for the scan logic ###########
    #######################################################

    # Implement scan-specific helper methods below.

    def _check_motor_inputs(self):
        """
        Check the motor inputs for validity, e.g. whether the step sizes are positive and
        whether the start and stop positions are not too close for the given step size.

        Raises:
            ScanAbortion: If any of the inputs is invalid.
        """
        # A zero or negative step size would lead to a division by zero or a negative
        # number of points when the grid is computed.
        if not self.fast_step_size > 0:
            raise ScanAbortion(f"Fast step size must be positive, got {self.fast_step_size}.")
        if not self.stepper_step_size > 0:
            raise ScanAbortion(
                f"Stepper step size must be positive, got {self.stepper_step_size}."
            )
        if np.isclose(self.fast_start, self.fast_end, atol=self.fast_step_size):
            raise ScanAbortion(
                f"Fast stop {self.fast_end} and fast start {self.fast_start} positions are too close for the given step size {self.fast_step_size}."
            )
        if np.isclose(self.stepper_start, self.stepper_stop, atol=self.stepper_step_size):
            raise ScanAbortion(
                f"Stepper stop {self.stepper_stop} and stepper start {self.stepper_start} positions are too close for the given step size {self.stepper_step_size}."
            )

    def _restore_motor_properties(self):
        """
        Write the original velocity and acceleration back to the fast axis.

        Safe to call at any time: values that were not captured yet are skipped.
        """
        # .get(): the parameters are only available once _fetch_device_params has run,
        # but this method is also called from on_exception.
        vel = self._cont_motor_params.get("original_velocity")
        acc = self._cont_motor_params.get("original_acceleration")
        if vel is not None:
            self.fast_axis.velocity.put(vel)
        if acc is not None:
            self.fast_axis.acceleration.put(acc)

    def _fetch_device_params(self):
        """
        Read the current settings of the fast axis and the delay generator and derive
        the target velocity, acceleration time and premove distance for the scan.

        Raises:
            ScanAbortion: If the exposure time is not positive, or the requested velocity
                is above the current velocity or below the base velocity of the fast axis.
        """
        if not self.exp_time > 0:
            raise ScanAbortion(f"Exposure time must be positive, got {self.exp_time}.")

        params = self._cont_motor_params
        params["shutter_open_delay"] = self.ddg1.get_shutter_to_open_delay()
        params["original_acceleration"] = self.fast_axis.acceleration.get()
        params["original_velocity"] = self.fast_axis.velocity.get()
        params["base_velocity"] = self.fast_axis.base_velocity.get()

        # One step of the fast axis has to be covered within one exposure.
        target_vel = self.fast_step_size / self.exp_time
        if target_vel > params["original_velocity"]:
            raise ScanAbortion(
                f"Requested velocity of {target_vel} exceeds maximum velocity {self._cont_motor_params['original_velocity']} of motor {self.fast_axis.name}."
            )
        if target_vel < params["base_velocity"]:
            raise ScanAbortion(
                f"Requested velocity of {target_vel} is below base velocity {self._cont_motor_params['base_velocity']}."
            )

        # Scale the original acceleration setting by the fraction of the velocity range
        # (base velocity .. original velocity) that is actually used.
        velocity_span = params["original_velocity"] - params["base_velocity"]
        if velocity_span > 0:
            acceleration_time = (
                (target_vel - params["base_velocity"])
                / velocity_span
                * params["original_acceleration"]
            )
        else:
            # original velocity == base velocity (== target velocity): nothing to ramp,
            # and the ratio above would be 0/0.
            acceleration_time = 0.0

        params["target_velocity"] = target_vel
        params["acc_time"] = acceleration_time

        if params["acc_time"] < params["shutter_open_delay"]:
            # Distance travelled at target velocity during the shutter delay.
            extra_distance = params["target_velocity"] * params["shutter_open_delay"]
            # Adjust acc time to account for shutter delay
            params["acc_time"] += params["shutter_open_delay"]
        else:
            extra_distance = 0
        params["premove_distance"] = self._compute_premove_distance(
            additional_distance=extra_distance
        )

    def _compute_premove_distance(self, additional_distance: float) -> float:
        """
        Compute the distance the fast axis needs in front of the first measurement point.

        The ramp is treated as a linear velocity change from base to target velocity
        over the acceleration time, i.e. mean velocity times acceleration time.

        Args:
            additional_distance (float): Extra distance (in motor units) to add on top
                of the ramp distance.

        Returns:
            float: Premove distance in motor units.
        """
        ramp_distance = (
            0.5
            * (
                self._cont_motor_params["target_velocity"]
                + self._cont_motor_params["base_velocity"]
            )
            * self._cont_motor_params["acc_time"]
        )
        # additional_distance already is a distance; it must not be scaled by the
        # velocity again.
        return ramp_distance + additional_distance