Source code for csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools

import builtins
import datetime
import fcntl
import os
import socket
import subprocess
import sys
import termios
import threading
import time
import tty
from pathlib import Path

import epics
import numpy as np
from bec_lib import bec_logger
from rich import box
from rich.console import Console
from rich.table import Table

logger = bec_logger.logger

if builtins.__dict__.get("bec") is not None:
    bec = builtins.__dict__.get("bec")
    dev = builtins.__dict__.get("dev")
    scans = builtins.__dict__.get("scans")


[docs] def umv(*args): return scans.umv(*args, relative=False)
[docs] def umvr(*args): return scans.umv(*args, relative=True)
[docs] class OMNYToolsError(Exception): pass
[docs] class OMNYTools: HEADER = "\033[95m" OKBLUE = "\033[94m" OKCYAN = "\033[96m" OKGREEN = "\033[92m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m" def __init__(self, client) -> None: self.client = client @staticmethod def _get_user_param_safe(device, var): param = dev[device].user_parameter if not param or param.get(var) is None: raise ValueError(f"Device {device} has no user parameter definition for {var}.") return param.get(var) def printgreen(self, string: str): print(self.OKGREEN + string + self.ENDC) def printgreenbold(self, string: str): print(self.BOLD + self.OKGREEN + string + self.ENDC) def yesno(self, message: str, default="none", autoconfirm=0) -> bool: if autoconfirm and default == "y": self.printgreen(message + " Automatically confirming default: yes") return True elif autoconfirm and default == "n": self.printgreen(message + " Automatically confirming default: no") return False if default == "y": message_ending = " [Y]/n? " elif default == "n": message_ending = " y/[N]? " else: message_ending = " y/n? " while True: user_input = input(self.OKBLUE + message + message_ending + self.ENDC) if ( user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes" ) or (default == "y" and user_input == ""): return True if ( user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No" ) or (default == "n" and user_input == ""): return False else: print("Please expicitely confirm y or n.") def tweak_cursor( self, dev1, step1: float, dev2="none", step2: float = "0", special_command="none" ): if dev1 not in dev.enabled_devices: print(f"Device 1 {dev} is not in enabled devices.") return if dev2 not in dev.enabled_devices and dev2 != "none": print(f"Device 2 {dev} is not in enabled devices.") return # Save the current terminal settings fd = sys.stdin.fileno() old_term = termios.tcgetattr(fd) try: # Set the terminal to raw mode to capture single key presses tty.setraw(fd) # Set stdin to non-blocking mode old_flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, old_flags | os.O_NONBLOCK) print("Tweak Cursor." + self.BOLD + self.OKBLUE + "Press (q) to quit!\r" + self.ENDC) while True: try: # Read single character input key = sys.stdin.read(1) if key == "q": print("\n\rExiting tweak mode\r") break elif key == "\x1b": # Escape sequences for arrow keys next1, next2 = sys.stdin.read(2) if next1 == "[": if next2 == "A": if dev2 != "none": umvr(dev2, step2) if special_command != "none": special_command() elif next2 == "B": if dev2 != "none": umvr(dev2, -step2) if special_command != "none": special_command() elif next2 == "C": umvr(dev1, step1) if special_command != "none": special_command() elif next2 == "D": umvr(dev1, -step1) if special_command != "none": special_command() elif key == "+": step1 = step1 * 2 if dev2 != "none": step2 = step2 * 2 print(f"\rDouble step size. New step size: {step1}, {step2}\r") elif key == "-": step1 = step1 / 2 if dev2 != "none": step2 = step2 / 2 print(f"\rHalf step size. New step size: {step1}, {step2}\r") except IOError: pass time.sleep(0.02) finally: # Restore the terminal to its original state termios.tcsetattr(fd, termios.TCSADRAIN, old_term) fcntl.fcntl(fd, fcntl.F_SETFL, old_flags)
import socket
[docs] class PtychoReconstructor: """Writes ptychography reconstruction queue files after each scan projection. An external reconstruction engine monitors the queue folder and picks up .dat files as they are written. Usage: reconstructor = PtychoReconstructor(folder_name="reconstruction_queue") reconstructor.write( scan_list=[1023, 1024], next_scan_number=1025, base_path="~/data/raw", ) """ def __init__(self, folder_name: str = "reconstruction_queue"): self.folder_name = folder_name def _accounts_match(self) -> bool: """Check if bec.active_account matches the current system user (p vs e prefix).""" try: bec = builtins.__dict__.get("bec") active = bec.active_account # e.g. "p23092" system_user = os.getenv("USER") or os.getlogin() # e.g. "e23092" print(f"Active server account {active}, BEC client account {system_user}.") return active[1:] == system_user[1:] except Exception: logger.warning("Failed to compare active account to system user.") return False
[docs] def write(self, scan_list: list, next_scan_number: int, base_path: str = "~/data/raw/analysis/"): """Write a reconstruction queue file for the given scan list. Args: scan_list (list): Scan numbers belonging to this projection (may contain multiple entries when stitching). next_scan_number (int): The current next scan number, used to name the queue file. base_path (str): Root path under which the queue folder lives. """ if not self._accounts_match(): logger.warning("Active BEC account does not match system user — skipping queue file write.") return base_path = os.path.expanduser(base_path) queue_path = Path(os.path.join(base_path, self.folder_name)) queue_path.mkdir(parents=True, exist_ok=True) last_scan_number = next_scan_number - 1 queue_file = os.path.abspath( os.path.join(queue_path, f"scan_{last_scan_number:05d}.dat") ) with open(queue_file, "w") as f: scans = " ".join(str(s) for s in scan_list) f.write(f"p.scan_number {scans}\n") f.write("p.check_nextscan_started 1\n")
[docs] class TomoIDManager: """Registers a tomography measurement in the OMNY sample database and returns its assigned tomo ID. Usage: id_manager = TomoIDManager() tomo_id = id_manager.register( sample_name="my_sample", date="2024-03-08", eaccount="e12345", scan_number=1001, setup="lamni", additional_info="test info", user="BEC", ) """ #OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php" OMNY_URL = "https://v1p0zyg2w9n2k9c1.myfritz.net/samples/newmeasurement.php" OMNY_USER = "" OMNY_PASSWORD = "" TMP_FILE = "/tmp/currsamplesnr.txt"
[docs] def register( self, sample_name: str, date: str, eaccount: str, scan_number: int, setup: str, additional_info: str, user: str, ) -> int: """Register a new measurement and return the assigned tomo ID. Args: sample_name (str): Name of the sample. date (str): Date string (e.g. "2024-03-08"). eaccount (str): E-account identifier. scan_number (int): First scan number of the measurement. setup (str): Setup name (e.g. "lamni"). additional_info (str): Any additional sample information. user (str): User name. Returns: int: The tomo ID assigned by the OMNY database. """ url = ( f"{self.OMNY_URL}" f"?sample={sample_name}" f"&date={date}" f"&eaccount={eaccount}" f"&scannr={scan_number}" f"&setup={setup}" f"&additional={additional_info}" f"&user={user}" ) # subprocess.run( # f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}" # f" -q -O {self.TMP_FILE} '{url}'", # shell=True, # ) #print(url) subprocess.run( f"wget -q -O {self.TMP_FILE} '{url}'", shell=True, ) with open(self.TMP_FILE) as f: return int(f.read())