Source code for csaxs_bec.bec_ipython_client.plugins.omny.omny

import builtins
import datetime
import os
import subprocess
import time
from pathlib import Path

import numpy as np
from bec_lib import bec_logger
from bec_lib.alarm_handler import AlarmBase
from bec_lib.pdf_writer import PDFWriter
from typeguard import typechecked

from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks
from csaxs_bec.bec_ipython_client.plugins.omny.gui_tools import OMNYGuiTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_alignment_mixin import OMNYAlignmentMixin
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_optics_mixin import OMNYOpticsMixin
from csaxs_bec.bec_ipython_client.plugins.omny.omny_rt import OMNY_rt_client
from csaxs_bec.bec_ipython_client.plugins.omny.omny_sample_transfer_mixin import (
    OMNYSampleTransferMixin,
)
from csaxs_bec.bec_ipython_client.plugins.omny.x_ray_eye_align import XrayEyeAlign

logger = bec_logger.logger

if builtins.__dict__.get("bec") is not None:
    bec = builtins.__dict__.get("bec")
    dev = builtins.__dict__.get("dev")
    scans = builtins.__dict__.get("scans")

[docs] def umv(*args): return scans.umv(*args, relative=False)
[docs] class OMNYInitError(Exception): pass
[docs] class OMNYError(Exception): pass
[docs] class OMNYInitStagesMixin: def enable_all_devices(self): dev.ofzpx.enabled = True dev.ofzpy.enabled = True dev.ofzpz.enabled = True dev.otransx.enabled = True dev.otransy.enabled = True dev.otransz.enabled = True dev.osamx.enabled = True dev.osamz.enabled = True dev.oosay.enabled = True dev.oosax.enabled = True dev.oosaz.enabled = True dev.oparkz.enabled = True dev.oshuttleopen.enabled = True dev.oshuttlealign.enabled = True dev.osamy.enabled = True dev.otracky.enabled = True dev.osamroy.enabled = True dev.otrackz.enabled = True dev.oeyex.enabled = True dev.oeyez.enabled = True dev.oeyey.enabled = True def _omny_find_reference_loop(self, direction_endswitch, device, auto_retry=0): retry_counter = 0 limits = device.get_motor_limit_switch() if not limits[0] and not limits[1]: raise OMNYInitError( "Expect to be in a limit switch when calling the method find reference loop." ) if limits[0] and limits[1]: raise OMNYInitError("Both limit switches are triggered. This indicates an error.") if limits[0]: direction_endswitch_start = "reverse" if limits[1]: direction_endswitch_start = "forward" if direction_endswitch_start != direction_endswitch: raise OMNYInitError("Expect current endswitch to match the call of the method.") if device.axis_is_referenced(): raise OMNYInitError("This axis is already referenced") while not device.axis_is_referenced(): self._check_for_folerr_and_reset(device) device.find_reference(raise_error=0) time.sleep(1) if not device.axis_is_referenced(): if auto_retry == 0: if self.OMNYTools.yesno( "Did not reference successfully. Try again to move to endswitch and find reference?" ): self._check_for_folerr_and_reset(device) device.drive_axis_to_limit(direction_endswitch_start) time.sleep(1) elif auto_retry > 0: if retry_counter < auto_retry: self.OMNYTools.printgreen( f"Did not reference successfully. Try again to move to endswitch and find reference. Attempt {retry_counter+1} out of max {auto_retry}" ) self._check_for_folerr_and_reset(device) device.drive_axis_to_limit(direction_endswitch_start) time.sleep(1) retry_counter += 1 temperature = device.get_motor_temperature() while temperature > 100: self.OMNYTools.printgreen( f"The temperature of the motor is {temperature}, and larger than the threshold of 100 degC. Waiting 60 s for cool down." ) time.sleep(60) temperature = device.get_motor_temperature() if retry_counter == auto_retry: self.OMNYTools.printgreen( "Maximum automatic init retries reached. Setting auto to 0 offering manual continuation." ) auto_retry = 0 def _check_for_folerr_and_reset(self, device): # if device.folerr_status(): # user_input = (f"There is a following error in axis {device}. Reset and ignore to continue? y/n") # if user_input == "y": # for now we just reset and ignore. because it might just occur at the beginning of move to ES device._ogalil_folerr_reset_and_ignore() time.sleep(0.3) def omny_init_stages(self, autoconfirm=0, autoretry=0): if self.OMNYTools.yesno( "Starting initialization of OMNY stages.", "y", autoconfirm=autoconfirm ): self.OMNYTools.printgreen("starting...") else: return # #maybe replace 2 by something # axis_id = dev.fheater._config["deviceConfig"].get("axis_Id") # axis_id_numeric = self.axis_id_to_numeric(axis_id) if not dev.oshield.controller.axis_is_referenced(2): if self.OMNYTools.yesno( "The smaract stage of the cryo shield has to be referenced first. Continue?", "y" ): self.omny_init_smaract() else: return if self.check_all_axes_of_OMNY_referenced(): if self.OMNYTools.yesno( "All axes are referenced already. Reset and reference again?", "n" ): self.OMNYTools.printgreen("ok then...") else: return if np.fabs(dev.oshield.get().readback) > 0.1: if self.OMNYTools.yesno( "oshield is not around pos 0. Can the OSA be moved to -z limit then -x limit. Risk of collition!" ): dev.oosaz.drive_axis_to_limit("forward") dev.oosax.drive_axis_to_limit("forward") else: raise OMNYInitError("Automatic init not possible. Initialize manually") umv(dev.oshield, 0) dev.ofzpx.controller.socket_put_confirmed("XQ#CRESET") dev.osamx.controller.socket_put_confirmed("XQ#CRESET") dev.osamy.controller.socket_put_confirmed("XQ#CRESET") time.sleep(1) if self.OMNYTools.yesno( "drive fzp to -Z,-X,+Y limit and reference?", "y", autoconfirm=autoconfirm ): dev.ofzpz.drive_axis_to_limit("forward") self.OMNYTools.printgreen("ofzpz reached limit") dev.ofzpy.drive_axis_to_limit("forward") self.OMNYTools.printgreen("ofzpy reached limit") dev.ofzpx.drive_axis_to_limit("reverse") self.OMNYTools.printgreen("ofzpx reached limit") # dev.ofzpz.find_reference() self._omny_find_reference_loop("forward", dev.ofzpz, auto_retry=autoretry) self.OMNYTools.printgreen("ofzpz referenced") # dev.ofzpy.find_reference() self._omny_find_reference_loop("forward", dev.ofzpy, auto_retry=autoretry) self.OMNYTools.printgreen("ofzpy referenced") # dev.ofzpx.find_reference() self._omny_find_reference_loop("reverse", dev.ofzpx, auto_retry=autoretry) self.OMNYTools.printgreen("ofzpx referenced") # ranges: # X -4.9 to 7.2 # Y -3.5 to 7.9 # Z -3 to 95.6 else: return if self.OMNYTools.yesno("drive otransy stage to +Y limit?", "y"): dev.otransy.drive_axis_to_limit("forward") self.OMNYTools.printgreen("otrany reached limit") else: return self.OMNYTools.printgreen( "For the next step the gripper has to be at a good z position. Risk of collision!" ) if self.OMNYTools.yesno("drive otransx stage to +X limit and find reference?"): dev.otransx.drive_axis_to_limit("forward") self.OMNYTools.printgreen("otransx reached limit") # dev.otransx.find_reference() self._omny_find_reference_loop("forward", dev.otransx, auto_retry=autoretry) self.OMNYTools.printgreen("otransx referenced") else: return if self.OMNYTools.yesno( "drive otransz stage to +Z limit and reference?", "y", autoconfirm=autoconfirm ): dev.otransz.drive_axis_to_limit("forward") self.OMNYTools.printgreen("otransz reached limit") # dev.otransz.find_reference() self._omny_find_reference_loop("forward", dev.otransz, auto_retry=autoretry) self.OMNYTools.printgreen("otransz referenced") else: return # get xeye out of the way if self.OMNYTools.yesno("drive oeyey to +Y limit?", "y"): dev.oeyey.drive_axis_to_limit("forward") self.OMNYTools.printgreen("oeyey reached limit") else: return if self.OMNYTools.yesno("drive oeyex to -X limit?", "y", autoconfirm=autoconfirm): dev.oeyex.drive_axis_to_limit("reverse") self.OMNYTools.printgreen("oeyex reached limit") else: return if self.OMNYTools.yesno("drive oeyez to +Z limit?", "y", autoconfirm=autoconfirm): dev.oeyez.drive_axis_to_limit("forward") self.OMNYTools.printgreen("oeyez reached limit") else: return if self.OMNYTools.yesno("reference oeyex, then oeyez?", "y", autoconfirm=autoconfirm): # dev.oeyex.find_reference() self._omny_find_reference_loop("reverse", dev.oeyex, auto_retry=autoretry) self.OMNYTools.printgreen("oeyex referenced") # dev.oeyez.find_reference() self._omny_find_reference_loop("forward", dev.oeyez, auto_retry=autoretry) self.OMNYTools.printgreen("oeyez referenced") else: return if self.OMNYTools.yesno("find reference mark of oeyey?", "y", autoconfirm=autoconfirm): # print("oeyey now in limit") dev.oeyez.drive_axis_to_limit("forward") dev.oeyey.controller.socket_put_confirmed("indspeed[7]=2000") # dev.oeyey.find_reference() self._omny_find_reference_loop("forward", dev.oeyey, auto_retry=autoretry) self.OMNYTools.printgreen("oeyey is now referenced") # ensure closed shuttle and shuttle aligner down if not dev.oshuttleopen.get_motor_limit_switch()[0]: dev.oshuttleopen.drive_axis_to_limit("reverse") if not dev.oshuttleopen.get_motor_limit_switch()[0]: raise OMNYInitError("Failed to drive shuttle opener to reverse limit") else: self.OMNYTools.printgreen("shuttle opener is down, i.e. in safe position to continue.") # move shuttle aligner down # self._otransfer_shuttle_aligner_down() # the above is ins sample transfer mixin. can it be accessible from here? so code below if not dev.oshuttlealign.get_motor_limit_switch()[1]: dev.oshuttlealign.drive_axis_to_limit("forward") if not dev.oshuttlealign.get_motor_limit_switch()[1]: raise OMNYInitError("Failed to drive shuttle opener to forward limit") else: self.OMNYTools.printgreen("shuttle aligner is down, i.e. in safe position to continue.") if self.OMNYTools.yesno( "drive parking stage to +Z limit and reference?", "y", autoconfirm=autoconfirm ): dev.oparkz.drive_axis_to_limit("forward") self.OMNYTools.printgreen("oparkz is in limit") # dev.oparkz.find_reference() self._omny_find_reference_loop("forward", dev.oparkz, auto_retry=autoretry) self.OMNYTools.printgreen("oparkz is referenced") else: return if self.OMNYTools.yesno( "OK, tricky part. Can the OSA be moved to -z limit then -x limit. Risk of collition!" ): dev.oosaz.drive_axis_to_limit("forward") dev.oosax.drive_axis_to_limit("forward") else: raise OMNYInitError( "Automatic init not possible in this case. Please initialize manually." ) if self.OMNYTools.yesno("drive otrackz to -Z limit?", "y", autoconfirm=autoconfirm): dev.otrackz.drive_axis_to_limit("forward") self.OMNYTools.printgreen("otrackz is in limit") else: return if self.OMNYTools.yesno("drive osamx to +X limit?", "y", autoconfirm=autoconfirm): dev.osamx.drive_axis_to_limit("forward") self.OMNYTools.printgreen("osamx is in limit") else: return if self.OMNYTools.yesno("drive oosay to +Y limit?", "y", autoconfirm=autoconfirm): dev.oosay.drive_axis_to_limit("forward") self.OMNYTools.printgreen("oosay is in limit") else: return if self.OMNYTools.yesno("find reference mark of osamx?", "y", autoconfirm=autoconfirm): self._omny_find_reference_loop("forward", dev.osamx, auto_retry=autoretry) # dev.osamx.find_reference() self.OMNYTools.printgreen("osamx is referenced") # osamx range limit -2 to 7 good for cryolink # if osamz is ever needed, here is how it could be initialized (in spec code) # if (!_ogalil_axis_is_referenced_mne(osamx) || fabs(A[osamx]) > 0.1 ) # { # printf("The osamx stage is not referenced or is not at the reference position. Aborting.\n") # exit # } # drive osamz to -Z limit # +Z direction is critical because of cryolink connection # if(!yesno("drive osamz to -Z limit and perform reference search?",1)) # exit # osamz to limit # _ogalil_drive_to_limit_mne(osamz,1) # _ogalil_find_reference_mark_mne(osamz) # set_lm osamz -.5 .5 else: return if self.OMNYTools.yesno("drive osamy to -Y limit?", "y"): dev.osamy.drive_axis_to_limit("reverse") self.OMNYTools.printgreen("osamy is in limit") else: return if self.OMNYTools.yesno("perform reference search of oosax?", "y"): self._omny_find_reference_loop("forward", dev.oosax) # dev.oosax.find_reference() self.OMNYTools.printgreen("oosax is referenced") else: return if self.OMNYTools.yesno("find reference mark of oosay?", "y"): self._omny_find_reference_loop("forward", dev.oosay) # dev.oosay.find_reference() self.OMNYTools.printgreen("oosay is referenced") else: return if self.OMNYTools.yesno("find reference mark of osamy?", "y"): self._omny_find_reference_loop("reverse", dev.osamy) # dev.osamy.find_reference() self.OMNYTools.printgreen("osamy is referenced") else: return if self.OMNYTools.yesno("find reference mark of oosaz?", "y"): self._omny_find_reference_loop("forward", dev.oosaz) # dev.oosaz.find_reference() self.OMNYTools.printgreen("oosaz is referenced") else: return if self.OMNYTools.yesno( "find endswitch and reference mark of osamroy?", "y", autoconfirm=autoconfirm ): dev.osamroy.drive_axis_to_limit("reverse") self.OMNYTools.printgreen("osamroy is in limit") self._omny_find_reference_loop("reverse", dev.osamroy, auto_retry=autoretry) # dev.osamroy.find_reference() self.OMNYTools.printgreen("osamroy is referenced") else: return if self.OMNYTools.yesno("find reference mark of transfer Y?", autoconfirm=autoconfirm): self._omny_find_reference_loop("forward", dev.otransy, auto_retry=autoretry) # dev.otransy.find_reference() self.OMNYTools.printgreen("otransy is referenced") else: return if self.OMNYTools.yesno( "find endswitches and reference marks of tracking stage system?", "y", autoconfirm=autoconfirm, ): dev.otracky.drive_axis_to_limit("forward") self.OMNYTools.printgreen("otracky is in limit") self._omny_find_reference_loop("forward", dev.otracky, auto_retry=autoretry) # dev.otracky.find_reference() self.OMNYTools.printgreen("otracky is referenced") self._omny_find_reference_loop("forward", dev.otrackz, auto_retry=autoretry) # dev.otrackz.find_reference() self.OMNYTools.printgreen("otrackz is referenced") else: return # all three controllers dev.ofzpx.controller._ogalil_folerr_not_ignore() dev.osamx.controller._ogalil_folerr_not_ignore() dev.osamy.controller._ogalil_folerr_not_ignore() # adjust acceleration of tracking stages dev.osamy.controller.socket_put_confirmed("ACE=11264") dev.osamy.controller.socket_put_confirmed("ACB=11264") dev.osamy.controller.socket_put_confirmed("DCE=11264") dev.osamy.controller.socket_put_confirmed("DCB=11264") dev.osamy.controller.galil_show_all() self._initial_alignment() self._setalllimitsomny() def omny_init_smaract(self): # _smar_rt_set_max_closed_loop_frequency(0,0,3000) # _smar_rt_set_max_closed_loop_frequency(0,1,3000) # _smar_rt_set_max_closed_loop_frequency(0,2,3000) dev.oshield.controller.find_reference_mark(1, 0, 1000, 1) time.sleep(2) dev.oshield.controller.find_reference_mark(0, 0, 1000, 1) time.sleep(2) if not dev.otransy.get_motor_limit_switch()[1]: if self.OMNYTools.yesno("drive otransy stage to +Y limit?", "y"): dev.otransy.drive_axis_to_limit("forward") self.OMNYTools.printgreen("otransy reached limit") else: self.OMNYTools.printgreen("otransy is in upper limit, i.e. safe condition") if self.OMNYTools.yesno( "Problematic part: Can the OSA be moved to -z limit then -x limit. Risk of collition!" ): dev.oosaz.drive_axis_to_limit("forward") dev.oosax.drive_axis_to_limit("forward") else: raise OMNYInitError("Automatic init not possible.") dev.oshield.controller.find_reference_mark(2, 0, 1000, 1) time.sleep(2) dev.oshield.limits = [-14.5, 15.8] dev.ocsx.limits = [-2, 5.2] dev.ocsy.limits = [-1, 3] dev.oshield.controller.set_closed_loop_move_speed(0, 2) dev.oshield.controller.set_closed_loop_move_speed(1, 2) dev.oshield.controller.set_closed_loop_move_speed(2, 2) # enable sensor for repeatable open loop motion dev.oshield.controller.socket_put_and_receive("SSE1") if self.OMNYTools.yesno("Move CS out of the beam path?", "y"): umv(dev.ocsx, 5.2, dev.ocsy, 2) self.OMNYTools.printgreen("Finished initialization of OMNY smaract system") def _setalllimitsomny(self): # dev.rtx.limits = [-250, 250] # dev.rty.limits = [-250, 250] # dev.rtz.limits = [-250, 250] dev.osamx.limits = [-1, 1] dev.osamy.limits = [2.3000, 3.0000] dev.osamroy.limits = [-2.0000, 362.0000] dev.oeyex.limits = [-48.0000, 10.0000] dev.oeyez.limits = [-85.0000, 10.0000] dev.oeyey.limits = [-61.0000, 1.0000] dev.oosay.limits = [0.0800, 1.0000] dev.oosax.limits = [-3.0000, 3.7000] dev.oosaz.limits = [6.4000, 6.6000] dev.ofzpx.limits = [-2.0000, 2.0000] dev.ofzpy.limits = [-1.0000, 3.0000] dev.ofzpz.limits = [-2.5000, 60.0000] dev.otransx.limits = [-459.0000, 33.0000] dev.otransy.limits = [-41.0000, 1.0000] dev.otransz.limits = [-70.5000, 11.0000] dev.oparkz.limits = [-165.0000, 1.0000] dev.otracky.limits = [-3.0000, -7.0000] dev.otrackz.limits = [-2.0000, 1.0000] dev.ocsy.limits = [-2.0000, 2.0000] dev.ocsx.limits = [-2.0000, 2.0000] dev.oshield.limits = [-14.5000, 15.8000] def _initial_alignment(self): if self.OMNYTools.yesno("Start moving stages to default initial positions?"): self.OMNYTools.printgreen("Start moving stages...") else: self.OMNYTools.printgreen("Stopping.") return self.oeye_cam_in() umv(dev.oosay, 0.5) dev.oosay.limits = [0.4, 0.6] dev.osamy.limits = [-1, 3] umv(dev.osamy, 2.7, dev.ofzpx, -0.8, dev.ofzpy, 1.6) dev.osamy.limits = [2.5, 2.9] dev.ofzpx.limits = [-2, 0] dev.ofzpy.limits = [-0, 3] dev.ofzpz.limits = [-2.5, 60] if "rtx" in dev and dev.rtx.enabled: dev.rtx.controller.feedback_disable() self._oshield_ST_close() dev.otracky.limits = [-7, -3] dev.otrackz.limits = [-2, 1] otracky_start_pos = self.OMNYTools._get_user_param_safe("otracky", "start_pos") otrackz_start_pos = self.OMNYTools._get_user_param_safe("otrackz", "start_pos") umv(dev.otracky, otracky_start_pos, dev.otrackz, otrackz_start_pos) # adjust acceleration and speed of osamx dev.osamx.controller.socket_put_confirmed("SPA=100") dev.osamx.controller.socket_put_confirmed("ACA=1878") dev.osamx.controller.socket_put_confirmed("DCA=187") # adjust acceleration of tracking stages dev.osamy.controller.socket_put_confirmed("ACE=11264") dev.osamy.controller.socket_put_confirmed("DCE=11264") dev.osamy.controller.socket_put_confirmed("DCB=11264") def check_all_axes_of_OMNY_referenced(self) -> bool: if dev.ofzpx.controller.all_axes_referenced(): self.OMNYTools.printgreen("All axes of OMNY are referenced.") return True else: return False
[docs] class OMNY( OMNYInitStagesMixin, OMNYSampleTransferMixin, OMNYAlignmentMixin, OMNYOpticsMixin, cSAXSBeamlineChecks, OMNY_rt_client, OMNYGuiTools, ): def __init__(self, client): super().__init__() self.client = client self.device_manager = client.device_manager self.check_shutter = False self.check_light_available = False self.check_fofb = False self._check_msgs = [] self.tomo_id = -1 self.special_angles = [] self.special_angle_repeats = 20 self.special_angle_tolerance = 20 self._current_special_angles = [] self._beam_is_okay = True self._stop_beam_check_event = None self.beam_check_thread = None self.corr_pos_y = [] self.corr_angle_y = [] self.corr_pos_y_2 = [] self.corr_angle_y_2 = [] self.progress = {} self.progress["subtomo"] = 0 self.progress["subtomo_projection"] = 0 self.progress["subtomo_total_projections"] = 1 self.progress["projection"] = 0 self.progress["total_projections"] = 1 self.progress["angle"] = 0 self.progress["tomo_type"] = 0 self.OMNYTools = OMNYTools(self.client) OMNY_rt_client.__init__(self) self.align = XrayEyeAlign(self.client, self) OMNYGuiTools.__init__(self, self.client) def start_x_ray_eye_alignment(self): if self.OMNYTools.yesno( "Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y", ): self.align = XrayEyeAlign(self.client, self) try: self.align.align() except KeyboardInterrupt as exc: osamx_in = self._get_user_param_safe(dev.osamx, "in") if np.isclose(osamx_in, dev.osamx.readback.get(), 0.5): self.OMNYTools.printgreen("Stopping alignment. Returning to osamx in position.") self.feedback_disable() umv(dev.osamx, osamx_in) raise exc def xrayeye_update_frame(self): self.align.update_frame() def xrayeye_alignment_start(self): self.start_x_ray_eye_alignment() # def drive_axis_to_limit(self, device, direction): # axis_id = device._config["deviceConfig"].get("axis_Id") # axis_id_numeric = self.axis_id_to_numeric(axis_id) # device.controller.drive_axis_to_limit(axis_id_numeric, direction) def axis_id_to_numeric(self, axis_id) -> int: return ord(axis_id.lower()) - 97 def get_beamline_checks_enabled(self): print( f"Shutter: {self.check_shutter}\nFOFB: {self.check_fofb}\nLight available:" f" {self.check_light_available}" ) @property def beamline_checks_enabled(self): return { "shutter": self.check_shutter, "fofb": self.check_fofb, "light available": self.check_light_available, } @beamline_checks_enabled.setter def beamline_checks_enabled(self, val: bool): self.check_shutter = val self.check_light_available = val self.check_fofb = val self.get_beamline_checks_enabled()
[docs] def set_special_angles(self, angles: list, repeats: int = 20, tolerance: float = 0.5): """Set the special angles for a tomo Args: angles (list): List of special angles repeats (int, optional): Number of repeats at a special angle. Defaults to 20. tolerance (float, optional): Number of repeats at a special angle. Defaults to 0.5. """ self.special_angles = angles self.special_angle_repeats = repeats self.special_angle_tolerance = tolerance
[docs] def remove_special_angles(self): """Remove the special angles and set the number of repeats to 1""" self.special_angles = [] self.special_angle_repeats = 1
@property def near_field(self): val = self.client.get_global_var("near_field") if val is None: return False return val @near_field.setter def near_field(self, val: bool): self.client.set_global_var("near_field", val) @property def tomo_shellstep(self): val = self.client.get_global_var("tomo_shellstep") if val is None: return 1 return val @tomo_shellstep.setter def tomo_shellstep(self, val: float): self.client.set_global_var("tomo_shellstep", val) @property def tomo_countingtime(self): val = self.client.get_global_var("tomo_countingtime") if val is None: return 0.1 return val @tomo_countingtime.setter def tomo_countingtime(self, val: float): self.client.set_global_var("tomo_countingtime", val) @property def manual_shift_y(self): val = self.client.get_global_var("manual_shift_y") if val is None: return 0.0 return val @manual_shift_y.setter def manual_shift_y(self, val: float): self.client.set_global_var("manual_shift_y", val) @property def fovx(self): val = self.client.get_global_var("fovx") if val is None: return 20 return val @fovx.setter def fovx(self, val: float): if val > 200: raise ValueError("FOV cannot be larger than 200 um.") self.client.set_global_var("fovx", val) @property def fovy(self): val = self.client.get_global_var("fovy") if val is None: return 20 return val @fovy.setter def fovy(self, val: float): if val > 100: raise ValueError("FOV cannot be larger than 100 um.") self.client.set_global_var("fovy", val) @property def tomo_type(self): val = self.client.get_global_var("tomo_type") if val is None: return 1 return val @tomo_type.setter def tomo_type(self, val: float): if val == 1: # equally spaced tomography with 2 sub tomograms self.client.set_global_var("tomo_type", val) elif val == 2: # golden ratio tomography (sorted bunches) self.client.set_global_var("tomo_type", val) elif val == 3: # equally spaced tomography with starting angles shifted by golden ratio self.client.set_global_var("tomo_type", val) else: raise ValueError("Unknown tomo_type.") @property def corridor_size(self): val = self.client.get_global_var("corridor_size") if val is None: val = -1 return val @corridor_size.setter def corridor_size(self, val: float): self.client.set_global_var("corridor_size", val) @property def stitch_x(self): val = self.client.get_global_var("stitch_x") if val is None: return 0 return val @stitch_x.setter @typechecked def stitch_x(self, val: int): self.client.set_global_var("stitch_x", val) @property def stitch_y(self): val = self.client.get_global_var("stitch_y") if val is None: return 0 return val @stitch_y.setter @typechecked def stitch_y(self, val: int): self.client.set_global_var("stitch_y", val) @property def ptycho_reconstruct_foldername(self): val = self.client.get_global_var("ptycho_reconstruct_foldername") if val is None: return "ptycho_reconstruct" return val @ptycho_reconstruct_foldername.setter def ptycho_reconstruct_foldername(self, val: str): self.client.set_global_var("ptycho_reconstruct_foldername", val) @property def tomo_angle_stepsize(self): val = self.client.get_global_var("tomo_angle_stepsize") if val is None: return 10.0 return val @tomo_angle_stepsize.setter def tomo_angle_stepsize(self, val: float): self.client.set_global_var("tomo_angle_stepsize", val) @property def golden_max_number_of_projections(self): val = self.client.get_global_var("golden_max_number_of_projections") if val is None: return 1000.0 return val @golden_max_number_of_projections.setter def golden_max_number_of_projections(self, val: float): self.client.set_global_var("golden_max_number_of_projections", val) @property def tomo_stitch_overlap(self): val = self.client.get_global_var("tomo_stitch_overlap") if val is None: return 0.2 return val @tomo_stitch_overlap.setter def tomo_stitch_overlap(self, val: float): self.client.set_global_var("tomo_stitch_overlap", val) @property def golden_projections_at_0_deg_for_damage_estimation(self): val = self.client.get_global_var("golden_projections_at_0_deg_for_damage_estimation") if val is None: return 0 return val @golden_projections_at_0_deg_for_damage_estimation.setter def golden_projections_at_0_deg_for_damage_estimation(self, val: float): self.client.set_global_var("golden_projections_at_0_deg_for_damage_estimation", val) @property def golden_ratio_bunch_size(self): val = self.client.get_global_var("golden_ratio_bunch_size") if val is None: return 20 return val @golden_ratio_bunch_size.setter def golden_ratio_bunch_size(self, val: float): self.client.set_global_var("golden_ratio_bunch_size", val) @property def sample_name(self): return dev.omny_samples.get_sample_name_in_samplestage() def write_to_scilog(self, content, tags: list = None): try: if tags is not None: tags.append("BEC") else: tags = ["BEC"] msg = bec.logbook.LogbookMessage() msg.add_text(content).add_tag(tags) self.client.logbook.send_logbook_message(msg) except Exception: logger.warning("Failed to write to scilog.")
[docs] def tomo_alignment_scan(self): """ Performs a tomogram alignment scan. """ if self.get_alignment_offset(0) == (0, 0, 0): print("It appears that the xrayeye alignment was not performend or loaded. Aborting.") return dev = builtins.__dict__.get("dev") bec = builtins.__dict__.get("bec") tags = ["BEC_alignment_tomo", self.sample_name] self.write_to_scilog( f"Starting alignment scan. First scan number: {bec.queue.next_scan_number}.", tags ) start_angle = 0 angle_end = start_angle + 180 for angle in np.linspace(start_angle, angle_end, num=int(180 / 45) + 1, endpoint=True): successful = False error_caught = False if 0 <= angle < 180.05: print(f"Starting OMNY scan for angle {angle}") while not successful: self._start_beam_check() try: start_scan_number = bec.queue.next_scan_number self.tomo_scan_projection(angle) self.tomo_reconstruct() error_caught = False except AlarmBase as exc: if exc.alarm_type == "TimeoutError": bec.queue.request_queue_reset() time.sleep(2) error_caught = True else: raise exc if self._was_beam_okay() and not error_caught: successful = True else: self._wait_for_beamline_checks() end_scan_number = bec.queue.next_scan_number for scan_nr in range(start_scan_number, end_scan_number): self._write_tomo_scan_number(scan_nr, angle, 0) print("Alignment scan finished. Please run SPEC_ptycho_align and load the new fit.") umv(dev.osamroy, 0)
def _write_subtomo_to_scilog(self, subtomo_number): dev = builtins.__dict__.get("dev") bec = builtins.__dict__.get("bec") if self.tomo_id > 0: tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"] else: tags = ["BEC_subtomo", self.sample_name] self.write_to_scilog( f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.", tags, )
[docs] def sub_tomo_scan(self, subtomo_number, start_angle=None): """ Performs a sub tomogram scan. Args: subtomo_number (int): The sub tomogram number. start_angle (float, optional): The start angle of the scan. Defaults to None. """ self._write_subtomo_to_scilog(subtomo_number) if start_angle is None: if subtomo_number == 1: start_angle = 0 elif subtomo_number == 2: start_angle = self.tomo_angle_stepsize / 2.0 # _tomo_shift_angles (potential global variable) _tomo_shift_angles = 0 angle_end = start_angle + 180 angles = np.linspace( start_angle + _tomo_shift_angles, angle_end, num=int(180 / self.tomo_angle_stepsize) + 1, endpoint=True, ) # reverse even sub-tomogram if not (subtomo_number % 2): angles = np.flip(angles) for angle in angles: self.progress["subtomo"] = subtomo_number self.progress["subtomo_projection"] = np.where(angles == angle)[0][0] self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize self.progress["projection"] = (subtomo_number - 1) * self.progress[ "subtomo_total_projections" ] + self.progress["subtomo_projection"] self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 2 self.progress["angle"] = angle self._tomo_scan_at_angle(angle, subtomo_number)
def _tomo_scan_at_angle(self, angle, subtomo_number): successful = False error_caught = False if 0 <= angle < 180.05: print(f"Starting OMNY scan for angle {angle} in subtomo {subtomo_number}") self._print_progress() self.omnygui_show_progress() while not successful: self._start_beam_check() if not self.special_angles: self._current_special_angles = [] if self._current_special_angles: next_special_angle = self._current_special_angles[0] if np.isclose(angle, next_special_angle, atol=0.5): self._current_special_angles.pop(0) num_repeats = self.special_angle_repeats else: num_repeats = 1 try: start_scan_number = bec.queue.next_scan_number for i in range(num_repeats): self._at_each_angle(angle) error_caught = False except AlarmBase as exc: if exc.alarm_type == "TimeoutError": bec.queue.request_queue_reset() time.sleep(2) error_caught = True else: raise exc if self._was_beam_okay() and not error_caught: successful = True else: self._wait_for_beamline_checks() end_scan_number = bec.queue.next_scan_number for scan_nr in range(start_scan_number, end_scan_number): self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
[docs] def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None): """start a tomo scan""" bec = builtins.__dict__.get("bec") scans = builtins.__dict__.get("scans") self._current_special_angles = self.special_angles.copy() # a new tomo scan was started if ( (self.tomo_type == 1 and subtomo_start == 1 and start_angle is None) or (self.tomo_type == 2 and projection_number == None) or (self.tomo_type == 3 and projection_number == None) ): # pylint: disable=undefined-variable if bec.active_account != "": self.tomo_id = self.add_sample_database( self.sample_name, str(datetime.date.today()), bec.active_account.decode(), bec.queue.next_scan_number, "OMNY", "test additional info", "BEC", ) self.write_pdf_report() else: self.tomo_id = 0 with scans.dataset_id_on_hold: if self.tomo_type == 1: # 2 equally spaced sub-tomograms self.progress["tomo_type"] = "Equally spaced sub-tomograms" for ii in range(subtomo_start, 3): self.sub_tomo_scan(ii, start_angle=start_angle) start_angle = None elif self.tomo_type == 2: # Golden ratio tomography previous_subtomo_number = -1 if projection_number == None: ii = 0 else: ii = projection_number while True: angle, subtomo_number = self._golden(ii, self.golden_ratio_bunch_size, 180, 1) if previous_subtomo_number != subtomo_number: self._write_subtomo_to_scilog(subtomo_number) if ( subtomo_number % 2 == 1 and ii > 10 and self.golden_projections_at_0_deg_for_damage_estimation == 1 ): self._tomo_scan_at_angle(0, subtomo_number) previous_subtomo_number = subtomo_number self.progress["tomo_type"] = "Golden ratio tomography" self.progress["subtomo"] = subtomo_number self.progress["projection"] = ii self.progress["angle"] = angle if self.golden_ratio_bunch_size > 0: self.progress["subtomo_total_projections"] = self.golden_ratio_bunch_size self.progress["subtomo_projection"] = ( ii - (subtomo_number - 1) * self.golden_ratio_bunch_size ) else: self.progress["subtomo_total_projections"] = 0 self.progress["subtomo_projection"] = 0 if self.golden_max_number_of_projections > 0: self.progress["total_projections"] = self.golden_max_number_of_projections else: self.progress["total_projections"] = 0 self._tomo_scan_at_angle(angle, subtomo_number) ii += 1 if ( ii > self.golden_max_number_of_projections and self.golden_max_number_of_projections > 0 ): print( f"Golden ratio tomography stopped automatically after the requested {self.golden_max_number_of_projections} projections" ) break elif self.tomo_type == 3: # Equally spaced tomography, golden ratio starting angle previous_subtomo_number = -1 if projection_number == None: ii = 0 else: ii = projection_number while True: angle, subtomo_number = self._golden_equally_spaced( ii, int(180 / self.tomo_angle_stepsize), 180, 1, 0 ) if previous_subtomo_number != subtomo_number: self._write_subtomo_to_scilog(subtomo_number) if ( subtomo_number % 2 == 1 and ii > 10 and self.golden_projections_at_0_deg_for_damage_estimation == 1 ): self._tomo_scan_at_angle(0, subtomo_number) previous_subtomo_number = subtomo_number self.progress["tomo_type"] = ( "Equally spaced tomography, golden ratio starting angle" ) self.progress["subtomo"] = subtomo_number self.progress["projection"] = ii self.progress["angle"] = angle self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize self.progress["subtomo_projection"] = ( ii - (subtomo_number - 1) * self.progress["subtomo_total_projections"] ) if self.golden_max_number_of_projections > 0: self.progress["total_projections"] = self.golden_max_number_of_projections else: self.progress["total_projections"] = 0 self._tomo_scan_at_angle(angle, subtomo_number) ii += 1 if ( ii > self.golden_max_number_of_projections and self.golden_max_number_of_projections > 0 ): print( f"Golden ratio tomography stopped automatically after the requested {self.golden_max_number_of_projections} projections" ) break else: raise OMNYError("undefined tomo type")
def _print_progress(self): print("\x1b[95mProgress report:") print(f"Tomo type: ....................... {self.progress['tomo_type']}") print(f"Projection: ...................... {self.progress['projection']:.0f}") print(f"Total projections expected ....... {self.progress['total_projections']}") print(f"Angle: ........................... {self.progress['angle']}") print(f"Current subtomo: ................. {self.progress['subtomo']}") print(f"Current projection within subtomo: {self.progress['subtomo_projection']}") print( f"Total projections per subtomo: ... {self.progress['subtomo_total_projections']}\x1b[0m" )
[docs] def add_sample_database( self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user ): """Add a sample to the omny sample database. This also retrieves the tomo id.""" subprocess.run( f"wget --user=omny --password=samples -q -O /tmp/currsamplesnr.txt 'https://omny.web.psi.ch/samples/newmeasurement.php?sample={samplename}&date={date}&eaccount={eaccount}&scannr={scan_number}&setup={setup}&additional={sample_additional_info}&user={user}'", shell=True, ) with open("/tmp/currsamplesnr.txt") as tomo_number_file: tomo_number = int(tomo_number_file.read()) return tomo_number
def _at_each_angle(self, angle: float) -> None: if "omny_at_each_angle" in builtins.__dict__: # pylint: disable=undefined-variable omny_at_each_angle(self, angle) return self.tomo_scan_projection(angle) self.tomo_reconstruct() def _golden(self, ii, howmany_sorted, maxangle, reverse=False): """returns the iis golden ratio angle of sorted bunches of howmany_sorted and its subtomo number""" golden = [] # occupy array with the range of golden angles for iji in range( (ii - (ii % howmany_sorted)), (ii - (ii % howmany_sorted)) + howmany_sorted, 1 ): golden.append( ((iji * maxangle * (1 + pow(5, 0.5)) / 2) * 1000 % (maxangle * 1000)) / 1000 ) golden.sort() subtomo_number = int(ii / howmany_sorted) + 1 if reverse and not subtomo_number % 2: golden.reverse() return (golden[ii % howmany_sorted], subtomo_number) def _golden_equally_spaced( self, ii, number_of_projections_per_subtomo, maxangle, reverse, verbose ): """returns angles for equally spaced tomography with starting angles of sub tomograms shifted according to golden ratio""" """ii is projection number starting at 1, reverse will execute the even sub tomograms in reverse direction""" # ii is projection number starting at 1 angular_step = maxangle / number_of_projections_per_subtomo subtomo_number = int(((ii - 1) * angular_step) / maxangle) + 1 start_angle = self._golden(subtomo_number - 1, 1, angular_step) projection_number_of_subtomo = ( ii - (subtomo_number - 1) * number_of_projections_per_subtomo ) - 1 if reverse: if subtomo_number % 2: angle = start_angle[0] + projection_number_of_subtomo * angular_step else: angle = ( start_angle[0] + (number_of_projections_per_subtomo - 1) * angular_step - projection_number_of_subtomo * angular_step ) else: angle = start_angle[0] + projection_number_of_subtomo * angular_step if verbose: print( f"Equally spaced golden ratio tomography.\nAngular step: {angular_step}\nSubtomo Number: {subtomo_number}\nAngle: {angle}" ) return angle, subtomo_number
[docs] def tomo_reconstruct(self, base_path="~/Data10/specES1"): """write the tomo reconstruct file for the reconstruction queue""" bec = builtins.__dict__.get("bec") base_path = os.path.expanduser(base_path) ptycho_queue_path = Path(os.path.join(base_path, self.ptycho_reconstruct_foldername)) ptycho_queue_path.mkdir(parents=True, exist_ok=True) # pylint: disable=undefined-variable last_scan_number = bec.queue.next_scan_number - 1 ptycho_queue_file = os.path.abspath( os.path.join(ptycho_queue_path, f"scan_{last_scan_number:05d}.dat") ) with open(ptycho_queue_file, "w") as queue_file: scans = " ".join([str(scan) for scan in self._current_scan_list]) queue_file.write(f"p.scan_number {scans}\n") queue_file.write("p.check_nextscan_started 1\n")
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None: tomo_scan_numbers_file = os.path.expanduser( "~/Data10/specES1/dat-files/tomography_scannumbers.txt" ) with open(tomo_scan_numbers_file, "a+") as out_file: # pylint: disable=undefined-variable out_file.write( f"{scan_number} {angle} {dev.osamroy.read()['osamroy']['value']:.3f} {self.tomo_id} {subtomo_number} {0} {self.sample_name}\n" ) def tomo_scan_projection(self, angle: float): scans = builtins.__dict__.get("scans") # additional_correction = self.align.compute_additional_correction(angle) # additional_correction_2 = self.align.compute_additional_correction_2(angle) # correction_xeye_mu = self.align.lamni_compute_additional_correction_xeye_mu(angle) offsets = self.get_alignment_offset(angle) sum_offset_x = offsets[0] - self.compute_additional_correction_x(angle) sum_offset_y = ( offsets[1] - self.compute_additional_correction_y(angle) - self.compute_additional_correction_y_2(angle) ) sum_offset_z = offsets[2] self._current_scan_list = [] for stitch_x in range(-self.stitch_x, self.stitch_x + 1): for stitch_y in range(-self.stitch_y, self.stitch_y + 1): # pylint: disable=undefined-variable corridor_size = self.corridor_size if self.corridor_size > 0 else None self._current_scan_list.append(bec.queue.next_scan_number) cenx = sum_offset_x + stitch_x * (self.fovx - self.tomo_stitch_overlap) ceny = sum_offset_y + stitch_y * (self.fovy - self.tomo_stitch_overlap) logger.info( f"scans.omny_fermat_scan(fovx={self.fovx}, fovy={self.fovy}," f" step={self.tomo_shellstep}, cenx={cenx}, ceny={ceny}," f" zshift={sum_offset_z}, angle={angle}," f" exp_time={self.tomo_countingtime}, corridor_size={corridor_size})" ) log_message = ( f"{str(datetime.datetime.now())}: omny scan projection at angle {angle}, scan" f" number {bec.queue.next_scan_number}.\n" ) # self.write_to_scilog(log_message, ["BEC_scans", self.sample_name]) scans.omny_fermat_scan( fovx=self.fovx, fovy=self.fovy, step=self.tomo_shellstep, cenx=cenx, ceny=ceny, zshift=sum_offset_z, angle=angle, exp_time=self.tomo_countingtime, corridor_size=corridor_size, )
[docs] def tomo_parameters(self): """print and update the tomo parameters""" print("Current settings:") print(f"Counting time <ctime> = {self.tomo_countingtime} s") print(f"Stepsize microns <step> = {self.tomo_shellstep}") print(f"FOV (200/100) <microns> = {self.fovx}, {self.fovy}") print(f"Stitching number x,y = {self.stitch_x}, {self.stitch_y}") print(f"Stitching overlap = {self.tomo_stitch_overlap}") print(f"Reconstruction queue name = {self.ptycho_reconstruct_foldername}") print(f" _manual_shift_y <mm> = {self.manual_shift_y}") print("") if self.tomo_type == 1: print("\x1b[1mTomo type 1:\x1b[0m 2 equally spaced sub-tomograms") print(f"Total number of projections: {180/self.tomo_angle_stepsize*2}") print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees") if self.tomo_type == 2: print("\x1b[1mTomo type 2:\x1b[0m Golden ratio tomography") print(f"Sorted in bunches of: {self.golden_ratio_bunch_size}") if self.golden_max_number_of_projections > 0: print(f"ending after {self.golden_max_number_of_projections} projections") else: print("ending by manual interruption") if self.golden_projections_at_0_deg_for_damage_estimation == 1: print( "repeating prjections at 0 degrees at the beginning of every second subtomogram" ) if self.tomo_type == 3: print( "\x1b[1mTomo type 3:\x1b[0m Equally spaced tomography, golden ratio starting angle" ) print(f"Number of projections per sub-tomogram: {180/self.tomo_angle_stepsize}") print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees") if self.golden_max_number_of_projections > 0: print(f"ending after {self.golden_max_number_of_projections} projections") else: print("ending by manual interruption") if self.golden_projections_at_0_deg_for_damage_estimation == 1: print( "repeating prjections at 0 degrees at the beginning of every second subtomogram" ) print(f"\nSample name: {self.sample_name}\n") if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"): print("... excellent!") else: self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float) self.tomo_shellstep = self._get_val("<step size> um", self.tomo_shellstep, float) self.fovx = self._get_val("<FOV X (max 200)> um", self.fovx, float) self.fovy = self._get_val("<FOV Y (max 100)> um", self.fovy, float) self.stitch_x = self._get_val("<stitch X>", self.stitch_x, int) self.stitch_y = self._get_val("<stitch Y>", self.stitch_y, int) self.ptycho_reconstruct_foldername = self._get_val( "Reconstruction queue ", self.ptycho_reconstruct_foldername, str ) print("Tomography type:") print(" 1: 2 equally spaced sub-tomograms") print(" 2: Golden ratio tomography") print(" 3: Equally spaced tomography, golden ratio starting angle") self.tomo_type = self._get_val("Tomography type", self.tomo_type, int) if self.tomo_type == 1: tomo_numberofprojections = self._get_val( "Total number of projections", 180 / self.tomo_angle_stepsize * 2, int ) print(f"The angular step will be {180/tomo_numberofprojections}") self.tomo_angle_stepsize = 180 / tomo_numberofprojections * 2 print(f"The angular step in a subtomogram it will be {self.tomo_angle_stepsize}") if self.tomo_type == 2: code = self._get_val( "This mode causes significant wear in OMNY. Enter authorization code.", 0, str ) if code == "x12sa": self.golden_ratio_bunch_size = self._get_val( "Number of projections sorted per bunch (minimum 100)", self.golden_ratio_bunch_size, int, ) if self.golden_ratio_bunch_size < 100: print("Minimum of 100 selected.") self.golden_ratio_bunch_size = 100 self.golden_max_number_of_projections = self._get_val( "Stop after number of projections (zero for endless)", self.golden_max_number_of_projections, int, ) self.golden_projections_at_0_deg_for_damage_estimation = self._get_val( "Repeat projections at 0 deg every second subtomo 1/0 ?", self.golden_projections_at_0_deg_for_damage_estimation, int, ) else: print( "Wrong authorization code. Tomo type 1 selected: 2 sub-tomograms selected." ) self.tomo_type = 1 if self.tomo_type == 3: code = self._get_val( "This mode causes significant wear in OMNY. Enter authorization code.", 0, str ) if code == "x12sa": numprj = self._get_val( "Number of projections per sub-tomogram (minimum 100)", int(180 / self.tomo_angle_stepsize), int, ) if numprj < 100: numprj = 100 print("Minimum of 100 selected.") self.tomo_angle_stepsize = 180 / numprj self.golden_max_number_of_projections = self._get_val( "Stop after number of projections (zero for endless)", self.golden_max_number_of_projections, int, ) self.golden_projections_at_0_deg_for_damage_estimation = self._get_val( "Repeat projections at 0 deg every second subtomo", self.golden_projections_at_0_deg_for_damage_estimation, int, ) else: print( "Wrong authorization code. Tomo type 1 selected: 2 sub-tomograms selected." ) self.tomo_type = 1
@staticmethod def _get_val(msg: str, default_value, data_type): return data_type(input(f"{msg} ({default_value}): ") or default_value) def rt_off(self): dev.rtx.enabled = False dev.rty.enabled = False dev.rtz.enabled = False def rt_on(self): dev.rtx.enabled = True dev.rty.enabled = True dev.rtz.enabled = True if dev.rtx.enabled == True: print("rt is enabled") else: print("failed to enable rt")
[docs] def write_pdf_report(self): """create and write the pdf report with the current OMNY settings""" dev = builtins.__dict__.get("dev") # header = "" header = ( " \n" * 3 + " ,o888888o. ,8. ,8. b. 8 `8.`8888. ,8' \n" + " . 8888 `88. ,888. ,888. 888o. 8 `8.`8888. ,8' \n" + ",8 8888 `8b .`8888. .`8888. Y88888o. 8 `8.`8888. ,8' \n" + "88 8888 `8b ,8.`8888. ,8.`8888. .`Y888888o. 8 `8.`8888.,8' \n" + "88 8888 88 ,8'8.`8888,8^8.`8888. 8o. `Y888888o. 8 `8.`88888' \n" + "88 8888 88 ,8' `8.`8888' `8.`8888. 8`Y8o. `Y88888o8 `8. 8888 \n" + "88 8888 ,8P ,8' `8.`88' `8.`8888. 8 `Y8o. `Y8888 `8 8888 \n" + "`8 8888 ,8P ,8' `8.`' `8.`8888. 8 `Y8o. `Y8 8 8888 \n" + " ` 8888 ,88' ,8' `8 `8.`8888. 8 `Y8o.` 8 8888 \n" + " `8888888P' ,8' ` `8.`8888. 8 `Yo 8 8888 \n" ) padding = 20 fovxy = f"{self.fovx:.2f}/{self.fovy:.2f}" stitching = f"{self.stitch_x:.2f}/{self.stitch_y:.2f}" dataset_id = str(self.client.queue.next_dataset_number) content = [ f"{'Sample Name:':<{padding}}{self.sample_name:>{padding}}\n", f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n", f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n", f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n", f"{'e-account:':<{padding}}{str(self.client.username):>{padding}}\n", f"{'Number of projections:':<{padding}}{int(180 / self.tomo_angle_stepsize * 8):>{padding}}\n", f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n", f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int(180 / self.tomo_angle_stepsize * 8) + 10:>{padding}}\n", f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n", f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n", f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n", f"{'FOV:':<{padding}}{fovxy:>{padding}}\n", f"{'Stitching:':<{padding}}{stitching:>{padding}}\n", f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n", f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n", "Add current temperature and pressure status", ] content = "".join(content) user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf") with PDFWriter(user_target) as file: file.write(header) file.write(content) subprocess.run( "xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True ) # status = subprocess.run(f"cp /tmp/spec-e20131-specES1.pdf {user_target}", shell=True) msg = bec.logbook.LogbookMessage() logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png") msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag( ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name] ) self.client.logbook.send_logbook_message(msg)
def vcs_show_all(): dev.omny_vcs.show_all() def vcs_valves_in_measurement_position(): dev.omny_vcs.valves_in_measurement_position() def temperatures_show_all(): dev.omny_temperatures.show_all() def dewar_show_all(): dev.omny_dewar.show_all()
if __name__ == "__main__": import builtins from bec_ipython_client import BECIPythonClient bec = BECIPythonClient() bec.start() scans = bec.scans dev = bec.device_manager.devices builtins.__dict__["scans"] = scans builtins.__dict__["dev"] = dev builtins.__dict__["bec"] = bec builtins.__dict__["umv"] = umv omny = OMNY(bec) omny.start_x_ray_eye_alignment()