# Source code for csaxs_bec.bec_ipython_client.plugins.LamNI.lamni

import builtins
import datetime
import os
import subprocess
import time
from pathlib import Path

import numpy as np
from bec_lib import bec_logger
from bec_lib.alarm_handler import AlarmBase
from bec_lib.pdf_writer import PDFWriter
from typeguard import typechecked

from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
    OMNYTools,
    PtychoReconstructor,
    TomoIDManager,
)
from csaxs_bec.bec_ipython_client.plugins.LamNI.gui_tools import LamniGuiTools

from .alignment import XrayEyeAlign
from .lamni_optics_mixin import LaMNIInitStages, LamNIOpticsMixin

# Module-level logger, taken from bec_lib's ``bec_logger``; used by the scilog helper
# below to report failed logbook writes.
logger = bec_logger.logger

# Bind the session objects published through ``builtins`` as module globals.
# Nothing is bound when ``bec`` is absent (or None), so the names stay undefined
# in that case; names other than ``bec`` that are missing are bound to None.
if vars(builtins).get("bec") is not None:
    bec, dev, umv, mv, umvr = (
        vars(builtins).get(name) for name in ("bec", "dev", "umv", "mv", "umvr")
    )


class LamNI(LamNIOpticsMixin, LamniGuiTools):
    """LamNI helper combining the optics and GUI mixins with scan orchestration.

    The instance keeps the BEC client, its device manager, the X-ray-eye
    alignment helper, the stage-initialisation helper, and the collaborators
    used for reconstruction queueing and tomo-id bookkeeping.
    """

    def __init__(self, client):
        """Wire up collaborators and reset special-angle and progress state.

        Args:
            client: BEC client; must expose ``device_manager`` and the
                global-variable accessors used by the properties of this class.
        """
        super().__init__()
        self.client = client
        self.device_manager = client.device_manager
        self.align = XrayEyeAlign(client, self)
        self.init = LaMNIInitStages(client)

        # Extracted collaborators. The reconstructor is seeded with the folder
        # name property, which reads ``self.client`` - so ``client`` must be set first.
        self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
        self.tomo_id_manager = TomoIDManager()
        self.OMNYTools = OMNYTools(client)

        self.tomo_id = -1

        # Special-angle bookkeeping.
        # NOTE(review): the tolerance default here (20) differs from the 0.5 default
        # of ``set_special_angles`` - confirm which one is intended.
        self.special_angles = []
        self.special_angle_repeats = 20
        self.special_angle_tolerance = 20
        self._current_special_angles = []

        # Progress tracking
        self.progress = {
            "tomo_type": "Equally spaced sub-tomograms",
            "subtomo": 0,
            "subtomo_projection": 0,
            "subtomo_total_projections": 1,
            "projection": 0,
            "total_projections": 1,
            "angle": 0,
        }

    # ------------------------------------------------------------------
    # Special angles
    # ------------------------------------------------------------------
def set_special_angles(self, angles: list, repeats: int = 20, tolerance: float = 0.5):
    """Store the special-angle configuration for a tomo.

    Args:
        angles (list): List of special angles. The list is stored as given (not copied).
        repeats (int, optional): Number of repeats at a special angle. Defaults to 20.
        tolerance (float, optional): Angle tolerance in degrees. Defaults to 0.5.
    """
    (
        self.special_angles,
        self.special_angle_repeats,
        self.special_angle_tolerance,
    ) = (angles, repeats, tolerance)
def remove_special_angles(self):
    """Clear the special-angle list and set the repeat count back to 1."""
    self.special_angle_repeats = 1
    self.special_angles = []
# ------------------------------------------------------------------
# RT feedback / interferometer helpers
# ------------------------------------------------------------------
def rt_off(self):
    """Disable the ``rtx`` and ``rty`` devices."""
    dev.rtx.enabled = False
    dev.rty.enabled = False


def rt_on(self):
    """Enable ``rtx`` and ``rty`` and print whether ``rtx`` reads back as enabled."""
    dev.rtx.enabled = True
    dev.rty.enabled = True
    if dev.rtx.enabled:
        print("rt is enabled")
    else:
        print("failed to enable rt")


def feedback_enable_with_reset(self):
    """Call the rtx controller's ``feedback_enable_with_reset`` and show the status."""
    self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
    self.feedback_status()


def feedback_enable_without_reset(self):
    """Call the rtx controller's ``feedback_enable_without_reset`` and show the status."""
    self.device_manager.devices.rtx.controller.feedback_enable_without_reset()
    self.feedback_status()


def feedback_disable(self):
    """Call the rtx controller's ``feedback_disable`` and show the status."""
    self.device_manager.devices.rtx.controller.feedback_disable()
    self.feedback_status()


def feedback_disable_and_reset_angle(self):
    """Disable feedback via the controller call that also resets the angle interferometer."""
    self.device_manager.devices.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer()
    self.feedback_status()


def feedback_status(self):
    """Ask the rtx controller to show the feedback status."""
    self.device_manager.devices.rtx.controller.show_feedback_status()


def show_interferometer_positions(self):
    """Show interferometer information via the rtx controller.

    NOTE(review): this calls ``show_feedback_status`` - exactly what
    ``feedback_status`` does. Confirm whether a dedicated controller call
    was intended here.
    """
    self.device_manager.devices.rtx.controller.show_feedback_status()


