#!/usr/bin/env python # coding: utf-8 # # Achieving Complex Behaviors with Set Methods # # In this tutorial we will learn how to encode more complex behaviors into Devices by defining `set` methods. # # This will allow us to set multiple PVs at once, as well as to perform calculations on input values as needed. # # ## Set up for tutorial # # First, let's ensure our simulated IOCs are running. # # The IOCs may already be running in the background. Run this command to verify # that they are running: it should produce output with STARTING or RUNNING on each line. # In the event of a problem, edit this command to replace `status` with `restart all` and run again. # In[ ]: get_ipython().system('../supervisor/start_supervisor.sh status') # ## Adding a set method to `Device` # # Sometimes, setting a value to a Signal and knowing when it is "done" involves just one PV. Here's a simple example from the previous tutorial: # In[ ]: from ophyd import Device, Component, EpicsSignal, EpicsSignalRO class RandomWalk(Device): x = Component(EpicsSignalRO, 'x') dt = Component(EpicsSignal, 'dt') random_walk = RandomWalk('random_walk:', name='random_walk') random_walk.wait_for_connection() status = random_walk.dt.set(2) # In other cases it involves coordination across multiple PVs, such as a setpoint PV and a readback PV, or a setpoint PV and a "done" PV. For those cases, we define a `set` method on the Device to manage the coordination across multiple Signals. # In[ ]: from ophyd import DeviceStatus class Decay(Device): """ A device with a setpoint and readback that decays exponentially toward the setpoint. """ readback = Component(EpicsSignalRO, ':I') setpoint = Component(EpicsSignal, ':SP') def set(self, setpoint): """ Set the setpoint and return a Status object that monitors the readback. """ status = DeviceStatus(self) # Wire up a callback that will mark the status object as finished # when the readback approaches within some tolerance of the setpoint. def callback(old_value, value, **kwargs): TOLERANCE = 1 # hard-coded; we'll make this configurable later on... if abs(value - setpoint) < TOLERANCE: status.set_finished() self.readback.clear_sub(callback) self.readback.subscribe(callback) # Now 'put' the value. self.setpoint.put(setpoint) # And return the Status object, which the caller can use to # tell when the action is complete. return status decay = Decay('decay', name='decay') decay.wait_for_connection() decay # In[ ]: decay.read() # In[ ]: status = decay.set(115) # We can watch for completion either by registering a callback: # In[ ]: def callback(status): print("DONE:", status) status.add_callback(callback) # or by blocking: # In[ ]: status = decay.set(120) status.wait() # blocks here print("DONE!") # ### Make the tolerance configurable with a "soft" Signal # In[ ]: from ophyd import Signal class Decay(Device): """ A device with a setpoint and readback that decays exponentially toward the setpoint. """ readback = Component(EpicsSignalRO, ':I') setpoint = Component(EpicsSignal, ':SP') tolerance = Component(Signal, value=1) # not associated with anything in EPICS---a pure ophyd construct def set(self, setpoint): """ Set the setpoint and return a Status object that monitors the readback. """ status = DeviceStatus(self) # Wire up a callback that will mark the status object as finished # when the readback approaches within some tolerance of the setpoint. def callback(old_value, value, **kwargs): if abs(value - setpoint) < self.tolerance.get(): status.set_finished() self.readback.clear_sub(callback) self.readback.subscribe(callback) # Now 'put' the value. self.setpoint.put(setpoint) # And return the Status object, which the caller can use to # tell when the action is complete. return status decay = Decay('decay', name='decay') status = decay.set(125) status.add_callback(callback) # In[ ]: decay.tolerance.set(2) status = decay.set(130) status.add_callback(callback) # ### Let the IOC tell us when it is done # # Some IOCs (but not all) provide a specific signal that we can use to know when a set is complete. In that case we can remove the "tolerance" logic entirely if we want to and trust the IOC. # In[ ]: class Decay(Device): """ A device with a setpoint and readback that decays exponentially toward the setpoint. """ readback = Component(EpicsSignalRO, ':I') setpoint = Component(EpicsSignal, ':SP') done = Component(EpicsSignalRO, ':done') def set(self, setpoint): """ Set the setpoint and return a Status object that monitors the 'done' PV. """ status = DeviceStatus(self) # Wire up a callback that will mark the status object as finished # when the done signal goes from low to high---that is, a positive edge. def callback(old_value, value, **kwargs): if old_value == 0 and value == 1: status.set_finished() self.done.clear_sub(callback) self.done.subscribe(callback) # Now 'put' the value. self.setpoint.put(setpoint) # And return the Status object, which the caller can use to # tell when the action is complete. return status decay = Decay('decay', name='decay') decay # In[ ]: status = decay.set(135) status.add_callback(callback) # ## `PVPositioner` # # The pattern of `readback`, `setpoint` and `done` is pretty common, so ophyd has a special `Device` subclass that writes the `set()` method for you if you provide components with these particular names. # In[ ]: from ophyd import PVPositioner class Decay(PVPositioner): """ A device with a setpoint and readback that decays exponentially toward the setpoint. """ readback = Component(EpicsSignalRO, ':I') setpoint = Component(EpicsSignal, ':SP') done = Component(EpicsSignalRO, ':done') # actuate = Component(EpicsSignal, ...) # the "Go" button, not applicable to this IOC, but sometimes needed decay = Decay('decay', name='decay') status = decay.set(140) status.add_callback(callback) # In[ ]: