# Source code for csaxs_bec.bec_ipython_client.plugins.tool_box.debug_tools
"""Module providing debugging tools for the BEC IPython client at cSAXS."""
from __future__ import annotations
import inspect
import json
import os
import re
import socket
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import TYPE_CHECKING, Literal
import numpy as np
from pydantic import BaseModel
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from slugify import slugify
if TYPE_CHECKING:
from bec_ipython_client.main import BECIPythonClient
from bec_lib.devicemanager import DeviceManagerBase
from bec_lib.scans import Scans
from bec_widgets.cli.client_utils import BECGuiClient
scans: Scans # type: ignore[no-redef]
bec: BECIPythonClient # type: ignore[no-redef]
dev: DeviceManagerBase # type: ignore[no-redef]
[docs]
class Detector(BaseModel):
    """Pydantic model describing one configured detector.

    Attributes:
        name (str): Name under which the detector is addressed.
        hostnames (list[str]): Host names associated with the detector.
        cfg (dict): Configuration dictionary attached to the detector.
    """

    name: str
    hostnames: list[str]
    cfg: dict
[docs]
def to_identifier(text: str) -> str:
    """
    Turn an arbitrary string into a name usable as a Python identifier.

    The stripped text is slugified with ``_`` as separator, any character
    outside ``[a-zA-Z0-9_]`` is removed, and a leading underscore is added
    when the result would otherwise start with a digit.

    Args:
        text (str): The raw text to convert.

    Returns:
        str: The sanitized identifier.

    Raises:
        ValueError: If nothing is left after sanitizing the text.
    """
    slug = slugify(text.strip(), separator="_")
    cleaned = re.sub(r"[^a-zA-Z0-9_]", "", slug)
    if not cleaned:
        raise ValueError(f"Cannot convert '{text}' to a valid identifier.")
    # Identifiers must not begin with a digit.
    return f"_{cleaned}" if cleaned[0].isdigit() else cleaned