def show_signal_strength(self):
    """Ask the rtx controller to show the interferometer signal strength."""
    self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()


def show_analog_signals(self):
    """Return whatever the rtx controller's ``show_analog_signals`` returns."""
    return self.device_manager.devices.rtx.controller.show_analog_signals()


def lights_off(self):
    """Switch the lights off through the lsamx controller."""
    self.device_manager.devices.lsamx.controller.lights_off()


def lights_on(self):
    """Switch the lights on through the lsamx controller."""
    self.device_manager.devices.lsamx.controller.lights_on()


# ------------------------------------------------------------------
# Global parameters (backed by BEC global vars)
# ------------------------------------------------------------------
def _global_var_or_default(self, name, default):
    """Return the BEC global variable ``name``, or ``default`` while it is unset (None)."""
    val = self.client.get_global_var(name)
    return default if val is None else val


def _set_piezo_range(self, name, val):
    """Store a piezo range, enforcing the 80 um limit unless large-range scans are allowed.

    The limit is skipped when ``dev.rtx.user_parameter`` is truthy and its
    ``large_range_scan`` entry is truthy.

    NOTE(review): a missing ``large_range_scan`` key defaults to True, i.e. any
    non-empty ``user_parameter`` without that key disables the limit - confirm.

    Raises:
        ValueError: If the limit applies and ``val`` exceeds 80.
    """
    user_parameter = dev.rtx.user_parameter
    large_range_allowed = user_parameter and user_parameter.get("large_range_scan", True)
    if not large_range_allowed and val > 80:
        raise ValueError("Piezo range cannot be larger than 80 um.")
    self.client.set_global_var(name, val)


@property
def tomo_shellstep(self):
    """Global var ``tomo_shellstep`` (default 1)."""
    return self._global_var_or_default("tomo_shellstep", 1)


@tomo_shellstep.setter
def tomo_shellstep(self, val: float):
    self.client.set_global_var("tomo_shellstep", val)


@property
def tomo_circfov(self):
    """Global var ``tomo_circfov`` (default 0.0)."""
    return self._global_var_or_default("tomo_circfov", 0.0)


@tomo_circfov.setter
def tomo_circfov(self, val: float):
    self.client.set_global_var("tomo_circfov", val)


@property
def tomo_type(self):
    """Global var ``tomo_type`` (default 1); the setter accepts only 1, 2 or 3."""
    return self._global_var_or_default("tomo_type", 1)


@tomo_type.setter
def tomo_type(self, val: int):
    if val not in (1, 2, 3):
        raise ValueError("Unknown tomo_type. Must be 1, 2 or 3.")
    self.client.set_global_var("tomo_type", val)


@property
def tomo_countingtime(self):
    """Global var ``tomo_countingtime`` (default 0.1)."""
    return self._global_var_or_default("tomo_countingtime", 0.1)


@tomo_countingtime.setter
def tomo_countingtime(self, val: float):
    self.client.set_global_var("tomo_countingtime", val)


@property
def manual_shift_x(self):
    """Global var ``manual_shift_x`` (default 0.0)."""
    return self._global_var_or_default("manual_shift_x", 0.0)


@manual_shift_x.setter
def manual_shift_x(self, val: float):
    self.client.set_global_var("manual_shift_x", val)


@property
def manual_shift_y(self):
    """Global var ``manual_shift_y`` (default 0.0)."""
    return self._global_var_or_default("manual_shift_y", 0.0)


@manual_shift_y.setter
def manual_shift_y(self, val: float):
    self.client.set_global_var("manual_shift_y", val)


@property
def lamni_piezo_range_x(self):
    """Global var ``lamni_piezo_range_x`` (default 20); see ``_set_piezo_range`` for the limit."""
    return self._global_var_or_default("lamni_piezo_range_x", 20)


@lamni_piezo_range_x.setter
def lamni_piezo_range_x(self, val: float):
    self._set_piezo_range("lamni_piezo_range_x", val)


@property
def lamni_piezo_range_y(self):
    """Global var ``lamni_piezo_range_y`` (default 20); see ``_set_piezo_range`` for the limit."""
    return self._global_var_or_default("lamni_piezo_range_y", 20)


@lamni_piezo_range_y.setter
def lamni_piezo_range_y(self, val: float):
    self._set_piezo_range("lamni_piezo_range_y", val)


@property
def corridor_size(self):
    """Global var ``corridor_size`` (default -1)."""
    return self._global_var_or_default("corridor_size", -1)


@corridor_size.setter
def corridor_size(self, val: float):
    self.client.set_global_var("corridor_size", val)


@property
def lamni_stitch_x(self):
    """Global var ``lamni_stitch_x`` (default 0); the setter is type-checked as int."""
    return self._global_var_or_default("lamni_stitch_x", 0)


@lamni_stitch_x.setter
@typechecked
def lamni_stitch_x(self, val: int):
    self.client.set_global_var("lamni_stitch_x", val)


@property
def lamni_stitch_y(self):
    """Global var ``lamni_stitch_y`` (default 0); the setter is type-checked as int."""
    return self._global_var_or_default("lamni_stitch_y", 0)


@lamni_stitch_y.setter
@typechecked
def lamni_stitch_y(self, val: int):
    self.client.set_global_var("lamni_stitch_y", val)


