Source code for csaxs_bec.scans.jungfrau_joch_scan

"""Module with JungfrauJochTestScan class."""

from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion

logger = bec_logger.logger


[docs] class JungfrauJochTestScan(AsyncFlyScanBase): """Owis-based grid scan.""" scan_name = "jjf_test" # scan_report_hint = "device_progress" required_kwargs = ["points", "exp_time", "readout_time"] arg_input = {} arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} gui_config = { "Acquisition Parameters": ["num_points", "cycles"], "Exposure Parameters": ["exp_time", "readout_time"], } def __init__( self, num_points: int, exp_time: float, readout_time: float, cycles: int = 1, **kwargs ): """ JungfrauJoch Test scan. Args: device (DeviceBase) : The device to be triggered, currently only for delaygenerator csaxs num_points (int) : Number of points per burst exp_time (float) : exposure time. readout_time (float): readout time of detector cycles (int) : number of cycles, default is 1 Example: scans.jjf_test(points = 100, exp_time= 1e-3, readout_time=1e-3, cycles = 2) """ if readout_time <= 0: raise ScanAbortion(f"Readout time must be larger than 0, provided value {readout_time}") super().__init__(exp_time=exp_time, readout_time=readout_time, **kwargs) self.device = "ddg" self.num_points = num_points self.cycles = cycles self.primary_readout_cycle = 0.2
[docs] def scan_core(self): logger.info(f"Starting with Scan Core") total_exposure = self.num_points * (self.exp_time + self.readout_time) for i in range(self.cycles): logger.info(f"Beginning cycle {i} of {self.cycles}") status = yield from self.stubs.trigger(min_wait=total_exposure, wait=False) yield from self.stubs.read(group="monitored", point_id=self.point_id, wait=True) self.point_id += 1 status.wait() logger.info(f"Finished cycle {i} of {self.cycles}") logger.info(f"Finished scan") self.num_pos = self.point_id