import builtins
import datetime
import os
import subprocess
import time
from pathlib import Path
import numpy as np
from bec_lib import bec_logger
from bec_lib.alarm_handler import AlarmBase
from bec_lib.pdf_writer import PDFWriter
from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
OMNYTools,
PtychoReconstructor,
TomoIDManager,
)
from csaxs_bec.bec_ipython_client.plugins.LamNI.gui_tools import LamniGuiTools
from .alignment import XrayEyeAlign
from .lamni_optics_mixin import LaMNIInitStages, LamNIOpticsMixin
logger = bec_logger.logger

# Mirror the interactive session handles into this module, but only when a
# session has already registered ``bec`` in builtins.
if builtins.__dict__.get("bec") is not None:
    bec, dev, umv, mv, umvr = (
        builtins.__dict__.get(handle) for handle in ("bec", "dev", "umv", "mv", "umvr")
    )
[docs]
class LamNI(LamNIOpticsMixin, LamniGuiTools):
def __init__(self, client):
    """Wire up the LamNI helpers and reset the tomography bookkeeping.

    Args:
        client: BEC client; its ``device_manager`` is stored and the client
            itself is handed to the alignment, init-stage and tool helpers.
    """
    super().__init__()
    self.client = client
    self.device_manager = client.device_manager
    self.align = XrayEyeAlign(client, self)
    self.init = LaMNIInitStages(client)
    # Extracted collaborators
    # NOTE: the reconstructor reads the folder-name property, which needs
    # ``self.client`` to be assigned already.
    self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
    self.tomo_id_manager = TomoIDManager()
    self.OMNYTools = OMNYTools(self.client)
    self.tomo_id = -1
    self.special_angles = []
    self.special_angle_repeats = 20
    # Same default as ``set_special_angles`` documents (degrees); the former
    # value of 20 looked like a copy of the repeat count.
    self.special_angle_tolerance = 0.5
    self._current_special_angles = []
    # Progress tracking
    self.progress = {
        "tomo_type": "Equally spaced sub-tomograms",
        "subtomo": 0,
        "subtomo_projection": 0,
        "subtomo_total_projections": 1,
        "projection": 0,
        "total_projections": 1,
        "angle": 0,
    }