@property
def ptycho_reconstruct_foldername(self):
    """Global var ``ptycho_reconstruct_foldername`` (default "ptycho_reconstruct")."""
    return self._global_var_or_default("ptycho_reconstruct_foldername", "ptycho_reconstruct")


@ptycho_reconstruct_foldername.setter
def ptycho_reconstruct_foldername(self, val: str):
    self.client.set_global_var("ptycho_reconstruct_foldername", val)
    self.reconstructor.folder_name = val  # keep reconstructor in sync


@property
def tomo_angle_stepsize(self):
    """Global var ``tomo_angle_stepsize`` (default 10.0)."""
    return self._global_var_or_default("tomo_angle_stepsize", 10.0)


@tomo_angle_stepsize.setter
def tomo_angle_stepsize(self, val: float):
    self.client.set_global_var("tomo_angle_stepsize", val)


@property
def tomo_stitch_overlap(self):
    """Global var ``tomo_stitch_overlap`` (default 0.2)."""
    return self._global_var_or_default("tomo_stitch_overlap", 0.2)


@tomo_stitch_overlap.setter
def tomo_stitch_overlap(self, val: float):
    self.client.set_global_var("tomo_stitch_overlap", val)


@property
def golden_max_number_of_projections(self):
    """Global var ``golden_max_number_of_projections`` (default 1000.0)."""
    return self._global_var_or_default("golden_max_number_of_projections", 1000.0)


@golden_max_number_of_projections.setter
def golden_max_number_of_projections(self, val: float):
    self.client.set_global_var("golden_max_number_of_projections", val)


@property
def golden_ratio_bunch_size(self):
    """Global var ``golden_ratio_bunch_size`` (default 20); the setter rejects values below 20."""
    return self._global_var_or_default("golden_ratio_bunch_size", 20)


@golden_ratio_bunch_size.setter
def golden_ratio_bunch_size(self, val: int):
    if val < 20:
        raise ValueError("golden_ratio_bunch_size must be at least 20.")
    self.client.set_global_var("golden_ratio_bunch_size", val)


@property
def golden_projections_at_0_deg_for_damage_estimation(self):
    """Global var ``golden_projections_at_0_deg_for_damage_estimation`` (default 0)."""
    return self._global_var_or_default("golden_projections_at_0_deg_for_damage_estimation", 0)


@golden_projections_at_0_deg_for_damage_estimation.setter
def golden_projections_at_0_deg_for_damage_estimation(self, val: int):
    self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val)


@property
def sample_name(self):
    """Global var ``sample_name`` (default "bec_test_sample"); the setter is type-checked as str."""
    return self._global_var_or_default("sample_name", "bec_test_sample")


@sample_name.setter
@typechecked
def sample_name(self, val: str):
    self.client.set_global_var("sample_name", val)


# ------------------------------------------------------------------
# Logging helpers
# ------------------------------------------------------------------
def write_to_scilog(self, content, tags: list = None):
    """Send ``content`` to the logbook, tagged with ``tags`` plus "BEC".

    This is best effort: any failure is logged as a warning and swallowed.

    Args:
        content: Text of the logbook entry.
        tags (list, optional): Extra tags. The caller's list is not modified.
    """
    try:
        # Copy instead of appending in place so the caller's list stays untouched.
        all_tags = [*(tags if tags is not None else []), "BEC"]
        msg = bec.logbook.LogbookMessage()
        msg.add_text(content).add_tag(all_tags)
        self.client.logbook.send_logbook_message(msg)
    except Exception as exc:
        logger.warning(f"Failed to write to scilog: {exc}")


def _write_subtomo_to_scilog(self, subtomo_number):
    """Log the start of a sub-tomogram together with the next scan number."""
    bec = builtins.__dict__.get("bec")
    tags = ["BEC_subtomo", self.sample_name]
    if self.tomo_id > 0:
        tags.append(f"tomo_id_{self.tomo_id}")
    self.write_to_scilog(
        f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.",
        tags,
    )


def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
    """Append one line for ``scan_number`` to the tomography scan-number file.

    The line holds the scan number, the requested angle, the current
    ``lsamrot`` reading, the tomo id, the sub-tomogram number, a literal 0
    and the literal "lamni".
    """
    tomo_scan_numbers_file = os.path.expanduser(
        "~/Data10/specES1/dat-files/tomography_scannumbers.txt"
    )
    with open(tomo_scan_numbers_file, "a+") as out_file:
        out_file.write(
            f"{scan_number} {angle} {dev.lsamrot.read()['lsamrot']['value']:.3f}"
            f" {self.tomo_id} {subtomo_number} {0} {'lamni'}\n"
        )


# ------------------------------------------------------------------
# Sample database — delegated to TomoIDManager in omny general tools
# ------------------------------------------------------------------
def add_sample_database(
    self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
):
    """Register a sample through the tomo-id manager and return its result.

    All arguments are forwarded as keyword arguments to
    ``self.tomo_id_manager.register``; callers in this class store the
    return value as the tomo id.
    """
    registration = {
        "sample_name": samplename,
        "date": date,
        "eaccount": eaccount,
        "scan_number": scan_number,
        "setup": setup,
        "additional_info": sample_additional_info,
        "user": user,
    }
    return self.tomo_id_manager.register(**registration)
