# Source code for csaxs_bec.devices.epics.eps

"""EPS module for cSAXS beamline: defines the EPS device with its components and methods."""

# fmt: off
# Disable Black formatting for this file to keep the component definitions in a more readable layout.

# pylint: disable=line-too-long
from __future__ import annotations

import time

from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd_devices import PSIDeviceBase

logger = bec_logger.logger

# ---------------------------
# Registry: sections/channels
# ---------------------------


class EPSSubDevices(Device):
    """Common base class for the EPS sub-devices (alarms, valves, shutters, ...).

    It only customises ``describe`` so that signals which are absent from the
    parent description are reported as well.
    """

    def describe(self) -> dict:
        """Return the parent description extended with one entry per signal.

        Every signal yielded by ``walk_signals()`` whose ``attr_name`` is not
        already a key of the parent description is added under that
        ``attr_name``; the value is whatever the signal's own ``describe()``
        returns.

        Returns:
            dict: the extended description.
        """
        description = super().describe()
        for entry in self.walk_signals():
            signal = entry.item
            key = signal.attr_name
            if key in description:
                # Already described by the parent class: leave it untouched.
                continue
            description[key] = signal.describe()
        return description
class EPSAlarms(EPSSubDevices):
    """Alarm counters of the cSAXS EPS and of the front-end MIS."""

    # Read-only, auto-monitored signals carrying the "alarm" label; Kind.omitted.
    # NOTE(review): add_prefix=("",) presumably keeps read_pv free of the device prefix - confirm against the ophyd Component docs.
    eps_alarm_cnt = Cpt(EpicsSignalRO, read_pv="X12SA-EPS-PLC:AlarmCnt_EPS", add_prefix=("",), name="eps_alarm_cnt", kind=Kind.omitted, doc="X12SA EPS Alarm count", auto_monitor=True, labels={"alarm"})
    mis_alarm_cnt = Cpt(EpicsSignalRO, read_pv="ARS00-MIS-PLC-01:AlarmCnt_Frontends", add_prefix=("",), name="mis_alarm_cnt", kind=Kind.omitted, doc="FrontEnd MIS Alarm count", auto_monitor=True, labels={"alarm"})
class ValvesFrontend(EPSSubDevices):
    """Front-end valves of the cSAXS beamline (PLC_OPEN readbacks)."""

    # Read-only, auto-monitored signals carrying the "valve" label; Kind.omitted.
    fe_vvpg_0000 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-0000:PLC_OPEN", add_prefix=("",), name="fevvpg0000", kind=Kind.omitted, doc="FE-VVPG-0000", auto_monitor=True, labels={"valve"})
    fe_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-1010:PLC_OPEN", add_prefix=("",), name="fevvpg1010", kind=Kind.omitted, doc="FE-VVPG-1010", auto_monitor=True, labels={"valve"})
    fe_vvfv_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVFV-2010:PLC_OPEN", add_prefix=("",), name="fevvfv2010", kind=Kind.omitted, doc="FE-VVFV-2010", auto_monitor=True, labels={"valve"})
    fe_vvpg_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-VVPG-2010:PLC_OPEN", add_prefix=("",), name="fevvpg2010", kind=Kind.omitted, doc="FE-VVPG-2010", auto_monitor=True, labels={"valve"})
