In [0]:
import tensorflow as tf, numpy as np, gym


# Part I¶

We'll introduce OpenAI gym, and build a RandomAgent to iteract with it.

## Gym Environment basics¶

OpenAI's gym provides several environments, together with a simple interface, that can be used to train RL agents.

In [0]:
# Here is a list of envs: https://gym.openai.com/envs/
env = gym.make('CartPole-v1')

In [0]:
# Reset the environment and return the initial state:
env.reset()

Out[0]:
array([0.03043647, 0.03669709, 0.02053445, 0.02925962])
In [0]:
# We can also get the shape of the state space directly:
env.observation_space

Out[0]:
Box(4,)
In [0]:
# If we were local, we could visualize the environment.
# To get an idea, see https://gym.openai.com/envs/CartPole-v1/
# env.render()

In [0]:
# Get available actions
env.action_space

Out[0]:
Discrete(2)
In [0]:
# Sample a random action
env.action_space.sample()

Out[0]:
0
In [0]:
# To perform an action in the environment we do:
obs, reward, done, info = env.step(env.action_space.sample())

print('next state: {}'.format(obs))
print('reward: {}'.format(reward))
print('episode over: {}'.format(done))
print('extra info: {}'.format(info))

next state: [ 0.03117041  0.23151864  0.02111964 -0.25687439]
reward: 1.0
episode over: False
extra info: {}


## Lets make a dummy agent¶

In [0]:
class RandomAgent():
def __init__(self, env):
self.env = env

def train(self):
""" This agent doesn't need to be trained! """
pass

def act(self, state):
""" This executes the agent's policy """
return self.env.action_space.sample()

def run_episode(self, max_steps=100, verbose=False):
""" Runs a test episode """
state = self.env.reset()
total_reward = 0
if verbose:
print("Step 0; State: {}".format(state))
for step in range(max_steps):
next_state, reward, done, _ = self.env.step(self.act(state))
if verbose:
print("Step {}; State: {}; Reward: {}".format(step, state, reward))

state = next_state
total_reward += reward
if done:
print("Done! Final state: {}; Total reward: {}".format(state, total_reward))
break
elif step ==  max_steps -1:
print("Did not finish in time! Final state: {}; Total reward: {}".format(state, total_reward))


In [0]:
agent = RandomAgent(env)
agent.act([0,0,0,0])

Out[0]:
0

#### Now we can run an episode with a random policy:¶

In [0]:
agent.run_episode(max_steps=20, verbose=True)

Step 0; State: [-0.03681555  0.02262997 -0.0247906  -0.03605247]
Step 0; State: [-0.03681555  0.02262997 -0.0247906  -0.03605247]; Reward: 1.0
Step 1; State: [-0.03636295  0.21809849 -0.02551165 -0.33645284]; Reward: 1.0
Step 2; State: [-0.03200098  0.41357404 -0.0322407  -0.63707035]; Reward: 1.0
Step 3; State: [-0.0237295   0.60913041 -0.04498211 -0.9397296 ]; Reward: 1.0
Step 4; State: [-0.01154689  0.80482891 -0.0637767  -1.2462007 ]; Reward: 1.0
Step 5; State: [ 0.00454969  1.00070825 -0.08870072 -1.55816   ]; Reward: 1.0
Step 6; State: [ 0.02456386  1.19677323 -0.11986392 -1.87714511]; Reward: 1.0
Step 7; State: [ 0.04849932  1.39298144 -0.15740682 -2.20450137]; Reward: 1.0
Step 8; State: [ 0.07635895  1.19968414 -0.20149685 -1.96422674]; Reward: 1.0
Done! Final state: [ 0.10035263  1.00718522 -0.24078138 -1.7401603 ]; Total reward: 9.0


# Part II¶

Let's build a simple Random gym environment, solve it, and see why we need RL.

## Building a new Gym environment¶

See https://github.com/openai/gym/blob/master/gym/core.py for reference.

The main API methods that users of this class need to know are:

- step
- reset
- render
- close
- seed

And set the following attributes:

- action_space: The Space object corresponding to valid actions
- observation_space: The Space object corresponding to valid observations
- reward_range: A tuple corresponding to the min and max possible rewards

