This Jupyter notebook acts as supporting material for topics covered in Chapter 17, Making Complex Decisions, of the book Artificial Intelligence: A Modern Approach. We make use of the implementations in the mdp.py module. This notebook also includes a brief summary of the main topics as a review. Let us import everything from the mdp module to get started.
from mdp import *
from notebook import psource, pseudocode, plot_pomdp_utility
Before we start playing with the actual implementations let us review a couple of things about MDPs.
A stochastic process has the Markov property if the conditional probability distribution of future states of the process (conditional on both past and present states) depends only upon the present state, not on the sequence of events that preceded it.
-- Source: Wikipedia
Often it is possible to model many different phenomena as a Markov process by being flexible with our definition of state.
Our overall goal in solving an MDP is to come up with a policy which guides us to select the best action in each state so as to maximize the expected sum of future rewards.
To begin with, let us look at the implementation of the MDP class defined in mdp.py. The docstring tells us what is required to define an MDP, namely a set of states, actions, an initial state, a transition model, and a reward function. Each of these is stored on the instance or implemented as a method. Do not close the popup so that you can follow along with the description of the code below.
psource(MDP)
class MDP:
    """A Markov Decision Process, defined by an initial state, transition model,
    and reward function. We also keep track of a gamma value, for use by
    algorithms. The transition model is represented somewhat differently from
    the text. Instead of P(s' | s, a) being a probability number for each
    state/state/action triplet, we instead have T(s, a) return a
    list of (p, s') pairs. We also keep track of the possible states,
    terminal states, and actions for each state. [page 646]"""
    def __init__(self, init, actlist, terminals, transitions = {}, reward = None, states=None, gamma=.9):
        if not (0 < gamma <= 1):
            raise ValueError("An MDP must have 0 < gamma <= 1")
        if states:
            self.states = states
        else:
            ## collect states from transitions table
            self.states = self.get_states_from_transitions(transitions)
        self.init = init
        if isinstance(actlist, list):
            ## if actlist is a list, all states have the same actions
            self.actlist = actlist
        elif isinstance(actlist, dict):
            ## if actlist is a dict, different actions for each state
            self.actlist = actlist
        self.terminals = terminals
        self.transitions = transitions
        if self.transitions == {}:
            print("Warning: Transition table is empty.")
        self.gamma = gamma
        if reward:
            self.reward = reward
        else:
            self.reward = {s : 0 for s in self.states}
        #self.check_consistency()
    def R(self, state):
        """Return a numeric reward for this state."""
        return self.reward[state]
    def T(self, state, action):
        """Transition model. From a state and an action, return a list
        of (probability, result-state) pairs."""
        if(self.transitions == {}):
            raise ValueError("Transition model is missing")
        else:
            return self.transitions[state][action]
    def actions(self, state):
        """Set of actions that can be performed in this state. By default, a
        fixed list of actions, except for terminal states. Override this
        method if you need to specialize by state."""
        if state in self.terminals:
            return [None]
        else:
            return self.actlist
    def get_states_from_transitions(self, transitions):
        if isinstance(transitions, dict):
            s1 = set(transitions.keys())
            s2 = set([tr[1] for actions in transitions.values()
                      for effects in actions.values() for tr in effects])
            return s1.union(s2)
        else:
            print('Could not retrieve states from transitions')
            return None
    def check_consistency(self):
        # check that all states in transitions are valid
        assert set(self.states) == self.get_states_from_transitions(self.transitions)
        # check that init is a valid state
        assert self.init in self.states
        # check reward for each state
        #assert set(self.reward.keys()) == set(self.states)
        assert set(self.reward.keys()) == set(self.states)
        # check that all terminals are valid states
        assert all([t in self.states for t in self.terminals])
        # check that probability distributions for all actions sum to 1
        for s1, actions in self.transitions.items():
            for a in actions.keys():
                s = 0
                for o in actions[a]:
                    s += o[0]
                assert abs(s - 1) < 0.001
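The __init__ method takes in the following parameters: init (the initial state), actlist (the available actions, either a list shared by all states or a dict keyed by state), terminals (the terminal states), transitions (the transition table), reward (a dict mapping states to rewards), states (optional; collected from the transition table if omitted), and gamma (the discount factor).
R method returns the reward for each state by using the self.reward dict.
T method looks up the transition table and is somewhat different from the text. Here we return a list of (probability, s') pairs, where s' ranges over the states that can result from taking action a in state s. It raises a ValueError if the transition table is empty.
actions method returns the list of actions possible in each state. By default it returns all actions for states other than terminal states, and [None] for terminal states.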
Now let us implement the simple MDP in the image below. States A, B have actions X, Y available in them. Their probabilities are shown just above the arrows. We start with using MDP as base class for our CustomMDP. Obviously we need to make a few changes to suit our case. We make use of a transition matrix as our transitions are not very simple.
# Transition Matrix as nested dict. State -> Actions in state -> List of (Probability, State) tuples
t = {
    "A": {
        "X": [(0.3, "A"), (0.7, "B")],
        "Y": [(1.0, "A")]
    },
    "B": {
        "X": [(0.8, "End"), (0.2, "B")],
        "Y": [(1.0, "A")]
    },
    "End": {}
}
init = "A"
terminals = ["End"]
rewards = {
"A": 5,
"B": -10,
"End": 100
}
class CustomMDP(MDP):
    def __init__(self, init, terminals, transition_matrix, reward = None, gamma=.9):
        # All possible actions.
        actlist = []
        for state in transition_matrix.keys():
            actlist.extend(transition_matrix[state])
        actlist = list(set(actlist))
        MDP.__init__(self, init, actlist, terminals, transition_matrix, reward, gamma=gamma)
    def T(self, state, action):
        if action is None:
            return [(0.0, state)]
        else:
            # MDP.__init__ stores the table as self.transitions
            return self.transitions[state][action]
Finally we instantiate the class with the parameters for our MDP in the picture.
our_mdp = CustomMDP(init, terminals, t, rewards, gamma=.9)
With this we have successfully represented our MDP. Later we will look at ways to solve this MDP.
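A hand-written transition table is easy to get wrong, so it can help to confirm that every (state, action) entry is a proper probability distribution, which is the same property that check_consistency asserts in the MDP class. The following is a minimal standalone sketch that does not import mdp.py; the helper name distribution_sums is hypothetical and the table is a copy of the one above.

```python
# Standalone sketch: check that every (state, action) entry of a nested-dict
# transition table sums to 1. `distribution_sums` is a hypothetical helper,
# not part of mdp.py.
t = {
    "A": {"X": [(0.3, "A"), (0.7, "B")], "Y": [(1.0, "A")]},
    "B": {"X": [(0.8, "End"), (0.2, "B")], "Y": [(1.0, "A")]},
    "End": {},
}

def distribution_sums(transitions):
    """Map each (state, action) pair to the total probability of its outcomes."""
    return {
        (state, action): sum(p for p, _ in outcomes)
        for state, actions in transitions.items()
        for action, outcomes in actions.items()
    }

sums = distribution_sums(t)
# Same tolerance as the one used by MDP.check_consistency.
assert all(abs(total - 1) < 0.001 for total in sums.values())
```

The terminal state "End" has no actions, so it contributes no entries to the check.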
Now we look at a concrete implementation that makes use of the MDP as base class. The GridMDP class in the mdp module is used to represent a grid world MDP like the one shown in Fig 17.1 of the AIMA book. We assume for now that the environment is fully observable, so that the agent always knows where it is. The code should be easy to understand if you have gone through the CustomMDP example.
psource(GridMDP)
class GridMDP(MDP):
    """A two-dimensional grid MDP, as in [Figure 17.1]. All you have to do is
    specify the grid as a list of lists of rewards; use None for an obstacle
    (unreachable state). Also, you should specify the terminal states.
    An action is an (x, y) unit vector; e.g. (1, 0) means move east."""
    def __init__(self, grid, terminals, init=(0, 0), gamma=.9):
        grid.reverse() # because we want row 0 on bottom, not on top
        reward = {}
        states = set()
        self.rows = len(grid)
        self.cols = len(grid[0])
        self.grid = grid
        for x in range(self.cols):
            for y in range(self.rows):
                if grid[y][x] is not None:
                    states.add((x, y))
                    reward[(x, y)] = grid[y][x]
        self.states = states
        actlist = orientations
        transitions = {}
        for s in states:
            transitions[s] = {}
            for a in actlist:
                transitions[s][a] = self.calculate_T(s, a)
        MDP.__init__(self, init, actlist=actlist,
                     terminals=terminals, transitions = transitions,
                     reward = reward, states = states, gamma=gamma)
    def calculate_T(self, state, action):
        if action is None:
            return [(0.0, state)]
        else:
            return [(0.8, self.go(state, action)),
                    (0.1, self.go(state, turn_right(action))),
                    (0.1, self.go(state, turn_left(action)))]
    def T(self, state, action):
        if action is None:
            return [(0.0, state)]
        else:
            return self.transitions[state][action]
    def go(self, state, direction):
        """Return the state that results from going in this direction."""
        state1 = vector_add(state, direction)
        return state1 if state1 in self.states else state
    def to_grid(self, mapping):
        """Convert a mapping from (x, y) to v into a [[..., v, ...]] grid."""
        return list(reversed([[mapping.get((x, y), None)
                               for x in range(self.cols)]
                              for y in range(self.rows)]))
    def to_arrows(self, policy):
        chars = {
            (1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}
        return self.to_grid({s: chars[a] for (s, a) in policy.items()})
The __init__ method takes grid as an extra parameter compared to the MDP class. The grid is a nested list of rewards in states, with None marking an obstacle.
go method returns the state reached by going in a particular direction, computed with vector_add. If the move would leave the grid or hit an obstacle, the agent stays where it is.
T method looks up the transitions precomputed by calculate_T and is somewhat different from the text. Here we return a list of (probability, s') pairs: the intended move succeeds with probability 0.8, and the agent slips to the right or to the left of the intended direction with probability 0.1 each.
actions method is inherited from MDP and returns the list of actions possible in each state. By default it returns all actions for states other than terminal states.
to_arrows method is used for representing the policy in a grid-like format.
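To see the 0.8 / 0.1 / 0.1 motion model in isolation, here is a minimal standalone sketch of the idea behind calculate_T and go. It does not import mdp.py: the helpers turn_right, turn_left, go and transitions below are re-implemented for illustration only, and the 4 x 3 layout with an obstacle at (1, 1) is assumed to match Fig 17.1.

```python
# Standalone sketch of the stochastic motion model on a 4 x 3 grid.
# These helpers are illustrative re-implementations, not the mdp.py versions.
COLS, ROWS = 4, 3
OBSTACLES = {(1, 1)}
STATES = {(x, y) for x in range(COLS) for y in range(ROWS)} - OBSTACLES

def turn_right(d):
    """Rotate a unit vector 90 degrees clockwise, e.g. east -> south."""
    dx, dy = d
    return (dy, -dx)

def turn_left(d):
    """Rotate a unit vector 90 degrees counter-clockwise, e.g. east -> north."""
    dx, dy = d
    return (-dy, dx)

def go(state, direction):
    """Move one step, or stay put when the move hits a wall or the obstacle."""
    nxt = (state[0] + direction[0], state[1] + direction[1])
    return nxt if nxt in STATES else state

def transitions(state, action):
    """List of (probability, next_state) pairs for an intended move."""
    return [(0.8, go(state, action)),
            (0.1, go(state, turn_right(action))),
            (0.1, go(state, turn_left(action)))]

# Trying to move north from the bottom-left corner: slipping left bumps the wall.
print(transitions((0, 0), (0, 1)))
```

Note that the same next state can appear more than once in the list (for example when both slips bump into walls), which is fine because the utilities are summed with their probabilities.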
We can create a GridMDP like the one in Fig 17.1 as follows:
GridMDP([[-0.04, -0.04, -0.04, +1],
[-0.04, None, -0.04, -1],
[-0.04, -0.04, -0.04, -0.04]],
terminals=[(3, 2), (3, 1)])
In fact the sequential_decision_environment in the mdp module has been instantiated using the exact same code.
sequential_decision_environment
<mdp.GridMDP at 0x1d384953fd0>
Now that we have looked at how to represent MDPs, let's aim at solving them. Our ultimate goal is to obtain an optimal policy. We start by looking at Value Iteration and a visualisation that should help us understand it better.
We start by calculating the Value/Utility for each of the states. The Value of each state is the expected sum of discounted future rewards given we start in that state and follow a particular policy $\pi$. The value or the utility of a state is given by
$$U(s)=R(s)+\gamma\max_{a\in A(s)}\sum_{s'} P(s'\ |\ s,a)U(s')$$
This is called the Bellman equation. The algorithm Value Iteration (Fig. 17.4 in the book) relies on finding solutions of this equation. The intuition is that Value Iteration works because values propagate through the state space by means of local updates. This point will be clearer after we encounter the visualisation. For more information you can refer to Section 17.2 of the book.
psource(value_iteration)
def value_iteration(mdp, epsilon=0.001):
    """Solving an MDP by value iteration. [Figure 17.4]"""
    U1 = {s: 0 for s in mdp.states}
    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    while True:
        U = U1.copy()
        delta = 0
        for s in mdp.states:
            U1[s] = R(s) + gamma * max([sum([p * U[s1] for (p, s1) in T(s, a)])
                                        for a in mdp.actions(s)])
            delta = max(delta, abs(U1[s] - U[s]))
        if delta < epsilon * (1 - gamma) / gamma:
            return U
It takes as inputs two parameters, an MDP to solve and epsilon, the maximum error allowed in the utility of any state. It returns a dictionary containing utilities where the keys are the states and values represent utilities.
Value Iteration starts with arbitrary initial values for the utilities, calculates the right side of the Bellman equation and plugs it into the left hand side, thereby updating the utility of each state from the utilities of its neighbors.
This is repeated until equilibrium is reached.
It works on the principle of Dynamic Programming - using precomputed information to simplify the subsequent computation.
If $U_i(s)$ is the utility value for state $s$ at the $i$th iteration, the iteration step, called the Bellman update, looks like this:
$$U_{i+1}(s) \leftarrow R(s)+\gamma\max_{a\in A(s)}\sum_{s'} P(s'\ |\ s,a)U_i(s')$$
As you might have noticed, value_iteration has an infinite loop. How do we decide when to stop iterating?
The concept of contraction successfully explains the convergence of value iteration.
Refer to Section 17.2.3 of the book for a detailed explanation.
In the algorithm, we calculate a value $\delta$ that measures the largest difference, over all states, between the utilities of the current iteration and those of the previous iteration.
This value of $\delta$ decreases as the values of $U_i$ converge. We terminate the algorithm if the $\delta$ value is less than a threshold value determined by the hyperparameter epsilon.
$$\delta \lt \epsilon \frac{(1 - \gamma)}{\gamma}$$
To summarize, the Bellman update is a contraction by a factor of $\gamma$ on the space of utility vectors.
Hence, from the properties of contractions in general, it follows that value_iteration always converges to a unique solution of the Bellman equations whenever $\gamma$ is less than 1.
We then terminate the algorithm when a reasonable approximation is achieved.
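The contraction property and the stopping rule can be observed on the smallest possible example. The sketch below is an illustration under stated assumptions, not part of mdp.py: it uses a hypothetical one-state MDP with a self-loop and reward 1, so the update is simply U <- R + gamma * U and the true utility is R / (1 - gamma) = 2 for gamma = 0.5. The helper name stopping_threshold is also hypothetical.

```python
def stopping_threshold(epsilon, gamma):
    """Value iteration may stop once the largest change delta falls below this."""
    return epsilon * (1 - gamma) / gamma

# Hypothetical one-state MDP with a self-loop: the Bellman update reduces to
# U <- R + gamma * U, whose fixed point is R / (1 - gamma).
R, gamma, epsilon = 1.0, 0.5, 0.001
U, deltas = 0.0, []
while True:
    U_next = R + gamma * U
    deltas.append(abs(U_next - U))   # change in utility during this sweep
    U = U_next
    if deltas[-1] < stopping_threshold(epsilon, gamma):
        break

# Each change is gamma times the previous one: a contraction by a factor of gamma.
ratios = [later / earlier for earlier, later in zip(deltas, deltas[1:])]
print(len(deltas), U, ratios[:3])
```

When the loop stops, the remaining error |U - 2| is below epsilon, which is what the threshold is designed to guarantee.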
In practice, it often occurs that the policy $\pi$ becomes optimal long before the utility function converges. For the given 4 x 3 environment with $\gamma = 0.9$, the policy $\pi$ is optimal when $i = 4$ (at the 4th iteration), even though the maximum error in the utility function is still 0.46. This can be seen in Figure 17.6 in the book. Hence, to increase computational efficiency, we often use another method to solve MDPs called Policy Iteration, which we will see in the later part of this notebook.
For now, let us solve the sequential_decision_environment GridMDP using value_iteration.
value_iteration(sequential_decision_environment)
{(0, 0): 0.2962883154554812, (0, 1): 0.3984432178350045, (0, 2): 0.5093943765842497, (1, 0): 0.25386699846479516, (1, 2): 0.649585681261095, (2, 0): 0.3447542300124158, (2, 1): 0.48644001739269643, (2, 2): 0.7953620878466678, (3, 0): 0.12987274656746342, (3, 1): -1.0, (3, 2): 1.0}
The pseudocode for the algorithm:
pseudocode("Value-Iteration")
function VALUE-ITERATION(mdp, ε) returns a utility function
  inputs: mdp, an MDP with states S, actions A(s), transition model P(s′ | s, a),
            rewards R(s), discount γ
          ε, the maximum error allowed in the utility of any state
  local variables: U, U′, vectors of utilities for states in S, initially zero
                   δ, the maximum change in the utility of any state in an iteration
  repeat
      U ← U′; δ ← 0
      for each state s in S do
          U′[s] ← R(s) + γ max_{a ∈ A(s)} Σ_{s′} P(s′ | s, a) U[s′]
          if | U′[s] − U[s] | > δ then δ ← | U′[s] − U[s] |
  until δ < ε(1 − γ)/γ
  return U
Figure ?? The value iteration algorithm for calculating utilities of states. The termination condition is from Equation (??).
To illustrate that values propagate out of states let us create a simple visualisation. We will be using a modified version of the value_iteration function which will store U over time. We will also remove the parameter epsilon and instead add the number of iterations we want.
def value_iteration_instru(mdp, iterations=20):
    U_over_time = []
    U1 = {s: 0 for s in mdp.states}
    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    for _ in range(iterations):
        U = U1.copy()
        for s in mdp.states:
            U1[s] = R(s) + gamma * max([sum([p * U[s1] for (p, s1) in T(s, a)])
                                        for a in mdp.actions(s)])
        U_over_time.append(U)
    return U_over_time
Next, we define a function to create the visualisation from the utilities returned by value_iteration_instru. You need not be concerned with the code that immediately follows, as it is just the usage of Matplotlib with IPython Widgets. If you are interested in reading more about these, visit ipywidgets.readthedocs.io
columns = 4
rows = 3
U_over_time = value_iteration_instru(sequential_decision_environment)
%matplotlib inline
from notebook import make_plot_grid_step_function
plot_grid_step = make_plot_grid_step_function(columns, rows, U_over_time)
import ipywidgets as widgets
from IPython.display import display
from notebook import make_visualize
iteration_slider = widgets.IntSlider(min=1, max=15, step=1, value=0)
w=widgets.interactive(plot_grid_step,iteration=iteration_slider)
display(w)
visualize_callback = make_visualize(iteration_slider)
visualize_button = widgets.ToggleButton(description = "Visualize", value = False)
time_select = widgets.ToggleButtons(description='Extra Delay:',options=['0', '0.1', '0.2', '0.5', '0.7', '1.0'])
a = widgets.interactive(visualize_callback, Visualize = visualize_button, time_step=time_select)
display(a)
Move the slider above to observe how the utility changes across iterations. It is also possible to move the slider using the arrow keys or to jump to a value by directly editing the number with a double click. The Visualize button will automatically animate the slider for you. The Extra Delay box allows you to set a time delay of up to one second for each time step. There is also an interactive editor for grid-world problems, grid_mdp.py, in the gui folder for you to play around with.
We have already seen that value iteration converges to the optimal policy long before it accurately estimates the utility function. If one action is clearly better than all the others, then the exact magnitude of the utilities in the states involved need not be precise. The policy iteration algorithm works on this insight. The algorithm executes two fundamental steps:
Policy evaluation: Given a policy $\pi_i$, calculate $U_i = U^{\pi_i}$, the utility of each state if $\pi_i$ were to be executed.
Policy improvement: Calculate a new policy $\pi_{i+1}$ using one-step look-ahead based on the utility values calculated.
The algorithm terminates when the policy improvement step yields no change in the utilities. Refer to Figure 17.6 in the book to see how this is an improvement over value iteration. We now have a simplified version of the Bellman equation
$$U_i(s) = R(s) + \gamma \sum_{s'}P(s'\ |\ s, \pi_i(s))U_i(s')$$
An important observation is that this equation doesn't have the max operator, which makes it linear.
For n states, we have n linear equations with n unknowns, which can be solved exactly in time O(n³).
For more implementational details, have a look at Section 17.3.
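To make the "n linear equations with n unknowns" remark concrete, the fixed-policy Bellman equation can be written as (I - gamma * P) U = R and solved directly. The sketch below is an illustration only and is not how mdp.py evaluates policies: the helper names solve_linear and evaluate_policy_exactly are hypothetical, the two-state chain is made up, and a small Gauss-Jordan routine stands in for a linear algebra library.

```python
def solve_linear(A, b):
    """Solve A x = b by Gauss-Jordan elimination with partial pivoting."""
    n = len(b)
    M = [row[:] + [rhs] for row, rhs in zip(A, b)]  # augmented matrix [A | b]
    for col in range(n):
        # Bring the row with the largest entry in this column into pivot position.
        pivot = max(range(col, n), key=lambda r: abs(M[r][col]))
        M[col], M[pivot] = M[pivot], M[col]
        # Eliminate this column from every other row.
        for r in range(n):
            if r != col:
                factor = M[r][col] / M[col][col]
                M[r] = [x - factor * y for x, y in zip(M[r], M[col])]
    return [M[i][n] / M[i][i] for i in range(n)]

def evaluate_policy_exactly(P, R, gamma):
    """Solve (I - gamma * P) U = R, where P[i][j] = P(s_j | s_i, pi(s_i))."""
    n = len(R)
    A = [[(1.0 if i == j else 0.0) - gamma * P[i][j] for j in range(n)]
         for i in range(n)]
    return solve_linear(A, R)

# Made-up two-state chain under a fixed policy: state 0 stays or moves to
# state 1 with equal probability, and state 1 is absorbing with zero reward.
P = [[0.5, 0.5], [0.0, 1.0]]
R = [1.0, 0.0]
U = evaluate_policy_exactly(P, R, gamma=0.9)
print(U)
```

For this chain the equations can also be solved by hand: U(s_1) = 0 and U(s_0) = 1 / (1 - 0.45), which the solver reproduces.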
Let us now look at how the expected utility is found and how policy_iteration is implemented.
psource(expected_utility)
def expected_utility(a, s, U, mdp):
    """The expected utility of doing a in state s, according to the MDP and U."""
    return sum([p * U[s1] for (p, s1) in mdp.T(s, a)])
psource(policy_iteration)
def policy_iteration(mdp):
    """Solve an MDP by policy iteration [Figure 17.7]"""
    U = {s: 0 for s in mdp.states}
    pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}
    while True:
        U = policy_evaluation(pi, U, mdp)
        unchanged = True
        for s in mdp.states:
            a = argmax(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
            if a != pi[s]:
                pi[s] = a
                unchanged = False
        if unchanged:
            return pi
Fortunately, it is not necessary to do exact policy evaluation.
The utilities can instead be reasonably approximated by performing some number of simplified value iteration steps.
The simplified Bellman update equation for the process is
$$U_{i+1}(s) \leftarrow R(s) + \gamma \sum_{s'}P(s'\ |\ s, \pi_i(s))U_i(s')$$
and this is repeated k times to produce the next utility estimate. This is called modified policy iteration.
psource(policy_evaluation)
def policy_evaluation(pi, U, mdp, k=20):
    """Return an updated utility mapping U from each state in the MDP to its
    utility, using an approximation (modified policy iteration)."""
    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    for i in range(k):
        for s in mdp.states:
            U[s] = R(s) + gamma * sum([p * U[s1] for (p, s1) in T(s, pi[s])])
    return U
Let us now solve sequential_decision_environment using policy_iteration.
policy_iteration(sequential_decision_environment)
{(0, 0): (0, 1), (0, 1): (0, 1), (0, 2): (1, 0), (1, 0): (1, 0), (1, 2): (1, 0), (2, 0): (0, 1), (2, 1): (0, 1), (2, 2): (1, 0), (3, 0): (-1, 0), (3, 1): None, (3, 2): None}
pseudocode('Policy-Iteration')
function POLICY-ITERATION(mdp) returns a policy
  inputs: mdp, an MDP with states S, actions A(s), transition model P(s′ | s, a)
  local variables: U, a vector of utilities for states in S, initially zero
                   π, a policy vector indexed by state, initially random
  repeat
      U ← POLICY-EVALUATION(π, U, mdp)
      unchanged? ← true
      for each state s in S do
          if max_{a ∈ A(s)} Σ_{s′} P(s′ | s, a) U[s′] > Σ_{s′} P(s′ | s, π[s]) U[s′] then do
              π[s] ← argmax_{a ∈ A(s)} Σ_{s′} P(s′ | s, a) U[s′]
              unchanged? ← false
  until unchanged?
  return π
Figure ?? The policy iteration algorithm for calculating an optimal policy.
Now that we have the tools required to solve MDPs, let us see how Sequential Decision Problems can be solved step by step and how a few built-in tools in the GridMDP class help us better analyse the problem at hand.
As always, we will work with the grid world from Figure 17.1 from the book.
This is the environment for our agent.
We assume for now that the environment is fully observable, so that the agent always knows where it is.
We also assume that the transitions are Markovian, that is, the probability of reaching state $s'$ from state $s$ depends only on $s$ and not on the history of earlier states.
Almost all stochastic decision problems can be reframed as a Markov Decision Process just by tweaking the definition of a state for that particular problem.
These properties of the agent are called the transition properties and are hardcoded into the GridMDP class as you can see below.
psource(GridMDP.T)
def T(self, state, action):
    if action is None:
        return [(0.0, state)]
    else:
        return self.transitions[state][action]
To completely define our task environment, we need to specify the utility function for the agent. This is the function that gives the agent a rough estimate of how good being in a particular state is, or how much reward an agent receives by being in that state. The agent then tries to maximize the reward it gets. As the decision problem is sequential, the utility function will depend on a sequence of states rather than on a single state. For now, we simply stipulate that in each state $s$, the agent receives a finite reward $R(s)$.
For any given state, the actions the agent can take are encoded as given below:
Move Up: (0, 1)
Move Down: (0, -1)
Move Left: (-1, 0)
Move Right: (1, 0)
Do nothing: None
We now wonder what a valid solution to the problem might look like. We cannot have fixed action sequences, as the environment is stochastic and we can eventually end up in an undesirable state. Therefore, a solution must specify what the agent should do for any state the agent might reach.
psource(GridMDP.to_arrows)
def to_arrows(self, policy):
    chars = {
        (1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}
    return self.to_grid({s: chars[a] for (s, a) in policy.items()})
This method directly encodes the actions that the agent can take (described above) to characters representing arrows and shows it in a grid format for human visualization purposes.
It converts the received policy from a dictionary to a grid using the to_grid method.
psource(GridMDP.to_grid)
def to_grid(self, mapping):
    """Convert a mapping from (x, y) to v into a [[..., v, ...]] grid."""
    return list(reversed([[mapping.get((x, y), None)
                           for x in range(self.cols)]
                          for y in range(self.rows)]))
Now that we have all the tools required and a good understanding of the agent and the environment, we consider some cases and see how the agent should behave for each case.
# Note that this environment is also initialized in mdp.py by default
sequential_decision_environment = GridMDP([[-0.04, -0.04, -0.04, +1],
[-0.04, None, -0.04, -1],
[-0.04, -0.04, -0.04, -0.04]],
terminals=[(3, 2), (3, 1)])
We will use the best_policy function to find the best policy for this environment.
But, as you can see, best_policy requires a utility function as well.
We already know that the utility function can be found by value_iteration.
Hence, our best policy is:
pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .001))
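The idea of turning utilities into a policy is a one-step look-ahead: in each state, pick the action whose expected utility is highest. The sketch below illustrates that idea on the small A/B/End transition table from earlier in this notebook; it is an assumption-laden illustration rather than the mdp.py implementation, the function name greedy_policy is hypothetical, and the utilities in U are made-up numbers chosen only to exercise the code.

```python
# Transition table copied from the CustomMDP example.
T = {
    "A": {"X": [(0.3, "A"), (0.7, "B")], "Y": [(1.0, "A")]},
    "B": {"X": [(0.8, "End"), (0.2, "B")], "Y": [(1.0, "A")]},
    "End": {},
}
# Made-up utilities, for illustration only (not the output of value_iteration).
U = {"A": 20.0, "B": 50.0, "End": 100.0}

def greedy_policy(T, U):
    """For each state, choose the action with the highest expected utility under U."""
    policy = {}
    for s, actions in T.items():
        if not actions:            # terminal state: there is nothing to choose
            policy[s] = None
            continue
        policy[s] = max(
            actions,
            key=lambda a: sum(p * U[s1] for p, s1 in actions[a]),
        )
    return policy

policy = greedy_policy(T, U)
print(policy)
```

With these utilities, action X has expected utility 41 in state A and 90 in state B, against 20 for action Y in both, so the greedy choice is X everywhere.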
We can now use the to_arrows method to see how our agent should pick its actions in the environment.
from utils import print_table
print_table(sequential_decision_environment.to_arrows(pi))
>   >      >   .
^   None   ^   .
^   >      ^   <
This is exactly the output we expected
sequential_decision_environment = GridMDP([[-0.4, -0.4, -0.4, +1],
[-0.4, None, -0.4, -1],
[-0.4, -0.4, -0.4, -0.4]],
terminals=[(3, 2), (3, 1)])
pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .001))
from utils import print_table
print_table(sequential_decision_environment.to_arrows(pi))
>   >      >   .
^   None   ^   .
^   >      ^   <
This is exactly the output we expected
As the reward for each state is now more negative, life is certainly more unpleasant. The agent takes the shortest route to the +1 state and is willing to risk falling into the -1 state by accident.
sequential_decision_environment = GridMDP([[-4, -4, -4, +1],
[-4, None, -4, -1],
[-4, -4, -4, -4]],
terminals=[(3, 2), (3, 1)])
pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .001))
from utils import print_table
print_table(sequential_decision_environment.to_arrows(pi))
>   >      >   .
^   None   >   .
>   >      >   ^
This is exactly the output we expected
The living reward for each state is now lower than the least rewarding terminal. Life is so painful that the agent heads for the nearest exit as even the worst exit is less painful than any living state.
sequential_decision_environment = GridMDP([[4, 4, 4, +1],
[4, None, 4, -1],
[4, 4, 4, 4]],
terminals=[(3, 2), (3, 1)])
pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .001))
from utils import print_table
print_table(sequential_decision_environment.to_arrows(pi))
>   >      <   .
>   None   <   .
>   >      >   v
In this case, the output we expect is
Partially Observable Markov Decision Problems
In retrospect, a Markov decision process or MDP is defined as:
An MDP consists of a set of states (with an initial state $s_0$); a set $A(s)$ of actions in each state; a transition model $P(s' | s, a)$; and a reward function $R(s)$.
The MDP seeks to make sequential decisions to occupy states so as to maximise some combination of the reward function $R(s)$.
The characteristic problem of the MDP is hence to identify the optimal policy function $\pi^*(s)$ that provides the utility-maximising action $a$ to be taken when the current state is $s$.
Note: The book refers to the belief vector as the belief state. We use the former terminology here to retain our ability to refer to the belief vector as a probability distribution over states.
The solution of an MDP is subject to certain properties of the problem which are assumed and justified in [Section 17.1]. One critical assumption is that the agent is fully aware of its current state at all times.
A tedious (but rewarding, as we will see) way of expressing this is in terms of the belief vector $b$ of the agent. The belief vector is a function mapping states to probabilities or certainties of being in those states.
Consider an agent that is fully aware that it is in state $s_i$ in the state space $(s_1, s_2, ... s_n)$ at the current time.
Its belief vector is the vector $(b(s_1), b(s_2), ... b(s_n))$ given by the function $b(s)$: \begin{align*} b(s) &= 0 \quad \text{if }s \neq s_i \\ &= 1 \quad \text{if } s = s_i \end{align*}
Note that $b(s)$ is a probability distribution that necessarily sums to $1$ over all $s$.
The POMDP really has only two modifications to the problem formulation compared to the MDP.
Belief state - In the real world, the current state of an agent is often not known with complete certainty. This makes the concept of a belief vector extremely relevant. It allows the agent to represent different degrees of certainty with which it believes it is in each state.
Evidence percepts - In the real world, agents often have certain kinds of evidence, collected from sensors. They can use the probability distribution of observed evidence, conditional on state, to consolidate their information. This is a known distribution $P(e\ |\ s)$ - $e$ being an evidence, and $s$ being the state it is conditional on.
Consider the world we used for the MDP.
An agent beginning at $(1, 1)$ may not be certain that it is indeed in $(1, 1)$. Consider a belief vector $b$ such that: \begin{align*} b((1,1)) &= 0.8 \\ b((2,1)) &= 0.1 \\ b((1,2)) &= 0.1 \\ b(s) &= 0 \quad \quad \forall \text{ other } s \end{align*}
By horizontally concatenating each row, we can represent this as an 11-dimensional vector (omitting $(2, 2)$).
Thus, taking $s_1 = (1, 1)$, $s_2 = (2, 1)$, ... $s_{11} = (4,3)$, we have $b$:
$b = (0.8, 0.1, 0, 0, 0.1, 0, 0, 0, 0, 0, 0)$
This fully represents the certainty to which the agent is aware of its state.
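The flattening described above can be reproduced in a few lines. This is a minimal sketch assuming 1-based (x, y) coordinates, a 4 x 3 grid with the obstacle at (2, 2), and rows enumerated from y = 1 upwards; the variable names are illustrative.

```python
# Enumerate the 11 free cells row by row, skipping the obstacle at (2, 2).
states = [(x, y) for y in range(1, 4) for x in range(1, 5) if (x, y) != (2, 2)]

# Sparse description of the belief: unlisted states have probability 0.
belief = {(1, 1): 0.8, (2, 1): 0.1, (1, 2): 0.1}

# Dense 11-dimensional belief vector in the order s_1, ..., s_11.
b = [belief.get(s, 0.0) for s in states]
print(b)
```

In this ordering $(1, 2)$ is the fifth state, which is why the second 0.1 appears in the fifth position of $b$.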
The evidence observed here could be the number of adjacent 'walls' or 'dead ends' observed by the agent. We assume that the agent cannot 'orient' the walls - only count them.
In this case, $e$ can take only two values, 1 and 2. This gives $P(e\ |\ s)$ as: \begin{align*} P(e=2\ |\ s) &= \frac{1}{7} \quad \forall \quad s \in \{s_1, s_2, s_4, s_5, s_8, s_9, s_{11}\}\\ P(e=1\ |\ s) &= \frac{1}{4} \quad \forall \quad s \in \{s_3, s_6, s_7, s_{10}\} \\ P(e\ |\ s) &= 0 \quad \forall \quad \text{ other } s, e \end{align*}
Note that the implications of the evidence on the state must be known a priori to the agent. Ways of reliably learning this distribution from percepts are beyond the scope of this notebook.
A POMDP is thus a sequential decision problem for a partially observable, stochastic environment with a Markovian transition model, a known 'sensor model' for inferring state from observation, and additive rewards.
Practically, a POMDP has the following, which an MDP also has:
a transition model $P(s'\ |\ s, a)$
a set of actions $A(s)$
a reward function $R(s)$
And the following, which an MDP does not:
a sensor model $P(e\ |\ s)$
Additionally, the agent in a POMDP is now uncertain of its current state and hence has:
a belief vector $b$ over the possible states
It is useful to intuitively appreciate the new uncertainties that have arisen in the agent's awareness of its own state.
FORWARD function
The new belief vector $b'(s')$ after an action $a$ on the belief vector $b(s)$ and the noting of evidence $e$ is: $$ b'(s') = \alpha P(e\ |\ s') \sum_s P(s'\ | s, a) b(s)$$
where $\alpha$ is a normalising constant (to retain the interpretation of $b$ as a probability distribution).
This equation simply sums the likelihoods of going to a state $s'$ from every possible state $s$, each weighted by the initial likelihood of being in $s$. This is multiplied by the likelihood of observing the evidence $e$ in the new state $s'$.
This function is represented as b' = FORWARD(b, a, e)
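The update can be sketched directly from the equation. The code below is a minimal illustration, not the implementation used in mdp.py: the function name forward, the dictionary layouts for the transition and sensor models, and the two-state example with a "stay" action are all assumptions made for the sake of the example.

```python
def forward(b, a, e, T, O):
    """Belief update b'(s') = alpha * P(e | s') * sum_s P(s' | s, a) * b(s).

    b: dict mapping every state to its probability
    T: dict mapping (s, a) to a dict {s': P(s' | s, a)}
    O: dict mapping s' to a dict {e: P(e | s')}
    """
    unnormalised = {
        s2: O[s2].get(e, 0.0)
            * sum(T[(s, a)].get(s2, 0.0) * p for s, p in b.items())
        for s2 in b
    }
    total = sum(unnormalised.values())  # equals 1 / alpha
    if total == 0:
        raise ValueError("evidence has zero probability under this belief and action")
    return {s2: v / total for s2, v in unnormalised.items()}

# Hypothetical two-state world: the action keeps the agent where it is, and a
# noisy sensor reports 'l' or 'r'.
T = {("L", "stay"): {"L": 1.0}, ("R", "stay"): {"R": 1.0}}
O = {"L": {"l": 0.9, "r": 0.1}, "R": {"l": 0.2, "r": 0.8}}
b = {"L": 0.5, "R": 0.5}

b_new = forward(b, "stay", "l", T, O)
print(b_new)
```

Starting from a uniform belief, observing 'l' shifts the belief towards state L in proportion 0.45 : 0.10, and the normalisation makes the two entries sum to 1 again.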
The goal here is to find $P(b'\ |\ b, a)$ - the probability that action $a$ transforms belief vector $b$ into belief vector $b'$. The following steps illustrate this -
The probability of observing evidence $e$ when action $a$ is enacted on belief vector $b$ can be distributed over each possible new state $s'$ resulting from it: \begin{align*} P(e\ |\ b, a) &= \sum_{s'} P(e\ |\ b, a, s') P(s'\ |\ b, a) \\ &= \sum_{s'} P(e\ |\ s') P(s'\ |\ b, a) \\ &= \sum_{s'} P(e\ |\ s') \sum_s P(s'\ |\ s, a) b(s) \end{align*}
The probability of getting belief vector $b'$ from $b$ by application of action $a$ can thus be summed over all possible evidences $e$: \begin{align*} P(b'\ |\ b, a) &= \sum_{e} P(b'\ |\ b, a, e) P(e\ |\ b, a) \\ &= \sum_{e} P(b'\ |\ b, a, e) \sum_{s'} P(e\ |\ s') \sum_s P(s'\ |\ s, a) b(s) \end{align*}
where $P(b'\ |\ b, a, e) = 1$ if $b' = $ FORWARD(b, a, e)
and $= 0$ otherwise.
Given initial and final belief states $b$ and $b'$, the transition probabilities still depend on the action $a$ and observed evidence $e$. Some belief states may be achievable by certain actions, but have non-zero probabilities for states prohibited by the evidence $e$. The above condition thus ensures that only valid combinations of $(b', b, a, e)$ are considered.
For MDPs, the reward space was simple - one reward per available state. However, for a belief vector $b(s)$, the expected reward is now: $$\rho(b) = \sum_s b(s) R(s)$$
Because the belief vector can take infinitely many values, the expected reward can no longer be listed one state at a time. It is a linear function of $b$, so it varies over a hyperplane in the belief space (planes in an $N$-dimensional space are formed by a linear combination of the axes).
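As a small worked example, the expected reward is just a dot product between the belief vector and the per-state rewards. The function name `expected_reward` and the reward values are chosen for illustration only.

```python
def expected_reward(b, R):
    """rho(b) = sum_s b(s) R(s)."""
    return sum(b_s * r_s for b_s, r_s in zip(b, R))


R = [0.0, 1.0]            # illustrative rewards for a two-state world
b1 = [1.0, 0.0]           # certain of being in state 0
b2 = [0.0, 1.0]           # certain of being in state 1
mix = [0.25, 0.75]        # 0.25 * b1 + 0.75 * b2

rho_mix = expected_reward(mix, R)
# Linearity: the reward of a mixed belief is the same mixture of the rewards
rho_linear = 0.25 * expected_reward(b1, R) + 0.75 * expected_reward(b2, R)
print(rho_mix, rho_linear)
```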
Now that we know the basics, let's have a look at the POMDP class.
psource(POMDP)
class POMDP(MDP):

    """A Partially Observable Markov Decision Process, defined by
    a transition model P(s'|s,a), actions A(s), a reward function R(s),
    and a sensor model P(e|s). We also keep track of a gamma value,
    for use by algorithms. The transition and the sensor models
    are defined as matrices. We also keep track of the possible states
    and actions for each state. [page 659]."""

    def __init__(self, actions, transitions=None, evidences=None, rewards=None, states=None, gamma=0.95):
        """Initialize variables of the pomdp"""

        if not (0 < gamma <= 1):
            raise ValueError('A POMDP must have 0 < gamma <= 1')

        self.states = states
        self.actions = actions

        # transition model cannot be undefined
        self.t_prob = transitions or {}
        if not self.t_prob:
            print('Warning: Transition model is undefined')

        # sensor model cannot be undefined
        self.e_prob = evidences or {}
        if not self.e_prob:
            print('Warning: Sensor model is undefined')

        self.gamma = gamma
        self.rewards = rewards

    def remove_dominated_plans(self, input_values):
        """
        Remove dominated plans.
        This method finds all the lines contributing to the
        upper surface and removes those which don't.
        """

        values = [val for action in input_values for val in input_values[action]]
        values.sort(key=lambda x: x[0], reverse=True)

        best = [values[0]]
        y1_max = max(val[1] for val in values)
        tgt = values[0]
        prev_b = 0
        prev_ix = 0
        while tgt[1] != y1_max:
            min_b = 1
            min_ix = 0
            for i in range(prev_ix + 1, len(values)):
                if values[i][0] - tgt[0] + tgt[1] - values[i][1] != 0:
                    trans_b = (values[i][0] - tgt[0]) / (values[i][0] - tgt[0] + tgt[1] - values[i][1])
                    if 0 <= trans_b <= 1 and trans_b > prev_b and trans_b < min_b:
                        min_b = trans_b
                        min_ix = i
            prev_b = min_b
            prev_ix = min_ix
            tgt = values[min_ix]
            best.append(tgt)

        return self.generate_mapping(best, input_values)

    def remove_dominated_plans_fast(self, input_values):
        """
        Remove dominated plans using approximations.
        Resamples the upper boundary at intervals of 100 and
        finds the maximum values at these points.
        """

        values = [val for action in input_values for val in input_values[action]]
        values.sort(key=lambda x: x[0], reverse=True)

        best = []
        sr = 100
        for i in range(sr + 1):
            x = i / float(sr)
            maximum = (values[0][1] - values[0][0]) * x + values[0][0]
            tgt = values[0]
            for value in values:
                val = (value[1] - value[0]) * x + value[0]
                if val > maximum:
                    maximum = val
                    tgt = value

            if all(any(tgt != v) for v in best):
                best.append(tgt)

        return self.generate_mapping(best, input_values)

    def generate_mapping(self, best, input_values):
        """Generate mappings after removing dominated plans"""

        mapping = defaultdict(list)
        for value in best:
            for action in input_values:
                if any(all(value == v) for v in input_values[action]):
                    mapping[action].append(value)

        return mapping

    def max_difference(self, U1, U2):
        """Find maximum difference between two utility mappings"""

        for k, v in U1.items():
            sum1 = 0
            for element in U1[k]:
                sum1 += sum(element)
            sum2 = 0
            for element in U2[k]:
                sum2 += sum(element)
        return abs(sum1 - sum2)
The POMDP class includes all variables of the MDP class and additionally stores the sensor model in e_prob.
To understand how we can model a partially observable MDP, let's take a simple example. Let's consider a simple two-state world. The states are labelled 0 and 1, with the reward at state 0 being 0 and at state 1 being 1.
Let's model this POMDP using the POMDP class.
# transition probability P(s'|s,a)
t_prob = [[[0.9, 0.1], [0.1, 0.9]], [[0.1, 0.9], [0.9, 0.1]]]
# evidence function P(e|s)
e_prob = [[[0.6, 0.4], [0.4, 0.6]], [[0.6, 0.4], [0.4, 0.6]]]
# reward function
rewards = [[0.0, 0.0], [1.0, 1.0]]
# discount factor
gamma = 0.95
# actions
actions = ('0', '1')
# states
states = ('0', '1')
pomdp = POMDP(actions, t_prob, e_prob, rewards, states, gamma)
We have defined our POMDP object.
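Before solving a model like this, it can help to check that every row of the transition and sensor matrices is a valid probability distribution. The helper below is a hypothetical convenience written for this notebook, not part of mdp.py. It assumes each model is a list of matrices whose rows should sum to 1.

```python
def rows_are_distributions(model, tol=1e-9):
    """Return True if every row of every matrix is non-negative and sums to 1."""
    return all(
        min(row) >= 0 and abs(sum(row) - 1.0) <= tol
        for matrix in model
        for row in matrix
    )


# Same values as the two-state example above
t_prob = [[[0.9, 0.1], [0.1, 0.9]], [[0.1, 0.9], [0.9, 0.1]]]
e_prob = [[[0.6, 0.4], [0.4, 0.6]], [[0.6, 0.4], [0.4, 0.6]]]

print(rows_are_distributions(t_prob), rows_are_distributions(e_prob))

# A deliberately broken model, to show what the check catches
broken = [[[0.9, 0.2], [0.1, 0.9]]]
print(rows_are_distributions(broken))
```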
Defining a POMDP is useless unless we can find a way to solve it. As POMDPs can have infinitely many belief states, we cannot calculate one utility value for each state as we did in value_iteration for MDPs.
The utility function can be found by pomdp_value_iteration.
pseudocode('POMDP-Value-Iteration')
function POMDP-VALUE-ITERATION(pomdp, ε) returns a utility function
 inputs: pomdp, a POMDP with states S, actions A(s), transition model P(s′ | s, a),
      sensor model P(e | s), rewards R(s), discount γ
     ε, the maximum error allowed in the utility of any state
 local variables: U, U′, sets of plans p with associated utility vectors αp

 U′ ← a set containing just the empty plan [], with α[](s) = R(s)
 repeat
   U ← U′
   U′ ← the set of all plans consisting of an action and, for each possible next percept,
     a plan in U with utility vectors computed according to Equation(??)
   U′ ← REMOVE-DOMINATED-PLANS(U′)
 until MAX-DIFFERENCE(U, U′) < ε(1 − γ) ⁄ γ
 return U
Figure ?? A high-level sketch of the value iteration algorithm for POMDPs. The REMOVE-DOMINATED-PLANS step and MAX-DIFFERENCE test are typically implemented as linear programs.
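The pseudocode represents the utility function as a set of plans, each with a utility vector $\alpha_p$. The utility of a belief $b$ is then the best expected value over those vectors, $U(b) = \max_p \sum_s b(s) \alpha_p(s)$. The sketch below illustrates this with made-up vectors; the function name `belief_utility` and the numbers are assumptions for the example, not output of the algorithm.

```python
def belief_utility(b, alpha_vectors):
    """U(b) = max over plans p of sum_s b(s) * alpha_p(s)."""
    return max(
        sum(b_s * a_s for b_s, a_s in zip(b, alpha))
        for alpha in alpha_vectors
    )


# Hypothetical utility vectors for three plans in a two-state world
alpha_vectors = [[1.0, 0.0], [0.0, 1.0], [0.6, 0.6]]

u_uncertain = belief_utility([0.5, 0.5], alpha_vectors)
u_confident = belief_utility([0.9, 0.1], alpha_vectors)
print(u_uncertain, u_confident)
```

Each vector is a line (in general a hyperplane) over the belief space, so the maximum over them is piecewise linear and convex. A finite set of vectors is therefore enough to describe a utility over infinitely many beliefs.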
Let's have a look at the pomdp_value_iteration function.
psource(pomdp_value_iteration)
def pomdp_value_iteration(pomdp, epsilon=0.1):
    """Solving a POMDP by value iteration."""

    U = {'':[[0]* len(pomdp.states)]}
    count = 0
    while True:
        count += 1
        prev_U = U
        values = [val for action in U for val in U[action]]
        value_matxs = []
        for i in values:
            for j in values:
                value_matxs.append([i, j])

        U1 = defaultdict(list)
        for action in pomdp.actions:
            for u in value_matxs:
                u1 = Matrix.matmul(Matrix.matmul(pomdp.t_prob[int(action)], Matrix.multiply(pomdp.e_prob[int(action)], Matrix.transpose(u))), [[1], [1]])
                u1 = Matrix.add(Matrix.scalar_multiply(pomdp.gamma, Matrix.transpose(u1)), [pomdp.rewards[int(action)]])
                U1[action].append(u1[0])

        U = pomdp.remove_dominated_plans_fast(U1)
        # replace with U = pomdp.remove_dominated_plans(U1) for accurate calculations

        if count > 10:
            if pomdp.max_difference(U, prev_U) < epsilon * (1 - pomdp.gamma) / pomdp.gamma:
                return U
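This function uses two aptly named helper methods from the POMDP class, remove_dominated_plans and max_difference. As written, it calls the approximate variant remove_dominated_plans_fast; the comment in the source shows how to switch to remove_dominated_plans for accurate calculations.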
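The idea behind the sampling approach can be sketched independently of the class. The function below is a simplified stand-in written for this notebook, not the method in mdp.py. It assumes a two-state world in which a vector `[v0, v1]` describes the line `(v1 - v0) * x + v0` over `x` in [0, 1], and it keeps only the vectors that are highest at one or more sampled points.

```python
def upper_surface(vectors, samples=100):
    """Keep the vectors that attain the maximum at some sampled point x."""
    kept = []
    for i in range(samples + 1):
        x = i / samples
        # Value of each line at x; max() returns the first vector on ties
        best = max(vectors, key=lambda v: (v[1] - v[0]) * x + v[0])
        if best not in kept:
            kept.append(best)
    return kept


# Hypothetical utility vectors; [0.2, 0.2] lies below [0.7, 0.7] everywhere
vectors = [[1.0, 0.0], [0.0, 1.0], [0.2, 0.2], [0.7, 0.7]]
surviving = upper_surface(vectors)
print(surviving)
```

A vector that is optimal only on a very narrow interval can fall between two sample points and be dropped, which is why sampling is an approximation and the exact method computes intersection points instead.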
Let's try solving a simple one-dimensional POMDP using value-iteration.
# transition function P(s'|s,a)
t_prob = [[[0.65, 0.35], [0.65, 0.35]], [[0.65, 0.35], [0.65, 0.35]], [[1.0, 0.0], [0.0, 1.0]]]
# evidence function P(e|s)
e_prob = [[[0.5, 0.5], [0.5, 0.5]], [[0.5, 0.5], [0.5, 0.5]], [[0.8, 0.2], [0.3, 0.7]]]
# reward function
rewards = [[5, -10], [-20, 5], [-1, -1]]
gamma = 0.95
actions = ('0', '1', '2')
states = ('0', '1')
pomdp = POMDP(actions, t_prob, e_prob, rewards, states, gamma)
We have defined the POMDP object.
Let's run pomdp_value_iteration to find the utility function.
utility = pomdp_value_iteration(pomdp, epsilon=0.1)
%matplotlib inline
plot_pomdp_utility(utility)
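The returned utility is a mapping from each action to the utility vectors that survive pruning, so it can also be read as a policy. The sketch below uses a hand-written stand-in mapping (simply the one-step rewards from the example, not the result of pomdp_value_iteration) and assumes, as in remove_dominated_plans_fast, that a vector `[v0, v1]` has value `(v1 - v0) * x + v0` at point `x`. The name `best_action` is hypothetical.

```python
def best_action(utility, x):
    """Return (action, value) for the highest line at point x in [0, 1]."""
    best = None
    for action, vectors in utility.items():
        for v in vectors:
            value = (v[1] - v[0]) * x + v[0]
            if best is None or value > best[1]:
                best = (action, value)
    return best


# Stand-in mapping for illustration only: one vector per action
stand_in_utility = {
    '0': [[5.0, -10.0]],
    '1': [[-20.0, 5.0]],
    '2': [[-1.0, -1.0]],
}

for x in (0.0, 0.5, 1.0):
    print(x, best_action(stand_in_utility, x))
```

With these stand-in numbers, actions '0' and '1' are preferred at the two ends of the belief range, and action '2' is preferred in the middle, where the agent is most uncertain.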
Surprisingly, it turns out that there are six other optimal policies for various ranges of R(s).
You can try to find them out for yourself.
See Exercise 17.5.
To help you with this, we have a GridMDP editor in grid_mdp.py in the GUI folder.