import time
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignalRO, Signal
[docs]
class SumSignal(Signal):
    """Read-only computed signal holding the total of the parent's four currents.

    The value is not stored: every call to :meth:`get` reads ``current1``
    through ``current4`` from ``self.parent`` and adds them up.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``Signal`` and flag the signal as not writable."""
        super().__init__(*args, **kwargs)
        self._metadata["write_access"] = False

    def _current_signals(self):
        """Return the parent's ``current1`` .. ``current4`` signals, in that order."""
        owner = self.parent
        return (owner.current1, owner.current2, owner.current3, owner.current4)

    def wait_for_connection(self, timeout=0):
        """Run the base-class connection wait, then mark this signal as connected.

        Args:
            timeout: Passed through unchanged to ``Signal.wait_for_connection``.
        """
        super().wait_for_connection(timeout)
        self._metadata["connected"] = True

    def get(self, **kwargs):
        """Return ``current1 + current2 + current3 + current4`` read from the parent.

        The metadata timestamp is refreshed with ``time.time()`` before the
        currents are read. ``**kwargs`` is accepted but not used.
        """
        self._metadata.update(timestamp=time.time())
        first, second, third, fourth = (
            channel.get() for channel in self._current_signals()
        )
        return first + second + third + fourth

    def describe(self):
        """Return the descriptor dictionary for the summed signal.

        ``source`` is the ``" / "``-joined list of the four currents' own
        sources, prefixed with ``"PV: "``. ``precision`` is taken from
        ``current1`` when it exposes one, otherwise ``0``.

        NOTE(review): the result is a flat dict, whereas the current signals'
        ``describe()`` results are indexed by signal name here - confirm that
        callers expect the un-keyed form.
        """
        channels = self._current_signals()
        pv_sources = [channel.describe()[channel.name]["source"] for channel in channels]
        joined = " / ".join(pv_sources)
        return {
            "shape": [],
            "dtype": "number",
            "source": f"PV: {joined}",
            "units": "",
            "precision": getattr(channels[0], "precision", 0),
        }
[docs]
class DiffXYSignal(Signal):
    """Read-only computed signal: normalised difference of two signal groups.

    With ``A`` the total of the parent signals named in ``sum1`` and ``B`` the
    total of those named in ``sum2``, the value is ``(A - B) / (A + B)``.

    Args:
        sum1: Attribute names, looked up on ``self.parent``, of the signals
            that make up ``A``.
        sum2: Attribute names, looked up on ``self.parent``, of the signals
            that make up ``B``.
        *args: Forwarded unchanged to ``Signal.__init__``.
        **kwargs: Forwarded unchanged to ``Signal.__init__``.
    """

    def __init__(self, sum1, sum2, *args, **kwargs):
        # The group definitions are stored before the base initialiser runs.
        self.sum1 = sum1
        self.sum2 = sum2
        super().__init__(*args, **kwargs)
        self._metadata.update(write_access=False)

    def wait_for_connection(self, timeout=0):
        """Run the base-class connection wait, then mark this signal as connected.

        Args:
            timeout: Passed through unchanged to ``Signal.wait_for_connection``.
        """
        super().wait_for_connection(timeout)
        self._metadata.update(connected=True)

    def _group_total(self, names):
        """Return the sum of ``get()`` over the parent signals listed in ``names``."""
        return sum(getattr(self.parent, name).get() for name in names)

    def get(self, **kwargs):
        """Return ``(A - B) / (A + B)`` for the two configured signal groups.

        The metadata timestamp is refreshed with ``time.time()`` before the
        signals are read. ``**kwargs`` is accepted but not used.

        Returns:
            The normalised difference, or ``0.0`` when ``A + B`` equals zero
            (which would otherwise be a division by zero).
        """
        self._metadata["timestamp"] = time.time()
        summed_1 = self._group_total(self.sum1)
        summed_2 = self._group_total(self.sum2)
        total = summed_1 + summed_2
        if total == 0:
            return 0.0
        return (summed_1 - summed_2) / total

    def describe(self):
        """Return the descriptor dictionary for the difference signal.

        ``source`` is the ``" / "``-joined list of the sources of every signal
        in ``sum1`` followed by ``sum2``, prefixed with ``"PV: "``.
        ``precision`` is taken from the first configured signal when it exposes
        one, otherwise ``0``.

        NOTE(review): the result is a flat dict, whereas the input signals'
        ``describe()`` results are indexed by signal name here - confirm that
        callers expect the un-keyed form.
        """
        # Unpacking (rather than ``sum1 + sum2``) also works when the two groups
        # were given as different sequence types, e.g. a list and a tuple.
        names = [*self.sum1, *self.sum2]
        signals = [getattr(self.parent, name) for name in names]
        source = " / ".join(sig.describe()[sig.name]["source"] for sig in signals)
        # Use the first configured input for the precision instead of a
        # hard-coded ``parent.current1``, so the class also works on parents
        # whose signals are named differently.
        precision = getattr(signals[0], "precision", 0) if signals else 0
        return {
            "shape": [],
            "dtype": "number",
            "source": f"PV: {source}",
            "units": "",
            "precision": precision,
        }
