import tensorflow as tf, numpy as np, gym
# Here is a list of envs: https://gym.openai.com/envs/
env = gym.make('CartPole-v1')
# Reset the environment and return the initial state:
env.reset()
array([0.03043647, 0.03669709, 0.02053445, 0.02925962])
# We can also get the shape of the state space directly:
env.observation_space
Box(4,)
# If we were local, we could visualize the environment.
# To get an idea, see https://gym.openai.com/envs/CartPole-v1/
# env.render()
# Get available actions
env.action_space
Discrete(2)
# Sample a random action
env.action_space.sample()
0
# To perform an action in the environment we do:
obs, reward, done, info = env.step(env.action_space.sample())
print('next state: {}'.format(obs))
print('reward: {}'.format(reward))
print('episode over: {}'.format(done))
print('extra info: {}'.format(info))
next state: [ 0.03117041 0.23151864 0.02111964 -0.25687439] reward: 1.0 episode over: False extra info: {}
class RandomAgent:
    """A baseline agent whose policy is to sample actions uniformly at random."""

    def __init__(self, env):
        # Keep a handle on the environment so act() can sample its action space.
        self.env = env

    def train(self):
        """Nothing to learn -- the random policy is fixed."""
        pass

    def act(self, state):
        """Return a random action; the state argument is ignored entirely."""
        return self.env.action_space.sample()

    def run_episode(self, max_steps=100, verbose=False):
        """Roll out one episode of at most max_steps, printing a summary at the end."""
        state = self.env.reset()
        total_reward = 0
        if verbose:
            print("Step 0; State: {}".format(state))
        for t in range(max_steps):
            next_state, reward, done, _ = self.env.step(self.act(state))
            if verbose:
                print("Step {}; State: {}; Reward: {}".format(t, state, reward))
            state = next_state
            total_reward += reward
            if done:
                print("Done! Final state: {}; Total reward: {}".format(state, total_reward))
                break
        else:
            # for-else: only reached when the loop ran out without a break,
            # i.e. the episode never finished -- same condition as checking
            # the final step index.
            print("Did not finish in time! Final state: {}; Total reward: {}".format(state, total_reward))
agent = RandomAgent(env)
agent.act([0,0,0,0])
0
agent.run_episode(max_steps=20, verbose=True)
Step 0; State: [-0.03681555 0.02262997 -0.0247906 -0.03605247] Step 0; State: [-0.03681555 0.02262997 -0.0247906 -0.03605247]; Reward: 1.0 Step 1; State: [-0.03636295 0.21809849 -0.02551165 -0.33645284]; Reward: 1.0 Step 2; State: [-0.03200098 0.41357404 -0.0322407 -0.63707035]; Reward: 1.0 Step 3; State: [-0.0237295 0.60913041 -0.04498211 -0.9397296 ]; Reward: 1.0 Step 4; State: [-0.01154689 0.80482891 -0.0637767 -1.2462007 ]; Reward: 1.0 Step 5; State: [ 0.00454969 1.00070825 -0.08870072 -1.55816 ]; Reward: 1.0 Step 6; State: [ 0.02456386 1.19677323 -0.11986392 -1.87714511]; Reward: 1.0 Step 7; State: [ 0.04849932 1.39298144 -0.15740682 -2.20450137]; Reward: 1.0 Step 8; State: [ 0.07635895 1.19968414 -0.20149685 -1.96422674]; Reward: 1.0 Done! Final state: [ 0.10035263 1.00718522 -0.24078138 -1.7401603 ]; Total reward: 9.0
Let's build a simple Random gym environment, solve it, and see why we need RL.
See https://github.com/openai/gym/blob/master/gym/core.py for reference.
The main API methods that users of this class need to know are:
- step
- reset
- render
- close
- seed
And set the following attributes:
- action_space: The Space object corresponding to valid actions
- observation_space: The Space object corresponding to valid observations
- reward_range: A tuple corresponding to the min and max possible rewards
Note: a default reward range set to [-inf,+inf] already exists. Set it if you want a narrower range.
class RandomEnv(gym.Env):
    """A randomly generated finite MDP.

    Transition model: self.transitions[a][s][s'] = T(s, a, s'); one
    row-stochastic num_states x num_states matrix per action.
    Reward model: self.rewards[s][a] = R(s, a), drawn uniformly from [0, 1).
    Episodes never terminate (step always returns done=False).
    """

    def __init__(self, num_states=5, num_actions=1, reward_range=(-1, 1), seed=None):
        # Seed first so the random transition/reward draws are reproducible.
        self.seed(seed)
        self.transitions = []
        for action in range(num_actions):
            # Add a small constant so every transition has nonzero probability,
            # then normalize each row into a valid distribution.
            t = np.random.random((num_states, num_states)) + 1e-2
            t = t / np.sum(t, axis=1, keepdims=True)
            self.transitions.append(t)
        # Rewards are a function of (state, action).
        self.rewards = np.random.random((num_states, num_actions))
        self.action_space = gym.spaces.Discrete(num_actions)
        self.observation_space = gym.spaces.Discrete(num_states)
        # NOTE(review): reward_range is stored but the rewards above are drawn
        # from [0, 1) regardless -- scale self.rewards if a custom range should
        # actually take effect.
        self.reward_range = reward_range
        self._state = 0

    def seed(self, seed=None):
        """Seed the global NumPy RNG (default None matches gym's seed() convention)."""
        np.random.seed(seed=seed)

    def render(self):
        pass

    def close(self):
        pass

    def reset(self):
        """Reset to state 0 and return the initial observation."""
        self._state = 0
        return 0

    def step(self, action):
        """Sample a transition; returns (obs, reward, done, info) per the gym API."""
        reward = self.rewards[self._state, action]
        next_state_probs = self.transitions[action][self._state]
        next_state = np.random.choice(range(self.observation_space.n), p=next_state_probs)
        self._state = next_state
        # Bug fix: the gym API requires info to be a dict, not None.
        return next_state, reward, False, {}