# ------------------------------------------------------------------
# Special angles
# ------------------------------------------------------------------
[docs]
def set_special_angles(self, angles: list, repeats: int = 20, tolerance: float = 0.5):
"""Set the special angles for a tomo.
Args:
angles (list): List of special angles.
repeats (int, optional): Number of repeats at a special angle. Defaults to 20.
tolerance (float, optional): Angle tolerance in degrees. Defaults to 0.5.
"""
self.special_angles = angles
self.special_angle_repeats = repeats
self.special_angle_tolerance = tolerance
[docs]
def remove_special_angles(self):
"""Remove the special angles and reset repeats to 1."""
self.special_angles = []
self.special_angle_repeats = 1
# ------------------------------------------------------------------
# RT feedback / interferometer helpers
# ------------------------------------------------------------------
def rt_off(self):
dev.rtx.enabled = False
dev.rty.enabled = False
def rt_on(self):
dev.rtx.enabled = True
dev.rty.enabled = True
if dev.rtx.enabled == True:
print("rt is enabled")
else:
print("failed to enable rt")
def feedback_enable_with_reset(self):
self.device_manager.devices.rtx.controller.feedback_enable_with_reset()
self.feedback_status()
def feedback_enable_without_reset(self):
self.device_manager.devices.rtx.controller.feedback_enable_without_reset()
self.feedback_status()
def feedback_disable(self):
self.device_manager.devices.rtx.controller.feedback_disable()
self.feedback_status()
def feedback_disable_and_reset_angle(self):
self.device_manager.devices.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer()
self.feedback_status()
def feedback_status(self):
self.device_manager.devices.rtx.controller.show_feedback_status()
def show_interferometer_positions(self):
self.device_manager.devices.rtx.controller.show_feedback_status()
def show_signal_strength(self):
self.device_manager.devices.rtx.controller.show_signal_strength_interferometer()
def show_analog_signals(self):
return self.device_manager.devices.rtx.controller.show_analog_signals()
def lights_off(self):
self.device_manager.devices.lsamx.controller.lights_off()
def lights_on(self):
self.device_manager.devices.lsamx.controller.lights_on()
# ------------------------------------------------------------------
# Global parameters (backed by BEC global vars)
# ------------------------------------------------------------------
@property
def tomo_shellstep(self):
val = self.client.get_global_var("tomo_shellstep")
if val is None:
return 1
return val
@tomo_shellstep.setter
def tomo_shellstep(self, val: float):
self.client.set_global_var("tomo_shellstep", val)
@property
def tomo_circfov(self):
val = self.client.get_global_var("tomo_circfov")
if val is None:
return 0.0
return val
@tomo_circfov.setter
def tomo_circfov(self, val: float):
self.client.set_global_var("tomo_circfov", val)
@property
def tomo_type(self):
val = self.client.get_global_var("tomo_type")
if val is None:
return 1
return val
@tomo_type.setter
def tomo_type(self, val: int):
if val not in (1, 2, 3):
raise ValueError("Unknown tomo_type. Must be 1, 2 or 3.")
self.client.set_global_var("tomo_type", val)
@property
def tomo_countingtime(self):
val = self.client.get_global_var("tomo_countingtime")
if val is None:
return 0.1
return val
@tomo_countingtime.setter
def tomo_countingtime(self, val: float):
self.client.set_global_var("tomo_countingtime", val)
@property
def manual_shift_x(self):
val = self.client.get_global_var("manual_shift_x")
if val is None:
return 0.0
return val
@manual_shift_x.setter
def manual_shift_x(self, val: float):
self.client.set_global_var("manual_shift_x", val)
@property
def manual_shift_y(self):
val = self.client.get_global_var("manual_shift_y")
if val is None:
return 0.0
return val
@manual_shift_y.setter
def manual_shift_y(self, val: float):
self.client.set_global_var("manual_shift_y", val)
@property
def lamni_piezo_range_x(self):
val = self.client.get_global_var("lamni_piezo_range_x")
if val is None:
return 20
return val
@lamni_piezo_range_x.setter
def lamni_piezo_range_x(self, val: float):
if dev.rtx.user_parameter and dev.rtx.user_parameter.get("large_range_scan", True):
self.client.set_global_var("lamni_piezo_range_x", val)
return
if val > 80:
raise ValueError("Piezo range cannot be larger than 80 um.")
self.client.set_global_var("lamni_piezo_range_x", val)
@property
def lamni_piezo_range_y(self):
val = self.client.get_global_var("lamni_piezo_range_y")
if val is None:
return 20
return val
@lamni_piezo_range_y.setter
def lamni_piezo_range_y(self, val: float):
if dev.rtx.user_parameter and dev.rtx.user_parameter.get("large_range_scan", True):
self.client.set_global_var("lamni_piezo_range_y", val)
return
if val > 80:
raise ValueError("Piezo range cannot be larger than 80 um.")
self.client.set_global_var("lamni_piezo_range_y", val)
@property
def corridor_size(self):
val = self.client.get_global_var("corridor_size")
if val is None:
val = -1
return val
@corridor_size.setter
def corridor_size(self, val: float):
self.client.set_global_var("corridor_size", val)
@property
def lamni_stitch_x(self):
    """Stitch count in x (scans run from ``-n`` to ``+n``); 0 when the global var is unset."""
    stored = self.client.get_global_var("lamni_stitch_x")
    return 0 if stored is None else stored

@lamni_stitch_x.setter
@typechecked
def lamni_stitch_x(self, val: int):
    # ``typechecked`` enforces the ``int`` annotation before the value is stored.
    self.client.set_global_var("lamni_stitch_x", val)
@property
def lamni_stitch_y(self):
    """Stitch count in y (scans run from ``-n`` to ``+n``); 0 when the global var is unset."""
    stored = self.client.get_global_var("lamni_stitch_y")
    return 0 if stored is None else stored

@lamni_stitch_y.setter
@typechecked
def lamni_stitch_y(self, val: int):
    # ``typechecked`` enforces the ``int`` annotation before the value is stored.
    self.client.set_global_var("lamni_stitch_y", val)
