Source code for csaxs_bec.devices.epics.falcon_csaxs

"""Falcon Sitoro detector class for cSAXS beamline."""

import enum
import os
import threading
from typing import Literal

from bec_lib.file_utils import get_full_path
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd_devices import CompareStatus, FileEventSignal
from ophyd_devices.devices.areadetector.plugins import HDF5Plugin_V35 as HDF5Plugin
from ophyd_devices.devices.dxp import EpicsDXPFalcon, EpicsMCARecord, Falcon
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase

logger = bec_logger.logger


[docs] class FalconError(Exception): """Base class for exceptions in this module."""
[docs] class ACQUIRESTATUS(enum.IntEnum): """Detector states for Falcon detector""" DONE = 0 ACQUIRING = 1 # or Capturing
[docs] class TriggerSource(enum.IntEnum): """Trigger source for Falcon detector""" USER = 0 GATE = 1 SYNC = 2
[docs] class MappingSource(enum.IntEnum): """Mapping source for Falcon detector""" SPECTRUM = 0 MAPPING = 1
[docs] class FalconControl(Falcon): """Falcon Control class at cSAXS. prefix: 'X12SA-SITORO:'""" dxp = Cpt(EpicsDXPFalcon, "dxp1:") mca = Cpt(EpicsMCARecord, "mca1") hdf5 = Cpt(HDF5Plugin, "HDF1:")
[docs] class FalconcSAXS(PSIDeviceBase, FalconControl): """ Falcon Sitoro detector for CSAXS class attributes: dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector mca (EpicsMCARecord) : MCA parameters for Falcon detector hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector MIN_READOUT (float) : Minimum readout time for the detector """ # specify minimum readout time for detector MIN_READOUT = 3e-3 _pv_timeout = 3 # Timeout for PV operations in seconds file_event = Cpt(FileEventSignal, name="file_event")
[docs] def on_init(self) -> None: """Initialize Falcon Sitoro detector""" self._lock = threading.RLock() self._readout_time = self.MIN_READOUT self._value_pixel_per_buffer = 20 self._queue_size = 2000 self._full_path = ""
[docs] def on_connected(self): """ Setup Falcon Sitoro detector default parameters once signals are connected """ self.on_stop() self._initialize_detector() self._initialize_detector_backend() self.set_trigger( mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0 )
[docs] def set_trigger( self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: Literal[0, 1] = 0, ) -> None: """ Set triggering mode for detector Args: mapping_mode (MappingSource): Mapping mode for the detector trigger_source (TriggerSource): Trigger source for the detector, pixel_advance_signal ignore_gate (int): Ignore gate from TTL signal; defaults to 0 """ mapping = int(mapping_mode) trigger = int(trigger_source) self.collect_mode.put(mapping) self.pixel_advance_mode.put(trigger) self.ignore_gate.put(ignore_gate)
def _initialize_detector(self) -> None: """Initialize Falcon detector""" # 1 Realtime self.preset_mode.put(1) # 0 Normal, 1 Inverted self.input_logic_polarity.put(0) # 0 Manual 1 Auto self.auto_pixels_per_buffer.put(0) # Sets the number of pixels/spectra in the buffer self.pixels_per_buffer.put(self._value_pixel_per_buffer) def _initialize_detector_backend(self) -> None: """Initialize the detector backend for Falcon.""" # Enable HDF5 plugin self.hdf5.enable.put(1) # Use layout.xml file for cSAXS Falcon. FIXME:Should be checked if IOC runs on different host. self.hdf5.xml_file_name.put("layout.xml") # TODO Check if lazy open is needed and wanted! self.hdf5.lazy_open.put(1) self.hdf5.temp_suffix.put("") # Size of the queue for the number of spectra allowed in the buffer. If too small, data is lost at high throughput self.hdf5.queue_size.put(self._queue_size) self.hdf5.file_template.put("%s%s") self.hdf5.file_write_mode.put(2) # Set nd_array mode to 1: This means segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate self.nd_array_mode.put(1)
[docs] def on_stage(self): """ This method is called when the detector is staged for acquisition. We use the information in scan_info.msg about the upcoming scan to set all relevant parameters on the detector. """ # Calculate relevant parameters num_points = self.scan_info.msg.num_points frames_per_trigger = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) overall_frames = int(num_points * frames_per_trigger) exp_time = self.scan_info.msg.scan_parameters["exp_time"] self._full_path = get_full_path(self.scan_info.msg, self.name) # Check that exposure time is larger than readout time readout_time = max( self.scan_info.msg.scan_parameters.get("readout_time", self.MIN_READOUT), self.MIN_READOUT, ) if exp_time < readout_time: raise ValueError( f"Exposure time {exp_time} is less than minimum readout time {readout_time}" ) # TODO: Add h5_entries for linking the Falcon NEXUS entries with the master file self.file_event.put(file_path=self._full_path, done=False, successful=False) self.preset_real_time.put(exp_time) self.pixels_per_run.put(overall_frames) # Prepare detector backend PVs file_path, file_name = os.path.split(self._full_path) self.hdf5.file_path.put(file_path) self.hdf5.file_name.put(file_name) self.hdf5.num_capture.put(overall_frames) # Reset spectrum counter in filewriter, used for indexing & identifying missing triggers self.hdf5.array_counter.put(0) # Start file writing self.hdf5.capture.put(1) # Start the acquisition self.start_all.put(1)
[docs] def on_pre_scan(self): """ Method for actions just before the scan starts. """ status_camera = CompareStatus( self.acquire_busy, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout ) status_writer = CompareStatus( self.hdf5.capture, ACQUIRESTATUS.ACQUIRING, timeout=self._pv_timeout ) # Logical combine of statuses status = status_camera & status_writer self.cancel_on_stop(status) return status
def _complete_callback(self, status: CompareStatus) -> None: """Callback for when the device completes a scan.""" # FIXME Add proper h5 entries once checked if status.success: self.file_event.put( file_path=self._full_path, # pylint: disable:protected-access done=True, successful=True, ) else: self.file_event.put( file_path=self._full_path, # pylint: disable:protected-access done=True, successful=False, )
[docs] def on_complete(self) -> None: """Complete detector and backend""" # Calculate relevant parameters num_points = self.scan_info.msg.num_points frames_per_trigger = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) overall_frames = int(num_points * frames_per_trigger) status_detector = CompareStatus(self.dxp.current_pixel, overall_frames, run=True) status_backend = CompareStatus(self.hdf5.array_counter, overall_frames, run=True) status = status_detector & status_backend self.cancel_on_stop(status) status.add_callback(self._complete_callback) return status
[docs] def on_stop(self) -> None: """Stop detector and backend""" self.stop_all.put(1) self.hdf5.capture.put(0) self.erase_all.put(1)
if __name__ == "__main__": falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:")