Source code for csaxs_bec.devices.panda_box.panda_box_omny
"""Module to integrate the PandaBox for cSAXS measurements."""
import time
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd_devices import StatusBase
from ophyd_devices.devices.panda_box.panda_box import PandaBox, PandaState
from csaxs_bec.devices.utils.utils import fetch_scan_info
logger = bec_logger.logger
[docs]
class PandaBoxOMNY(PandaBox):
    """PandaBox integration for OMNY.

    This class implements OMNY specific logic for the PandaBox integration:
    it selects the acquisition group from the scan parameters when staging,
    and on completion waits until the expected number of points has been
    captured before disarming the PandaBox.
    """

    def on_init(self):
        """Set the OMNY defaults after running the base class initialisation."""
        super().on_init()
        self._acquisition_group = "burst"
        # Maximum time in seconds on_complete waits for the capture to finish.
        self._timeout_on_completed = 10
        # Filled in on_stage from the scan info of the scan being staged.
        self.scan_parameters: ScanServerScanInfo | None = None

    def on_stage(self):
        """Stage the device and pick the acquisition group for the scan.

        The scan parameters are fetched from ``self.scan_info`` and stored in
        ``self.scan_parameters``. The acquisition group is then set to:

        - ``"fly"`` for ``hardware_triggered`` scans,
        - ``"monitored"`` for ``software_triggered`` scans with one frame per
          trigger,
        - ``"burst"`` for ``software_triggered`` scans otherwise.

        Any other scan type leaves the acquisition group at its previous value.
        """
        # Monotonic clock: the value is only used to measure a duration.
        start_time = time.monotonic()
        super().on_stage()
        self.scan_parameters = fetch_scan_info(self.scan_info)
        # TODO, adjust as seen fit.
        # Adjust the acquisition group based on scan parameters if needed
        if self.scan_parameters.scan_type == "hardware_triggered":
            self._acquisition_group = "fly"
        elif self.scan_parameters.scan_type == "software_triggered":
            if self.scan_parameters.frames_per_trigger == 1:
                self._acquisition_group = "monitored"
            else:
                self._acquisition_group = "burst"
        logger.info(
            f"PandaBox {self.name} on_stage completed in {time.monotonic() - start_time:.3f}s."
        )

    def on_complete(self):
        """Wait for the capture to finish, then disarm the PandaBox.

        On complete is called after the scan is complete. The number of
        captured points is polled with ``*PCAP.CAPTURED?`` until it reaches
        ``num_points * frames_per_trigger`` from the scan parameters. The
        polling runs as a task submitted to ``self.task_handler``; the PandaBox
        is disarmed when that task ends, whether it succeeded or failed.

        Returns:
            The status object returned by ``self.task_handler.submit_task``,
            which is also passed to ``self.cancel_on_stop``.

        Raises:
            TimeoutError: Raised inside the task if the expected number of
                points is not captured within ``self._timeout_on_completed``
                seconds.
        """

        def _check_capture_complete():
            poll_interval = 0.01  # seconds between two capture queries
            log_interval = 1.0  # seconds between two progress messages
            timeout = self._timeout_on_completed
            captured = 0
            start_time = time.monotonic()
            # Progress is only reported once half of the timeout has elapsed.
            next_log_at = timeout / 2
            try:
                expected_points = int(
                    self.scan_parameters.num_points * self.scan_parameters.frames_per_trigger
                )
                while captured < expected_points:
                    ret = self.send_raw("*PCAP.CAPTURED?")
                    captured = int(ret[0].split("=")[-1])
                    if captured >= expected_points:
                        # Done: do not sleep again and do not let a late
                        # timeout check fail a capture that has completed.
                        break
                    elapsed = time.monotonic() - start_time
                    if elapsed > timeout:
                        raise TimeoutError(
                            f"Pandabox {self.name} did not complete after {self._timeout_on_completed} with points captured {captured}/{expected_points}"
                        )
                    if elapsed > next_log_at:
                        logger.info(
                            f"Waiting for capture on device {self.name} to complete: captured {captured}/{expected_points} points."
                        )
                        # Throttle the message instead of logging every poll.
                        next_log_at = elapsed + log_interval
                    time.sleep(poll_interval)
            finally:
                # Always disarm, also on timeout or any other error.
                self._disarm()

        status_captured = self.task_handler.submit_task(_check_capture_complete, run=True)
        self.cancel_on_stop(status_captured)
        return status_captured
if __name__ == "__main__":
    # Manual check against the OMNY PandaBox: connect, bring the device into a
    # stopped and unstaged state, then run one stage / pre_scan / complete /
    # unstage cycle. `time` comes from the module-level import.
    panda = PandaBoxOMNY(
        name="omny_panda",
        host="omny-panda.psi.ch",
        signal_alias={
            "FMC_IN.VAL2.Value": "alias",
            "FMC_IN.VAL1.Min": "alias2",
            "FMC_IN.VAL1.Max": "alias3",
            "FMC_IN.VAL1.Mean": "alias4",
        },
    )
    panda.on_connected()

    # Stop the device and wait on a status that is registered with DISARMED as
    # the success state and READY as the failure state.
    status = StatusBase(obj=panda)
    panda.add_status_callback(
        status=status, success=[PandaState.DISARMED], failure=[PandaState.READY]
    )
    panda.stop()
    status.wait(timeout=2)
    panda.unstage()
    logger.info("Panda connected")

    panda.stage()
    logger.info("Panda staged")

    pre_scan_status = panda.pre_scan()
    pre_scan_status.wait(timeout=5)
    logger.info("Panda pre scan done")

    # Give the device some time before stopping it and completing.
    time.sleep(5)
    panda.stop()
    complete_status = panda.complete()
    complete_status.wait(timeout=5)
    logger.info("Measurement completed")

    panda.unstage()
    logger.info("Panda Unstaged")