Source code for csaxs_bec.devices.omny.webcam_viewer

import requests
import threading
import cv2
import numpy as np
from ophyd import Device, Component as Cpt, Kind, Signal
from ophyd_devices import PreviewSignal
import traceback 

from bec_lib.logger import bec_logger

# Module-level logger obtained from bec_lib's bec_logger; used by WebcamViewer
# to report live-mode warnings and streaming errors.
logger = bec_logger.logger

class WebcamViewer(Device):
    """Ophyd device that publishes frames from an HTTP webcam stream.

    While live mode is enabled, a background daemon thread reads the HTTP
    response body from ``url``, cuts out JPEG frames by their start/end
    markers (``FF D8`` / ``FF D9``), decodes them with OpenCV, converts them
    from BGR to RGB and pushes them to the ``preview`` signal.

    Live mode is controlled through the ``live_mode_enabled`` signal or the
    convenience methods ``start_live_mode`` / ``stop_live_mode``.

    Args:
        url: Address of the webcam stream passed to ``requests.get``.
        name: Name of the ophyd device.
        num_rotation_90: Forwarded to ``preview.num_rotation_90``.
        transpose: Forwarded to ``preview.transpose``.
        **kwargs: Forwarded to ``ophyd.Device.__init__``.
    """

    USER_ACCESS = ["start_live_mode", "stop_live_mode"]

    preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False)
    live_mode_enabled = Cpt(
        Signal,
        name="live_mode_enabled",
        value=False,
        doc="Enable or disable live mode.",
        kind=Kind.config,
    )

    def __init__(
        self, url: str, name: str, num_rotation_90=0, transpose=False, **kwargs
    ) -> None:
        super().__init__(name=name, **kwargs)
        self.url = url
        # Currently open streaming response (if any); closed by stop to
        # unblock the worker thread.
        self._connection = None
        self._update_thread = None
        # Raw bytes received from the stream that have not been consumed yet.
        self._buffer = b""
        self._shutdown_event = threading.Event()
        # Re-entrant so that a signal callback triggered while the lock is
        # held by the same thread cannot deadlock.
        self._live_mode_lock = threading.RLock()
        self.preview.num_rotation_90 = num_rotation_90
        self.preview.transpose = transpose
        self.live_mode_enabled.subscribe(self._on_live_mode_enabled_changed, run=False)

    def start_live_mode(self) -> None:
        """Enable live mode by setting ``live_mode_enabled`` to True."""
        self.live_mode_enabled.put(True)

    def stop_live_mode(self) -> None:
        """Disable live mode by setting ``live_mode_enabled`` to False."""
        self.live_mode_enabled.put(False)

    def _on_live_mode_enabled_changed(self, *args, value, **kwargs) -> None:
        """Signal subscription callback: apply the new live-mode state."""
        self._apply_live_mode(bool(value))

    def _apply_live_mode(self, enabled: bool) -> None:
        """Start or stop the background update thread.

        Args:
            enabled: True to make sure the worker thread is running, False to
                request shutdown and wait (up to 2 s) for it to finish.
        """
        with self._live_mode_lock:
            if enabled:
                self._start_worker()
            else:
                self._stop_worker()

    def _start_worker(self) -> None:
        """Start the update thread unless one is already streaming."""
        worker = self._update_thread
        if worker is not None and worker.is_alive():
            if not self._shutdown_event.is_set():
                # Already streaming, nothing to do.
                return
            # A previous stop timed out and the old worker is still winding
            # down. Wait for it so that live mode does not end up "enabled"
            # with a worker that is about to exit.
            worker.join(timeout=2)
            if worker.is_alive():
                logger.warning("Webcam live mode thread did not stop within timeout.")
                return
        self._shutdown_event.clear()
        self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
        self._update_thread.start()

    def _stop_worker(self) -> None:
        """Signal the update thread to stop and wait briefly for it to exit."""
        if self._update_thread is None:
            return
        self._shutdown_event.set()
        if self._connection is not None:
            # Closing the response unblocks a worker waiting for data.
            # Best effort: the worker may be closing it concurrently.
            try:
                self._connection.close()
            except Exception:  # pylint: disable=broad-except
                pass
            self._connection = None
        self._update_thread.join(timeout=2)
        if self._update_thread.is_alive():
            # Keep the thread reference and leave the shutdown event set so
            # the worker still exits once it gets unblocked.
            logger.warning("Webcam live mode thread did not stop within timeout.")
            return
        self._update_thread = None
        self._buffer = b""
        self._shutdown_event.clear()

    def _update_loop(self) -> None:
        """Worker thread body: stream from ``url`` until shutdown is requested.

        Reconnects when the stream ends. After a failure it logs the
        traceback and waits briefly before retrying so that an unreachable
        camera does not cause a busy loop.
        """
        retry_delay_s = 1.0
        while not self._shutdown_event.is_set():
            response = None
            # Never splice leftovers of a previous connection onto a new one.
            self._buffer = b""
            try:
                response = requests.get(self.url, stream=True, timeout=5)
                self._connection = response
                # Treat HTTP error responses as failures instead of parsing
                # their body for frames.
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=1024):
                    if self._shutdown_event.is_set():
                        break
                    self._buffer += chunk
                    self._publish_complete_frames()
            except Exception:  # pylint: disable=broad-except
                if self._shutdown_event.is_set():
                    # Expected: the connection was closed underneath us by
                    # a stop request.
                    break
                content = traceback.format_exc()
                logger.error(f"Image update loop failed: {content}")
                # Back off, but wake up immediately on shutdown.
                self._shutdown_event.wait(retry_delay_s)
            finally:
                if response is not None:
                    response.close()

    def _publish_complete_frames(self) -> None:
        """Decode and publish every complete JPEG frame held in the buffer."""
        while True:
            start = self._buffer.find(b"\xff\xd8")  # JPEG start
            if start == -1:
                # No frame start yet. Keep only the last byte, which may be
                # the first half of a marker split across two chunks.
                self._buffer = self._buffer[-1:]
                return
            # Only an end marker located after the start marker is valid.
            end = self._buffer.find(b"\xff\xd9", start + 2)  # JPEG end
            if end == -1:
                # Incomplete frame: discard the bytes preceding it and wait
                # for more data.
                self._buffer = self._buffer[start:]
                return
            jpg = self._buffer[start : end + 2]
            self._buffer = self._buffer[end + 2 :]
            image = cv2.imdecode(np.frombuffer(jpg, np.uint8), cv2.IMREAD_COLOR)
            if image is not None:
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                self.preview.put(image)