@property
def ptycho_reconstruct_foldername(self):
val = self.client.get_global_var("ptycho_reconstruct_foldername")
if val is None:
return "ptycho_reconstruct"
return val
@ptycho_reconstruct_foldername.setter
def ptycho_reconstruct_foldername(self, val: str):
self.client.set_global_var("ptycho_reconstruct_foldername", val)
self.reconstructor.folder_name = val # keep reconstructor in sync
@property
def tomo_angle_stepsize(self):
val = self.client.get_global_var("tomo_angle_stepsize")
if val is None:
return 10.0
return val
@tomo_angle_stepsize.setter
def tomo_angle_stepsize(self, val: float):
self.client.set_global_var("tomo_angle_stepsize", val)
@property
def tomo_stitch_overlap(self):
val = self.client.get_global_var("tomo_stitch_overlap")
if val is None:
return 0.2
return val
@tomo_stitch_overlap.setter
def tomo_stitch_overlap(self, val: float):
self.client.set_global_var("tomo_stitch_overlap", val)
@property
def golden_max_number_of_projections(self):
val = self.client.get_global_var("golden_max_number_of_projections")
if val is None:
return 1000.0
return val
@golden_max_number_of_projections.setter
def golden_max_number_of_projections(self, val: float):
self.client.set_global_var("golden_max_number_of_projections", val)
@property
def golden_ratio_bunch_size(self):
val = self.client.get_global_var("golden_ratio_bunch_size")
if val is None:
return 20
return val
@golden_ratio_bunch_size.setter
def golden_ratio_bunch_size(self, val: int):
if val < 20:
raise ValueError("golden_ratio_bunch_size must be at least 20.")
self.client.set_global_var("golden_ratio_bunch_size", val)
@property
def golden_projections_at_0_deg_for_damage_estimation(self):
val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation")
if val is None:
return 0
return val
@golden_projections_at_0_deg_for_damage_estimation.setter
def golden_projections_at_0_deg_for_damage_estimation(self, val: int):
self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val)
@property
def sample_name(self):
    """Sample name used for tags and reports; ``"bec_test_sample"`` when the global var is unset."""
    stored = self.client.get_global_var("sample_name")
    return "bec_test_sample" if stored is None else stored

@sample_name.setter
@typechecked
def sample_name(self, val: str):
    # ``typechecked`` enforces the ``str`` annotation before the value is stored.
    self.client.set_global_var("sample_name", val)
