Source code for csaxs_bec.devices.epics.xbpms

import time

from ophyd import Component as Cpt
from ophyd import Device, EpicsSignalRO, Signal


[docs] class SumSignal(Signal): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._metadata.update(write_access=False)
[docs] def wait_for_connection(self, timeout=0): super().wait_for_connection(timeout) self._metadata.update(connected=True)
[docs] def get(self, **kwargs): self._metadata["timestamp"] = time.time() val1 = self.parent.current1.get() val2 = self.parent.current2.get() val3 = self.parent.current3.get() val4 = self.parent.current4.get() return val1 + val2 + val3 + val4
[docs] def describe(self): source = [ self.parent.current1.describe()[self.parent.current1.name]["source"], self.parent.current2.describe()[self.parent.current2.name]["source"], self.parent.current3.describe()[self.parent.current3.name]["source"], self.parent.current4.describe()[self.parent.current4.name]["source"], ] source = " / ".join(source) desc = { "shape": [], "dtype": "number", "source": f"PV: {source}", "units": "", "precision": ( self.parent.current1.precision if hasattr(self.parent.current1, "precision") else 0 ), } return desc
[docs] class DiffXYSignal(Signal): def __init__(self, sum1, sum2, *args, **kwargs): self.sum1 = sum1 self.sum2 = sum2 super().__init__(*args, **kwargs) self._metadata.update(write_access=False)
[docs] def wait_for_connection(self, timeout=0): super().wait_for_connection(timeout) self._metadata.update(connected=True)
[docs] def get(self, **kwargs): self._metadata["timestamp"] = time.time() summed_1 = 0 summed_2 = 0 for signal in self.sum1: summed_1 += getattr(self.parent, signal).get() for signal in self.sum2: summed_2 += getattr(self.parent, signal).get() _sum = summed_1 + summed_2 if _sum == 0: return 0.0 return (summed_1 - summed_2) / _sum
[docs] def describe(self): source = [ getattr(self.parent, signal).describe()[getattr(self.parent, signal).name]["source"] for signal in self.sum1 + self.sum2 ] source = " / ".join(source) desc = { "shape": [], "dtype": "number", "source": f"PV: {source}", "units": "", "precision": ( self.parent.current1.precision if hasattr(self.parent.current1, "precision") else 0 ), } return desc
[docs] class BPMDevice(Device): current1 = Cpt( EpicsSignalRO, ":Current1:MeanValue_RBV", kind="normal", doc="Current 1", auto_monitor=True ) current2 = Cpt( EpicsSignalRO, ":Current2:MeanValue_RBV", kind="normal", doc="Current 2", auto_monitor=True ) current3 = Cpt( EpicsSignalRO, ":Current3:MeanValue_RBV", kind="normal", doc="Current 3", auto_monitor=True ) current4 = Cpt( EpicsSignalRO, ":Current4:MeanValue_RBV", kind="normal", doc="Current 4", auto_monitor=True ) sum = Cpt(SumSignal, kind="hinted", doc="Sum of all currents") x = Cpt( DiffXYSignal, sum1=["current1", "current2"], sum2=["current3", "current4"], doc="X difference signal", ) y = Cpt( DiffXYSignal, sum1=["current1", "current3"], sum2=["current2", "current4"], doc="Y difference signal", ) diag = Cpt( DiffXYSignal, sum1=["current1", "current4"], sum2=["current2", "current3"], doc="Diagonal difference signal", ) def __init__(self, prefix="", *args, **kwargs): super().__init__(*args, prefix=prefix, **kwargs)
if __name__ == "__main__": dev = BPMDevice(name="bpm", prefix="X12SA-FE-XBPM1") dev.wait_for_connection() print(dev.read())