Source code for csaxs_bec.devices.panda_box.panda_box

"""Module to integrate the PandaBox for cSAXS measurements."""

import time

from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState

from csaxs_bec.devices.utils.utils import fetch_scan_info

logger = bec_logger.logger


class PandaBoxCSAXS(PandaBox):
    """
    PandaBox integration for cSAXS. This class implements cSAXS specific logic
    for the PandaBox integration.

    TODO: This logic is not yet mapped to any existing hardware. Adapt Docstring
    once the hardware is defined and integrated.
    """

    def on_init(self) -> None:
        """Set the cSAXS defaults after running the base-class initialisation.

        The acquisition group starts out as ``"burst"``, the completion timeout
        is 10 seconds, and no scan parameters are cached until ``on_stage`` runs.
        """
        super().on_init()
        self._acquisition_group = "burst"
        # Seconds; compared against time.monotonic() differences in on_complete.
        self._timeout_on_completed = 10
        self.scan_parameters: ScanServerScanInfo | None = None

    def on_stage(self) -> None:
        """Cache the scan parameters and select the acquisition group for the scan.

        The group is chosen from the scan type:

        * ``"hardware_triggered"`` -> ``"fly"``
        * ``"software_triggered"`` with one frame per trigger -> ``"monitored"``
        * ``"software_triggered"`` otherwise -> ``"burst"``

        For any other scan type the previously set acquisition group is kept.
        """
        self.scan_parameters = fetch_scan_info(self.scan_info)
        # Monotonic clock: the value is only used to measure a duration and must
        # not be affected by wall-clock adjustments.
        start_time = time.monotonic()
        super().on_stage()

        # TODO, adjust as seen fit.
        # Adjust the acquisition group based on scan parameters if needed
        if self.scan_parameters.scan_type == "hardware_triggered":
            self._acquisition_group = "fly"
        elif self.scan_parameters.scan_type == "software_triggered":
            if self.scan_parameters.frames_per_trigger == 1:
                self._acquisition_group = "monitored"
            else:
                self._acquisition_group = "burst"

        logger.info(
            f"PandaBox {self.name} on_stage completed in {time.monotonic() - start_time:.3f}s."
        )

    def on_complete(self):
        """On complete is called after the scan is complete.

        We need to wait for the capture to complete before we can disarm the
        PandaBox. The wait runs as a task submitted to ``self.task_handler``; the
        status object returned by ``submit_task`` is registered with
        ``cancel_on_stop`` and returned to the caller.

        The task polls ``*PCAP.CAPTURED?`` until
        ``num_points * frames_per_trigger`` points are reported. It raises
        ``TimeoutError`` if that takes longer than ``self._timeout_on_completed``
        seconds. The PandaBox is disarmed in all cases.
        """

        def _check_capture_complete():
            captured = 0
            progress_logged = False
            timeout = self._timeout_on_completed
            start_time = time.monotonic()
            try:
                expected_points = int(
                    self.scan_parameters.num_points * self.scan_parameters.frames_per_trigger
                )
                while captured < expected_points:
                    ret = self.send_raw("*PCAP.CAPTURED?")
                    # The count is the text after the last "=" of the first reply line.
                    captured = int(ret[0].split("=")[-1])
                    if captured >= expected_points:
                        # Done: leave before the timeout check so that a capture
                        # finishing right at the deadline is not reported as failed.
                        break
                    time.sleep(0.01)
                    elapsed = time.monotonic() - start_time
                    if not progress_logged and elapsed > timeout / 2:
                        # Log the slow-capture notice once instead of on every poll.
                        logger.info(
                            f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
                        )
                        progress_logged = True
                    if elapsed > timeout:
                        raise TimeoutError(
                            f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
                        )
            finally:
                # Always disarm, whether the capture finished, timed out or failed.
                self._disarm()

        status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
        self.cancel_on_stop(status_captured)
        return status_captured
if __name__ == "__main__":
    # Manual smoke test against a PandaBox; `time` is imported at module level.
    aliases = {
        "FMC_IN.VAL2.Value": "alias",
        "FMC_IN.VAL1.Min": "alias2",
        "FMC_IN.VAL1.Max": "alias3",
        "FMC_IN.VAL1.Mean": "alias4",
    }
    panda = PandaBoxCSAXS(name="omny_panda", host="omny-panda.psi.ch", signal_alias=aliases)
    panda.on_connected()

    # Stop the device, then wait on a status that succeeds on DISARMED and
    # fails on READY.
    status = StatusBase(obj=panda)
    panda.add_status_callback(
        status=status,
        success=[PandaState.DISARMED],
        failure=[PandaState.READY],
    )
    panda.stop()
    status.wait(timeout=2)
    panda.unstage()
    logger.info("Panda connected")

    panda.stage()
    logger.info("Panda staged")

    pre_scan_status = panda.pre_scan()
    pre_scan_status.wait(timeout=5)
    logger.info("Panda pre scan done")

    time.sleep(5)
    panda.stop()

    complete_status = panda.complete()
    complete_status.wait(timeout=5)
    logger.info("Measurement completed")

    panda.unstage()
    logger.info("Panda Unstaged")