# ------------------------------------------------------------------
# Logging helpers
# ------------------------------------------------------------------
def write_to_scilog(self, content, tags: list = None):
try:
if tags is not None:
tags.append("BEC")
else:
tags = ["BEC"]
msg = bec.logbook.LogbookMessage()
msg.add_text(content).add_tag(tags)
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to write to scilog.")
def _write_subtomo_to_scilog(self, subtomo_number):
bec = builtins.__dict__.get("bec")
if self.tomo_id > 0:
tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
else:
tags = ["BEC_subtomo", self.sample_name]
self.write_to_scilog(
f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.",
tags,
)
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
"~/Data10/specES1/dat-files/tomography_scannumbers.txt"
)
with open(tomo_scan_numbers_file, "a+") as out_file:
out_file.write(
f"{scan_number} {angle} {dev.lsamrot.read()['lsamrot']['value']:.3f}"
f" {self.tomo_id} {subtomo_number} {0} {'lamni'}\n"
)
# ------------------------------------------------------------------
# Sample database — delegated to TomoIDManager in omny general tools
# ------------------------------------------------------------------
[docs]
def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
):
"""Add a sample to the OMNY sample database and retrieve the tomo id."""
return self.tomo_id_manager.register(
sample_name=samplename,
date=date,
eaccount=eaccount,
scan_number=scan_number,
setup=setup,
additional_info=sample_additional_info,
user=user,
)
# ------------------------------------------------------------------
# Scan projection
# ------------------------------------------------------------------
def tomo_scan_projection(self, angle: float):
scans = builtins.__dict__.get("scans")
additional_correction = self.align.compute_additional_correction(angle)
additional_correction_2 = self.align.compute_additional_correction_2(angle)
correction_xeye_mu = self.align.lamni_compute_additional_correction_xeye_mu(angle)
self._current_scan_list = []
for stitch_x in range(-self.lamni_stitch_x, self.lamni_stitch_x + 1):
for stitch_y in range(-self.lamni_stitch_y, self.lamni_stitch_y + 1):
self._current_scan_list.append(bec.queue.next_scan_number)
log_message = (
f"{str(datetime.datetime.now())}: LamNI scan projection at angle {angle},"
f" scan number {bec.queue.next_scan_number}.\n"
)
corridor_size = self.corridor_size if self.corridor_size > 0 else None
scans.lamni_fermat_scan(
fov_size=[self.lamni_piezo_range_x, self.lamni_piezo_range_y],
step=self.tomo_shellstep,
stitch_x=stitch_x,
stitch_y=stitch_y,
stitch_overlap=self.tomo_stitch_overlap,
center_x=self.align.tomo_fovx_offset,
center_y=self.align.tomo_fovy_offset,
shift_x=(
self.manual_shift_x
+ correction_xeye_mu[0]
- additional_correction[0]
- additional_correction_2[0]
),
shift_y=(
self.manual_shift_y
+ correction_xeye_mu[1]
- additional_correction[1]
- additional_correction_2[1]
),
fov_circular=self.tomo_circfov,
angle=angle,
scan_type="fly",
exp_time=self.tomo_countingtime,
optim_trajectory_corridor=corridor_size,
)
[docs]
def tomo_reconstruct(self, base_path="~/Data10/specES1"):
"""Write the tomo reconstruct file for the reconstruction queue."""
bec = builtins.__dict__.get("bec")
self.reconstructor.write(
scan_list=self._current_scan_list,
next_scan_number=bec.queue.next_scan_number,
base_path=base_path,
)
def _at_each_angle(self, angle: float) -> None:
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
# ------------------------------------------------------------------
# Progress reporting
# ------------------------------------------------------------------
def _print_progress(self):
print("\x1b[95mProgress report:")
print(f"Tomo type: ....................... {self.progress['tomo_type']}")
print(f"Projection: ...................... {self.progress['projection']}")
print(f"Total projections expected ....... {self.progress['total_projections']}")
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
# ------------------------------------------------------------------
# Tomo scan orchestration
# ------------------------------------------------------------------
[docs]
def sub_tomo_scan(self, subtomo_number, start_angle=None):
"""Perform one sub-tomogram (tomo_type 1 only)."""
self._write_subtomo_to_scilog(subtomo_number)
if start_angle is None:
offsets = {1: 0, 2: 4, 3: 2, 4: 6, 5: 1, 6: 5, 7: 3, 8: 7}
start_angle = self.tomo_angle_stepsize / 8.0 * offsets[subtomo_number]
angle_end = start_angle + 360
angles = np.linspace(
start_angle,
angle_end,
num=int(360 / self.tomo_angle_stepsize) + 1,
endpoint=True,
)
if not (subtomo_number % 2):
angles = np.flip(angles)
for angle in angles:
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
self.progress["subtomo"] = subtomo_number
self.progress["subtomo_projection"] = np.where(angles == angle)[0][0]
self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize
self.progress["projection"] = (
(subtomo_number - 1) * self.progress["subtomo_total_projections"]
+ self.progress["subtomo_projection"]
)
self.progress["total_projections"] = 360 / self.tomo_angle_stepsize * 8
self.progress["angle"] = angle
self._tomo_scan_at_angle(angle, subtomo_number)
def _tomo_scan_at_angle(self, angle, subtomo_number):
successful = False
error_caught = False
if 0 <= angle < 360.05:
print(f"Starting LamNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
while not successful:
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
next_special_angle = self._current_special_angles[0]
if np.isclose(angle, next_special_angle, atol=0.5):
self._current_special_angles.pop(0)
num_repeats = self.special_angle_repeats
else:
num_repeats = 1
try:
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(angle)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else:
raise exc
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
#todo here bl chk, if ok then successfull true
successful = True
def _golden(self, ii, howmany_sorted, maxangle=360, reverse=False):
"""Return the ii-th golden ratio angle within sorted bunches and its subtomo number."""
golden = []
for iji in range(
(ii - (ii % howmany_sorted)), (ii - (ii % howmany_sorted)) + howmany_sorted, 1
):
golden.append(
((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000
)
golden.sort()
subtomo_number = int(ii / howmany_sorted) + 1
if reverse and not subtomo_number % 2:
golden.reverse()
return (golden[ii % howmany_sorted], subtomo_number)
def _golden_equally_spaced(
self, ii, number_of_projections_per_subtomo, maxangle=360, reverse=True, verbose=False
):
"""Return angles for equally spaced tomography with golden ratio sub-tomogram starting angles."""
angular_step = maxangle / number_of_projections_per_subtomo
subtomo_number = int((ii * angular_step) / maxangle) + 1
start_angle = self._golden(subtomo_number - 1, 1, angular_step)[0]
projection_number_of_subtomo = (
ii - (subtomo_number - 1) * number_of_projections_per_subtomo
)
if reverse:
if subtomo_number % 2:
angle = start_angle + projection_number_of_subtomo * angular_step
else:
angle = (
start_angle
+ (number_of_projections_per_subtomo - 1) * angular_step
- projection_number_of_subtomo * angular_step
)
else:
angle = start_angle + projection_number_of_subtomo * angular_step
if verbose:
print(
f"Equally spaced golden ratio tomography.\n"
f"Angular step: {angular_step}\n"
f"Subtomo Number: {subtomo_number}\n"
f"Angle: {angle}"
)
return angle, subtomo_number
[docs]
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
"""Start a tomo scan.
Args:
subtomo_start (int): For tomo_type 1, the sub-tomogram to start from. Defaults to 1.
start_angle (float, optional): Override starting angle of the first sub-tomogram.
projection_number (int, optional): For tomo_types 2 and 3, resume from this index.
"""
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
self._current_special_angles = self.special_angles.copy()
if (
(self.tomo_type == 1 and subtomo_start == 1 and start_angle is None)
or (self.tomo_type == 2 and projection_number is None)
or (self.tomo_type == 3 and projection_number is None)
):
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account.decode(),
bec.queue.next_scan_number,
"lamni",
"test additional info",
"BEC",
)
self.write_pdf_report()
with scans.dataset_id_on_hold:
if self.tomo_type == 1:
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
elif self.tomo_type == 2:
self.progress["tomo_type"] = "Golden ratio tomography"
previous_subtomo_number = -1
ii = 0 if projection_number is None else projection_number
while True:
angle, subtomo_number = self._golden(
ii, self.golden_ratio_bunch_size, maxangle=360, reverse=True
)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
and self.golden_projections_at_0_deg_for_damage_estimation == 1
):
self._tomo_scan_at_angle(0, subtomo_number)
previous_subtomo_number = subtomo_number
self.progress["subtomo"] = subtomo_number
self.progress["projection"] = ii
self.progress["angle"] = angle
self.progress["subtomo_total_projections"] = self.golden_ratio_bunch_size
self.progress["subtomo_projection"] = (
ii - (subtomo_number - 1) * self.golden_ratio_bunch_size
)
self.progress["total_projections"] = self.golden_max_number_of_projections
self._tomo_scan_at_angle(angle, subtomo_number)
ii += 1
if (
self.golden_max_number_of_projections > 0
and ii > self.golden_max_number_of_projections
):
print(
f"Golden ratio tomography stopped after"
f" {self.golden_max_number_of_projections} projections."
)
break
elif self.tomo_type == 3:
self.progress["tomo_type"] = "Equally spaced, golden ratio starting angles"
previous_subtomo_number = -1
ii = 0 if projection_number is None else projection_number
while True:
angle, subtomo_number = self._golden_equally_spaced(
ii, int(360 / self.tomo_angle_stepsize), maxangle=360, reverse=True
)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
and self.golden_projections_at_0_deg_for_damage_estimation == 1
):
self._tomo_scan_at_angle(0, subtomo_number)
previous_subtomo_number = subtomo_number
self.progress["subtomo"] = subtomo_number
self.progress["projection"] = ii
self.progress["angle"] = angle
self.progress["subtomo_total_projections"] = 360 / self.tomo_angle_stepsize
self.progress["subtomo_projection"] = (
ii - (subtomo_number - 1) * self.progress["subtomo_total_projections"]
)
self.progress["total_projections"] = self.golden_max_number_of_projections
self._tomo_scan_at_angle(angle, subtomo_number)
ii += 1
if (
self.golden_max_number_of_projections > 0
and ii > self.golden_max_number_of_projections
):
print(
f"Golden ratio tomography stopped after"
f" {self.golden_max_number_of_projections} projections."
)
break
else:
raise ValueError(f"Unknown tomo_type: {self.tomo_type}.")
# ------------------------------------------------------------------
# Parameter display and interactive update
# ------------------------------------------------------------------
[docs]
def tomo_parameters(self) -> None:
    """Print the tomo parameters and, unless confirmed, prompt for new values.

    The current settings are printed first. Answering exactly ``y`` to the
    confirmation question leaves everything untouched. Any other answer
    starts a dialog in which every prompt shows the current value and an
    empty answer keeps it. Values are stored through the property setters as
    soon as they are entered, so an exception part-way through (for example
    an invalid tomography type) leaves the earlier answers applied.
    """
    # --- Part 1: show the current settings -----------------------------
    print("Current settings:")
    print(f"Counting time <ctime> = {self.tomo_countingtime} s")
    print(f"Stepsize microns <step> = {self.tomo_shellstep}")
    print(
        f"Piezo range (max 80) <microns> = {self.lamni_piezo_range_x},"
        f" {self.lamni_piezo_range_y}"
    )
    print(f"Stitching number x,y = {self.lamni_stitch_x}, {self.lamni_stitch_y}")
    print(f"Stitching overlap = {self.tomo_stitch_overlap}")
    print(f"Circular FOV diam <microns> = {self.tomo_circfov}")
    print(f"Reconstruction queue name = {self.ptycho_reconstruct_foldername}")
    print("FOV offset rotates to find the ROI; manual shift moves the rotation center.")
    print(f" _tomo_fovx_offset <mm> = {self.align.tomo_fovx_offset}")
    print(f" _tomo_fovy_offset <mm> = {self.align.tomo_fovy_offset}")
    print(f" _manual_shift_x <mm> = {self.manual_shift_x}")
    print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
    print("")
    # Mode-specific summary; an unknown tomo_type prints nothing here.
    if self.tomo_type == 1:
        print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms (360 deg)")
        print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
        print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}")
    elif self.tomo_type == 2:
        print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography")
        print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}")
        if self.golden_max_number_of_projections > 0:
            print(f"Ending after {self.golden_max_number_of_projections} projections.")
        else:
            print("Ending by manual interruption.")
        if self.golden_projections_at_0_deg_for_damage_estimation == 1:
            print("Repeating projections at 0 deg at start of every second subtomogram.")
    elif self.tomo_type == 3:
        print("\x1b[1mTomo type 3:\x1b[0m Equally spaced, golden ratio starting angles")
        print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
        print(f"Number of projections per sub-tomogram: {360/self.tomo_angle_stepsize}")
        if self.golden_max_number_of_projections > 0:
            print(f"Ending after {self.golden_max_number_of_projections} projections.")
        else:
            print("Ending by manual interruption.")
        if self.golden_projections_at_0_deg_for_damage_estimation == 1:
            print("Repeating projections at 0 deg at start of every second subtomogram.")
    print(f"\nSample name: {self.sample_name}\n")
    # --- Part 2: confirmation; only a literal "y" counts as yes --------
    user_input = input("Are these parameters correctly set for your scan? ")
    if user_input == "y":
        print("OK. continue.")
        return
    # --- Part 3: interactive update of the common parameters -----------
    self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float)
    self.tomo_shellstep = self._get_val("<step size> um", self.tomo_shellstep, float)
    self.lamni_piezo_range_x = self._get_val(
        "<piezo range X (max 80)> um", self.lamni_piezo_range_x, float
    )
    self.lamni_piezo_range_y = self._get_val(
        "<piezo range Y (max 80)> um", self.lamni_piezo_range_y, float
    )
    self.lamni_stitch_x = self._get_val("<stitch X>", self.lamni_stitch_x, int)
    self.lamni_stitch_y = self._get_val("<stitch Y>", self.lamni_stitch_y, int)
    self.tomo_circfov = self._get_val("<circular FOV> um", self.tomo_circfov, float)
    self.ptycho_reconstruct_foldername = self._get_val(
        "Reconstruction queue", self.ptycho_reconstruct_foldername, str
    )
    print("Tomography type:")
    print(" 1: 8 equally spaced sub-tomograms (360 deg)")
    print(" 2: Golden ratio tomography")
    print(" 3: Equally spaced tomography, golden ratio starting angle")
    # The tomo_type setter raises ValueError for anything but 1, 2 or 3.
    self.tomo_type = self._get_val("Tomography type", self.tomo_type, int)
    # --- Part 4: parameters specific to the chosen tomography type -----
    if self.tomo_type == 1:
        # The prompt asks for the total over all 8 sub-tomograms and the
        # angular step is derived from it. Entering 0 divides by zero.
        tomo_numberofprojections = self._get_val(
            "Number of projections", 360 / self.tomo_angle_stepsize * 8, int
        )
        self.tomo_angle_stepsize = 360 / tomo_numberofprojections * 8
        print(f"Angular step in a subtomogram: {self.tomo_angle_stepsize}")
    elif self.tomo_type == 2:
        # Re-prompt until the bunch size satisfies the setter's minimum of 20.
        while True:
            bunch_size = self._get_val(
                "Number of projections sorted per bunch (minimum 20)",
                self.golden_ratio_bunch_size,
                int,
            )
            if bunch_size >= 20:
                self.golden_ratio_bunch_size = bunch_size
                break
            print("Bunch size must be at least 20. Please try again.")
        self.golden_max_number_of_projections = self._get_val(
            "Stop after number of projections (0 for endless)",
            self.golden_max_number_of_projections,
            int,
        )
        self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
            "Repeat projections at 0 deg every second subtomo 1/0?",
            self.golden_projections_at_0_deg_for_damage_estimation,
            int,
        )
    elif self.tomo_type == 3:
        # Here the prompt is per sub-tomogram; entering 0 divides by zero.
        numprj = self._get_val(
            "Number of projections per sub-tomogram",
            int(360 / self.tomo_angle_stepsize),
            int,
        )
        self.tomo_angle_stepsize = 360 / numprj
        self.golden_max_number_of_projections = self._get_val(
            "Stop after number of projections (0 for endless)",
            self.golden_max_number_of_projections,
            int,
        )
        self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
            "Repeat projections at 0 deg every second subtomo 1/0?",
            self.golden_projections_at_0_deg_for_damage_estimation,
            int,
        )
    self.sample_name = self._get_val("sample name", self.sample_name, str)