E = RandomEnv(seed=1)
E.transitions[0][0]
array([0.26092828, 0.44625877, 0.00618031, 0.19084825, 0.0957844 ])
E.rewards[3][0]
0.1698304195645689
We've now defined an MDP (well, just a Markov "Reward" Process when num_actions = 1). Let's solve it.
Since there are no decisions to be made, solving the MRP amounts to finding the value function in each state---which is equivalent to the "policy valuation" step in an MDP (where a given policy determines an MRP --- we'll see this shortly).
We can do policy valuation in many ways:
Let's start with the most direct approach: solve for the value function exactly by matrix inversion.
Recall the Bellman relation for MRPs (no actions):
$$V(s) = R(s) + \gamma E(V(s'))$$Using vectors $v$ and $r$, and noting that $E(v') = Tv$, we have:
$$ \begin{align} v &= r + \gamma Tv\\ (I - \gamma T)v &= r\\ v &= (I - \gamma T)^{-1} r \end{align} $$So we can solve for the value function by matrix inversion:
value_vector = np.linalg.inv((np.eye(E.observation_space.n) - 0.99*E.transitions[0])).dot(E.rewards[:,0])
value_vector
array([40.72539222, 39.93188114, 39.74523951, 40.00483612, 40.78187634])
E.rewards[:,0]
array([0.89460666, 0.08504421, 0.03905478, 0.16983042, 0.8781425 ])
E = RandomEnv(num_states = 5, num_actions = 2, seed=1)
E.transitions
[array([[0.26092828, 0.44625877, 0.00618031, 0.19084825, 0.0957844 ], [0.06357446, 0.12192016, 0.22088033, 0.25269083, 0.34093422], [0.18954246, 0.3070254 , 0.09470719, 0.3922137 , 0.01651125], [0.33439019, 0.20998289, 0.27946125, 0.07390201, 0.10226366], [0.21905254, 0.26431343, 0.08738497, 0.18975835, 0.23949071]]), array([[0.42737082, 0.04490252, 0.02317536, 0.08495878, 0.41959252], [0.03936469, 0.15663049, 0.35165467, 0.1973434 , 0.25500675], [0.1226013 , 0.26232817, 0.31811746, 0.01065442, 0.28629864], [0.3374558 , 0.25613912, 0.09812376, 0.27002898, 0.03825234], [0.21620095, 0.43372795, 0.14335574, 0.14059887, 0.06611648]])]
We now have 2 actions available in every state, and our transition function is no longer a matrix. Our Bellman optimality equation now has a "max" in it:
$$V^*(s) = \max_a \left[ R(s, a) + \gamma E_a(V^*(s')) \right]$$We can no longer solve this as a set of linear equations. (But note that we can still value any policy using the Bellman relation, as we did above).
Instead we have a few options:
Let's do Value Iteration. It is a dynamic programming algorithm that keeps a current estimate of $v$ (our value function as a vector), and updates each entry successively according to the Bellman optimality equation:
# $$v(s) \leftarrow \max_a \left[ R(s, a) + \gamma E_a (v(s')) \right]$$
def value_iteration(env, iters=1000, gamma=0.99, tol=0.0):
    """Tabular value iteration via in-place (Gauss-Seidel) Bellman backups.

    Args:
        env: environment exposing rewards[s][a], transitions[a][s],
            observation_space.n and action_space.n (model-based access).
        iters: maximum number of full sweeps over the state space.
        gamma: discount factor.
        tol: if > 0, stop early once the largest single-state update in a
            sweep falls below tol. The default (0.0) preserves the original
            fixed-iteration behavior exactly.

    Returns:
        A numpy vector of length num_states estimating the optimal values.
    """
    v = np.random.random((env.observation_space.n,))
    for i in range(iters):
        max_delta = 0.0
        for s in range(env.observation_space.n):
            rs = env.rewards[s]
            next_state_values = []
            for a in range(env.action_space.n):
                next_state_values.append(env.transitions[a][s].dot(v))
            next_state_targets = rs + gamma * np.array(next_state_values)
            new_value = np.max(next_state_targets)
            max_delta = max(max_delta, abs(new_value - v[s]))
            # In-place update: later states in this sweep see the new value.
            v[s] = new_value
        if tol > 0 and max_delta < tol:
            break
    return v
v = value_iteration(E)
print(v)
[54.49761196 53.90892641 54.13984941 54.26911537 54.30439125]
def act(env, s, v, gamma=0.99):
    """Return the greedy action in state s under value function v.

    Picks argmax_a [ R(s, a) + gamma * E_a(v(s')) ] using the env's model
    (env.rewards and env.transitions).
    """
    rs = env.rewards[s]
    next_state_values = []
    for a in range(env.action_space.n):
        next_state_values.append(env.transitions[a][s].dot(v))
    next_state_targets = rs + gamma * np.array(next_state_values)
    # Bug fix: maximize the full Bellman target (reward + discounted
    # next-state value), not just the expected next-state value -- the
    # original returned argmax(next_state_values), ignoring the reward.
    return np.argmax(next_state_targets)
for state in range(5):
print(act(E, state, v))
1 0 0 1 0
Above we assumed we had access to env.transitions and env.rewards.
In general, we won't have this, so what do we do?
Q-learning! Note that the Q Learning agent below never accesses env.transitions/env.rewards, except via the environment's step function, but still learns the optimal value function.
class QLearningAgent:
    """Tabular Q-learning agent with a (for now) uniform-random behavior policy.

    Never reads env.transitions/env.rewards directly -- it learns purely
    from the (state, action, reward, next_state) tuples returned by step().
    """

    def __init__(self, env, lr=0.01, gamma=0.99):
        self.env = env
        self.gamma = gamma  # discount factor
        self.lr = lr        # TD learning rate
        # Our Q function is a [num_states, num_actions] matrix,
        # so Q(s, a) = self.Q[s, a]
        self.Q = np.zeros((env.observation_space.n, env.action_space.n))

    def train(self, num_steps=100000):
        """Run num_steps environment steps, updating Q with one-step TD backups."""
        state = self.env.reset()
        for step in range(num_steps):
            # Take a step in the environment under the behavior policy.
            action = self.act(state)
            next_state, reward, done, info = self.env.step(action)
            # Terminal transitions bootstrap from 0 (no future reward).
            discount = 0 if done else self.gamma
            target = reward + discount * np.max(self.Q[next_state])
            td_error = target - self.Q[state, action]
            self.Q[state, action] += self.lr * td_error
            state = next_state
            # NOTE(review): the loop does not reset the env when done is True;
            # fine for non-terminating envs like RandomEnv -- confirm before
            # reusing with episodic environments.

    def act(self, state):
        """ This executes the agent's policy -- let's stick to random for now"""
        return self.env.action_space.sample()

    def run_episode(self, max_steps=100, verbose=False):
        """ Runs a test episode """
        state = self.env.reset()
        total_reward = 0
        if verbose:
            print("Step 0; State: {}".format(state))
        for step in range(max_steps):
            next_state, reward, done, _ = self.env.step(self.act(state))
            if verbose:
                print("Step {}; State: {}; Reward: {}".format(step, state, reward))
            state = next_state
            total_reward += reward
            if done:
                print("Done! Final state: {}; Total reward: {}".format(state, total_reward))
                break
            elif step == max_steps - 1:
                print("Did not finish in time! Final state: {}; Total reward: {}".format(state, total_reward))
agent = QLearningAgent(E)
agent.train(100000)
# Not quite there... need more training steps!
agent.Q
array([[34.0170347 , 34.87141505], [34.25573211, 34.26880007], [34.42765406, 33.96374134], [34.62721616, 34.18911257], [34.63770088, 34.68663144]])
agent.train(500000)
agent.Q
array([[53.58386508, 54.4235428 ], [53.8241882 , 53.82901508], [54.06684333, 53.61486028], [54.17800861, 53.78310343], [54.20732702, 54.21742006]])
# Compare to this to the result of the value iteration above:
print(v)
print(np.max(agent.Q, axis=1))
[54.49761196 53.90892641 54.13984941 54.26911537 54.30439125] [54.4235428 53.82901508 54.06684333 54.17800861 54.21742006]
TBD