Source code for csaxs_bec.devices.ids_cameras.ids_camera

"""IDS Camera class for cSAXS IDS cameras."""

from __future__ import annotations

import threading
from typing import TYPE_CHECKING, Literal

import numpy as np
from ophyd import Component as Cpt, Signal, Kind

from bec_lib import messages
from bec_lib.logger import bec_logger
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal

if TYPE_CHECKING:
    from bec_lib.devicemanager import ScanInfo

logger = bec_logger.logger


[docs] class IDSCamera(PSIDeviceBase): """IDS Camera class for cSAXS. This class inherits from PSIDeviceBase and implements the necessary methods to interact with the IDS camera using the pyueye library. """ image = Cpt( PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.", num_rotation_90=0, transpose=False, ) roi_signal = Cpt( AsyncSignal, name="roi_signal", ndim=0, max_size=1000, doc="Signal for the region of interest (ROI).", async_update={"type": "add", "max_shape": [None]}, ) live_mode_enabled = Cpt( Signal, name="live_mode_enabled", value=False, doc="Enable or disable live mode.", kind=Kind.config, ) USER_ACCESS = ["start_live_mode", "stop_live_mode", "mask", "set_rect_roi", "get_last_image"] def __init__( self, *, name: str, camera_id: int, prefix: str = "", scan_info: ScanInfo | None = None, m_n_colormode: Literal[0, 1, 2, 3] = 1, bits_per_pixel: Literal[8, 24] = 24, live_mode: bool = False, num_rotation_90: int = 0, transpose: bool = False, force_monochrome: bool = False, **kwargs, ): """Initialize the IDS Camera. Args: name (str): Name of the device. camera_id (int): The ID of the camera device. prefix (str): Prefix for the device. scan_info (ScanInfo | None): Scan information for the device. m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera. bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera. live_mode (bool): Whether to enable live mode for the camera. """ super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs) self._live_mode_thread: threading.Thread | None = None self._stop_live_mode_event: threading.Event = threading.Event() self.cam = Camera( camera_id=camera_id, m_n_colormode=m_n_colormode, bits_per_pixel=bits_per_pixel, connect=False, ) self._inputs = {"live_mode": live_mode} self._mask = np.zeros((1, 1), dtype=np.uint8) self.image.num_rotation_90 = num_rotation_90 self.image.transpose = transpose self._force_monochrome = force_monochrome self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False) self.live_mode_enabled.put(bool(live_mode)) ############## Live Mode Methods ############## def start_live_mode(self) -> None: self.live_mode_enabled.put(True) def stop_live_mode(self) -> None: self.live_mode_enabled.put(False) @property def mask(self) -> np.ndarray: """Return the current region of interest (ROI) for the camera.""" return self._mask @mask.setter def mask(self, value: np.ndarray): """ Set the region of interest (ROI) for the camera. Args: value (np.ndarray): The mask to set as the ROI. """ if value.ndim != 2: raise ValueError("ROI mask must be a 2D array.") img_shape = (self.cam.cam.height.value, self.cam.cam.width.value) if value.shape[0] != img_shape[0] or value.shape[1] != img_shape[1]: raise ValueError( f"ROI mask shape {value.shape} does not match image shape {img_shape}." ) self._mask = value def _on_live_mode_enabled_changed(self, *args, value, **kwargs): """Callback for when live mode is changed.""" enabled = bool(value) if enabled and self.cam._connected is False: # pylint: disable=protected-access self.cam.on_connect() if enabled: self._start_live() else: self._stop_live()
[docs] def set_rect_roi(self, x: int, y: int, width: int, height: int): """Set the rectangular region of interest (ROI) for the camera.""" if x < 0 or y < 0 or width <= 0 or height <= 0: raise ValueError("ROI coordinates and dimensions must be positive integers.") img_shape = (self.cam.cam.height.value, self.cam.cam.width.value) if x + width > img_shape[1] or y + height > img_shape[0]: raise ValueError("ROI exceeds camera dimensions.") mask = np.zeros(img_shape, dtype=np.uint8) mask[y : y + height, x : x + width] = 1 self.mask = mask
def _start_live(self): """Start the live mode for the camera.""" if self._live_mode_thread is not None: logger.info("Live mode thread is already running.") return self._stop_live_mode_event.clear() self._live_mode_thread = threading.Thread( target=self._live_mode_loop, args=(self._stop_live_mode_event,) ) self._live_mode_thread.start() def _stop_live(self): """Stop the live mode for the camera.""" if self._live_mode_thread is None: logger.info("Live mode thread is not running.") return self._stop_live_mode_event.set() self._live_mode_thread.join(timeout=5) if self._live_mode_thread.is_alive(): logger.warning("Live mode thread did not stop gracefully.") else: self._live_mode_thread = None logger.info("Live mode stopped.") def _live_mode_loop(self, stop_event: threading.Event): """Loop to capture images in live mode.""" self.cam.set_camera_rate_limiting(True) while not stop_event.is_set(): try: self.process_data(self.cam.get_image_data()) except Exception as e: logger.error(f"Error in live mode loop: {e}") break stop_event.wait(0.2) # 5 Hz self.cam.set_camera_rate_limiting(False)
[docs] def process_data(self, image: np.ndarray | None): """Process the image data before sending it to the preview signal.""" if image is None: return self.image.put(image)
[docs] def get_last_image(self) -> np.ndarray: """Get the last captured image from the camera.""" image = self.image.get() if image: return image.data
############## User Interface Methods ##############
[docs] def on_connected(self): """Connect to the camera.""" self.cam.force_monochrome = self._force_monochrome self.cam.on_connect() self.live_mode_enabled.put(bool(self._inputs.get("live_mode", False))) self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
[docs] def on_destroy(self): """Clean up resources when the device is destroyed.""" self.cam.on_disconnect() super().on_destroy()
[docs] def on_trigger(self): """Handle the trigger event.""" if not bool(self.live_mode_enabled.get()): return image = self.image.get() if image is not None: image: messages.DevicePreviewMessage if self.mask.shape[0:2] != image.data.shape[0:2]: logger.info( f"ROI shape does not match image shape, skipping ROI application for device {self.name}." ) return if len(image.data.shape) == 3: # If the image has multiple channels, apply the mask to each channel data = image.data * self.mask[:, :, np.newaxis] # Apply mask to the image data n_channels = 3 else: data = image.data * self.mask n_channels = 1 self.roi_signal.put(np.sum(data) / (np.sum(self.mask) * n_channels))
if __name__ == "__main__": # Example usage of the IDSCamera class camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False) print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")