csaxs_bec.devices.panda_box.panda_box.PandaBoxCSAXS#

class PandaBoxCSAXS(*, name: str, host: str, signal_alias: dict[str, str] | None = None, scan_info: ScanInfo | None = None, device_manager: DeviceManagerDS | None = None, **kwargs)[source]#

Bases: PandaBox

PandaBox integration for cSAXS. This class implements cSAXS-specific logic for the PandaBox integration.

TODO: This logic is not yet mapped to any existing hardware. Adapt Docstring once the hardware is defined and integrated.

Initialize the PSI Device Base class.

Parameters:
  • name (str) – Name of the device

  • scan_info (ScanInfo) – The scan info to use.

Methods

add_data_callback

Register a data callback to be called whenever new data is received from the PandaBox.

add_instantiation_callback

Register a callback which will receive each OphydObject instance.

add_status_callback

This method registers a status callback to the data receiving loop that will resolve if the PandaBox receives specific data events.

cancel_on_stop

Register a status object to be cancelled when the device is stopped.

check_value

Check if the value is valid for this object

clear_sub

Remove a subscription, given the original callback function

complete

Complete the device.

configure

Configure the device for something during a run

convert_frame_data

Convert the data from a FrameData object into a dictionary with expected OPHYD read format, e.g. signal {signal_name: {"value": [...]}}.

describe

Provide schema and meta-data for read().

describe_configuration

Provide schema & meta-data for read_configuration()

destroy

We append the cleanup of the data readout thread to the destroy method, and call it prior to the super().destroy() call.

get

Get the value of all components in the device

get_device_tuple

The device tuple type associated with a Device class

get_instantiated_signals

Yields all of the instantiated signals in a device hierarchy

get_panda_state

Get current panda data state.

kickoff

Kickoff the device.

on_complete

On complete is called after the scan is complete.

on_connected

Here we start the data readout thread upon connection to the PandaBox device.

on_destroy

Called when the device is destroyed.

on_init

Called when the device is initialized.

on_kickoff

Called to kickoff a device for a fly scan.

on_pre_scan

On pre_scan hook for the PandaBox.

on_stage

On stage hook for the PandaBox.

on_stop

Called when the device is stopped.

on_trigger

Called when the device is triggered.

on_unstage

Unstage hook for the PandaBox.

pause

Attempt to 'pause' the device.

pre_scan

Pre-scan function.

put

Put a value to all components of the device

read

Read data from the device.

read_configuration

Dictionary mapping names to value dicts with keys: value, timestamp

remove_data_callback

Remove a previously registered data callback.

remove_status_callback

Remove a previously registered status callback.

resume

Resume a device from a 'paused' state.

send_raw

Send a raw command to the PandaBox.

set_defaults

Set class-wide defaults for device communications

stage

Stage the device.

stop

Stopping the PandaBox device should ensure that the PandaBox is disarmed.

subscribe

Subscribe to events this event_type generates.

summary

trigger

Trigger the device.

unstage

Unstage the device.

unsubscribe

Remove a subscription

unsubscribe_all

wait_for_condition

Utility method to easily wait for signals or methods to reach an expected state.

wait_for_connection

Check if PandaBox is reachable by sending a raw command.

walk_components

Walk all components in the Device hierarchy

walk_signals

Walk all signals in the Device hierarchy

walk_subdevice_classes

Walk all sub-Device classes in the Device hierarchy

walk_subdevices

Walk all sub-Devices in the hierarchy

Attributes

SUB_ACQ_DONE

SUB_DEVICE_MONITOR_1D

SUB_DEVICE_MONITOR_2D

SUB_DONE_MOVING

SUB_FILE_EVENT

SUB_MOTOR_IS_MOVING

SUB_PROGRESS

SUB_READBACK

SUB_VALUE

USER_ACCESS

attr_name

component_names

configuration_attrs

connected

If the device is connected.

connection_timeout

data

A descriptor representing a device component (or signal)

destroyed

Check if the device has been destroyed.

dotted_name

Return the dotted name

event_types

Events that can be subscribed to via obj.subscribe

hints

kind

lazy_wait_for_connection

name

name of the device

panda_state

Get the current state of the data acquisition on the PandaBox.

parent

The parent of the ophyd object.

read_attrs

report

A report on the object.

root

Walk parents to find ultimate ancestor (parent's parent...).

signal_names

staged

Check if the device has been staged.

stopped

Check if the device has been stopped.

subscriptions

trigger_signals

class OphydAttrList(device, kind, remove_kind, recurse_key)#

Bases: MutableSequence

list proxy to migrate away from Device.read_attrs and Device.config_attrs

append(value)#

S.append(value) – append value to the end of the sequence

clear() None#

S.clear() – remove all items from S

count(value) integer#

S.count(value) – return number of occurrences of value

extend(values)#

S.extend(iterable) – extend sequence by appending elements from the iterable

index(value[, start[, stop]]) integer#

S.index(value, [start, [stop]]) – return first index of value.

Raises ValueError if the value is not present.

Supporting start and stop arguments is optional, but recommended.

insert(index, object)#

S.insert(index, value) – insert value before index

pop([index]) item#

S.pop([index]) – remove and return item at index (default last).

Raise IndexError if list is empty or index is out of range.

remove(value)#

S.remove(value) – remove first occurrence of value. Raise ValueError if the value is not present.

reverse()#

S.reverse() – reverse IN PLACE
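The methods listed for OphydAttrList are the mixin methods that collections.abc.MutableSequence derives from five abstract methods. The following is a minimal sketch of that mechanism using a hypothetical AttrListProxy class; it is not the ophyd implementation and ignores the device, kind, remove_kind and recurse_key arguments.

```python
from collections.abc import MutableSequence


class AttrListProxy(MutableSequence):
    """Hypothetical list proxy; NOT the ophyd OphydAttrList implementation."""

    def __init__(self, initial=()):
        self._items = list(initial)

    # The five abstract methods that MutableSequence requires:
    def __getitem__(self, index):
        return self._items[index]

    def __setitem__(self, index, value):
        self._items[index] = value

    def __delitem__(self, index):
        del self._items[index]

    def __len__(self):
        return len(self._items)

    def insert(self, index, value):
        self._items.insert(index, value)


attrs = AttrListProxy(["data"])
attrs.append("status")      # mixin method built on insert()
attrs.extend(["a", "b"])    # mixin method built on append()
attrs.remove("a")           # raises ValueError if the value is absent
attrs.reverse()             # reverses in place
last = attrs.pop()          # default: remove and return the last item
```

Everything except the five methods defined in the class body (append, extend, remove, reverse, pop, index, count, clear) comes from the MutableSequence base class.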

add_data_callback(callback: Callable[[ReadyData | StartData | FrameData | EndData | Data], None], data_type: PandaState = PandaState.FRAME) str#

Register a data callback to be called whenever new data is received from the PandaBox.

Parameters:
  • callback (Callable[[LITERAL_PANDA_DATA], None]) – The callback function to register. It should accept a single argument of type LITERAL_PANDA_DATA (see notes).

  • data_type ("ready", "start", "frame", "end") – The type of data to register the callback for. Defaults to “frame”.

Returns:

The unique ID of the registered callback. This can be used to remove the callback.

Return type:

str
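The register/remove cycle described above can be sketched with a small stand-in. DataCallbackRegistry, its dispatch helper and the PandaState stand-in enum are assumptions made for illustration; only the add_data_callback and remove_data_callback names, the default of frame data and the string ID come from this page.

```python
import uuid
from enum import Enum
from typing import Any


class PandaState(str, Enum):
    """Stand-in enum; member values follow the documented data types."""

    READY = "ready"
    START = "start"
    FRAME = "frame"
    END = "end"


class DataCallbackRegistry:
    """Hypothetical stand-in for the callback bookkeeping, not the real class."""

    def __init__(self) -> None:
        self._callbacks = {}

    def add_data_callback(self, callback, data_type=PandaState.FRAME) -> str:
        cb_id = str(uuid.uuid4())  # unique ID handed back to the caller
        self._callbacks[cb_id] = (PandaState(data_type), callback)
        return cb_id

    def remove_data_callback(self, cb_id: str) -> None:
        self._callbacks.pop(cb_id, None)

    def dispatch(self, data_type: PandaState, data: Any) -> None:
        """Hypothetical helper: forward data to callbacks registered for its type."""
        for registered_type, callback in list(self._callbacks.values()):
            if registered_type == data_type:
                callback(data)


received = []
registry = DataCallbackRegistry()
cb_id = registry.add_data_callback(received.append)  # defaults to FRAME data
registry.dispatch(PandaState.START, "start-event")   # ignored: wrong data type
registry.dispatch(PandaState.FRAME, "frame-1")       # delivered
registry.remove_data_callback(cb_id)
registry.dispatch(PandaState.FRAME, "frame-2")       # no longer delivered
```

Keeping the returned ID is what makes later removal possible, which is why the sketch stores it in cb_id.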

classmethod add_instantiation_callback(callback, fail_if_late=False)#

Register a callback which will receive each OphydObject instance.

Parameters:
  • callback (callable) – Expected signature: f(ophydobj_instance)

  • fail_if_late (boolean) – If True, verify that OphydObj has not yet been instantiated and raise RuntimeError if it has, as a way of verifying that no instances will be “missed” by this registry. False by default.

add_status_callback(status: StatusBase, success: list[PandaState], failure: list[PandaState], check_directly: bool = True) str#

This method registers a status callback to the data receiving loop that will resolve if the PandaBox receives specific data events. It is used to allow asynchronous resolution of status objects based on PandaBox events. By default, the callback checks the current panda_state directly to see if the status can be resolved immediately. This is useful when the status is created after the PandaBox has already sent some data events. However, this can also be disabled by setting check_directly to False.

Parameters:
  • status (StatusBase) – The status object to register the callback for.

  • success (list[PandaState]) – The list of PandaBox data events that will resolve the status as successful.

  • failure (list[PandaState]) – The list of PandaBox data events that will resolve the status as failed.

  • check_directly (bool) – Whether to check the current panda_state directly to resolve the status immediately. Defaults to True.

Returns:

The unique ID of the registered callback. This can be used to remove the callback. If the status is resolved directly, an empty string is returned.

Return type:

str
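The resolution rules above, including the empty-string return when check_directly resolves the status immediately, can be sketched as follows. FakeStatus, StatusCallbackRegistry, its on_event hook and the PandaState stand-in enum are hypothetical; the real method works with StatusBase objects inside the PandaBox receiving loop, and its internals may differ.

```python
import uuid
from enum import Enum


class PandaState(str, Enum):
    """Stand-in enum; member values follow the documented data types."""

    READY = "ready"
    START = "start"
    FRAME = "frame"
    END = "end"


class FakeStatus:
    """Minimal stand-in for a status object (the real one is a StatusBase)."""

    def __init__(self):
        self.done = False
        self.success = False
        self.exception = None

    def set_finished(self):
        self.done, self.success = True, True

    def set_exception(self, exc):
        self.done, self.success, self.exception = True, False, exc


class StatusCallbackRegistry:
    """Hypothetical bookkeeping for status callbacks, not the real class."""

    def __init__(self, current_state):
        self.panda_state = current_state
        self._status_callbacks = {}

    def _resolve(self, status, state, success, failure) -> bool:
        if state in success:
            status.set_finished()
            return True
        if state in failure:
            status.set_exception(RuntimeError(f"PandaBox reached state {state.value}"))
            return True
        return False

    def add_status_callback(self, status, success, failure, check_directly=True) -> str:
        if check_directly and self._resolve(status, self.panda_state, success, failure):
            return ""  # resolved immediately, nothing is registered
        cb_id = str(uuid.uuid4())
        self._status_callbacks[cb_id] = (status, success, failure)
        return cb_id

    def on_event(self, state):
        """Hypothetical hook called by the receiving loop for each data event."""
        self.panda_state = state
        for cb_id, (status, success, failure) in list(self._status_callbacks.items()):
            if self._resolve(status, state, success, failure):
                del self._status_callbacks[cb_id]


registry = StatusCallbackRegistry(current_state=PandaState.READY)

immediate = FakeStatus()
immediate_id = registry.add_status_callback(immediate, [PandaState.READY], [PandaState.END])

pending = FakeStatus()
pending_id = registry.add_status_callback(pending, [PandaState.END], [], check_directly=False)
registry.on_event(PandaState.FRAME)  # neither success nor failure: still pending
still_pending = not pending.done
registry.on_event(PandaState.END)    # success event resolves the status
```

The first registration resolves at once because the current state is already in the success list; the second only resolves when the matching event arrives.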

cancel_on_stop(status: StatusBase) None#

Register a status object to be cancelled when the device is stopped.

Parameters:

status (StatusBase) – The status object to be cancelled.

check_value(value, **kwargs)#

Check if the value is valid for this object

This function does no normalization, but may raise if the value is invalid.

Raises:

ValueError

clear_sub(cb, event_type=None)#

Remove a subscription, given the original callback function

See also subscribe(), unsubscribe()

Parameters:
  • cb (callable) – The callback

  • event_type (str, optional) – The event to unsubscribe from (if None, removes it from all event types)

complete() DeviceStatus | StatusBase#

Complete the device.

configure(d: Dict[str, Any]) Tuple[Dict[str, Any], Dict[str, Any]]#

Configure the device for something during a run

This default implementation allows the user to change any of the configuration_attrs. Subclasses might override this to perform additional input validation, cleanup, etc.

Parameters:

d (dict) – The configuration dictionary. To specify the order that the changes should be made, use an OrderedDict.

Returns:

  • (old, new) tuple of dictionaries

  • Where old and new are pre- and post-configure configuration states.

property connected#

If the device is connected.

Subclasses should override this

convert_frame_data(frame_data: FrameData) dict[str, Any]#

Convert the data from a FrameData object into a dictionary with expected OPHYD read format, e.g. signal {signal_name: {“value”: […]}}. Please be aware that if this method is overridden by child classes, you need to make sure that the key names in the FrameData are converted to the expected names in the data signal. This includes replacing dots in the original PandaBox keys with underscores using “block_name_mapping” from the utils, and using the key mapping provided through the signal_alias mapping.

Parameters:

frame_data (FrameData) – The FrameData object received from the PandaBox.

Returns:

The converted data in OPHYD read format.

Return type:

dict[str, Any]
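As a rough illustration of the key conversion described above, the sketch below renames keys and wraps values in the {"value": [...]} shape. The helper name to_read_format, the plain-dict input and the example key names are assumptions; the real method takes a FrameData object and uses “block_name_mapping” from the utils, whose exact behaviour is not shown on this page. The sketch also assumes that aliases are keyed by the original PandaBox key.

```python
from typing import Any, Optional


def to_read_format(
    fields: dict[str, list[Any]],
    signal_alias: Optional[dict[str, str]] = None,
) -> dict[str, dict[str, Any]]:
    """Hypothetical helper: rename keys and wrap values as {"value": [...]}."""
    alias = signal_alias or {}
    converted = {}
    for panda_key, values in fields.items():
        # Assumption: an alias, if present, replaces the PandaBox key outright;
        # otherwise dots are replaced with underscores.
        name = alias.get(panda_key, panda_key.replace(".", "_"))
        converted[name] = {"value": list(values)}
    return converted


# Illustrative key names only; actual captured fields depend on the PandaBox layout.
frame = {"COUNTER1.OUT.Value": [1.0, 2.0], "PCAP.TS_TRIG.Value": [0.1, 0.2]}
result = to_read_format(frame, signal_alias={"COUNTER1.OUT.Value": "diode"})
```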

describe() OrderedDictType[str, Dict[str, Any]]#

Provide schema and meta-data for read().

The keys in the OrderedDict this method returns must match the keys in the OrderedDict returned by read().

This provides schema related information, (ex shape, dtype), the source (ex PV name), and if available, units, limits, precision etc.

Returns:

data_keys – The keys must be strings and the values must be dict-like with the event_model.event_descriptor.data_key schema.

Return type:

OrderedDict

describe_configuration() OrderedDictType[str, Dict[str, Any]]#

Provide schema & meta-data for read_configuration()

The keys in the OrderedDict this method returns must match the keys in the OrderedDict returned by read_configuration().

This provides schema related information, (ex shape, dtype), the source (ex PV name), and if available, units, limits, precision etc.

Returns:

data_keys – The keys must be strings and the values must be dict-like with the event_model.event_descriptor.data_key schema.

Return type:

OrderedDict

destroy()#

We append the cleanup of the data readout thread to the destroy method, and call it prior to the super().destroy() call. This ensures that the data readout thread is properly cleaned up when the PandaBox device is destroyed.

property destroyed: bool#

Check if the device has been destroyed.

property dotted_name: str#

Return the dotted name

property event_types#

Events that can be subscribed to via obj.subscribe

get(**kwargs)#

Get the value of all components in the device

Keyword arguments are passed onto each signal.get(). Components beginning with an underscore will not be included.

classmethod get_device_tuple()#

The device tuple type associated with a Device class

This is a tuple representing the full state of all components and dynamic device sub-components.

get_instantiated_signals(*, attr_prefix=None)#

Yields all of the instantiated signals in a device hierarchy

Parameters:

attr_prefix (string, optional) – The attribute prefix. If None, defaults to self.name

Yields:

(fully_qualified_attribute_name, signal_instance)

get_panda_state() str#

Get current panda data state.

kickoff() DeviceStatus | StatusBase#

Kickoff the device.

property name#

name of the device

on_complete()[source]#

On complete is called after the scan is complete. We need to wait for the capture to complete before we can disarm the PandaBox.

on_connected()#

Here we start the data readout thread upon connection to the PandaBox device. We do this after the super().on_connected() call to ensure that any additional connection logic from child classes is executed first.

on_destroy() None#

Called when the device is destroyed. Cleanup resources here.

on_init()[source]#

Called when the device is initialized.

No signals are connected at this point. If you would like to set default values on signals, please use on_connected instead.

on_kickoff() DeviceStatus | StatusBase | None#

Called to kickoff a device for a fly scan. Has to be called explicitly.

on_pre_scan() StatusBase | OphydStatusBase | None#

On pre_scan hook for the PandaBox. We use this hook to arm the PCAP module for data acquisition. This logic makes sure that the data readout loop is started and that we received the READY event from the device. Only then can the PCAP module acquire data.

on_stage()[source]#

On stage hook for the PandaBox. Here we make sure that the PandaBox is disarmed before staging.

on_stop() None#

Called when the device is stopped.

on_trigger() DeviceStatus | StatusBase | None#

Called when the device is triggered.

on_unstage() list[object] | StatusBase | OphydStatusBase#

Unstage hook for the PandaBox. This resets the device to a known state before delegating to the parent implementation.

property panda_state: str#

Get the current state of the data acquisition on the PandaBox.

property parent#

The parent of the ophyd object.

If at the top of its hierarchy, parent will be None

pause() None#

Attempt to ‘pause’ the device.

This is called whenever the RunEngine is interrupted.

A device may have internal state that means plans can not safely be re-wound. This method may: put the device in a ‘paused’ state and/or raise NoReplayAllowed to indicate that the plan can not be rewound.

Raises:

bluesky.run_engine.NoReplayAllowed

pre_scan() DeviceStatus | StatusBase | None#

Pre-scan function.

put(dev_t, **kwargs)#

Put a value to all components of the device

Keyword arguments are passed onto each signal.put()

Parameters:

dev_t (DeviceTuple or tuple) – The device tuple with the value(s) to put (see get_device_tuple)

read() OrderedDictType[str, Dict[str, Any]]#

Read data from the device.

This method is expected to be as instantaneous as possible, with any substantial acquisition time taken care of in trigger().

The OrderedDict returned by this method must have identical keys (in the same order) as the OrderedDict returned by describe().

By convention, the first key in the return is the ‘primary’ key and may be used by heuristics in bluesky.

The values in the ordered dictionary must be dict (-likes) with the keys {'value', 'timestamp'}. The 'value' may have any type, the timestamp must be a float UNIX epoch timestamp in UTC.

Returns:

data – The keys must be strings and the values must be dict-like with the keys {'value', 'timestamp'}

Return type:

OrderedDict
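The contract between read() and describe() can be made concrete with plain dictionaries. The key name panda_data, the source string and the check_consistency helper are hypothetical and only illustrate the documented shape; they are not taken from the device.

```python
import time
from collections import OrderedDict


def describe():
    """Schema side: one entry per key that read() returns."""
    return OrderedDict(
        [("panda_data", {"source": "SIM:panda", "dtype": "array", "shape": [3]})]
    )


def read():
    """Data side: same keys, each with a value and a UNIX epoch timestamp."""
    return OrderedDict(
        [("panda_data", {"value": [1, 2, 3], "timestamp": time.time()})]
    )


def check_consistency(reading, description):
    """Return True if keys match in order and every reading has value + timestamp."""
    if list(reading) != list(description):
        return False
    return all(
        {"value", "timestamp"} <= set(entry) and isinstance(entry["timestamp"], float)
        for entry in reading.values()
    )


consistent = check_consistency(read(), describe())
```

A check like this is handy in unit tests for custom devices, since mismatched keys between the two methods are easy to introduce.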

read_configuration() OrderedDictType[str, Dict[str, Any]]#

Dictionary mapping names to value dicts with keys: value, timestamp

To control which fields are included, change the Component kinds on the device, or modify the configuration_attrs list.

remove_data_callback(cb_id: str) None#

Remove a previously registered data callback.

Parameters:

cb_id (str) – The unique ID of the callback to remove.

remove_status_callback(cb_id: str) None#

Remove a previously registered status callback.

Parameters:

cb_id (str) – The unique ID of the callback to remove.

property report#

A report on the object.

resume() None#

Resume a device from a ‘paused’ state.

This is called by the bluesky.run_engine.RunEngine when it resumes from an interruption and is responsible for ensuring that the device is ready to take data again.

property root#

Walk parents to find ultimate ancestor (parent’s parent…).

send_raw(raw_command: str | list[str]) Any#

Send a raw command to the PandaBox. This can be used, for example, to set values on PandaBox block fields directly, e.g. ‘BITS.B=1’ to set the BITS.B field to 1. Please note that lists of raw commands are not allowed, as they have to be sent sequentially. The list[str] input is needed for certain commands that require this syntax, e.g. [“SEQ1.TABLE>”, “1”, “1”, “0”, “0”, “”]

Parameters:

raw_command (str | list[str]) – The raw command to send to the PandaBox. We can also send a list of raw commands at once. This will be executed sequentially by the PandaBox client.

Returns:

The response from the PandaBox client.

Return type:

Any

Notes

Other useful raw commands:
  • ‘BITS.B=1’ or similar ones to set bit fields

  • ‘*CAPTURE?’ to inspect which signals have been configured for capture (PCAP?) TODO to check
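A small sketch of how the command strings mentioned above might be assembled before being passed to send_raw. The field_assignment helper and the RecordingPanda stand-in are hypothetical; the stand-in only records what it receives, and its "OK" return value is a placeholder rather than a documented response.

```python
from typing import Union


def field_assignment(block: str, field: str, value: Union[int, str]) -> str:
    """Format a 'BLOCK.FIELD=value' command string, e.g. 'BITS.B=1'."""
    return f"{block}.{field}={value}"


class RecordingPanda:
    """Stand-in with a send_raw method that only records what it is given."""

    def __init__(self):
        self.sent = []

    def send_raw(self, raw_command):
        self.sent.append(raw_command)
        return "OK"  # placeholder; the documented return type is Any


panda = RecordingPanda()
response = panda.send_raw(field_assignment("BITS", "B", 1))  # set a bit field
panda.send_raw("*CAPTURE?")                                  # query capture configuration
```

On a real device the same strings would be passed to the send_raw method of the PandaBoxCSAXS instance.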

classmethod set_defaults(*, connection_timeout=10.0)#

Set class-wide defaults for device communications

This may be called only before any instances of Device are made.

This setting applies to the class it is called on and all its subclasses. For example,

>>> Device.set_defaults(...)

will apply to any Device subclass.

Parameters:

connection_timeout (float, optional) – Time (seconds) allocated for establishing a connection with the IOC.

Raises:

RuntimeError – If called after EpicsSignalBase has been instantiated for the first time.

stage() list[object] | DeviceStatus | StatusBase#

Stage the device.

property staged: Staged#

Check if the device has been staged.

stop(*, success=False)#

Stopping the PandaBox device should ensure that the PandaBox is disarmed. We call this prior to the super().stop() call to ensure that the PandaBox is disarmed before any additional stopping logic from child classes is executed.

property stopped: bool#

Check if the device has been stopped.

subscribe(callback, event_type=None, run=True)#

Subscribe to events this event_type generates.

The callback will be called as cb(*args, **kwargs) with the values passed to _run_subs with the following additional keys:

sub_type : the string value of the event_type

obj : the host object, added if ‘obj’ not already in kwargs

if the key ‘timestamp’ is in kwargs _and_ is None, then it will be replaced with the current time before running the callback.

The *args, **kwargs passed to _run_subs will be cached as shallow copies, be aware of passing in mutable data.

Warning

If the callback raises any exceptions when run they will be silently ignored.

Parameters:
  • callback (callable) –

    A callable function (that takes kwargs) to be run when the event is generated. The expected signature is

    def cb(*args, obj: OphydObject, sub_type: str, **kwargs) -> None:
    

    The exact args/kwargs passed are whatever are passed to _run_subs

  • event_type (str, optional) –

    The name of the event to subscribe to (if None, defaults to the default sub for the instance - obj._default_sub)

    This maps to the sub_type kwargs in _run_subs

  • run (bool, optional) – Run the callback now

See also

clear_sub, _run_subs

Returns:

cid – id of callback, can be passed to unsubscribe to remove the callback

Return type:

int
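The keyword handling described for subscribe can be illustrated with a stripped-down stand-in. MiniSubscribable is hypothetical and is not the ophyd implementation: it omits the run argument, the default event type and the caching of arguments, and keeps only the sub_type, obj and timestamp behaviour stated above.

```python
import time


class MiniSubscribable:
    """Stand-in illustrating the documented kwargs; NOT the ophyd implementation."""

    def __init__(self):
        self._subs = {}
        self._next_cid = 0

    def subscribe(self, callback, event_type):
        cid = self._next_cid
        self._next_cid += 1
        self._subs[cid] = (event_type, callback)
        return cid

    def unsubscribe(self, cid):
        self._subs.pop(cid, None)

    def _run_subs(self, *args, sub_type, **kwargs):
        kwargs.setdefault("obj", self)  # added only if not already present
        if "timestamp" in kwargs and kwargs["timestamp"] is None:
            kwargs["timestamp"] = time.time()  # None is replaced by the current time
        for event_type, callback in list(self._subs.values()):
            if event_type == sub_type:
                callback(*args, sub_type=sub_type, **kwargs)


events = []


def cb(*args, obj, sub_type, **kwargs):
    # Matches the expected signature: obj and sub_type arrive as keywords.
    events.append((sub_type, kwargs.get("value"), kwargs.get("timestamp")))


device = MiniSubscribable()
cid = device.subscribe(cb, event_type="value")
device._run_subs(sub_type="value", value=42, timestamp=None)
device.unsubscribe(cid)
device._run_subs(sub_type="value", value=43, timestamp=None)  # no subscriber left
```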

trigger() DeviceStatus | StatusBase#

Trigger the device.

unstage() list[object] | DeviceStatus | StatusBase#

Unstage the device.

unsubscribe(cid)#

Remove a subscription

See also subscribe(), clear_sub()

Parameters:

cid (int) – token returned by subscribe()

wait_for_condition(condition: Callable[[], bool], timeout: float, check_stopped: bool = False, interval: float = 0.05) bool#

Utility method to easily wait for signals or methods to reach an expected state.

Parameters:
  • condition (Callable) – function that returns True if the condition is met, False otherwise

  • timeout (float) – timeout in seconds

  • check_stopped (bool) – True if stopped flag should be checked

  • interval (float) – interval in seconds

Returns:

True if all signals are in the desired state, False if timeout is reached

Return type:

bool

Example

>>> self.wait_for_condition(condition=my_condition, timeout=5, interval=0.05, check_stopped=True)
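For readers who want to see the polling pattern behind such a helper, here is a standalone sketch. It is not the device's implementation: the is_stopped parameter is a hypothetical stand-in for the device's stopped flag, and returning False on stop is an assumption (the real method may behave differently, for example by raising).

```python
import time
from typing import Callable


def wait_for_condition(
    condition: Callable[[], bool],
    timeout: float,
    check_stopped: bool = False,
    interval: float = 0.05,
    is_stopped: Callable[[], bool] = lambda: False,  # hypothetical stopped flag
) -> bool:
    """Poll condition() every `interval` seconds until it is True or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if check_stopped and is_stopped():
            return False  # assumption: the real method may raise instead
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


start = time.monotonic()
reached = wait_for_condition(
    lambda: time.monotonic() - start > 0.1, timeout=1.0, interval=0.01
)
timed_out = wait_for_condition(lambda: False, timeout=0.05, interval=0.01)
```

Using time.monotonic() for the deadline keeps the timeout immune to wall-clock adjustments.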

wait_for_connection(timeout: float | None = None) bool#

Check if PandaBox is reachable by sending a raw command.

classmethod walk_components()#

Walk all components in the Device hierarchy

Yields:

ComponentWalk – Where ancestors is all ancestors of the signal, including the top-level device walk_components was called on.

walk_signals(*, include_lazy=False)#

Walk all signals in the Device hierarchy

EXPERIMENTAL: This method is experimental, and there are tentative plans to change its API in a way that may not be backward-compatible.

Parameters:

include_lazy (bool, optional) – Include not-yet-instantiated lazy signals

Yields:

ComponentWalk – Where ancestors is all ancestors of the signal, including the top-level device walk_signals was called on.

classmethod walk_subdevice_classes()#

Walk all sub-Device classes in the Device hierarchy

Yields:

(dotted_name, subdevice_class)

walk_subdevices(*, include_lazy=False)#

Walk all sub-Devices in the hierarchy

EXPERIMENTAL: This method is experimental, and there are tentative plans to change its API in a way that may not be backward-compatible.

Yields:

(dotted_name, subdevice_instance)