import builtins
import datetime
import os
import subprocess
import time
from pathlib import Path
import numpy as np
from bec_lib import bec_logger
from bec_lib.alarm_handler import AlarmBase
from bec_lib.pdf_writer import PDFWriter
from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks
from csaxs_bec.bec_ipython_client.plugins.omny.gui_tools import OMNYGuiTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_alignment_mixin import OMNYAlignmentMixin
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_optics_mixin import OMNYOpticsMixin
from csaxs_bec.bec_ipython_client.plugins.omny.omny_rt import OMNY_rt_client
from csaxs_bec.bec_ipython_client.plugins.omny.omny_sample_transfer_mixin import (
OMNYSampleTransferMixin,
)
from csaxs_bec.bec_ipython_client.plugins.omny.x_ray_eye_align import XrayEyeAlign
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
scans = builtins.__dict__.get("scans")
[docs]
def umv(*args):
return scans.umv(*args, relative=False)
[docs]
class OMNYInitError(Exception):
pass
[docs]
class OMNYError(Exception):
pass
[docs]
class OMNYInitStagesMixin:
def enable_all_devices(self):
dev.ofzpx.enabled = True
dev.ofzpy.enabled = True
dev.ofzpz.enabled = True
dev.otransx.enabled = True
dev.otransy.enabled = True
dev.otransz.enabled = True
dev.osamx.enabled = True
dev.osamz.enabled = True
dev.oosay.enabled = True
dev.oosax.enabled = True
dev.oosaz.enabled = True
dev.oparkz.enabled = True
dev.oshuttleopen.enabled = True
dev.oshuttlealign.enabled = True
dev.osamy.enabled = True
dev.otracky.enabled = True
dev.osamroy.enabled = True
dev.otrackz.enabled = True
dev.oeyex.enabled = True
dev.oeyez.enabled = True
dev.oeyey.enabled = True
def _omny_find_reference_loop(self, direction_endswitch, device, auto_retry=0):
retry_counter = 0
limits = device.get_motor_limit_switch()
if not limits[0] and not limits[1]:
raise OMNYInitError(
"Expect to be in a limit switch when calling the method find reference loop."
)
if limits[0] and limits[1]:
raise OMNYInitError("Both limit switches are triggered. This indicates an error.")
if limits[0]:
direction_endswitch_start = "reverse"
if limits[1]:
direction_endswitch_start = "forward"
if direction_endswitch_start != direction_endswitch:
raise OMNYInitError("Expect current endswitch to match the call of the method.")
if device.axis_is_referenced():
raise OMNYInitError("This axis is already referenced")
while not device.axis_is_referenced():
self._check_for_folerr_and_reset(device)
device.find_reference(raise_error=0)
time.sleep(1)
if not device.axis_is_referenced():
if auto_retry == 0:
if self.OMNYTools.yesno(
"Did not reference successfully. Try again to move to endswitch and find reference?"
):
self._check_for_folerr_and_reset(device)
device.drive_axis_to_limit(direction_endswitch_start)
time.sleep(1)
elif auto_retry > 0:
if retry_counter < auto_retry:
self.OMNYTools.printgreen(
f"Did not reference successfully. Try again to move to endswitch and find reference. Attempt {retry_counter+1} out of max {auto_retry}"
)
self._check_for_folerr_and_reset(device)
device.drive_axis_to_limit(direction_endswitch_start)
time.sleep(1)
retry_counter += 1
temperature = device.get_motor_temperature()
while temperature > 100:
self.OMNYTools.printgreen(
f"The temperature of the motor is {temperature}, and larger than the threshold of 100 degC. Waiting 60 s for cool down."
)
time.sleep(60)
temperature = device.get_motor_temperature()
if retry_counter == auto_retry:
self.OMNYTools.printgreen(
"Maximum automatic init retries reached. Setting auto to 0 offering manual continuation."
)
auto_retry = 0
def _check_for_folerr_and_reset(self, device):
# if device.folerr_status():
# user_input = (f"There is a following error in axis {device}. Reset and ignore to continue? y/n")
# if user_input == "y":
# for now we just reset and ignore. because it might just occur at the beginning of move to ES
device._ogalil_folerr_reset_and_ignore()
time.sleep(0.3)
def omny_init_stages(self, autoconfirm=0, autoretry=0):
if self.OMNYTools.yesno(
"Starting initialization of OMNY stages.", "y", autoconfirm=autoconfirm
):
self.OMNYTools.printgreen("starting...")
else:
return
# #maybe replace 2 by something
# axis_id = dev.fheater._config["deviceConfig"].get("axis_Id")
# axis_id_numeric = self.axis_id_to_numeric(axis_id)
if not dev.oshield.controller.axis_is_referenced(2):
if self.OMNYTools.yesno(
"The smaract stage of the cryo shield has to be referenced first. Continue?", "y"
):
self.omny_init_smaract()
else:
return
if self.check_all_axes_of_OMNY_referenced():
if self.OMNYTools.yesno(
"All axes are referenced already. Reset and reference again?", "n"
):
self.OMNYTools.printgreen("ok then...")
else:
return
if np.fabs(dev.oshield.get().readback) > 0.1:
if self.OMNYTools.yesno(
"oshield is not around pos 0. Can the OSA be moved to -z limit then -x limit. Risk of collition!"
):
dev.oosaz.drive_axis_to_limit("forward")
dev.oosax.drive_axis_to_limit("forward")
else:
raise OMNYInitError("Automatic init not possible. Initialize manually")
umv(dev.oshield, 0)
dev.ofzpx.controller.socket_put_confirmed("XQ#CRESET")
dev.osamx.controller.socket_put_confirmed("XQ#CRESET")
dev.osamy.controller.socket_put_confirmed("XQ#CRESET")
time.sleep(1)
if self.OMNYTools.yesno(
"drive fzp to -Z,-X,+Y limit and reference?", "y", autoconfirm=autoconfirm
):
dev.ofzpz.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("ofzpz reached limit")
dev.ofzpy.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("ofzpy reached limit")
dev.ofzpx.drive_axis_to_limit("reverse")
self.OMNYTools.printgreen("ofzpx reached limit")
# dev.ofzpz.find_reference()
self._omny_find_reference_loop("forward", dev.ofzpz, auto_retry=autoretry)
self.OMNYTools.printgreen("ofzpz referenced")
# dev.ofzpy.find_reference()
self._omny_find_reference_loop("forward", dev.ofzpy, auto_retry=autoretry)
self.OMNYTools.printgreen("ofzpy referenced")
# dev.ofzpx.find_reference()
self._omny_find_reference_loop("reverse", dev.ofzpx, auto_retry=autoretry)
self.OMNYTools.printgreen("ofzpx referenced")
# ranges:
# X -4.9 to 7.2
# Y -3.5 to 7.9
# Z -3 to 95.6
else:
return
if self.OMNYTools.yesno("drive otransy stage to +Y limit?", "y"):
dev.otransy.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otrany reached limit")
else:
return
self.OMNYTools.printgreen(
"For the next step the gripper has to be at a good z position. Risk of collision!"
)
if self.OMNYTools.yesno("drive otransx stage to +X limit and find reference?"):
dev.otransx.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otransx reached limit")
# dev.otransx.find_reference()
self._omny_find_reference_loop("forward", dev.otransx, auto_retry=autoretry)
self.OMNYTools.printgreen("otransx referenced")
else:
return
if self.OMNYTools.yesno(
"drive otransz stage to +Z limit and reference?", "y", autoconfirm=autoconfirm
):
dev.otransz.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otransz reached limit")
# dev.otransz.find_reference()
self._omny_find_reference_loop("forward", dev.otransz, auto_retry=autoretry)
self.OMNYTools.printgreen("otransz referenced")
else:
return
# get xeye out of the way
if self.OMNYTools.yesno("drive oeyey to +Y limit?", "y"):
dev.oeyey.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("oeyey reached limit")
else:
return
if self.OMNYTools.yesno("drive oeyex to -X limit?", "y", autoconfirm=autoconfirm):
dev.oeyex.drive_axis_to_limit("reverse")
self.OMNYTools.printgreen("oeyex reached limit")
else:
return
if self.OMNYTools.yesno("drive oeyez to +Z limit?", "y", autoconfirm=autoconfirm):
dev.oeyez.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("oeyez reached limit")
else:
return
if self.OMNYTools.yesno("reference oeyex, then oeyez?", "y", autoconfirm=autoconfirm):
# dev.oeyex.find_reference()
self._omny_find_reference_loop("reverse", dev.oeyex, auto_retry=autoretry)
self.OMNYTools.printgreen("oeyex referenced")
# dev.oeyez.find_reference()
self._omny_find_reference_loop("forward", dev.oeyez, auto_retry=autoretry)
self.OMNYTools.printgreen("oeyez referenced")
else:
return
if self.OMNYTools.yesno("find reference mark of oeyey?", "y", autoconfirm=autoconfirm):
# print("oeyey now in limit")
dev.oeyez.drive_axis_to_limit("forward")
dev.oeyey.controller.socket_put_confirmed("indspeed[7]=2000")
# dev.oeyey.find_reference()
self._omny_find_reference_loop("forward", dev.oeyey, auto_retry=autoretry)
self.OMNYTools.printgreen("oeyey is now referenced")
# ensure closed shuttle and shuttle aligner down
if not dev.oshuttleopen.get_motor_limit_switch()[0]:
dev.oshuttleopen.drive_axis_to_limit("reverse")
if not dev.oshuttleopen.get_motor_limit_switch()[0]:
raise OMNYInitError("Failed to drive shuttle opener to reverse limit")
else:
self.OMNYTools.printgreen("shuttle opener is down, i.e. in safe position to continue.")
# move shuttle aligner down
# self._otransfer_shuttle_aligner_down()
# the above is ins sample transfer mixin. can it be accessible from here? so code below
if not dev.oshuttlealign.get_motor_limit_switch()[1]:
dev.oshuttlealign.drive_axis_to_limit("forward")
if not dev.oshuttlealign.get_motor_limit_switch()[1]:
raise OMNYInitError("Failed to drive shuttle opener to forward limit")
else:
self.OMNYTools.printgreen("shuttle aligner is down, i.e. in safe position to continue.")
if self.OMNYTools.yesno(
"drive parking stage to +Z limit and reference?", "y", autoconfirm=autoconfirm
):
dev.oparkz.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("oparkz is in limit")
# dev.oparkz.find_reference()
self._omny_find_reference_loop("forward", dev.oparkz, auto_retry=autoretry)
self.OMNYTools.printgreen("oparkz is referenced")
else:
return
if self.OMNYTools.yesno(
"OK, tricky part. Can the OSA be moved to -z limit then -x limit. Risk of collition!"
):
dev.oosaz.drive_axis_to_limit("forward")
dev.oosax.drive_axis_to_limit("forward")
else:
raise OMNYInitError(
"Automatic init not possible in this case. Please initialize manually."
)
if self.OMNYTools.yesno("drive otrackz to -Z limit?", "y", autoconfirm=autoconfirm):
dev.otrackz.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otrackz is in limit")
else:
return
if self.OMNYTools.yesno("drive osamx to +X limit?", "y", autoconfirm=autoconfirm):
dev.osamx.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("osamx is in limit")
else:
return
if self.OMNYTools.yesno("drive oosay to +Y limit?", "y", autoconfirm=autoconfirm):
dev.oosay.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("oosay is in limit")
else:
return
if self.OMNYTools.yesno("find reference mark of osamx?", "y", autoconfirm=autoconfirm):
self._omny_find_reference_loop("forward", dev.osamx, auto_retry=autoretry)
# dev.osamx.find_reference()
self.OMNYTools.printgreen("osamx is referenced")
# osamx range limit -2 to 7 good for cryolink
# if osamz is ever needed, here is how it could be initialized (in spec code)
# if (!_ogalil_axis_is_referenced_mne(osamx) || fabs(A[osamx]) > 0.1 )
# {
# printf("The osamx stage is not referenced or is not at the reference position. Aborting.\n")
# exit
# }
# drive osamz to -Z limit
# +Z direction is critical because of cryolink connection
# if(!yesno("drive osamz to -Z limit and perform reference search?",1))
# exit
# osamz to limit
# _ogalil_drive_to_limit_mne(osamz,1)
# _ogalil_find_reference_mark_mne(osamz)
# set_lm osamz -.5 .5
else:
return
if self.OMNYTools.yesno("drive osamy to -Y limit?", "y"):
dev.osamy.drive_axis_to_limit("reverse")
self.OMNYTools.printgreen("osamy is in limit")
else:
return
if self.OMNYTools.yesno("perform reference search of oosax?", "y"):
self._omny_find_reference_loop("forward", dev.oosax)
# dev.oosax.find_reference()
self.OMNYTools.printgreen("oosax is referenced")
else:
return
if self.OMNYTools.yesno("find reference mark of oosay?", "y"):
self._omny_find_reference_loop("forward", dev.oosay)
# dev.oosay.find_reference()
self.OMNYTools.printgreen("oosay is referenced")
else:
return
if self.OMNYTools.yesno("find reference mark of osamy?", "y"):
self._omny_find_reference_loop("reverse", dev.osamy)
# dev.osamy.find_reference()
self.OMNYTools.printgreen("osamy is referenced")
else:
return
if self.OMNYTools.yesno("find reference mark of oosaz?", "y"):
self._omny_find_reference_loop("forward", dev.oosaz)
# dev.oosaz.find_reference()
self.OMNYTools.printgreen("oosaz is referenced")
else:
return
if self.OMNYTools.yesno(
"find endswitch and reference mark of osamroy?", "y", autoconfirm=autoconfirm
):
dev.osamroy.drive_axis_to_limit("reverse")
self.OMNYTools.printgreen("osamroy is in limit")
self._omny_find_reference_loop("reverse", dev.osamroy, auto_retry=autoretry)
# dev.osamroy.find_reference()
self.OMNYTools.printgreen("osamroy is referenced")
else:
return
if self.OMNYTools.yesno("find reference mark of transfer Y?", autoconfirm=autoconfirm):
self._omny_find_reference_loop("forward", dev.otransy, auto_retry=autoretry)
# dev.otransy.find_reference()
self.OMNYTools.printgreen("otransy is referenced")
else:
return
if self.OMNYTools.yesno(
"find endswitches and reference marks of tracking stage system?",
"y",
autoconfirm=autoconfirm,
):
dev.otracky.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otracky is in limit")
self._omny_find_reference_loop("forward", dev.otracky, auto_retry=autoretry)
# dev.otracky.find_reference()
self.OMNYTools.printgreen("otracky is referenced")
self._omny_find_reference_loop("forward", dev.otrackz, auto_retry=autoretry)
# dev.otrackz.find_reference()
self.OMNYTools.printgreen("otrackz is referenced")
else:
return
# all three controllers
dev.ofzpx.controller._ogalil_folerr_not_ignore()
dev.osamx.controller._ogalil_folerr_not_ignore()
dev.osamy.controller._ogalil_folerr_not_ignore()
# adjust acceleration of tracking stages
dev.osamy.controller.socket_put_confirmed("ACE=11264")
dev.osamy.controller.socket_put_confirmed("ACB=11264")
dev.osamy.controller.socket_put_confirmed("DCE=11264")
dev.osamy.controller.socket_put_confirmed("DCB=11264")
dev.osamy.controller.galil_show_all()
self._initial_alignment()
self._setalllimitsomny()
def omny_init_smaract(self):
# _smar_rt_set_max_closed_loop_frequency(0,0,3000)
# _smar_rt_set_max_closed_loop_frequency(0,1,3000)
# _smar_rt_set_max_closed_loop_frequency(0,2,3000)
dev.oshield.controller.find_reference_mark(1, 0, 1000, 1)
time.sleep(2)
dev.oshield.controller.find_reference_mark(0, 0, 1000, 1)
time.sleep(2)
if not dev.otransy.get_motor_limit_switch()[1]:
if self.OMNYTools.yesno("drive otransy stage to +Y limit?", "y"):
dev.otransy.drive_axis_to_limit("forward")
self.OMNYTools.printgreen("otransy reached limit")
else:
self.OMNYTools.printgreen("otransy is in upper limit, i.e. safe condition")
if self.OMNYTools.yesno(
"Problematic part: Can the OSA be moved to -z limit then -x limit. Risk of collition!"
):
dev.oosaz.drive_axis_to_limit("forward")
dev.oosax.drive_axis_to_limit("forward")
else:
raise OMNYInitError("Automatic init not possible.")
dev.oshield.controller.find_reference_mark(2, 0, 1000, 1)
time.sleep(2)
dev.oshield.limits = [-14.5, 15.8]
dev.ocsx.limits = [-2, 5.2]
dev.ocsy.limits = [-1, 3]
dev.oshield.controller.set_closed_loop_move_speed(0, 2)
dev.oshield.controller.set_closed_loop_move_speed(1, 2)
dev.oshield.controller.set_closed_loop_move_speed(2, 2)
# enable sensor for repeatable open loop motion
dev.oshield.controller.socket_put_and_receive("SSE1")
if self.OMNYTools.yesno("Move CS out of the beam path?", "y"):
umv(dev.ocsx, 5.2, dev.ocsy, 2)
self.OMNYTools.printgreen("Finished initialization of OMNY smaract system")
def _setalllimitsomny(self):
# dev.rtx.limits = [-250, 250]
# dev.rty.limits = [-250, 250]
# dev.rtz.limits = [-250, 250]
dev.osamx.limits = [-1, 1]
dev.osamy.limits = [2.3000, 3.0000]
dev.osamroy.limits = [-2.0000, 362.0000]
dev.oeyex.limits = [-48.0000, 10.0000]
dev.oeyez.limits = [-85.0000, 10.0000]
dev.oeyey.limits = [-61.0000, 1.0000]
dev.oosay.limits = [0.0800, 1.0000]
dev.oosax.limits = [-3.0000, 3.7000]
dev.oosaz.limits = [6.4000, 6.6000]
dev.ofzpx.limits = [-2.0000, 2.0000]
dev.ofzpy.limits = [-1.0000, 3.0000]
dev.ofzpz.limits = [-2.5000, 60.0000]
dev.otransx.limits = [-459.0000, 33.0000]
dev.otransy.limits = [-41.0000, 1.0000]
dev.otransz.limits = [-70.5000, 11.0000]
dev.oparkz.limits = [-165.0000, 1.0000]
dev.otracky.limits = [-3.0000, -7.0000]
dev.otrackz.limits = [-2.0000, 1.0000]
dev.ocsy.limits = [-2.0000, 2.0000]
dev.ocsx.limits = [-2.0000, 2.0000]
dev.oshield.limits = [-14.5000, 15.8000]
def _initial_alignment(self):
if self.OMNYTools.yesno("Start moving stages to default initial positions?"):
self.OMNYTools.printgreen("Start moving stages...")
else:
self.OMNYTools.printgreen("Stopping.")
return
self.oeye_cam_in()
umv(dev.oosay, 0.5)
dev.oosay.limits = [0.4, 0.6]
dev.osamy.limits = [-1, 3]
umv(dev.osamy, 2.7, dev.ofzpx, -0.8, dev.ofzpy, 1.6)
dev.osamy.limits = [2.5, 2.9]
dev.ofzpx.limits = [-2, 0]
dev.ofzpy.limits = [-0, 3]
dev.ofzpz.limits = [-2.5, 60]
if "rtx" in dev and dev.rtx.enabled:
dev.rtx.controller.feedback_disable()
self._oshield_ST_close()
dev.otracky.limits = [-7, -3]
dev.otrackz.limits = [-2, 1]
otracky_start_pos = self.OMNYTools._get_user_param_safe("otracky", "start_pos")
otrackz_start_pos = self.OMNYTools._get_user_param_safe("otrackz", "start_pos")
umv(dev.otracky, otracky_start_pos, dev.otrackz, otrackz_start_pos)
# adjust acceleration and speed of osamx
dev.osamx.controller.socket_put_confirmed("SPA=100")
dev.osamx.controller.socket_put_confirmed("ACA=1878")
dev.osamx.controller.socket_put_confirmed("DCA=187")
# adjust acceleration of tracking stages
dev.osamy.controller.socket_put_confirmed("ACE=11264")
dev.osamy.controller.socket_put_confirmed("DCE=11264")
dev.osamy.controller.socket_put_confirmed("DCB=11264")
def check_all_axes_of_OMNY_referenced(self) -> bool:
if dev.ofzpx.controller.all_axes_referenced():
self.OMNYTools.printgreen("All axes of OMNY are referenced.")
return True
else:
return False
[docs]
class OMNY(
OMNYInitStagesMixin,
OMNYSampleTransferMixin,
OMNYAlignmentMixin,
OMNYOpticsMixin,
cSAXSBeamlineChecks,
OMNY_rt_client,
OMNYGuiTools,
):
def __init__(self, client):
super().__init__()
self.client = client
self.device_manager = client.device_manager
self.check_shutter = False
self.check_light_available = False
self.check_fofb = False
self._check_msgs = []
self.tomo_id = -1
self.special_angles = []
self.special_angle_repeats = 20
self.special_angle_tolerance = 20
self._current_special_angles = []
self._beam_is_okay = True
self._stop_beam_check_event = None
self.beam_check_thread = None
self.corr_pos_y = []
self.corr_angle_y = []
self.corr_pos_y_2 = []
self.corr_angle_y_2 = []
self.progress = {}
self.progress["subtomo"] = 0
self.progress["subtomo_projection"] = 0
self.progress["subtomo_total_projections"] = 1
self.progress["projection"] = 0
self.progress["total_projections"] = 1
self.progress["angle"] = 0
self.progress["tomo_type"] = 0
self.OMNYTools = OMNYTools(self.client)
OMNY_rt_client.__init__(self)
self.align = XrayEyeAlign(self.client, self)
OMNYGuiTools.__init__(self, self.client)
def start_x_ray_eye_alignment(self):
if self.OMNYTools.yesno(
"Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.",
"y",
):
self.align = XrayEyeAlign(self.client, self)
try:
self.align.align()
except KeyboardInterrupt as exc:
osamx_in = self._get_user_param_safe(dev.osamx, "in")
if np.isclose(osamx_in, dev.osamx.readback.get(), 0.5):
self.OMNYTools.printgreen("Stopping alignment. Returning to osamx in position.")
self.feedback_disable()
umv(dev.osamx, osamx_in)
raise exc
def xrayeye_update_frame(self):
self.align.update_frame()
def xrayeye_alignment_start(self):
self.start_x_ray_eye_alignment()
# def drive_axis_to_limit(self, device, direction):
# axis_id = device._config["deviceConfig"].get("axis_Id")
# axis_id_numeric = self.axis_id_to_numeric(axis_id)
# device.controller.drive_axis_to_limit(axis_id_numeric, direction)
def axis_id_to_numeric(self, axis_id) -> int:
return ord(axis_id.lower()) - 97
def get_beamline_checks_enabled(self):
print(
f"Shutter: {self.check_shutter}\nFOFB: {self.check_fofb}\nLight available:"
f" {self.check_light_available}"
)
@property
def beamline_checks_enabled(self):
return {
"shutter": self.check_shutter,
"fofb": self.check_fofb,
"light available": self.check_light_available,
}
@beamline_checks_enabled.setter
def beamline_checks_enabled(self, val: bool):
self.check_shutter = val
self.check_light_available = val
self.check_fofb = val
self.get_beamline_checks_enabled()
[docs]
def set_special_angles(self, angles: list, repeats: int = 20, tolerance: float = 0.5):
"""Set the special angles for a tomo
Args:
angles (list): List of special angles
repeats (int, optional): Number of repeats at a special angle. Defaults to 20.
tolerance (float, optional): Number of repeats at a special angle. Defaults to 0.5.
"""
self.special_angles = angles
self.special_angle_repeats = repeats
self.special_angle_tolerance = tolerance
[docs]
def remove_special_angles(self):
"""Remove the special angles and set the number of repeats to 1"""
self.special_angles = []
self.special_angle_repeats = 1
@property
def near_field(self):
val = self.client.get_global_var("near_field")
if val is None:
return False
return val
@near_field.setter
def near_field(self, val: bool):
self.client.set_global_var("near_field", val)
@property
def tomo_shellstep(self):
val = self.client.get_global_var("tomo_shellstep")
if val is None:
return 1
return val
@tomo_shellstep.setter
def tomo_shellstep(self, val: float):
self.client.set_global_var("tomo_shellstep", val)
@property
def tomo_countingtime(self):
val = self.client.get_global_var("tomo_countingtime")
if val is None:
return 0.1
return val
@tomo_countingtime.setter
def tomo_countingtime(self, val: float):
self.client.set_global_var("tomo_countingtime", val)
@property
def manual_shift_y(self):
val = self.client.get_global_var("manual_shift_y")
if val is None:
return 0.0
return val
@manual_shift_y.setter
def manual_shift_y(self, val: float):
self.client.set_global_var("manual_shift_y", val)
@property
def fovx(self):
val = self.client.get_global_var("fovx")
if val is None:
return 20
return val
@fovx.setter
def fovx(self, val: float):
if val > 200:
raise ValueError("FOV cannot be larger than 200 um.")
self.client.set_global_var("fovx", val)
@property
def fovy(self):
val = self.client.get_global_var("fovy")
if val is None:
return 20
return val
@fovy.setter
def fovy(self, val: float):
if val > 100:
raise ValueError("FOV cannot be larger than 100 um.")
self.client.set_global_var("fovy", val)
@property
def tomo_type(self):
val = self.client.get_global_var("tomo_type")
if val is None:
return 1
return val
@tomo_type.setter
def tomo_type(self, val: float):
if val == 1:
# equally spaced tomography with 2 sub tomograms
self.client.set_global_var("tomo_type", val)
elif val == 2:
# golden ratio tomography (sorted bunches)
self.client.set_global_var("tomo_type", val)
elif val == 3:
# equally spaced tomography with starting angles shifted by golden ratio
self.client.set_global_var("tomo_type", val)
else:
raise ValueError("Unknown tomo_type.")
@property
def corridor_size(self):
val = self.client.get_global_var("corridor_size")
if val is None:
val = -1
return val
@corridor_size.setter
def corridor_size(self, val: float):
self.client.set_global_var("corridor_size", val)
@property
def stitch_x(self):
val = self.client.get_global_var("stitch_x")
if val is None:
return 0
return val
@stitch_x.setter
@typechecked
def stitch_x(self, val: int):
self.client.set_global_var("stitch_x", val)
@property
def stitch_y(self):
val = self.client.get_global_var("stitch_y")
if val is None:
return 0
return val
@stitch_y.setter
@typechecked
def stitch_y(self, val: int):
self.client.set_global_var("stitch_y", val)
@property
def ptycho_reconstruct_foldername(self):
val = self.client.get_global_var("ptycho_reconstruct_foldername")
if val is None:
return "ptycho_reconstruct"
return val
@ptycho_reconstruct_foldername.setter
def ptycho_reconstruct_foldername(self, val: str):
self.client.set_global_var("ptycho_reconstruct_foldername", val)
@property
def tomo_angle_stepsize(self):
val = self.client.get_global_var("tomo_angle_stepsize")
if val is None:
return 10.0
return val
@tomo_angle_stepsize.setter
def tomo_angle_stepsize(self, val: float):
self.client.set_global_var("tomo_angle_stepsize", val)
@property
def golden_max_number_of_projections(self):
val = self.client.get_global_var("golden_max_number_of_projections")
if val is None:
return 1000.0
return val
@golden_max_number_of_projections.setter
def golden_max_number_of_projections(self, val: float):
self.client.set_global_var("golden_max_number_of_projections", val)
@property
def tomo_stitch_overlap(self):
val = self.client.get_global_var("tomo_stitch_overlap")
if val is None:
return 0.2
return val
@tomo_stitch_overlap.setter
def tomo_stitch_overlap(self, val: float):
self.client.set_global_var("tomo_stitch_overlap", val)
@property
def golden_projections_at_0_deg_for_damage_estimation(self):
val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation")
if val is None:
return 0
return val
@golden_projections_at_0_deg_for_damage_estimation.setter
def golden_projections_at_0_deg_for_damage_estimation(self, val: float):
self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val)
@property
def golden_ratio_bunch_size(self):
val = self.client.get_global_var("golden_ratio_bunch_size")
if val is None:
return 20
return val
@golden_ratio_bunch_size.setter
def golden_ratio_bunch_size(self, val: float):
self.client.set_global_var("golden_ratio_bunch_size", val)
@property
def sample_name(self):
return dev.omny_samples.get_sample_name_in_samplestage()
def write_to_scilog(self, content, tags: list = None):
try:
if tags is not None:
tags.append("BEC")
else:
tags = ["BEC"]
msg = bec.logbook.LogbookMessage()
msg.add_text(content).add_tag(tags)
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to write to scilog.")
[docs]
def tomo_alignment_scan(self):
"""
Performs a tomogram alignment scan.
"""
if self.get_alignment_offset(0) == (0, 0, 0):
print("It appears that the xrayeye alignment was not performend or loaded. Aborting.")
return
dev = builtins.__dict__.get("dev")
bec = builtins.__dict__.get("bec")
tags = ["BEC_alignment_tomo", self.sample_name]
self.write_to_scilog(
f"Starting alignment scan. First scan number: {bec.queue.next_scan_number}.", tags
)
start_angle = 0
angle_end = start_angle + 180
for angle in np.linspace(start_angle, angle_end, num=int(180 / 45) + 1, endpoint=True):
successful = False
error_caught = False
if 0 <= angle < 180.05:
print(f"Starting OMNY scan for angle {angle}")
while not successful:
self._start_beam_check()
try:
start_scan_number = bec.queue.next_scan_number
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else:
raise exc
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, 0)
print("Alignment scan finished. Please run SPEC_ptycho_align and load the new fit.")
umv(dev.osamroy, 0)
def _write_subtomo_to_scilog(self, subtomo_number):
dev = builtins.__dict__.get("dev")
bec = builtins.__dict__.get("bec")
if self.tomo_id > 0:
tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
else:
tags = ["BEC_subtomo", self.sample_name]
self.write_to_scilog(
f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.",
tags,
)
[docs]
def sub_tomo_scan(self, subtomo_number, start_angle=None):
"""
Performs a sub tomogram scan.
Args:
subtomo_number (int): The sub tomogram number.
start_angle (float, optional): The start angle of the scan. Defaults to None.
"""
self._write_subtomo_to_scilog(subtomo_number)
if start_angle is None:
if subtomo_number == 1:
start_angle = 0
elif subtomo_number == 2:
start_angle = self.tomo_angle_stepsize / 2.0
# _tomo_shift_angles (potential global variable)
_tomo_shift_angles = 0
angle_end = start_angle + 180
angles = np.linspace(
start_angle + _tomo_shift_angles,
angle_end,
num=int(180 / self.tomo_angle_stepsize) + 1,
endpoint=True,
)
# reverse even sub-tomogram
if not (subtomo_number % 2):
angles = np.flip(angles)
for angle in angles:
self.progress["subtomo"] = subtomo_number
self.progress["subtomo_projection"] = np.where(angles == angle)[0][0]
self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize
self.progress["projection"] = (subtomo_number - 1) * self.progress[
"subtomo_total_projections"
] + self.progress["subtomo_projection"]
self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 2
self.progress["angle"] = angle
self._tomo_scan_at_angle(angle, subtomo_number)
def _tomo_scan_at_angle(self, angle, subtomo_number):
successful = False
error_caught = False
if 0 <= angle < 180.05:
print(f"Starting OMNY scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
self.omnygui_show_progress()
while not successful:
self._start_beam_check()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
next_special_angle = self._current_special_angles[0]
if np.isclose(angle, next_special_angle, atol=0.5):
self._current_special_angles.pop(0)
num_repeats = self.special_angle_repeats
else:
num_repeats = 1
try:
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(angle)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else:
raise exc
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
[docs]
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
"""start a tomo scan"""
bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans")
self._current_special_angles = self.special_angles.copy()
# a new tomo scan was started
if (
(self.tomo_type == 1 and subtomo_start == 1 and start_angle is None)
or (self.tomo_type == 2 and projection_number == None)
or (self.tomo_type == 3 and projection_number == None)
):
# pylint: disable=undefined-variable
if bec.active_account != "":
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account.decode(),
bec.queue.next_scan_number,
"OMNY",
"test additional info",
"BEC",
)
self.write_pdf_report()
else:
self.tomo_id = 0
with scans.dataset_id_on_hold:
if self.tomo_type == 1:
# 2 equally spaced sub-tomograms
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
for ii in range(subtomo_start, 3):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
elif self.tomo_type == 2:
# Golden ratio tomography
previous_subtomo_number = -1
if projection_number == None:
ii = 0
else:
ii = projection_number
while True:
angle, subtomo_number = self._golden(ii, self.golden_ratio_bunch_size, 180, 1)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
and self.golden_projections_at_0_deg_for_damage_estimation == 1
):
self._tomo_scan_at_angle(0, subtomo_number)
previous_subtomo_number = subtomo_number
self.progress["tomo_type"] = "Golden ratio tomography"
self.progress["subtomo"] = subtomo_number
self.progress["projection"] = ii
self.progress["angle"] = angle
if self.golden_ratio_bunch_size > 0:
self.progress["subtomo_total_projections"] = self.golden_ratio_bunch_size
self.progress["subtomo_projection"] = (
ii - (subtomo_number - 1) * self.golden_ratio_bunch_size
)
else:
self.progress["subtomo_total_projections"] = 0
self.progress["subtomo_projection"] = 0
if self.golden_max_number_of_projections > 0:
self.progress["total_projections"] = self.golden_max_number_of_projections
else:
self.progress["total_projections"] = 0
self._tomo_scan_at_angle(angle, subtomo_number)
ii += 1
if (
ii > self.golden_max_number_of_projections
and self.golden_max_number_of_projections > 0
):
print(
f"Golden ratio tomography stopped automatically after the requested {self.golden_max_number_of_projections} projections"
)
break
elif self.tomo_type == 3:
# Equally spaced tomography, golden ratio starting angle
previous_subtomo_number = -1
if projection_number == None:
ii = 0
else:
ii = projection_number
while True:
angle, subtomo_number = self._golden_equally_spaced(
ii, int(180 / self.tomo_angle_stepsize), 180, 1, 0
)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
and self.golden_projections_at_0_deg_for_damage_estimation == 1
):
self._tomo_scan_at_angle(0, subtomo_number)
previous_subtomo_number = subtomo_number
self.progress["tomo_type"] = (
"Equally spaced tomography, golden ratio starting angle"
)
self.progress["subtomo"] = subtomo_number
self.progress["projection"] = ii
self.progress["angle"] = angle
self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize
self.progress["subtomo_projection"] = (
ii - (subtomo_number - 1) * self.progress["subtomo_total_projections"]
)
if self.golden_max_number_of_projections > 0:
self.progress["total_projections"] = self.golden_max_number_of_projections
else:
self.progress["total_projections"] = 0
self._tomo_scan_at_angle(angle, subtomo_number)
ii += 1
if (
ii > self.golden_max_number_of_projections
and self.golden_max_number_of_projections > 0
):
print(
f"Golden ratio tomography stopped automatically after the requested {self.golden_max_number_of_projections} projections"
)
break
else:
raise OMNYError("undefined tomo type")
def _print_progress(self):
print("\x1b[95mProgress report:")
print(f"Tomo type: ....................... {self.progress['tomo_type']}")
print(f"Projection: ...................... {self.progress['projection']:.0f}")
print(f"Total projections expected ....... {self.progress['total_projections']}")
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}")
print(
f"Total projections per subtomo: ... {self.progress['subtomo_total_projections']}\x1b[0m"
)
[docs]
def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
):
"""Add a sample to the omny sample database. This also retrieves the tomo id."""
subprocess.run(
f"wget --user=omny --password=samples -q -O /tmp/currsamplesnr.txt 'https://omny.web.psi.ch/samples/newmeasurement.php?sample={samplename}&date={date}&eaccount={eaccount}&scannr={scan_number}&setup={setup}&additional={sample_additional_info}&user={user}'",
shell=True,
)
with open("/tmp/currsamplesnr.txt") as tomo_number_file:
tomo_number = int(tomo_number_file.read())
return tomo_number
def _at_each_angle(self, angle: float) -> None:
if "omny_at_each_angle" in builtins.__dict__:
# pylint: disable=undefined-variable
omny_at_each_angle(self, angle)
return
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
def _golden(self, ii, howmany_sorted, maxangle, reverse=False):
"""returns the iis golden ratio angle of sorted bunches of howmany_sorted and its subtomo number"""
golden = []
# occupy array with the range of golden angles
for iji in range(
(ii - (ii % howmany_sorted)), (ii - (ii % howmany_sorted)) + howmany_sorted, 1
):
golden.append(
((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000
)
golden.sort()
subtomo_number = int(ii / howmany_sorted) + 1
if reverse and not subtomo_number % 2:
golden.reverse()
return (golden[ii % howmany_sorted], subtomo_number)
def _golden_equally_spaced(
self, ii, number_of_projections_per_subtomo, maxangle, reverse, verbose
):
"""returns angles for equally spaced tomography with starting angles of sub tomograms shifted according to golden ratio"""
"""ii is projection number starting at 1, reverse will execute the even sub tomograms in reverse direction"""
# ii is projection number starting at 1
angular_step = maxangle / number_of_projections_per_subtomo
subtomo_number = int(((ii - 1) * angular_step) / maxangle) + 1
start_angle = self._golden(subtomo_number - 1, 1, angular_step)
projection_number_of_subtomo = (
ii - (subtomo_number - 1) * number_of_projections_per_subtomo
) - 1
if reverse:
if subtomo_number % 2:
angle = start_angle[0] + projection_number_of_subtomo * angular_step
else:
angle = (
start_angle[0]
+ (number_of_projections_per_subtomo - 1) * angular_step
- projection_number_of_subtomo * angular_step
)
else:
angle = start_angle[0] + projection_number_of_subtomo * angular_step
if verbose:
print(
f"Equally spaced golden ratio tomography.\nAngular step: {angular_step}\nSubtomo Number: {subtomo_number}\nAngle: {angle}"
)
return angle, subtomo_number
[docs]
def tomo_reconstruct(self, base_path="~/Data10/specES1"):
"""write the tomo reconstruct file for the reconstruction queue"""
bec = builtins.__dict__.get("bec")
base_path = os.path.expanduser(base_path)
ptycho_queue_path = Path(os.path.join(base_path, self.ptycho_reconstruct_foldername))
ptycho_queue_path.mkdir(parents=True, exist_ok=True)
# pylint: disable=undefined-variable
last_scan_number = bec.queue.next_scan_number - 1
ptycho_queue_file = os.path.abspath(
os.path.join(ptycho_queue_path, f"scan_{last_scan_number:05d}.dat")
)
with open(ptycho_queue_file, "w") as queue_file:
scans = " ".join([str(scan) for scan in self._current_scan_list])
queue_file.write(f"p.scan_number {scans}\n")
queue_file.write("p.check_nextscan_started 1\n")
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
"~/Data10/specES1/dat-files/tomography_scannumbers.txt"
)
with open(tomo_scan_numbers_file, "a+") as out_file:
# pylint: disable=undefined-variable
out_file.write(
f"{scan_number} {angle} {dev.osamroy.read()['osamroy']['value']:.3f} {self.tomo_id} {subtomo_number} {0} {self.sample_name}\n"
)
def tomo_scan_projection(self, angle: float):
scans = builtins.__dict__.get("scans")
# additional_correction = self.align.compute_additional_correction(angle)
# additional_correction_2 = self.align.compute_additional_correction_2(angle)
# correction_xeye_mu = self.align.lamni_compute_additional_correction_xeye_mu(angle)
offsets = self.get_alignment_offset(angle)
sum_offset_x = offsets[0] - self.compute_additional_correction_x(angle)
sum_offset_y = (
offsets[1]
- self.compute_additional_correction_y(angle)
- self.compute_additional_correction_y_2(angle)
)
sum_offset_z = offsets[2]
self._current_scan_list = []
for stitch_x in range(-self.stitch_x, self.stitch_x + 1):
for stitch_y in range(-self.stitch_y, self.stitch_y + 1):
# pylint: disable=undefined-variable
corridor_size = self.corridor_size if self.corridor_size > 0 else None
self._current_scan_list.append(bec.queue.next_scan_number)
cenx = sum_offset_x + stitch_x * (self.fovx - self.tomo_stitch_overlap)
ceny = sum_offset_y + stitch_y * (self.fovy - self.tomo_stitch_overlap)
logger.info(
f"scans.omny_fermat_scan(fovx={self.fovx}, fovy={self.fovy},"
f" step={self.tomo_shellstep}, cenx={cenx}, ceny={ceny},"
f" zshift={sum_offset_z}, angle={angle},"
f" exp_time={self.tomo_countingtime}, corridor_size={corridor_size})"
)
log_message = (
f"{str(datetime.datetime.now())}: omny scan projection at angle {angle}, scan"
f" number {bec.queue.next_scan_number}.\n"
)
# self.write_to_scilog(log_message, ["BEC_scans", self.sample_name])
scans.omny_fermat_scan(
fovx=self.fovx,
fovy=self.fovy,
step=self.tomo_shellstep,
cenx=cenx,
ceny=ceny,
zshift=sum_offset_z,
angle=angle,
exp_time=self.tomo_countingtime,
corridor_size=corridor_size,
)
[docs]
def tomo_parameters(self):
"""print and update the tomo parameters"""
print("Current settings:")
print(f"Counting time <ctime> = {self.tomo_countingtime} s")
print(f"Stepsize microns <step> = {self.tomo_shellstep}")
print(f"FOV (200/100) <microns> = {self.fovx}, {self.fovy}")
print(f"Stitching number x,y = {self.stitch_x}, {self.stitch_y}")
print(f"Stitching overlap = {self.tomo_stitch_overlap}")
print(f"Reconstruction queue name = {self.ptycho_reconstruct_foldername}")
print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
print("")
if self.tomo_type == 1:
print("\x1b[1mTomo type 1:\x1b[0m 2 equally spaced sub-tomograms")
print(f"Total number of projections: {180/self.tomo_angle_stepsize*2}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
if self.tomo_type == 2:
print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography")
print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}")
if self.golden_max_number_of_projections > 0:
print(f"ending after {self.golden_max_number_of_projections} projections")
else:
print("ending by manual interruption")
if self.golden_projections_at_0_deg_for_damage_estimation == 1:
print(
"repeating prjections at 0 degrees at the beginning of every second subtomogram"
)
if self.tomo_type == 3:
print(
"\x1b[1mTomo type 3:\x1b[0m Equally spaced tomography, golden ratio starting angle"
)
print(f"Number of projections per sub-tomogram: {180/self.tomo_angle_stepsize}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
if self.golden_max_number_of_projections > 0:
print(f"ending after {self.golden_max_number_of_projections} projections")
else:
print("ending by manual interruption")
if self.golden_projections_at_0_deg_for_damage_estimation == 1:
print(
"repeating prjections at 0 degrees at the beginning of every second subtomogram"
)
print(f"\nSample name: {self.sample_name}\n")
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"):
print("... excellent!")
else:
self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float)
self.tomo_shellstep = self._get_val("<step size> um", self.tomo_shellstep, float)
self.fovx = self._get_val("<FOV X (max 200)> um", self.fovx, float)
self.fovy = self._get_val("<FOV Y (max 100)> um", self.fovy, float)
self.stitch_x = self._get_val("<stitch X>", self.stitch_x, int)
self.stitch_y = self._get_val("<stitch Y>", self.stitch_y, int)
self.ptycho_reconstruct_foldername = self._get_val(
"Reconstruction queue ", self.ptycho_reconstruct_foldername, str
)
print("Tomography type:")
print(" 1: 2 equally spaced sub-tomograms")
print(" 2: Golden ratio tomography")
print(" 3: Equally spaced tomography, golden ratio starting angle")
self.tomo_type = self._get_val("Tomography type", self.tomo_type, int)
if self.tomo_type == 1:
tomo_numberofprojections = self._get_val(
"Total number of projections", 180 / self.tomo_angle_stepsize * 2, int
)
print(f"The angular step will be {180/tomo_numberofprojections}")
self.tomo_angle_stepsize = 180 / tomo_numberofprojections * 2
print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}")
if self.tomo_type == 2:
code = self._get_val(
"This mode causes significant wear in OMNY. Enter authorization code.", 0, str
)
if code == "x12sa":
self.golden_ratio_bunch_size = self._get_val(
"Number of projections sorted per bunch (minimum 100)",
self.golden_ratio_bunch_size,
int,
)
if self.golden_ratio_bunch_size < 100:
print("Minimum of 100 selected.")
self.golden_ratio_bunch_size = 100
self.golden_max_number_of_projections = self._get_val(
"Stop after number of projections (zero for endless)",
self.golden_max_number_of_projections,
int,
)
self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
"Repeat projections at 0 deg every second subtomo 1/0 ?",
self.golden_projections_at_0_deg_for_damage_estimation,
int,
)
else:
print(
"Wrong authorization code. Tomo type 1 selected: 2 sub-tomograms selected."
)
self.tomo_type = 1
if self.tomo_type == 3:
code = self._get_val(
"This mode causes significant wear in OMNY. Enter authorization code.", 0, str
)
if code == "x12sa":
numprj = self._get_val(
"Number of projections per sub-tomogram (minimum 100)",
int(180 / self.tomo_angle_stepsize),
int,
)
if numprj < 100:
numprj = 100
print("Minimum of 100 selected.")
self.tomo_angle_stepsize = 180 / numprj
self.golden_max_number_of_projections = self._get_val(
"Stop after number of projections (zero for endless)",
self.golden_max_number_of_projections,
int,
)
self.golden_projections_at_0_deg_for_damage_estimation = self._get_val(
"Repeat projections at 0 deg every second subtomo",
self.golden_projections_at_0_deg_for_damage_estimation,
int,
)
else:
print(
"Wrong authorization code. Tomo type 1 selected: 2 sub-tomograms selected."
)
self.tomo_type = 1
@staticmethod
def _get_val(msg: str, default_value, data_type):
return data_type(input(f"{msg} ({default_value}): ") or default_value)
def rt_off(self):
dev.rtx.enabled = False
dev.rty.enabled = False
dev.rtz.enabled = False
def rt_on(self):
dev.rtx.enabled = True
dev.rty.enabled = True
dev.rtz.enabled = True
if dev.rtx.enabled == True:
print("rt is enabled")
else:
print("failed to enable rt")
[docs]
def write_pdf_report(self):
"""create and write the pdf report with the current OMNY settings"""
dev = builtins.__dict__.get("dev")
# header = ""
header = (
" \n" * 3
+ " ,o888888o. ,8. ,8. b. 8 `8.`8888. ,8' \n"
+ " . 8888 `88. ,888. ,888. 888o. 8 `8.`8888. ,8' \n"
+ ",8 8888 `8b .`8888. .`8888. Y88888o. 8 `8.`8888. ,8' \n"
+ "88 8888 `8b ,8.`8888. ,8.`8888. .`Y888888o. 8 `8.`8888.,8' \n"
+ "88 8888 88 ,8'8.`8888,8^8.`8888. 8o. `Y888888o. 8 `8.`88888' \n"
+ "88 8888 88 ,8' `8.`8888' `8.`8888. 8`Y8o. `Y88888o8 `8. 8888 \n"
+ "88 8888 ,8P ,8' `8.`88' `8.`8888. 8 `Y8o. `Y8888 `8 8888 \n"
+ "`8 8888 ,8P ,8' `8.`' `8.`8888. 8 `Y8o. `Y8 8 8888 \n"
+ " ` 8888 ,88' ,8' `8 `8.`8888. 8 `Y8o.` 8 8888 \n"
+ " `8888888P' ,8' ` `8.`8888. 8 `Yo 8 8888 \n"
)
padding = 20
fovxy = f"{self.fovx:.2f}/{self.fovy:.2f}"
stitching = f"{self.stitch_x:.2f}/{self.stitch_y:.2f}"
dataset_id = str(self.client.queue.next_dataset_number)
content = [
f"{'Sample Name:':<{padding}}{self.sample_name:>{padding}}\n",
f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n",
f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n",
f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n",
f"{'e-account:':<{padding}}{str(self.client.username):>{padding}}\n",
f"{'Number of projections:':<{padding}}{int(180 / self.tomo_angle_stepsize * 8):>{padding}}\n",
f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n",
f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int(180 / self.tomo_angle_stepsize * 8) + 10:>{padding}}\n",
f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n",
f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
f"{'FOV:':<{padding}}{fovxy:>{padding}}\n",
f"{'Stitching:':<{padding}}{stitching:>{padding}}\n",
f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n",
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
"Add current temperature and pressure status",
]
content = "".join(content)
user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
with PDFWriter(user_target) as file:
file.write(header)
file.write(content)
subprocess.run(
"xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
)
# status = subprocess.run(f"cp /tmp/spec-e20131-specES1.pdf {user_target}", shell=True)
msg = bec.logbook.LogbookMessage()
logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name]
)
self.client.logbook.send_logbook_message(msg)
def vcs_show_all():
dev.omny_vcs.show_all()
def vcs_valves_in_measurement_position():
dev.omny_vcs.valves_in_measurement_position()
def temperatures_show_all():
dev.omny_temperatures.show_all()
def dewar_show_all():
dev.omny_dewar.show_all()
if __name__ == "__main__":
import builtins
from bec_ipython_client import BECIPythonClient
bec = BECIPythonClient()
bec.start()
scans = bec.scans
dev = bec.device_manager.devices
builtins.__dict__["scans"] = scans
builtins.__dict__["dev"] = dev
builtins.__dict__["bec"] = bec
builtins.__dict__["umv"] = umv
omny = OMNY(bec)
omny.start_x_ray_eye_alignment()