import numpy as np
from pydrake.common.containers import namedview
from pydrake.common.value import AbstractValue
from pydrake.math import RigidTransform, RotationMatrix
from pydrake.systems.analysis import Simulator
from pydrake.systems.framework import BasicVector, LeafSystem
from pydrake.trajectories import PiecewisePolynomial
The Modeling Dynamical Systems tutorial gave a very basic introduction to Drake's Systems framework, including writing a basic LeafSystem. In this notebook we'll provide a more advanced/complete overview of authoring those systems.
Leaf systems can have any number of input and output ports. There are two basic types of ports: vector-valued ports and abstract-valued ports.
Let's start with vector-valued ports, which are the easiest to work with. The following system declares two vector input ports, and two vector output ports. It forms the sum and difference of the two 2-element input vectors and places the results on the two output ports.
| MyAdder |
|
class MyAdder(LeafSystem):
def __init__(self):
super().__init__() # Don't forget to initialize the base class.
self._a_port = self.DeclareVectorInputPort(name="a", size=2)
self._b_port = self.DeclareVectorInputPort(name="b", size=2)
self.DeclareVectorOutputPort(name="sum", size=2, calc=self.CalcSum)
self.DeclareVectorOutputPort(name="difference",
size=2,
calc=self.CalcDifference)
def CalcSum(self, context, output):
# Evaluate the input ports to obtain the 2x1 vectors.
a = self._a_port.Eval(context)
b = self._b_port.Eval(context)
# Write the sum into the output vector.
output.SetFromVector(a + b)
def CalcDifference(self, context, output):
# Evaluate the input ports to obtain the 2x1 vectors.
a = self._a_port.Eval(context)
b = self._b_port.Eval(context)
# Write the difference into output vector.
output.SetFromVector(a - b)
# Construct an instance of this system and a context.
system = MyAdder()
context = system.CreateDefaultContext()
# Fix the input ports to some constant values.
system.GetInputPort("a").FixValue(context, [3, 4])
system.GetInputPort("b").FixValue(context, [1, 2])
# Evaluate the output ports.
print(f"sum: {system.GetOutputPort('sum').Eval(context)}")
print(f"difference: {system.GetOutputPort('difference').Eval(context)}")
There are a few things to notice here:
Eval()
method from any of the system methods that accept a Context
.It is also helpful to understand that:
DeclareVectorOutputPort()
and DeclareAbstractOutputPort()
ports take an optional prerequisites_of_calc
argument which is used by the caching system to determine when the output port needs to be re-evaluated. Declaring narrow prerequisites can speed up your code. Surprisingly, explicitly narrowing the prerequisites can also dramatically speed up the construction of some Diagram
s, because when these are not specified the DiagramBuilder
attempts to check for algebraic loops by converting your systems into symbolic form.Not all inputs and outputs are best represented as a vector. Drake's systems framework also supports passing structured types across ports, which is implemented in C++ using a technique called "type erasure". From the perspective of the systems framework, classes work with an "abstract" data type, and only the system implementation itself needs to know how to work with the structured type. In practice, if you can overlook a small amount of boilerplate, the usage is straightforward. Let's give an example of a LeafSystem
that uses a RigidTransform
in its input and output ports:
| RotateAboutZ |
|
class RotateAboutZ(LeafSystem):
def __init__(self):
super().__init__() # Don't forget to initialize the base class.
self.DeclareAbstractInputPort(name="in",
model_value=AbstractValue.Make(
RigidTransform()))
self.DeclareAbstractOutputPort(
name="out",
alloc=lambda: AbstractValue.Make(RigidTransform()),
calc=self.CalcOutput)
def CalcOutput(self, context, output):
# Evaluate the input port to obtain the RigidTransform.
X_1 = system.get_input_port().Eval(context)
X_2 = RigidTransform(RotationMatrix.MakeZRotation(np.pi / 2)) @ X_1
# Set the output RigidTransform.
output.set_value(X_2)
# Construct an instance of this system and a context.
system = RotateAboutZ()
context = system.CreateDefaultContext()
# Fix the input port to a constant value.
system.get_input_port().FixValue(context, RigidTransform())
# Evaluate the output port.
print(f"output: {system.get_output_port(0).Eval(context)}")
Systems store their state in the Context
, and work with state in a very similar way to input and output ports. Again, we can have vector-valued state or abstract-valued state. Abstract state can only be updated in a discrete-time fashion, but vector-valued state can be declared to be either discrete or continuous.
A discrete state $x_d$ will evolve based on update events, the most common of which would be a simple periodic event to define a difference equation. We saw an example of this in the introductory notebook.
class SimpleDiscreteTimeSystem(LeafSystem):
def __init__(self):
super().__init__()
state_index = self.DeclareDiscreteState(1) # One state variable.
self.DeclareStateOutputPort("y", state_index) # One output: y=x.
self.DeclarePeriodicDiscreteUpdateEvent(
period_sec=1.0, # One second time step.
offset_sec=0.0, # The first event is at time zero.
update=self.Update) # Call the Update method defined below.
# x[n+1] = x^3[n].
def Update(self, context, discrete_state):
x = context.get_discrete_state_vector().GetAtIndex(0)
x_next = x**3
discrete_state.get_mutable_vector().SetAtIndex(0, x_next)
# Instantiate the System.
system = SimpleDiscreteTimeSystem()
simulator = Simulator(system)
context = simulator.get_mutable_context()
# Set the initial conditions: x[0] = [0.9].
context.get_mutable_discrete_state_vector().SetFromVector([0.9])
# Run the simulation.
simulator.AdvanceTo(4.0)
print(context.get_discrete_state_vector())
You can find details on the implementation and order of these events here. It's also possible to define more complicated events for updating the discrete variables.
Note that the states of a system are not immediately accessible to other systems in a Diagram
. For a system to share its state, it must do that using an output port. We provide the DeclareStateOutputPort()
method to make this common case easy.
The evolution of continuous vector-valued state variables is governed by a differential equation. While it is possible and convenient to declare many different groups of discrete state variables (they may even be updated at different rates by different events), we only define zero or one continuous state vector for a system, and define its dynamics by overloading the LeafSystem::DoCalcTimeDerivatives()
method. Here is the simple continuous-time example in the introductory notebook again:
# Define the system.
class SimpleContinuousTimeSystem(LeafSystem):
def __init__(self):
super().__init__()
state_index = self.DeclareContinuousState(1) # One state variable.
self.DeclareStateOutputPort("y", state_index) # One output: y=x.
# xdot(t) = -x(t) + x^3(t).
def DoCalcTimeDerivatives(self, context, derivatives):
x = context.get_continuous_state_vector().GetAtIndex(0)
xdot = -x + x**3
derivatives.get_mutable_vector().SetAtIndex(0, xdot)
# Instantiate the System.
system = SimpleContinuousTimeSystem()
simulator = Simulator(system)
context = simulator.get_mutable_context()
# Set the initial conditions: x(0) = [0.9]
context.SetContinuousState([0.9])
# Run the simulation.
simulator.AdvanceTo(4.0)
print(context.get_continuous_state_vector())
For abstract-valued state, we have a similar workflow. We declare the abstract state in the constructor, and define update events to update that state. Discrete update events are restricted to updating only the discrete states, so we use "unrestricted" events to update the abstract state. Here is an example which stores a simple PiecewisePolynomial
trajectory as abstract state:
class AbstractStateSystem(LeafSystem):
def __init__(self):
super().__init__()
self._traj_index = self.DeclareAbstractState(
AbstractValue.Make(PiecewisePolynomial()))
self.DeclarePeriodicUnrestrictedUpdateEvent(period_sec=1.0,
offset_sec=0.0,
update=self.Update)
def Update(self, context, state):
t = context.get_time()
traj = PiecewisePolynomial.FirstOrderHold(
[t, t + 1],
np.array([[-np.pi / 2.0 + 1., -np.pi / 2.0 - 1.], [-2., 2.]]))
# Update the state
state.get_mutable_abstract_state(int(self._traj_index)).set_value(traj)
system = AbstractStateSystem()
simulator = Simulator(system)
context = simulator.get_mutable_context()
# Set an initial condition for the abstract state.
context.SetAbstractState(0, PiecewisePolynomial())
# Run the simulation.
simulator.AdvanceTo(4.0)
traj = context.get_abstract_state(0).get_value()
print(f"breaks: {traj.get_segment_times()}")
print(f"traj({context.get_time()}) = {traj.value(context.get_time())}")
Parameters are like state, except that they are constant through the lifetime of a simulation. Parameters in the systems framework are declared and accessed almost identically to state, but they are never updated. Once again, we have vector-valued (declared with DeclareNumericParameter
) and abstract-valued (via DeclareAbstractParameter
) parameters.
SystemWithParameters |
|
class SystemWithParameters(LeafSystem):
def __init__(self):
super().__init__() # Don't forget to initialize the base class.
self.DeclareNumericParameter(BasicVector([1.2, 3.4]))
self.DeclareAbstractParameter(
AbstractValue.Make(
RigidTransform(RotationMatrix.MakeXRotation(np.pi / 6))))
# Declare output ports to demonstrate how to access the parameters in
# system methods.
self.DeclareVectorOutputPort(name="numeric",
size=2,
calc=self.OutputNumeric)
self.DeclareAbstractOutputPort(
name="abstract",
alloc=lambda: AbstractValue.Make(RigidTransform()),
calc=self.OutputAbstract)
def OutputNumeric(self, context, output):
output.SetFromVector(context.get_numeric_parameter(0).get_value())
def OutputAbstract(self, context, output):
output.set_value(context.get_abstract_parameter(0).get_value())
# Construct an instance of this system and a context.
system = SystemWithParameters()
context = system.CreateDefaultContext()
# Evaluate the output ports.
print(f"numeric: {system.get_output_port(0).Eval(context)}")
print(f"abstract: {system.get_output_port(1).Eval(context)}")
In addition to methods that update the state, and methods that evaluate an output port, another supported method in a LeafSystem
is a callback to "publish". A "publish" method cannot modify any state: they are useful for broadcasting data outside of the systems framework (e.g. via a message-passing protocol like ROS), for terminating a simulation, for detecting errors, and for forcing boundaries between integration steps. They are declared in much the same way as other system callbacks.
class MyPublishingSystem(LeafSystem):
def __init__(self):
super().__init__()
# Calling `ForcePublish()` will trigger the callback.
self.DeclareForcedPublishEvent(self.Publish)
# Publish once every second.
self.DeclarePeriodicPublishEvent(period_sec=1,
offset_sec=0,
publish=self.Publish)
def Publish(self, context):
print(f"Publish() called at time={context.get_time()}")
system = MyPublishingSystem()
simulator = Simulator(system)
simulator.AdvanceTo(5.3)
# We can also "force" a publish at a arbitrary time.
print("\ncalling ForcedPublish:")
system.ForcedPublish(simulator.get_context())
Abstract-valued ports, state, and/or parameters can be used to work with structured data. But it can also be convenient to use string names to reference the individual elements of vector-valued data. In Python, we recommend using a namedview
workflow, which was inspired by Python's namedtuple
and is demonstrated in the example below. The Drake developers plan to provide similar functionality in C++.
Note that the memory layout for a continuous state vector is slightly different than for discrete state, parameters, and input/output port data, so the syntax for dealing with it requires a slightly distinct CopyToVector()
and SetFromVector()
. See issue #9171.
Note that usages of my_view[:]
are meant as a shorthand to get a NumPy array view of the namedview
itself.
For more details, see the example and warning in the documentation for namedview
.
# Define the system.
class NamedViewDemo(LeafSystem):
MyDiscreteState = namedview("MyDiscreteState", ["a", "b"])
MyContinuousState = namedview("MyContinuousState", ["x", "z", "theta"])
MyOutput = namedview("MyOutput", ["x","a"])
def __init__(self):
super().__init__()
self.DeclareDiscreteState(2)
self.DeclarePeriodicDiscreteUpdateEvent(
period_sec=1.0,
offset_sec=0.0,
update=self.DiscreteUpdate)
self.DeclareContinuousState(3)
self.DeclareVectorOutputPort(name="out", size=2, calc=self.CalcOutput)
def DiscreteUpdate(self, context, discrete_values):
discrete_state = self.MyDiscreteState(
context.get_discrete_state_vector().value())
continuous_state = self.MyContinuousState(
context.get_continuous_state_vector().CopyToVector())
next_state = self.MyDiscreteState(discrete_values.get_mutable_value())
# Now we can compute the next state by referencing each element by name.
next_state.a = discrete_state.a + 1
next_state.b = discrete_state.b + continuous_state.x
def DoCalcTimeDerivatives(self, context, derivatives):
continuous_state = self.MyContinuousState(
context.get_continuous_state_vector().CopyToVector())
dstate_dt = self.MyContinuousState(continuous_state[:])
dstate_dt.x = -continuous_state.x
dstate_dt.z = -continuous_state.z
dstate_dt.theta = -np.arctan2(continuous_state.z, continuous_state.x)
derivatives.SetFromVector(dstate_dt[:])
def CalcOutput(self, context, output):
discrete_state = self.MyDiscreteState(
context.get_discrete_state_vector().value())
continuous_state = self.MyContinuousState(
context.get_continuous_state_vector().CopyToVector())
out = self.MyOutput(output.get_mutable_value())
out.x = continuous_state.x
out.a = discrete_state.a
# Instantiate the System.
system = NamedViewDemo()
simulator = Simulator(system)
context = simulator.get_mutable_context()
# Set the initial conditions.
initial_discrete_state = NamedViewDemo.MyDiscreteState([3, 4])
context.SetDiscreteState(initial_discrete_state[:])
initial_continuous_state = NamedViewDemo.MyContinuousState.Zero()
initial_continuous_state.x = 0.5
initial_continuous_state.z = 0.92
initial_continuous_state.theta = 0.23
context.SetContinuousState(initial_continuous_state[:])
# Run the simulation.
simulator.AdvanceTo(4.0)
print(
NamedViewDemo.MyDiscreteState(context.get_discrete_state_vector().value()))
print(
NamedViewDemo.MyContinuousState(
context.get_continuous_state_vector().CopyToVector()))
print(NamedViewDemo.MyOutput(system.get_output_port().Eval(context)))
In order to support AutoDiff and Symbolic types throughout the systems framework, LeafSystems can be written to support "scalar type conversion". In Python, adding this support requires a small amount of boilerplate. Here is a simple example:
| RunningCost |
|
from pydrake.systems.framework import LeafSystem_
from pydrake.systems.scalar_conversion import TemplateSystem
from pydrake.autodiffutils import AutoDiffXd
from pydrake.symbolic import Expression
@TemplateSystem.define("RunningCost_")
def RunningCost_(T):
class Impl(LeafSystem_[T]):
def _construct(self, converter=None, Q=np.eye(2)):
super().__init__(converter)
self._Q = Q
self._state_port = self.DeclareVectorInputPort("state", 2)
self._command_port = self.DeclareVectorInputPort("command", 1)
self.DeclareVectorOutputPort("cost", 1, self.CostOutput)
def _construct_copy(self, other, converter=None):
# Any member fields (e.g. configuration values) need to be
# transferred here from `other` to `self`.
Impl._construct(self, converter=converter, Q=other._Q)
def CostOutput(self, context, output):
x = self._state_port.Eval(context)
u = self._command_port.Eval(context)[0]
output[0] = x.dot(self._Q.dot(x)) + u**2
return Impl
RunningCost = RunningCost_[None] # Default instantiation.
The important steps are:
@TemplateSystem
decorator.LeafSystem_[T]
instead of simply LeafSystem
._construct
method instead of the typical __init__
method._construct_copy
method, which needs to populate the same member fields as _construct
(as we did with self.Q
in this example).RunningCost
in addition to using RunningCost_[float]
.For further details, you can find the related documentation for scalar conversion in C++ here and the documentation for the @TemplateSystem
decorator.
# Having done this, we can still use the system in the original way:
system = RunningCost(Q=np.diag([10, 1]))
context = system.CreateDefaultContext()
system.get_input_port(0).FixValue(context, [1, 2])
system.get_input_port(1).FixValue(context, [3])
print(system.get_output_port().Eval(context))
# But we can also now use an autodiff or symbolic versions of the system,
# either by declaring them directly:
system_ad = RunningCost_[AutoDiffXd]()
system_symbolic = RunningCost_[Expression]()
# or by scalar conversion:
system_ad = system.ToAutoDiffXd()
system_symbolic = system.ToSymbolic()
# We can also convert the time, state, parameters, and (if needed) input ports
# from the original system:
context_symbolic = system_symbolic.CreateDefaultContext()
context_symbolic.SetTimeStateAndParametersFrom(context)
system_symbolic.FixInputPortsFrom(system, context, context_symbolic)
print(system_symbolic.get_output_port().Eval(context_symbolic))
Caching in the systems framework
Declaring system constraints: equality and inequality.