Source code for csaxs_bec.devices.omny.flomni_sample_storage

import time

from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
from prettytable import PrettyTable


[docs] class FlomniSampleStorageError(Exception): pass
[docs] class FlomniSampleStorage(Device): USER_ACCESS = [ "is_sample_slot_used", "is_sample_in_gripper", "set_sample_slot", "unset_sample_slot", "set_sample_in_gripper", "unset_sample_in_gripper", "show_all", ] SUB_VALUE = "value" _default_sub = SUB_VALUE sample_placed = { f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {"auto_monitor": True}) for i in range(21) } sample_placed = Dcpt(sample_placed) sample_names = { f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True, "auto_monitor": True}) for i in range(21) } sample_names = Dcpt(sample_names) sample_in_gripper = Cpt( EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET", auto_monitor=True ) sample_in_gripper_name = Cpt( EpicsSignal, name="sample_in_gripper_name", read_pv="XOMNY-SAMPLE_DB_flomni100:GET.DESC", string=True, auto_monitor=True ) def __init__(self, prefix="", *, name, **kwargs): super().__init__(prefix, name=name, **kwargs) self.sample_placed.sample1.subscribe(self._emit_value) def _emit_value(self, **kwargs): timestamp = kwargs.pop("timestamp", time.time()) self.wait_for_connection() self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self) def set_sample_slot(self, slot_nr: int, name: str) -> bool: if slot_nr > 20: raise FlomniSampleStorageError(f"Invalid slot number {slot_nr}.") getattr(self.sample_placed, f"sample{slot_nr}").set(1) getattr(self.sample_names, f"sample{slot_nr}").set(name) def unset_sample_slot(self, slot_nr: int) -> bool: if slot_nr > 20: raise FlomniSampleStorageError(f"Invalid slot number {slot_nr}.") getattr(self.sample_placed, f"sample{slot_nr}").set(0) getattr(self.sample_names, f"sample{slot_nr}").set("-") def set_sample_in_gripper(self, name: str) -> bool: self.sample_in_gripper.set(1) self.sample_in_gripper_name.set(name) def unset_sample_in_gripper(self) -> bool: self.sample_in_gripper.set(0) self.sample_in_gripper_name.set("-") def is_sample_slot_used(self, slot_nr: int) -> bool: val = getattr(self.sample_placed, f"sample{slot_nr}").get() return bool(val) def is_sample_in_gripper(self) -> bool: val = self.sample_in_gripper.get() return bool(val) def get_sample_name(self, slot_nr) -> str: val = getattr(self.sample_names, f"sample{slot_nr}").get() return str(val) def show_all(self): t = PrettyTable() t.title = "flOMNI sample storage" field_names = [""] field_names.extend(str(ax) for ax in range(1, 11)) for ct in range(0, 2): t.field_names = field_names row = ["Container " + str(ct)] row.extend( "used" if self.is_sample_slot_used(slot_nr) else "free" for slot_nr in range((ct * 10) + 1, (ct * 10) + 11) ) t.add_row(row) print(t) print("\n\nFollowing samples are currently loaded:\n") for ct in range(1, 21): if self.is_sample_slot_used(ct): print(f" Position {ct:2.0f}: {self.get_sample_name(ct)}") if self.sample_in_gripper.get(): print(f"\n Gripper: {self.sample_in_gripper_name.get()}\n") else: print(f"\n Gripper: no sample\n") if self.is_sample_slot_used(0): print(f" flOMNI stage: {self.get_sample_name(0)}\n") else: print(f" flOMNI stage: no sample\n")