"""
SCAN PLUGINS
All new scans should be derived from ScanBase. ScanBase provides various methods that can be customized and overriden
but they are executed in a specific order:
- self.initialize # initialize the class if needed
- self.read_scan_motors # used to retrieve the start position (and the relative position shift if needed)
- self.prepare_positions # prepare the positions for the scan. The preparation is split into multiple sub fuctions:
- self._calculate_positions # calculate the positions
- self._set_positions_offset # apply the previously retrieved scan position shift (if needed)
- self._check_limits # tests to ensure the limits won't be reached
- self.open_scan # send an open_scan message including the scan name, the number of points and the scan motor names
- self.stage # stage all devices for the upcoming acquisiton
- self.run_baseline_readings # read all devices to get a baseline for the upcoming scan
- self.scan_core # run a loop over all position
- self._at_each_point(ind, pos) # called at each position with the current index and the target positions as arguments
- self.finalize # clean up the scan, e.g. move back to the start position; wait everything to finish
- self.unstage # unstage all devices that have been staged before
- self.cleanup # send a close scan message and perform additional cleanups if needed
"""
import time
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.scans import AsyncFlyScanBase
logger = bec_logger.logger
[docs]
class SgalilGrid(AsyncFlyScanBase):
scan_name = "sgalil_grid"
scan_report_hint = "device_progress"
required_kwargs = []
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
def __init__(
self,
start_y: float,
end_y: float,
interval_y: int,
start_x: float,
end_x: float,
interval_x: int,
*args,
exp_time: float = 0.1,
readout_time: float = 0.1,
**kwargs,
):
"""
SGalil-based grid scan.
Args:
start_y (float): start position of y axis (fast axis)
end_y (float): end position of y axis (fast axis)
interval_y (int): number of points in y axis
start_x (float): start position of x axis (slow axis)
end_x (float): end position of x axis (slow axis)
interval_x (int): number of points in x axis
exp_time (float): exposure time in seconds. Default is 0.1s
readout_time (float): readout time in seconds, minimum of 3e-3s (3ms)
Exp:
scans.sgalil_grid(start_y = val1, end_y= val1, interval_y = val1, start_x = val1, end_x = val1, interval_x = val1, exp_time = 0.02, readout_time = 3e-3)
"""
super().__init__(*args, **kwargs)
# Always scan from positive x & y to negative x & y
if start_y > end_y:
self.start_y = start_y
self.end_y = end_y
else:
self.start_y = end_y
self.end_y = start_y
if start_x > end_x:
self.start_x = start_x
self.end_x = end_x
else:
self.start_x = end_x
self.end_x = start_x
self.interval_y = interval_y
self.interval_x = interval_x
self.exp_time = exp_time
self.readout_time = readout_time
self.num_pos = int(interval_x * interval_y)
self.scan_motors = ["samx", "samy"]
# Scan progress related variables
self.timeout_progress = 0
self.progress_point = 0
self.timeout_scan_abortion = 10 # 42 # duty cycles of scan segment update
self.sleep_time = 1
def scan_report_instructions(self):
if not self.scan_report_hint:
yield None
return
yield from self.stubs.scan_report_instruction({"device_progress": ["mcs"]})
[docs]
def pre_scan(self):
yield from self._move_scan_motors_and_wait([self.start_x, self.start_y])
yield from self.stubs.pre_scan()
# TODO move to start position
[docs]
def scan_progress(self) -> int:
"""Timeout of the progress bar. This gets updated in the frequency of scan segments"""
msg = self.device_manager.connector.get(MessageEndpoints.device_progress("mcs"))
if not msg:
self.timeout_progress += 1
return self.timeout_progress
# TODO which update is that!
updated_progress = int(msg.content["value"])
if updated_progress == int(self.progress_point):
self.timeout_progress += 1
return self.timeout_progress
else:
self.timeout_progress = 0
self.progress_point = updated_progress
return self.timeout_progress
[docs]
def scan_core(self):
"""
This is the main event loop.
"""
# set up the delay generators
yield from self.stubs.send_rpc_and_wait(
"ddg_detectors",
"burst_enable",
count=self.interval_y,
delay=0,
period=(self.exp_time + self.readout_time),
config="first",
)
yield from self.stubs.send_rpc_and_wait(
"ddg_mcs",
"burst_enable",
count=self.interval_y,
delay=0,
period=(self.exp_time + self.readout_time),
config="first",
)
# Disable burst mod on DDF for fsh and EN of MCS card
yield from self.stubs.send_rpc_and_wait("ddg_fsh", "burst_disable")
# Set width of FSH opening to 0
yield from self.stubs.send_rpc_and_wait(
"ddg_fsh", "set_channels", "width", 0, channels=["channelCD"]
)
# TODO disable fsh ddg bc SGalil trigger it directly
# Setup triggering
status_ddg_detectors_source = yield from self.stubs.send_rpc_and_wait(
"ddg_detectors", "source.set", 2
)
status_ddg_mcs_source = yield from self.stubs.send_rpc_and_wait("ddg_mcs", "source.set", 1)
# Setup mcs_points per line
# status_mcs_points_per_line = yield from self.stubs.send_rpc_and_wait(
# "mcs", "num_use_all.set", self.interval_y + 1
# )
# status_mcs_lines = yield from self.stubs.send_rpc_and_wait(
# "mcs", "num_lines.set", self.interval_x
# )
# status_ddg_mcs_ttlwidth = yield from self.stubs.send_rpc_and_wait(
# "ddg_mcs", "set_channels", "width", 3e-3
# )
yield from self.stubs.send_rpc_and_wait("ddg_mcs", "set_channels", "delay", 0)
# wait for the delay generators to finish setting up
status_ddg_detectors_source.wait()
status_ddg_mcs_source.wait()
yield from self.stubs.send_rpc_and_wait("ddg_fsh", "trigger")
# trigger_ddg_fsh.wait()
# status_mcs_points_per_line.wait()
# status_mcs_lines.wait()
kickoff_status = yield from self.stubs.kickoff(
device="samx",
parameter={
"start_y": self.start_y,
"end_y": self.end_y,
"interval_y": self.interval_y,
"start_x": self.start_x,
"end_x": self.end_x,
"interval_x": self.interval_x,
"exp_time": self.exp_time,
"readout_time": self.readout_time,
},
wait=False,
)
while not kickoff_status.done:
# readout the monitored device and wait for the fly scan to finish
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
time.sleep(self.sleep_time)
if self.scan_progress() > int(self.timeout_scan_abortion / self.sleep_time):
logger.info("would have raised a scan abortion here")
# raise ScanAbortion()
# try:
# logger.info(f'Scan progress check {self.scan_progress()} and {int(self.timeout_scan_abortion/self.sleep_time)}')
# logger.info(f'Potential scan abortion {self.scan_progress() > int(self.timeout_scan_abortion/self.sleep_time)}')
# if self.scan_progress() > int(self.timeout_scan_abortion/self.sleep_time):
# logger.info('Testing Scan abortion, would have raised here!')
# except Exception as exc:
# logger.info(f'{exc}')
def _move_scan_motors_and_wait(self, pos):
# TODO: remove this method once BEC MR 637 is merged
# pylint: disable=no-member
if hasattr(super(), "_move_scan_motors_and_wait"):
yield from super()._move_scan_motors_and_wait(pos)
else:
yield from self._move_and_wait(pos)