[docs]
class DebugTools:
    """A collection of debugging tools for the BEC IPython client at cSAXS.

    Public methods (names not starting with ``_``) are listed together with the
    first line of their docstring whenever an instance is pretty-printed in
    IPython (see ``_repr_pretty_``). The methods rely on the module-level names
    ``scans``, ``bec`` and ``dev`` being available at call time.
    """

    _PURPOSE = (
        "Debugging helpers for the cSAXS BEC IPython client. These tools are intended for advanced users "
        "and developers to diagnose and troubleshoot issues within the BEC environment. "
        "Below are the available methods together with a brief description of their functionality."
    )

    ######################
    ## Internal Methods ##
    ######################

    def _describe(self) -> None:
        """Pretty-print a description of this debugging tool."""
        console = Console()
        # Offset for IPython prompt misplacement
        console.print("\n\n", end="")
        header = Text("DebugTools", style="bold cyan")
        purpose = Text(self._PURPOSE, style="dim")
        console.print(Panel(purpose, title=header, expand=False))

        table = Table(show_header=True, header_style="bold magenta")
        table.add_column("Method", style="bold", no_wrap=True)
        table.add_column("Description")
        for name, member in inspect.getmembers(self, predicate=inspect.ismethod):
            # Only public methods are advertised.
            if name.startswith("_"):
                continue
            doc = inspect.getdoc(member)
            short_doc = doc.splitlines()[0] if doc else ""
            table.add_row(name, short_doc)
        console.print(table)

    def _repr_pretty_(self, p, cycle: bool) -> None:
        """IPython pretty-printer hook: print the tool overview instead of a repr."""
        if cycle:
            p.text("DebugTools(...)")
        else:
            self._describe()

    ######################
    ### MCS Card Check ###
    ######################

    def _check_if_device_is_loaded(self, device_name: str) -> None:
        """Check if a device is loaded in the current BEC session.

        Args:
            device_name (str): Name of the device to look up in ``dev``.

        Raises:
            RuntimeError: If ``device_name`` is not contained in ``dev``.
        """
        if device_name not in dev:
            raise RuntimeError(
                f"Device {device_name} was not loaded in the current active BEC session."
            )

    def mcs_test_acquire(
        self, mode: Literal["high_frame", "medium_frame", "low_frame"] = "high_frame"
    ) -> None:
        """
        Method to perform a test acquisition with randomized exposure time, burst frames, and cycles
        on the MCS card using the DDG trigger setup.

        After the scan has finished, the shape reported for ``mcs_mca_mca1`` in the scan
        history is compared against ``(cycles * burst_frames,)``.

        Args:
            mode (Literal["high_frame", "medium_frame", "low_frame"]): The mode of the test.
                - 'high_frame': Tests high frame rates with short exposure times.
                - 'medium_frame': Tests medium frame rates with moderate exposure times.
                - 'low_frame': Tests low frame rates with longer exposure times.

        Raises:
            RuntimeError: If one of the devices ``mcs``, ``ddg1`` or ``ddg2`` is not loaded.
            ValueError: If ``mode`` is not one of the supported values.
            AssertionError: If the recorded MCS data shape differs from the expected shape.
        """
        for required_device in ("mcs", "ddg1", "ddg2"):
            self._check_if_device_is_loaded(required_device)

        # NOTE: np.random.randint excludes the upper bound and np.random.rand draws
        # from [0, 1), so all upper limits below are exclusive.
        if mode == "high_frame":
            burst_frames = np.random.randint(10_000, 100_000)  # 10000 .. 99999
            cycles = np.random.randint(5, 20)  # 5 .. 19
            exp_time = (
                np.random.rand() * (0.001 - 0.201e-3) + 0.201e-3
            )  # between 0.000201 and 0.001 (same unit as passed to scans.acquire)
        elif mode == "medium_frame":
            burst_frames = np.random.randint(50, 500)  # 50 .. 499
            cycles = np.random.randint(1, 10)  # 1 .. 9
            exp_time = np.random.rand() * (0.01 - 0.001) + 0.001  # between 0.001 and 0.01
        elif mode == "low_frame":
            burst_frames = np.random.randint(5, 20)  # 5 .. 19
            cycles = np.random.randint(1, 5)  # 1 .. 4
            exp_time = np.random.rand() * (2 - 0.1) + 0.1  # between 0.1 and 2
        else:
            raise ValueError(f"Invalid mode '{mode}' specified for acquire scan test.")

        print(
            f"Starting acquire measurement with exp_time={exp_time:.6f}, burst_frames={burst_frames}, cycles={cycles}"
        )
        s = scans.acquire(
            exp_time=exp_time, frames_per_trigger=burst_frames, burst_at_each_point=cycles
        )
        s.wait(file_written=True)
        print("Acquire measurement finished.")

        print("Checking MCS data...")
        scan_data = bec.history.get_by_scan_id(s.scan.scan_id)
        mcs_data = scan_data.devices.mcs
        print(mcs_data)
        # Normalise to a tuple: a list such as [n] never compares equal to (n,).
        # NOTE(review): the container type stored under "shape" is not visible here.
        shape = tuple(mcs_data._info["mcs_mca_mca1"]["value"]["shape"])
        expected_shape = (cycles * burst_frames,)
        # Raised explicitly (instead of a bare ``assert``) so the check is not
        # stripped when Python runs with -O.
        if shape != expected_shape:
            raise AssertionError(
                f"MCS data shape {shape} does not match expected shape {expected_shape}."
            )

    ########################
    ### JFJ/Eiger Checks ###
    ########################

    def _get_jfj_eiger_config(self) -> dict[str, Detector]:
        """Retrieve the current JFJ/Eiger detector configuration.

        The configuration is read from ``jfj_config.json`` located next to this module.

        Returns:
            dict[str, Detector]: Detectors keyed by their sanitized description.
        """
        # FIXME: Implement REST API call once ready for use from Leo Sala's team.
        ret: dict[str, Detector] = {}
        base_path = os.path.dirname(__file__)
        config_path = os.path.join(base_path, "jfj_config.json")
        with open(config_path, "r", encoding="utf-8") as fh:
            cfg = json.load(fh)
        for entry in cfg["detector"]:
            # NOTE(review): ``cfg`` is the complete file content, not this detector's
            # ``entry`` - confirm whether ``cfg=entry`` was intended.
            det = Detector(
                name=to_identifier(entry["description"]), hostnames=entry["hostname"], cfg=cfg
            )
            ret[det.name] = det
        return ret

    def list_detectors(self) -> list[str]:
        """
        List the names of all JFJ/Eiger detectors configured in the BEC client.

        Returns:
            list[str]: A list of detector names.
        """
        return list(self._get_jfj_eiger_config())

    def ping_detector(self, detector_name: str) -> bool:
        """
        Ping a JFJ/Eiger detector to check if it is reachable.

        A TCP connection is attempted to every hostname of the detector and the
        per-host result is printed as a table.

        Args:
            detector_name (str): The name of the detector to ping.

        Returns:
            bool: True if the detector has at least one hostname and all of its
                hostnames are reachable, False otherwise.

        Raises:
            ValueError: If ``detector_name`` is not part of the configuration.
        """
        detectors = self._get_jfj_eiger_config()
        if detector_name not in detectors:
            raise ValueError(f"Detector '{detector_name}' not found in configuration.")
        det = detectors[detector_name]
        results = self._ping_many(det.hostnames)

        table = Table(title=f"Ping results for detector '{detector_name}'")
        table.add_column("Hostname", style="cyan", no_wrap=True)
        table.add_column("Status", style="magenta")
        for host, alive in results.items():
            status = "[green]OK[/green]" if alive else "[red]DOWN[/red]"
            table.add_row(host, status)
        console = Console()
        console.print(table)
        # Honour the documented bool contract (previously nothing was returned).
        return bool(results) and all(results.values())

    def _ping_many(self, hosts: list[str], port=22, timeout=2, max_workers=None):
        """Probe several hosts concurrently.

        Args:
            hosts (list[str]): Host names to probe.
            port (int): TCP port to connect to on every host.
            timeout (float): Timeout handed to each connection attempt.
            max_workers (int | None): Thread pool size; defaults to one thread per host.

        Returns:
            dict[str, bool]: Mapping of host name to reachability.
        """
        # ThreadPoolExecutor rejects max_workers <= 0, so an empty host list
        # must not reach the executor.
        if not hosts:
            return {}
        max_workers = max_workers or len(hosts)
        primed_ping = partial(self._ping, port=port, timeout=timeout)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            pings = list(executor.map(primed_ping, hosts))
        return dict(zip(hosts, pings))

    def _ping(self, host: str, port=23, timeout=2):  # telnet is port 23
        """Return True if a TCP connection to ``(host, port)`` can be opened.

        NOTE(review): the default port here (23) differs from the default used by
        ``_ping_many`` (22), which always passes its own port explicitly.

        Args:
            host (str): Host name or address to connect to.
            port (int): TCP port to connect to.
            timeout (float): Connection timeout passed to ``socket.create_connection``.

        Returns:
            bool: True on a successful connection, False if an ``OSError`` occurred.
        """
        address = (host, port)
        try:
            with socket.create_connection(address, timeout):
                return True
        except OSError:
            return False

    def open_it_service_page(self) -> None:
        """Open the overview of IT services hosted by Science IT Infrastructure and Services for cSAXS."""
        gui: BECGuiClient = bec.gui
        dock_area = gui.new()
        print("Opening IT service page in new dock...")
        url = "https://metrics.psi.ch/d/saf8mxv/x12sa?orgId=1&from=now-24h&to=now&timezone=browser&var-receiver_hosts=sls-jfjoch-001.psi.ch&var-writer_hosts=xbl-daq-34.psi.ch&var-beamline=X12SA&var-slurm_partitions=csaxs&var-receiver_services=broker&var-writer_services=writer&refresh=15m"
        # FIXME BEC WIDGETS v3
        dock = dock_area.new()
        wb = dock.new(widget=gui.available_widgets.WebsiteWidget)
        wb.set_url(url)