Source code for csaxs_bec.bec_ipython_client.plugins.LamNI.alignment

import builtins
import time
from collections import defaultdict

import numpy as np
from bec_lib import bec_logger
from typeguard import typechecked

from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen

logger = bec_logger.logger

if builtins.__dict__.get("bec") is not None:
    bec = builtins.__dict__.get("bec")
    dev = builtins.__dict__.get("dev")
    umv = builtins.__dict__.get("umv")
    umvr = builtins.__dict__.get("umvr")


[docs] class XrayEyeAlign: # pixel calibration, multiply to get mm # PIXEL_CALIBRATION = 0.2/209 #.2 with binning PIXEL_CALIBRATION = 0.2 / 218 # .2 with binning def __init__(self, client, lamni) -> None: self.client = client self.lamni = lamni self.device_manager = client.device_manager self.scans = client.scans self.alignment_values = defaultdict(list) self._reset_init_values() self.corr_pos_x = [] self.corr_pos_y = [] self.corr_angle = [] self.corr_pos_x_2 = [] self.corr_pos_y_2 = [] self.corr_angle_2 = [] # ------------------------------------------------------------------ # Correction reset # ------------------------------------------------------------------ def reset_correction(self): self.corr_pos_x = [] self.corr_pos_y = [] self.corr_angle = [] def reset_correction_2(self): self.corr_pos_x_2 = [] self.corr_pos_y_2 = [] self.corr_angle_2 = [] def reset_xray_eye_correction(self): self.client.delete_global_var("tomo_fit_xray_eye") # ------------------------------------------------------------------ # FOV offset properties # ------------------------------------------------------------------ @property def tomo_fovx_offset(self): val = self.client.get_global_var("tomo_fov_offset") if val is None: return 0.0 return val[0] / 1000 @tomo_fovx_offset.setter @typechecked def tomo_fovx_offset(self, val: float): val_old = self.client.get_global_var("tomo_fov_offset") if val_old is None: val_old = [0.0, 0.0] self.client.set_global_var("tomo_fov_offset", [val * 1000, val_old[1]]) @property def tomo_fovy_offset(self): val = self.client.get_global_var("tomo_fov_offset") if val is None: return 0.0 return val[1] / 1000 @tomo_fovy_offset.setter @typechecked def tomo_fovy_offset(self, val: float): val_old = self.client.get_global_var("tomo_fov_offset") if val_old is None: val_old = [0.0, 0.0] self.client.set_global_var("tomo_fov_offset", [val_old[0], val * 1000]) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _reset_init_values(self): self.shift_xy = [0, 0] self._xray_fov_xy = [0, 0] def _disable_rt_feedback(self): self.device_manager.devices.rtx.controller.feedback_disable() def _enable_rt_feedback(self): self.device_manager.devices.rtx.controller.feedback_enable_with_reset() def tomo_rotate(self, val: float): # pylint: disable=undefined-variable umv(self.device_manager.devices.lsamrot, val) def get_tomo_angle(self): return self.device_manager.devices.lsamrot.readback.read()["lsamrot"]["value"] # ------------------------------------------------------------------ # X-ray eye camera control # ------------------------------------------------------------------ def save_frame(self): epics_put("XOMNYI-XEYE-SAVFRAME:0", 1) def update_frame(self): epics_put("XOMNYI-XEYE-ACQDONE:0", 0) # start live epics_put("XOMNYI-XEYE-ACQ:0", 1) # wait for start live while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0: time.sleep(0.5) print("waiting for live view to start...") fshopen() epics_put("XOMNYI-XEYE-ACQDONE:0", 0) while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0: print("waiting for new frame...") time.sleep(0.5) time.sleep(0.5) # stop live view epics_put("XOMNYI-XEYE-ACQ:0", 0) time.sleep(1) print("got new frame") def update_fov(self, k: int): self._xray_fov_xy[0] = max(epics_get(f"XOMNYI-XEYE-XWIDTH_X:{k}"), self._xray_fov_xy[0]) self._xray_fov_xy[1] = max(0, self._xray_fov_xy[0]) @property def movement_buttons_enabled(self): return [epics_get("XOMNYI-XEYE-ENAMVX:0"), epics_get("XOMNYI-XEYE-ENAMVY:0")] @movement_buttons_enabled.setter def movement_buttons_enabled(self, enabled: bool): enabled = int(enabled) epics_put("XOMNYI-XEYE-ENAMVX:0", enabled) epics_put("XOMNYI-XEYE-ENAMVY:0", enabled) def send_message(self, msg: str): epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg) # ------------------------------------------------------------------ # Alignment procedure # ------------------------------------------------------------------ def align(self): self._reset_init_values() self.reset_correction() self.reset_correction_2() self._disable_rt_feedback() epics_put("XOMNYI-XEYE-PIXELSIZE:0", self.PIXEL_CALIBRATION) self._enable_rt_feedback() self.movement_buttons_enabled = False epics_put("XOMNYI-XEYE-ACQ:0", 0) self.send_message("please wait...") epics_put("XOMNYI-XEYE-SAMPLENAME:0.DESC", "Let us LAMNI...") self._disable_rt_feedback() k = 0 self.lamni.lfzp_in() self.update_frame() self.movement_buttons_enabled = False epics_put("XOMNYI-XEYE-SUBMIT:0", 0) epics_put("XOMNYI-XEYE-STEP:0", 0) self.send_message("Submit center value of FZP.") while True: if epics_get("XOMNYI-XEYE-SUBMIT:0") == 1: val_x = epics_get(f"XOMNYI-XEYE-XVAL_X:{k}") * self.PIXEL_CALIBRATION # in mm val_y = epics_get(f"XOMNYI-XEYE-YVAL_Y:{k}") * self.PIXEL_CALIBRATION # in mm self.alignment_values[k] = [val_x, val_y] print( f"Clicked position {k}: x {self.alignment_values[k][0]}, y" f" {self.alignment_values[k][1]}" ) if k == 0: # received center value of FZP self.send_message("please wait ...") self.lamni.loptics_out() epics_put("XOMNYI-XEYE-SUBMIT:0", -1) self.movement_buttons_enabled = False print("Moving sample in, FZP out") self._disable_rt_feedback() time.sleep(0.3) self._enable_rt_feedback() time.sleep(0.3) self.update_frame() self.send_message("Go and find the sample") epics_put("XOMNYI-XEYE-SUBMIT:0", 0) self.movement_buttons_enabled = True elif k == 1: # received sample center value at samrot 0 msg = ( f"Base shift values from movement are x {self.shift_xy[0]}, y" f" {self.shift_xy[1]}" ) print(msg) logger.info(msg) self.shift_xy[0] += ( self.alignment_values[0][0] - self.alignment_values[1][0] ) * 1000 self.shift_xy[1] += ( self.alignment_values[1][1] - self.alignment_values[0][1] ) * 1000 print( "Base shift values from movement and clicked position are x" f" {self.shift_xy[0]}, y {self.shift_xy[1]}" ) self.scans.lamni_move_to_scan_center( self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() ).wait() self.send_message("please wait ...") epics_put("XOMNYI-XEYE-SUBMIT:0", -1) self.movement_buttons_enabled = False time.sleep(1) self.scans.lamni_move_to_scan_center( self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() ).wait() epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle()) self.update_frame() self.send_message("Submit sample center and FOV (0 deg)") epics_put("XOMNYI-XEYE-SUBMIT:0", 0) self.update_fov(k) elif 1 < k < 10: # received sample center value at samrot 0 ... 315 self.send_message("please wait ...") epics_put("XOMNYI-XEYE-SUBMIT:0", -1) self._disable_rt_feedback() self.tomo_rotate((k - 1) * 45 - 45 / 2) self.scans.lamni_move_to_scan_center( self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() ).wait() self._disable_rt_feedback() self.tomo_rotate((k - 1) * 45) self.scans.lamni_move_to_scan_center( self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() ).wait() epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle()) self.update_frame() self.send_message("Submit sample center") epics_put("XOMNYI-XEYE-SUBMIT:0", 0) epics_put("XOMNYI-XEYE-ENAMVX:0", 1) self.update_fov(k) elif k == 10: # received sample center value at samrot 270, done self.send_message("done...") epics_put("XOMNYI-XEYE-SUBMIT:0", -1) self.movement_buttons_enabled = False self.update_fov(k) break k += 1 epics_put("XOMNYI-XEYE-STEP:0", k) if k < 2: _xrayeyalignmvx = epics_get("XOMNYI-XEYE-MVX:0") _xrayeyalignmvy = epics_get("XOMNYI-XEYE-MVY:0") if _xrayeyalignmvx != 0 or _xrayeyalignmvy != 0: self.shift_xy[0] = self.shift_xy[0] + _xrayeyalignmvx self.shift_xy[1] = self.shift_xy[1] + _xrayeyalignmvy self.scans.lamni_move_to_scan_center( self.shift_xy[0] / 1000, self.shift_xy[1] / 1000, self.get_tomo_angle() ).wait() print( f"Current center horizontal {self.shift_xy[0]} vertical {self.shift_xy[1]}" ) epics_put("XOMNYI-XEYE-MVY:0", 0) epics_put("XOMNYI-XEYE-MVX:0", 0) self.update_frame() time.sleep(0.2) self.write_output() fovx = self._xray_fov_xy[0] * self.PIXEL_CALIBRATION * 1000 / 2 fovy = self._xray_fov_xy[1] * self.PIXEL_CALIBRATION * 1000 / 2 print( f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns," f" fovy = {fovy:.0f} microns" ) print("Use matlab routine to fit the current alignment...") print( "This additional shift is applied to the base shift values\n which are x" f" {self.shift_xy[0]}, y {self.shift_xy[1]}" ) self._disable_rt_feedback() self.tomo_rotate(0) print( "\n\nNEXT LOAD ALIGNMENT PARAMETERS\nby running" " lamni.align.read_xray_eye_correction()\n" ) self.client.set_global_var("tomo_fov_offset", self.shift_xy) # ------------------------------------------------------------------ # Alignment output # ------------------------------------------------------------------ def write_output(self): import os with open( os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues"), "w" ) as alignment_values_file: alignment_values_file.write("angle\thorizontal\tvertical\n") for k in range(2, 11): fovx_offset = (self.alignment_values[0][0] - self.alignment_values[k][0]) * 1000 fovy_offset = (self.alignment_values[k][1] - self.alignment_values[0][1]) * 1000 print( f"Writing to file new alignment: number {k}, value x {fovx_offset}, y" f" {fovy_offset}" ) alignment_values_file.write(f"{(k-2)*45}\t{fovx_offset}\t{fovy_offset}\n") # ------------------------------------------------------------------ # X-ray eye sinusoidal correction (loaded from MATLAB fit files) # ------------------------------------------------------------------ def read_xray_eye_correction(self, dir_path=None): import os if dir_path is None: dir_path = os.path.expanduser("~/Data10/specES1/internal/") tomo_fit_xray_eye = np.zeros((2, 3)) for i, axis in enumerate(["x", "y"]): for j, coeff in enumerate(["A", "B", "C"]): with open(os.path.join(dir_path, f"ptychotomoalign_{coeff}{axis}.txt"), "r") as f: tomo_fit_xray_eye[i][j] = f.readline() self.client.set_global_var("tomo_fit_xray_eye", tomo_fit_xray_eye.tolist()) # x amp, phase, offset, y amp, phase, offset # 0 0 0 1 0 2 1 0 1 1 1 2 print("New alignment parameters loaded from X-ray eye") print( f"X Amplitude {tomo_fit_xray_eye[0][0]}, " f"X Phase {tomo_fit_xray_eye[0][1]}, " f"X Offset {tomo_fit_xray_eye[0][2]}, " f"Y Amplitude {tomo_fit_xray_eye[1][0]}, " f"Y Phase {tomo_fit_xray_eye[1][1]}, " f"Y Offset {tomo_fit_xray_eye[1][2]}" )
[docs] def lamni_compute_additional_correction_xeye_mu(self, angle): """Compute sinusoidal correction from the X-ray eye fit for the given angle.""" tomo_fit_xray_eye = self.client.get_global_var("tomo_fit_xray_eye") if tomo_fit_xray_eye is None: print("Not applying any additional correction. No x-ray eye data available.\n") return (0, 0) # x amp, phase, offset, y amp, phase, offset # 0 0 0 1 0 2 1 0 1 1 1 2 correction_x = ( tomo_fit_xray_eye[0][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[0][1]) + tomo_fit_xray_eye[0][2] ) / 1000 correction_y = ( tomo_fit_xray_eye[1][0] * np.sin(np.radians(angle) + tomo_fit_xray_eye[1][1]) + tomo_fit_xray_eye[1][2] ) / 1000 print(f"Xeye correction x {correction_x}, y {correction_y} for angle {angle}\n") return (correction_x, correction_y)
# ------------------------------------------------------------------ # Additional lookup-table corrections (iteration 1 and 2) # ------------------------------------------------------------------ def read_additional_correction(self, correction_file: str): self.corr_pos_x, self.corr_pos_y, self.corr_angle = self._read_correction_file_xy( correction_file ) def read_additional_correction_2(self, correction_file: str): self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2 = self._read_correction_file_xy( correction_file ) def _read_correction_file_xy(self, correction_file: str): """Parse a correction file that contains corr_pos_x, corr_pos_y and corr_angle entries.""" with open(correction_file, "r") as f: num_elements = f.readline() int_num_elements = int(num_elements.split(" ")[2]) print(int_num_elements) corr_pos_x = [] corr_pos_y = [] corr_angle = [] for j in range(0, int_num_elements * 3): line = f.readline() value = line.split(" ")[2] name = line.split(" ")[0].split("[")[0] if name == "corr_pos_x": corr_pos_x.append(float(value) / 1000) elif name == "corr_pos_y": corr_pos_y.append(float(value) / 1000) elif name == "corr_angle": corr_angle.append(float(value)) return corr_pos_x, corr_pos_y, corr_angle def compute_additional_correction(self, angle): return self._compute_correction_xy( angle, self.corr_pos_x, self.corr_pos_y, self.corr_angle, label="1" ) def compute_additional_correction_2(self, angle): return self._compute_correction_xy( angle, self.corr_pos_x_2, self.corr_pos_y_2, self.corr_angle_2, label="2" ) def _compute_correction_xy(self, angle, corr_pos_x, corr_pos_y, corr_angle, label=""): """Find the correction for the closest angle in the lookup table.""" if not corr_pos_x: print(f"Not applying additional correction {label}. No data available.\n") return (0, 0) shift_x = corr_pos_x[0] shift_y = corr_pos_y[0] angledelta = np.fabs(corr_angle[0] - angle) for j in range(1, len(corr_pos_x)): newangledelta = np.fabs(corr_angle[j] - angle) if newangledelta < angledelta: shift_x = corr_pos_x[j] shift_y = corr_pos_y[j] angledelta = newangledelta if shift_x == 0 and angle < corr_angle[0]: shift_x = corr_pos_x[0] shift_y = corr_pos_y[0] if shift_x == 0 and angle > corr_angle[-1]: shift_x = corr_pos_x[-1] shift_y = corr_pos_y[-1] print(f"Additional correction shifts {label}: {shift_x} {shift_y}") return (shift_x, shift_y)