@staticmethod
def _get_val(msg: str, default_value, data_type):
return data_type(input(f"{msg} ({default_value}): ") or default_value)
# ------------------------------------------------------------------
# PDF report
# ------------------------------------------------------------------
[docs]
def write_pdf_report(self) -> None:
    """Create and write the PDF report with current LamNI settings.

    The report goes to ``~/Data10/documentation/tomo_scan_ID_<tomo_id>.pdf``.
    Afterwards an upload script is launched in an xterm, and the same text is
    sent to the logbook together with the LamNI logo.
    """
    dev = builtins.__dict__.get("dev")
    # ASCII-art banner placed above the parameter table.
    header = (
        " \n" * 3
        + " ::: ::: ::: ::: :::: ::: ::::::::::: \n"
        + " :+: :+: :+: :+:+: :+:+: :+:+: :+: :+: \n"
        + " +:+ +:+ +:+ +:+ +:+:+ +:+ :+:+:+ +:+ +:+ \n"
        + " +#+ +#++:++#++: +#+ +:+ +#+ +#+ +:+ +#+ +#+ \n"
        + " +#+ +#+ +#+ +#+ +#+ +#+ +#+#+# +#+ \n"
        + " #+# #+# #+# #+# #+# #+# #+#+# #+# \n"
        + " ########## ### ### ### ### ### #### ########### \n"
    )
    # Column width for both the label and the value of every row.
    padding = 20
    piezo_range = f"{self.lamni_piezo_range_x:.2f}/{self.lamni_piezo_range_y:.2f}"
    stitching = f"{self.lamni_stitch_x:.2f}/{self.lamni_stitch_y:.2f}"
    dataset_id = str(self.client.queue.next_dataset_number)
    # NOTE(review): projection count, last scan number and the number of
    # sub-tomograms below use the 8-sub-tomogram formula whatever the
    # tomo_type is - confirm this is acceptable for types 2 and 3.
    # NOTE(review): the photon energy is read as ``read(cached=True)['value']``
    # while ``_write_tomo_scan_number`` indexes ``read()`` by device name
    # first - confirm the shape returned for ``mokev``.
    content = [
        f"{'Sample Name:':<{padding}}{self.sample_name:>{padding}}\n",
        f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n",
        f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n",
        f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n",
        f"{'e-account:':<{padding}}{str(self.client.username):>{padding}}\n",
        f"{'Number of projections:':<{padding}}{int(360 / self.tomo_angle_stepsize * 8):>{padding}}\n",
        f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n",
        f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int(360 / self.tomo_angle_stepsize * 8) + 10:>{padding}}\n",
        f"{'Current photon energy:':<{padding}}{dev.mokev.read(cached=True)['value']:>{padding}.4f}\n",
        f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
        f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
        f"{'Piezo range (FOV sample plane):':<{padding}}{piezo_range:>{padding}}\n",
        f"{'Restriction to circular FOV:':<{padding}}{self.tomo_circfov:>{padding}.2f}\n",
        f"{'Stitching:':<{padding}}{stitching:>{padding}}\n",
        f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n",
        f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
        f"{'Tomo type:':<{padding}}{self.tomo_type:>{padding}}\n",
    ]
    content = "".join(content)
    user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
    with PDFWriter(user_target) as file:
        file.write(header)
        file.write(content)
    # The trailing "&" backgrounds the xterm, so this call does not wait for it.
    subprocess.run(
        "xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
    )
    # Unlike ``dev`` above, ``bec`` here is the module-level name.
    msg = bec.logbook.LogbookMessage()
    logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
    # ``content`` is already one string here, so this join returns it unchanged.
    msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
        ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name]
    )
    self.client.logbook.send_logbook_message(msg)