class ValvesOptics(EPSSubDevices):
    """Valves of the optics hutch (PLC_OPEN readbacks)."""

    # Read-only, auto-monitored signals carrying the "valve" label; Kind.omitted.
    op_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-1010:PLC_OPEN", add_prefix=("",), name="opvvpg1010", kind=Kind.omitted, doc="OP-VVPG-1010", auto_monitor=True, labels={"valve"})
    op_vvpg_2010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-2010:PLC_OPEN", add_prefix=("",), name="opvvpg2010", kind=Kind.omitted, doc="OP-VVPG-2010", auto_monitor=True, labels={"valve"})
    op_vvpg_3010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-3010:PLC_OPEN", add_prefix=("",), name="opvvpg3010", kind=Kind.omitted, doc="OP-VVPG-3010", auto_monitor=True, labels={"valve"})
    op_vvpg_3020 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-3020:PLC_OPEN", add_prefix=("",), name="opvvpg3020", kind=Kind.omitted, doc="OP-VVPG-3020", auto_monitor=True, labels={"valve"})
    op_vvpg_4010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-4010:PLC_OPEN", add_prefix=("",), name="opvvpg4010", kind=Kind.omitted, doc="OP-VVPG-4010", auto_monitor=True, labels={"valve"})
    op_vvpg_5010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-5010:PLC_OPEN", add_prefix=("",), name="opvvpg5010", kind=Kind.omitted, doc="OP-VVPG-5010", auto_monitor=True, labels={"valve"})
    op_vvpg_6010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-6010:PLC_OPEN", add_prefix=("",), name="opvvpg6010", kind=Kind.omitted, doc="OP-VVPG-6010", auto_monitor=True, labels={"valve"})
    op_vvpg_7010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-VVPG-7010:PLC_OPEN", add_prefix=("",), name="opvvpg7010", kind=Kind.omitted, doc="OP-VVPG-7010", auto_monitor=True, labels={"valve"})
class ValvesEndstation(EPSSubDevices):
    """Valves of the cSAXS endstation (PLC_OPEN readbacks)."""

    # Read-only, auto-monitored signal carrying the "valve" label; Kind.omitted.
    es_vvpg_1010 = Cpt(EpicsSignalRO, read_pv="X12SA-ES-VVPG-1010:PLC_OPEN", add_prefix=("",), name="esvvpg1010", kind=Kind.omitted, doc="ES-VVPG-1010", auto_monitor=True, labels={"valve"})
class ShuttersFrontend(EPSSubDevices):
    """Front-end shutters (OPEN readbacks)."""

    # Read-only, auto-monitored signals carrying the "shutter" label; Kind.omitted.
    fe_psh1 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-PSH1-EMLS-0010:OPEN", add_prefix=("",), name="fepsh1", kind=Kind.omitted, doc="FE-PSH1-EMLS-0010", auto_monitor=True, labels={"shutter"})
    fe_sto1 = Cpt(EpicsSignalRO, read_pv="X12SA-FE-STO1-EMLS-0010:OPEN", add_prefix=("",), name="festo1", kind=Kind.omitted, doc="FE-STO1-EMLS-0010", auto_monitor=True, labels={"shutter"})
class ShuttersEndstation(EPSSubDevices):
    """Shutter signals grouped under the endstation (OPEN readback)."""

    # Read-only, auto-monitored signal carrying the "shutter" label; Kind.omitted.
    # NOTE(review): the attribute is prefixed "es" while the PV and doc refer to OP-PSH1 - confirm this is intended.
    es_psh17010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-PSH1-EMLS-7010:OPEN", add_prefix=("",), name="espsh17010", kind=Kind.omitted, doc="OP-PSH1-EMLS-7010", auto_monitor=True, labels={"shutter"})
