In this notebook you will:
Recommended Prerequisites:
Below, we will connect to EPICS IOC(s) controlling simulated hardware in lieu of actual motors, detectors. The IOCs should already be running in the background. Run this command to verify that they are running: it should produce output with RUNNING on each line. In the event of a problem, edit this command to replace status
with restart all
and run again.
!supervisorctl -c supervisor/supervisord.conf status
%run scripts/beamline_configuration.py
import time
from ophyd import Device, Signal, Component as Cpt, DeviceStatus
from ophyd.sim import SynSignal, SynPeriodicSignal
sig = Signal(name='sig', value=3)
sig
sig.name
sig.parent is None
sig.connected
sig.limits
sig.read()
sig.describe()
def cb(value, old_value, **a_whole_bunch_of_junk):
print(f'changed from {old_value} to {value}')
sig.subscribe(cb)
# The act of subscribing always generates one reading immediately...
If this were an EpicsSignal
instead of a Signal
, cb
would be called from a thread every time pyepics receives a new update about the value of sig
. In this case, we have to update it manually.
sig.put(5)
sig.put(10)
Or we can connect to the random_walk
IOC which publishes a new updates at a regular interval.
from ophyd import EpicsSignal
rand = EpicsSignal('random_walk:x', name='rand')
token = rand.subscribe(cb)
rand.unsubscribe(token)
def cb():
print("finished at t =", time.time())
status = sig.set(5)
status.add_callback(cb)
status
status.done
status = sig.trigger()
status.add_callback(cb)
status = DeviceStatus(sig)
status.done
status.success
def cb():
print("BOOM")
status.add_callback(cb)
status.callbacks
status.device # the Device or Signal that the Status pertains to
status._finished()
status.done
status.success
# Failure looks like this:
status = DeviceStatus(sig)
status.add_callback(cb)
status._finished(success=False)
status.success
We'll see later how to actually use this in practice.
# This encodes the _structure_ of a kind of Device.
# Real examples include EpicsMotor, EpicsScaler or user-defined
# combinations of these, such as a platform that can move in X and Y.
class Platform(Device):
x = Cpt(Signal, value=3)
y = Cpt(Signal, value=4)
p1 = Platform(name='p1')
p2 = Platform(name='p2')
p1
p1.component_names
p1.x
p1.y
p1.name
p1.x.name
p1.x.parent is p1
p1.read()
p1.x.read()
and describe
works exactly the same way:
p1.describe()
p1.x.describe()
OMITTED
-- not read (exposed for debugging only)NORMAL
/ read_attrs
-- things to read once per Event (i.e. row in the table)CONFIG
/ configuration_attrs
-- things to read once per Event Descriptor (which usually means one per run)HINTED
-- subset of NORMAL
flagged as interestingp1.read_attrs
p1.configuration_attrs
# dumb example...
class Platform(Device):
x = Cpt(Signal, value=3)
y = Cpt(Signal, value=4)
motion_compensation = Cpt(Signal, value=1, kind='CONFIG') # a boolean
p1 = Platform(name='p1')
p2 = Platform(name='p2')
p1.read_attrs
p1.configuration_attrs
p1.read_configuration()
p1.describe_configuration()
The data from configuration_attrs
isn't displayed by the built-in callbacks...
RE(count([p1]))
... but the data is saved, and it can accessed conveniently like so:
h = db[-1]
h.config_data('p1')
p1.summary()
Hints are meant to help downstream consumers of the data correctly infer user intent and automatically construct useful views on the data. They are only a suggestion. They do not affect what is saved.
# dumb example...
class Platform(Device):
x = Cpt(Signal, value=3, kind='hinted')
y = Cpt(Signal, value=4, kind='hinted')
motion_compensation = Cpt(Signal, value=1, kind='config') # a boolean
p1 = Platform(name='p1')
p1.hints
p1.summary()
class Platform(Device):
_default_configuration_attrs = ('motion_compensation',)
_default_read_attrs = ('x', 'y')
x = Cpt(Signal, value=3)
y = Cpt(Signal, value=4)
motion_compensation = Cpt(Signal, value=1) # a boolean
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.stage_sigs['motion_compensation'] = 1
p1 = Platform(name='p1')
p1.motion_compensation.get()
p1.motion_compensation.put(0)
Device.stage()
stashes the current state of the signals in stage_sigs
and then puts the device into the desired state.
p1.stage()
p1.motion_compensation.get()
Device.unstage()
uses that stashed stage to put everything back.
p1.unstage()
p1.motion_compensation.get()
Staging twice is illegal:
p1.stage()
# THIS IS EXPECTED TO CREATE AN ERROR.
p1.stage()
But unstaging is indempotent:
p1.unstage()
p1.unstage()
p1.unstage()
from ophyd import (PseudoPositioner, PseudoSingle)
from ophyd.pseudopos import (pseudo_position_argument,
real_position_argument)
from ophyd import SoftPositioner
C = Cpt
class SPseudo3x3(PseudoPositioner):
pseudo1 = C(PseudoSingle, limits=(-10, 10), egu='a')
pseudo2 = C(PseudoSingle, limits=(-10, 10), egu='b')
pseudo3 = C(PseudoSingle, limits=None, egu='c')
real1 = C(SoftPositioner, init_pos=0.)
real2 = C(SoftPositioner, init_pos=0.)
real3 = C(SoftPositioner, init_pos=0.)
sig = C(Signal, value=0)
@pseudo_position_argument
def forward(self, pseudo_pos):
# logger.debug('forward %s', pseudo_pos)
return self.RealPosition(real1=-pseudo_pos.pseudo1,
real2=-pseudo_pos.pseudo2,
real3=-pseudo_pos.pseudo3)
@real_position_argument
def inverse(self, real_pos):
# logger.debug('inverse %s', real_pos)
return self.PseudoPosition(pseudo1=-real_pos.real1,
pseudo2=-real_pos.real2,
pseudo3=-real_pos.real3)
p3 = SPseudo3x3(name='p3')
from ophyd.sim import det
RE(scan([det, p3], p3.pseudo2, -1, 1, 5))