Source code for csaxs_bec.bec_ipython_client.plugins.flomni.flomni

import builtins
import datetime
import os
import subprocess
import time
from pathlib import Path

import numpy as np
from bec_lib import bec_logger
from bec_lib.alarm_handler import AlarmBase
from bec_lib.pdf_writer import PDFWriter
from bec_lib.scan_repeat import scan_repeat
from typeguard import typechecked

from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_optics_mixin import FlomniOpticsMixin
from csaxs_bec.bec_ipython_client.plugins.flomni.gui_tools import flomniGuiTools
from csaxs_bec.bec_ipython_client.plugins.flomni.x_ray_eye_align import XrayEyeAlign
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
    OMNYTools,
    PtychoReconstructor,
    TomoIDManager,
)

# from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
#        FlomniWebpageGenerator,
#       VERBOSITY_SILENT,   # 0 — no output
#       VERBOSITY_NORMAL,   # 1 — startup / stop messages only  (default)
#       VERBOSITY_VERBOSE,  # 2 — one-line summary per cycle
#       VERBOSITY_DEBUG,    # 3 — full JSON payload per cycle
#    )

logger = bec_logger.logger

if builtins.__dict__.get("bec") is not None:
    bec = builtins.__dict__.get("bec")
    dev = builtins.__dict__.get("dev")
    scans = builtins.__dict__.get("scans")