# ------------------------------------------------------------------
# Scan projection
# ------------------------------------------------------------------
def tomo_scan_projection(self, angle: float):
    """Run the Fermat scans of one projection, one scan per stitching tile.

    The tiles cover ``-lamni_stitch_x..lamni_stitch_x`` by
    ``-lamni_stitch_y..lamni_stitch_y``. The scan number of every tile is
    collected in ``self._current_scan_list`` (reset on each call).

    Args:
        angle (float): Projection angle passed to the alignment corrections
            and to ``scans.lamni_fermat_scan``.
    """
    bec = builtins.__dict__.get("bec")
    scans = builtins.__dict__.get("scans")

    additional_correction = self.align.compute_additional_correction(angle)
    additional_correction_2 = self.align.compute_additional_correction_2(angle)
    correction_xeye_mu = self.align.lamni_compute_additional_correction_xeye_mu(angle)

    # Everything below is identical for all tiles of this projection, so it is
    # read once instead of once per tile (each property read is a global-var fetch).
    shift_x = (
        self.manual_shift_x
        + correction_xeye_mu[0]
        - additional_correction[0]
        - additional_correction_2[0]
    )
    shift_y = (
        self.manual_shift_y
        + correction_xeye_mu[1]
        - additional_correction[1]
        - additional_correction_2[1]
    )
    corridor_size = self.corridor_size
    if not corridor_size > 0:
        # A non-positive corridor size is passed on as None.
        corridor_size = None
    range_x = self.lamni_piezo_range_x
    range_y = self.lamni_piezo_range_y
    step = self.tomo_shellstep
    stitch_overlap = self.tomo_stitch_overlap
    center_x = self.align.tomo_fovx_offset
    center_y = self.align.tomo_fovy_offset
    fov_circular = self.tomo_circfov
    exp_time = self.tomo_countingtime
    stitch_x_max = self.lamni_stitch_x
    stitch_y_max = self.lamni_stitch_y

    self._current_scan_list = []
    for stitch_x in range(-stitch_x_max, stitch_x_max + 1):
        for stitch_y in range(-stitch_y_max, stitch_y_max + 1):
            # Must be read per tile: the queue advances with every scan.
            self._current_scan_list.append(bec.queue.next_scan_number)
            scans.lamni_fermat_scan(
                fov_size=[range_x, range_y],
                step=step,
                stitch_x=stitch_x,
                stitch_y=stitch_y,
                stitch_overlap=stitch_overlap,
                center_x=center_x,
                center_y=center_y,
                shift_x=shift_x,
                shift_y=shift_y,
                fov_circular=fov_circular,
                angle=angle,
                scan_type="fly",
                exp_time=exp_time,
                optim_trajectory_corridor=corridor_size,
            )
def tomo_reconstruct(self, base_path="~/Data10/specES1"):
    """Hand the scans of the current projection to the reconstructor.

    Passes ``self._current_scan_list``, the queue's next scan number and
    ``base_path`` to ``self.reconstructor.write``.
    """
    queue = builtins.__dict__.get("bec").queue
    write_args = {
        "scan_list": self._current_scan_list,
        "next_scan_number": queue.next_scan_number,
        "base_path": base_path,
    }
    self.reconstructor.write(**write_args)
def _at_each_angle(self, angle: float) -> None:
    """Acquire the projection at ``angle``, then queue its reconstruction."""
    self.tomo_scan_projection(angle)
    self.tomo_reconstruct()


# ------------------------------------------------------------------
# Progress reporting
# ------------------------------------------------------------------
def _print_progress(self):
    """Print the entries of ``self.progress`` as a colored report."""
    # (label, progress key, text appended after the value)
    rows = (
        ("Tomo type: .......................", "tomo_type", ""),
        ("Projection: ......................", "projection", ""),
        ("Total projections expected .......", "total_projections", ""),
        ("Angle: ...........................", "angle", ""),
        ("Current subtomo: .................", "subtomo", ""),
        ("Current projection within subtomo:", "subtomo_projection", "\x1b[0m"),
    )
    print("\x1b[95mProgress report:")
    for label, key, suffix in rows:
        print(f"{label} {self.progress[key]}{suffix}")


# ------------------------------------------------------------------
# Tomo scan orchestration
# ------------------------------------------------------------------
def sub_tomo_scan(self, subtomo_number, start_angle=None):
    """Perform one sub-tomogram (tomo_type 1 only).

    The angles run from ``start_angle`` to ``start_angle + 360`` in
    ``int(360 / tomo_angle_stepsize) + 1`` equally spaced steps; even
    sub-tomograms are traversed in reverse order. ``self.progress`` is
    updated before every projection.

    Args:
        subtomo_number (int): Sub-tomogram number. Must be 1..8 when
            ``start_angle`` is None, because the start offset is looked up
            per sub-tomogram.
        start_angle (float, optional): Explicit start angle; when None it is
            ``tomo_angle_stepsize / 8`` times the sub-tomogram's offset.

    Raises:
        KeyError: If ``start_angle`` is None and ``subtomo_number`` is not 1..8.
    """
    self._write_subtomo_to_scilog(subtomo_number)

    # Read the step size once; it backs a global variable lookup.
    stepsize = self.tomo_angle_stepsize
    if start_angle is None:
        offsets = {1: 0, 2: 4, 3: 2, 4: 6, 5: 1, 6: 5, 7: 3, 8: 7}
        start_angle = stepsize / 8.0 * offsets[subtomo_number]

    angles = np.linspace(
        start_angle, start_angle + 360, num=int(360 / stepsize) + 1, endpoint=True
    )
    if subtomo_number % 2 == 0:
        angles = np.flip(angles)

    projections_per_subtomo = 360 / stepsize
    total_projections = 360 / stepsize * 8

    # enumerate gives the position directly; no float-equality search in ``angles``.
    for index, angle in enumerate(angles):
        self.progress["tomo_type"] = "Equally spaced sub-tomograms"
        self.progress["subtomo"] = subtomo_number
        self.progress["subtomo_projection"] = index
        self.progress["subtomo_total_projections"] = projections_per_subtomo
        self.progress["projection"] = (subtomo_number - 1) * projections_per_subtomo + index
        self.progress["total_projections"] = total_projections
        self.progress["angle"] = angle
        self._tomo_scan_at_angle(angle, subtomo_number)
