"""
Module for the Galil RIO (RIO-471xx) controller interface. The controller is a compact PLC
with Ethernet. It has digital and analog I/O as well as counters and timers.
Link to the Galil RIO vendor page:
https://www.galil.com/plcs/remote-io/rio-471xx
This module provides the GalilRIOController for communication with the RIO controller
over TCP/IP. It also provides a device integration that interfaces to its
8 analog channels, and 16 digital output channels. Some PLCs may have 24 digital output channels,
which can be easily supported by changing the _NUM_DIGITAL_OUTPUT_CHANNELS variable.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Literal
from bec_lib.logger import bec_logger
from ophyd import DynamicDeviceComponent as DDC
from ophyd import Kind
from ophyd.utils import ReadOnlyError
from ophyd_devices import PSIDeviceBase
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO
from csaxs_bec.devices.omny.galil.galil_ophyd import (
GalilCommunicationError,
GalilSignalBase,
retry_once,
)
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
logger = bec_logger.logger
[docs]
class GalilRIOController(Controller):
"""Controller Class for Galil RIO controller communication."""
[docs]
@threadlocked
def socket_put(self, val: str) -> None:
"""Socker put method."""
self.sock.put(f"{val}\r".encode())
[docs]
@retry_once
def socket_put_confirmed(self, val: str) -> None:
"""Send message to controller and ensure that it is received by checking that the socket receives a colon.
Args:
val (str): Message that should be sent to the socket
Raises:
GalilCommunicationError: Raised if the return value is not a colon.
"""
return_val = self.socket_put_and_receive(val)
if return_val != ":":
raise GalilCommunicationError(
f"Expected return value of ':' but instead received {return_val}"
)
[docs]
class GalilRIOAnalogSignalRO(GalilSignalBase):
"""
Signal for reading analog input channels of the Galil RIO controller. This signal is read-only, so
the set method raises a ReadOnlyError. The get method retrieves the values of all analog
channels in a single socket command. The readback values of all channels are updated based
on the response, and subscriptions are run for all channels. Readings are cached as implemented
in the SocketSignal class, so that multiple reads of the same channel within an update cycle do
not result in multiple socket calls.
Args:
signal_name (str): The name of the signal, e.g. "ch0", "ch1", ..., "ch7"
channel (int): The channel number corresponding to the signal, e.g. 0 for "ch0", 1 for "ch1", ...
parent (GalilRIO): The parent device instance that this signal belongs to.
"""
_NUM_ANALOG_CHANNELS = 8
READBACK_TIMEOUT = 0.1 # time to wait in between two readback attemps in seconds, otherwise return cached value
def __init__(
self,
signal_name: str,
channel: int,
parent: GalilRIO,
readback_timeout: float = None,
**kwargs,
):
super().__init__(signal_name=signal_name, parent=parent, **kwargs)
self._channel = channel
self._metadata["connected"] = False
self._readback_timeout = (
readback_timeout if readback_timeout is not None else self.READBACK_TIMEOUT
)
self._metadata["write_access"] = False
self._last_readback = 0.0
[docs]
def get(self):
current_time = time.monotonic()
if current_time - self._last_readback > self._readback_timeout:
old_value = self._readback
self._last_readback = current_time # _socket_get may rely on this value to be set.
self._readback = self._socket_get()
self._run_subs(
sub_type=self.SUB_VALUE,
old_value=old_value,
value=self._readback,
timestamp=current_time,
)
return self._readback
def _socket_set(self, val):
"""Read-only signal, so set method raises an error."""
raise ReadOnlyError(f"Signal {self.name} is read-only.")
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = "MG@" + ", @".join([f"AN[{ii}]" for ii in range(self._NUM_ANALOG_CHANNELS)])
ret = self.controller.socket_put_and_receive(cmd)
values = [float(val) for val in ret.strip().split(" ")]
# Run updates for all channels. This also updates the _readback and metadata timestamp
# value of this channel.
self._update_all_channels(values)
return self._readback
# pylint: disable=protected-access
def _update_all_channels(self, values: list[float]) -> None:
"""
Method to receive a list of readback values for channels 0 to 7. Updates for each channel idx
are applied to the corresponding GalilRIOAnalogSignalRO signal with matching attr_name "ch{idx}".
We also update the _last_readback attribute of each of the signals, to avoid multiple socket calls,
but rather use the cached value of the combined reading for all channels.
Args:
values (list[float]): List of new readback values for all channels, where the
index corresponds to the channel number (0-7).
"""
updates: dict[str, tuple[float, float]] = {} # attr_name -> (new_val, old_val)
# Update all readbacks first
for walk in self.parent.walk_signals():
if isinstance(walk.item, GalilRIOAnalogSignalRO):
idx = int(walk.item.attr_name[-1])
if 0 <= idx < len(values):
old_val = walk.item._readback
new_val = values[idx]
walk.item._metadata["timestamp"] = self._last_readback
walk.item._last_readback = self._last_readback
walk.item._readback = new_val
if (
idx != self._channel
): # Only run subscriptions on other channels, not on itself
# as this is handled by the SocketSignal and we want to avoid running multiple
# subscriptions for the same channel update
updates[walk.item.attr_name] = (new_val, old_val)
else:
logger.warning(
f"Received {len(values)} values but found channel index {idx} in signal {walk.item.name}. Skipping update for this signal."
)
# Run subscriptions after all readbacks have been updated
# on all channels except the one that triggered the update
# TODO for now skip running subscribers, this should be re-implemented
# once we properly handle subscriptions from bec running "read"
for walk in self.parent.walk_signals():
if walk.item.attr_name in updates:
new_val, old_val = updates[walk.item.attr_name]
walk.item._run_subs(
sub_type=walk.item.SUB_VALUE,
old_value=old_val,
value=new_val,
timestamp=self._last_readback,
)
[docs]
class GalilRIODigitalOutSignal(GalilSignalBase):
"""Signal for controlling digital outputs of the Galil RIO controller."""
_NUM_DIGITAL_OUTPUT_CHANNELS = 16
def __init__(self, signal_name: str, channel: int, parent: GalilRIO, **kwargs):
super().__init__(signal_name, parent=parent, **kwargs)
self._channel = channel
self._metadata["connected"] = False
def _socket_get(self) -> float:
"""Get command for the readback signal"""
cmd = f"MG@OUT[{self._channel}]"
ret = self.controller.socket_put_and_receive(cmd)
self._readback = float(ret.strip())
return self._readback
def _socket_set(self, val: Literal[0, 1]) -> None:
"""Set command for the digital output signal. Value should be 0 or 1."""
if val not in (0, 1):
raise ValueError("Digital output value must be 0 or 1.")
cmd = f"SB{self._channel}" if val == 1 else f"CB{self._channel}"
self.controller.socket_put_confirmed(cmd)
def _create_analog_channels(num_channels: int) -> dict[str, tuple]:
"""
Helper method to create a dictionary of analog channel definitions for the DynamicDeviceComponent.
Args:
num_channels (int): The number of analog channels to create.
"""
an_channels = {}
for i in range(0, num_channels):
an_channels[f"ch{i}"] = (
GalilRIOAnalogSignalRO,
f"ch{i}",
{"kind": Kind.normal, "channel": i, "doc": f"Analog channel {i}."},
)
return an_channels
def _create_digital_output_channels(num_channels: int) -> dict[str, tuple]:
"""
Helper method to create a dictionary of digital output channel definitions for the DynamicDeviceComponent.
Args:
num_channels (int): The number of digital output channels to create.
"""
di_out_channels = {}
for i in range(0, num_channels):
di_out_channels[f"ch{i}"] = (
GalilRIODigitalOutSignal,
f"ch{i}",
{"kind": Kind.config, "channel": i, "doc": f"Digital output channel {i}."},
)
return di_out_channels
[docs]
class GalilRIO(PSIDeviceBase):
"""
Galil RIO controller integration with 16 digital output channels and 8 analog input channels.
The default port for the controller is 23.
Args:
host (str): Hostname or IP address of the Galil RIO controller.
port (int, optional): Port number for the TCP/IP connection. Defaults to 23.
socket_cls (type[SocketIO], optional): Socket class to use for communication. Defaults to SocketIO.
scan_info (ScanInfo, optional): ScanInfo object for the device.
device_manager (DeviceManagerDS): The device manager instance that manages this device.
**kwargs: Additional keyword arguments passed to the PSIDeviceBase constructor.
"""
SUB_CONNECTION_CHANGE = "connection_change"
#############################
### Analog input channels ###
#############################
analog_in = DDC(_create_analog_channels(GalilRIOAnalogSignalRO._NUM_ANALOG_CHANNELS))
digital_out = DDC(
_create_digital_output_channels(GalilRIODigitalOutSignal._NUM_DIGITAL_OUTPUT_CHANNELS)
)
def __init__(
self,
*,
name: str,
host: str,
device_manager: DeviceManagerDS,
port: int | None = None,
socket_cls: type[SocketIO] = SocketIO,
scan_info: ScanInfo | None = None,
**kwargs,
):
if port is None:
port = 23 # Default port for Galil RIO controller
self.controller = GalilRIOController(
name=f"GalilRIOController_{name}",
socket_cls=socket_cls,
socket_host=host,
socket_port=port,
device_manager=device_manager,
)
self._readback_metadata: dict[str, float] = {"last_readback": 0.0}
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
# pylint: disable=arguments-differ
[docs]
def wait_for_connection(self, timeout: float = 30.0) -> None:
"""Wait for the RIO controller to be connected within timeout period."""
self.controller.on(timeout=timeout)
[docs]
def destroy(self) -> None:
"""Make sure to turn off the controller socket on destroy."""
self.controller.off(update_config=False)
return super().destroy()
# pylint: disable=protected-access
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
walk.item._metadata["connected"] = self.controller.connected
if __name__ == "__main__":
HOST_NAME = "129.129.122.14"
from bec_server.device_server.tests.utils import DMMock
dm = DMMock()
rio = GalilRIO(name="rio", host=HOST_NAME, device_manager=dm)
rio.wait_for_connection(timeout=10)
print("Connected:", rio.an_ch1.read())
print("All channels:", rio.read())