import builtins
import datetime
import os
import subprocess
import time
from pathlib import Path
import numpy as np
from bec_lib import bec_logger
from bec_lib.alarm_handler import AlarmBase
from bec_lib.pdf_writer import PDFWriter
from bec_lib.scan_repeat import scan_repeat
from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_optics_mixin import FlomniOpticsMixin
from csaxs_bec.bec_ipython_client.plugins.flomni.gui_tools import flomniGuiTools
from csaxs_bec.bec_ipython_client.plugins.flomni.x_ray_eye_align import XrayEyeAlign
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
OMNYTools,
PtychoReconstructor,
TomoIDManager,
)
# from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
# FlomniWebpageGenerator,
# VERBOSITY_SILENT, # 0 — no output
# VERBOSITY_NORMAL, # 1 — startup / stop messages only (default)
# VERBOSITY_VERBOSE, # 2 — one-line summary per cycle
# VERBOSITY_DEBUG, # 3 — full JSON payload per cycle
# )
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
[docs]
def umv(*args):
return scans.umv(*args, relative=False)
[docs]
class FlomniInitError(Exception):
pass
[docs]
class FlomniError(Exception):
pass
# class FlomniTools:
# def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
# if autoconfirm and default == "y":
# self.printgreen(message + " Automatically confirming default: yes")
# return True
# elif autoconfirm and default == "n":
# self.printgreen(message + " Automatically confirming default: no")
# return False
# if default == "y":
# message_ending = " [Y]/n? "
# elif default == "n":
# message_ending = " y/[N]? "
# else:
# message_ending = " y/n? "
# while True:
# user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
# if (
# user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
# ) or (default == "y" and user_input == ""):
# return True
# if (
# user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
# ) or (default == "n" and user_input == ""):
# return False
# else:
# print("Please expicitely confirm y or n.")
[docs]
class FlomniInitStagesMixin:
def flomni_init_stages(self):
if self.OMNYTools.yesno("Starting initialization of flOMNI stages. OK?"):
print("staring...")
else:
return
if self.check_all_axes_of_fomni_referenced():
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
print("ok then...")
else:
return
sensor_voltage_target = dev.ftransy.user_parameter.get("sensor_voltage")
sensor_voltage = float(dev.ftransy.controller.socket_put_and_receive("MG@AN[1]").strip())
if not np.isclose(sensor_voltage, sensor_voltage_target, 0.25):
print(
f"Sensor voltage of the gripper is {sensor_voltage}, while target from config is {sensor_voltage_target}"
)
print(
"Verify that the value is acceptable and update config file. Reload config and start again."
)
return
print("Starting to drive ftransy to +y limit")
self.drive_axis_to_limit(dev.ftransy, "forward")
dev.ftransy.limits = [-100, 0]
print("done")
print("Starting to drive ftransz to -z limit")
self.drive_axis_to_limit(dev.ftransz, "reverse")
dev.ftransz.limits = [0, 145]
print("done")
print("Starting to drive ftransx to -x limit")
self.drive_axis_to_limit(dev.ftransx, "reverse")
dev.ftransx.limits = [0, 50]
print("done")
print("Starting to drive feyey to +y limit")
self.drive_axis_to_limit(dev.feyey, "forward")
dev.feyey.limits = [-1, -10]
print("done")
print("Starting to drive feyex to +x limit")
self.drive_axis_to_limit(dev.feyex, "forward")
dev.feyex.limits = [-30, -1]
print("done")
if self.OMNYTools.yesno(
"Init of foptz. Can the stage move to the upstream limit without collision?"
):
print("OK. continue.")
else:
return
print("Starting to drive foptz to -z limit")
self.drive_axis_to_limit(dev.foptz, "reverse")
dev.foptz.limits = [0, 27]
print("done")
print("Init of Smaract stages")
## smaract stages
max_repeat = 100
repeat = 0
axis_id_fosaz = dev.fosaz._config["deviceConfig"].get("axis_Id")
axis_id_numeric_fosaz = self.axis_id_to_numeric(axis_id_fosaz)
print("Moving fosaz upstream into the light curtain")
while True:
curtain_is_triggered = dev.foptz.controller.fosaz_light_curtain_is_triggered()
if curtain_is_triggered:
break
if repeat > max_repeat:
raise FlomniInitError("Failed to initialize fosaz within 100 repeats.")
dev.fosaz.controller.move_open_loop_steps(
axis_id_numeric_fosaz, -500, amplitude=4000, frequency=2000
)
time.sleep(1)
repeat += 1
print("Finding index of fosax, fosay, fosaz")
for ii in range(3):
dev.fosax.controller.find_reference_mark(ii, 0, 1000, 1)
time.sleep(1)
dev.fosax.limits = [10.2, 10.6]
dev.fosay.limits = [-3.1, -2.9]
dev.fosaz.limits = [-6, -4]
# dev.fosax.controller.describe()
print("Moving fosa stages to approximate beam positions")
umv(dev.fosaz, -5)
umv(dev.fosax, 10.4, dev.fosay, -3)
print("done")
print("Moving fheater to +y limit")
self.drive_axis_to_limit(dev.fheater, "reverse")
dev.fheater.limits = [-15, 0]
print("done")
print("Moving fsamy to -y limit")
self.drive_axis_to_limit(dev.fsamy, "reverse")
dev.fsamy.limits = [2, 3.1]
print("done")
if self.OMNYTools.yesno(
"Init of tracking stages. Did you remove the outer laser flight tubes?"
):
print("OK. continue.")
else:
print("Stopping.")
return
print("Moving tracky to -y limit")
self.drive_axis_to_limit(dev.ftracky, "reverse")
dev.ftracky.limits = [2.2, 2.8]
print("done")
print("Moving ftrackz to -z limit")
self.drive_axis_to_limit(dev.ftrackz, "reverse")
dev.ftrackz.limits = [4.5, 5.5]
print("done")
if self.OMNYTools.yesno("Init of sample stage. Is the piezo at about 0 deg?"):
print("OK. continue.")
else:
print("Stopping.")
return
print("Moving fsamx to +x limit")
self.drive_axis_to_limit(dev.fsamx, "forward")
dev.fsamx.limits = [-162, 0]
print("done")
print("Moving ftray to IN limit")
self.drive_axis_to_limit(dev.ftray, "reverse")
dev.ftray.limits = [-200, 0]
print("done")
print("Initializing UPR stage.")
if self.OMNYTools.yesno(
"To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"
):
print("OK. continue.")
else:
print("Stopping.")
return
while True:
low_limit, high_limit = dev.fsamroy.controller.get_motor_limit_switch("A")
if not high_limit:
print("Please push limit switch to the left.")
time.sleep(1)
continue
break
while True:
low_limit, high_limit = dev.fsamroy.controller.get_motor_limit_switch("A")
if not low_limit:
print("Please push limit switch to the right.")
time.sleep(1)
continue
break
if self.OMNYTools.yesno("Shall I start the index search?"):
print("OK. continue.. Starting index search.")
else:
print("Stopping.")
return
if dev.fsamroy.controller.is_motor_on("A"):
raise FlomniInitError("fsamroy should be off. Something is wrong. Mirko... help!")
dev.fsamroy.controller.socket_put_confirmed("XQ#MOTON")
dev.fsamroy.enabled = False
time.sleep(5)
dev.fsamroy.enabled = True
time.sleep(2)
dev.fsamroy.controller.socket_put_confirmed("XQ#REFAX")
while not dev.fsamroy.controller.all_axes_referenced():
print("Waiting for fsamroy to be referenced.")
time.sleep(1)
dev.fsamroy.limits = [-5, 365]
print("done")
if self.OMNYTools.yesno(
"Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"
):
print("OK. continue.")
else:
print("Stopping.")
return
print("Moving foptx to +x limit")
self.drive_axis_to_limit(dev.foptx, "forward")
dev.foptx.limits = [-16, -14]
print("done")
axis_id_fopty = dev.fopty._config["deviceConfig"].get("axis_Id")
while True:
low_limit, high_limit = dev.fopty.controller.get_motor_limit_switch(axis_id_fopty)
if not low_limit:
print(
"To ensure that the fopty end switch works, please push it down and hold it for"
" about 1 second."
)
time.sleep(1)
continue
break
if self.OMNYTools.yesno("Start limit switch search of fopty?"):
print("OK. continue.")
else:
print("Stopping.")
return
print("Moving fopty to -y limit")
self.drive_axis_to_limit(dev.fopty, "reverse")
dev.fopty.limits = [0, 4]
print("done")
dev.fsamx.controller.galil_show_all()
self.set_limits()
self._align_setup()
def check_all_axes_of_fomni_referenced(self) -> bool:
if (
dev.fosax.controller.axis_is_referenced(0)
& dev.fosax.controller.axis_is_referenced(1)
& dev.fosax.controller.axis_is_referenced(2)
& dev.fsamx.controller.all_axes_referenced()
& dev.feyex.controller.all_axes_referenced()
& dev.fsamroy.controller.all_axes_referenced()
& dev.fsamroy.controller.is_motor_on("A")
):
print("All axes of flomni are referenced.")
return True
else:
return False
def set_limits(self):
if self.OMNYTools.yesno("Set default limits for flOMNI?"):
print("setting limits...")
else:
print("Stopping.")
return
dev.ftransy.limits = [-100, 0]
dev.ftransz.limits = [0, 145]
dev.ftransx.limits = [0, 50]
dev.ftray.limits = [-200, 0]
dev.fsamy.limits = [2, 3.5]
dev.foptz.limits = [22.5, 28]
dev.foptx.limits = [-17, -12]
dev.fheater.limits = [-15, 0]
dev.feyex.limits = [-18, -1]
dev.feyey.limits = [-12, -1]
dev.fopty.limits = [0, 4]
dev.fosax.limits = [7, 10]
dev.fosay.limits = [-4.2, 7]
dev.fosaz.limits = [-6.5, 7.5]
# dev.rtx.limits = [-220, 220]
# dev.rty.limits = [-180, 180]
# dev.rtz.limits = [-220, 220]
dev.fsamroy.limits = [-5, 365]
dev.ftracky.limits = [2.2, 2.8]
dev.ftrackz.limits = [4.5, 5.5]
def _align_setup(self):
if self.OMNYTools.yesno("Start moving stages to default initial positions?", "y"):
print("Start moving stages...")
else:
print("Stopping.")
return
dev.rtx.controller.feedback_disable()
# positions for optics out and 50 mm distance to sample
umv(dev.ftrackz, 4.73, dev.ftracky, 2.5170, dev.foptx, -14.3, dev.fopty, 3.87)
# the fopty 3.87 should put us in place for a lower FZP on the lower FZP chip
umv(dev.foptz, 23)
flomni_samx_in = dev.fsamx.user_parameter.get("in")
if flomni_samx_in is None:
raise FlomniInitError(
"Could not find a fsamx in position. Please check your device config."
)
umv(dev.fsamx, flomni_samx_in)
flomni_samy_in = dev.fsamy.user_parameter.get("in")
if flomni_samy_in is None:
raise FlomniInitError(
"Could not find a fsamy in position. Please check your device config."
)
umv(dev.fsamy, flomni_samy_in)
# after init reduce vertical stage speed
dev.fsamy.controller.socket_put_confirmed("axspeed[5]=20000")
umv(dev.feyey, -8)
[docs]
class FlomniSampleTransferMixin:
def ensure_osa_back(self):
dev.fosaz.limits = [-12.6, -12.4]
umv(dev.fosaz, -12.5)
curtain_is_triggered = dev.fheater.controller.fosaz_light_curtain_is_triggered()
if not curtain_is_triggered:
raise FlomniError("Fosaz did not reach light curtain")
def move_fheater_up(self):
self.ensure_fheater_up()
def ensure_fheater_up(self):
axis_id = dev.fheater._config["deviceConfig"].get("axis_Id")
axis_id_numeric = self.axis_id_to_numeric(axis_id)
low, high = dev.fheater.controller.get_motor_limit_switch(axis_id)
if high:
raise FlomniError("fheater in high limit. How did we get here?? Aborting.")
if not low:
self.ensure_osa_back()
if dev.fheater.readback.get() < -0.2:
umv(dev.fheater, -0.2)
dev.fheater.controller.drive_axis_to_limit(axis_id_numeric, "reverse")
def move_fheater_down(self):
axis_id = dev.fheater._config["deviceConfig"].get("axis_Id")
axis_id_numeric = self.axis_id_to_numeric(axis_id)
self.ensure_osa_back()
fsamx_in = dev.fsamx.user_parameter.get("in")
if not np.isclose(dev.fsamx.readback.get(), fsamx_in, 0.2):
raise FlomniError("fsamx not in position. Aborting.")
fheater_in = dev.fheater.user_parameter.get("in")
umv(dev.fheater, fheater_in)
def ensure_gripper_up(self):
axis_id = dev.ftransy._config["deviceConfig"].get("axis_Id")
axis_id_numeric = self.axis_id_to_numeric(axis_id)
low, high = dev.ftransy.controller.get_motor_limit_switch(axis_id)
if low:
raise FlomniError("Ftransy in low limit. How did we get here?? Aborting.")
if high:
return
if dev.ftransy.readback.get() < -0.5:
umv(dev.ftransy, -0.5)
dev.ftransy.controller.drive_axis_to_limit(axis_id_numeric, "forward")
def check_tray_in(self):
axis_id = dev.ftray._config["deviceConfig"].get("axis_Id")
low, high = dev.ftray.controller.get_motor_limit_switch(axis_id)
if high:
raise FlomniError("Ftray is in the 'OUT' position. Aborting.")
if not low:
raise FlomniError("Ftray is not at the 'IN' position. Aborting.")
def ftransfer_flomni_stage_in(self):
time.sleep(1)
sample_in_position = dev.flomni_samples.is_sample_slot_used(0)
# bool(float(dev.flomni_samples.sample_placed.sample0.get()))
if not sample_in_position:
raise FlomniError("There is no sample in the sample stage. Aborting.")
self.reset_correction()
dev.rtx.controller.feedback_disable()
self.ensure_fheater_up()
self.ensure_gripper_up()
self.check_tray_in()
fsamx_in = dev.fsamx.user_parameter.get("in")
umv(dev.fsamx, fsamx_in)
dev.fsamx.limits = [fsamx_in - 0.4, fsamx_in + 0.4]
print("Moving X-ray eye in.")
if self.OMNYTools.yesno(
"Please confirm that this is ok with the flight tube. This check is to be removed after commissioning",
"n",
):
print("OK. continue.")
else:
print("Stopping.")
raise FlomniError("Manual abort of x-ray eye in.")
self.feye_in()
print("Moving X-ray optics out.")
self.foptics_out()
self.xrayeye_update_frame()
def laser_tracker_show_all(self):
dev.rtx.controller.laser_tracker_show_all()
def laser_tracker_on(self):
dev.rtx.controller.laser_tracker_on()
time.sleep(0.2)
dev.rtx.controller.laser_tracker_check_signalstrength()
def laser_tracker_off(self):
dev.rtx.controller.laser_tracker_off()
def show_signal_strength_interferometer(self):
dev.rtx.controller.show_signal_strength_interferometer()
def feedback_disable(self):
self.device_manager.devices.rtx.controller.feedback_disable()
def feedback_enable_with_reset(self):
self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
self.feedback_status()
def feedback_enable_without_reset(self):
self.device_manager.devices.rtx.controller.feedback_enable_without_reset()
self.feedback_status()
def feedback_status(self):
feedback_status = self.device_manager.devices.rtx.controller.feedback_is_running()
if feedback_status == True:
print("The rt feedback is \x1b[92mrunning\x1b[0m.")
else:
print("The rt feedback is \x1b[91mNOT\x1b[0m running.")
def lights_off(self):
self.device_manager.devices.fsamx.controller.lights_off()
def lights_on(self):
self.device_manager.devices.fsamx.controller.lights_on()
def ftransfer_flomni_stage_out(self):
self.flomnigui_show_cameras()
self.flomnigui_raise()
target_pos = -162
if np.isclose(dev.fsamx.readback.get(), target_pos, 0.01):
return
umv(dev.fsamroy, 0)
self.feedback_disable()
self.ensure_fheater_up()
self.ensure_gripper_up()
self.check_tray_in()
self.laser_tracker_off()
time.sleep(0.05)
fsamy_in = dev.fsamy.user_parameter.get("in")
if fsamy_in is None:
raise FlomniError(
"Could not find an 'IN' position for fsamy. Please check your config."
)
umv(dev.fsamy, fsamy_in)
time.sleep(0.05)
self.laser_tracker_on()
time.sleep(0.05)
self.laser_tracker_off()
time.sleep(0.05)
self.drive_axis_to_limit(dev.fsamx, "forward")
dev.fsamx.limits = [-162, 0]
dev.fsamx.controller.socket_put_confirmed("axspeed[4]=25*stppermm[4]")
umv(dev.fsamx, target_pos)
def check_sensor_connected(self):
sensor_voltage_target = dev.ftransy.user_parameter.get("sensor_voltage")
sensor_voltage = float(dev.ftransy.controller.socket_put_and_receive("MG@AN[1]").strip())
if not np.isclose(sensor_voltage, sensor_voltage_target, 0.5):
raise FlomniError(f"Sensor voltage is {sensor_voltage}, indicates an error. Aborting.")
def ftransfer_get_sample(self, position: int):
self.check_position_is_valid(position)
self.check_tray_in()
self.check_sensor_connected()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
if sample_in_gripper:
raise FlomniError(
"The gripper does carry a sample. Cannot proceed getting another sample."
)
sample_in_position = dev.flomni_samples.is_sample_slot_used(position)
if not sample_in_position:
raise FlomniError(f"The planned pick position [{position}] does not have a sample.")
self.flomnigui_show_cameras()
self.ftransfer_gripper_move(position)
self.ftransfer_controller_enable_mount_mode()
if position == 0:
sample_height = -45 + dev.fsamy.user_parameter.get("in")
else:
sample_height = -17.5
dev.ftransy.controller.socket_put_confirmed(f"getaprch={sample_height:.1f}")
dev.ftransy.controller.socket_put_confirmed("XQ#GRGET,3")
print("The unmount process started.")
time.sleep(1)
while True:
in_progress = bool(
float(dev.ftransy.controller.socket_put_and_receive("MG mntprgs").strip())
)
if not in_progress:
break
self.ftransfer_confirm()
time.sleep(1)
self.ftransfer_controller_disable_mount_mode()
self.ensure_gripper_up()
signal_name = getattr(dev.flomni_samples.sample_names, f"sample{position}")
self.flomni_modify_storage_non_interactive(100, 1, signal_name.get())
self.flomni_modify_storage_non_interactive(position, 0, "-")
def ftransfer_show_all(self):
dev.flomni_samples.show_all()
def ftransfer_put_sample(self, position: int):
self.check_position_is_valid(position)
self.check_tray_in()
self.check_sensor_connected()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
# bool(float(dev.flomni_samples.sample_in_gripper.get()))
if not sample_in_gripper:
raise FlomniError("The gripper does not carry a sample.")
sample_in_position = dev.flomni_samples.is_sample_slot_used(position)
if sample_in_position:
raise FlomniError(f"The planned put position [{position}] already has a sample.")
self.flomnigui_show_cameras()
self.ftransfer_gripper_move(position)
self.ftransfer_controller_enable_mount_mode()
if position == 0:
sample_height = -45 + dev.fsamy.user_parameter.get("in")
else:
sample_height = -17.5
dev.ftransy.controller.socket_put_confirmed(f"mntaprch={sample_height:.1f}")
dev.ftransy.controller.socket_put_confirmed("XQ#GRPUT,3")
print("The mount process started.")
time.sleep(1)
while True:
in_progress = bool(
float(dev.ftransy.controller.socket_put_and_receive("MG mntprgs").strip())
)
if not in_progress:
break
self.ftransfer_confirm()
time.sleep(1)
self.ftransfer_controller_disable_mount_mode()
self.ensure_gripper_up()
sample_name = dev.flomni_samples.sample_in_gripper_name.get()
self.flomni_modify_storage_non_interactive(100, 0, "-")
self.flomni_modify_storage_non_interactive(position, 1, sample_name)
if position == 0:
self.ftransfer_flomni_stage_in()
bec.queue.next_dataset_number += 1
[docs]
def sample_get_name(self, position: int = 0) -> str:
"""
Get the name of the sample currently in the given position.
"""
signal_name = getattr(dev.flomni_samples.sample_names, f"sample{position}")
return signal_name.get()
def ftransfer_sample_change(self, new_sample_position: int):
self.check_tray_in()
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
if sample_in_gripper:
raise FlomniError("There is already a sample in the gripper. Aborting.")
self.check_position_is_valid(new_sample_position)
if new_sample_position == 0:
raise FlomniError(
"The new sample to place cannot be the sample in the sample stage. Aborting."
)
# sample_placed = getattr(
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
# ).get()
sample_placed = dev.flomni_samples.is_sample_slot_used(new_sample_position)
if not sample_placed:
raise FlomniError(
f"There is currently no sample in position [{new_sample_position}]. Aborting."
)
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
sample_in_sample_stage = dev.flomni_samples.is_sample_slot_used(0)
if sample_in_sample_stage:
# find a new home for the sample...
empty_slots = []
# for name, val in dev.flomni_samples.read().items():
# if "flomni_samples_sample_placed_sample" not in name:
# continue
# if val.get("value") == 0:
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
for j in range(1, 20):
if not dev.flomni_samples.is_sample_slot_used(j):
empty_slots.append(j)
if not empty_slots:
raise FlomniError("There are no empty slots available. Aborting.")
print(f"The following slots are empty: {empty_slots}.")
while True:
user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}] ")
if user_input.strip() == "":
# No entry: use default
user_input = empty_slots[0]
break
try:
user_input = int(user_input)
if user_input not in empty_slots:
raise ValueError
break
except ValueError:
print("Please specify a valid number.")
continue
self.check_position_is_valid(user_input)
self.ftransfer_get_sample(0)
self.ftransfer_put_sample(user_input)
self.ftransfer_get_sample(new_sample_position)
self.ftransfer_put_sample(0)
def ftransfer_modify_storage(self, position: int, used: int):
if used:
name = input("What's the name of this sample? ")
else:
name = "-"
self.flomni_modify_storage_non_interactive(position, used, name)
def flomni_modify_storage_non_interactive(self, position: int, used: int, name: str):
if position == 100:
dev.flomni_samples.sample_in_gripper.set(used)
dev.flomni_samples.sample_in_gripper_name.set(name)
else:
signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}")
signal.set(used)
signal_name = getattr(dev.flomni_samples.sample_names, f"sample{position}")
signal_name.set(name)
def check_position_is_valid(self, position: int):
if 0 <= position < 21:
return
raise FlomniError(
f"The given position number [{position}] is not in the valid range of 0-21. "
)
def ftransfer_controller_enable_mount_mode(self):
dev.ftransy.controller.socket_put_confirmed("XQ#MNTMODE")
time.sleep(0.5)
if not self.ftransfer_controller_in_mount_mode():
raise FlomniError("System not switched to mount mode. Aborting.")
def ftransfer_controller_disable_mount_mode(self):
dev.ftransy.controller.socket_put_confirmed("XQ#POSMODE")
time.sleep(0.5)
if self.ftransfer_controller_in_mount_mode():
raise FlomniError("System is still in mount mode. Aborting.")
def ftransfer_controller_in_mount_mode(self) -> bool:
in_mount_mode = bool(
float(dev.ftransy.controller.socket_put_and_receive("MG mntmod").strip())
)
return in_mount_mode
def ftransfer_confirm(self):
confirm = int(float(dev.ftransy.controller.socket_put_and_receive("MG confirm").strip()))
if confirm != -1:
return
if self.OMNYTools.yesno("All OK? Continue?", "y"):
print("OK. continue.")
dev.ftransy.controller.socket_put_confirmed("confirm=1")
else:
print("Stopping.")
raise FlomniError("User abort sample transfer.")
def ftransfer_gripper_is_open(self) -> bool:
status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip()))
return status
def ftransfer_gripper_open(self):
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
# dev.flomni_samples.sample_in_gripper.get()
if sample_in_gripper:
raise FlomniError(
"Cannot open gripper. There is still a sample in the gripper! Aborting."
)
if not self.ftransfer_gripper_is_open():
dev.ftransy.controller.socket_put_confirmed("XQ#GROPEN,4")
def ftransfer_gripper_close(self):
if self.ftransfer_gripper_is_open():
dev.ftransy.controller.socket_put_confirmed("XQ#GRCLOS,4")
def ftransfer_gripper_move(self, position: int):
self.check_position_is_valid(position)
# this is not used for sample stage position!
self._ftransfer_shiftx = -0.15
self._ftransfer_shiftz = -0.5
fsamx_pos = dev.fsamx.readback.get()
if position == 0 and fsamx_pos > -160:
if self.OMNYTools.yesno(
"May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!",
"y",
):
print("OK. continue.")
self.ftransfer_flomni_stage_out()
else:
print("Stopping.")
return
self.ensure_gripper_up()
self.check_tray_in()
if position == 0:
umv(dev.ftransx, 11, dev.ftransz, 3.5950)
if position == 1:
umv(
dev.ftransx,
41.900 + self._ftransfer_shiftx,
dev.ftransz,
74.7500 + self._ftransfer_shiftz,
)
if position == 2:
umv(
dev.ftransx,
31.900 + self._ftransfer_shiftx,
dev.ftransz,
74.7625 + self._ftransfer_shiftz,
)
if position == 3:
umv(
dev.ftransx,
21.900 + self._ftransfer_shiftx,
dev.ftransz,
74.7750 + self._ftransfer_shiftz,
)
if position == 4:
umv(
dev.ftransx,
11.900 + self._ftransfer_shiftx,
dev.ftransz,
74.7875 + self._ftransfer_shiftz,
)
if position == 5:
umv(
dev.ftransx,
1.9000 + self._ftransfer_shiftx,
dev.ftransz,
74.8000 + self._ftransfer_shiftz,
)
if position == 6:
umv(
dev.ftransx,
41.900 + self._ftransfer_shiftx,
dev.ftransz,
89.7500 + self._ftransfer_shiftz,
)
if position == 7:
umv(
dev.ftransx,
31.900 + self._ftransfer_shiftx,
dev.ftransz,
89.7625 + self._ftransfer_shiftz,
)
if position == 8:
umv(
dev.ftransx,
21.900 + self._ftransfer_shiftx,
dev.ftransz,
89.7750 + self._ftransfer_shiftz,
)
if position == 9:
umv(
dev.ftransx,
11.900 + self._ftransfer_shiftx,
dev.ftransz,
89.7875 + self._ftransfer_shiftz,
)
if position == 10:
umv(
dev.ftransx,
1.900 + self._ftransfer_shiftx,
dev.ftransz,
89.8000 + self._ftransfer_shiftz,
)
if position == 11:
umv(
dev.ftransx,
41.95 + self._ftransfer_shiftx,
dev.ftransz,
124.75 + self._ftransfer_shiftz,
)
if position == 12:
umv(
dev.ftransx,
31.95 + self._ftransfer_shiftx,
dev.ftransz,
124.7625 + self._ftransfer_shiftz,
)
if position == 13:
umv(
dev.ftransx,
21.95 + self._ftransfer_shiftx,
dev.ftransz,
124.7750 + self._ftransfer_shiftz,
)
if position == 14:
umv(
dev.ftransx,
11.95 + self._ftransfer_shiftx,
dev.ftransz,
124.7875 + self._ftransfer_shiftz,
)
if position == 15:
umv(
dev.ftransx,
1.95 + self._ftransfer_shiftx,
dev.ftransz,
124.8000 + self._ftransfer_shiftz,
)
if position == 16:
umv(
dev.ftransx,
41.95 + self._ftransfer_shiftx,
dev.ftransz,
139.7500 + self._ftransfer_shiftz,
)
if position == 17:
umv(
dev.ftransx,
31.95 + self._ftransfer_shiftx,
dev.ftransz,
139.7625 + self._ftransfer_shiftz,
)
if position == 18:
umv(
dev.ftransx,
21.95 + self._ftransfer_shiftx,
dev.ftransz,
139.7750 + self._ftransfer_shiftz,
)
if position == 19:
umv(
dev.ftransx,
11.95 + self._ftransfer_shiftx,
dev.ftransz,
139.7875 + self._ftransfer_shiftz,
)
if position == 20:
umv(
dev.ftransx,
1.95 + self._ftransfer_shiftx,
dev.ftransz,
139.8000 + self._ftransfer_shiftz,
)
[docs]
class FlomniAlignmentMixin:
import csaxs_bec
# Ensure this is a Path object, not a string
csaxs_bec_basepath = Path(csaxs_bec.__file__)
default_correction_file_rel = "correction_flomni_20210300_360deg.txt"
# Build the absolute path correctly
default_correction_file = (
csaxs_bec_basepath.parent
/ "bec_ipython_client"
/ "plugins"
/ "flomni"
/ default_correction_file_rel
).resolve()
[docs]
def reset_correction(self, use_default_correction=True):
"""
Reset the correction to the default values.
If use_default_correction is False, the correction will be set to empty values.
Otherwise the default values will be loaded.
Args:
use_default_correction (bool, optional): If set to true, a call reset the correction to the default values. Defaults to True.
"""
self.corr_pos_y = []
self.corr_angle_y = []
self.corr_pos_y_2 = []
self.corr_angle_y_2 = []
if use_default_correction:
try:
self.read_additional_correction_y(self.default_correction_file)
logger.info(f"Applying default correction from {self.default_correction_file}")
except FileNotFoundError:
logger.warning(
f"Could not find default correction file {self.default_correction_file}."
)
logger.warning("Not applying any correction.")
def reset_tomo_alignment_fit(self):
self.client.delete_global_var("tomo_alignment_fit")
def read_alignment_offset(
self,
dir_path=os.path.expanduser("~/data/raw/logs/"),
setup="flomni",
use_vertical_default_values=True,
get_data_from_gui=False,
):
tomo_alignment_fit = np.zeros((2, 5))
if not get_data_from_gui:
"""
Read the alignment parameters from xray eye fit.
"""
with open(os.path.join(dir_path, "ptychotomoalign_Ax.txt"), "r") as file:
tomo_alignment_fit[0][0] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Bx.txt"), "r") as file:
tomo_alignment_fit[0][1] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Cx.txt"), "r") as file:
tomo_alignment_fit[0][2] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Ay.txt"), "r") as file:
tomo_alignment_fit[1][0] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_By.txt"), "r") as file:
tomo_alignment_fit[1][1] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Cy.txt"), "r") as file:
tomo_alignment_fit[1][2] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_Ay3.txt"), "r") as file:
tomo_alignment_fit[1][3] = file.readline()
with open(os.path.join(dir_path, "ptychotomoalign_By3.txt"), "r") as file:
tomo_alignment_fit[1][4] = file.readline()
print("New alignment parameters loaded from filesystem, meaning Matlab fit:")
else:
params = dev.omny_xray_gui.fit_params_x.get()
# amplitude
tomo_alignment_fit[0][0] = params["SineModel_0_amplitude"]
# phase
tomo_alignment_fit[0][1] = params["SineModel_0_shift"]
# offset
tomo_alignment_fit[0][2] = params["LinearModel_1_intercept"]
print("applying vertical default values from mirror calibration, not from fit!")
tomo_alignment_fit[1][0] = 0
tomo_alignment_fit[1][1] = 0
tomo_alignment_fit[1][2] = 0
tomo_alignment_fit[1][3] = 0
tomo_alignment_fit[1][4] = 0
print("New alignment parameters loaded based on Xray eye alignment GUI:")
print(
f"X Amplitude {tomo_alignment_fit[0][0]}, "
f"X Phase {tomo_alignment_fit[0][1]}, "
f"X Offset {tomo_alignment_fit[0][2]}, "
f"Y Amplitude {tomo_alignment_fit[1][0]}, "
f"Y Phase {tomo_alignment_fit[1][1]}, "
f"Y Offset {tomo_alignment_fit[1][2]}, "
f"Y 3rd Order Amplitude {tomo_alignment_fit[1][3]}, "
f"Y 3rd Order Phase {tomo_alignment_fit[1][4]} ."
)
if use_vertical_default_values:
print(
f"Using default values for vertical alignment for setup {setup}. Optional: use_vertical_default_values=False"
)
if setup == "flomni":
tomo_alignment_fit[1][0] = 0
tomo_alignment_fit[1][1] = 0
tomo_alignment_fit[1][2] = 0
tomo_alignment_fit[1][3] = 0
tomo_alignment_fit[1][4] = 0
elif setup == "omny":
tomo_alignment_fit[1][0] = 2.588628
tomo_alignment_fit[1][1] = -2.385422
tomo_alignment_fit[1][2] = 0
tomo_alignment_fit[1][3] = 1.010583
tomo_alignment_fit[1][4] = -1.359157
print("Follwing parameters will be used:")
print(
f"X Amplitude {tomo_alignment_fit[0][0]}, "
f"X Phase {tomo_alignment_fit[0][1]}, "
f"X Offset {tomo_alignment_fit[0][2]}, "
f"Y Amplitude {tomo_alignment_fit[1][0]}, "
f"Y Phase {tomo_alignment_fit[1][1]}, "
f"Y Offset {tomo_alignment_fit[1][2]}, "
f"Y 3rd Order Amplitude {tomo_alignment_fit[1][3]}, "
f"Y 3rd Order Phase {tomo_alignment_fit[1][4]} ."
)
self.client.set_global_var("tomo_alignment_fit", tomo_alignment_fit.tolist())
# x amp, phase, offset, y amp, phase, offset, 3rd order amp, 3rd order phase
# 0 0 0 1 0 2 1 0 1 1 1 2 1 3 1 4
[docs]
def get_alignment_offset(self, angle: float):
"""
Compute the alignment offset for the given angle.
Args:
angle (float): The angle to compute the alignment offset for.
Returns:
tuple: The alignment offset in x, y and z direction.
"""
tomo_alignment_fit = self.client.get_global_var("tomo_alignment_fit")
if tomo_alignment_fit is None:
print("Not applying any alignment offsets. No tomo alignment fit data available.\n")
return (0, 0, 0)
# x amp, phase, offset, y amp, phase, offset
# 0 0 0 1 0 2 1 0 1 1 1 2
correction_x = (
tomo_alignment_fit[0][0] * np.sin(np.radians(angle) + tomo_alignment_fit[0][1])
+ tomo_alignment_fit[0][2]
)
correction_y = (
tomo_alignment_fit[1][0] * np.sin(np.radians(angle) + tomo_alignment_fit[1][1])
+ tomo_alignment_fit[1][2]
+ tomo_alignment_fit[1][3] * np.sin(3 * np.radians(angle) + tomo_alignment_fit[1][4])
)
correction_z = tomo_alignment_fit[0][0] * np.sin(
np.radians(angle + 90) + tomo_alignment_fit[0][1]
)
print(
f"Alignment offset x {correction_x}, y {correction_y}, z {correction_z} for angle"
f" {angle}\n"
)
return (correction_x, correction_y, correction_z)
def _read_correction_file(self, correction_file: str):
with open(correction_file, "r") as f:
num_elements = f.readline()
int_num_elements = int(num_elements.split(" ")[2])
corr_pos = []
corr_angle = []
for j in range(int_num_elements * 2):
line = f.readline()
value = line.split(" ")[2]
name = line.split(" ")[0].split("[")[0]
if name == "corr_pos":
corr_pos.append(float(value))
elif name == "corr_angle":
corr_angle.append(float(value))
print(
f"Loading default mirror correction from file {correction_file} containing {int_num_elements} elements."
)
return corr_pos, corr_angle
def read_additional_correction_y(self, correction_file: str):
self.corr_pos_y, self.corr_angle_y = self._read_correction_file(correction_file)
def read_additional_correction_y_2(self, correction_file: str):
self.corr_pos_y_2, self.corr_angle_y_2 = self._read_correction_file(correction_file)
def compute_additional_correction_y(self, angle):
return self._compute_additional_correction(angle, iteration=1)
def compute_additional_correction_y_2(self, angle):
return self._compute_additional_correction(angle, iteration=2)
def _compute_additional_correction(self, angle, iteration=1):
if iteration == 1:
corr_pos = self.corr_pos_y
corr_angle = self.corr_angle_y
elif iteration == 2:
corr_pos = self.corr_pos_y_2
corr_angle = self.corr_angle_y_2
if not corr_pos:
print("Not applying any additional correction. No data available.\n")
return 0
# find index of closest angle
for j, _ in enumerate(corr_pos):
newangledelta = np.fabs(corr_angle[j] - angle)
if j == 0:
angledelta = newangledelta
additional_correction_shift = corr_pos[j]
continue
if newangledelta < angledelta:
additional_correction_shift = corr_pos[j]
angledelta = newangledelta
if additional_correction_shift == 0 and angle > corr_angle[-1]:
additional_correction_shift = corr_pos[-1]
print(f"Additional correction shift {iteration} in y: {additional_correction_shift}")
return additional_correction_shift
class _ProgressProxy:
"""Dict-like proxy that persists the flOMNI progress dict as a BEC global variable.
Every read (`proxy["key"]`) fetches the current dict from the global var store,
and every write (`proxy["key"] = val`) fetches, updates, and saves it back.
This makes the progress state visible to all BEC client sessions via
``client.get_global_var("tomo_progress")``.
"""
_GLOBAL_VAR_KEY = "tomo_progress"
_DEFAULTS: dict = {
"subtomo": 0,
"subtomo_projection": 0,
"subtomo_total_projections": 1,
"projection": 0,
"total_projections": 1,
"angle": 0,
"tomo_type": 0,
"tomo_start_time": None,
"estimated_remaining_time": None,
"estimated_finish_time": None,
"heartbeat": None,
}
def __init__(self, client):
self._client = client
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _load(self) -> dict:
val = self._client.get_global_var(self._GLOBAL_VAR_KEY)
if val is None:
return dict(self._DEFAULTS)
return val
def _save(self, data: dict) -> None:
self._client.set_global_var(self._GLOBAL_VAR_KEY, data)
# ------------------------------------------------------------------
# Dict-like interface
# ------------------------------------------------------------------
def __getitem__(self, key):
return self._load()[key]
def __setitem__(self, key, value) -> None:
data = self._load()
data[key] = value
self._save(data)
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._load()!r})"
def get(self, key, default=None):
return self._load().get(key, default)
def update(self, *args, **kwargs) -> None:
"""Update multiple fields in a single round-trip."""
data = self._load()
data.update(*args, **kwargs)
self._save(data)
def reset(self) -> None:
"""Reset all progress fields to their default values."""
self._save(dict(self._DEFAULTS))
def as_dict(self) -> dict:
"""Return a plain copy of the current progress state."""
return self._load()
[docs]
class Flomni(
FlomniInitStagesMixin,
FlomniSampleTransferMixin,
FlomniAlignmentMixin,
FlomniOpticsMixin,
cSAXSBeamlineChecks,
flomniGuiTools,
):
def __init__(self, client):
super().__init__()
self.client = client
self.device_manager = client.device_manager
self.tomo_id = -1
self.special_angles = []
self.special_angle_repeats = 20
self.special_angle_tolerance = 20
self._current_special_angles = []
self._beam_is_okay = True
self.corr_pos_y = []
self.corr_angle_y = []
self.corr_pos_y_2 = []
self.corr_angle_y_2 = []
self._progress_proxy = _ProgressProxy(self.client)
self._progress_proxy.reset()
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_webpage_generator import (
FlomniWebpageGenerator,
)
self._webpage_gen = FlomniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
# upload_url="http://s1090968537.online.de/upload.php", # optional
upload_url="https://v1p0zyg2w9n2k9c1.myfritz.net/upload.php",
local_port=8080,
)
self._webpage_gen.start()
self.OMNYTools = OMNYTools(self.client)
self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
self.tomo_id_manager = TomoIDManager()
self.align = XrayEyeAlign(self.client, self)
self.set_client(client)
[docs]
def set_web_password(self, password: str) -> None:
"""Set the web password for the current BEC account."""
self._webpage_gen.set_web_password(password)
def start_x_ray_eye_alignment(self, keep_shutter_open=False):
if self.OMNYTools.yesno(
"Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.",
"y",
):
self.align = XrayEyeAlign(self.client, self)
try:
self.align.align(keep_shutter_open)
except KeyboardInterrupt as exc:
fsamx_in = self._get_user_param_safe(dev.fsamx, "in")
if np.isclose(fsamx_in, dev.fsamx.readback.get(), 0.5):
print("Stopping alignment. Returning to fsamx in position.")
self.feedback_disable()
umv(dev.fsamx, fsamx_in)
raise exc
def xrayeye_update_frame(self, keep_shutter_open=False):
self.align.update_frame(keep_shutter_open)
def xrayeye_alignment_start(self, keep_shutter_open=False):
self.start_x_ray_eye_alignment(keep_shutter_open)
def drive_axis_to_limit(self, device, direction):
axis_id = device._config["deviceConfig"].get("axis_Id")
axis_id_numeric = self.axis_id_to_numeric(axis_id)
device.controller.drive_axis_to_limit(axis_id_numeric, direction)
def axis_id_to_numeric(self, axis_id) -> int:
return ord(axis_id.lower()) - 97
[docs]
def set_special_angles(self, angles: list, repeats: int = 20, tolerance: float = 0.5):
"""Set the special angles for a tomo
Args:
angles (list): List of special angles
repeats (int, optional): Number of repeats at a special angle. Defaults to 20.
tolerance (float, optional): Number of repeats at a special angle. Defaults to 0.5.
"""
self.special_angles = angles
self.special_angle_repeats = repeats
self.special_angle_tolerance = tolerance
[docs]
def remove_special_angles(self):
"""Remove the special angles and set the number of repeats to 1"""
self.special_angles = []
self.special_angle_repeats = 1
@property
def progress(self) -> _ProgressProxy:
"""Proxy dict backed by the BEC global variable ``tomo_progress``.
Readable from any BEC client session via::
client.get_global_var("tomo_progress")
Individual fields can be read and written just like a regular dict::
flomni.progress["projection"] # read
flomni.progress["projection"] = 42 # write (persists immediately)
To update multiple fields atomically use :py:meth:`_ProgressProxy.update`::
flomni.progress.update(projection=42, angle=90.0)
To reset all fields to their defaults::
flomni.progress.reset()
"""
return self._progress_proxy
@progress.setter
def progress(self, val: dict) -> None:
"""Replace the entire progress dict.
Accepts a plain :class:`dict` and persists it to the global var store.
Example::
flomni.progress = {"projection": 0, "total_projections": 100, ...}
"""
if not isinstance(val, dict):
raise TypeError(f"progress must be a dict, got {type(val).__name__!r}")
self._progress_proxy._save(val)
@property
def tomo_shellstep(self):
val = self.client.get_global_var("tomo_shellstep")
if val is None:
return 1
return val
@tomo_shellstep.setter
def tomo_shellstep(self, val: float):
self.client.set_global_var("tomo_shellstep", val)
@property
def tomo_countingtime(self):
val = self.client.get_global_var("tomo_countingtime")
if val is None:
return 0.1
return val
@tomo_countingtime.setter
def tomo_countingtime(self, val: float):
self.client.set_global_var("tomo_countingtime", val)
@property
def manual_shift_y(self):
val = self.client.get_global_var("manual_shift_y")
if val is None:
return 0.0
return val
@manual_shift_y.setter
def manual_shift_y(self, val: float):
self.client.set_global_var("manual_shift_y", val)
@property
def fovx(self):
val = self.client.get_global_var("fovx")
if val is None:
return 20
return val
@fovx.setter
def fovx(self, val: float):
if val > 200:
raise ValueError("FOV cannot be larger than 200 um.")
self.client.set_global_var("fovx", val)
@property
def fovy(self):
val = self.client.get_global_var("fovy")
if val is None:
return 20
return val
@fovy.setter
def fovy(self, val: float):
if val > 100:
raise ValueError("FOV cannot be larger than 100 um.")
self.client.set_global_var("fovy", val)
@property
def tomo_type(self):
val = self.client.get_global_var("tomo_type")
if val is None:
return 1
return val
@tomo_type.setter
def tomo_type(self, val: float):
if val == 1:
# equally spaced tomography with 8 sub tomograms
self.client.set_global_var("tomo_type", val)
elif val == 2:
# golden ratio tomography (sorted bunches)
self.client.set_global_var("tomo_type", val)
elif val == 3:
# equally spaced tomography with starting angles shifted by golden ratio
self.client.set_global_var("tomo_type", val)
else:
raise ValueError("Unknown tomo_type.")
@property
def corridor_size(self):
val = self.client.get_global_var("corridor_size")
if val is None:
val = -1
return val
@corridor_size.setter
def corridor_size(self, val: float):
self.client.set_global_var("corridor_size", val)
@property
def stitch_x(self):
val = self.client.get_global_var("stitch_x")
if val is None:
return 0
return val
@stitch_x.setter
@typechecked
def stitch_x(self, val: int):
self.client.set_global_var("stitch_x", val)
@property
def stitch_y(self):
val = self.client.get_global_var("stitch_y")
if val is None:
return 0
return val
@stitch_y.setter
@typechecked
def stitch_y(self, val: int):
self.client.set_global_var("stitch_y", val)
@property
def ptycho_reconstruct_foldername(self):
val = self.client.get_global_var("ptycho_reconstruct_foldername")
if val is None:
return "ptycho_reconstruct"
return val
@ptycho_reconstruct_foldername.setter
def ptycho_reconstruct_foldername(self, val: str):
self.client.set_global_var("ptycho_reconstruct_foldername", val)
self.reconstructor.folder_name = val # keep reconstructor in sync
@property
def tomo_angle_stepsize(self):
val = self.client.get_global_var("tomo_angle_stepsize")
if val is None:
return 10.0
return val
@tomo_angle_stepsize.setter
def tomo_angle_stepsize(self, val: float):
self.client.set_global_var("tomo_angle_stepsize", val)
@property
def golden_max_number_of_projections(self):
val = self.client.get_global_var("golden_max_number_of_projections")
if val is None:
return 1000.0
return val
@golden_max_number_of_projections.setter
def golden_max_number_of_projections(self, val: float):
self.client.set_global_var("golden_max_number_of_projections", val)
@property
def tomo_stitch_overlap(self):
val = self.client.get_global_var("tomo_stitch_overlap")
if val is None:
return 0.2
return val
@tomo_stitch_overlap.setter
def tomo_stitch_overlap(self, val: float):
self.client.set_global_var("tomo_stitch_overlap", val)
@property
def tomo_angle_range(self):
"""Total angular sweep in degrees for tomo_type 1 (equally spaced
sub-tomograms), inclusive of the upper bound. Either 180 (default,
original behaviour) or 360."""
val = self.client.get_global_var("tomo_angle_range")
if val is None:
return 180
return val
@tomo_angle_range.setter
def tomo_angle_range(self, val: float):
if val not in (180, 360):
raise ValueError("tomo_angle_range must be 180 or 360 degrees.")
self.client.set_global_var("tomo_angle_range", val)
@property
def golden_projections_at_0_deg_for_damage_estimation(self):
val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation")
if val is None:
return 0
return val
@golden_projections_at_0_deg_for_damage_estimation.setter
def golden_projections_at_0_deg_for_damage_estimation(self, val: float):
self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val)
@property
def golden_ratio_bunch_size(self):
val = self.client.get_global_var("golden_ratio_bunch_size")
if val is None:
return 20
return val
@golden_ratio_bunch_size.setter
def golden_ratio_bunch_size(self, val: float):
self.client.set_global_var("golden_ratio_bunch_size", val)
@property
def frames_per_trigger(self):
"""Number of burst frames acquired per point/projection. Used both
by scans.flomni_fermat_scan (via tomo_scan_projection) and by
tomo_acquire_at_angle (scans.acquire)."""
val = self.client.get_global_var("frames_per_trigger")
if val is None:
return 1
return val
@frames_per_trigger.setter
def frames_per_trigger(self, val: int):
if isinstance(val, bool) or not isinstance(val, int):
raise ValueError("frames_per_trigger must be a positive integer.")
if val <= 0:
raise ValueError("frames_per_trigger must be a positive integer.")
self.client.set_global_var("frames_per_trigger", val)
@property
def single_point_instead_of_fermat_scan(self):
"""If True, tomo_scan acquires a single point (or burst) at each
angle via scans.acquire instead of running scans.flomni_fermat_scan.
Applies to all tomo_types, since it only changes how a given angle
is acquired, not which angles are visited."""
val = self.client.get_global_var("single_point_instead_of_fermat_scan")
if val is None:
return False
return val
@single_point_instead_of_fermat_scan.setter
def single_point_instead_of_fermat_scan(self, val: bool):
self.client.set_global_var("single_point_instead_of_fermat_scan", val)
@property
def sample_name(self):
return self.sample_get_name(0)
[docs]
def tomo_alignment_scan(self):
"""
Performs a tomogram alignment scan.
Collects all scan numbers acquired during the alignment, prints them at the end,
and creates a BEC scilog text entry summarising the alignment scan numbers.
"""
if self.get_alignment_offset(0) == (0, 0, 0):
print("It appears that the xrayeye alignemtn was not performend or loaded. Aborting.")
return
dev = builtins.__dict__.get("dev")
bec = builtins.__dict__.get("bec")
self.feye_out()
tags = ["BEC_alignment_tomo", self.sample_name]
self.write_alignment_scan_numbers(bec.queue.next_scan_number)
start_angle = 0
alignment_scan_numbers = []
angle_end = start_angle + 180
for angle in np.linspace(start_angle, angle_end, num=int(180 / 45) + 1, endpoint=True):
successful = False
error_caught = False
if 0 <= angle < 180.05:
print(f"Starting flOMNI scan for angle {angle}")
while not successful:
try:
start_scan_number = bec.queue.next_scan_number
self.tomo_scan_projection(angle)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else:
raise exc
# todo here was if blchk success, then setting to success true
successful = True
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
# self._write_tomo_scan_number(scan_nr, angle, 0)
alignment_scan_numbers.append(scan_nr)
umv(dev.fsamroy, 0)
self.OMNYTools.printgreenbold(
"\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit by flomni.read_alignment_offset() ."
)
# summary of alignment scan numbers
scan_list_str = ", ".join(str(s) for s in alignment_scan_numbers)
# print(f"\nAlignment scan numbers ({len(alignment_scan_numbers)} total): {scan_list_str}")
# BEC scilog entry (no logo)
scilog_content = (
f"Alignment scan finished.\n"
f"Sample: {self.sample_name}\n"
f"Number of alignment scans: {len(alignment_scan_numbers)}\n"
f"Alignment scan numbers: {scan_list_str}\n"
)
print(scilog_content)
bec.messaging.scilog.new().add_text(scilog_content.replace("\n", "<br>")).add_tags(
"alignmentscan"
).send()
def write_alignment_scan_numbers(self, first_scan):
import os
file = os.path.expanduser("~/data/raw/logs/ptychotomoalign_scannum.txt")
os.makedirs(os.path.dirname(file), exist_ok=True)
scans = [first_scan + k for k in range(5)]
angles = [0, 45, 90, 135, 180]
x_vals = []
for angle in angles:
x, y, z = self.get_alignment_offset(angle)
x_vals.append(x)
zeros = [0] * len(angles)
with open(file, "w") as f:
f.write(" ".join(map(str, scans)) + "\n")
f.write(" ".join(map(str, angles)) + "\n")
f.write(" ".join(f"{x:.2f}" for x in x_vals) + "\n")
f.write(" ".join(map(str, x_vals)) + "\n")
f.write(" ".join(map(str, zeros)) + "\n")
[docs]
def sub_tomo_scan(self, subtomo_number, start_angle=None):
"""
Performs a sub tomogram scan.
Args:
subtomo_number (int): The sub tomogram number.
start_angle (float, optional): The start angle of the scan. Defaults to None.
"""
if start_angle is not None:
print(f"Sub tomo scan with start angle {start_angle} requested.")
if start_angle is None:
if subtomo_number == 1:
start_angle = 0
elif subtomo_number == 2:
start_angle = self.tomo_angle_stepsize / 8.0 * 4
elif subtomo_number == 3:
start_angle = self.tomo_angle_stepsize / 8.0 * 2
elif subtomo_number == 4:
start_angle = self.tomo_angle_stepsize / 8.0 * 6
elif subtomo_number == 5:
start_angle = self.tomo_angle_stepsize / 8.0 * 1
elif subtomo_number == 6:
start_angle = self.tomo_angle_stepsize / 8.0 * 5
elif subtomo_number == 7:
start_angle = self.tomo_angle_stepsize / 8.0 * 3
elif subtomo_number == 8:
start_angle = self.tomo_angle_stepsize / 8.0 * 7
# _tomo_shift_angles (potential global variable)
_tomo_shift_angles = 0
# compute number of projections
start = start_angle + _tomo_shift_angles
if subtomo_number % 2: # odd = forward
max_allowed_angle = self.tomo_angle_range + 0.05 + self.tomo_angle_stepsize
proposed_end = start + self.tomo_angle_range
angle_end = min(proposed_end, max_allowed_angle)
span = angle_end - start
else: # even = reverse
min_allowed_angle = 0
proposed_end = start - self.tomo_angle_range
angle_end = max(proposed_end, min_allowed_angle)
span = start - angle_end
# number of projections needed to maintain step size
N = int(span / self.tomo_angle_stepsize) + 1
angles = np.linspace(start, angle_end, num=N, endpoint=True)
if subtomo_number % 2: # odd subtomos → forward direction
# clamp end angle to max allowed
max_allowed_angle = self.tomo_angle_range + 0.05 + self.tomo_angle_stepsize
proposed_end = start + self.tomo_angle_range
angle_end = min(proposed_end, max_allowed_angle)
angles = np.linspace(start, angle_end, num=N, endpoint=True)
else: # even subtomos → reverse direction
# go FROM start_angle down toward 0
min_allowed_angle = 0
proposed_end = start - self.tomo_angle_range
angle_end = max(proposed_end, min_allowed_angle)
angles = np.linspace(start, angle_end, num=N, endpoint=True)
for i, angle in enumerate(angles):
self.progress["subtomo"] = subtomo_number
# --- NEW LOGIC FOR OFFSET WHEN start_angle IS SPECIFIED ---
if i == 0:
step = self.tomo_angle_stepsize
sa = start_angle
if start_angle is None:
# normal operation: always start at zero
self._subtomo_offset = 0
else:
if subtomo_number % 2: # odd = forward direction
self._subtomo_offset = round(sa / step)
else: # even = reverse direction
self._subtomo_offset = round((self.tomo_angle_range - sa) / step)
# progress index must always increase
self.progress["subtomo_projection"] = self._subtomo_offset + i
# ------------------------------------------------------------
# existing progress fields
self.progress["subtomo_total_projections"] = int(
self.tomo_angle_range / self.tomo_angle_stepsize
)
self.progress["projection"] = (subtomo_number - 1) * self.progress[
"subtomo_total_projections"
] + self.progress["subtomo_projection"]
self.progress["total_projections"] = (
self.tomo_angle_range / self.tomo_angle_stepsize
) * 8
self.progress["angle"] = angle
# finally do the scan at this angle
self._tomo_scan_at_angle(angle, subtomo_number)
@scan_repeat(max_repeats=10, default=True)
def _tomo_scan_at_angle(self, angle, subtomo_number):
if 0 <= angle < self.tomo_angle_range + 0.05:
self.progress["heartbeat"] = datetime.datetime.now().isoformat()
print(f"Starting flOMNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
next_special_angle = self._current_special_angles[0]
if np.isclose(angle, next_special_angle, atol=0.5):
self._current_special_angles.pop(0)
num_repeats = self.special_angle_repeats
else:
num_repeats = 1
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(angle)
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
[docs]
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
"""start a tomo scan"""
if not self._check_eye_out_and_optics_in():
print(
"Attention: The setup is not in measurement condition.\nXray eye might be IN or the Xray optics OUT."
)
if self.OMNYTools.yesno("Shall I continue?", "n"):
print("OK")
else:
print("Stopping.")
return
self.flomnigui_show_progress()
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
bec.builtin_actors.scan_interlock.trigger_setting = "restart_scan"
bec.builtin_actors.scan_interlock.enabled = True
self._current_special_angles = self.special_angles.copy()
# a new tomo scan was started
if (
(self.tomo_type == 1 and subtomo_start == 1 and start_angle is None)
or (self.tomo_type == 2 and projection_number == None)
or (self.tomo_type == 3 and projection_number == None)
):
# pylint: disable=undefined-variable
if bec.active_account != "":
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account,
bec.queue.next_scan_number,
"flomni",
"test additional info",
"BEC",
)
self.write_pdf_report()
else:
self.tomo_id = 0
self.write_pdf_report()
self.progress["tomo_start_time"] = datetime.datetime.now().isoformat()
# reset stale estimates from any previous scan, otherwise the GUI
# would keep showing a leftover ETA from before this scan has
# accumulated enough projections to compute a fresh one
self.progress["estimated_remaining_time"] = None
self.progress["estimated_finish_time"] = None
with scans.dataset_id_on_hold:
if self.tomo_type == 1:
# 8 equally spaced sub-tomograms
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
elif self.tomo_type == 2:
# Golden ratio tomography
previous_subtomo_number = -1
if projection_number == None:
ii = 0
else:
ii = projection_number
while True:
angle, subtomo_number = self._golden(ii, self.golden_ratio_bunch_size, 180, 1)
if previous_subtomo_number != subtomo_number:
if (
subtomo_number % 2 == 1
and ii > 10
and self.golden_projections_at_0_deg_for_damage_estimation == 1
):
self._tomo_scan_at_angle(0, subtomo_number)
previous_subtomo_number = subtomo_number
self.progress["tomo_type"] = "Golden ratio tomography"
self.progress["subtomo"] = subtomo_number
self.progress["projection"] = ii
self.progress["angle"] = angle
if self.golden_ratio_bunch_size > 0:
self.progress["subtomo_total_projections"] = self.golden_ratio_bunch_size
self.progress["subtomo_projection"] = (
ii - (subtomo_number - 1) * self.golden_ratio_bunch_size
)
else:
self.progress["subtomo_total_projections"] = 0
self.progress["subtomo_projection"] = 0
if self.golden_max_number_of_projections > 0:
self.progress["total_projections"] = self.golden_max_number_of_projections
else:
self.progress["total_projections"] = 0
self._tomo_scan_at_angle(angle, subtomo_number)
ii += 1
if (
ii > self.golden_max_number_of_projections
and self.golden_max_number_of_projections > 0
):
print(
f"Golden ratio tomography stopped automatically after the requested {self.golden_max_number_of_projections} projections"
)
break
elif self.tomo_type == 3:
# Equally spaced tomography, golden ratio starting angle
previous_subtomo_number = -1
if projection_number == None:
ii = 0
else:
ii = projection_number
while True:
angle, subtomo_number = self._golden_equally_spaced(
ii, int(180 / self.tomo_angle_stepsize), 180, 1, 0
)
if previous_subtomo_number != subtomo_number:
if (
subtomo_number % 2 == 1
and ii > 10
and self.golden_projections_at_0_deg_for_damage_estimation == 1
):
self._tomo_scan_at_angle(0, subtomo_number)
previous_subtomo_number = subtomo_number
self.progress["tomo_type"] = (
"Equally spaced tomography, golden ratio starting angle"
)
self.progress["subtomo"] = subtomo_number
self.progress["projection"] = ii
self.progress["angle"] = angle
self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize
self.progress["subtomo_projection"] = (
ii - (subtomo_number - 1) * self.progress["subtomo_total_projections"]
)
if self.golden_max_number_of_projections > 0:
self.progress["total_projections"] = self.golden_max_number_of_projections
else:
self.progress["total_projections"] = 0
self._tomo_scan_at_angle(angle, subtomo_number)
ii += 1
if (
ii > self.golden_max_number_of_projections
and self.golden_max_number_of_projections > 0
):
print(
f"Golden ratio tomography stopped automatically after the requested {self.golden_max_number_of_projections} projections"
)
break
else:
raise FlomniError("undefined tomo type")
self.progress["projection"] = self.progress["total_projections"]
self.progress["subtomo_projection"] = self.progress["subtomo_total_projections"]
self._print_progress()
self.OMNYTools.printgreenbold("Tomoscan finished")
@staticmethod
def _format_duration(seconds: float) -> str:
"""Format a duration in seconds as a human-readable string, e.g. '2h 03m 15s'."""
seconds = int(seconds)
h, remainder = divmod(seconds, 3600)
m, s = divmod(remainder, 60)
if h > 0:
return f"{h}h {m:02d}m {s:02d}s"
if m > 0:
return f"{m}m {s:02d}s"
return f"{s}s"
def _print_progress(self):
# --- compute and store estimated remaining time -----------------------
start_str = self.progress.get("tomo_start_time")
projection = self.progress["projection"]
total = self.progress["total_projections"]
if start_str is not None and total > 0 and projection > 9:
now = datetime.datetime.now()
elapsed = (now - datetime.datetime.fromisoformat(start_str)).total_seconds()
rate = projection / elapsed # projections per second
remaining_s = (total - projection) / rate
self.progress["estimated_remaining_time"] = remaining_s
eta_str = self._format_duration(remaining_s)
finish_dt = now + datetime.timedelta(seconds=remaining_s)
self.progress["estimated_finish_time"] = finish_dt.isoformat()
finish_str = finish_dt.strftime("%Y-%m-%d %H:%M:%S")
else:
eta_str = "N/A"
finish_str = "N/A"
# ----------------------------------------------------------------------
print("\x1b[95mProgress report:")
print(f"Tomo type: ....................... {self.progress['tomo_type']}")
print(f"Projection: ...................... {self.progress['projection']:.0f}")
print(f"Total projections expected ....... {self.progress['total_projections']}")
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}")
print(f"Estimated remaining time: ........ {eta_str}")
print(f"Estimated finish time: ........... {finish_str}\x1b[0m")
self._flomnigui_update_progress()
[docs]
def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
):
"""Add a sample to the omny sample database. This also retrieves the tomo id."""
return self.tomo_id_manager.register(
sample_name=samplename,
date=date,
eaccount=eaccount,
scan_number=scan_number,
setup=setup,
additional_info=sample_additional_info,
user=user,
)
def _at_each_angle(self, angle: float) -> None:
if "flomni_at_each_angle" in builtins.__dict__:
# pylint: disable=undefined-variable
flomni_at_each_angle(self, angle)
return
if self.single_point_instead_of_fermat_scan:
self.tomo_acquire_at_angle(angle)
return
self.tomo_scan_projection(angle)
def _golden(self, ii, howmany_sorted, maxangle, reverse=False):
"""returns the iis golden ratio angle of sorted bunches of howmany_sorted and its subtomo number"""
golden = []
# occupy array with the range of golden angles
for iji in range(
(ii - (ii % howmany_sorted)), (ii - (ii % howmany_sorted)) + howmany_sorted, 1
):
golden.append(
((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000
)
golden.sort()
subtomo_number = int(ii / howmany_sorted) + 1
if reverse and not subtomo_number % 2:
golden.reverse()
return (golden[ii % howmany_sorted], subtomo_number)
def _golden_equally_spaced(
self, ii, number_of_projections_per_subtomo, maxangle, reverse, verbose
):
"""returns angles for equally spaced tomography with starting angles of sub tomograms shifted according to golden ratio"""
"""ii is projection number starting at 1, reverse will execute the even sub tomograms in reverse direction"""
# ii is projection number starting at 1
angular_step = maxangle / number_of_projections_per_subtomo
subtomo_number = int(((ii - 1) * angular_step) / maxangle) + 1
start_angle = self._golden(subtomo_number - 1, 1, angular_step)
projection_number_of_subtomo = (
ii - (subtomo_number - 1) * number_of_projections_per_subtomo
) - 1
if reverse:
if subtomo_number % 2:
angle = start_angle[0] + projection_number_of_subtomo * angular_step
else:
angle = (
start_angle[0]
+ (number_of_projections_per_subtomo - 1) * angular_step
- projection_number_of_subtomo * angular_step
)
else:
angle = start_angle[0] + projection_number_of_subtomo * angular_step
if verbose:
print(
f"Equally spaced golden ratio tomography.\nAngular step: {angular_step}\nSubtomo Number: {subtomo_number}\nAngle: {angle}"
)
return angle, subtomo_number
[docs]
def tomo_reconstruct(self, base_path="~/data/raw/logs/reconstruction_queue"):
"""write the tomo reconstruct file for the reconstruction queue"""
bec = builtins.__dict__.get("bec")
self.reconstructor.write(
scan_list=self._current_scan_list,
next_scan_number=bec.queue.next_scan_number,
base_path=base_path,
)
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser("~/data/raw/logs/tomography_scannumbers.txt")
with open(tomo_scan_numbers_file, "a+") as out_file:
# pylint: disable=undefined-variable
out_file.write(
f"{scan_number} {angle} {dev.fsamroy.read()['fsamroy']['value']:.3f} {self.tomo_id} {subtomo_number} {0} {self.sample_name}\n"
)
def tomo_scan_projection(self, angle: float):
dev.rtx.controller.laser_tracker_check_signalstrength()
scans = builtins.__dict__.get("scans")
# additional_correction = self.align.compute_additional_correction(angle)
# additional_correction_2 = self.align.compute_additional_correction_2(angle)
# correction_xeye_mu = self.align.lamni_compute_additional_correction_xeye_mu(angle)
offsets = self.get_alignment_offset(angle)
sum_offset_x = offsets[0]
sum_offset_y = (
offsets[1]
- self.compute_additional_correction_y(angle)
- self.compute_additional_correction_y_2(angle)
+ self.manual_shift_y
)
sum_offset_z = offsets[2]
self._current_scan_list = []
for stitch_x in range(-self.stitch_x, self.stitch_x + 1):
for stitch_y in range(-self.stitch_y, self.stitch_y + 1):
# pylint: disable=undefined-variable
corridor_size = self.corridor_size if self.corridor_size > 0 else None
self._current_scan_list.append(bec.queue.next_scan_number)
cenx = sum_offset_x + stitch_x * (self.fovx - self.tomo_stitch_overlap)
ceny = sum_offset_y + stitch_y * (self.fovy - self.tomo_stitch_overlap)
logger.info(
f"scans.flomni_fermat_scan(fovx={self.fovx}, fovy={self.fovy},"
f" step={self.tomo_shellstep}, cenx={cenx}, ceny={ceny},"
f" zshift={sum_offset_z}, angle={angle},"
f" exp_time={self.tomo_countingtime}, corridor_size={corridor_size})"
)
log_message = (
f"{str(datetime.datetime.now())}: flomni scan projection at angle {angle}, scan"
f" number {bec.queue.next_scan_number}.\n"
)
scan_kwargs = dict(
fovx=self.fovx,
fovy=self.fovy,
step=self.tomo_shellstep,
cenx=cenx,
ceny=ceny,
zshift=sum_offset_z,
angle=angle,
exp_time=self.tomo_countingtime,
frames_per_trigger=self.frames_per_trigger,
)
if self.corridor_size > 0:
scan_kwargs["corridor_size"] = self.corridor_size
scans.flomni_fermat_scan(**scan_kwargs)
self.tomo_reconstruct()
[docs]
def tomo_acquire_at_angle(self, angle: float, frames_per_trigger: int | None = None):
"""
Move fsamroy to `angle`, then move rtx/rty/rtz to the alignment-corrected
scan center (same alignment-offset logic as tomo_scan_projection, but
without stitching), and acquire a single frame or a burst via
scans.acquire instead of running a fermat scan.
This mirrors the positioning sequence used internally by
flomni_fermat_scan (rotation, then rtx/rty/rtz with laser-tracker
on/check/move-to-region), but executes it as plain blocking
client-side calls, since this runs in the BEC client, not on the
scan server.
Args:
angle (float): rotation angle [deg] to move fsamroy to.
frames_per_trigger (int, optional): number of burst frames for
this acquisition. Defaults to self.frames_per_trigger.
"""
scans = builtins.__dict__.get("scans")
# --- rotation ---
fsamroy_current_setpoint = dev.fsamroy.user_setpoint.get()
if angle != fsamroy_current_setpoint:
umv(dev.fsamroy, angle)
else:
print("No rotation required")
# --- alignment offset (same as tomo_scan_projection, no stitching) ---
offsets = self.get_alignment_offset(angle)
sum_offset_x = offsets[0]
sum_offset_y = (
offsets[1]
- self.compute_additional_correction_y(angle)
- self.compute_additional_correction_y_2(angle)
+ self.manual_shift_y
)
sum_offset_z = offsets[2]
# --- positioning + laser tracker, mirroring
# flomni_fermat_scan._prepare_setup_part2 ---
dev.rtx.controller.laser_tracker_on()
umv(dev.rtx, sum_offset_x, dev.rty, sum_offset_y, dev.rtz, sum_offset_z)
tracker_signal = dev.rtx.controller.laser_tracker_check_signalstrength()
# checks that the fsamx coarse stage is at a position that leaves
# sufficient piezo range on the fine (rtx) stage
dev.rtx.controller.move_samx_to_scan_region(sum_offset_x)
if tracker_signal == "low":
logger.warning(
"Signal strength of the laser tracker is low. Realignment recommended!"
)
elif tracker_signal == "toolow":
raise FlomniError(
"Signal strength of the laser tracker is too low for scanning. Realignment required!"
)
# --- acquire ---
n_frames = (
frames_per_trigger if frames_per_trigger is not None else self.frames_per_trigger
)
scans.acquire(exp_time=self.tomo_countingtime, frames_per_trigger=n_frames)
[docs]
def tomo_parameters(self):
"""print and update the tomo parameters"""
print("Current settings:")
print(f"Counting time <ctime> = {self.tomo_countingtime} s")
print(f"Stepsize microns <step> = {self.tomo_shellstep}")
print(f"FOV (200/100) <microns> = {self.fovx}, {self.fovy}")
print(f"Stitching number x,y = {self.stitch_x}, {self.stitch_y}")
print(f"Stitching overlap = {self.tomo_stitch_overlap}")
print(f"Reconstruction queue name = {self.ptycho_reconstruct_foldername}")
print(f" _manual_shift_y <um> = {self.manual_shift_y}")
print(f"Frames per trigger (burst) = {self.frames_per_trigger}")
print(f"Single point instead of fermat = {self.single_point_instead_of_fermat_scan}")
print("")
if self.tomo_type == 1:
print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms")
print(f"Angular range = {self.tomo_angle_range} degrees")
print(f"Total number of projections: {(self.tomo_angle_range/self.tomo_angle_stepsize)*8}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(
"Angular step of the final (combined) tomogram:"
f" {self.tomo_angle_range/((self.tomo_angle_range/self.tomo_angle_stepsize)*8)} degrees"
)
if self.tomo_type == 2:
print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography")
print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}")
if self.golden_max_number_of_projections > 0:
print(f"ending after {self.golden_max_number_of_projections} projections")
else:
print("ending by manual interruption")
if self.golden_projections_at_0_deg_for_damage_estimation == 1:
print(
"repeating prjections at 0 degrees at the beginning of every second subtomogram"
)
if self.tomo_type == 3:
print(
"\x1b[1mTomo type 3:\x1b[0m Equally spaced tomography, golden ratio starting angle"
)
print(f"Number of projections per sub-tomogram: {180/self.tomo_angle_stepsize}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
if self.golden_max_number_of_projections > 0:
print(f"ending after {self.golden_max_number_of_projections} projections")
else:
print("ending by manual interruption")
if self.golden_projections_at_0_deg_for_damage_estimation == 1:
print(
"repeating prjections at 0 degrees at the beginning of every second subtomogram"
)
print(f"\nSample name: {self.sample_name}\n")
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"):
print("... excellent!")
else:
self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float)
self.tomo_shellstep = self._get_val("<step size> um", self.tomo_shellstep, float)
self.fovx = self._get_val("<FOV X (max 200)> um", self.fovx, float)
self.fovy = self._get_val("<FOV Y (max 100)> um", self.fovy, float)
if self.single_point_instead_of_fermat_scan:
print(
"Stitching is disabled while single point instead of fermat scan is"
" active; stitch X/Y forced to 0."
)
self.stitch_x = 0
self.stitch_y = 0
else:
self.stitch_x = self._get_val("<stitch X>", self.stitch_x, int)
self.stitch_y = self._get_val("<stitch Y>", self.stitch_y, int)
self.ptycho_reconstruct_foldername = self._get_val(
"Reconstruction queue ", self.ptycho_reconstruct_foldername, str
)
self.frames_per_trigger = self._get_val(
"Frames per trigger (burst)", self.frames_per_trigger, int
)
self.single_point_instead_of_fermat_scan = bool(
self._get_val(
"Single point instead of fermat scan (acquire at angle) 1/0?",
int(self.single_point_instead_of_fermat_scan),
int,
)
)
if self.single_point_instead_of_fermat_scan and (
self.stitch_x != 0 or self.stitch_y != 0
):
print(
"Stitching is not supported with single point instead of fermat scan;"
" stitch X/Y forced to 0."
)
self.stitch_x = 0
self.stitch_y = 0
print("Tomography type:")
print(" 1: 8 equally spaced sub-tomograms")
print(" 2: Golden ratio tomography")
print(" 3: Equally spaced tomography, golden ratio starting angle")
self.tomo_type = self._get_val("Tomography type", self.tomo_type, int)
if self.tomo_type == 1:
self.tomo_angle_range = self._get_val(
"Angular range (180 or 360)", self.tomo_angle_range, int
)
tomo_numberofprojections = self._get_val(
"Total number of projections",
(self.tomo_angle_range / self.tomo_angle_stepsize) * 8,
int,
)
self.tomo_angle_stepsize = (self.tomo_angle_range / tomo_numberofprojections) * 8
print(
f"The angular step within a sub-tomogram will be {self.tomo_angle_stepsize} degrees"
)
print(
"The angular step of the final (combined) tomogram will be"
f" {self.tomo_angle_range / tomo_numberofprojections} degrees"
)
if self.tomo_type == 2:
self.golden_ratio_bunch_size = self._get_val(
"Number of projections sorted per bunch (default 20)",
self.golden_ratio_bunch_size,
int,
)
self.golden_max_number_of_projections = self._get_val(
"Stop after number of projections (zero for endless)",
self.golden_max_number_of_projections,
int,
)
self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
"Repeat projections at 0 deg every second subtomo 1/0 ?",
self.golden_projections_at_0_deg_for_damage_estimation,
int,
)
if self.tomo_type == 3:
numprj = self._get_val(
"Number of projections per sub-tomogram",
int(180 / self.tomo_angle_stepsize),
int,
)
self.tomo_angle_stepsize = 180 / numprj
self.golden_max_number_of_projections = self._get_val(
"Stop after number of projections (zero for endless)",
self.golden_max_number_of_projections,
int,
)
self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
"Repeat projections at 0 deg every second subtomo",
self.golden_projections_at_0_deg_for_damage_estimation,
int,
)
@staticmethod
def _get_val(msg: str, default_value, data_type):
return data_type(input(f"{msg} ({default_value}): ") or default_value)
def rt_off(self):
dev.rtx.enabled = False
dev.rty.enabled = False
dev.rtz.enabled = False
def rt_on(self):
dev.rtx.enabled = True
dev.rty.enabled = True
dev.rtz.enabled = True
if dev.rtx.enabled == True:
print("rt is enabled")
else:
print("failed to enable rt")
[docs]
def write_pdf_report(self):
"""create and write the pdf report with the current flomni settings"""
dev = builtins.__dict__.get("dev")
# header = ""
header = (
" \n" * 3
+ " .d888 888 .d88888b. 888b d888 888b 888 8888888 \n"
+ ' d88P" 888 d88P" "Y88b 8888b d8888 8888b 888 888 \n'
+ " 888 888 888 888 88888b.d88888 88888b 888 888 \n"
+ " 888888 888 888 888 888Y88888P888 888Y88b 888 888 \n"
+ " 888 888 888 888 888 Y888P 888 888 Y88b888 888 \n"
+ " 888 888 888 888 888 Y8P 888 888 Y88888 888 \n"
+ ' 888 888 Y88b. .d88P 888 " 888 888 Y8888 888 \n'
+ ' 888 888 "Y88888P" 888 888 888 Y888 8888888 \n'
)
padding = 20
fovxy = f"{self.fovx:.1f}/{self.fovy:.1f}"
stitching = f"{self.stitch_x:.0f}/{self.stitch_y:.0f}"
dataset_id = str(self.client.queue.next_dataset_number)
account = bec.active_account
content = [
f"{'Sample Name:':<{padding}}{self.sample_name:>{padding}}\n",
f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n",
f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n",
f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n",
f"{'e-account:':<{padding}}{str(account):>{padding}}\n",
f"{'Number of projections:':<{padding}}{int((self.tomo_angle_range / self.tomo_angle_stepsize) * 8):>{padding}}\n",
f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n",
f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int((self.tomo_angle_range / self.tomo_angle_stepsize) * 8) + 10:>{padding}}\n",
f"{'Current photon energy:':<{padding}}To be implemented\n",
# f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n",
f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
f"{'FOV:':<{padding}}{fovxy:>{padding}}\n",
f"{'Stitching:':<{padding}}{stitching:>{padding}}\n",
f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n",
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
]
content = "".join(content)
user_target = os.path.expanduser(
f"~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf"
)
with PDFWriter(user_target) as file:
file.write(header)
file.write(content)
# subprocess.run(
# "xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
# )
# status = subprocess.run(f"cp /tmp/spec-e20131-specES1.pdf {user_target}", shell=True)
# msg = bec.tomo_progress.tomo_progressMessage()
# logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
# msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
# ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "flOMNI", self.sample_name]
# )
# self.client.tomo_progress.send_tomo_progress_message("~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf").send()
import csaxs_bec
# Ensure this is a Path object, not a string
csaxs_bec_basepath = Path(csaxs_bec.__file__)
logo_file_rel = "flOMNI.png"
# Build the absolute path correctly
logo_file = (
csaxs_bec_basepath.parent / "bec_ipython_client" / "plugins" / "flomni" / logo_file_rel
).resolve()
print(logo_file)
bec.messaging.scilog.new().add_attachment(logo_file, width=200).add_text(
content.replace("\n", "<br>")
).add_tags("tomoscan").send()
if __name__ == "__main__":
import builtins
from bec_ipython_client import BECIPythonClient
bec = BECIPythonClient()
bec.start()
scans = bec.scans
dev = bec.device_manager.devices
builtins.__dict__["scans"] = scans
builtins.__dict__["dev"] = dev
builtins.__dict__["bec"] = bec
builtins.__dict__["umv"] = umv
flomni = Flomni(bec)
flomni.start_x_ray_eye_alignment()