class DMMMonochromator(EPSSubDevices):
    """Signals of the DMM monochromator at the cSAXS beamline.

    All components are read-only, auto-monitored and of ``Kind.omitted``; the
    ``labels`` set ("temp", "switch", "fault", "energy", "string") tags the
    type of each signal.
    """

    # Temperatures
    dmm_temp_surface_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3010:TEMP", add_prefix=("",), name="dmm_temp_surface_1", kind=Kind.omitted, doc="DMM Temp Surface 1", auto_monitor=True, labels={"temp"})
    dmm_temp_surface_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3020:TEMP", add_prefix=("",), name="dmm_temp_surface_2", kind=Kind.omitted, doc="DMM Temp Surface 2", auto_monitor=True, labels={"temp"})
    dmm_temp_shield_1_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3030:TEMP", add_prefix=("",), name="dmm_temp_shield_1_disaster", kind=Kind.omitted, doc="DMM Temp Shield 1 (disaster)", auto_monitor=True, labels={"temp"})
    dmm_temp_shield_2_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-ETTC-3040:TEMP", add_prefix=("",), name="dmm_temp_shield_2_disaster", kind=Kind.omitted, doc="DMM Temp Shield 2 (disaster)", auto_monitor=True, labels={"temp"})

    # Position switches
    dmm_translation_thru = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3010:THRU", add_prefix=("",), name="dmm_translation_thru", kind=Kind.omitted, doc="DMM Translation ThruPos", auto_monitor=True, labels={"switch"})
    dmm_translation_in = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3020:IN", add_prefix=("",), name="dmm_translation_in", kind=Kind.omitted, doc="DMM Translation InPos", auto_monitor=True, labels={"switch"})
    dmm_bragg_thru = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3030:THRU", add_prefix=("",), name="dmm_bragg_thru", kind=Kind.omitted, doc="DMM Bragg ThruPos", auto_monitor=True, labels={"switch"})
    dmm_bragg_in = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMLS-3040:IN", add_prefix=("",), name="dmm_bragg_in", kind=Kind.omitted, doc="DMM Bragg InPos", auto_monitor=True, labels={"switch"})

    # Heater faults
    dmm_heater_fault_xtal_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3050:SWITCH", add_prefix=("",), name="dmm_heater_fault_xtal_1", kind=Kind.omitted, doc="DMM Heater Fault XTAL 1", auto_monitor=True, labels={"fault"})
    dmm_heater_fault_xtal_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3060:SWITCH", add_prefix=("",), name="dmm_heater_fault_xtal_2", kind=Kind.omitted, doc="DMM Heater Fault XTAL 2", auto_monitor=True, labels={"fault"})
    dmm_heater_fault_support_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM-EMSW-3070:SWITCH", add_prefix=("",), name="dmm_heater_fault_support_1", kind=Kind.omitted, doc="DMM Heater Fault Support 1", auto_monitor=True, labels={"fault"})

    # Energy, position and stripe
    dmm_energy = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:ENERGY-GET", add_prefix=("",), name="dmm_energy", kind=Kind.omitted, doc="DMM Energy", auto_monitor=True, labels={"energy"})
    dmm_position = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:POSITION", add_prefix=("",), name="dmm_position", kind=Kind.omitted, doc="DMM Position", auto_monitor=True, labels={"string"})
    dmm_stripe = Cpt(EpicsSignalRO, read_pv="X12SA-OP-DMM1:STRIPE", add_prefix=("",), name="dmm_stripe", kind=Kind.omitted, doc="DMM Stripe", auto_monitor=True, labels={"string"})
class CCMMonochromator(EPSSubDevices):
    """Signals of the CCM monochromator at the cSAXS beamline.

    All components are read-only, auto-monitored and of ``Kind.omitted``; the
    ``labels`` set ("temp", "fault", "energy", "string") tags the type of each
    signal.
    """

    # Temperatures
    ccm_temp_crystal = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-ETTC-4010:TEMP", add_prefix=("",), name="ccm_temp_crystal", kind=Kind.omitted, doc="CCM Temp Crystal", auto_monitor=True, labels={"temp"})
    ccm_temp_shield_disaster = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-ETTC-4020:TEMP", add_prefix=("",), name="ccm_temp_shield_disaster", kind=Kind.omitted, doc="CCM Temp Shield (disaster)", auto_monitor=True, labels={"temp"})

    # Heater faults
    ccm_heater_fault_1 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4010:SWITCH", add_prefix=("",), name="ccm_heater_fault_1", kind=Kind.omitted, doc="CCM Heater Fault 1", auto_monitor=True, labels={"fault"})
    ccm_heater_fault_2 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4020:SWITCH", add_prefix=("",), name="ccm_heater_fault_2", kind=Kind.omitted, doc="CCM Heater Fault 2", auto_monitor=True, labels={"fault"})
    ccm_heater_fault_3 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM-EMSW-4030:SWITCH", add_prefix=("",), name="ccm_heater_fault_3", kind=Kind.omitted, doc="CCM Heater Fault 3", auto_monitor=True, labels={"fault"})

    # Energy and position
    ccm_energy = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM1:ENERGY-GET", add_prefix=("",), name="ccm_energy", kind=Kind.omitted, doc="CCM Energy", auto_monitor=True, labels={"energy"})
    ccm_position = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CCM1:POSITION", add_prefix=("",), name="ccm_position", kind=Kind.omitted, doc="CCM Position", auto_monitor=True, labels={"string"})