Note: a default reward range set to [-inf,+inf] already exists. Set it if you want a narrower range.
In [0]:
class RandomEnv(gym.Env):
def __init__(self, num_states=5, num_actions=1, reward_range=(-1,1), seed=None):

# We generate random num_states x num_states transition matrices,
# one for each action
# so self.transitions[action][state] returns a vector of next_state probs
# i.e., self.transitions[a][s][s'] = T(s, a, s')

self.seed(seed)

self.transitions = []
for action in range(num_actions):
t = np.random.random((num_states, num_states)) + 1e-2
t = t / np.sum(t, axis=1, keepdims=True)
self.transitions.append(t)

# We'll make rewards a function of states and actions
# So self.rewards[state][action] is our reward fn
self.rewards = np.random.random((num_states, num_actions))

self.action_space = gym.spaces.Discrete(num_actions)
self.observation_space = gym.spaces.Discrete(num_states)
self.reward_range = reward_range

self._state = 0

def seed(self, seed):
np.random.seed(seed=seed)

def render(self):
pass

def close(self):
pass

def reset(self):
self._state = 0
return 0

def step(self, action):
reward = self.rewards[self._state, action]
next_state_probs = self.transitions[action][self._state]
next_state = np.random.choice(range(self.observation_space.n), p=next_state_probs)
self._state = next_state

return next_state, reward, False, None

In [0]:
E = RandomEnv(seed=1)

In [0]:
E.transitions[0][0]

Out[0]:
array([0.26092828, 0.44625877, 0.00618031, 0.19084825, 0.0957844 ])
In [0]:
E.rewards[3][0]

Out[0]:
0.1698304195645689

## Policy Valuation¶

We've now defined an MDP (well, just a Markov "Reward" Process when num_actions = 1). Let's solve it.

