# Source code for csaxs_bec.devices.epics.allied_vision_camera

"""Module for the EPICS integration of the AlliedVision Camera via Vimba SDK."""

import threading
import traceback
from enum import IntEnum

import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt, Kind, Signal
from ophyd.areadetector import ADComponent as ADCpt
from ophyd.areadetector import DetectorBase
from ophyd_devices import PreviewSignal
from ophyd_devices.devices.areadetector.cam import VimbaDetectorCam
from ophyd_devices.devices.areadetector.plugins import ImagePlugin_V35 as ImagePlugin
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from typeguard import typechecked

logger = bec_logger.logger


class ACQUIRE_MODES(IntEnum):
    """Acquiring enums for Allied Vision Camera.

    The integer values are the ones written to ``cam.acquire`` by
    ``AlliedVisionCamera`` to start (``ACQUIRING``) and stop (``DONE``)
    the acquisition.
    """

    ACQUIRING = 1
    DONE = 0
class AlliedVisionCamera(PSIDeviceBase, DetectorBase):
    """
    Epics Area Detector interface for the Allied Vision Alvium G1-507m camera via Vimba SDK.
    The IOC runs under the prefix: 'X12SA-GIGECAM-AV1:'.

    A background thread polls the image plugin and forwards new frames to the
    ``preview`` signal while live mode is enabled. Live mode is toggled through
    ``start_live_mode`` / ``stop_live_mode`` (exposed via ``USER_ACCESS``) or by
    writing to the ``live_mode_enabled`` signal.
    """

    USER_ACCESS = ["start_live_mode", "stop_live_mode"]

    cam = ADCpt(VimbaDetectorCam, "cam1:")
    image = ADCpt(ImagePlugin, "image1:")

    preview = Cpt(
        PreviewSignal,
        name="preview",
        ndim=2,
        num_rotation_90=0,
        transpose=False,
        doc="Preview signal of the AlliedVision camera.",
    )
    live_mode_enabled = Cpt(
        Signal,
        name="live_mode_enabled",
        value=False,
        doc="Enable or disable live mode.",
        kind=Kind.config,
    )

    def __init__(
        self,
        *,
        name: str,
        prefix: str,
        poll_rate: int = 5,
        num_rotation_90: int = 0,
        transpose: bool = False,
        scan_info=None,
        device_manager=None,
        **kwargs,
    ):
        """Initialize the camera device.

        Args:
            name: Name of the device.
            prefix: EPICS prefix of the IOC.
            poll_rate: Preview polling rate in Hz. Values <= 0 are replaced by 1,
                values > 10 are clamped to 10 (a warning is logged in both cases).
            num_rotation_90: Forwarded to ``preview.num_rotation_90``.
            transpose: Forwarded to ``preview.transpose``.
            scan_info: Forwarded to the base class constructor.
            device_manager: Forwarded to the base class constructor.
            **kwargs: Forwarded to the base class constructor.
        """
        super().__init__(
            name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
        )
        # The thread is only created here; it is started in on_connected().
        self._poll_thread = threading.Thread(
            target=self._poll_array_data, daemon=True, name=f"{self.name}_poll_thread"
        )
        self._poll_thread_kill_event = threading.Event()
        self._poll_start_event = threading.Event()

        if poll_rate <= 0:
            logger.warning(
                f"Poll rate must be positive for Camera {self.name} and non-zero, setting to 1 Hz."
            )
            poll_rate = 1
            # No stop_live_mode() call here: live_mode_enabled defaults to False and
            # neither the lock nor the subscriptions exist yet at this point.
        elif poll_rate > 10:
            logger.warning(f"Poll rate too high for Camera {self.name}, setting to 10 Hz max.")
            poll_rate = 10
        self._poll_rate = poll_rate

        self._unique_array_id = 0
        self._pv_timeout = 2.0
        self.image: ImagePlugin
        self.preview.num_rotation_90 = num_rotation_90
        self.preview.transpose = transpose

        # Re-entrant lock: _apply_live_mode writes cam.acquire, and cam.acquire is
        # itself subscribed to the same callback below.
        self._live_mode_lock = threading.RLock()
        self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)
        self.cam.acquire.subscribe(self._on_live_mode_enabled_changed, run=False)

    def start_live_mode(self) -> None:
        """Start live mode."""
        self.live_mode_enabled.put(True)

    def stop_live_mode(self) -> None:
        """Stop live mode."""
        self.live_mode_enabled.put(False)

    def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
        """Subscription callback for ``live_mode_enabled`` and ``cam.acquire``.

        Args:
            value: New value of the signal; its truthiness selects start or stop.
        """
        self._apply_live_mode(bool(value))

    def _apply_live_mode(self, enabled: bool) -> None:
        """Start or stop acquisition and preview polling.

        ``_poll_start_event`` doubles as the "live mode is running" flag, so a
        request that matches the current state is only logged.

        Args:
            enabled: True to start live mode, False to stop it.
        """
        with self._live_mode_lock:
            running = self._poll_start_event.is_set()
            if enabled:
                if running:
                    logger.info(f"Live mode already started for {self.name}.")
                    return
                self._poll_start_event.set()
                self.cam.acquire.put(ACQUIRE_MODES.ACQUIRING.value)  # Start acquisition
                return

            if not running:
                logger.info(f"Live mode already stopped for {self.name}.")
                return
            self._poll_start_event.clear()
            self.cam.acquire.put(ACQUIRE_MODES.DONE.value)  # Stop acquisition

    def on_connected(self):
        """Prepare the IOC and start the preview polling thread.

        Sets ``cam.array_counter`` to 0 and ``cam.array_callbacks`` to 1 (each
        waited on with ``_pv_timeout``), then starts the polling thread.
        """
        self.cam.array_counter.set(0).wait(timeout=self._pv_timeout)
        self.cam.array_callbacks.set(1).wait(timeout=self._pv_timeout)
        self._poll_thread.start()

    def _poll_array_data(self):
        """Poll the array data for preview updates.

        Target of the polling thread. It sleeps until live mode is started,
        polls at ``_poll_rate`` Hz while live mode is running, and returns once
        the kill event is set.
        """
        kill_event = self._poll_thread_kill_event
        start_event = self._poll_start_event
        while not kill_event.is_set():
            # Blocks until live mode is started, or until on_destroy() wakes the thread.
            start_event.wait()
            # kill_event.wait() is the rate limiter; it returns True on shutdown.
            while start_event.is_set() and not kill_event.wait(1 / self._poll_rate):
                try:
                    self._update_preview()
                except Exception:  # pylint: disable=broad-except
                    # Keep the thread alive on errors; just report them.
                    content = traceback.format_exc()
                    logger.error(
                        f"Error while polling array data for preview of {self.name}: {content}"
                    )

    def _update_preview(self) -> None:
        """Push the latest image to the preview signal if a new one is available."""
        # Read the ID once so the compared value and the stored value cannot differ.
        unique_id = self.image.unique_id.get()
        if unique_id == self._unique_array_id:
            return  # No new image, skip update
        self._unique_array_id = unique_id

        value = self.image.array_data.get()
        if value is None:
            logger.info(f"No image data available for preview of {self.name}")
            return

        array_size = self.image.array_size.get()
        if array_size[0] == 0:  # 2D image, not color image
            array_size = array_size[1:]
        # Geometry correction for the image
        data = np.reshape(value, array_size)
        self.preview.put(data)

    def on_destroy(self):
        """Stop the polling thread on destruction."""
        # Set the kill flag first, then wake the thread in case it is waiting
        # for live mode to start.
        self._poll_thread_kill_event.set()
        self._poll_start_event.set()
        if self._poll_thread.is_alive():
            self._poll_thread.join(timeout=2)