class CoolingWater(EPSSubDevices):
    """Cooling-water flow switches and cooling valves at the cSAXS beamline.

    All components are read-only, auto-monitored and of ``Kind.omitted``.
    Flow readbacks carry the "flow" label, the two ECVW valves the "valve"
    label.
    """

    # Flow switches
    op_sl1_efsw_2010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL1-EFSW-2010:FLOW", add_prefix=("",), name="op_sl1_efsw_2010_flow", kind=Kind.omitted, doc="OP-SL1-EFSW-2010", auto_monitor=True, labels={"flow"})
    op_sl2_efsw_2010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL2-EFSW-2010:FLOW", add_prefix=("",), name="op_sl2_efsw_2010_flow", kind=Kind.omitted, doc="OP-SL2-EFSW-2010", auto_monitor=True, labels={"flow"})
    op_eb1_efsw_5010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-EB1-EFSW-5010:FLOW", add_prefix=("",), name="op_eb1_efsw_5010_flow", kind=Kind.omitted, doc="OP-EB1-EFSW-5010", auto_monitor=True, labels={"flow"})
    op_eb1_efsw_5020_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-EB1-EFSW-5020:FLOW", add_prefix=("",), name="op_eb1_efsw_5020_flow", kind=Kind.omitted, doc="OP-EB1-EFSW-5020", auto_monitor=True, labels={"flow"})
    op_sl3_efsw_5010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-SL3-EFSW-5010:FLOW", add_prefix=("",), name="op_sl3_efsw_5010_flow", kind=Kind.omitted, doc="OP-SL3-EFSW-5010", auto_monitor=True, labels={"flow"})
    op_kb_efsw_6010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-KB-EFSW-6010:FLOW", add_prefix=("",), name="op_kb_efsw_6010_flow", kind=Kind.omitted, doc="OP-KB-EFSW-6010", auto_monitor=True, labels={"flow"})
    op_psh1_efsw_7010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-OP-PSH1-EFSW-7010:FLOW", add_prefix=("",), name="op_psh1_efsw_7010_flow", kind=Kind.omitted, doc="OP-PSH1-EFSW-7010", auto_monitor=True, labels={"flow"})
    es_eb2_efsw_1010_flow = Cpt(EpicsSignalRO, read_pv="X12SA-ES-EB2-EFSW-1010:FLOW", add_prefix=("",), name="es_eb2_efsw_1010_flow", kind=Kind.omitted, doc="ES-EB2-EFSW-1010", auto_monitor=True, labels={"flow"})

    # Cooling valves
    op_cs_ecvw_0010 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CS-ECVW-0010:PLC_OPEN", add_prefix=("",), name="op_cs_ecvw_0010", kind=Kind.omitted, doc="OP-CS-ECVW-0010", auto_monitor=True, labels={"valve"})
    op_cs_ecvw_0020 = Cpt(EpicsSignalRO, read_pv="X12SA-OP-CS-ECVW-0020:PLC_OPEN", add_prefix=("",), name="op_cs_ecvw_0020", kind=Kind.omitted, doc="OP-CS-ECVW-0020", auto_monitor=True, labels={"valve"})