[docs]
class BPMDevice(Device):
    """BPM device built from four EPICS current readbacks and derived signals.

    The four ``currentN`` components are monitored read-only EPICS signals.
    ``sum`` adds all four currents; ``x``, ``y`` and ``diag`` are normalised
    differences between two pairs of currents (see ``DiffXYSignal``).
    """

    # --- raw currents (read-only EPICS readbacks) ---
    current1 = Cpt(
        EpicsSignalRO,
        ":Current1:MeanValue_RBV",
        auto_monitor=True,
        kind="normal",
        doc="Current 1",
    )
    current2 = Cpt(
        EpicsSignalRO,
        ":Current2:MeanValue_RBV",
        auto_monitor=True,
        kind="normal",
        doc="Current 2",
    )
    current3 = Cpt(
        EpicsSignalRO,
        ":Current3:MeanValue_RBV",
        auto_monitor=True,
        kind="normal",
        doc="Current 3",
    )
    current4 = Cpt(
        EpicsSignalRO,
        ":Current4:MeanValue_RBV",
        auto_monitor=True,
        kind="normal",
        doc="Current 4",
    )

    # --- derived signals, computed from the currents above on each read ---
    sum = Cpt(SumSignal, doc="Sum of all currents", kind="hinted")
    # (current1 + current2) versus (current3 + current4)
    x = Cpt(
        DiffXYSignal,
        doc="X difference signal",
        sum1=["current1", "current2"],
        sum2=["current3", "current4"],
    )
    # (current1 + current3) versus (current2 + current4)
    y = Cpt(
        DiffXYSignal,
        doc="Y difference signal",
        sum1=["current1", "current3"],
        sum2=["current2", "current4"],
    )
    # (current1 + current4) versus (current2 + current3)
    diag = Cpt(
        DiffXYSignal,
        doc="Diagonal difference signal",
        sum1=["current1", "current4"],
        sum2=["current2", "current3"],
    )

    def __init__(self, prefix="", *args, **kwargs):
        """Create the device, passing ``prefix`` to ``Device`` by keyword.

        Args:
            prefix: PV prefix handed to ``Device.__init__``; defaults to ``""``.
            *args: Forwarded unchanged to ``Device.__init__``.
            **kwargs: Forwarded unchanged to ``Device.__init__``.
        """
        forwarded = dict(kwargs, prefix=prefix)
        super().__init__(*args, **forwarded)
if __name__ == "__main__":
    # Manual check: build the device, wait for its signals, print one reading.
    bpm = BPMDevice(prefix="X12SA-FE-XBPM1", name="bpm")
    bpm.wait_for_connection()
    reading = bpm.read()
    print(reading)