def _tomo_scan_at_angle(self, angle, subtomo_number):
    """Acquire the projection(s) at ``angle`` and record their scan numbers.

    Nothing happens for angles outside ``0 <= angle < 360.05``. When the
    angle matches the next pending special angle (within 0.5 deg) that
    entry is consumed and the projection is repeated
    ``self.special_angle_repeats`` times; otherwise it is acquired once.

    A ``TimeoutError`` alarm resets the queue and is otherwise swallowed;
    any other alarm is re-raised.
    """
    bec = builtins.__dict__.get("bec")
    if not 0 <= angle < 360.05:
        return

    print(f"Starting LamNI scan for angle {angle} in subtomo {subtomo_number}")
    self._print_progress()

    successful = False
    error_caught = False
    while not successful:
        if not self.special_angles:
            self._current_special_angles = []

        # Default to a single acquisition so the repeat count is always defined.
        # NOTE(review): the tolerance is fixed at 0.5 here;
        # ``self.special_angle_tolerance`` is not consulted.
        num_repeats = 1
        if self._current_special_angles and np.isclose(
            angle, self._current_special_angles[0], atol=0.5
        ):
            self._current_special_angles.pop(0)
            num_repeats = self.special_angle_repeats

        # Read before the try block so it is defined even when an alarm is caught.
        start_scan_number = bec.queue.next_scan_number
        try:
            for _ in range(num_repeats):
                self._at_each_angle(angle)
            error_caught = False
        except AlarmBase as exc:
            if exc.alarm_type != "TimeoutError":
                raise
            bec.queue.request_queue_reset()
            time.sleep(2)
            error_caught = True

        end_scan_number = bec.queue.next_scan_number
        for scan_nr in range(start_scan_number, end_scan_number):
            self._write_tomo_scan_number(scan_nr, angle, subtomo_number)

        # todo here bl chk, if ok then successfull true
        # Until that check exists the loop always ends after one pass,
        # so ``error_caught`` does not trigger a retry.
        successful = True