class EPS(PSIDeviceBase):
    """EPS device for the cSAXS beamline.

    Bundles the EPS sub-devices (alarms, valves, shutters, monochromators and
    cooling water) together with two writable PLC request signals, and offers
    two user-facing methods: ``show_all`` (status overview) and
    ``water_cooling_op`` (open the cooling valves and check that they stay
    open).
    """

    # Method names listed for user access (presumably exposed to the BEC client - confirm).
    USER_ACCESS = [
        "show_all",
        "water_cooling_op",
    ]

    alarms = Cpt(EPSAlarms, name="alarms", doc="EPS Alarms")
    valves_frontend = Cpt(ValvesFrontend, name="valves_frontend", doc="Valves Frontend")
    valves_optics = Cpt(ValvesOptics, name="valves_optics", doc="Valves Optics Hutch")
    valves_es = Cpt(ValvesEndstation, name="valves_es", doc="Valves ES Hutch")
    shutters_frontend = Cpt(ShuttersFrontend, name="shutters_frontend", doc="Shutters Frontend")
    shutters_es = Cpt(ShuttersEndstation, name="shutters_es", doc="Shutters Endstation")
    dmm_monochromator = Cpt(DMMMonochromator, name="dmm_monochromator", doc="DMM Monochromator")
    ccm_monochromator = Cpt(CCMMonochromator, name="ccm_monochromator", doc="CCM Monochromator")
    cooling_water = Cpt(CoolingWater, name="cooling_water", doc="Cooling Water")

    # Writable PLC request signals; both are written by water_cooling_op().
    ackerr = Cpt(EpicsSignal, read_pv="X12SA-EPS-PLC:ACKERR-REQUEST", add_prefix=("",), name="ackerr", kind=Kind.omitted, doc="ACKERR request - OP-CS-ECVW-0020", auto_monitor=True, labels={"request"})
    request = Cpt(EpicsSignal, read_pv="X12SA-OP-CS-ECVW:PLC_REQUEST", add_prefix=("",), name="op_cs_ecvw_request", kind=Kind.omitted, doc="PLC request - OP-CS-ECVW-PLC_REQUEST", auto_monitor=True, labels={"request"})

    def _notify(self, msg: str, show_as_client_msg: bool = True):
        """Emit a message, either as a client info message or via ``print``.

        Args:
            msg: text to emit.
            show_as_client_msg: if True, send the message through the device
                manager's connector; if False, just print it.

        If sending to the client fails, the failure is logged and the message
        is printed instead (best effort, never raises because of the send).
        """
        if not show_as_client_msg:
            print(msg)
            return
        try:
            self.device_manager.connector.send_client_info(msg, scope="", show_asap=True)
        except Exception:
            # Deliberately broad: a notification problem must not abort the caller.
            logger.error(f"Failed to send client message, falling back to print: {msg}")
            print(str(msg))

    # ----------------------------------------------------------
    # Water cooling operation
    # ----------------------------------------------------------

    def safe_get(self, sig, default=None):
        """Read a signal value, returning ``default`` if the read fails.

        Args:
            sig: object providing a ``get()`` method.
            default: value returned when ``get()`` raises.

        Returns:
            The value from ``sig.get()``, or ``default`` on any exception (a
            warning is logged in that case).
        """
        try:
            return sig.get()
        except Exception as ex:
            # Not every signal-like object has a `pvname`; looking it up must
            # not raise from inside this handler.
            sig_id = getattr(sig, "pvname", None) or getattr(sig, "name", None) or repr(sig)
            logger.warning(f"Failed to get signal {sig_id}: {ex}")
            return default

    def water_cooling_op(self):
        """Open the two ECVW cooling valves and verify that they stay open.

        Steps:
            1. If the EPS alarm count is positive, write 1 to ``ackerr`` to
               reset the alarms (a failed write is only reported).
            2. Write 1 to ``request`` to ask for the valves to open.
            3. Poll every 2 s. Once both valves read open, a 15 s stability
               window starts; it restarts if a valve stops reading open.

        The whole monitoring phase has an overall deadline of 20 s measured
        from the start of step 3, so the stability window has to complete
        within that deadline.
        NOTE(review): with a 15 s window and 2 s polling the valves must read
        open within the first few seconds - confirm this is intended.

        The progress table and the step messages go to the client via
        ``_notify``; the monitoring header and the final outcome are printed
        server-side.

        Returns:
            bool: True if both valves stayed open for the stability window,
            False if the open request could not be sent or the deadline passed.
        """
        poll_period = 2
        timeout = 20
        stability = 15

        self._notify("=== Water Cooling Operation ===")

        # --- Signals ---
        eps_alarm_sig = self.alarms.eps_alarm_cnt
        valves = [self.cooling_water.op_cs_ecvw_0010, self.cooling_water.op_cs_ecvw_0020]
        # Every cooling-water signal labelled "flow"
        flow_items = [walk.item for walk in self.cooling_water.walk_signals() if "flow" in walk.item._ophyd_labels_]

        # --- Step 1: EPS alarm reset ---
        alarm_value = self.safe_get(eps_alarm_sig, 0)
        if alarm_value and alarm_value > 0:
            self._notify(f"[WaterCooling] EPS alarms present ({alarm_value}) → resetting…")
            try:
                self.ackerr.put(1)
            except Exception as ex:
                self._notify(f"[WaterCooling] WARNING: ACKERR write failed: {ex}")
            time.sleep(0.3)
        else:
            self._notify("[WaterCooling] No EPS alarms detected.")

        # --- Step 2: Issue open request ---
        self._notify("[WaterCooling] Sending cooling-valve OPEN request…")
        try:
            self.request.put(1)
        except Exception as ex:
            self._notify(f"[WaterCooling] ERROR: Failed to send OPEN request: {ex}")
            return False

        # --- Step 3: Monitoring loop (one table line per cycle to the client) ---
        start = time.time()
        end = start + timeout
        stable_until = None

        # Server-side header, printed once
        print("Monitoring valves and flow sensors...")
        print(f" Valves: {valves[0].attr_name[-4:]}, {valves[1].attr_name[-4:]}")
        print(f" Note: stability requires valves to remain OPEN for {stability} seconds.")

        # Table header for the client; fixed-width columns for monospaced UIs
        self._notify(f"{'Time':>6} | {'Valves':<21} | {'Flows (OK/FAIL/N/A)':<20}")

        def tri_state(value):
            """Map 1/True -> True, 0/False -> False, anything else -> None."""
            if value is None:
                return None
            if value == 1:
                return True
            if value == 0:
                return False
            return None

        valve_labels = {True: "OPEN ", False: "CLOSED", None: "N/A "}

        def snapshot():
            """Read valves and flows; return (raw valve states, valve text, flow summary)."""
            v_states = [self.safe_get(v, None) for v in valves]
            valve_str = " ".join(
                f"{valve.attr_name[-4:]}=" + valve_labels[tri_state(state)]
                for valve, state in zip(valves, v_states)
            )
            # Flow summary: OK / FAIL / N/A counts
            flow_states = [tri_state(self.safe_get(fsig, None)) for fsig in flow_items]
            ok = sum(1 for f in flow_states if f is True)
            fail = sum(1 for f in flow_states if f is False)
            na = sum(1 for f in flow_states if f is None)
            return v_states, valve_str, f"{ok:>2} / {fail:>2} / {na:>2}"

        while True:
            now = time.time()
            elapsed = int(now - start)

            # Exactly one concise line to the client per cycle (also on the last one)
            v_states, valves_s, flows_s = snapshot()
            self._notify(f"{elapsed:>6}s | {valves_s:<21} | {flows_s:<20}")

            if now > end:
                print("→ TIMEOUT: Cooling valves failed to remain OPEN.")
                return False

            both_open = all(s is not None and bool(s) for s in v_states)
            if both_open:
                if stable_until is None:
                    stable_until = now + stability
                    print(f"[WaterCooling] Both valves OPEN → starting {stability}s stability window…")
                elif now >= stable_until:
                    print("→ SUCCESS: Valves remained OPEN during stability window.")
                    return True
            elif stable_until is not None:
                print("[WaterCooling] Valve closed again → restarting stability window.")
                stable_until = None

            time.sleep(poll_period)

    def show_all(self):
        """Print a colourised status overview of all EPS sub-devices.

        For every component of this device that is itself a ``Device``, the
        component doc is printed as a heading, followed by one line per
        sub-component with its doc as label and its formatted value. Values
        are formatted according to the signal's labels (temperature, energy,
        switch, fault, valve/shutter, flow) and the position/stripe signals
        are mapped from 0/1 to text. Plain signals (the request signals) are
        skipped. If both cooling valves read closed, a hint on how to open
        them is printed.
        """
        red = "\x1b[91m"
        green = "\x1b[92m"
        white = "\x1b[0m"
        bold = "\x1b[1m"
        cyan = "\x1b[96m"

        # Numeric -> text mappings, keyed by the signal's attribute name
        position_enum = {0: "out of beam", 1: "in beam"}
        stripe_enum = {0: "Stripe 1 W/B4C", 1: "Stripe 2 NiV/B4C"}
        enum_by_attr = {
            self.dmm_monochromator.dmm_position.attr_name: position_enum,
            self.ccm_monochromator.ccm_position.attr_name: position_enum,
            self.dmm_monochromator.dmm_stripe.attr_name: stripe_enum,
        }

        def is_bool_like(v):
            return isinstance(v, (bool, int)) and v in (0, 1, True, False)

        def closed(v):
            return is_bool_like(v) and not bool(v)

        def fmt_value(value: object, signal: EpicsSignalRO) -> str:
            """Return the display text for ``value`` read from ``signal``."""
            if value is None:
                return f"{red}MISSING{white}"

            labels = signal._ophyd_labels_

            # ---------- Explicit enum mappings by attribute ----------
            enum_map = enum_by_attr.get(signal.attr_name)
            if enum_map is not None:
                try:
                    iv = int(value)
                except (TypeError, ValueError, OverflowError):
                    # Already a string or otherwise not convertible: echo it
                    return f"{value}"
                return enum_map.get(iv, f"{iv}")

            is_number = isinstance(value, (int, float))
            if "temp" in labels and is_number:
                return f"{value:.1f}"
            if "energy" in labels and is_number:
                return f"{value:.4f}"
            if "string" in labels or "position" in labels:
                return f"{value}"

            if is_bool_like(value):
                if "switch" in labels:
                    return f"{green}ACTIVE{white}" if value else f"{red}INACTIVE{white}"
                if "fault" in labels:
                    # A fault signal is good when it is NOT set
                    return f"{green}OK{white}" if not value else f"{red}FAULT{white}"
                if "valve" in labels or "shutter" in labels:
                    return f"{green}OPEN{white}" if value else f"{red}CLOSED{white}"
                if "flow" in labels:
                    return f"{green}OK{white}" if bool(value) else f"{red}FAIL{white}"

            # ------------------- FALLBACK -----------------------
            return f"{value}"

        # ------------------- PRINT START ---------------------
        print(f"{bold}X12SA EPS status{white}")

        for name, component in self._sig_attrs.items():
            sub_device = getattr(self, name)
            # Only print sub-devices, not individual request signals
            if not isinstance(sub_device, Device):
                continue

            print(f"\n{bold}{component.doc}{white}")

            rows = []
            for sub_walk in sub_device.walk_components():
                cpt: Cpt = sub_walk.item
                it: EpicsSignalRO = getattr(sub_device, cpt.attr)
                # Fall back to the attribute name if a component has no doc
                rows.append((cpt.doc or cpt.attr, self.safe_get(it), it))

            # A list argument keeps max() valid even when `rows` is empty
            label_width = max([32, *(len(label) for (label, _, _) in rows)])
            for label, value, it in rows:
                fv = fmt_value(value, it)
                print(f" - {label:<{label_width}} {fv}")

            if sub_device.attr_name == "cooling_water":
                v1 = self.safe_get(self.cooling_water.op_cs_ecvw_0010)
                v2 = self.safe_get(self.cooling_water.op_cs_ecvw_0020)
                if closed(v1) and closed(v2):
                    print(f"\n{cyan}Hint:{white} Both water cooling valves are CLOSED.\n"
                          f"You can open them using: {bold}dev.x12saEPS.water_cooling_op(){white}")
