Source code for csaxs_bec.devices.omny.omny_sample_storage

import time

from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
from prettytable import FRAME, PrettyTable


[docs] class OMNYSampleStorageError(Exception): pass
[docs] class OMNYSampleStorage(Device): USER_ACCESS = [ "is_sample_slot_used", "is_sample_in_gripper", "set_sample_slot", "unset_sample_slot", "set_sample_in_gripper", "unset_sample_in_gripper", "set_sample_in_samplestage", "unset_sample_in_samplestage", "get_sample_name_in_samplestage", "get_sample_name_in_gripper", "get_sample_name", "is_sample_in_samplestage", "set_shuttle_slot", "unset_shuttle_slot", "get_shuttle_name_slot", "is_shuttle_slot_used", "search_shuttle_in_slot", "show_all", "help", ] SUB_VALUE = "value" _default_sub = SUB_VALUE sample_shuttle_A_placed = { f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}", {"auto_monitor": True}) for i in range(1, 7) } sample_shuttle_A_placed = Dcpt(sample_shuttle_A_placed) sample_shuttle_B_placed = { f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}", {"auto_monitor": True}) for i in range(1, 7) } sample_shuttle_B_placed = Dcpt(sample_shuttle_B_placed) sample_shuttle_C_placed = { f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7) } sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed) sample_shuttle_C_placed = { f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7) } sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed) parking_placed = { f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}", {"auto_monitor": True}) for i in range(1, 7) } parking_placed = Dcpt(parking_placed) sample_placed = { f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {"auto_monitor": True}) for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101] } sample_placed = Dcpt(sample_placed) sample_shuttle_A_names = { f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}.DESC", {"string": True, "auto_monitor": True}) for i in range(1, 7) } sample_shuttle_A_names = Dcpt(sample_shuttle_A_names) sample_shuttle_B_names = { f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}.DESC", {"string": True, "auto_monitor": True}) for i in range(1, 7) } sample_shuttle_B_names = Dcpt(sample_shuttle_B_names) sample_shuttle_C_names = { f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}.DESC", {"string": True, "auto_monitor": True}) for i in range(1, 7) } sample_shuttle_C_names = Dcpt(sample_shuttle_C_names) parking_names = { f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}.DESC", {"string": True, "auto_monitor": True}) for i in range(1, 7) } parking_names = Dcpt(parking_names) sample_names = { f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}.DESC", {"string": True, "auto_monitor": True}) for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101] } sample_names = Dcpt(sample_names) sample_in_gripper = Cpt( EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_omny:110.VAL", auto_monitor=True ) sample_in_gripper_name = Cpt( EpicsSignal, name="sample_in_gripper_name", read_pv="XOMNY-SAMPLE_DB_omny:110.DESC", string=True, auto_monitor=True ) sample_in_samplestage = Cpt( EpicsSignal, name="sample_in_samplestage", read_pv="XOMNY-SAMPLE_DB_omny:0.VAL", auto_monitor=True ) sample_in_samplestage_name = Cpt( EpicsSignal, name="sample_in_samplestage_name", read_pv="XOMNY-SAMPLE_DB_omny:0.DESC", string=True, auto_monitor=True ) def __init__(self, prefix="", *, name, **kwargs): super().__init__(prefix, name=name, **kwargs) self.sample_shuttle_A_placed.sample1.subscribe(self._emit_value) def _emit_value(self, **kwargs): timestamp = kwargs.pop("timestamp", time.time()) self.wait_for_connection() self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self) def set_sample_slot(self, container: str, slot_nr: int, name: str) -> bool: if slot_nr > 20: raise OMNYSampleStorageError(f"Invalid slot number {slot_nr}.") if container == "A": getattr(self.sample_shuttle_A_placed, f"sample{slot_nr}").set(1) getattr(self.sample_shuttle_A_names, f"sample{slot_nr}").set(name) elif container == "B": getattr(self.sample_shuttle_B_placed, f"sample{slot_nr}").set(1) getattr(self.sample_shuttle_B_names, f"sample{slot_nr}").set(name) elif container == "C": getattr(self.sample_shuttle_C_placed, f"sample{slot_nr}").set(1) getattr(self.sample_shuttle_C_names, f"sample{slot_nr}").set(name) elif container == "O": getattr(self.sample_placed, f"sample{slot_nr}").set(1) getattr(self.sample_names, f"sample{slot_nr}").set(name) def unset_sample_slot(self, container: str, slot_nr: int) -> bool: if slot_nr > 20: raise OMNYSampleStorageError(f"Invalid slot number {slot_nr}.") if container == "A": getattr(self.sample_shuttle_A_placed, f"sample{slot_nr}").set(0) getattr(self.sample_shuttle_A_names, f"sample{slot_nr}").set("-") elif container == "B": getattr(self.sample_shuttle_B_placed, f"sample{slot_nr}").set(0) getattr(self.sample_shuttle_B_names, f"sample{slot_nr}").set("-") elif container == "C": getattr(self.sample_shuttle_C_placed, f"sample{slot_nr}").set(0) getattr(self.sample_shuttle_C_names, f"sample{slot_nr}").set("-") elif container == "O": getattr(self.sample_placed, f"sample{slot_nr}").set(0) getattr(self.sample_names, f"sample{slot_nr}").set("-") def set_shuttle_slot(self, container: str, slot_nr: int) -> bool: if slot_nr > 6: raise OMNYSampleStorageError(f"Invalid slot number {slot_nr}.") getattr(self.parking_placed, f"parking{slot_nr}").set(1) getattr(self.parking_names, f"parking{slot_nr}").set(container) def unset_shuttle_slot(self, slot_nr: int) -> bool: if slot_nr > 6: raise OMNYSampleStorageError(f"Invalid slot number {slot_nr}.") getattr(self.parking_placed, f"parking{slot_nr}").set(0) getattr(self.parking_names, f"parking{slot_nr}").set("none") def set_sample_in_gripper(self, name: str) -> bool: self.sample_in_gripper.set(1) self.sample_in_gripper_name.set(name) def unset_sample_in_gripper(self) -> bool: self.sample_in_gripper.set(0) self.sample_in_gripper_name.set("-") def set_sample_in_samplestage(self, name: str) -> bool: self.sample_in_samplestage.set(1) self.sample_in_samplestage_name.set(name) def unset_sample_in_samplestage(self) -> bool: self.sample_in_samplestage.set(0) self.sample_in_samplestage_name.set("-") def is_sample_slot_used(self, container, slot_nr: int) -> bool: if container == "A": val = getattr(self.sample_shuttle_A_placed, f"sample{slot_nr}").get() if container == "B": val = getattr(self.sample_shuttle_B_placed, f"sample{slot_nr}").get() if container == "C": val = getattr(self.sample_shuttle_C_placed, f"sample{slot_nr}").get() elif container == "O": val = getattr(self.sample_placed, f"sample{slot_nr}").get() return bool(val) def is_shuttle_slot_used(self, slot_nr: int) -> bool: val = getattr(self.parking_placed, f"parking{slot_nr}").get() return bool(val) def is_sample_in_gripper(self) -> bool: val = self.sample_in_gripper.get() return bool(val) def is_sample_in_samplestage(self) -> bool: val = self.sample_in_samplestage.get() return bool(val) def get_sample_name(self, container, slot_nr) -> str: if container == "A": val = getattr(self.sample_shuttle_A_names, f"sample{slot_nr}").get() elif container == "B": val = getattr(self.sample_shuttle_B_names, f"sample{slot_nr}").get() elif container == "C": val = getattr(self.sample_shuttle_C_names, f"sample{slot_nr}").get() elif container == "O": val = getattr(self.sample_names, f"sample{slot_nr}").get() else: val = "unknown container" return str(val) def get_shuttle_name_slot(self, slot_nr: int) -> str: val = getattr(self.parking_names, f"parking{slot_nr}").get() return str(val) def get_sample_name_in_gripper(self) -> str: val = self.sample_in_gripper_name.get() return str(val) def get_sample_name_in_samplestage(self) -> str: val = self.sample_in_samplestage_name.get() return str(val) def search_shuttle_in_slot(self, shuttle: str) -> int: returnvalue = 0 for i in range(1, 7): if self.get_shuttle_name_slot(i) == shuttle: returnvalue = i return returnvalue def show_all(self): t = PrettyTable() red = "\x1b[91m" green = "\x1b[92m" white = "\x1b[0m" for ch in ["A", "B", "C"]: t.clear() shuttle_in_slot = self.search_shuttle_in_slot(ch) if shuttle_in_slot > 0: t.title = green + "Shuttle " + ch + " in OMNY slot " + str(shuttle_in_slot) + white else: t.title = red + "Shuttle " + ch + white field_names = [""] for ax in [1, 3, 5]: row = [] row.extend([self.get_sample_name(ch, ax)]) row.extend(str(ax)) row.extend(str(ax + 1)) row.extend([self.get_sample_name(ch, ax + 1)]) t.add_row(row) t.header = False t.vrules = FRAME print(t) if self.is_sample_in_samplestage(): print(f"\n\n Sample stage: {self.get_sample_name_in_samplestage()}") else: print(f"\n\n Sample stage: no sample") if self.is_sample_in_gripper(): print(f"\n Gripper: {self.get_sample_name_in_gripper()}\n") else: print(f"\n Gripper: no sample\n") t.clear() t.title = "Fixed positions in OMNY" for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]: row = [] row.extend([f"Position {i:3d}"]) if self.is_sample_slot_used("O", i): name = self.get_sample_name("O", i) row.extend([name]) else: row.extend(["free"]) t.add_row(row) print(t) print("Use dev.omny_samples.help() for assistance.") def help(self): print("Help for OMNY sample storage:") print(" To get an overview use dev.omny_samples.show_all()") print(" Modify a slot:") print(" dev.omny_samples.unset_sample_slot('system',position)") print(" dev.omny_samples.set_sample_slot('system',position,'name')") print(" system can be A, B, C, O") print(" dev.omny_samples.set_sample_in_gripper('name') / unset_sample_in_gripper()") print(" dev.omny_samples.set_sample_in_samplestage('name'), unset_sample_in_samplestage()") print(" dev.omny_samples.set_shuttle_slot(container, slot_nr) / unset_shuttle_slot(slot_nr)") print(" dev.omny_samples.set_shuttle_slot('A',2)") print(" omny.otransfer_help()")