Since there are no decisions to be made, solving the MRP amounts to finding the value function in each state---which is equivalent to the "policy valuation" step in an MDP (where a given policy determines an MRP --- we'll see this shortly).

We can do policy valuation in many ways:

• Solving a set of linear equations
• Monte Carlo rollouts
• Value iteration

Let's do the first, and solve by matrix inversion.

Recall the Bellman relation for MRPs (no actions):

$$V(s) = R(s) + \gamma E(V(s'))$$

Using vectors $v$ and $r$, and noting that $E(v') = Tv$, we have:

\begin{align} v &= r + \gamma Tv\\ (I - \gamma T)v &= r\\ v &= (I - \gamma T)^{-1} r \end{align}

So we can solve for the value function by matrix inversion:

In [0]:
value_vector = np.linalg.inv((np.eye(E.observation_space.n) - 0.99*E.transitions[0])).dot(E.rewards[:,0])

In [0]:
value_vector

Out[0]:
array([40.72539222, 39.93188114, 39.74523951, 40.00483612, 40.78187634])
In [0]:
E.rewards[:,0]

Out[0]:
array([0.89460666, 0.08504421, 0.03905478, 0.16983042, 0.8781425 ])

## Solving an MDP¶

In [0]:
E = RandomEnv(num_states = 5, num_actions = 2, seed=1)

E.transitions

Out[0]:
[array([[0.26092828, 0.44625877, 0.00618031, 0.19084825, 0.0957844 ],
[0.06357446, 0.12192016, 0.22088033, 0.25269083, 0.34093422],
[0.18954246, 0.3070254 , 0.09470719, 0.3922137 , 0.01651125],
[0.33439019, 0.20998289, 0.27946125, 0.07390201, 0.10226366],
[0.21905254, 0.26431343, 0.08738497, 0.18975835, 0.23949071]]),
array([[0.42737082, 0.04490252, 0.02317536, 0.08495878, 0.41959252],
[0.03936469, 0.15663049, 0.35165467, 0.1973434 , 0.25500675],
[0.1226013 , 0.26232817, 0.31811746, 0.01065442, 0.28629864],
[0.3374558 , 0.25613912, 0.09812376, 0.27002898, 0.03825234],
[0.21620095, 0.43372795, 0.14335574, 0.14059887, 0.06611648]])]

We now have 2 actions available in every state, and our transition function is no longer a matrix. Our Bellman optimality equation now has a "max" in it:

$$V^*(s) = \max_a \left[ R(s, a) + \gamma E_a(V^*(s')) \right]$$

We can no longer solve this as a set of linear equations. (But note that we can still value any policy using the Bellman relation, as we did above).

Instead we have a few options:

• Value Iteration
• Policy Iteration
• Linear Programming

Let's do Value Iteration. It is a dynamic programming algorithm that keeps a current estimate of $v$ (our value function as a vector), and updates each entry successively according to the Bellman optimality equation:

$$v(s) \leftarrow = \max_a \left[ R(s, a) + \gamma E_a (v(s')) \right]$$
In [0]:
def value_iteration(env, iters=1000, gamma=0.99):
v = np.random.random((env.observation_space.n,))
for i in range(iters):
for s in range(env.observation_space.n):
rs = env.rewards[s]
next_state_values = []
for a in range(env.action_space.n):
next_state_values.append(env.transitions[a][s].dot(v))
next_state_targets = rs + gamma * np.array(next_state_values)

v[s] = np.max(next_state_targets)
return v

In [0]:
v = value_iteration(E)
print(v)

[54.49761196 53.90892641 54.13984941 54.26911537 54.30439125]


#### How do we use this to get a policy?¶

In [0]:
def act(env, s, v, gamma=0.99):
rs = env.rewards[s]
next_state_values = []
for a in range(env.action_space.n):
next_state_values.append(env.transitions[a][s].dot(v))
next_state_targets = rs + gamma * np.array(next_state_values)

return np.argmax(next_state_values)

In [0]:
for state in range(5):
print(act(E, state, v))

1
0
0
1
0


### Part III: Reinforcement Learning¶

In general, we won't have this, so what do we do?

Q-learning! Note that the Q Learning agent below never accesses env.transitions/env.rewards, except via the environment's step function, but still learns the optimal value function.

### Let's make a Q-learning agent¶

In [0]:
class QLearningAgent():
""" Copying over our random agent, but adding the ability to train it. """
def __init__(self, env, lr=0.01, gamma=0.99):
self.env = env
self.gamma = gamma
self.lr = lr

# Our Q function will be a [num_states, num_actions] matrix
# So Q(s, a) = self.Q[s, a]
self.Q = np.zeros((env.observation_space.n, env.action_space.n))

def train(self, num_steps=100000):
""" This agent doesn't need to be trained! """
state = self.env.reset()
for step in range(num_steps):
# take a step in the environment]
action = self.act(state)
next_state, reward, done, info = self.env.step(action)

if done:
discount = 0
else:
discount = self.gamma

target = reward + discount * np.max(self.Q[next_state])

td_error = target - self.Q[state, action]
self.Q[state, action] += self.lr * td_error

state = next_state

def act(self, state):
""" This executes the agent's policy -- let's stick to random for now"""
return self.env.action_space.sample()

def run_episode(self, max_steps=100, verbose=False):
""" Runs a test episode """
state = self.env.reset()
total_reward = 0
if verbose:
print("Step 0; State: {}".format(state))
for step in range(max_steps):
next_state, reward, done, _ = self.env.step(self.act(state))
if verbose:
print("Step {}; State: {}; Reward: {}".format(step, state, reward))

state = next_state
total_reward += reward
if done:
print("Done! Final state: {}; Total reward: {}".format(state, total_reward))
break
elif step ==  max_steps -1:
print("Did not finish in time! Final state: {}; Total reward: {}".format(state, total_reward))

In [0]:
agent = QLearningAgent(E)
agent.train(100000)

In [0]:
# Not quite there... need more training steps!
agent.Q

Out[0]:
array([[34.0170347 , 34.87141505],
[34.25573211, 34.26880007],
[34.42765406, 33.96374134],
[34.62721616, 34.18911257],
[34.63770088, 34.68663144]])
In [0]:
agent.train(500000)

In [0]:
agent.Q

Out[0]:
array([[53.58386508, 54.4235428 ],
[53.8241882 , 53.82901508],
[54.06684333, 53.61486028],
[54.17800861, 53.78310343],
[54.20732702, 54.21742006]])
In [0]:
# Compare to this to the result of the value iteration above:
print(v)
print(np.max(agent.Q, axis=1))

[54.49761196 53.90892641 54.13984941 54.26911537 54.30439125]
[54.4235428  53.82901508 54.06684333 54.17800861 54.21742006]

In [0]:



## Part IV: Going beyond Tabular Q-Learning¶

TBD

• Function Approximation
• Continuous Action Spaces