# Source code for csaxs_bec.bec_ipython_client.plugins.cSAXS.smaract


import builtins
import time

from bec_lib import bec_logger

# Logger initialization: module-level logger taken from bec_lib's `bec_logger`;
# every "[cSAXS] ..." log message in this file is emitted through it.
logger = bec_logger.logger

# Pull BEC globals if present.
# The three names are always bound (to None when the session objects are not
# available in `builtins`) so that guards such as `if dev is None` further down
# behave as intended instead of raising NameError.
bec = builtins.__dict__.get("bec")
if bec is not None:
    dev = builtins.__dict__.get("dev")
    scans = builtins.__dict__.get("scans")
else:
    dev = None
    scans = None

def umv(*args):
    """
    Forward a move request to `scans.umv` with `relative=False`.

    Parameters
    ----------
    *args
        Passed through unchanged to `scans.umv`. Callers in this module pass a
        flat sequence of alternating device objects and target positions
        (dev1, pos1, dev2, pos2, ...).

    Returns
    -------
    Whatever `scans.umv` returns.
    """
    # `scans` is the module-level BEC scans object pulled from builtins.
    return scans.umv(*args, relative=False)
class cSAXSInitSmaractStagesError(Exception):
    """
    Exception type accompanying `cSAXSInitSmaractStages`.

    NOTE(review): nothing in the visible part of this module raises it; it is
    presumably kept for callers/other modules - confirm before removing.
    """