def get_calibration_of_capstops_left_and_right(self):
    """Print angle and analog-voltage pairs while ``lsamrot`` sweeps up and back down.

    The stage is first moved to 0 with ``umv``. It is then sent to 361 with
    ``mv`` and samples are printed until the readback exceeds 360 ("capstop
    right"). After a 10 s pause it is sent to -1 and sampled until the
    readback is no longer positive ("capstop left").
    """
    import time

    print("""
Manual on how to center the Piezo stage first.
To obtain the center voltages one can move in closed loop to the interferometer
vertically and observe the capacitive readback signal. Check the limits of the
travel, move to center and obtain the required centering voltage.
Example: At 0 deg, accessible rty -60 to 51. So the init was 5 microns off.
Then this routine here will provide data for the new capstop left and right.
""")

    def sample_stage():
        # Readback first, then the two analog inputs, always in this order.
        position = dev.lsamrot.readback.get()
        first = float(dev.lsamrot.controller.socket_put_and_receive("MG@AN[1]"))
        second = float(dev.lsamrot.controller.socket_put_and_receive("MG@AN[2]"))
        return position, first, second

    angle = 0
    umv(dev.lsamrot, 0)

    # Upward sweep: samples at or beyond 360 are taken but not printed.
    print("Capstop right\nAngle, Voltage1, Voltage2")
    mv(dev.lsamrot, 361)
    while angle <= 360:
        angle, voltage1, voltage2 = sample_stage()
        if angle < 360:
            print(f"{angle},{voltage1},{voltage2}")
        time.sleep(.3)

    time.sleep(10)

    # Downward sweep: samples at or below 0 are taken but not printed.
    print("\nCapstop left\nAngle, Voltage1, Voltage2")
    mv(dev.lsamrot, -1)
    while angle > 0:
        angle, voltage1, voltage2 = sample_stage()
        if angle > 0:
            print(f"{angle},{voltage1},{voltage2}")
        time.sleep(.3)

    print("Finished")