import builtins
import time
from bec_lib import bec_logger
# Module-level logger shared by the helpers in this file; it is the logger
# object exposed by bec_lib's ``bec_logger``.
logger = bec_logger.logger
# Bind the session handles (bec, dev, scans) as module globals, but only when
# builtins carries a non-None "bec" entry. Without it, none of the three names
# is created at module level, so later bare lookups of ``dev``/``scans`` fall
# through to builtins (or raise NameError if they are not there either).
if getattr(builtins, "bec", None) is not None:
    bec, dev, scans = (
        getattr(builtins, handle, None) for handle in ("bec", "dev", "scans")
    )
def umv(*args):
    """
    Forward ``args`` to ``scans.umv`` with ``relative=False``.

    The positional arguments are passed through untouched; callers in this
    module use the flat form ``umv(device1, position1, device2, position2, ...)``.
    Whatever ``scans.umv`` returns is returned unchanged.
    """
    move = scans.umv
    return move(*args, relative=False)
class cSAXSInitSmaractStagesError(Exception):
    """Exception type associated with the cSAXSInitSmaractStages helpers."""
[docs]
class cSAXSInitSmaractStages:
"""
Runtime SmarAct utilities for referencing and moving to initial positions.
This class no longer relies on static mappings. Instead, it:
- discovers available devices from `list(dev.keys())`
- reads the numeric channel/axis from each device's `user_parameter['bl_smar_stage']`
- reads `init_position` from `user_parameter['init_position']`
"""
def __init__(self, client) -> None:
self.client = client
# ------------------------------
# Internal helpers (runtime-based)
# ------------------------------
def _ensure_all_session_devices_enabled(self, selection: set | None = None, try_enable: bool = True):
"""
Ensure all session devices (or a selection) that define 'bl_smar_stage' are enabled.
Parameters
----------
selection : set | None
If provided, only devices in this set are considered.
try_enable : bool
If True, attempt to set device.enabled = True for devices that expose 'enabled' and are False.
If False, only report status without changing it.
Returns
-------
dict
{
"enabled_now": [device_names enabled by this call],
"already_enabled": [device_names already enabled or without 'enabled' attr],
"failed": [device_names that could not be enabled],
"inaccessible": [device_names not accessible]
}
"""
enabled_now = []
already_enabled = []
failed = []
inaccessible = []
# Build axis map to restrict to SmarAct-based devices (same logic as other helpers)
axis_map = self._build_session_axis_map(selection=selection)
for dev_name in sorted(axis_map.keys()):
try:
d = self._get_device_object(dev_name)
if d is None:
inaccessible.append(dev_name)
logger.warning(f"[cSAXS] Device {dev_name} not accessible.")
continue
# If device has no 'enabled' attribute, treat as already enabled/usable
if not hasattr(d, "enabled"):
already_enabled.append(dev_name)
continue
# If already enabled
try:
if getattr(d, "enabled"):
already_enabled.append(dev_name)
continue
except Exception:
# If reading enabled fails, treat as inaccessible for safety
failed.append(dev_name)
logger.warning(f"[cSAXS] Could not read 'enabled' for {dev_name}.")
continue
# Device exists and is disabled
if try_enable:
try:
logger.info(f"[cSAXS] Enabling device {dev_name} (was disabled).")
setattr(d, "enabled", True)
# small delay to let device initialize if needed
time.sleep(0.05)
if getattr(d, "enabled"):
enabled_now.append(dev_name)
logger.info(f"[cSAXS] Device {dev_name} enabled.")
else:
failed.append(dev_name)
logger.warning(f"[cSAXS] Device {dev_name} still disabled after enabling attempt.")
except Exception as exc:
failed.append(dev_name)
logger.error(f"[cSAXS] Failed to enable {dev_name}: {exc}")
else:
# Not trying to enable, just report
failed.append(dev_name)
except Exception as exc:
failed.append(dev_name)
logger.error(f"[cSAXS] _ensure_all_session_devices_enabled error for {dev_name}: {exc}")
return {
"enabled_now": enabled_now,
"already_enabled": already_enabled,
"failed": failed,
"inaccessible": inaccessible,
}
def _yesno(self, question: str, default: str = "y") -> bool:
"""
Use OMNYTools.yesno if available; otherwise default to 'yes' (or fallback to input()).
"""
try:
if hasattr(self, "OMNYTools") and hasattr(self.OMNYTools, "yesno"):
return self.OMNYTools.yesno(question, default)
except Exception:
pass
# Fallback: default answer without interaction
# (Safe default: 'y' proceeds; adjust if you want interactive input)
logger.info(f"[cSAXS] (yesno fallback) {question} -> default '{default}'")
return (default or "y").lower().startswith("y")
def _get_user_param_safe(self, device_name: str, key: str):
"""
Safe access to device user parameters from current BEC session.
"""
try:
return dev[device_name].user_parameter.get(key)
except Exception:
return None
def _iter_session_devices(self):
"""
Yield device names available in current BEC session.
"""
if dev is None:
return
for name in list(dev.keys()):
yield name
def _build_session_axis_map(self, selection: set | None = None) -> dict:
"""
Build runtime axis map {device_name: channel} for devices that define 'bl_smar_stage'.
If 'selection' is provided, restrict to names in selection.
"""
axis_map = {}
missing = []
for name in self._iter_session_devices() or []:
if selection is not None and name not in selection:
continue
ch = self._get_user_param_safe(name, "bl_smar_stage")
if ch is None:
missing.append(name)
continue
try:
axis_map[name] = int(ch)
except Exception:
missing.append(name)
if missing and selection is None:
logger.info(
"[cSAXS] Devices without 'bl_smar_stage' (ignored): " + ", ".join(sorted(missing))
)
return axis_map
def _get_device_object(self, device_name: str):
"""
Return the live device object from BEC 'dev'.
"""
try:
return getattr(dev, device_name)
except Exception:
return None
# ------------------------------
# Public API
# ------------------------------
[docs]
def smaract_reference_stages(self, force: bool = False, devices_to_reference=None):
"""
Reference SmarAct stages using runtime discovery.
Parameters
----------
force : bool, optional
If True, re-reference ALL selected stages.
If False (default), only reference stages that are currently NOT referenced.
devices_to_reference : iterable of str or str, optional
If provided, only these devices will be considered for referencing.
If None, all devices in the current session that define 'bl_smar_stage' are considered.
Behavior
--------
- Runtime-based: reads axis channel from user_parameter['bl_smar_stage'].
- If devices_to_reference is given → restrict referencing to those.
- If force=False → skip devices already referenced.
- If force=True → re-reference selected devices always.
- Only newly referenced devices are passed to
smaract_components_to_initial_position(devices_to_move=[...]) afterwards.
Examples
--------
Reference only stages that are NOT referenced yet (default)
csaxs.smaract_reference_stages()
Force re-reference of all stages
csaxs.smaract_reference_stages(force=True)
Reference only specific stages
csaxs.smaract_reference_stages(
devices_to_reference=["sl3trxi", "sl3trxo", "xbpm3x"]
)
Reference selected stages and force re-referencing
csaxs.smaract_reference_stages(
devices_to_reference=["sl4trxi", "sl4trxo"],
force=True
)
Reference a single device
csaxs.smaract_reference_stages(
devices_to_reference="xbimtrx"
)
Reference only the selected devices (skip already-referenced ones)
csaxs.smaract_reference_stages(
devices_to_reference=["sl3trxi", "sl4trxo", "sl5trxt"]
)
Check referencing status of all stages
csaxs.smaract_check_all_referenced()
"""
# Normalize selection
if isinstance(devices_to_reference, str):
devices_to_reference = [devices_to_reference]
selection = set(devices_to_reference) if devices_to_reference else None
# First: ensure all relevant devices are enabled before attempting referencing
enable_report = self._ensure_all_session_devices_enabled(selection=selection, try_enable=True)
if enable_report["failed"]:
logger.warning(
"[cSAXS] Some devices could not be enabled before referencing: "
+ ", ".join(sorted(enable_report["failed"]))
)
if enable_report["inaccessible"]:
logger.warning(
"[cSAXS] Some devices were inaccessible before referencing: "
+ ", ".join(sorted(enable_report["inaccessible"]))
)
# Build axis map for selected devices (or all devices present)
axis_map = self._build_session_axis_map(selection=selection)
if selection:
unknown = sorted(list(selection - set(axis_map.keys())))
if unknown:
print(f"Unknown devices requested or missing 'bl_smar_stage' (ignored): {unknown}")
newly_referenced = []
already_referenced = []
failed = []
to_verify = [] # devices that need a final verification
print("\nStarting SmarAct referencing...\n")
for dev_name in sorted(axis_map.keys()):
ch = axis_map[dev_name]
d = self._get_device_object(dev_name)
if d is None:
print(f"{dev_name}: device not accessible, skipping.")
failed.append(dev_name)
continue
# If device exposes 'enabled' and is False, skip (we already tried enabling above)
try:
if hasattr(d, "enabled") and not getattr(d, "enabled"):
print(f"{dev_name}: device disabled, skipping.")
failed.append(dev_name)
continue
except Exception:
print(f"{dev_name}: could not read enabled state, skipping.")
failed.append(dev_name)
continue
try:
is_ref = d.controller.axis_is_referenced(ch)
# Skip if already referenced and not forcing
if is_ref and not force:
print(f"{dev_name}: already referenced, skipping.")
already_referenced.append(dev_name)
continue
# Start referencing
print(f"{dev_name}: referencing axis...")
d.controller.set_closed_loop_move_speed(ch, 1)
d.controller.find_reference_mark(ch, 0, 1000, 1)
time.sleep(0.1)
# Add to list for final verification
to_verify.append((dev_name, ch, d))
except Exception as e:
print(f"Error referencing {dev_name} (axis {ch}): {e}")
failed.append(dev_name)
time.sleep(1.0)
print("\nVerifying referencing state...\n")
for dev_name, ch, d in to_verify:
try:
if d.controller.axis_is_referenced(ch):
print(f"{dev_name}: successfully referenced.")
newly_referenced.append(dev_name)
else:
print(f"{dev_name}: referencing FAILED.")
failed.append(dev_name)
except Exception as e:
print(f"{dev_name}: verification error: {e}")
failed.append(dev_name)
# --- Summary ---
print("\n--- Referencing summary ---")
print(f"Newly referenced: {newly_referenced}")
print(f"Already referenced (kept): {already_referenced}")
print(f"Failed: {failed}")
print("-----------------------------------------\n")
# --- Move newly referenced only ---
if newly_referenced:
print("Moving newly referenced stages to initial positions...")
self.smaract_components_to_initial_position(devices_to_move=newly_referenced)
else:
print("No newly referenced stages.")
[docs]
def smaract_check_all_referenced(self):
"""
Check reference state for all SmarAct devices that define 'bl_smar_stage'.
This now enables all relevant devices first (attempt), then performs the checks.
"""
# Attempt to enable all relevant devices first (do not force enabling if you prefer)
enable_report = self._ensure_all_session_devices_enabled(selection=None, try_enable=True)
if enable_report["enabled_now"]:
print("Now enabled devices which were disabled before: " + ", ".join(sorted(enable_report["enabled_now"])))
if enable_report["failed"]:
print("Could not enable: " + ", ".join(sorted(enable_report["failed"])))
if enable_report["inaccessible"]:
print("Inaccessible: " + ", ".join(sorted(enable_report["inaccessible"])))
axis_map = self._build_session_axis_map(selection=None)
for dev_name in sorted(axis_map.keys()):
ch = axis_map[dev_name]
d = self._get_device_object(dev_name)
if d is None:
print(f"{dev_name}: device not accessible or unsupported.")
continue
# Skip devices that expose 'enabled' and are False
try:
if hasattr(d, "enabled") and not getattr(d, "enabled"):
print(f"{dev_name} (axis {ch}) is disabled; skipping reference check.")
continue
except Exception:
print(f"{dev_name} (axis {ch}) enabled-state unknown; skipping.")
continue
try:
if d.controller.axis_is_referenced(ch):
print(f"{dev_name} (axis {ch}) is referenced.")
else:
print(f"{dev_name} (axis {ch}) is NOT referenced.")
except Exception as e:
print(f"Error checking {dev_name} (axis {ch}): {e}")
[docs]
def smaract_components_to_initial_position(self, devices_to_move=None):
"""
Move selected (or all) SmarAct-based components to their configured init_position.
Parameters
----------
devices_to_move : iterable of str or str, optional
Specific device names to move (e.g. ["xbpm3x", "sl3trxi"]).
If None, all devices in the current session that define 'bl_smar_stage' are considered.
Behavior
--------
- Runtime-based: uses user_parameter['bl_smar_stage'] (numeric channel) and 'init_position'.
- Only axes that are referenced will be moved.
- Unreferenced axes are skipped with a WARNING; the operation continues.
- Devices missing `init_position` are skipped and listed.
- At the end, a summary warns if some stages could not be moved because they were not referenced.
"""
# Normalize selection
if isinstance(devices_to_move, str):
devices_to_move = [devices_to_move]
selection = set(devices_to_move) if devices_to_move else None
# Resolve axis map based on selection
axis_map = self._build_session_axis_map(selection=selection)
unknown_requested = []
if selection:
unknown_requested = sorted(list(selection - set(axis_map.keys())))
if unknown_requested:
logger.warning(
"[cSAXS] Requested devices unknown or missing 'bl_smar_stage': "
+ ", ".join(unknown_requested)
)
# First confirmation: intent
scope_desc = "all SmarAct-based components" if selection is None else "the selected SmarAct-based components"
if not self._yesno(
f"Do you want to move {scope_desc} to the init position as defined in the config file?",
"y",
):
return
planned_moves = []
not_referenced = []
missing_params = []
inaccessible_devices = []
# --- Pre-check phase ---
for dev_name in sorted(axis_map.keys()):
d = self._get_device_object(dev_name)
if d is None:
logger.warning(f"[cSAXS] Device {dev_name} not accessible, skipping.")
inaccessible_devices.append(dev_name)
continue
ch = axis_map[dev_name]
try:
# Reference check
if not d.controller.axis_is_referenced(ch):
not_referenced.append(dev_name)
continue
# Fetch init_position (from user parameters)
init_pos = self._get_user_param_safe(dev_name, "init_position")
if init_pos is None:
missing_params.append(dev_name)
continue
planned_moves.append((dev_name, float(init_pos)))
except Exception as exc:
logger.error(f"[cSAXS] Error during pre-check for {dev_name}: {exc}")
if not planned_moves:
# Nothing to move—still summarize why.
header = "\nNo motions planned. Summary of issues:"
lines = []
if not_referenced:
lines.append(" - Not referenced: " + ", ".join(sorted(not_referenced)))
if missing_params:
lines.append(" - Missing init_position: " + ", ".join(sorted(missing_params)))
if inaccessible_devices:
lines.append(" - Not accessible: " + ", ".join(sorted(inaccessible_devices)))
if unknown_requested:
lines.append(" - Unknown requested: " + ", ".join(sorted(unknown_requested)))
if not lines:
lines.append(" - (No eligible devices or nothing to do.)")
print(header)
for line in lines:
print(line)
logger.warning("[cSAXS] Nothing to do.")
return
# --- Summary table ---
print("\nPlanned SmarAct motions to initial position:")
print("-" * 60)
print(f"{'Device':<35} {'Init position':>20}")
print("-" * 60)
for dev_name, init_pos in planned_moves:
print(f"{dev_name:<35} {init_pos:>20.6g}")
print("-" * 60)
# Notes / diagnostics
if selection is not None:
print("\nNote: Only the following devices were requested to move:")
print(", ".join(sorted(selection)))
if unknown_requested:
print("\nNote: The following requested devices are unknown and were ignored:")
print(", ".join(unknown_requested))
if not_referenced:
print("\nNote: The following devices are NOT referenced and will be skipped:")
print(", ".join(sorted(not_referenced)))
if missing_params:
print("\nNote: The following devices have no init_position defined and will be skipped:")
print(", ".join(sorted(missing_params)))
if inaccessible_devices:
print("\nNote: The following devices were not accessible and will be skipped:")
print(", ".join(sorted(inaccessible_devices)))
# Second confirmation: execution
if not self._yesno("Proceed with the motions listed above?", "y"):
logger.info("[cSAXS] Motion to initial position aborted by user.")
return
# --- Execution phase (SIMULTANEOUS MOTION) ---
if umv is None:
logger.error("[cSAXS] 'umv' is not available in this session.")
return
# Build a flat argument list: [dev1, pos1, dev2, pos2, ...]
move_args = []
for dev_name, init_pos in planned_moves:
d = self._get_device_object(dev_name)
if d is None:
logger.error(f"[cSAXS] Could not access {dev_name}, skipping.")
continue
move_args.append(d)
move_args.append(init_pos)
logger.info(f"[cSAXS] Preparing move: {dev_name} -> {init_pos}")
if not move_args:
logger.warning("[cSAXS] No valid devices left for simultaneous motion.")
return
# Trigger simultaneous move
try:
logger.info(f"[cSAXS] Starting simultaneous motion of {len(planned_moves)} devices.")
umv(*move_args) # simultaneous move
except Exception as exc:
logger.error(f"[cSAXS] Simultaneous motion failed: {exc}")
return
logger.info("[cSAXS] Simultaneous SmarAct motion to initial positions completed.")
# Final warning summary about unreferenced devices
if not_referenced:
logger.warning(
"[cSAXS] Some stages were NOT moved because they are not referenced:\n"
+ ", ".join(sorted(not_referenced))
+ "\nPlease reference these axes and re-run if needed."
)
class cSAXSSmaract:
    """Convenience moves for individual cSAXS SmarAct devices."""

    def __init__(self, client) -> None:
        """Keep a reference to ``client``."""
        self.client = client

    def _get_user_param_safe(self, device_name: str, key: str):
        """
        Look up ``key`` in the user parameters of ``dev[device_name]``.

        Any failure along the way (unknown device, no user parameters, ``dev``
        unavailable) results in None instead of an exception.
        """
        try:
            user_params = dev[device_name].user_parameter
            value = user_params.get(key)
        except Exception:
            value = None
        return value

    def fshn1in(self):
        """
        Move fast shutter n1 to its 'in' position defined in user parameters.

        The target is the 'in' user parameter of ``fast_shutter_n1_x``. If it
        is not defined, or ``umv`` is None, an error is logged and no motion is
        started.
        """
        target = self._get_user_param_safe("fast_shutter_n1_x", "in")
        if target is None:
            logger.error("[cSAXS] No 'in' position defined for fast_shutter_n1_x.")
        elif umv is None:
            logger.error("[cSAXS] 'umv' is not available in this session.")
        else:
            umv(dev.fast_shutter_n1_x, target)