Source code for csaxs_bec.bec_ipython_client.plugins.tool_box.debug_tools

"""Module providing debugging tools for the BEC IPython client at cSAXS."""

from __future__ import annotations

import inspect
import json
import os
import re
import socket
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import TYPE_CHECKING, Literal

import numpy as np
from pydantic import BaseModel
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from slugify import slugify

if TYPE_CHECKING:
    from bec_ipython_client.main import BECIPythonClient
    from bec_lib.devicemanager import DeviceManagerBase
    from bec_lib.scans import Scans
    from bec_widgets.cli.client_utils import BECGuiClient

    scans: Scans  # type: ignore[no-redef]

    bec: BECIPythonClient  # type: ignore[no-redef]

    dev: DeviceManagerBase  # type: ignore[no-redef]


[docs] class Detector(BaseModel): """Model representing a detector configuration.""" name: str hostnames: list[str] cfg: dict
[docs] def to_identifier(text: str) -> str: """ Convert an unsafe string into a valid Python identifier. """ name = slugify(text.strip(), separator="_") name = re.sub(r"[^a-zA-Z0-9_]", "", name) if not name: raise ValueError(f"Cannot convert '{text}' to a valid identifier.") if name[0].isdigit(): name = f"_{name}" return name
[docs] class DebugTools: """A collection of debugging tools for the BEC IPython client at cSAXS.""" _PURPOSE = ( "Debugging helpers for the cSAXS BEC IPython client. These tools are intended for advanced users " "and developers to diagnose and troubleshoot issues within the BEC environment. " "Below are the available methods together with a brief description of their functionality." ) ###################### ## Internal Methods ## ###################### def _describe(self) -> None: """Pretty-print a description of this debugging tool.""" console = Console() # Offset for IPython prompt misplacement console.print("\n\n", end="") header = Text("DebugTools", style="bold cyan") purpose = Text(self._PURPOSE, style="dim") console.print(Panel(purpose, title=header, expand=False)) table = Table(show_header=True, header_style="bold magenta") table.add_column("Method", style="bold", no_wrap=True) table.add_column("Description") for name, member in inspect.getmembers(self, predicate=inspect.ismethod): if name.startswith("_"): continue doc = inspect.getdoc(member) short_doc = doc.splitlines()[0] if doc else "" table.add_row(name, short_doc) console.print(table) def _repr_pretty_(self, p, cycle: bool) -> None: if cycle: p.text("DebugTools(...)") else: self._describe() ##################### ### MCS Card Check ### ##################### def _check_if_device_is_loaded(self, device_name: str): """Check if a device is loaded in the current BEC session.""" if device_name not in dev: raise RuntimeError( f"Device {device_name} was not loaded in the current active BEC session." )
[docs] def mcs_test_acquire( self, mode: Literal["high_frame", "medium_frame", "low_frame"] = "high_frame" ): """ Method to perform a test acquisition with randomized exposure time, burst frames, and cycles on the MCS card using the DDG trigger setup. Args: mode (Literal["high_frame", "medium_frame", "low_frame"]): The mode of the test. - 'high_frame': Tests high frame rates with short exposure times. - 'medium_frame': Tests medium frame rates with moderate exposure times. - 'low_frame': Tests low frame rates with longer exposure times. """ self._check_if_device_is_loaded("mcs") self._check_if_device_is_loaded("ddg1") self._check_if_device_is_loaded("ddg2") if mode == "high_frame": burst_frames = np.random.randint(10_000, 100_000) # between 10000 and 100000 cycles = np.random.randint(5, 20) # between 5 and 20 exp_time = ( np.random.rand() * (0.001 - 0.201e-3) + 0.201e-3 ) # between 0.000201 ms and 0.001 s elif mode == "medium_frame": burst_frames = np.random.randint(50, 500) # between 50 and 500 cycles = np.random.randint(1, 10) # between 1 and 10 exp_time = np.random.rand() * (0.01 - 0.001) + 0.001 # between 0.001 ms and 0.01 s elif mode == "low_frame": burst_frames = np.random.randint(5, 20) # between 5 and 20 cycles = np.random.randint(1, 5) # between 1 and 5 exp_time = np.random.rand() * (2 - 0.1) + 0.1 # between 0.1 ms and 2 s else: raise ValueError(f"Invalid mode '{mode}' specified for acquire scan test.") print( f"Starting acquire measurement with exp_time={exp_time:.6f}, burst_frames={burst_frames}, cycles={cycles}" ) s = scans.acquire( exp_time=exp_time, frames_per_trigger=burst_frames, burst_at_each_point=cycles ) s.wait(file_written=True) print("Acquire measurement finished.") print("Checking MCS data...") scan_data = bec.history.get_by_scan_id(s.scan.scan_id) mcs_data = scan_data.devices.mcs print(mcs_data) shape = mcs_data._info["mcs_mca_mca1"]["value"]["shape"] expected_shape = (cycles * burst_frames,) # Assert will raise an error if the shapes do not match assert ( shape == expected_shape ), f"MCS data shape {shape} does not match expected shape {expected_shape}."
######################## ### JFJ/Eiger Checks ### ######################## def _get_jfj_eiger_config(self) -> dict[str, Detector]: """Retrieve the current JFJ/Eiger detector configuration from the BEC client.""" # FIXME: Implement REST API call once ready for use from Leo Sala's team. ret = {} base_path = os.path.dirname(__file__) config_path = os.path.join(base_path, "jfj_config.json") with open(config_path, "r", encoding="utf-8") as fh: cfg = json.load(fh) for entry in cfg["detector"]: det = Detector( name=to_identifier(entry["description"]), hostnames=entry["hostname"], cfg=cfg ) ret[det.name] = det return ret
[docs] def list_detectors(self) -> list[str]: """ List the names of all JFJ/Eiger detectors configured in the BEC client. Returns: list[str]: A list of detector names. """ detectors = self._get_jfj_eiger_config() return list(detectors.keys())
[docs] def ping_detector(self, detector_name: str) -> bool: """ Ping a JFJ/Eiger detector to check if it is reachable. Args: detector_name (str): The name of the detector to ping. Returns: bool: True if the detector is reachable, False otherwise. """ detectors = self._get_jfj_eiger_config() if detector_name not in detectors: raise ValueError(f"Detector '{detector_name}' not found in configuration.") det = detectors[detector_name] results = self._ping_many(det.hostnames) table = Table(title=f"Ping results for detector '{detector_name}'") table.add_column("Hostname", style="cyan", no_wrap=True) table.add_column("Status", style="magenta") for host, alive in results.items(): status = "[green]OK[/green]" if alive else "[red]DOWN[/red]" table.add_row(host, status) console = Console() console.print(table)
def _ping_many(self, hosts: list[str], port=22, timeout=2, max_workers=None): max_workers = max_workers or len(hosts) with ThreadPoolExecutor(max_workers=max_workers) as executor: primed_ping = partial(self._ping, port=port, timeout=timeout) pings = executor.map(primed_ping, hosts) return dict(zip(hosts, pings)) def _ping(self, host: str, port=23, timeout=2): # telnet is port 23 address = (host, port) try: with socket.create_connection(address, timeout): return True except OSError: return False
[docs] def open_it_service_page(self): """Open the overview of IT services hosted by Science IT Infrastructure and Services for cSAXS.""" gui: BECGuiClient = bec.gui dock_area = gui.new() print("Opening IT service page in new dock...") url = "https://metrics.psi.ch/d/saf8mxv/x12sa?orgId=1&from=now-24h&to=now&timezone=browser&var-receiver_hosts=sls-jfjoch-001.psi.ch&var-writer_hosts=xbl-daq-34.psi.ch&var-beamline=X12SA&var-slurm_partitions=csaxs&var-receiver_services=broker&var-writer_services=writer&refresh=15m" # FIXME BEC WIDGETS v3 dock = dock_area.new() wb = dock.new(widget=gui.available_widgets.WebsiteWidget) wb.set_url(url)