Links won't work on Github but should work with nbviewer.
The easiest way to load a configuration is by making sure it is structured just as the Machine
constructor. Your first level elements should be name
, transitions
, states
and so on. When your yaml/json configuration is loaded, you can add your model programatically and pass the whole object to Machine
.
from transitions import Machine
import json
class Model:
def say_hello(self, name):
print(f"Hello {name}!")
# import json
json_config = """
{
"name": "MyMachine",
"states": [
"A",
"B",
{ "name": "C", "on_enter": "say_hello" }
],
"transitions": [
["go", "A", "B"],
{"trigger": "hello", "source": "*", "dest": "C"}
],
"initial": "A"
}
"""
model = Model()
config = json.loads(json_config)
config['model'] = model # adding a model to the configuration
m = Machine(**config) # **config unpacks arguments as kwargs
assert model.is_A()
model.go()
assert model.is_B()
model.hello("world") # >>> Hello world!
assert model.state == 'C'
from transitions import Machine
import yaml
class Model:
def say_hello(self, name):
print(f"Hello {name}!")
yaml_config = """
---
name: "MyMachine"
states:
- "A"
- "B"
- name: "C"
on_enter: "say_hello"
transitions:
- ["go", "A", "B"]
- {trigger: "hello", source: "*", dest: "C"}
initial: "A"
"""
model = Model()
config = yaml.safe_load(yaml_config)
config['model'] = model # adding a model to the configuration
m = Machine(**config) # **config unpacks arguments as kwargs
assert model.is_A()
model.go()
assert model.is_B()
model.hello("world") # >>> Hello world!
assert model.state == 'C'
A default Machine
does not keep track of its configuration but transitions.extensions.markup.MarkupMachine
does.
MarkupMachine
cannot just be used to export your configuration but also to visualize or instrospect your configuration conveniently.
Is is also the foundation for GraphMachine
. You will see that MarkupMachine
will always export every attribute even unset values. This makes such exports visually cluttered but easier to automatically process.
If you plan to use such a configuration with a 'normal' Machine
, you should remove the models
attribute from the markup since Machine
cannot process it properly.
If you pass the (stored and loaded) configuration to another MarkupMachine
however, it will attempt to create and initialize models for you.
#export
from transitions.extensions.markup import MarkupMachine
import json
import yaml
class Model:
def say_hello(self, name):
print(f"Hello {name}!")
model = Model()
m = MarkupMachine(model=None, name="ExportedMachine")
m.add_state('A')
m.add_state('B')
m.add_state('C', on_enter='say_hello')
m.add_transition('go', 'A', 'B')
m.add_transition(trigger='hello', source='*', dest='C')
m.initial = 'A'
m.add_model(model)
model.go()
print("JSON:")
print(json.dumps(m.markup, indent=2))
print('\nYAML:')
print(yaml.dump(m.markup))
config2 = json.loads(json.dumps(m.markup)) # simulate saving and loading
m2 = MarkupMachine(markup=config2)
model2 = m2.models[0] # get the initialized model
assert model2.is_B() # the model state was preserved
model2.hello('again') # >>> Hello again!
assert model2.state == 'C'
transitions
with django models?¶In this comment proofit404 provided a nice example about how to use transitions
and django together:
from django.db import models
from django.db.models.signals import post_init
from django.dispatch import receiver
from django.utils.translation import ugettext_lazy as _
from transitions import Machine
class ModelWithState(models.Model):
ASLEEP = 'asleep'
HANGING_OUT = 'hanging out'
HUNGRY = 'hungry'
SWEATY = 'sweaty'
SAVING_THE_WORLD = 'saving the world'
STATE_TYPES = [
(ASLEEP, _('asleep')),
(HANGING_OUT, _('hanging out')),
(HUNGRY, _('hungry')),
(SWEATY, _('sweaty')),
(SAVING_THE_WORLD, _('saving the world')),
]
state = models.CharField(
_('state'),
max_length=100,
choices=STATE_TYPES,
default=ASLEEP,
help_text=_('actual state'),
)
@receiver(post_init, sender=ModelWithState)
def init_state_machine(instance, **kwargs):
states = [state for state, _ in instance.STATE_TYPES]
machine = instance.machine = Machine(model=instance, states=states, initial=instance.state)
machine.add_transition('work_out', instance.HANGING_OUT, instance.HUNGRY)
machine.add_transition('eat', instance.HUNGRY, instance.HANGING_OUT)
transitions
memory footprint is too large for my Django app and adding models takes too long.¶We analyzed the memory footprint of transitions
in this discussion and could verify that the standard approach is not suitable to handle thousands of models. However, with a static (class) machine and some __getattribute__
tweaking we can keep the convenience loss minimal:
from transitions import Machine
from functools import partial
from mock import MagicMock
class Model(object):
machine = Machine(model=None, states=['A', 'B', 'C'], initial=None,
transitions=[
{'trigger': 'go', 'source': 'A', 'dest': 'B', 'before': 'before'},
{'trigger': 'check', 'source': 'B', 'dest': 'C', 'conditions': 'is_large'},
], finalize_event='finalize')
def __init__(self):
self.state = 'A'
self.before = MagicMock()
self.after = MagicMock()
self.finalize = MagicMock()
@staticmethod
def is_large(value=0):
return value > 9000
def __getattribute__(self, item):
try:
return super(Model, self).__getattribute__(item)
except AttributeError:
if item in self.machine.events:
return partial(self.machine.events[item].trigger, self)
raise
model = Model()
model.go()
assert model.state == 'B'
assert model.before.called
assert model.finalize.called
model.check()
assert model.state == 'B'
model.check(value=500)
assert model.state == 'B'
model.check(value=9001)
assert model.state == 'C'
assert model.finalize.call_count == 4
You lose model.is_<state>
convenience functions and the ability to add callbacks such as Model.on_enter_<state>
automatically. However, the second limitation can be tackled with dynamic resolution in states as pointed out by mvanderlee here:
from transitions import State
import logging
logger = logging.getLogger(__name__)
class DynamicState(State):
""" Need to dynamically get the on_enter and on_exit callbacks since the
model can not be registered to the Machine due to Memory limitations
"""
def enter(self, event_data):
""" Triggered when a state is entered. """
logger.debug("%sEntering state %s. Processing callbacks...", event_data.machine.name, self.name)
if hasattr(event_data.model, f'on_enter_{self.name}'):
event_data.machine.callbacks([getattr(event_data.model, f'on_enter_{self.name}')], event_data)
logger.info("%sFinished processing state %s enter callbacks.", event_data.machine.name, self.name)
def exit(self, event_data):
""" Triggered when a state is exited. """
logger.debug("%sExiting state %s. Processing callbacks...", event_data.machine.name, self.name)
if hasattr(event_data.model, f'on_exit_{self.name}'):
event_data.machine.callbacks([getattr(event_data.model, f'on_exit_{self.name}')], event_data)
logger.info("%sFinished processing state %s exit callbacks.", event_data.machine.name, self.name)
class DynamicMachine(Machine):
"""Required to use DynamicState"""
state_cls = DynamicState
from transitions.core import Machine, State, Event, EventData, listify
class DuringState(State):
# add `on_during` to the dynamic callback methods
# this way on_during_<state> can be recognized by `Machine`
dynamic_methods = State.dynamic_methods + ['on_during']
# parse 'during' and remove the keyword before passing the rest along to state
def __init__(self, *args, **kwargs):
during = kwargs.pop('during', [])
self.on_during = listify(during)
super(DuringState, self).__init__(*args, **kwargs)
def during(self, event_data):
for handle in self.on_during:
event_data.machine.callback(handle, event_data)
class DuringEvent(Event):
def _trigger(self, model, *args, **kwargs):
# a successful transition returns `res=True` if res is False, we know that
# no transition has been executed
res = super(DuringEvent, self)._trigger(model, *args, **kwargs)
if res is False:
state = self.machine.get_state(model.state)
event_data = EventData(state, self, self.machine, model, args=args, kwargs=kwargs)
event_data.result = res
state.during(event_data)
return res
class DuringMachine(Machine):
# we need to override the state and event classes used by `Machine`
state_cls = DuringState
event_cls = DuringEvent
class Model:
def on_during_A(self):
print("Dynamically assigned callback")
def another_callback(self):
print("Explicitly assigned callback")
model = Model()
machine = DuringMachine(model=model, states=[{'name': 'A', 'during': 'another_callback'}, 'B'],
transitions=[['go', 'B', 'A']], initial='A', ignore_invalid_triggers=True)
machine.add_transition('test', source='A', dest='A', conditions=lambda: False)
assert not model.go()
assert not model.test()
from transitions import Machine, Transition
from six import string_types
class DependingTransition(Transition):
def __init__(self, source, dest, conditions=None, unless=None, before=None,
after=None, prepare=None, **kwargs):
self._result = self._dest = None
super(DependingTransition, self).__init__(source, dest, conditions, unless, before, after, prepare)
if isinstance(dest, dict):
try:
self._func = kwargs.pop('depends_on')
except KeyError:
raise AttributeError("A multi-destination transition requires a 'depends_on'")
else:
# use base version in case transition does not need special handling
self.execute = super(DependingTransition, self).execute
def execute(self, event_data):
func = getattr(event_data.model, self._func) if isinstance(self._func, string_types) \
else self._func
self._result = func(*event_data.args, **event_data.kwargs)
super(DependingTransition, self).execute(event_data)
@property
def dest(self):
return self._dest[self._result] if self._result is not None else self._dest
@dest.setter
def dest(self, value):
self._dest = value
# subclass Machine to use DependingTransition instead of standard Transition
class DependingMachine(Machine):
transition_cls = DependingTransition
def func(value):
return value
m = DependingMachine(states=['A', 'B', 'C', 'D'], initial='A')
# define a dynamic transition with a 'depends_on' function which will return the required value
m.add_transition(trigger='shuffle', source='A', dest=({1: 'B', 2: 'C', 3: 'D'}), depends_on=func)
m.shuffle(value=2) # func returns 2 which makes the transition dest to be 'C'
assert m.is_C()
Note that this solution has some drawbacks. For instance, the generated graph might not include all possible outcomes.
Machine.get_triggers
should only show valid transitions based on some conditions.¶This has been requested here. Machine.get_triggers
is usually quite naive and only checks for theoretically possible transitions. If you need more sophisticated peeking, this PeekMachine._can_trigger
might be a solution:
from transitions import Machine, EventData
from transitions.core import listify
from functools import partial
class Model(object):
def fails(self, condition=False):
return False
def success(self, condition=False):
return True
# condition is passed by EventData
def depends_on(self, condition=False):
return condition
def is_state_B(self, condition=False):
return self.state == 'B'
class PeekMachine(Machine):
def _can_trigger(self, model, *args, **kwargs):
# We can omit the first two arguments state and event since they are only needed for
# actual state transitions. We do have to pass the machine (self) and the model as well as
# args and kwargs meant for the callbacks.
e = EventData(None, None, self, model, args, kwargs)
return [trigger_name for trigger_name in self.get_triggers(model.state)
if any(all(c.check(e) for c in t.conditions)
for t in self.events[trigger_name].transitions[model.state])]
# override Machine.add_model to assign 'can_trigger' to the model
def add_model(self, model, initial=None):
for mod in listify(model):
mod = self if mod is self.self_literal else mod
if mod not in self.models:
setattr(mod, 'can_trigger', partial(self._can_trigger, mod))
super(PeekMachine, self).add_model(mod, initial)
states = ['A', 'B', 'C', 'D']
transitions = [
dict(trigger='go_A', source='*', dest='A', conditions=['depends_on']), # only available when condition=True is passed
dict(trigger='go_B', source='*', dest='B', conditions=['success']), # always available
dict(trigger='go_C', source='*', dest='C', conditions=['fails']), # never available
dict(trigger='go_D', source='*', dest='D', conditions=['is_state_B']), # only available in state B
dict(trigger='reset', source='D', dest='A', conditions=['success', 'depends_on']), # only available in state D when condition=True is passed
dict(trigger='forwards', source='A', dest='D', conditions=['success', 'fails']), # never available
dict(trigger='forwards', source='D', dest='D', unless=['depends_on'])
]
model = Model()
machine = PeekMachine(model, states=states, transitions=transitions, initial='A', auto_transitions=False)
assert model.can_trigger() == ['go_B']
assert set(model.can_trigger(condition=True)) == set(['go_A', 'go_B'])
model.go_B(condition=True)
assert set(model.can_trigger()) == set(['go_B', 'go_D'])
model.go_D()
assert model.can_trigger() == ['go_B', 'forwards']
assert set(model.can_trigger(condition=True)) == set(['go_A', 'go_B', 'reset'])
There is a high chance that your model already contained a trigger
method or methods with the same name as your even trigger. In this case, transitions
will not add convenience methods to not accidentaly break your model and only emit a warning. If you defined these methods on purpose and want them to be overrided or maybe even call both -- the trigger event AND your predefined method, you can extend/override Machine._checked_assignment
which is always called when something needs to be added to a model:
from transitions import State, Machine
class StateMachineModel:
state = None
def __init__(self):
pass
def transition_one(self):
print('transitioning states...')
def transition_two(self):
print('transitioning states...')
class OverrideMachine(Machine):
def _checked_assignment(self, model, name, func):
setattr(model, name, func)
class CallingMachine(Machine):
def _checked_assignment(self, model, name, func):
if hasattr(model, name):
predefined_func = getattr(model, name)
def nested_func(*args, **kwargs):
predefined_func()
func(*args, **kwargs)
setattr(model, name, nested_func)
else:
setattr(model, name, func)
states = [State(name='A'), State(name='B'), State(name='C'), State(name='D')]
transitions = [
{'trigger': 'transition_one', 'source': 'A', 'dest': 'B'},
{'trigger': 'transition_two', 'source': 'B', 'dest': 'C'},
{'trigger': 'transition_three', 'source': 'C', 'dest': 'D'}
]
state_machine_model = StateMachineModel()
print('OverrideMachine ...')
state_machine = OverrideMachine(model=state_machine_model, states=states, transitions=transitions, initial=states[0])
print('state_machine_model (current state): %s' % state_machine_model.state)
state_machine_model.transition_one()
print('state_machine_model (current state): %s' % state_machine_model.state)
state_machine_model.transition_two()
print('state_machine_model (current state): %s' % state_machine_model.state)
print('\nCallingMachine ...')
state_machine_model = StateMachineModel()
state_machine = CallingMachine(model=state_machine_model, states=states, transitions=transitions, initial=states[0])
print('state_machine_model (current state): %s' % state_machine_model.state)
state_machine_model.transition_one()
print('state_machine_model (current state): %s' % state_machine_model.state)
state_machine_model.transition_two()
print('state_machine_model (current state): %s' % state_machine_model.state)
A common use case involves multiple machines where one machine should react to events emitted by the other(s).
Sometimes this involves 'waiting' as in 'all machines are triggered at the same time but machine 1 needs to wait until
machine 2 is ready'. Machine
will process callbacks sequentially. Thus, if your callbacks contain passages like this
class Model:
def on_enter_state(self):
while not event:
time.sleep(1)
it is very likely that event
will never happen because the callback will block the event processing forever.
Bad news first: there is no one-fits-all-solution for this kind of problem. Now the good news: There is a solution that fits many use cases. An event bus! We consider transitions to be events that can be emitted by user input, system events or other machines.
The event bus approach decouples the need of individual machines to know each other. They communicate via events. Thus, we can model quite complex inter-dependent behaviour without threading or asynchronous processing. Furthermore, other components do not need to know which machine processes which event, they can broadcast the message on the bus and rest assured that whoever is interested in the event will get it. The challenge is to wrap one's head around the concept of modelling transitions as events rather than actions to be conducted.
Since we expect events to be emitted in callbacks, and we also expect that not every event bus member will be able to
process every event sent across the bus, we pass queued=True
(every machine processes one event at a time) and
ignore_invalid_triggers=True
(when the event cannot be triggered from the current state or is unknown, ignore it).
from transitions import Machine
import logging
class EventBus:
def __init__(self):
self.members = []
def add_member(self, member):
"""Member can be a model or a machine acting as a model"""
# We decorate each member with an 'emit' function to fire events.
# EventBus will then broadcast that event to ALL members, including the one that triggered the event.
# Furthermore, we can pass a payload in case there is data that needs to be sent with an event.
setattr(member, 'emit', self.broadcast)
self.members.append(member)
def broadcast(self, event, payload=None):
for member in self.members:
member.trigger(event, payload)
# Our machines can either be off or started
states = ['off', 'started']
class Machine1(Machine):
# this machine can only boot once.
transitions = [['boot', 'off', 'started']]
def __init__(self):
# we pass 'ignore_invalid_triggers' since a machine on an event bus might get events it cannot process
# right now and we do not want to throw an exception every time that happens.
# Furthermore, we will set 'queued=True' to process events sequentially instead of nested.
super(Machine1, self).__init__(states=states, transitions=self.transitions,
ignore_invalid_triggers=True, initial='off', queued=True)
def on_enter_started(self, payload=None):
print("Starting successful")
# We emit out start event and attach ourselves as payload just in case
self.emit("Machine1Started", self)
class Machine2(Machine):
# This machine can also reboot (boot from every state) but only when the 'ready' flag has been set.
# 'ready' is set once the event 'Machine1Started' has been processed (before the transition is from 'off' to 'on'
# is actually executed). Furthermore, we will also boot the machine when we catch that event.
transitions = [{'trigger': 'boot', 'source': '*', 'dest': 'started', 'conditions': 'ready'},
{'trigger': 'Machine1Started', 'source': 'off', 'dest': 'started', 'before': 'on_machine1_started'}]
def __init__(self):
super(Machine2, self).__init__(states=states, transitions=self.transitions,
ignore_invalid_triggers=True, initial='off', queued=True)
self._ready = False
# Callbacks also work with properties. Passing the string 'ready' will evaluate this property
@property
def ready(self):
return self._ready
@ready.setter
def ready(self, value):
self._ready = value
def on_machine1_started(self, payload=None):
self.ready = True
print("I am ready now!")
def on_enter_started(self, payload=None):
print("Booting successful")
logging.basicConfig(level=logging.DEBUG)
bus = EventBus()
machine1 = Machine1()
machine2 = Machine2()
bus.add_member(machine2)
bus.add_member(machine1)
bus.broadcast('boot')
# what will happen:
# - bus will broadcast 'boot' event to machine2
# - machine2 will attempt to boot but fail and return since ready is set to false
# - bus will broadcast 'boot' event to machine1
# - machine1 will boot and emit the 'Machine1Started'
# - bus will broadcast 'Machine1Started' to machine2
# - machine2 will handle the event, boot and return
# - bus will broadcast 'Machine1Started' to machine1
# - machine1 will add that event to its event queue
# - bus broadcast of 'Machine1Started' returns
# - machine1 is done with handling 'boot' and process the next event in the event queue
# - machine1 cannot handle 'Machine1Started' and will ignore it
# - bus broadcast of 'boot' returns
assert machine1.state == machine2.state
bus.broadcast('boot')
# broadcast 'boot' event to all members:
# - machine2 will reboot
# - machine1 won't do anything
If you consider this too much boilerplate and you don't mind some dependencies and less generalization
you can of course go the leaner asynchronous or threaded route and process your callbacks in parallel.
Having while event
loops as mentioned above in async
callbacks is not uncommon. You should consider, however, that
the execution order of callbacks as described in the README is kept for AsyncMachine
as well.
All callbacks of one stage (e.g. prepare
) must return before callbacks of the next state (e.g. conditions
) are triggered.
from transitions.extensions.asyncio import AsyncMachine
import asyncio
states = ['off', 'started']
class Machine1(AsyncMachine):
transitions = [{'trigger': 'boot', 'source': 'off', 'dest': 'started', 'before': 'heavy_processing'}]
def __init__(self):
super(Machine1, self).__init__(states=states, transitions=self.transitions, initial='off')
async def heavy_processing(self):
# we need to do some heavy lifting before we can proceed with booting
await asyncio.sleep(0.5)
print("Processing done!")
class Machine2(AsyncMachine):
transitions = [['boot', 'off', 'started']]
def __init__(self, dependency):
super(Machine2, self).__init__(states=states, transitions=self.transitions, initial='off')
self.dependency = dependency
async def on_enter_started(self):
while not self.dependency.is_started():
print("Waiting for dependency to be ready...")
await asyncio.sleep(0.1)
print("Machine2 up and running")
machine1 = Machine1()
machine2 = Machine2(machine1)
asyncio.get_event_loop().run_until_complete(asyncio.gather(machine1.boot(), machine2.boot()))
assert machine1.state == machine2.state
There are multiple ways to alter a graph's layout. Some are only available when you chose pygraphviz
as backend. You can [1] edit GraphMachine.machine_attributes
for Graph attributes such as flow direction and rank separation or GraphMachine.style_attributes
for node and edge properties. The easiest way is to deepcopy the standard values and override properties you would like to change. Furthermore you can [2] work directly with the returned pygraphviz.AGraph object that model.get_graph()
returns. However, values set via edge_attr
and node_attr
will be overriden by GraphMachine.style_attributes if they haven't been reset. AGraph.draw
features args
as a parameter to pass arguments to Graphviz layout engine directly. Eventually, you can also get the raw dot representation (e.g. AGraph.string()
) as a string. Some styling examples can be found in the Graph MIxin Demo notebook.
from copy import deepcopy
from transitions.extensions import GraphMachine
from collections import defaultdict
states = ['A', 'B', 'C', 'D']
transitions = [
{'trigger': 'go', 'source': 'A', 'dest': 'B'},
{'trigger': 'go', 'source': 'B', 'dest': 'C'},
{'trigger': 'back', 'source': 'C', 'dest': 'B'},
{'trigger': 'back', 'source': 'B', 'dest': 'A'},
{'trigger': 'forward', 'source': 'A', 'dest': 'C'},
{'trigger': 'forward', 'source': 'C', 'dest': 'D'},
{'trigger': 'backward', 'source': 'D', 'dest': 'C'},
{'trigger': 'backward', 'source': 'C', 'dest': 'A'}
]
class CustomGraphMachine(GraphMachine):
# Override Graphmachine's default styling for nodes and edges [1]
# Copy default graph attributes but change direction to top to bottom
machine_attributes = deepcopy(GraphMachine.machine_attributes)
machine_attributes["rankdir"] = "TB" # Left to right layout
# Reset styling
style_attributes = defaultdict(dict)
style_attributes["node"]["default"] = { "fontname": "arial", "shape": "circle"}
style_attributes["edge"]["default"] = { "fontname": "arial" }
machine = CustomGraphMachine(states=states, transitions=transitions, initial='A', title="State Diagram")
graph = machine.get_graph()
# directly modify the graph's attributes (works only with pygraphviz) [2]
# note: this will only work for attributes that haven't been set by the GraphMachine
graph.node_attr.update(fillcolor="turquoise")
graph.edge_attr.update(color="blue")
# modify the attributes of a specific edge
graph.edges(["A", "B"])[0].attr.update(color="red")
# pass additional arguments to the graphviz layout engine via args [3]
graph.draw('state_diagram.png', prog='dot', args='-Gnodesep=1')
# get the graphs source code [4]
source = graph.string()