# fmt: on

# ----------------------------------------------------------
# Consistency report
# ----------------------------------------------------------
# def consistency_report(self, *, verbose=True):
#     missing = []
#     dupes = []
#     seen = {}
#     for sub_device in self.walk_components():
#         section = sub_device.name
#         for walk in sub_device.walk_components():
#             cpt: Cpt = walk.ancestors[-1]
#             it: EpicsSignalRO = walk.item
#             if not hasattr(self, it["attr"]):
#                 missing.append((section, it["attr"], it["label"], it["pv"]))
#             pv = it["pv"]
#             if pv in seen:
#                 dupes.append((pv, seen[pv], (section, it["attr"], it["label"])))
#             else:
#                 seen[pv] = (section, it["attr"], it["label"])
#     if verbose:
#         print("=== Consistency Report ===")
#         if missing:
#             print("\nMissing attributes:")
#             for sec, a, lbl, pv in missing:
#                 print(f" - [{sec}] {a} {lbl} pv={pv}")
#         else:
#             print("\nNo missing attributes.")
#         if dupes:
#             print("\nDuplicate PVs:")
#             for pv, f1, f2 in dupes:
#                 print(f" {pv} → {f1} AND {f2}")
#         else:
#             print("\nNo duplicate PVs.")
#     return {"missing_attrs": missing, "duplicate_pvs": dupes}