Source code for csaxs_bec.scans.jungfrau_joch_scan
"""Module with JungfrauJochTestScan class."""
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion
logger = bec_logger.logger
[docs]
class JungfrauJochTestScan(AsyncFlyScanBase):
    """Test fly scan that triggers and reads out the monitored devices in cycles.

    No positional scan arguments are taken (``arg_input`` is empty). For each
    of ``cycles`` iterations the scan issues one trigger, reads the
    ``"monitored"`` device group once, and then waits for the trigger status
    to complete.
    """

    scan_name = "jjf_test"
    # scan_report_hint = "device_progress"
    # The names listed here must match the keyword names accepted by
    # ``__init__``; otherwise a request that satisfies this list still fails
    # when the scan object is constructed.
    required_kwargs = ["num_points", "exp_time", "readout_time"]
    arg_input = {}
    arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
    gui_config = {
        "Acquisition Parameters": ["num_points", "cycles"],
        "Exposure Parameters": ["exp_time", "readout_time"],
    }

    def __init__(
        self, num_points: int, exp_time: float, readout_time: float, cycles: int = 1, **kwargs
    ) -> None:
        """
        JungfrauJoch Test scan.

        Args:
            num_points (int) : Number of points per burst
            exp_time (float) : exposure time.
            readout_time (float): readout time of detector; must be larger than 0
            cycles (int) : number of cycles, default is 1
            **kwargs: forwarded unchanged to the base class constructor

        Raises:
            ScanAbortion: if ``readout_time`` is not larger than 0.

        Example:
            scans.jjf_test(num_points=100, exp_time=1e-3, readout_time=1e-3, cycles=2)
        """
        # Validate before calling the base constructor so that an invalid
        # request is rejected without any further setup.
        if readout_time <= 0:
            raise ScanAbortion(f"Readout time must be larger than 0, provided value {readout_time}")
        super().__init__(exp_time=exp_time, readout_time=readout_time, **kwargs)
        # The device name is fixed; it is not configurable through the
        # scan arguments.
        self.device = "ddg"
        self.num_points = num_points
        self.cycles = cycles
        # NOTE(review): presumably a period in seconds consumed by the base
        # class -- confirm against AsyncFlyScanBase.
        self.primary_readout_cycle = 0.2

    def scan_core(self):
        """Run the trigger/readout cycles.

        Generator that yields the instructions produced by ``self.stubs``.
        Each cycle:

        1. sends a trigger without blocking (``wait=False``), passing the
           duration of one burst as ``min_wait``;
        2. reads the ``"monitored"`` group for the current ``point_id`` and
           waits for that readout;
        3. increments ``point_id``;
        4. blocks on the status object returned by the trigger.

        After the last cycle ``num_pos`` is set to the number of points that
        were read.
        """
        logger.info("Starting with Scan Core")
        # Duration of one burst: every point costs one exposure plus one readout.
        total_exposure = self.num_points * (self.exp_time + self.readout_time)
        for i in range(self.cycles):
            logger.info(f"Beginning cycle {i} of {self.cycles}")
            status = yield from self.stubs.trigger(min_wait=total_exposure, wait=False)
            yield from self.stubs.read(group="monitored", point_id=self.point_id, wait=True)
            self.point_id += 1
            # Only now wait for the trigger issued at the top of the cycle,
            # so the monitored readout is not held back by it.
            status.wait()
            logger.info(f"Finished cycle {i} of {self.cycles}")
        logger.info("Finished scan")
        # One monitored readout was recorded per cycle.
        self.num_pos = self.point_id