[docs] def umv(*args): return scans.umv(*args, relative=False)
[docs] class FlomniToolsError(Exception): pass
[docs] class FlomniInitError(Exception): pass
[docs] class FlomniError(Exception): pass
# class FlomniTools: # def yesno(self, message: str, default="none", autoconfirm=0) -> bool: # if autoconfirm and default == "y": # self.printgreen(message + " Automatically confirming default: yes") # return True # elif autoconfirm and default == "n": # self.printgreen(message + " Automatically confirming default: no") # return False # if default == "y": # message_ending = " [Y]/n? " # elif default == "n": # message_ending = " y/[N]? " # else: # message_ending = " y/n? " # while True: # user_input = input(self.OKBLUE + message + message_ending + self.ENDC) # if ( # user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes" # ) or (default == "y" and user_input == ""): # return True # if ( # user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No" # ) or (default == "n" and user_input == ""): # return False # else: # print("Please expicitely confirm y or n.")
[docs] class FlomniInitStagesMixin: def flomni_init_stages(self): if self.OMNYTools.yesno("Starting initialization of flOMNI stages. OK?"): print("staring...") else: return if self.check_all_axes_of_fomni_referenced(): if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"): print("ok then...") else: return sensor_voltage_target = dev.ftransy.user_parameter.get("sensor_voltage") sensor_voltage = float(dev.ftransy.controller.socket_put_and_receive("MG@AN[1]").strip()) if not np.isclose(sensor_voltage, sensor_voltage_target, 0.25): print( f"Sensor voltage of the gripper is {sensor_voltage}, while target from config is {sensor_voltage_target}" ) print( "Verify that the value is acceptable and update config file. Reload config and start again." ) return print("Starting to drive ftransy to +y limit") self.drive_axis_to_limit(dev.ftransy, "forward") dev.ftransy.limits = [-100, 0] print("done") print("Starting to drive ftransz to -z limit") self.drive_axis_to_limit(dev.ftransz, "reverse") dev.ftransz.limits = [0, 145] print("done") print("Starting to drive ftransx to -x limit") self.drive_axis_to_limit(dev.ftransx, "reverse") dev.ftransx.limits = [0, 50] print("done") print("Starting to drive feyey to +y limit") self.drive_axis_to_limit(dev.feyey, "forward") dev.feyey.limits = [-1, -10] print("done") print("Starting to drive feyex to +x limit") self.drive_axis_to_limit(dev.feyex, "forward") dev.feyex.limits = [-30, -1] print("done") if self.OMNYTools.yesno( "Init of foptz. Can the stage move to the upstream limit without collision?" ): print("OK. continue.") else: return print("Starting to drive foptz to -z limit") self.drive_axis_to_limit(dev.foptz, "reverse") dev.foptz.limits = [0, 27] print("done") print("Init of Smaract stages") ## smaract stages max_repeat = 100 repeat = 0 axis_id_fosaz = dev.fosaz._config["deviceConfig"].get("axis_Id") axis_id_numeric_fosaz = self.axis_id_to_numeric(axis_id_fosaz) print("Moving fosaz upstream into the light curtain") while True: curtain_is_triggered = dev.foptz.controller.fosaz_light_curtain_is_triggered() if curtain_is_triggered: break if repeat > max_repeat: raise FlomniInitError("Failed to initialize fosaz within 100 repeats.") dev.fosaz.controller.move_open_loop_steps( axis_id_numeric_fosaz, -500, amplitude=4000, frequency=2000 ) time.sleep(1) repeat += 1 print("Finding index of fosax, fosay, fosaz") for ii in range(3): dev.fosax.controller.find_reference_mark(ii, 0, 1000, 1) time.sleep(1) dev.fosax.limits = [10.2, 10.6] dev.fosay.limits = [-3.1, -2.9] dev.fosaz.limits = [-6, -4] # dev.fosax.controller.describe() print("Moving fosa stages to approximate beam positions") umv(dev.fosaz, -5) umv(dev.fosax, 10.4, dev.fosay, -3) print("done") print("Moving fheater to +y limit") self.drive_axis_to_limit(dev.fheater, "reverse") dev.fheater.limits = [-15, 0] print("done") print("Moving fsamy to -y limit") self.drive_axis_to_limit(dev.fsamy, "reverse") dev.fsamy.limits = [2, 3.1] print("done") if self.OMNYTools.yesno( "Init of tracking stages. Did you remove the outer laser flight tubes?" ): print("OK. continue.") else: print("Stopping.") return print("Moving tracky to -y limit") self.drive_axis_to_limit(dev.ftracky, "reverse") dev.ftracky.limits = [2.2, 2.8] print("done") print("Moving ftrackz to -z limit") self.drive_axis_to_limit(dev.ftrackz, "reverse") dev.ftrackz.limits = [4.5, 5.5] print("done") if self.OMNYTools.yesno("Init of sample stage. Is the piezo at about 0 deg?"): print("OK. continue.") else: print("Stopping.") return print("Moving fsamx to +x limit") self.drive_axis_to_limit(dev.fsamx, "forward") dev.fsamx.limits = [-162, 0] print("done") print("Moving ftray to IN limit") self.drive_axis_to_limit(dev.ftray, "reverse") dev.ftray.limits = [-200, 0] print("done") print("Initializing UPR stage.") if self.OMNYTools.yesno( "To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?" ): print("OK. continue.") else: print("Stopping.") return while True: low_limit, high_limit = dev.fsamroy.controller.get_motor_limit_switch("A") if not high_limit: print("Please push limit switch to the left.") time.sleep(1) continue break while True: low_limit, high_limit = dev.fsamroy.controller.get_motor_limit_switch("A") if not low_limit: print("Please push limit switch to the right.") time.sleep(1) continue break if self.OMNYTools.yesno("Shall I start the index search?"): print("OK. continue.. Starting index search.") else: print("Stopping.") return if dev.fsamroy.controller.is_motor_on("A"): raise FlomniInitError("fsamroy should be off. Something is wrong. Mirko... help!") dev.fsamroy.controller.socket_put_confirmed("XQ#MOTON") dev.fsamroy.enabled = False time.sleep(5) dev.fsamroy.enabled = True time.sleep(2) dev.fsamroy.controller.socket_put_confirmed("XQ#REFAX") while not dev.fsamroy.controller.all_axes_referenced(): print("Waiting for fsamroy to be referenced.") time.sleep(1) dev.fsamroy.limits = [-5, 365] print("done") if self.OMNYTools.yesno( "Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!" ): print("OK. continue.") else: print("Stopping.") return print("Moving foptx to +x limit") self.drive_axis_to_limit(dev.foptx, "forward") dev.foptx.limits = [-16, -14] print("done") axis_id_fopty = dev.fopty._config["deviceConfig"].get("axis_Id") while True: low_limit, high_limit = dev.fopty.controller.get_motor_limit_switch(axis_id_fopty) if not low_limit: print( "To ensure that the fopty end switch works, please push it down and hold it for" " about 1 second." ) time.sleep(1) continue break if self.OMNYTools.yesno("Start limit switch search of fopty?"): print("OK. continue.") else: print("Stopping.") return print("Moving fopty to -y limit") self.drive_axis_to_limit(dev.fopty, "reverse") dev.fopty.limits = [0, 4] print("done") dev.fsamx.controller.galil_show_all() self.set_limits() self._align_setup() def check_all_axes_of_fomni_referenced(self) -> bool: if ( dev.fosax.controller.axis_is_referenced(0) & dev.fosax.controller.axis_is_referenced(1) & dev.fosax.controller.axis_is_referenced(2) & dev.fsamx.controller.all_axes_referenced() & dev.feyex.controller.all_axes_referenced() & dev.fsamroy.controller.all_axes_referenced() & dev.fsamroy.controller.is_motor_on("A") ): print("All axes of flomni are referenced.") return True else: return False def set_limits(self): if self.OMNYTools.yesno("Set default limits for flOMNI?"): print("setting limits...") else: print("Stopping.") return dev.ftransy.limits = [-100, 0] dev.ftransz.limits = [0, 145] dev.ftransx.limits = [0, 50] dev.ftray.limits = [-200, 0] dev.fsamy.limits = [2, 3.5] dev.foptz.limits = [22.5, 28] dev.foptx.limits = [-17, -12] dev.fheater.limits = [-15, 0] dev.feyex.limits = [-18, -1] dev.feyey.limits = [-12, -1] dev.fopty.limits = [0, 4] dev.fosax.limits = [7, 10] dev.fosay.limits = [-4.2, 7] dev.fosaz.limits = [-6.5, 7.5] # dev.rtx.limits = [-220, 220] # dev.rty.limits = [-180, 180] # dev.rtz.limits = [-220, 220] dev.fsamroy.limits = [-5, 365] dev.ftracky.limits = [2.2, 2.8] dev.ftrackz.limits = [4.5, 5.5] def _align_setup(self): if self.OMNYTools.yesno("Start moving stages to default initial positions?", "y"): print("Start moving stages...") else: print("Stopping.") return dev.rtx.controller.feedback_disable() # positions for optics out and 50 mm distance to sample umv(dev.ftrackz, 4.73, dev.ftracky, 2.5170, dev.foptx, -14.3, dev.fopty, 3.87) # the fopty 3.87 should put us in place for a lower FZP on the lower FZP chip umv(dev.foptz, 23) flomni_samx_in = dev.fsamx.user_parameter.get("in") if flomni_samx_in is None: raise FlomniInitError( "Could not find a fsamx in position. Please check your device config." ) umv(dev.fsamx, flomni_samx_in) flomni_samy_in = dev.fsamy.user_parameter.get("in") if flomni_samy_in is None: raise FlomniInitError( "Could not find a fsamy in position. Please check your device config." ) umv(dev.fsamy, flomni_samy_in) # after init reduce vertical stage speed dev.fsamy.controller.socket_put_confirmed("axspeed[5]=20000") umv(dev.feyey, -8)
[docs] class FlomniSampleTransferMixin: def ensure_osa_back(self): dev.fosaz.limits = [-12.6, -12.4] umv(dev.fosaz, -12.5) curtain_is_triggered = dev.fheater.controller.fosaz_light_curtain_is_triggered() if not curtain_is_triggered: raise FlomniError("Fosaz did not reach light curtain") def move_fheater_up(self): self.ensure_fheater_up() def ensure_fheater_up(self): axis_id = dev.fheater._config["deviceConfig"].get("axis_Id") axis_id_numeric = self.axis_id_to_numeric(axis_id) low, high = dev.fheater.controller.get_motor_limit_switch(axis_id) if high: raise FlomniError("fheater in high limit. How did we get here?? Aborting.") if not low: self.ensure_osa_back() if dev.fheater.readback.get() < -0.2: umv(dev.fheater, -0.2) dev.fheater.controller.drive_axis_to_limit(axis_id_numeric, "reverse") def move_fheater_down(self): axis_id = dev.fheater._config["deviceConfig"].get("axis_Id") axis_id_numeric = self.axis_id_to_numeric(axis_id) self.ensure_osa_back() fsamx_in = dev.fsamx.user_parameter.get("in") if not np.isclose(dev.fsamx.readback.get(), fsamx_in, 0.2): raise FlomniError("fsamx not in position. Aborting.") fheater_in = dev.fheater.user_parameter.get("in") umv(dev.fheater, fheater_in) def ensure_gripper_up(self): axis_id = dev.ftransy._config["deviceConfig"].get("axis_Id") axis_id_numeric = self.axis_id_to_numeric(axis_id) low, high = dev.ftransy.controller.get_motor_limit_switch(axis_id) if low: raise FlomniError("Ftransy in low limit. How did we get here?? Aborting.") if high: return if dev.ftransy.readback.get() < -0.5: umv(dev.ftransy, -0.5) dev.ftransy.controller.drive_axis_to_limit(axis_id_numeric, "forward") def check_tray_in(self): axis_id = dev.ftray._config["deviceConfig"].get("axis_Id") low, high = dev.ftray.controller.get_motor_limit_switch(axis_id) if high: raise FlomniError("Ftray is in the 'OUT' position. Aborting.") if not low: raise FlomniError("Ftray is not at the 'IN' position. Aborting.") def ftransfer_flomni_stage_in(self): time.sleep(1) sample_in_position = dev.flomni_samples.is_sample_slot_used(0) # bool(float(dev.flomni_samples.sample_placed.sample0.get())) if not sample_in_position: raise FlomniError("There is no sample in the sample stage. Aborting.") self.reset_correction() dev.rtx.controller.feedback_disable() self.ensure_fheater_up() self.ensure_gripper_up() self.check_tray_in() fsamx_in = dev.fsamx.user_parameter.get("in") umv(dev.fsamx, fsamx_in) dev.fsamx.limits = [fsamx_in - 0.4, fsamx_in + 0.4] print("Moving X-ray eye in.") if self.OMNYTools.yesno( "Please confirm that this is ok with the flight tube. This check is to be removed after commissioning", "n", ): print("OK. continue.") else: print("Stopping.") raise FlomniError("Manual abort of x-ray eye in.") self.feye_in() print("Moving X-ray optics out.") self.foptics_out() self.xrayeye_update_frame() def laser_tracker_show_all(self): dev.rtx.controller.laser_tracker_show_all() def laser_tracker_on(self): dev.rtx.controller.laser_tracker_on() time.sleep(0.2) dev.rtx.controller.laser_tracker_check_signalstrength() def laser_tracker_off(self): dev.rtx.controller.laser_tracker_off() def show_signal_strength_interferometer(self): dev.rtx.controller.show_signal_strength_interferometer() def feedback_disable(self): self.device_manager.devices.rtx.controller.feedback_disable() def feedback_enable_with_reset(self): self.device_manager.devices.rtx.controller.feedback_enable_with_reset() self.feedback_status() def feedback_enable_without_reset(self): self.device_manager.devices.rtx.controller.feedback_enable_without_reset() self.feedback_status() def feedback_status(self): feedback_status = self.device_manager.devices.rtx.controller.feedback_is_running() if feedback_status == True: print("The rt feedback is \x1b[92mrunning\x1b[0m.") else: print("The rt feedback is \x1b[91mNOT\x1b[0m running.") def lights_off(self): self.device_manager.devices.fsamx.controller.lights_off() def lights_on(self): self.device_manager.devices.fsamx.controller.lights_on() def ftransfer_flomni_stage_out(self): self.flomnigui_show_cameras() self.flomnigui_raise() target_pos = -162 if np.isclose(dev.fsamx.readback.get(), target_pos, 0.01): return umv(dev.fsamroy, 0) self.feedback_disable() self.ensure_fheater_up() self.ensure_gripper_up() self.check_tray_in() self.laser_tracker_off() time.sleep(0.05) fsamy_in = dev.fsamy.user_parameter.get("in") if fsamy_in is None: raise FlomniError( "Could not find an 'IN' position for fsamy. Please check your config." ) umv(dev.fsamy, fsamy_in) time.sleep(0.05) self.laser_tracker_on() time.sleep(0.05) self.laser_tracker_off() time.sleep(0.05) self.drive_axis_to_limit(dev.fsamx, "forward") dev.fsamx.limits = [-162, 0] dev.fsamx.controller.socket_put_confirmed("axspeed[4]=25*stppermm[4]") umv(dev.fsamx, target_pos) def check_sensor_connected(self): sensor_voltage_target = dev.ftransy.user_parameter.get("sensor_voltage") sensor_voltage = float(dev.ftransy.controller.socket_put_and_receive("MG@AN[1]").strip()) if not np.isclose(sensor_voltage, sensor_voltage_target, 0.5): raise FlomniError(f"Sensor voltage is {sensor_voltage}, indicates an error. Aborting.") def ftransfer_get_sample(self, position: int): self.check_position_is_valid(position) self.check_tray_in() self.check_sensor_connected() sample_in_gripper = dev.flomni_samples.is_sample_in_gripper() if sample_in_gripper: raise FlomniError( "The gripper does carry a sample. Cannot proceed getting another sample." ) sample_in_position = dev.flomni_samples.is_sample_slot_used(position) if not sample_in_position: raise FlomniError(f"The planned pick position [{position}] does not have a sample.") self.flomnigui_show_cameras() self.ftransfer_gripper_move(position) self.ftransfer_controller_enable_mount_mode() if position == 0: sample_height = -45 + dev.fsamy.user_parameter.get("in") else: sample_height = -17.5 dev.ftransy.controller.socket_put_confirmed(f"getaprch={sample_height:.1f}") dev.ftransy.controller.socket_put_confirmed("XQ#GRGET,3") print("The unmount process started.") time.sleep(1) while True: in_progress = bool( float(dev.ftransy.controller.socket_put_and_receive("MG mntprgs").strip()) ) if not in_progress: break self.ftransfer_confirm() time.sleep(1) self.ftransfer_controller_disable_mount_mode() self.ensure_gripper_up() signal_name = getattr(dev.flomni_samples.sample_names, f"sample{position}") self.flomni_modify_storage_non_interactive(100, 1, signal_name.get()) self.flomni_modify_storage_non_interactive(position, 0, "-") def ftransfer_show_all(self): dev.flomni_samples.show_all() def ftransfer_put_sample(self, position: int): self.check_position_is_valid(position) self.check_tray_in() self.check_sensor_connected() sample_in_gripper = dev.flomni_samples.is_sample_in_gripper() # bool(float(dev.flomni_samples.sample_in_gripper.get())) if not sample_in_gripper: raise FlomniError("The gripper does not carry a sample.") sample_in_position = dev.flomni_samples.is_sample_slot_used(position) if sample_in_position: raise FlomniError(f"The planned put position [{position}] already has a sample.") self.flomnigui_show_cameras() self.ftransfer_gripper_move(position) self.ftransfer_controller_enable_mount_mode() if position == 0: sample_height = -45 + dev.fsamy.user_parameter.get("in") else: sample_height = -17.5 dev.ftransy.controller.socket_put_confirmed(f"mntaprch={sample_height:.1f}") dev.ftransy.controller.socket_put_confirmed("XQ#GRPUT,3") print("The mount process started.") time.sleep(1) while True: in_progress = bool( float(dev.ftransy.controller.socket_put_and_receive("MG mntprgs").strip()) ) if not in_progress: break self.ftransfer_confirm() time.sleep(1) self.ftransfer_controller_disable_mount_mode() self.ensure_gripper_up() sample_name = dev.flomni_samples.sample_in_gripper_name.get() self.flomni_modify_storage_non_interactive(100, 0, "-") self.flomni_modify_storage_non_interactive(position, 1, sample_name) if position == 0: self.ftransfer_flomni_stage_in() bec.queue.next_dataset_number += 1
[docs] def sample_get_name(self, position: int = 0) -> str: """ Get the name of the sample currently in the given position. """ signal_name = getattr(dev.flomni_samples.sample_names, f"sample{position}") return signal_name.get()
def ftransfer_sample_change(self, new_sample_position: int): self.check_tray_in() # sample_in_gripper = dev.flomni_samples.sample_in_gripper.get() sample_in_gripper = dev.flomni_samples.is_sample_in_gripper() if sample_in_gripper: raise FlomniError("There is already a sample in the gripper. Aborting.") self.check_position_is_valid(new_sample_position) if new_sample_position == 0: raise FlomniError( "The new sample to place cannot be the sample in the sample stage. Aborting." ) # sample_placed = getattr( # dev.flomni_samples.sample_placed, f"sample{new_sample_position}" # ).get() sample_placed = dev.flomni_samples.is_sample_slot_used(new_sample_position) if not sample_placed: raise FlomniError( f"There is currently no sample in position [{new_sample_position}]. Aborting." ) # sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get() sample_in_sample_stage = dev.flomni_samples.is_sample_slot_used(0) if sample_in_sample_stage: # find a new home for the sample... empty_slots = [] # for name, val in dev.flomni_samples.read().items(): # if "flomni_samples_sample_placed_sample" not in name: # continue # if val.get("value") == 0: # empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1])) for j in range(1, 20): if not dev.flomni_samples.is_sample_slot_used(j): empty_slots.append(j) if not empty_slots: raise FlomniError("There are no empty slots available. Aborting.") print(f"The following slots are empty: {empty_slots}.") while True: user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}] ") if user_input.strip() == "": # No entry: use default user_input = empty_slots[0] break try: user_input = int(user_input) if user_input not in empty_slots: raise ValueError break except ValueError: print("Please specify a valid number.") continue self.check_position_is_valid(user_input) self.ftransfer_get_sample(0) self.ftransfer_put_sample(user_input) self.ftransfer_get_sample(new_sample_position) self.ftransfer_put_sample(0) def ftransfer_modify_storage(self, position: int, used: int): if used: name = input("What's the name of this sample? ") else: name = "-" self.flomni_modify_storage_non_interactive(position, used, name) def flomni_modify_storage_non_interactive(self, position: int, used: int, name: str): if position == 100: dev.flomni_samples.sample_in_gripper.set(used) dev.flomni_samples.sample_in_gripper_name.set(name) else: signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}") signal.set(used) signal_name = getattr(dev.flomni_samples.sample_names, f"sample{position}") signal_name.set(name) def check_position_is_valid(self, position: int): if 0 <= position < 21: return raise FlomniError( f"The given position number [{position}] is not in the valid range of 0-21. " ) def ftransfer_controller_enable_mount_mode(self): dev.ftransy.controller.socket_put_confirmed("XQ#MNTMODE") time.sleep(0.5) if not self.ftransfer_controller_in_mount_mode(): raise FlomniError("System not switched to mount mode. Aborting.") def ftransfer_controller_disable_mount_mode(self): dev.ftransy.controller.socket_put_confirmed("XQ#POSMODE") time.sleep(0.5) if self.ftransfer_controller_in_mount_mode(): raise FlomniError("System is still in mount mode. Aborting.") def ftransfer_controller_in_mount_mode(self) -> bool: in_mount_mode = bool( float(dev.ftransy.controller.socket_put_and_receive("MG mntmod").strip()) ) return in_mount_mode def ftransfer_confirm(self): confirm = int(float(dev.ftransy.controller.socket_put_and_receive("MG confirm").strip())) if confirm != -1: return if self.OMNYTools.yesno("All OK? Continue?", "y"): print("OK. continue.") dev.ftransy.controller.socket_put_confirmed("confirm=1") else: print("Stopping.") raise FlomniError("User abort sample transfer.") def ftransfer_gripper_is_open(self) -> bool: status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip())) return status def ftransfer_gripper_open(self): sample_in_gripper = dev.flomni_samples.is_sample_in_gripper() # dev.flomni_samples.sample_in_gripper.get() if sample_in_gripper: raise FlomniError( "Cannot open gripper. There is still a sample in the gripper! Aborting." ) if not self.ftransfer_gripper_is_open(): dev.ftransy.controller.socket_put_confirmed("XQ#GROPEN,4") def ftransfer_gripper_close(self): if self.ftransfer_gripper_is_open(): dev.ftransy.controller.socket_put_confirmed("XQ#GRCLOS,4") def ftransfer_gripper_move(self, position: int): self.check_position_is_valid(position) # this is not used for sample stage position! self._ftransfer_shiftx = -0.15 self._ftransfer_shiftz = -0.5 fsamx_pos = dev.fsamx.readback.get() if position == 0 and fsamx_pos > -160: if self.OMNYTools.yesno( "May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!", "y", ): print("OK. continue.") self.ftransfer_flomni_stage_out() else: print("Stopping.") return self.ensure_gripper_up() self.check_tray_in() if position == 0: umv(dev.ftransx, 11, dev.ftransz, 3.5950) if position == 1: umv( dev.ftransx, 41.900 + self._ftransfer_shiftx, dev.ftransz, 74.7500 + self._ftransfer_shiftz, ) if position == 2: umv( dev.ftransx, 31.900 + self._ftransfer_shiftx, dev.ftransz, 74.7625 + self._ftransfer_shiftz, ) if position == 3: umv( dev.ftransx, 21.900 + self._ftransfer_shiftx, dev.ftransz, 74.7750 + self._ftransfer_shiftz, ) if position == 4: umv( dev.ftransx, 11.900 + self._ftransfer_shiftx, dev.ftransz, 74.7875 + self._ftransfer_shiftz, ) if position == 5: umv( dev.ftransx, 1.9000 + self._ftransfer_shiftx, dev.ftransz, 74.8000 + self._ftransfer_shiftz, ) if position == 6: umv( dev.ftransx, 41.900 + self._ftransfer_shiftx, dev.ftransz, 89.7500 + self._ftransfer_shiftz, ) if position == 7: umv( dev.ftransx, 31.900 + self._ftransfer_shiftx, dev.ftransz, 89.7625 + self._ftransfer_shiftz, ) if position == 8: umv( dev.ftransx, 21.900 + self._ftransfer_shiftx, dev.ftransz, 89.7750 + self._ftransfer_shiftz, ) if position == 9: umv( dev.ftransx, 11.900 + self._ftransfer_shiftx, dev.ftransz, 89.7875 + self._ftransfer_shiftz, ) if position == 10: umv( dev.ftransx, 1.900 + self._ftransfer_shiftx, dev.ftransz, 89.8000 + self._ftransfer_shiftz, ) if position == 11: umv( dev.ftransx, 41.95 + self._ftransfer_shiftx, dev.ftransz, 124.75 + self._ftransfer_shiftz, ) if position == 12: umv( dev.ftransx, 31.95 + self._ftransfer_shiftx, dev.ftransz, 124.7625 + self._ftransfer_shiftz, ) if position == 13: umv( dev.ftransx, 21.95 + self._ftransfer_shiftx, dev.ftransz, 124.7750 + self._ftransfer_shiftz, ) if position == 14: umv( dev.ftransx, 11.95 + self._ftransfer_shiftx, dev.ftransz, 124.7875 + self._ftransfer_shiftz, ) if position == 15: umv( dev.ftransx, 1.95 + self._ftransfer_shiftx, dev.ftransz, 124.8000 + self._ftransfer_shiftz, ) if position == 16: umv( dev.ftransx, 41.95 + self._ftransfer_shiftx, dev.ftransz, 139.7500 + self._ftransfer_shiftz, ) if position == 17: umv( dev.ftransx, 31.95 + self._ftransfer_shiftx, dev.ftransz, 139.7625 + self._ftransfer_shiftz, ) if position == 18: umv( dev.ftransx, 21.95 + self._ftransfer_shiftx, dev.ftransz, 139.7750 + self._ftransfer_shiftz, ) if position == 19: umv( dev.ftransx, 11.95 + self._ftransfer_shiftx, dev.ftransz, 139.7875 + self._ftransfer_shiftz, ) if position == 20: umv( dev.ftransx, 1.95 + self._ftransfer_shiftx, dev.ftransz, 139.8000 + self._ftransfer_shiftz, )
[docs] class FlomniAlignmentMixin: import csaxs_bec # Ensure this is a Path object, not a string csaxs_bec_basepath = Path(csaxs_bec.__file__) default_correction_file_rel = "correction_flomni_20210300_360deg.txt" # Build the absolute path correctly default_correction_file = ( csaxs_bec_basepath.parent / "bec_ipython_client" / "plugins" / "flomni" / default_correction_file_rel ).resolve()
[docs] def reset_correction(self, use_default_correction=True): """ Reset the correction to the default values. If use_default_correction is False, the correction will be set to empty values. Otherwise the default values will be loaded. Args: use_default_correction (bool, optional): If set to true, a call reset the correction to the default values. Defaults to True. """ self.corr_pos_y = [] self.corr_angle_y = [] self.corr_pos_y_2 = [] self.corr_angle_y_2 = [] if use_default_correction: try: self.read_additional_correction_y(self.default_correction_file) logger.info(f"Applying default correction from {self.default_correction_file}") except FileNotFoundError: logger.warning( f"Could not find default correction file {self.default_correction_file}." ) logger.warning("Not applying any correction.")
def reset_tomo_alignment_fit(self): self.client.delete_global_var("tomo_alignment_fit") def read_alignment_offset( self, dir_path=os.path.expanduser("~/data/raw/logs/"), setup="flomni", use_vertical_default_values=True, get_data_from_gui=False, ): tomo_alignment_fit = np.zeros((2, 5)) if not get_data_from_gui: """ Read the alignment parameters from xray eye fit. """ with open(os.path.join(dir_path, "ptychotomoalign_Ax.txt"), "r") as file: tomo_alignment_fit[0][0] = file.readline() with open(os.path.join(dir_path, "ptychotomoalign_Bx.txt"), "r") as file: tomo_alignment_fit[0][1] = file.readline() with open(os.path.join(dir_path, "ptychotomoalign_Cx.txt"), "r") as file: tomo_alignment_fit[0][2] = file.readline() with open(os.path.join(dir_path, "ptychotomoalign_Ay.txt"), "r") as file: tomo_alignment_fit[1][0] = file.readline() with open(os.path.join(dir_path, "ptychotomoalign_By.txt"), "r") as file: tomo_alignment_fit[1][1] = file.readline() with open(os.path.join(dir_path, "ptychotomoalign_Cy.txt"), "r") as file: tomo_alignment_fit[1][2] = file.readline() with open(os.path.join(dir_path, "ptychotomoalign_Ay3.txt"), "r") as file: tomo_alignment_fit[1][3] = file.readline() with open(os.path.join(dir_path, "ptychotomoalign_By3.txt"), "r") as file: tomo_alignment_fit[1][4] = file.readline() print("New alignment parameters loaded from filesystem, meaning Matlab fit:") else: params = dev.omny_xray_gui.fit_params_x.get() # amplitude tomo_alignment_fit[0][0] = params["SineModel_0_amplitude"] # phase tomo_alignment_fit[0][1] = params["SineModel_0_shift"] # offset tomo_alignment_fit[0][2] = params["LinearModel_1_intercept"] print("applying vertical default values from mirror calibration, not from fit!") tomo_alignment_fit[1][0] = 0 tomo_alignment_fit[1][1] = 0 tomo_alignment_fit[1][2] = 0 tomo_alignment_fit[1][3] = 0 tomo_alignment_fit[1][4] = 0 print("New alignment parameters loaded based on Xray eye alignment GUI:") print( f"X Amplitude {tomo_alignment_fit[0][0]}, " f"X Phase {tomo_alignment_fit[0][1]}, " f"X Offset {tomo_alignment_fit[0][2]}, " f"Y Amplitude {tomo_alignment_fit[1][0]}, " f"Y Phase {tomo_alignment_fit[1][1]}, " f"Y Offset {tomo_alignment_fit[1][2]}, " f"Y 3rd Order Amplitude {tomo_alignment_fit[1][3]}, " f"Y 3rd Order Phase {tomo_alignment_fit[1][4]} ." ) if use_vertical_default_values: print( f"Using default values for vertical alignment for setup {setup}. Optional: use_vertical_default_values=False" ) if setup == "flomni": tomo_alignment_fit[1][0] = 0 tomo_alignment_fit[1][1] = 0 tomo_alignment_fit[1][2] = 0 tomo_alignment_fit[1][3] = 0 tomo_alignment_fit[1][4] = 0 elif setup == "omny": tomo_alignment_fit[1][0] = 2.588628 tomo_alignment_fit[1][1] = -2.385422 tomo_alignment_fit[1][2] = 0 tomo_alignment_fit[1][3] = 1.010583 tomo_alignment_fit[1][4] = -1.359157 print("Follwing parameters will be used:") print( f"X Amplitude {tomo_alignment_fit[0][0]}, " f"X Phase {tomo_alignment_fit[0][1]}, " f"X Offset {tomo_alignment_fit[0][2]}, " f"Y Amplitude {tomo_alignment_fit[1][0]}, " f"Y Phase {tomo_alignment_fit[1][1]}, " f"Y Offset {tomo_alignment_fit[1][2]}, " f"Y 3rd Order Amplitude {tomo_alignment_fit[1][3]}, " f"Y 3rd Order Phase {tomo_alignment_fit[1][4]} ." ) self.client.set_global_var("tomo_alignment_fit", tomo_alignment_fit.tolist()) # x amp, phase, offset, y amp, phase, offset, 3rd order amp, 3rd order phase # 0 0 0 1 0 2 1 0 1 1 1 2 1 3 1 4
[docs] def get_alignment_offset(self, angle: float): """ Compute the alignment offset for the given angle. Args: angle (float): The angle to compute the alignment offset for. Returns: tuple: The alignment offset in x, y and z direction. """ tomo_alignment_fit = self.client.get_global_var("tomo_alignment_fit") if tomo_alignment_fit is None: print("Not applying any alignment offsets. No tomo alignment fit data available.\n") return (0, 0, 0) # x amp, phase, offset, y amp, phase, offset # 0 0 0 1 0 2 1 0 1 1 1 2 correction_x = ( tomo_alignment_fit[0][0] * np.sin(np.radians(angle) + tomo_alignment_fit[0][1]) + tomo_alignment_fit[0][2] ) correction_y = ( tomo_alignment_fit[1][0] * np.sin(np.radians(angle) + tomo_alignment_fit[1][1]) + tomo_alignment_fit[1][2] + tomo_alignment_fit[1][3] * np.sin(3 * np.radians(angle) + tomo_alignment_fit[1][4]) ) correction_z = tomo_alignment_fit[0][0] * np.sin( np.radians(angle + 90) + tomo_alignment_fit[0][1] ) print( f"Alignment offset x {correction_x}, y {correction_y}, z {correction_z} for angle" f" {angle}\n" ) return (correction_x, correction_y, correction_z)
def _read_correction_file(self, correction_file: str): with open(correction_file, "r") as f: num_elements = f.readline() int_num_elements = int(num_elements.split(" ")[2]) corr_pos = [] corr_angle = [] for j in range(int_num_elements * 2): line = f.readline() value = line.split(" ")[2] name = line.split(" ")[0].split("[")[0] if name == "corr_pos": corr_pos.append(float(value)) elif name == "corr_angle": corr_angle.append(float(value)) print( f"Loading default mirror correction from file {correction_file} containing {int_num_elements} elements." ) return corr_pos, corr_angle def read_additional_correction_y(self, correction_file: str): self.corr_pos_y, self.corr_angle_y = self._read_correction_file(correction_file) def read_additional_correction_y_2(self, correction_file: str): self.corr_pos_y_2, self.corr_angle_y_2 = self._read_correction_file(correction_file) def compute_additional_correction_y(self, angle): return self._compute_additional_correction(angle, iteration=1) def compute_additional_correction_y_2(self, angle): return self._compute_additional_correction(angle, iteration=2) def _compute_additional_correction(self, angle, iteration=1): if iteration == 1: corr_pos = self.corr_pos_y corr_angle = self.corr_angle_y elif iteration == 2: corr_pos = self.corr_pos_y_2 corr_angle = self.corr_angle_y_2 if not corr_pos: print("Not applying any additional correction. No data available.\n") return 0 # find index of closest angle for j, _ in enumerate(corr_pos): newangledelta = np.fabs(corr_angle[j] - angle) if j == 0: angledelta = newangledelta additional_correction_shift = corr_pos[j] continue if newangledelta < angledelta: additional_correction_shift = corr_pos[j] angledelta = newangledelta if additional_correction_shift == 0 and angle > corr_angle[-1]: additional_correction_shift = corr_pos[-1] print(f"Additional correction shift {iteration} in y: {additional_correction_shift}") return additional_correction_shift
class _ProgressProxy: """Dict-like proxy that persists the flOMNI progress dict as a BEC global variable. Every read (`proxy["key"]`) fetches the current dict from the global var store, and every write (`proxy["key"] = val`) fetches, updates, and saves it back. This makes the progress state visible to all BEC client sessions via ``client.get_global_var("tomo_progress")``. """ _GLOBAL_VAR_KEY = "tomo_progress" _DEFAULTS: dict = { "subtomo": 0, "subtomo_projection": 0, "subtomo_total_projections": 1, "projection": 0, "total_projections": 1, "angle": 0, "tomo_type": 0, "tomo_start_time": None, "estimated_remaining_time": None, "estimated_finish_time": None, "heartbeat": None, } def __init__(self, client): self._client = client # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _load(self) -> dict: val = self._client.get_global_var(self._GLOBAL_VAR_KEY) if val is None: return dict(self._DEFAULTS) return val def _save(self, data: dict) -> None: self._client.set_global_var(self._GLOBAL_VAR_KEY, data) # ------------------------------------------------------------------ # Dict-like interface # ------------------------------------------------------------------ def __getitem__(self, key): return self._load()[key] def __setitem__(self, key, value) -> None: data = self._load() data[key] = value self._save(data) def __repr__(self) -> str: return f"{self.__class__.__name__}({self._load()!r})" def get(self, key, default=None): return self._load().get(key, default) def update(self, *args, **kwargs) -> None: """Update multiple fields in a single round-trip.""" data = self._load() data.update(*args, **kwargs) self._save(data) def reset(self) -> None: """Reset all progress fields to their default values.""" self._save(dict(self._DEFAULTS)) def as_dict(self) -> dict: """Return a plain copy of the current progress state.""" return self._load()
[docs] class Flomni( FlomniInitStagesMixin, FlomniSampleTransferMixin, FlomniAlignmentMixin, FlomniOpticsMixin, cSAXSBeamlineChecks, flomniGuiTools, ): def __init__(self, client): super().__init__() self.client = client self.device_manager = client.device_manager self.tomo_id = -1 self.special_angles = [] self.special_angle_repeats = 20 self.special_angle_tolerance = 20 self._current_special_angles = [] self._beam_is_okay = True self.corr_pos_y = [] self.corr_angle_y = [] self.corr_pos_y_2 = [] self.corr_angle_y_2 = [] self._progress_proxy = _ProgressProxy(self.client) self._progress_proxy.reset() from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_webpage_generator import ( FlomniWebpageGenerator, ) self._webpage_gen = FlomniWebpageGenerator( bec_client=client, output_dir="~/data/raw/webpage/", # upload_url="http://s1090968537.online.de/upload.php", # optional upload_url="https://v1p0zyg2w9n2k9c1.myfritz.net/upload.php", local_port=8080, ) self._webpage_gen.start() self.OMNYTools = OMNYTools(self.client) self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername) self.tomo_id_manager = TomoIDManager() self.align = XrayEyeAlign(self.client, self) self.set_client(client)
[docs] def set_web_password(self, password: str) -> None: """Set the web password for the current BEC account.""" self._webpage_gen.set_web_password(password)
def start_x_ray_eye_alignment(self, keep_shutter_open=False): if self.OMNYTools.yesno( "Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y", ): self.align = XrayEyeAlign(self.client, self) try: self.align.align(keep_shutter_open) except KeyboardInterrupt as exc: fsamx_in = self._get_user_param_safe(dev.fsamx, "in") if np.isclose(fsamx_in, dev.fsamx.readback.get(), 0.5): print("Stopping alignment. Returning to fsamx in position.") self.feedback_disable() umv(dev.fsamx, fsamx_in) raise exc def xrayeye_update_frame(self, keep_shutter_open=False): self.align.update_frame(keep_shutter_open) def xrayeye_alignment_start(self, keep_shutter_open=False): self.start_x_ray_eye_alignment(keep_shutter_open) def drive_axis_to_limit(self, device, direction): axis_id = device._config["deviceConfig"].get("axis_Id") axis_id_numeric = self.axis_id_to_numeric(axis_id) device.controller.drive_axis_to_limit(axis_id_numeric, direction) def axis_id_to_numeric(self, axis_id) -> int: return ord(axis_id.lower()) - 97
[docs] def set_special_angles(self, angles: list, repeats: int = 20, tolerance: float = 0.5): """Set the special angles for a tomo Args: angles (list): List of special angles repeats (int, optional): Number of repeats at a special angle. Defaults to 20. tolerance (float, optional): Number of repeats at a special angle. Defaults to 0.5. """ self.special_angles = angles self.special_angle_repeats = repeats self.special_angle_tolerance = tolerance
[docs] def remove_special_angles(self): """Remove the special angles and set the number of repeats to 1""" self.special_angles = [] self.special_angle_repeats = 1
@property def progress(self) -> _ProgressProxy: """Proxy dict backed by the BEC global variable ``tomo_progress``. Readable from any BEC client session via:: client.get_global_var("tomo_progress") Individual fields can be read and written just like a regular dict:: flomni.progress["projection"] # read flomni.progress["projection"] = 42 # write (persists immediately) To update multiple fields atomically use :py:meth:`_ProgressProxy.update`:: flomni.progress.update(projection=42, angle=90.0) To reset all fields to their defaults:: flomni.progress.reset() """ return self._progress_proxy @progress.setter def progress(self, val: dict) -> None: """Replace the entire progress dict. Accepts a plain :class:`dict` and persists it to the global var store. Example:: flomni.progress = {"projection": 0, "total_projections": 100, ...} """ if not isinstance(val, dict): raise TypeError(f"progress must be a dict, got {type(val).__name__!r}") self._progress_proxy._save(val) @property def tomo_shellstep(self): val = self.client.get_global_var("tomo_shellstep") if val is None: return 1 return val @tomo_shellstep.setter def tomo_shellstep(self, val: float): self.client.set_global_var("tomo_shellstep", val) @property def tomo_countingtime(self): val = self.client.get_global_var("tomo_countingtime") if val is None: return 0.1 return val @tomo_countingtime.setter def tomo_countingtime(self, val: float): self.client.set_global_var("tomo_countingtime", val) @property def manual_shift_y(self): val = self.client.get_global_var("manual_shift_y") if val is None: return 0.0 return val @manual_shift_y.setter def manual_shift_y(self, val: float): self.client.set_global_var("manual_shift_y", val) @property def fovx(self): val = self.client.get_global_var("fovx") if val is None: return 20 return val @fovx.setter def fovx(self, val: float): if val > 200: raise ValueError("FOV cannot be larger than 200 um.") self.client.set_global_var("fovx", val) @property def fovy(self): val = self.client.get_global_var("fovy") if val is None: return 20 return val @fovy.setter def fovy(self, val: float): if val > 100: raise ValueError("FOV cannot be larger than 100 um.") self.client.set_global_var("fovy", val) @property def tomo_type(self): val = self.client.get_global_var("tomo_type") if val is None: return 1 return val @tomo_type.setter def tomo_type(self, val: float): if val == 1: # equally spaced tomography with 8 sub tomograms self.client.set_global_var("tomo_type", val) elif val == 2: # golden ratio tomography (sorted bunches) self.client.set_global_var("tomo_type", val) elif val == 3: # equally spaced tomography with starting angles shifted by golden ratio self.client.set_global_var("tomo_type", val) else: raise ValueError("Unknown tomo_type.") @property def corridor_size(self): val = self.client.get_global_var("corridor_size") if val is None: val = -1 return val @corridor_size.setter def corridor_size(self, val: float): self.client.set_global_var("corridor_size", val) @property def stitch_x(self): val = self.client.get_global_var("stitch_x") if val is None: return 0 return val @stitch_x.setter @typechecked def stitch_x(self, val: int): self.client.set_global_var("stitch_x", val) @property def stitch_y(self): val = self.client.get_global_var("stitch_y") if val is None: return 0 return val @stitch_y.setter @typechecked def stitch_y(self, val: int): self.client.set_global_var("stitch_y", val) @property def ptycho_reconstruct_foldername(self): val = self.client.get_global_var("ptycho_reconstruct_foldername") if val is None: return "ptycho_reconstruct" return val @ptycho_reconstruct_foldername.setter def ptycho_reconstruct_foldername(self, val: str): self.client.set_global_var("ptycho_reconstruct_foldername", val) self.reconstructor.folder_name = val # keep reconstructor in sync @property def tomo_angle_stepsize(self): val = self.client.get_global_var("tomo_angle_stepsize") if val is None: return 10.0 return val @tomo_angle_stepsize.setter def tomo_angle_stepsize(self, val: float): self.client.set_global_var("tomo_angle_stepsize", val) @property def golden_max_number_of_projections(self): val = self.client.get_global_var("golden_max_number_of_projections") if val is None: return 1000.0 return val @golden_max_number_of_projections.setter def golden_max_number_of_projections(self, val: float): self.client.set_global_var("golden_max_number_of_projections", val) @property def tomo_stitch_overlap(self): val = self.client.get_global_var("tomo_stitch_overlap") if val is None: return 0.2 return val @tomo_stitch_overlap.setter def tomo_stitch_overlap(self, val: float): self.client.set_global_var("tomo_stitch_overlap", val) @property def tomo_angle_range(self): """Total angular sweep in degrees for tomo_type 1 (equally spaced sub-tomograms), inclusive of the upper bound. Either 180 (default, original behaviour) or 360.""" val = self.client.get_global_var("tomo_angle_range") if val is None: return 180 return val @tomo_angle_range.setter def tomo_angle_range(self, val: float): if val not in (180, 360): raise ValueError("tomo_angle_range must be 180 or 360 degrees.") self.client.set_global_var("tomo_angle_range", val) @property def golden_projections_at_0_deg_for_damage_estimation(self): val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation") if val is None: return 0 return val @golden_projections_at_0_deg_for_damage_estimation.setter def golden_projections_at_0_deg_for_damage_estimation(self, val: float): self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val) @property def golden_ratio_bunch_size(self): val = self.client.get_global_var("golden_ratio_bunch_size") if val is None: return 20 return val @golden_ratio_bunch_size.setter def golden_ratio_bunch_size(self, val: float): self.client.set_global_var("golden_ratio_bunch_size", val) @property def frames_per_trigger(self): """Number of burst frames acquired per point/projection. Used both by scans.flomni_fermat_scan (via tomo_scan_projection) and by tomo_acquire_at_angle (scans.acquire).""" val = self.client.get_global_var("frames_per_trigger") if val is None: return 1 return val @frames_per_trigger.setter def frames_per_trigger(self, val: int): if isinstance(val, bool) or not isinstance(val, int): raise ValueError("frames_per_trigger must be a positive integer.") if val <= 0: raise ValueError("frames_per_trigger must be a positive integer.") self.client.set_global_var("frames_per_trigger", val) @property def single_point_instead_of_fermat_scan(self): """If True, tomo_scan acquires a single point (or burst) at each angle via scans.acquire instead of running scans.flomni_fermat_scan. Applies to all tomo_types, since it only changes how a given angle is acquired, not which angles are visited.""" val = self.client.get_global_var("single_point_instead_of_fermat_scan") if val is None: return False return val @single_point_instead_of_fermat_scan.setter def single_point_instead_of_fermat_scan(self, val: bool): self.client.set_global_var("single_point_instead_of_fermat_scan", val) @property def sample_name(self): return self.sample_get_name(0)
[docs] def tomo_alignment_scan(self): """ Performs a tomogram alignment scan. Collects all scan numbers acquired during the alignment, prints them at the end, and creates a BEC scilog text entry summarising the alignment scan numbers. """ if self.get_alignment_offset(0) == (0, 0, 0): print("It appears that the xrayeye alignemtn was not performend or loaded. Aborting.") return dev = builtins.__dict__.get("dev") bec = builtins.__dict__.get("bec") self.feye_out() tags = ["BEC_alignment_tomo", self.sample_name] self.write_alignment_scan_numbers(bec.queue.next_scan_number) start_angle = 0 alignment_scan_numbers = [] angle_end = start_angle + 180 for angle in np.linspace(start_angle, angle_end, num=int(180 / 45) + 1, endpoint=True): successful = False error_caught = False if 0 <= angle < 180.05: print(f"Starting flOMNI scan for angle {angle}") while not successful: try: start_scan_number = bec.queue.next_scan_number self.tomo_scan_projection(angle) error_caught = False except AlarmBase as exc: if exc.alarm_type == "TimeoutError": bec.queue.request_queue_reset() time.sleep(2) error_caught = True else: raise exc # todo here was if blchk success, then setting to success true successful = True end_scan_number = bec.queue.next_scan_number for scan_nr in range(start_scan_number, end_scan_number): # self._write_tomo_scan_number(scan_nr, angle, 0) alignment_scan_numbers.append(scan_nr) umv(dev.fsamroy, 0) self.OMNYTools.printgreenbold( "\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit by flomni.read_alignment_offset() ." ) # summary of alignment scan numbers scan_list_str = ", ".join(str(s) for s in alignment_scan_numbers) # print(f"\nAlignment scan numbers ({len(alignment_scan_numbers)} total): {scan_list_str}") # BEC scilog entry (no logo) scilog_content = ( f"Alignment scan finished.\n" f"Sample: {self.sample_name}\n" f"Number of alignment scans: {len(alignment_scan_numbers)}\n" f"Alignment scan numbers: {scan_list_str}\n" ) print(scilog_content) bec.messaging.scilog.new().add_text(scilog_content.replace("\n", "<br>")).add_tags( "alignmentscan" ).send()
def write_alignment_scan_numbers(self, first_scan): import os file = os.path.expanduser("~/data/raw/logs/ptychotomoalign_scannum.txt") os.makedirs(os.path.dirname(file), exist_ok=True) scans = [first_scan + k for k in range(5)] angles = [0, 45, 90, 135, 180] x_vals = [] for angle in angles: x, y, z = self.get_alignment_offset(angle) x_vals.append(x) zeros = [0] * len(angles) with open(file, "w") as f: f.write(" ".join(map(str, scans)) + "\n") f.write(" ".join(map(str, angles)) + "\n") f.write(" ".join(f"{x:.2f}" for x in x_vals) + "\n") f.write(" ".join(map(str, x_vals)) + "\n") f.write(" ".join(map(str, zeros)) + "\n")
[docs] def sub_tomo_scan(self, subtomo_number, start_angle=None): """ Performs a sub tomogram scan. Args: subtomo_number (int): The sub tomogram number. start_angle (float, optional): The start angle of the scan. Defaults to None. """ if start_angle is not None: print(f"Sub tomo scan with start angle {start_angle} requested.") if start_angle is None: if subtomo_number == 1: start_angle = 0 elif subtomo_number == 2: start_angle = self.tomo_angle_stepsize / 8.0 * 4 elif subtomo_number == 3: start_angle = self.tomo_angle_stepsize / 8.0 * 2 elif subtomo_number == 4: start_angle = self.tomo_angle_stepsize / 8.0 * 6 elif subtomo_number == 5: start_angle = self.tomo_angle_stepsize / 8.0 * 1 elif subtomo_number == 6: start_angle = self.tomo_angle_stepsize / 8.0 * 5 elif subtomo_number == 7: start_angle = self.tomo_angle_stepsize / 8.0 * 3 elif subtomo_number == 8: start_angle = self.tomo_angle_stepsize / 8.0 * 7 # _tomo_shift_angles (potential global variable) _tomo_shift_angles = 0 # compute number of projections start = start_angle + _tomo_shift_angles if subtomo_number % 2: # odd = forward max_allowed_angle = self.tomo_angle_range + 0.05 + self.tomo_angle_stepsize proposed_end = start + self.tomo_angle_range angle_end = min(proposed_end, max_allowed_angle) span = angle_end - start else: # even = reverse min_allowed_angle = 0 proposed_end = start - self.tomo_angle_range angle_end = max(proposed_end, min_allowed_angle) span = start - angle_end # number of projections needed to maintain step size N = int(span / self.tomo_angle_stepsize) + 1 angles = np.linspace(start, angle_end, num=N, endpoint=True) if subtomo_number % 2: # odd subtomos → forward direction # clamp end angle to max allowed max_allowed_angle = self.tomo_angle_range + 0.05 + self.tomo_angle_stepsize proposed_end = start + self.tomo_angle_range angle_end = min(proposed_end, max_allowed_angle) angles = np.linspace(start, angle_end, num=N, endpoint=True) else: # even subtomos → reverse direction # go FROM start_angle down toward 0 min_allowed_angle = 0 proposed_end = start - self.tomo_angle_range angle_end = max(proposed_end, min_allowed_angle) angles = np.linspace(start, angle_end, num=N, endpoint=True) for i, angle in enumerate(angles): self.progress["subtomo"] = subtomo_number # --- NEW LOGIC FOR OFFSET WHEN start_angle IS SPECIFIED --- if i == 0: step = self.tomo_angle_stepsize sa = start_angle if start_angle is None: # normal operation: always start at zero self._subtomo_offset = 0 else: if subtomo_number % 2: # odd = forward direction self._subtomo_offset = round(sa / step) else: # even = reverse direction self._subtomo_offset = round((self.tomo_angle_range - sa) / step) # progress index must always increase self.progress["subtomo_projection"] = self._subtomo_offset + i # ------------------------------------------------------------ # existing progress fields self.progress["subtomo_total_projections"] = int( self.tomo_angle_range / self.tomo_angle_stepsize ) self.progress["projection"] = (subtomo_number - 1) * self.progress[ "subtomo_total_projections" ] + self.progress["subtomo_projection"] self.progress["total_projections"] = ( self.tomo_angle_range / self.tomo_angle_stepsize ) * 8 self.progress["angle"] = angle # finally do the scan at this angle self._tomo_scan_at_angle(angle, subtomo_number)
@scan_repeat(max_repeats=10, default=True) def _tomo_scan_at_angle(self, angle, subtomo_number): if 0 <= angle < self.tomo_angle_range + 0.05: self.progress["heartbeat"] = datetime.datetime.now().isoformat() print(f"Starting flOMNI scan for angle {angle} in subtomo {subtomo_number}") self._print_progress() if not self.special_angles: self._current_special_angles = [] if self._current_special_angles: next_special_angle = self._current_special_angles[0] if np.isclose(angle, next_special_angle, atol=0.5): self._current_special_angles.pop(0) num_repeats = self.special_angle_repeats else: num_repeats = 1 start_scan_number = bec.queue.next_scan_number for i in range(num_repeats): self._at_each_angle(angle) end_scan_number = bec.queue.next_scan_number for scan_nr in range(start_scan_number, end_scan_number): self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
[docs] def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None): """start a tomo scan""" if not self._check_eye_out_and_optics_in(): print( "Attention: The setup is not in measurement condition.\nXray eye might be IN or the Xray optics OUT." ) if self.OMNYTools.yesno("Shall I continue?", "n"): print("OK") else: print("Stopping.") return self.flomnigui_show_progress() bec = builtins.__dict__.get("bec") scans = builtins.__dict__.get("scans") bec.builtin_actors.scan_interlock.trigger_setting = "restart_scan" bec.builtin_actors.scan_interlock.enabled = True self._current_special_angles = self.special_angles.copy() # a new tomo scan was started if ( (self.tomo_type == 1 and subtomo_start == 1 and start_angle is None) or (self.tomo_type == 2 and projection_number == None) or (self.tomo_type == 3 and projection_number == None) ): # pylint: disable=undefined-variable if bec.active_account != "": self.tomo_id = self.add_sample_database( self.sample_name, str(datetime.date.today()), bec.active_account, bec.queue.next_scan_number, "flomni", "test additional info", "BEC", ) self.write_pdf_report() else: self.tomo_id = 0 self.write_pdf_report() self.progress["tomo_start_time"] = datetime.datetime.now().isoformat() # reset stale estimates from any previous scan, otherwise the GUI # would keep showing a leftover ETA from before this scan has # accumulated enough projections to compute a fresh one self.progress["estimated_remaining_time"] = None self.progress["estimated_finish_time"] = None with scans.dataset_id_on_hold: if self.tomo_type == 1: # 8 equally spaced sub-tomograms self.progress["tomo_type"] = "Equally spaced sub-tomograms" for ii in range(subtomo_start, 9): self.sub_tomo_scan(ii, start_angle=start_angle) start_angle = None elif self.tomo_type == 2: # Golden ratio tomography previous_subtomo_number = -1 if projection_number == None: ii = 0 else: ii = projection_number while True: angle, subtomo_number = self._golden(ii, self.golden_ratio_bunch_size, 180, 1) if previous_subtomo_number != subtomo_number: if ( subtomo_number % 2 == 1 and ii > 10 and self.golden_projections_at_0_deg_for_damage_estimation == 1 ): self._tomo_scan_at_angle(0, subtomo_number) previous_subtomo_number = subtomo_number self.progress["tomo_type"] = "Golden ratio tomography" self.progress["subtomo"] = subtomo_number self.progress["projection"] = ii self.progress["angle"] = angle if self.golden_ratio_bunch_size > 0: self.progress["subtomo_total_projections"] = self.golden_ratio_bunch_size self.progress["subtomo_projection"] = ( ii - (subtomo_number - 1) * self.golden_ratio_bunch_size ) else: self.progress["subtomo_total_projections"] = 0 self.progress["subtomo_projection"] = 0 if self.golden_max_number_of_projections > 0: self.progress["total_projections"] = self.golden_max_number_of_projections else: self.progress["total_projections"] = 0 self._tomo_scan_at_angle(angle, subtomo_number) ii += 1 if ( ii > self.golden_max_number_of_projections and self.golden_max_number_of_projections > 0 ): print( f"Golden ratio tomography stopped automatically after the requested {self.golden_max_number_of_projections} projections" ) break elif self.tomo_type == 3: # Equally spaced tomography, golden ratio starting angle previous_subtomo_number = -1 if projection_number == None: ii = 0 else: ii = projection_number while True: angle, subtomo_number = self._golden_equally_spaced( ii, int(180 / self.tomo_angle_stepsize), 180, 1, 0 ) if previous_subtomo_number != subtomo_number: if ( subtomo_number % 2 == 1 and ii > 10 and self.golden_projections_at_0_deg_for_damage_estimation == 1 ): self._tomo_scan_at_angle(0, subtomo_number) previous_subtomo_number = subtomo_number self.progress["tomo_type"] = ( "Equally spaced tomography, golden ratio starting angle" ) self.progress["subtomo"] = subtomo_number self.progress["projection"] = ii self.progress["angle"] = angle self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize self.progress["subtomo_projection"] = ( ii - (subtomo_number - 1) * self.progress["subtomo_total_projections"] ) if self.golden_max_number_of_projections > 0: self.progress["total_projections"] = self.golden_max_number_of_projections else: self.progress["total_projections"] = 0 self._tomo_scan_at_angle(angle, subtomo_number) ii += 1 if ( ii > self.golden_max_number_of_projections and self.golden_max_number_of_projections > 0 ): print( f"Golden ratio tomography stopped automatically after the requested {self.golden_max_number_of_projections} projections" ) break else: raise FlomniError("undefined tomo type") self.progress["projection"] = self.progress["total_projections"] self.progress["subtomo_projection"] = self.progress["subtomo_total_projections"] self._print_progress() self.OMNYTools.printgreenbold("Tomoscan finished")
@staticmethod def _format_duration(seconds: float) -> str: """Format a duration in seconds as a human-readable string, e.g. '2h 03m 15s'.""" seconds = int(seconds) h, remainder = divmod(seconds, 3600) m, s = divmod(remainder, 60) if h > 0: return f"{h}h {m:02d}m {s:02d}s" if m > 0: return f"{m}m {s:02d}s" return f"{s}s" def _print_progress(self): # --- compute and store estimated remaining time ----------------------- start_str = self.progress.get("tomo_start_time") projection = self.progress["projection"] total = self.progress["total_projections"] if start_str is not None and total > 0 and projection > 9: now = datetime.datetime.now() elapsed = (now - datetime.datetime.fromisoformat(start_str)).total_seconds() rate = projection / elapsed # projections per second remaining_s = (total - projection) / rate self.progress["estimated_remaining_time"] = remaining_s eta_str = self._format_duration(remaining_s) finish_dt = now + datetime.timedelta(seconds=remaining_s) self.progress["estimated_finish_time"] = finish_dt.isoformat() finish_str = finish_dt.strftime("%Y-%m-%d %H:%M:%S") else: eta_str = "N/A" finish_str = "N/A" # ---------------------------------------------------------------------- print("\x1b[95mProgress report:") print(f"Tomo type: ....................... {self.progress['tomo_type']}") print(f"Projection: ...................... {self.progress['projection']:.0f}") print(f"Total projections expected ....... {self.progress['total_projections']}") print(f"Angle: ........................... {self.progress['angle']}") print(f"Current subtomo: ................. {self.progress['subtomo']}") print(f"Current projection within subtomo: {self.progress['subtomo_projection']}") print(f"Estimated remaining time: ........ {eta_str}") print(f"Estimated finish time: ........... {finish_str}\x1b[0m") self._flomnigui_update_progress()
[docs] def add_sample_database( self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user ): """Add a sample to the omny sample database. This also retrieves the tomo id.""" return self.tomo_id_manager.register( sample_name=samplename, date=date, eaccount=eaccount, scan_number=scan_number, setup=setup, additional_info=sample_additional_info, user=user, )
def _at_each_angle(self, angle: float) -> None: if "flomni_at_each_angle" in builtins.__dict__: # pylint: disable=undefined-variable flomni_at_each_angle(self, angle) return if self.single_point_instead_of_fermat_scan: self.tomo_acquire_at_angle(angle) return self.tomo_scan_projection(angle) def _golden(self, ii, howmany_sorted, maxangle, reverse=False): """returns the iis golden ratio angle of sorted bunches of howmany_sorted and its subtomo number""" golden = [] # occupy array with the range of golden angles for iji in range( (ii - (ii % howmany_sorted)), (ii - (ii % howmany_sorted)) + howmany_sorted, 1 ): golden.append( ((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000 ) golden.sort() subtomo_number = int(ii / howmany_sorted) + 1 if reverse and not subtomo_number % 2: golden.reverse() return (golden[ii % howmany_sorted], subtomo_number) def _golden_equally_spaced( self, ii, number_of_projections_per_subtomo, maxangle, reverse, verbose ): """returns angles for equally spaced tomography with starting angles of sub tomograms shifted according to golden ratio""" """ii is projection number starting at 1, reverse will execute the even sub tomograms in reverse direction""" # ii is projection number starting at 1 angular_step = maxangle / number_of_projections_per_subtomo subtomo_number = int(((ii - 1) * angular_step) / maxangle) + 1 start_angle = self._golden(subtomo_number - 1, 1, angular_step) projection_number_of_subtomo = ( ii - (subtomo_number - 1) * number_of_projections_per_subtomo ) - 1 if reverse: if subtomo_number % 2: angle = start_angle[0] + projection_number_of_subtomo * angular_step else: angle = ( start_angle[0] + (number_of_projections_per_subtomo - 1) * angular_step - projection_number_of_subtomo * angular_step ) else: angle = start_angle[0] + projection_number_of_subtomo * angular_step if verbose: print( f"Equally spaced golden ratio tomography.\nAngular step: {angular_step}\nSubtomo Number: {subtomo_number}\nAngle: {angle}" ) return angle, subtomo_number
[docs] def tomo_reconstruct(self, base_path="~/data/raw/logs/reconstruction_queue"): """write the tomo reconstruct file for the reconstruction queue""" bec = builtins.__dict__.get("bec") self.reconstructor.write( scan_list=self._current_scan_list, next_scan_number=bec.queue.next_scan_number, base_path=base_path, )
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None: tomo_scan_numbers_file = os.path.expanduser("~/data/raw/logs/tomography_scannumbers.txt") with open(tomo_scan_numbers_file, "a+") as out_file: # pylint: disable=undefined-variable out_file.write( f"{scan_number} {angle} {dev.fsamroy.read()['fsamroy']['value']:.3f} {self.tomo_id} {subtomo_number} {0} {self.sample_name}\n" ) def tomo_scan_projection(self, angle: float): dev.rtx.controller.laser_tracker_check_signalstrength() scans = builtins.__dict__.get("scans") # additional_correction = self.align.compute_additional_correction(angle) # additional_correction_2 = self.align.compute_additional_correction_2(angle) # correction_xeye_mu = self.align.lamni_compute_additional_correction_xeye_mu(angle) offsets = self.get_alignment_offset(angle) sum_offset_x = offsets[0] sum_offset_y = ( offsets[1] - self.compute_additional_correction_y(angle) - self.compute_additional_correction_y_2(angle) + self.manual_shift_y ) sum_offset_z = offsets[2] self._current_scan_list = [] for stitch_x in range(-self.stitch_x, self.stitch_x + 1): for stitch_y in range(-self.stitch_y, self.stitch_y + 1): # pylint: disable=undefined-variable corridor_size = self.corridor_size if self.corridor_size > 0 else None self._current_scan_list.append(bec.queue.next_scan_number) cenx = sum_offset_x + stitch_x * (self.fovx - self.tomo_stitch_overlap) ceny = sum_offset_y + stitch_y * (self.fovy - self.tomo_stitch_overlap) logger.info( f"scans.flomni_fermat_scan(fovx={self.fovx}, fovy={self.fovy}," f" step={self.tomo_shellstep}, cenx={cenx}, ceny={ceny}," f" zshift={sum_offset_z}, angle={angle}," f" exp_time={self.tomo_countingtime}, corridor_size={corridor_size})" ) log_message = ( f"{str(datetime.datetime.now())}: flomni scan projection at angle {angle}, scan" f" number {bec.queue.next_scan_number}.\n" ) scan_kwargs = dict( fovx=self.fovx, fovy=self.fovy, step=self.tomo_shellstep, cenx=cenx, ceny=ceny, zshift=sum_offset_z, angle=angle, exp_time=self.tomo_countingtime, frames_per_trigger=self.frames_per_trigger, ) if self.corridor_size > 0: scan_kwargs["corridor_size"] = self.corridor_size scans.flomni_fermat_scan(**scan_kwargs) self.tomo_reconstruct()
[docs] def tomo_acquire_at_angle(self, angle: float, frames_per_trigger: int | None = None): """ Move fsamroy to `angle`, then move rtx/rty/rtz to the alignment-corrected scan center (same alignment-offset logic as tomo_scan_projection, but without stitching), and acquire a single frame or a burst via scans.acquire instead of running a fermat scan. This mirrors the positioning sequence used internally by flomni_fermat_scan (rotation, then rtx/rty/rtz with laser-tracker on/check/move-to-region), but executes it as plain blocking client-side calls, since this runs in the BEC client, not on the scan server. Args: angle (float): rotation angle [deg] to move fsamroy to. frames_per_trigger (int, optional): number of burst frames for this acquisition. Defaults to self.frames_per_trigger. """ scans = builtins.__dict__.get("scans") # --- rotation --- fsamroy_current_setpoint = dev.fsamroy.user_setpoint.get() if angle != fsamroy_current_setpoint: umv(dev.fsamroy, angle) else: print("No rotation required") # --- alignment offset (same as tomo_scan_projection, no stitching) --- offsets = self.get_alignment_offset(angle) sum_offset_x = offsets[0] sum_offset_y = ( offsets[1] - self.compute_additional_correction_y(angle) - self.compute_additional_correction_y_2(angle) + self.manual_shift_y ) sum_offset_z = offsets[2] # --- positioning + laser tracker, mirroring # flomni_fermat_scan._prepare_setup_part2 --- dev.rtx.controller.laser_tracker_on() umv(dev.rtx, sum_offset_x, dev.rty, sum_offset_y, dev.rtz, sum_offset_z) tracker_signal = dev.rtx.controller.laser_tracker_check_signalstrength() # checks that the fsamx coarse stage is at a position that leaves # sufficient piezo range on the fine (rtx) stage dev.rtx.controller.move_samx_to_scan_region(sum_offset_x) if tracker_signal == "low": logger.warning( "Signal strength of the laser tracker is low. Realignment recommended!" ) elif tracker_signal == "toolow": raise FlomniError( "Signal strength of the laser tracker is too low for scanning. Realignment required!" ) # --- acquire --- n_frames = ( frames_per_trigger if frames_per_trigger is not None else self.frames_per_trigger ) scans.acquire(exp_time=self.tomo_countingtime, frames_per_trigger=n_frames)
[docs] def tomo_parameters(self): """print and update the tomo parameters""" print("Current settings:") print(f"Counting time <ctime> = {self.tomo_countingtime} s") print(f"Stepsize microns <step> = {self.tomo_shellstep}") print(f"FOV (200/100) <microns> = {self.fovx}, {self.fovy}") print(f"Stitching number x,y = {self.stitch_x}, {self.stitch_y}") print(f"Stitching overlap = {self.tomo_stitch_overlap}") print(f"Reconstruction queue name = {self.ptycho_reconstruct_foldername}") print(f" _manual_shift_y <um> = {self.manual_shift_y}") print(f"Frames per trigger (burst) = {self.frames_per_trigger}") print(f"Single point instead of fermat = {self.single_point_instead_of_fermat_scan}") print("") if self.tomo_type == 1: print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms") print(f"Angular range = {self.tomo_angle_range} degrees") print(f"Total number of projections: {(self.tomo_angle_range/self.tomo_angle_stepsize)*8}") print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees") print( "Angular step of the final (combined) tomogram:" f" {self.tomo_angle_range/((self.tomo_angle_range/self.tomo_angle_stepsize)*8)} degrees" ) if self.tomo_type == 2: print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography") print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}") if self.golden_max_number_of_projections > 0: print(f"ending after {self.golden_max_number_of_projections} projections") else: print("ending by manual interruption") if self.golden_projections_at_0_deg_for_damage_estimation == 1: print( "repeating prjections at 0 degrees at the beginning of every second subtomogram" ) if self.tomo_type == 3: print( "\x1b[1mTomo type 3:\x1b[0m Equally spaced tomography, golden ratio starting angle" ) print(f"Number of projections per sub-tomogram: {180/self.tomo_angle_stepsize}") print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees") if self.golden_max_number_of_projections > 0: print(f"ending after {self.golden_max_number_of_projections} projections") else: print("ending by manual interruption") if self.golden_projections_at_0_deg_for_damage_estimation == 1: print( "repeating prjections at 0 degrees at the beginning of every second subtomogram" ) print(f"\nSample name: {self.sample_name}\n") if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"): print("... excellent!") else: self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float) self.tomo_shellstep = self._get_val("<step size> um", self.tomo_shellstep, float) self.fovx = self._get_val("<FOV X (max 200)> um", self.fovx, float) self.fovy = self._get_val("<FOV Y (max 100)> um", self.fovy, float) if self.single_point_instead_of_fermat_scan: print( "Stitching is disabled while single point instead of fermat scan is" " active; stitch X/Y forced to 0." ) self.stitch_x = 0 self.stitch_y = 0 else: self.stitch_x = self._get_val("<stitch X>", self.stitch_x, int) self.stitch_y = self._get_val("<stitch Y>", self.stitch_y, int) self.ptycho_reconstruct_foldername = self._get_val( "Reconstruction queue ", self.ptycho_reconstruct_foldername, str ) self.frames_per_trigger = self._get_val( "Frames per trigger (burst)", self.frames_per_trigger, int ) self.single_point_instead_of_fermat_scan = bool( self._get_val( "Single point instead of fermat scan (acquire at angle) 1/0?", int(self.single_point_instead_of_fermat_scan), int, ) ) if self.single_point_instead_of_fermat_scan and ( self.stitch_x != 0 or self.stitch_y != 0 ): print( "Stitching is not supported with single point instead of fermat scan;" " stitch X/Y forced to 0." ) self.stitch_x = 0 self.stitch_y = 0 print("Tomography type:") print(" 1: 8 equally spaced sub-tomograms") print(" 2: Golden ratio tomography") print(" 3: Equally spaced tomography, golden ratio starting angle") self.tomo_type = self._get_val("Tomography type", self.tomo_type, int) if self.tomo_type == 1: self.tomo_angle_range = self._get_val( "Angular range (180 or 360)", self.tomo_angle_range, int ) tomo_numberofprojections = self._get_val( "Total number of projections", (self.tomo_angle_range / self.tomo_angle_stepsize) * 8, int, ) self.tomo_angle_stepsize = (self.tomo_angle_range / tomo_numberofprojections) * 8 print( f"The angular step within a sub-tomogram will be {self.tomo_angle_stepsize} degrees" ) print( "The angular step of the final (combined) tomogram will be" f" {self.tomo_angle_range / tomo_numberofprojections} degrees" ) if self.tomo_type == 2: self.golden_ratio_bunch_size = self._get_val( "Number of projections sorted per bunch (default 20)", self.golden_ratio_bunch_size, int, ) self.golden_max_number_of_projections = self._get_val( "Stop after number of projections (zero for endless)", self.golden_max_number_of_projections, int, ) self.golden_projections_at_0_deg_for_damage_estimation = self._get_val( "Repeat projections at 0 deg every second subtomo 1/0 ?", self.golden_projections_at_0_deg_for_damage_estimation, int, ) if self.tomo_type == 3: numprj = self._get_val( "Number of projections per sub-tomogram", int(180 / self.tomo_angle_stepsize), int, ) self.tomo_angle_stepsize = 180 / numprj self.golden_max_number_of_projections = self._get_val( "Stop after number of projections (zero for endless)", self.golden_max_number_of_projections, int, ) self.golden_projections_at_0_deg_for_damage_estimation = self._get_val( "Repeat projections at 0 deg every second subtomo", self.golden_projections_at_0_deg_for_damage_estimation, int, )
@staticmethod def _get_val(msg: str, default_value, data_type): return data_type(input(f"{msg} ({default_value}): ") or default_value) def rt_off(self): dev.rtx.enabled = False dev.rty.enabled = False dev.rtz.enabled = False def rt_on(self): dev.rtx.enabled = True dev.rty.enabled = True dev.rtz.enabled = True if dev.rtx.enabled == True: print("rt is enabled") else: print("failed to enable rt")
[docs] def write_pdf_report(self): """create and write the pdf report with the current flomni settings""" dev = builtins.__dict__.get("dev") # header = "" header = ( " \n" * 3 + " .d888 888 .d88888b. 888b d888 888b 888 8888888 \n" + ' d88P" 888 d88P" "Y88b 8888b d8888 8888b 888 888 \n' + " 888 888 888 888 88888b.d88888 88888b 888 888 \n" + " 888888 888 888 888 888Y88888P888 888Y88b 888 888 \n" + " 888 888 888 888 888 Y888P 888 888 Y88b888 888 \n" + " 888 888 888 888 888 Y8P 888 888 Y88888 888 \n" + ' 888 888 Y88b. .d88P 888 " 888 888 Y8888 888 \n' + ' 888 888 "Y88888P" 888 888 888 Y888 8888888 \n' ) padding = 20 fovxy = f"{self.fovx:.1f}/{self.fovy:.1f}" stitching = f"{self.stitch_x:.0f}/{self.stitch_y:.0f}" dataset_id = str(self.client.queue.next_dataset_number) account = bec.active_account content = [ f"{'Sample Name:':<{padding}}{self.sample_name:>{padding}}\n", f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n", f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n", f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n", f"{'e-account:':<{padding}}{str(account):>{padding}}\n", f"{'Number of projections:':<{padding}}{int((self.tomo_angle_range / self.tomo_angle_stepsize) * 8):>{padding}}\n", f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n", f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int((self.tomo_angle_range / self.tomo_angle_stepsize) * 8) + 10:>{padding}}\n", f"{'Current photon energy:':<{padding}}To be implemented\n", # f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n", f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n", f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n", f"{'FOV:':<{padding}}{fovxy:>{padding}}\n", f"{'Stitching:':<{padding}}{stitching:>{padding}}\n", f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n", f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n", ] content = "".join(content) user_target = os.path.expanduser( f"~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf" ) with PDFWriter(user_target) as file: file.write(header) file.write(content) # subprocess.run( # "xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True # ) # status = subprocess.run(f"cp /tmp/spec-e20131-specES1.pdf {user_target}", shell=True) # msg = bec.tomo_progress.tomo_progressMessage() # logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png") # msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag( # ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "flOMNI", self.sample_name] # ) # self.client.tomo_progress.send_tomo_progress_message("~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf").send() import csaxs_bec # Ensure this is a Path object, not a string csaxs_bec_basepath = Path(csaxs_bec.__file__) logo_file_rel = "flOMNI.png" # Build the absolute path correctly logo_file = ( csaxs_bec_basepath.parent / "bec_ipython_client" / "plugins" / "flomni" / logo_file_rel ).resolve() print(logo_file) bec.messaging.scilog.new().add_attachment(logo_file, width=200).add_text( content.replace("\n", "<br>") ).add_tags("tomoscan").send()
if __name__ == "__main__": import builtins from bec_ipython_client import BECIPythonClient bec = BECIPythonClient() bec.start() scans = bec.scans dev = bec.device_manager.devices builtins.__dict__["scans"] = scans builtins.__dict__["dev"] = dev builtins.__dict__["bec"] = bec builtins.__dict__["umv"] = umv flomni = Flomni(bec) flomni.start_x_ray_eye_alignment()