"""
DDG1 delay generator
This module implements the DDG1 delay generator logic for the CSAXS beamline.
The attached PDF trigger_scheme_ddg1_ddg2.pdf provides a more detailed overview of
the trigger scheme. If the logic changes in the future, it is highly recommended to
update the PDF accordingly.
The DDG1 is the main trigger delay generator for the CSAXS beamline. It will
receive either a soft trigger from BEC (depending on the scan type) or a hardware trigger
from a beamline device (e.g. the Galil stages). It is responsible for opening the shutter
and sending a trigger to the Delay Generator CSAXS (DDG2), which in turn will
send the trigger to the detectors. DDG1 will not be witout burst mode, but rather in standard
mode creating delays for the channels ab, cd, ef, gh.
A brief summary of the DDG1 logic:
DELAY PAIRS:
- DelayPair ab is connected to the EXT/EN of DDG2.
- DelayPair cd is connected to the SHUTTER.
- DelayPair ef is connected to an OR gate together with the detector
PULSE train for the MCS card. The MCS card needs one extra pulse to forward points.
DELAY CHANNELS:
- a = t0 + 2ms (2ms delay to allow the shutter to open)
- b = a + 1us (short pulse)
- c = t0
- d = a + exp_time * burst_count
- e = d
- f = e + 1us (short pulse to OR gate for MCS triggering)
"""
from __future__ import annotations
import threading
import time
import traceback
from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger
from bec_server.scan_server.scans.scan_base import ScanInfo as ScanServerScanInfo
from ophyd import Component as Cpt
from ophyd import EpicsSignalRO, Kind
from ophyd_devices import CompareStatus, DeviceStatus, StatusBase, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
CHANNELREFERENCE,
OUTPUTPOLARITY,
PROC_EVENT_MODE,
STATUSBITS,
TRIGGERSOURCE,
AllChannelNames,
ChannelConfig,
DelayGeneratorCSAXS,
LiteralChannels,
StatusBitsCompareStatus,
)
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import ACQUIRING, suppress_mca_callbacks
from csaxs_bec.devices.utils.utils import fetch_scan_info
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import DeviceManagerBase, ScanInfo
from csaxs_bec.devices.epics.mcs_card.mcs_card_csaxs import MCSCardCSAXS
logger = bec_logger.logger
########################
## DEFAULT SETTINGS ####
########################
# NOTE Default channel configuration for all channels of the DDG1 delay generator
# This can be adapted as needed, or fine-tuned per channel. On every reload of the
# device configuration in BEC, these values will be set into the DDG1 device.
_DEFAULT_CHANNEL_CONFIG: ChannelConfig = {
"amplitude": 4.5,
"offset": 0.0,
"polarity": OUTPUTPOLARITY.POSITIVE,
"mode": "ttl",
}
# NOTE Here you can adapt the default IO configuration for all channels of the DDG1
# Currently, all channels are set to the same default configuration `_DEFAULT_CHANNEL_CONFIG`.
DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = {
"t0": _DEFAULT_CHANNEL_CONFIG,
"ab": _DEFAULT_CHANNEL_CONFIG,
"cd": _DEFAULT_CHANNEL_CONFIG,
"ef": _DEFAULT_CHANNEL_CONFIG,
"gh": _DEFAULT_CHANNEL_CONFIG,
}
DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT
# NOTE Default readout times for each channel, can be adapted as needed.
# These values are relevant to calculate proper widths of the timing signals.
# They also define a minimum exposure time that can be used as they are subtracted
# as dead times from the exposure time.
DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz
# NOTE Default channel references for each channel of the DDG1 delay generator.
# This needs to be carefully adjusted to match the envisioned trigger scheme.
# If the trigger scheme changes, adapt the values here together with the README and
# PDF `trigger_scheme_ddg1_ddg2.pdf`.
DEFAULT_REFERENCES: list[tuple[LiteralChannels, CHANNELREFERENCE]] = [
("A", CHANNELREFERENCE.T0), # T0 + 2ms delay
("B", CHANNELREFERENCE.A),
("C", CHANNELREFERENCE.T0), # T0
("D", CHANNELREFERENCE.C),
("E", CHANNELREFERENCE.B), # B One extra pulse once shutter closes for MCS
("F", CHANNELREFERENCE.E), # E + 1mu s
("G", CHANNELREFERENCE.T0),
("H", CHANNELREFERENCE.G),
]
###############################
## DDG1 IMPLEMENTATION ########
###############################
[docs]
class DDG1(PSIDeviceBase, DelayGeneratorCSAXS):
"""
Implementation of the DelayGenerator DDG1 for the cSAXS beamline. It is the main trigger
source for the cSAXS beamline, and will be triggered by BEC through a software trigger or
by a hardware trigger from a beamline device (e.g. Galil stages). Specific implementation
of the cabling logic expected for this device are described in the module README, the attached
PDF 'trigger_scheme_ddg1_ddg2.pdf' and the module docstring.
The IOC prefix is 'X12SA-CPCL-DDG1:'.
Args:
name (str): Name of the device.
prefix (str, optional): EPICS prefix for the device. Defaults to ''.
scan_info (ScanInfo | None, optional): Scan info object. Defaults to None.
device_manager (DeviceManagerBase | None, optional): Device manager. Defaults to None.
"""
USER_ACCESS = [
"keep_shutter_open_during_scan",
"set_trigger",
"get_shutter_to_open_delay",
"prepare_mcs_on_trigger",
]
# TODO Consider using the 'fsh' device instead.
fast_shutter_readback = Cpt(
EpicsSignalRO,
read_pv="X12SA-ES1-TTL:INP_01",
add_prefix=("",), # Add this to prevent the prefix to be added to the signal
kind=Kind.omitted,
auto_monitor=True,
)
# The shutter control PV can indicate if the shutter is requested to be kept open. If that
# is the case, we can not use the signal shutter_readback signal to check if the delay cycle
# finishes but have to use the polling of the event status register to check if the burst finished.
fast_shutter_control = Cpt(
EpicsSignalRO,
read_pv="X12SA-ES1-TTL:OUT_01",
add_prefix=("",), # Add this to prevent the prefix to be added to the signal
kind=Kind.omitted,
auto_monitor=True,
)
def __init__(
self,
name: str,
prefix: str = "",
scan_info: ScanInfo | None = None,
device_manager: DeviceManagerBase | None = None,
**kwargs,
):
super().__init__(
name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs
)
self._shutter_to_open_delay = 2e-3
self.device_manager = device_manager
self._poll_thread = threading.Thread(target=self._poll_event_status, daemon=True)
self._poll_thread_run_event = threading.Event()
self._poll_thread_poll_loop_done = threading.Event()
self._poll_thread_kill_event = threading.Event()
self._poll_thread.start()
self.scan_parameters: ScanServerScanInfo | None = None
# pylint: disable=attribute-defined-outside-init
[docs]
def on_connected(self) -> None:
"""
This method is called after the device is initialized and all signals are connected. This happens
when a device configuration is loaded in BEC.
It sets the default values for this device - intended to overwrite everything to a usable default state.
For this purpose, we use the DEFAULT SETTINGS defined at the top of this module.
To ensure that this process is robust, we follow these steps:
- First, we stop any ongoing burst mode operation.
- Then, we set the DEFAULT_IO_CONFIG for each channel, the trigger source to DEFAULT_TRIGGER_SOURCE,
and the channel references to DEFAULT_REFERENCES.
- We set the state proc_status to be event based. This triggers readouts of the EventStatusLI bit
based on events. This was empirically found to be a stable solution in combination with the poll
loop of the state.
- Finally, we set the burst delay to 0, to set it to be of no delay.
"""
# NOTE First we make sure that there is nothing running on the DDG. This seems to
# help to tackle that the DDG occasionally freezes during the first scan
# after reconnecting to it. Do not remove.
self.stop_ddg()
# NOTE Setting DEFAULT configurations for IO config, trigger config and references.
# The three dictionaries above 'DEFAULT_IO_CONFIG', 'DEFAULT_TRIGGER_SOURCE' and
# 'DEFAULT_REFERNCES' should be used to adapt configurations if needed.
for channel, config in DEFAULT_IO_CONFIG.items():
self.set_io_values(channel, **config)
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
self.set_references_for_channels(DEFAULT_REFERENCES)
# NOTE Set state proc_status to be event based. This triggers readouts of the EventStatusLI bit
# based on events. This was empirically found to be a stable solution in combination with the poll
# loop of the state.
self.state.proc_status_mode.put(PROC_EVENT_MODE.EVENT)
# NOTE Burst delay should be set to 0, don't remove as this will not be checked
# Also set the burst count to 1 to only have a single pulse for DDG1.
# As the IOC may be out of sync with the HW, we make sure that we set the default parameters
# in the IOC to the expected values. In the past, we've experienced that IOC and HW can go out
# of sync.
self.burst_delay.put(1)
time.sleep(0.02) # Give HW time to process
self.burst_delay.put(0)
time.sleep(0.02)
self.burst_count.put(2)
time.sleep(0.02)
self.burst_count.put(1)
time.sleep(0.02)
self.burst_mode.put(1)
time.sleep(0.02)
self.burst_mode.put(0)
time.sleep(0.02)
[docs]
def get_shutter_to_open_delay(self) -> float:
"""Get the current delay that is set to open the shutter before the exposure time."""
return self._shutter_to_open_delay
[docs]
def keep_shutter_open_during_scan(self, open: True) -> None:
"""
Method to configure the delay generator for keeping the shutter open during a scans.
This means that the additional delay to open the shutter needs to be removed (2e-3)
from the timing of the signals.
Args:
open (bool): If True, the shutter will be kept open during the scan.
If False, the shutter will be opened and closed for each trigger cycle.
"""
if open is True:
self._shutter_to_open_delay = 0
else:
self._shutter_to_open_delay = 2e-3
[docs]
def on_stage(self) -> None:
"""
This method is called in preparation for a scan. All information about the upcoming
scan is available in self.scan_info.msg at this point. We use this information to
configure the DDG1 for the upcoming scan.
The DDG is operated in burst mode for the scan, but with only a single burst pulse.
THe length of the pulse is set to the expected exposure time for a single trigger,
which includes any burst acquisitions if frames_per_trigger > 1.
The logic is as follows:
- We check if any default burst parameters need to be set, and set them if needed.
- We calculate the burst pulse width based on the exposure time and frames_per_trigger.
- We set the burst_period and the shutter signal (delay pairs cd) to be
exposure_time * frames_per_trigger + 3ms (2ms for shutter to open, 1ms to close).
- We set the delay pairs ab to be 2ms delayed (to allow the shutter to open) with a width of 1us to trigger DDG2.
- We set the delay pairs ef to be triggered after the shutter closes with a width of 1us to trigger the MCS card.
- Finally, we add a short sleep to ensure that the IOC and DDG HW process the values properly.
"""
logger.info(f"DDG {self.name} on_stage called.")
self.scan_parameters = fetch_scan_info(self.scan_info)
start_time = time.time()
########################################
### Burst mode settings ################
########################################
# NOTE We check here if the delay generator is not in burst mode. We check these values
# and set them to the requried values if they differ from the expected ones.
# This has been found empirically to improve stability and avoid HW getting stuck in triggering cycles.
if self.burst_mode.get() == 0:
self.burst_mode.put(1)
if self.burst_delay.get() != 0:
self.burst_delay.put(0)
if self.burst_count.get() != 1:
self.burst_count.put(1)
#####################################
## Setup trigger source if needed ###
#####################################
# NOTE Some scans may change the trigger source to an external trigger,
# so we will make sure that the default trigger source is set for the DDG1
# before each scan. If a scan requires a different trigger source, i.e.
# external triggers then the scan should implement this change after the
# on_stage method was called.
if self.trigger_source.get() != DEFAULT_TRIGGER_SOURCE:
self.set_trigger(DEFAULT_TRIGGER_SOURCE)
#########################################
### Setup timing for burst and delays ###
#########################################
frames_per_trigger = self.scan_parameters.frames_per_trigger
exp_time = self.scan_parameters.exp_time
# Burst Period DDG1
# Set burst_period to shutter width
# c/t0 + self._shutter_to_open_delay + exp_time * burst_count
# SHUTTER WIDTH timing consists of the delay for the shutter to open
# + the exposure time * frames per trigger
shutter_width = self._shutter_to_open_delay + exp_time * frames_per_trigger
# TOTAL EXPOSURE accounts for the shutter to open AND close. In addition, we add
# a short additional delay of 3e-6 to allow for the extra trigger through 'ef'
# (delay of 1e-6, width of 1e-6)
total_exposure_time = 2 * self._shutter_to_open_delay + exp_time * frames_per_trigger + 3e-6
if self.burst_period.get() != shutter_width:
# The burst_period has to be slightly longer
self.burst_period.put(total_exposure_time)
# Trigger DDG2
# a = t0 + 2ms, b = a + 1us
# a has reference to t0, b has reference to a
# AB is delayed by the shutter opening time, and the falling edge indicates the shutter has
# fully closed, it has to be considered as the blocking signal for the next acquisition to start.
# PS: + 3e-6
self.set_delay_pairs(channel="ab", delay=self._shutter_to_open_delay, width=shutter_width)
# Trigger shutter
# d = c/t0 + self._shutter_to_open_delay + exp_time * burst_count + 1ms
# c has reference to t0, d has reference to c
# Shutter opens without delay at t0, closes after exp_time * burst_count + 2ms (self._shutter_to_open_delay)
self.set_delay_pairs(channel="cd", delay=0, width=shutter_width)
self.set_delay_pairs(
channel="gh",
delay=self._shutter_to_open_delay,
width=(shutter_width - self._shutter_to_open_delay),
)
# Trigger extra pulse for MCS OR gate
# f = e + 1us
# e has refernce to d, f has reference to e
if self.scan_parameters.scan_type == "hardware_triggered":
self.set_delay_pairs(channel="ef", delay=0, width=0)
else:
self.set_delay_pairs(channel="ef", delay=1e-6, width=1e-6)
# NOTE Add additional sleep to make sure that the IOC and DDG HW process the values properly
# This value has been choosen empirically after testing with the HW. It's
# also just called once per scan and has been found to improve stability of the HW.
time.sleep(0.2)
logger.info(f"DDG {self.name} on_stage completed in {time.time() - start_time:.3f}s.")
[docs]
def prepare_mcs_on_trigger(self) -> CompareStatus:
"""
This method is used by the DDG1 on_trigger method to prepare the MCS card for the next trigger.
It checks that the MCS card is properly prepared before BEC sends a software trigger to the DDG1,
which is needed for step scans.
It relies on the MCS card implementation and needs to be adapted if the MCS card logic changes.
"""
mcs_name = "mcs"
if self.device_manager is None or not hasattr(self.device_manager, "devices"):
raise ValueError("Device manager is not properly initialized.")
mcs = self.device_manager.devices.get(mcs_name, None)
if mcs is None:
raise ValueError(f"MCS card {mcs_name} not found in device manager.")
# NOTE First we wait that the MCS card is not acquiring. We add here a timeout of 5s to avoid
# a deadlock in case the MCS card is stuck for some reason. This should not happen normally.
status = CompareStatus(mcs.acquiring, ACQUIRING.DONE)
self.cancel_on_stop(status)
status.wait(timeout=5)
# NOTE Clear the '_omit_mca_callbacks' flag. This makes sure that data received from the mca1...mca3
# counters are forwarded to BEC. Once the flag is set, we create a TransitionStatus DONE->ACQUIRING
# and start the acquisition through erase_start.put(1). Finally, we wait for the card to go to ACQUIRING state.
status_acquiring = CompareStatus(mcs.acquiring, ACQUIRING.ACQUIRING)
with suppress_mca_callbacks(mcs):
mcs.erase_start.put(1)
mcs._omit_mca_callbacks.clear() # pylint: disable=protected-access
self.cancel_on_stop(status_acquiring)
return status_acquiring
def _poll_event_status(self) -> None:
"""
Polling loop to retrieve the event status register of the delay generator DDG1.
This method runs in a background thread and the polling is controlled through the
'_poll_thread_run_event' and '_poll_thread_kill_event'. Polling should only become
active when a software trigger was sent in BEC and we are waiting for the burst to complete.
"""
# Main loop of the polling thread. As long as the kill event is not set, the loop continues.
while not self._poll_thread_kill_event.is_set():
# NOTE Main wait event for the polling thread. If the _poll_thread_run_event is not set,
# The thread will wait here. This event is used to start/stop polling from outside the thread,
# as used in on_trigger and on_stop. Please make sure to set this event also when the thread
# should be killed as its otherwise stuck inside the wait.
self._poll_thread_run_event.wait()
# NOTE Set the event to indicate that we are currently still in the poll_loop. This is needed
# as we have to use sleeps of 20ms within the poll loop. These sleeps were empirically detetermined
# to ensure that no state changes are missed. However, these sleeps have the side effect that
# setting the '_poll_thread_run_event' may not immediately stop the polling. Therefore, we need the
# '_poll_thread_poll_loop_done' event to indicate that polling has finished. If this logic is changed,
# it requires careful testing as failure rates can be in the 1 out of 500 events rate, which are still
# not acceptable for operation. The current implementation has been tested with failure rates smaller then
# ~ 1:100000 if failures happened at all.
self._poll_thread_poll_loop_done.clear()
while (
self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set()
):
try:
self._poll_loop()
except Exception: # pylint: disable=broad-except
content = traceback.format_exc()
logger.error(
f"Exception in polling loop thread, polling continues...\n Error content:\n{content}"
)
# NOTE Set the _poll_thread_poll_loop_done event to indicate that we are done polling. Do not remove!
self._poll_thread_poll_loop_done.set()
def _poll_loop(self) -> None:
"""
This method is the actual poll loop to update the event status from the satus register
of the delay generator DDG1.
It follows a procedure that was established empirically after extended testing with the HW.
Any adaptations to this logic need to be carefully tested to avoid that the HW becomes unstable.
NOTE: Sleeps are important in this logic, and should not be removed or optimized without extensive testing.
20ms has been found to be the minimum sleep time that proofed to be stable in operation.
The logic is as follows:
- Set the 'proc_status' to 1 with use_complete=True to trigger an event based readout of the EventStatusLI.
- Sleep 20ms to give the device time to process the command.
- Check if the kill event or run event are cleared, and exit the loop if so.
- Read the EventStatusLI channel to update the event status.
- Check again if the kill event or run event are cleared, and exit the loop if so.
Please note that any important changes of the status register reading will trigger callbacks
if attached to the event status signal. These callbacks hold the logic to resolve status objects
when waiting for specific events (e.g. end of burst).
"""
self.state.proc_status.put(1, use_complete=True)
# NOTE: Important sleep that has been empirically determined after testing for a long time
# Only remove if absolutely certain that the DDG logic of polling the EventStatusLI works without it.
time.sleep(0.02)
if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set():
return
self.state.event_status.get(use_monitor=False)
if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set():
return
# NOTE: Again important sleep that has been empirically determined after testing for a long time
# Only remove if certain that logic can be replaced to not risk HW failures.
time.sleep(0.02)
def _start_polling(self) -> None:
"""Start the polling loop in the background thread."""
self._poll_thread_run_event.set()
def _stop_polling(self) -> None:
"""Stop the polling loop in the background thread."""
self._poll_thread_run_event.clear()
def _kill_poll_thread(self) -> None:
"""Kill the polling thread."""
self._poll_thread_kill_event.set()
self._stop_polling()
self._poll_thread.join(timeout=1)
if self._poll_thread.is_alive():
logger.warning("Polling thread did not stop gracefully.")
else:
logger.info("Polling thread stopped.")
def _prepare_trigger_status_event(
self, timeout: float | None = None
) -> StatusBitsCompareStatus:
"""
Method to prepare a status object that indicates the end of a burst cycle.
It also sets up a callback to cancel the polling of the event status register
if the status is cancelled externally (e.g. by stopping the device). In addition,
a timeout can either be specified, or is automatically calculated based on the
exposure time, frames_per_trigger and a default extra time of 5 seconds.
Args:
timeout (float | None, optional): Timeout for the status object. If None, a
default timeout based on exposure time and frames_per_trigger is used.
Returns:
StatusBitsCompareStatus:
"""
if timeout is None:
# Default timeout of 5 seconds + exposure time * frames_per_trigger
timeout = 5 + self.scan_parameters.exp_time * self.scan_parameters.frames_per_trigger
# Callback to cancel the status if the device is stopped
def cancel_cb(status: CompareStatus) -> None:
"""Callback to cancel the status if the device is stopped."""
logger.debug("DDG1 end of burst detected, stopping polling loop.")
if status.done:
self._stop_polling()
# Run false is important to ensure that the status is only checked on the next event status update
status = StatusBitsCompareStatus(
self.state.event_status, STATUSBITS.END_OF_BURST, timeout=timeout, run=False
)
status.add_callback(cancel_cb)
self.cancel_on_stop(status)
return status
[docs]
def on_trigger(self) -> DeviceStatus:
"""
This method is called from BEC as a software trigger. Here the logic is as follows:
We first check if the trigger_source is set to SINGLE_SHOT. Only then will we received,
otherwise we return a status object directly as the DDG is triggered by an external
source which will have to implement its own logic to wait for trigger signals to
be received.
I SINGLE_SHOT, the implementation here will send a software trigger. Now there are
two options to wait for the trigger (burst) cycle to be done. One is to rely on the
signal of the "mcs" card if it is present. However, this is only possible if the
scan_type is not "fly" as in fly scans the ef channel is not triggered to send the last
pulse to the card (but the card is finishing its acquisition in complete itself). Then
we rely on the polling of the event status register to check if the burst cycle is done.
It follows a specific procedure to ensure that the DDG1 and MCS card are properly handled
on a trigger event. The established logic is as follows:
- Stop polling the event status register to avoid that the polling loop is still active
before sending the software trigger. This needs to be done to avoid conflicts
in reading the event status register.
- Wait for the _poll_thread_poll_loop_done event to ensure that the polling loop is no
longer active. A timeout of 1s is plenty as sleeps of 20ms are used in the poll loop.
- Add an extra sleep of 20ms to make sure that the HW is again ready to process new commands.
This has been found empirically after long testing to improve stability.
- If the MCS card is present in the current session of BEC, prepare the card for the next trigger.
- Prepare a status StatusBitsCompareStatus that will be resolved once the burst is done.
- Start the polling loop again to monitor the event status register.
- Send the software trigger to the DDG1
- Return the status object to BEC which will automatically resolve once the status register has
the END_OF_BURST bit set. The callback of the status object will also stop the polling loop.
"""
self._stop_polling()
# NOTE If the trigger source is not SINGLE_SHOT, the DDG is triggered by an external source
# thus we can not expect that trigger signals are meant to be awaited for. In this case,
# we can directly return.
if self.trigger_source.get() != TRIGGERSOURCE.SINGLE_SHOT.value:
status = StatusBase(obj=self)
status.set_finished()
return status
# NOTE If the MCS card is present in the current session of BEC,
# we prepare the card for the next trigger. The procedure is implemented
# in the 'prepare_mcs_on_trigger' method. We will also use the mcs card
# to indicate when the burst cycle is done. If no mcs card is available
# the fallback is to use the polling of the DDG
mcs = self.device_manager.devices.get("mcs", None)
if (
mcs is None
or mcs.enabled is False
or self.scan_parameters.scan_type == "hardware_triggered"
):
self._poll_thread_poll_loop_done.wait(timeout=1)
logger.warning("Did not find mcs card with name 'mcs' in current session")
time.sleep(0.02)
# Shutter is kept open, we can only rely on the event status register
status = self._prepare_trigger_status_event()
# Start polling thread again to monitor event status
self._start_polling()
else:
start_time = time.time()
logger.debug(f"Preparing mcs card ")
status_mcs = self.prepare_mcs_on_trigger()
# NOTE Timeout of 3s should be plenty, any longer wait should checked. If this happens to crash
# an acquisition regularly with a WaitTimeoutError, the timeout can be increased but it should
# be investigated why the EPICS interface is slow to respond.
try:
# current_time = time.time()
# while time.time() - current_time < 3 and mcs.acquiring.get() != ACQUIRING.ACQUIRING:
# time.sleep(0.1)
# logger.warning(
# f"MCS card is not in acquiring state, current state: {mcs.acquiring.get()}"
# )
# if mcs.acquiring.get() != ACQUIRING.ACQUIRING:
# logger.error(
# f"MCS card is not finishing after 3s, current state: {mcs.acquiring.get()}, waiting another 3s for its return to ACQUIRING state. If this happens regularly, please check the EPICS interface and the MCS card logic."
# )
if not status_mcs.done:
mcs.acquiring.get(use_monitor=False)
status_mcs.wait(timeout=3)
except Exception as exc:
if (
mcs.acquiring.get(use_monitor=False) != ACQUIRING.ACQUIRING
): # Get the current state without monitoring to avoid any side effects
logger.warning(
f"MCS did not go to Acquiring within 3s. Retrying erase_start {exc}"
)
raise exc
# mcs.erase_start.put(1)
# status_mcs.wait(timeout=3)
status = CompareStatus(mcs.acquiring, ACQUIRING.DONE)
logger.info(f"Finished preparing mcs card {time.time()-start_time}")
# Send trigger
self.trigger_shot.put(1, use_complete=True)
self.cancel_on_stop(status)
return status
[docs]
def on_stop(self) -> None:
"""Stop the delay generator HW and polling thread when the device is stopped."""
self.stop_ddg()
self._stop_polling()
[docs]
def on_destroy(self) -> None:
"""Clean up resources when the device is destroyed."""
self.stop_ddg()
self._kill_poll_thread()
if __name__ == "__main__":
ddg = DDG1(name="ddg1", prefix="X12SA-CPCL-DDG1:")
ddg.wait_for_connection(all_signals=True, timeout=30)
ddg.summary()