def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False):
    """Return the ii-th golden ratio angle within sorted bunches and its subtomo number.

    Angles are generated for the bunch of ``howmany_sorted`` consecutive
    indices containing ``ii`` and sorted ascending. With ``reverse`` set,
    bunches with an even subtomo number are sorted descending instead.

    Returns:
        tuple: ``(angle, subtomo_number)``; subtomo numbers start at 1.
    """
    bunch_start = ii - (ii % howmany_sorted)
    golden = sorted(
        ((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000
        for iji in range(bunch_start, bunch_start + howmany_sorted)
    )
    subtomo_number = int(ii / howmany_sorted) + 1
    if reverse and not subtomo_number % 2:
        golden.reverse()
    return (golden[ii % howmany_sorted], subtomo_number)


def _golden_equally_spaced(
    self, ii, number_of_projections_per_subtomo, maxangle=360, reverse=True, verbose=False
):
    """Return angles for equally spaced tomography with golden ratio sub-tomogram starting angles.

    Args:
        ii (int): Zero-based projection index over the whole measurement.
        number_of_projections_per_subtomo (int): Projections per sub-tomogram.
        maxangle (float): Angular range covered by one sub-tomogram.
        reverse (bool): Traverse even sub-tomograms from the last step to the first.
        verbose (bool): Print the computed values.

    Returns:
        tuple: ``(angle, subtomo_number)``; subtomo numbers start at 1.
    """
    angular_step = maxangle / number_of_projections_per_subtomo
    # Floor division on the index avoids the float round trip ii * step / maxangle,
    # which can land just below a whole number at sub-tomogram boundaries.
    subtomo_number = int(ii // number_of_projections_per_subtomo) + 1
    start_angle = self._golden(subtomo_number - 1, 1, angular_step)[0]
    projection_number_of_subtomo = ii - (subtomo_number - 1) * number_of_projections_per_subtomo

    if reverse and subtomo_number % 2 == 0:
        angle = (
            start_angle
            + (number_of_projections_per_subtomo - 1) * angular_step
            - projection_number_of_subtomo * angular_step
        )
    else:
        angle = start_angle + projection_number_of_subtomo * angular_step

    if verbose:
        print(
            f"Equally spaced golden ratio tomography.\n"
            f"Angular step: {angular_step}\n"
            f"Subtomo Number: {subtomo_number}\n"
            f"Angle: {angle}"
        )
    return angle, subtomo_number
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
    """Start a tomo scan.

    A fresh measurement (no resume arguments given) is first registered in
    the sample database and documented with a PDF report. The acquisition
    itself runs with ``scans.dataset_id_on_hold``.

    Args:
        subtomo_start (int): For tomo_type 1, the sub-tomogram to start from. Defaults to 1.
        start_angle (float, optional): Override starting angle of the first sub-tomogram.
        projection_number (int, optional): For tomo_types 2 and 3, resume from this index.

    Raises:
        ValueError: If ``tomo_type`` is not 1, 2 or 3.
    """
    bec = builtins.__dict__.get("bec")
    scans = builtins.__dict__.get("scans")
    self._current_special_angles = self.special_angles.copy()

    tomo_type = self.tomo_type
    fresh_measurement = (
        tomo_type == 1 and subtomo_start == 1 and start_angle is None
    ) or (tomo_type in (2, 3) and projection_number is None)
    if fresh_measurement:
        self.tomo_id = self.add_sample_database(
            self.sample_name,
            str(datetime.date.today()),
            bec.active_account.decode(),
            bec.queue.next_scan_number,
            "lamni",
            "test additional info",
            "BEC",
        )
        self.write_pdf_report()

    first_index = 0 if projection_number is None else projection_number
    with scans.dataset_id_on_hold:
        if tomo_type == 1:
            self.progress["tomo_type"] = "Equally spaced sub-tomograms"
            for ii in range(subtomo_start, 9):
                self.sub_tomo_scan(ii, start_angle=start_angle)
                # The override applies to the first sub-tomogram only.
                start_angle = None
        elif tomo_type == 2:
            self.progress["tomo_type"] = "Golden ratio tomography"
            self._run_open_ended_tomo(
                first_index,
                lambda ii: self._golden(
                    ii, self.golden_ratio_bunch_size, maxangle=360, reverse=True
                ),
                lambda: self.golden_ratio_bunch_size,
            )
        elif tomo_type == 3:
            self.progress["tomo_type"] = "Equally spaced, golden ratio starting angles"
            self._run_open_ended_tomo(
                first_index,
                lambda ii: self._golden_equally_spaced(
                    ii, int(360 / self.tomo_angle_stepsize), maxangle=360, reverse=True
                ),
                lambda: 360 / self.tomo_angle_stepsize,
            )
        else:
            raise ValueError(f"Unknown tomo_type: {self.tomo_type}.")


def _run_open_ended_tomo(self, first_index, angle_for_index, projections_per_subtomo):
    """Acquire projections from ``first_index`` on until the projection limit is reached.

    Shared loop of tomo types 2 and 3.

    Args:
        first_index (int): Zero-based index of the first projection to acquire.
        angle_for_index (callable): Maps a projection index to ``(angle, subtomo_number)``.
        projections_per_subtomo (callable): Returns the current number of
            projections per sub-tomogram (used for progress reporting only).
    """
    previous_subtomo_number = -1
    ii = first_index
    while True:
        angle, subtomo_number = angle_for_index(ii)
        if previous_subtomo_number != subtomo_number:
            self._write_subtomo_to_scilog(subtomo_number)
            if (
                subtomo_number % 2 == 1
                and ii > 10
                and self.golden_projections_at_0_deg_for_damage_estimation == 1
            ):
                # Extra projection at 0 deg at the start of every second sub-tomogram.
                self._tomo_scan_at_angle(0, subtomo_number)
            previous_subtomo_number = subtomo_number

        per_subtomo = projections_per_subtomo()
        self.progress["subtomo"] = subtomo_number
        self.progress["projection"] = ii
        self.progress["angle"] = angle
        self.progress["subtomo_total_projections"] = per_subtomo
        self.progress["subtomo_projection"] = ii - (subtomo_number - 1) * per_subtomo
        self.progress["total_projections"] = self.golden_max_number_of_projections

        self._tomo_scan_at_angle(angle, subtomo_number)
        ii += 1

        # ``ii`` now equals the number of projection indices covered so far, so
        # stop as soon as it reaches the limit. A limit of 0 (or below) means endless.
        max_projections = self.golden_max_number_of_projections
        if 0 < max_projections <= ii:
            print(
                f"Golden ratio tomography stopped after"
                f" {max_projections} projections."
            )
            break
# ------------------------------------------------------------------ # Parameter display and interactive update # ------------------------------------------------------------------
def tomo_parameters(self):
    """Print and interactively update the tomo parameters.

    After the summary the user is asked for confirmation; answering "y"
    leaves everything unchanged. Any other answer starts a prompt sequence
    in which an empty input keeps the value shown in parentheses.
    """
    self._print_tomo_parameters()

    user_input = input("Are these parameters correctly set for your scan? ")
    if user_input == "y":
        print("OK. continue.")
        return

    self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float)
    self.tomo_shellstep = self._get_val("<step size> um", self.tomo_shellstep, float)
    self.lamni_piezo_range_x = self._get_val(
        "<piezo range X (max 80)> um", self.lamni_piezo_range_x, float
    )
    self.lamni_piezo_range_y = self._get_val(
        "<piezo range Y (max 80)> um", self.lamni_piezo_range_y, float
    )
    self.lamni_stitch_x = self._get_val("<stitch X>", self.lamni_stitch_x, int)
    self.lamni_stitch_y = self._get_val("<stitch Y>", self.lamni_stitch_y, int)
    self.tomo_circfov = self._get_val("<circular FOV> um", self.tomo_circfov, float)
    self.ptycho_reconstruct_foldername = self._get_val(
        "Reconstruction queue", self.ptycho_reconstruct_foldername, str
    )

    print("Tomography type:")
    print(" 1: 8 equally spaced sub-tomograms (360 deg)")
    print(" 2: Golden ratio tomography")
    print(" 3: Equally spaced tomography, golden ratio starting angle")
    self.tomo_type = self._get_val("Tomography type", self.tomo_type, int)

    if self.tomo_type == 1:
        tomo_numberofprojections = self._get_val(
            "Number of projections", 360 / self.tomo_angle_stepsize * 8, int
        )
        self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8
        print(f"Angular step in a subtomogram: {self.tomo_angle_stepsize}")
    elif self.tomo_type == 2:
        # Re-ask until the bunch size satisfies the minimum enforced by the setter.
        while True:
            bunch_size = self._get_val(
                "Number of projections sorted per bunch (minimum 20)",
                self.golden_ratio_bunch_size,
                int,
            )
            if bunch_size >= 20:
                self.golden_ratio_bunch_size = bunch_size
                break
            print("Bunch size must be at least 20. Please try again.")
        self._ask_golden_stop_settings()
    elif self.tomo_type == 3:
        numprj = self._get_val(
            "Number of projections per sub-tomogram",
            int(360 / self.tomo_angle_stepsize),
            int,
        )
        self.tomo_angle_stepsize = 360 / numprj
        self._ask_golden_stop_settings()

    self.sample_name = self._get_val("sample name", self.sample_name, str)


def _print_tomo_parameters(self):
    """Print the current scan settings and the summary of the selected tomo type."""
    print("Current settings:")
    print(f"Counting time <ctime> = {self.tomo_countingtime} s")
    print(f"Stepsize microns <step> = {self.tomo_shellstep}")
    print(
        f"Piezo range (max 80) <microns> = {self.lamni_piezo_range_x},"
        f" {self.lamni_piezo_range_y}"
    )
    print(f"Stitching number x,y = {self.lamni_stitch_x}, {self.lamni_stitch_y}")
    print(f"Stitching overlap = {self.tomo_stitch_overlap}")
    print(f"Circular FOV diam <microns> = {self.tomo_circfov}")
    print(f"Reconstruction queue name = {self.ptycho_reconstruct_foldername}")
    print("FOV offset rotates to find the ROI; manual shift moves the rotation center.")
    print(f" _tomo_fovx_offset <mm> = {self.align.tomo_fovx_offset}")
    print(f" _tomo_fovy_offset <mm> = {self.align.tomo_fovy_offset}")
    print(f" _manual_shift_x <mm> = {self.manual_shift_x}")
    print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
    print("")

    if self.tomo_type == 1:
        print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms (360 deg)")
        print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
        print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}")
    elif self.tomo_type == 2:
        print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography")
        print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}")
        self._print_golden_stop_settings()
    elif self.tomo_type == 3:
        print("\x1b[1mTomo type 3:\x1b[0m Equally spaced, golden ratio starting angles")
        print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
        print(f"Number of projections per sub-tomogram: {360/self.tomo_angle_stepsize}")
        self._print_golden_stop_settings()

    print(f"\nSample name: {self.sample_name}\n")


