In this tutorial we will see how the stage
and unstage
methods can be used to get a device ready and then return it to its previous state.
First, let's ensure our simulated IOCs are running.
The IOCs may already be running in the background. Run this command to verify
that they are running: it should produce output with STARTING or RUNNING on each line.
In the event of a problem, edit this command to replace status
with restart all
and run again.
!../supervisor/start_supervisor.sh status
There are many situations when we want to ensure that one or more PVs are set up in a well-defined and safe state, prior to doing things such as triggering a detector or collecting data. Likewise, once we are done, we may want to return everything back to the way it was, to get it back to a neutral state rather than leaving it primed for the specific task we were doing.
In Bluesky, this sort of priming or "setting the stage" is conventionally done using the stage
and unstage
methods.
This is typically used with detectors, but it can be used with any Device.
Consider the following example:
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, set_and_wait, DeviceStatus
import time
class TriggeredDetector(Device):
"""
A detector that requires triggering
"""
gain = Component(EpicsSignal, ':gain', kind='config')
exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')
reading = Component(EpicsSignalRO, ':reading', kind='normal')
acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)
enabled = Component(EpicsSignal, ':enabled', kind='omitted')
def trigger(self):
"""
Trigger the detector and return a Status object.
"""
status = DeviceStatus(self)
self.acquire.put(1, callback=status._finished)
return status
def stage(self):
"""
Set the stage, storing previous value for later.
"""
self.initial_enabled_state = self.enabled.get()
set_and_wait(self.enabled, 1)
return super().stage()
def unstage(self):
"""
Restore previously saved state.
"""
ret = super().unstage()
set_and_wait(self.enabled, self.initial_enabled_state)
return ret
triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')
triggered_detector
When bluesky obtains a reading from some device
it typically:
device.trigger()
and receives back a status objectdevice.read()
If it obtains multiple readings in sequence, it repeats this trigger/wait/read cycle.
Here we can see simple examples of stage
and unstage
methods, which can be used to perform a choreographed sequence of steps necessary to make device
ready for use and some corresponding sequence to put in back safely into a resting state. Bluesky plans typically call device.stage()
once before first using a device in a plan and then device.unstage()
at the end. There are mechanisms to ensure that, even if a plan is interrupted by the user or by an exception is raised, device.unstage()
will be called if device.stage()
was previously called as part of a plan executed by RE
.
Important: Note the use of set_and_wait
in these methods. Unlike many of the other mechanisms in bluesky, we do not want these methods to return until we are certain that the desired state is fully set and complete. So we use those blocking calls to ensure the PV values are set to the indicated values.
There is no mechanism to automatically call device.stage()
just because it was defined. You must invoke it explicitly, either as part of a plan or manually from the command line. If invoked manually, you must also manually unstage
. Let's have a look at what this might look like:
# Set up our "pre-stage" state to have the `enabled` signal at 0
triggered_detector.enabled.put(0)
triggered_detector.enabled.get()
# Staging should set it to 1, and ensure it is fully set prior to returning
triggered_detector.stage()
triggered_detector.enabled.get()
# Trigger the detector and obtain status object to monitor
status = triggered_detector.trigger()
# Wait until it is done prior to reading
while not status.done:
time.sleep(0.01)
triggered_detector.read()
# We can trigger and read again; RE would typically do this in a loop internally
status = triggered_detector.trigger()
while not status.done:
time.sleep(0.01)
triggered_detector.read()
# Unstage when we are done
triggered_detector.unstage()
# Let's check that `enabled` really went back to its previous state:
triggered_detector.enabled.get()
stage_sigs
¶Storing and restoring individual signal values by hand is a bit tedious. The base implementations of stage
and unstage
allow us to do this with far less manual labor. In fact, in this simple scenario, we can do away with overriding these methods entirely:
class TriggeredDetector(Device):
"""
A detector that requires triggering
"""
gain = Component(EpicsSignal, ':gain', kind='config')
exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')
reading = Component(EpicsSignalRO, ':reading', kind='hinted')
acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)
enabled = Component(EpicsSignal, ':enabled', kind='omitted')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.stage_sigs['enabled'] = 1 # OrderedDict mapping component name to desired state
def trigger(self):
"""
Trigger the detector and return a Status object.
"""
status = DeviceStatus(self)
self.acquire.put(1, callback=status._finished)
return status
triggered_detector = TriggeredDetector('trigger_with_pc', name='triggered_detector')
Keep in mind this is a fairly trivial example and, if you need to perform more sophisticated setup and cleanup, you will still need to define your own stage
and unstage
methods.
Try staging the device twice in a row. Then try unstaging it twice in a row.
stop
, resume
, pause
¶These optional methods can be used to further customize a Device's cleanup:
stop
-- called by bluesky when a plan is paused or exits (successfully or in error)pause
-- called when the RunEngine is pausedresume
-- called when the RunEngine resumes from a pauseclass TriggeredDetector(Device):
"""
A detector that requires triggering
"""
gain = Component(EpicsSignal, ':gain', kind='config')
exposure_time = Component(EpicsSignal, ':exposure_time', kind='config')
reading = Component(EpicsSignalRO, ':reading', kind='hinted')
acquire = Component(EpicsSignal, ':acquire', kind='omitted', put_complete=True)
enabled = Component(EpicsSignal, ':enabled', kind='omitted')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.stage_sigs['enabled'] = 1 # OrderedDict mapping component name to desired state
def trigger(self):
"""
Trigger the detector and return a Status object.
"""
status = DeviceStatus(self)
self.acquire.put(1, callback=status._finished)
return status
def resume(self):
...
def pause(self):
...
def stop(self, success=False):
...