[docs] class cSAXSInitSmaractStages: """ Runtime SmarAct utilities for referencing and moving to initial positions. This class no longer relies on static mappings. Instead, it: - discovers available devices from `list(dev.keys())` - reads the numeric channel/axis from each device's `user_parameter['bl_smar_stage']` - reads `init_position` from `user_parameter['init_position']` """ def __init__(self, client) -> None: self.client = client # ------------------------------ # Internal helpers (runtime-based) # ------------------------------ def _ensure_all_session_devices_enabled(self, selection: set | None = None, try_enable: bool = True): """ Ensure all session devices (or a selection) that define 'bl_smar_stage' are enabled. Parameters ---------- selection : set | None If provided, only devices in this set are considered. try_enable : bool If True, attempt to set device.enabled = True for devices that expose 'enabled' and are False. If False, only report status without changing it. Returns ------- dict { "enabled_now": [device_names enabled by this call], "already_enabled": [device_names already enabled or without 'enabled' attr], "failed": [device_names that could not be enabled], "inaccessible": [device_names not accessible] } """ enabled_now = [] already_enabled = [] failed = [] inaccessible = [] # Build axis map to restrict to SmarAct-based devices (same logic as other helpers) axis_map = self._build_session_axis_map(selection=selection) for dev_name in sorted(axis_map.keys()): try: d = self._get_device_object(dev_name) if d is None: inaccessible.append(dev_name) logger.warning(f"[cSAXS] Device {dev_name} not accessible.") continue # If device has no 'enabled' attribute, treat as already enabled/usable if not hasattr(d, "enabled"): already_enabled.append(dev_name) continue # If already enabled try: if getattr(d, "enabled"): already_enabled.append(dev_name) continue except Exception: # If reading enabled fails, treat as inaccessible for safety 
failed.append(dev_name) logger.warning(f"[cSAXS] Could not read 'enabled' for {dev_name}.") continue # Device exists and is disabled if try_enable: try: logger.info(f"[cSAXS] Enabling device {dev_name} (was disabled).") setattr(d, "enabled", True) # small delay to let device initialize if needed time.sleep(0.05) if getattr(d, "enabled"): enabled_now.append(dev_name) logger.info(f"[cSAXS] Device {dev_name} enabled.") else: failed.append(dev_name) logger.warning(f"[cSAXS] Device {dev_name} still disabled after enabling attempt.") except Exception as exc: failed.append(dev_name) logger.error(f"[cSAXS] Failed to enable {dev_name}: {exc}") else: # Not trying to enable, just report failed.append(dev_name) except Exception as exc: failed.append(dev_name) logger.error(f"[cSAXS] _ensure_all_session_devices_enabled error for {dev_name}: {exc}") return { "enabled_now": enabled_now, "already_enabled": already_enabled, "failed": failed, "inaccessible": inaccessible, } def _yesno(self, question: str, default: str = "y") -> bool: """ Use OMNYTools.yesno if available; otherwise default to 'yes' (or fallback to input()). """ try: if hasattr(self, "OMNYTools") and hasattr(self.OMNYTools, "yesno"): return self.OMNYTools.yesno(question, default) except Exception: pass # Fallback: default answer without interaction # (Safe default: 'y' proceeds; adjust if you want interactive input) logger.info(f"[cSAXS] (yesno fallback) {question} -> default '{default}'") return (default or "y").lower().startswith("y") def _get_user_param_safe(self, device_name: str, key: str): """ Safe access to device user parameters from current BEC session. """ try: return dev[device_name].user_parameter.get(key) except Exception: return None def _iter_session_devices(self): """ Yield device names available in current BEC session. 
""" if dev is None: return for name in list(dev.keys()): yield name def _build_session_axis_map(self, selection: set | None = None) -> dict: """ Build runtime axis map {device_name: channel} for devices that define 'bl_smar_stage'. If 'selection' is provided, restrict to names in selection. """ axis_map = {} missing = [] for name in self._iter_session_devices() or []: if selection is not None and name not in selection: continue ch = self._get_user_param_safe(name, "bl_smar_stage") if ch is None: missing.append(name) continue try: axis_map[name] = int(ch) except Exception: missing.append(name) if missing and selection is None: logger.info( "[cSAXS] Devices without 'bl_smar_stage' (ignored): " + ", ".join(sorted(missing)) ) return axis_map def _get_device_object(self, device_name: str): """ Return the live device object from BEC 'dev'. """ try: return getattr(dev, device_name) except Exception: return None # ------------------------------ # Public API # ------------------------------
[docs] def smaract_reference_stages(self, force: bool = False, devices_to_reference=None): """ Reference SmarAct stages using runtime discovery. Parameters ---------- force : bool, optional If True, re-reference ALL selected stages. If False (default), only reference stages that are currently NOT referenced. devices_to_reference : iterable of str or str, optional If provided, only these devices will be considered for referencing. If None, all devices in the current session that define 'bl_smar_stage' are considered. Behavior -------- - Runtime-based: reads axis channel from user_parameter['bl_smar_stage']. - If devices_to_reference is given → restrict referencing to those. - If force=False → skip devices already referenced. - If force=True → re-reference selected devices always. - Only newly referenced devices are passed to smaract_components_to_initial_position(devices_to_move=[...]) afterwards. Examples -------- Reference only stages that are NOT referenced yet (default) csaxs.smaract_reference_stages() Force re-reference of all stages csaxs.smaract_reference_stages(force=True) Reference only specific stages csaxs.smaract_reference_stages( devices_to_reference=["sl3trxi", "sl3trxo", "xbpm3x"] ) Reference selected stages and force re-referencing csaxs.smaract_reference_stages( devices_to_reference=["sl4trxi", "sl4trxo"], force=True ) Reference a single device csaxs.smaract_reference_stages( devices_to_reference="xbimtrx" ) Reference only the selected devices (skip already-referenced ones) csaxs.smaract_reference_stages( devices_to_reference=["sl3trxi", "sl4trxo", "sl5trxt"] ) Check referencing status of all stages csaxs.smaract_check_all_referenced() """ # Normalize selection if isinstance(devices_to_reference, str): devices_to_reference = [devices_to_reference] selection = set(devices_to_reference) if devices_to_reference else None # First: ensure all relevant devices are enabled before attempting referencing enable_report = 
self._ensure_all_session_devices_enabled(selection=selection, try_enable=True) if enable_report["failed"]: logger.warning( "[cSAXS] Some devices could not be enabled before referencing: " + ", ".join(sorted(enable_report["failed"])) ) if enable_report["inaccessible"]: logger.warning( "[cSAXS] Some devices were inaccessible before referencing: " + ", ".join(sorted(enable_report["inaccessible"])) ) # Build axis map for selected devices (or all devices present) axis_map = self._build_session_axis_map(selection=selection) if selection: unknown = sorted(list(selection - set(axis_map.keys()))) if unknown: print(f"Unknown devices requested or missing 'bl_smar_stage' (ignored): {unknown}") newly_referenced = [] already_referenced = [] failed = [] to_verify = [] # devices that need a final verification print("\nStarting SmarAct referencing...\n") for dev_name in sorted(axis_map.keys()): ch = axis_map[dev_name] d = self._get_device_object(dev_name) if d is None: print(f"{dev_name}: device not accessible, skipping.") failed.append(dev_name) continue # If device exposes 'enabled' and is False, skip (we already tried enabling above) try: if hasattr(d, "enabled") and not getattr(d, "enabled"): print(f"{dev_name}: device disabled, skipping.") failed.append(dev_name) continue except Exception: print(f"{dev_name}: could not read enabled state, skipping.") failed.append(dev_name) continue try: is_ref = d.controller.axis_is_referenced(ch) # Skip if already referenced and not forcing if is_ref and not force: print(f"{dev_name}: already referenced, skipping.") already_referenced.append(dev_name) continue # Start referencing print(f"{dev_name}: referencing axis...") d.controller.set_closed_loop_move_speed(ch, 1) d.controller.find_reference_mark(ch, 0, 1000, 1) time.sleep(0.1) # Add to list for final verification to_verify.append((dev_name, ch, d)) except Exception as e: print(f"Error referencing {dev_name} (axis {ch}): {e}") failed.append(dev_name) time.sleep(1.0) print("\nVerifying 
referencing state...\n") for dev_name, ch, d in to_verify: try: if d.controller.axis_is_referenced(ch): print(f"{dev_name}: successfully referenced.") newly_referenced.append(dev_name) else: print(f"{dev_name}: referencing FAILED.") failed.append(dev_name) except Exception as e: print(f"{dev_name}: verification error: {e}") failed.append(dev_name) # --- Summary --- print("\n--- Referencing summary ---") print(f"Newly referenced: {newly_referenced}") print(f"Already referenced (kept): {already_referenced}") print(f"Failed: {failed}") print("-----------------------------------------\n") # --- Move newly referenced only --- if newly_referenced: print("Moving newly referenced stages to initial positions...") self.smaract_components_to_initial_position(devices_to_move=newly_referenced) else: print("No newly referenced stages.")
[docs] def smaract_check_all_referenced(self): """ Check reference state for all SmarAct devices that define 'bl_smar_stage'. This now enables all relevant devices first (attempt), then performs the checks. """ # Attempt to enable all relevant devices first (do not force enabling if you prefer) enable_report = self._ensure_all_session_devices_enabled(selection=None, try_enable=True) if enable_report["enabled_now"]: print("Now enabled devices which were disabled before: " + ", ".join(sorted(enable_report["enabled_now"]))) if enable_report["failed"]: print("Could not enable: " + ", ".join(sorted(enable_report["failed"]))) if enable_report["inaccessible"]: print("Inaccessible: " + ", ".join(sorted(enable_report["inaccessible"]))) axis_map = self._build_session_axis_map(selection=None) for dev_name in sorted(axis_map.keys()): ch = axis_map[dev_name] d = self._get_device_object(dev_name) if d is None: print(f"{dev_name}: device not accessible or unsupported.") continue # Skip devices that expose 'enabled' and are False try: if hasattr(d, "enabled") and not getattr(d, "enabled"): print(f"{dev_name} (axis {ch}) is disabled; skipping reference check.") continue except Exception: print(f"{dev_name} (axis {ch}) enabled-state unknown; skipping.") continue try: if d.controller.axis_is_referenced(ch): print(f"{dev_name} (axis {ch}) is referenced.") else: print(f"{dev_name} (axis {ch}) is NOT referenced.") except Exception as e: print(f"Error checking {dev_name} (axis {ch}): {e}")
[docs] def smaract_components_to_initial_position(self, devices_to_move=None): """ Move selected (or all) SmarAct-based components to their configured init_position. Parameters ---------- devices_to_move : iterable of str or str, optional Specific device names to move (e.g. ["xbpm3x", "sl3trxi"]). If None, all devices in the current session that define 'bl_smar_stage' are considered. Behavior -------- - Runtime-based: uses user_parameter['bl_smar_stage'] (numeric channel) and 'init_position'. - Only axes that are referenced will be moved. - Unreferenced axes are skipped with a WARNING; the operation continues. - Devices missing `init_position` are skipped and listed. - At the end, a summary warns if some stages could not be moved because they were not referenced. """ # Normalize selection if isinstance(devices_to_move, str): devices_to_move = [devices_to_move] selection = set(devices_to_move) if devices_to_move else None # Resolve axis map based on selection axis_map = self._build_session_axis_map(selection=selection) unknown_requested = [] if selection: unknown_requested = sorted(list(selection - set(axis_map.keys()))) if unknown_requested: logger.warning( "[cSAXS] Requested devices unknown or missing 'bl_smar_stage': " + ", ".join(unknown_requested) ) # First confirmation: intent scope_desc = "all SmarAct-based components" if selection is None else "the selected SmarAct-based components" if not self._yesno( f"Do you want to move {scope_desc} to the init position as defined in the config file?", "y", ): return planned_moves = [] not_referenced = [] missing_params = [] inaccessible_devices = [] # --- Pre-check phase --- for dev_name in sorted(axis_map.keys()): d = self._get_device_object(dev_name) if d is None: logger.warning(f"[cSAXS] Device {dev_name} not accessible, skipping.") inaccessible_devices.append(dev_name) continue ch = axis_map[dev_name] try: # Reference check if not d.controller.axis_is_referenced(ch): not_referenced.append(dev_name) continue # Fetch 
init_position (from user parameters) init_pos = self._get_user_param_safe(dev_name, "init_position") if init_pos is None: missing_params.append(dev_name) continue planned_moves.append((dev_name, float(init_pos))) except Exception as exc: logger.error(f"[cSAXS] Error during pre-check for {dev_name}: {exc}") if not planned_moves: # Nothing to move—still summarize why. header = "\nNo motions planned. Summary of issues:" lines = [] if not_referenced: lines.append(" - Not referenced: " + ", ".join(sorted(not_referenced))) if missing_params: lines.append(" - Missing init_position: " + ", ".join(sorted(missing_params))) if inaccessible_devices: lines.append(" - Not accessible: " + ", ".join(sorted(inaccessible_devices))) if unknown_requested: lines.append(" - Unknown requested: " + ", ".join(sorted(unknown_requested))) if not lines: lines.append(" - (No eligible devices or nothing to do.)") print(header) for line in lines: print(line) logger.warning("[cSAXS] Nothing to do.") return # --- Summary table --- print("\nPlanned SmarAct motions to initial position:") print("-" * 60) print(f"{'Device':<35} {'Init position':>20}") print("-" * 60) for dev_name, init_pos in planned_moves: print(f"{dev_name:<35} {init_pos:>20.6g}") print("-" * 60) # Notes / diagnostics if selection is not None: print("\nNote: Only the following devices were requested to move:") print(", ".join(sorted(selection))) if unknown_requested: print("\nNote: The following requested devices are unknown and were ignored:") print(", ".join(unknown_requested)) if not_referenced: print("\nNote: The following devices are NOT referenced and will be skipped:") print(", ".join(sorted(not_referenced))) if missing_params: print("\nNote: The following devices have no init_position defined and will be skipped:") print(", ".join(sorted(missing_params))) if inaccessible_devices: print("\nNote: The following devices were not accessible and will be skipped:") print(", ".join(sorted(inaccessible_devices))) # Second 
confirmation: execution if not self._yesno("Proceed with the motions listed above?", "y"): logger.info("[cSAXS] Motion to initial position aborted by user.") return # --- Execution phase (SIMULTANEOUS MOTION) --- if umv is None: logger.error("[cSAXS] 'umv' is not available in this session.") return # Build a flat argument list: [dev1, pos1, dev2, pos2, ...] move_args = [] for dev_name, init_pos in planned_moves: d = self._get_device_object(dev_name) if d is None: logger.error(f"[cSAXS] Could not access {dev_name}, skipping.") continue move_args.append(d) move_args.append(init_pos) logger.info(f"[cSAXS] Preparing move: {dev_name} -> {init_pos}") if not move_args: logger.warning("[cSAXS] No valid devices left for simultaneous motion.") return # Trigger simultaneous move try: logger.info(f"[cSAXS] Starting simultaneous motion of {len(planned_moves)} devices.") umv(*move_args) # simultaneous move except Exception as exc: logger.error(f"[cSAXS] Simultaneous motion failed: {exc}") return logger.info("[cSAXS] Simultaneous SmarAct motion to initial positions completed.") # Final warning summary about unreferenced devices if not_referenced: logger.warning( "[cSAXS] Some stages were NOT moved because they are not referenced:\n" + ", ".join(sorted(not_referenced)) + "\nPlease reference these axes and re-run if needed." )
class cSAXSSmaract:
    """
    Helper exposing `fshn1in`, which moves the device `fast_shutter_n1_x` to
    the 'in' position stored in its user parameters.
    """

    def __init__(self, client) -> None:
        # The client is only stored; no method in this class reads it.
        self.client = client

    def _get_user_param_safe(self, device_name: str, key: str):
        """
        Return `dev[device_name].user_parameter.get(key)`, or None if any step
        of that lookup raises.
        """
        try:
            return dev[device_name].user_parameter.get(key)
        except Exception:
            return None

    def fshn1in(self):
        """
        Move fast shutter n1 to its 'in' position defined in user parameters.

        Logs an error and returns without moving when no 'in' position is
        defined for `fast_shutter_n1_x` or when `umv` is unavailable.
        """
        in_pos = self._get_user_param_safe("fast_shutter_n1_x", "in")
        if in_pos is None:
            logger.error("[cSAXS] No 'in' position defined for fast_shutter_n1_x.")
            return
        if umv is None:
            logger.error("[cSAXS] 'umv' is not available in this session.")
            return
        umv(dev.fast_shutter_n1_x, in_pos)