def _print_golden_stop_settings(self):
    """Print how a type 2/3 measurement ends and whether 0 deg projections are repeated."""
    if self.golden_max_number_of_projections > 0:
        print(f"Ending after {self.golden_max_number_of_projections} projections.")
    else:
        print("Ending by manual interruption.")
    if self.golden_projections_at_0_deg_for_damage_estimation == 1:
        print("Repeating projections at 0 deg at start of every second subtomogram.")


def _ask_golden_stop_settings(self):
    """Prompt for the projection limit and the 0 deg repeat flag (tomo types 2 and 3)."""
    self.golden_max_number_of_projections = self._get_val(
        "Stop after number of projections (0 for endless)",
        self.golden_max_number_of_projections,
        int,
    )
    self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
        "Repeat projections at 0 deg every second subtomo 1/0?",
        self.golden_projections_at_0_deg_for_damage_estimation,
        int,
    )
@staticmethod
def _get_val(msg: str, default_value, data_type):
    """Prompt for a value; an empty answer selects ``default_value``.

    The answer (or the default) is converted with ``data_type`` before it
    is returned, so a conversion error from ``data_type`` propagates.
    """
    answer = input(f"{msg} ({default_value}): ")
    chosen = answer if answer else default_value
    return data_type(chosen)


# ------------------------------------------------------------------
# PDF report
# ------------------------------------------------------------------
def write_pdf_report(self):
    """Create and write the PDF report with current LamNI settings.

    The report is written to ``~/Data10/documentation/tomo_scan_ID_<tomo_id>.pdf``,
    an upload script is launched in an xterm, and the same text is sent to
    the logbook together with the LamNI logo file.
    """
    dev = builtins.__dict__.get("dev")
    # NOTE(review): spacing inside the banner strings is reproduced as found in the
    # reviewed source; verify against the intended ASCII art.
    header = (
        " \n" * 3
        + " ::: ::: ::: ::: :::: ::: ::::::::::: \n"
        + " :+: :+: :+: :+:+: :+:+: :+:+: :+: :+: \n"
        + " +:+ +:+ +:+ +:+ +:+:+ +:+ :+:+:+ +:+ +:+ \n"
        + " +#+ +#++:++#++: +#+ +:+ +#+ +#+ +:+ +#+ +#+ \n"
        + " +#+ +#+ +#+ +#+ +#+ +#+ +#+#+# +#+ \n"
        + " #+# #+# #+# #+# #+# #+# #+#+# #+# \n"
        + " ########## ### ### ### ### ### #### ########### \n"
    )
    # Column width used for both the label (left-aligned) and the value (right-aligned).
    padding = 20
    piezo_range = f"{self.lamni_piezo_range_x:.2f}/{self.lamni_piezo_range_y:.2f}"
    stitching = f"{self.lamni_stitch_x:.2f}/{self.lamni_stitch_y:.2f}"
    dataset_id = str(self.client.queue.next_dataset_number)
    # NOTE(review): projection count and last scan number are always computed with the
    # 8-sub-tomogram formula, whatever ``tomo_type`` is - confirm this is intended.
    # NOTE(review): ``dev.mokev.read(cached=True)['value']`` indexes the reading
    # differently from ``dev.lsamrot.read()['lsamrot']['value']`` used elsewhere in
    # this file - verify the key layout of the cached reading.
    content = [
        f"{'Sample Name:':<{padding}}{self.sample_name:>{padding}}\n",
        f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n",
        f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n",
        f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n",
        f"{'e-account:':<{padding}}{str(self.client.username):>{padding}}\n",
        f"{'Number of projections:':<{padding}}{int(360 / self.tomo_angle_stepsize * 8):>{padding}}\n",
        f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n",
        f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int(360 / self.tomo_angle_stepsize * 8) + 10:>{padding}}\n",
        f"{'Current photon energy:':<{padding}}{dev.mokev.read(cached=True)['value']:>{padding}.4f}\n",
        f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
        f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
        f"{'Piezo range (FOV sample plane):':<{padding}}{piezo_range:>{padding}}\n",
        f"{'Restriction to circular FOV:':<{padding}}{self.tomo_circfov:>{padding}.2f}\n",
        f"{'Stitching:':<{padding}}{stitching:>{padding}}\n",
        f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n",
        f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
        f"{'Tomo type:':<{padding}}{self.tomo_type:>{padding}}\n",
    ]
    content = "".join(content)
    user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
    with PDFWriter(user_target) as file:
        file.write(header)
        file.write(content)
    # The trailing "&" backgrounds the xterm, so this call does not wait for the upload.
    subprocess.run(
        "xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
    )
    # ``bec`` here is the module-level global (unlike ``dev`` above, which is looked up
    # from builtins at call time).
    msg = bec.logbook.LogbookMessage()
    logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
    # ``content`` is already a single string at this point, so the join is a no-op.
    msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
        ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name]
    )
    self.client.logbook.send_logbook_message(msg)
def get_calibration_of_capstops_left_and_right(self):
    """Print angle/voltage tables while ``lsamrot`` sweeps to 361 and back to -1.

    The rotation is first moved to 0. During each sweep the readback angle
    and the controller replies to ``MG@AN[1]`` / ``MG@AN[2]`` are polled
    every 0.3 s and printed as comma-separated lines.
    """
    import time

    def read_voltages():
        # Query the two analog inputs in order and convert the replies to float.
        return tuple(
            float(dev.lsamrot.controller.socket_put_and_receive(command))
            for command in ("MG@AN[1]", "MG@AN[2]")
        )

    def sweep(title, target, still_sweeping, printable, angle):
        # Start the move with ``mv`` and poll until the readback leaves the sweep range.
        print(title)
        mv(dev.lsamrot, target)
        while still_sweeping(angle):
            angle = dev.lsamrot.readback.get()
            voltage1, voltage2 = read_voltages()
            if printable(angle):
                print(f"{angle},{voltage1},{voltage2}")
            time.sleep(.3)
        return angle

    # NOTE(review): the manual text is kept as one line, exactly as it appears in the
    # reviewed source; restore the line breaks if the original had them.
    print(
        " Manual on how to center the Piezo stage first. To obtain the center voltages"
        " one can move in closed loop to the interferometer vertically and observe the"
        " capacitive readback signal. Check the limits of the travel, move to center and"
        " obtain the required centering voltage. Example: At 0 deg, accessible rty -60 to"
        " 51. So the init was 5 microns off. Then this routine here will provide data for"
        " the new capstop left and right. "
    )

    umv(dev.lsamrot, 0)
    last_angle = sweep(
        "Capstop right\nAngle, Voltage1, Voltage2",
        361,
        lambda value: value <= 360,
        lambda value: value < 360,
        0,
    )
    time.sleep(10)
    sweep(
        "\nCapstop left\nAngle, Voltage1, Voltage2",
        -1,
        lambda value: value > 0,
        lambda value: value > 0,
        last_angle,
    )
    print("Finished")