In this post, we will work through a hands-on lab on the Cross-Entropy Method (CEM for short) using the OpenAI Gym MountainCarContinuous-v0 environment. This is a coding exercise from the Udacity Deep Reinforcement Learning Nanodegree.
In this notebook, you will implement CEM on OpenAI Gym's MountainCarContinuous-v0 environment. In summary, the cross-entropy method is a form of black-box optimization: it iteratively samples a small number of neighboring policies and uses a small percentage of the best-performing ones to calculate a new estimate.
import gym
import math
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import base64, io
# For visualization
from gym.wrappers.monitoring import video_recorder
from IPython.display import HTML
from IPython import display
import glob
C:\Users\kcsgo\anaconda3\lib\site-packages\numpy\_distributor_init.py:32: UserWarning: loaded more than 1 DLL from .libs: C:\Users\kcsgo\anaconda3\lib\site-packages\numpy\.libs\libopenblas.PYQHXLVVQ7VESDPUVUADXEVJOBGHJPAY.gfortran-win_amd64.dll C:\Users\kcsgo\anaconda3\lib\site-packages\numpy\.libs\libopenblas.WCDJNK7YVMPZQ2ME2ZZHJJRJ3JIKNDB7.gfortran-win_amd64.dll stacklevel=1)
# Run on the first CUDA GPU when one is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
device
device(type='cuda', index=0)
The MountainCar environment comes in two versions: discrete and continuous. In this notebook, we use the continuous version of MountainCar. That is, the action is a real-valued push, so we can move the car to the left (or right) by a precisely chosen amount.
# Create the continuous-action MountainCar task.
env = gym.make('MountainCarContinuous-v0')

# Seed NumPy and the environment with the same value so runs are repeatable.
np.random.seed(101)
env.seed(101)

# Report the observation space, then the action space and its bounds.
print('observation space:', env.observation_space)
print('action space:', env.action_space)
print(' - low:', env.action_space.low)
print(' - high:', env.action_space.high)
observation space: Box(-1.2000000476837158, 0.6000000238418579, (2,), float32) action space: Box(-1.0, 1.0, (1,), float32) - low: [-1.] - high: [1.]
class Agent(nn.Module):
    """Small two-layer policy network whose parameters are set from a flat vector.

    The network maps a state to an action through one hidden ReLU layer and a
    tanh output layer. Its parameters are not trained by gradient descent
    here: they are overwritten from a flat NumPy vector via ``set_weights``
    and scored by running one episode via ``evaluate``.

    Params
    ======
        env: environment exposing ``observation_space.shape``,
            ``action_space.shape``, ``reset()`` and ``step(action)``
        h_size (int): number of hidden units
    """

    def __init__(self, env, h_size=16):
        super().__init__()
        self.env = env
        # state, hidden layer, action sizes
        self.s_size = env.observation_space.shape[0]
        self.h_size = h_size
        self.a_size = env.action_space.shape[0]
        # define layers (we used 2 layers)
        self.fc1 = nn.Linear(self.s_size, self.h_size)
        self.fc2 = nn.Linear(self.h_size, self.a_size)

    def _param_device(self):
        """Return the device the network's parameters currently live on."""
        return next(self.parameters()).device

    def set_weights(self, weights):
        """Overwrite all layer parameters from a flat weight vector.

        The vector layout is: fc1 weights, fc1 biases, fc2 weights, fc2 biases.

        Params
        ======
            weights (np.ndarray): 1-D array of length ``get_weights_dim()``

        Raises
        ======
            ValueError: if ``weights`` is not 1-D or has the wrong length
        """
        weights = np.asarray(weights)
        expected = self.get_weights_dim()
        if weights.ndim != 1 or weights.size != expected:
            raise ValueError(
                'expected a flat weight vector of length {}, got shape {}'.format(
                    expected, weights.shape))

        # Walk the flat vector in layout order, copying one slice per parameter.
        # copy_ converts dtype/device, so float64 NumPy input is fine.
        offset = 0
        with torch.no_grad():
            for param in (self.fc1.weight, self.fc1.bias,
                          self.fc2.weight, self.fc2.bias):
                count = param.numel()
                chunk = weights[offset:offset + count].reshape(tuple(param.shape))
                param.copy_(torch.from_numpy(chunk))
                offset += count

    def get_weights_dim(self):
        """Return the total number of scalars in both layers (weights + biases)."""
        return (self.s_size + 1) * self.h_size + (self.h_size + 1) * self.a_size

    def forward(self, x):
        """Map a state tensor to an action in [-1, 1], returned as a detached CPU tensor."""
        x = F.relu(self.fc1(x))
        x = torch.tanh(self.fc2(x))
        return x.detach().cpu()

    def act(self, state):
        """Compute the action for a NumPy ``state`` without tracking gradients."""
        # Use the module's own device rather than a module-level global, so the
        # agent works wherever it has been moved with ``.to(...)``.
        state = torch.from_numpy(state).float().to(self._param_device())
        with torch.no_grad():
            action = self.forward(state)
        return action

    def evaluate(self, weights, gamma=1.0, max_t=5000):
        """Load ``weights`` and return the discounted return of one episode.

        Params
        ======
            weights (np.ndarray): flat weight vector passed to ``set_weights``
            gamma (float): discount rate applied as ``gamma ** t``
            max_t (int): maximum number of timesteps before the episode is cut off
        """
        self.set_weights(weights)
        episode_return = 0.0
        state = self.env.reset()
        for t in range(max_t):
            # act() handles tensor conversion, device placement and no_grad.
            action = self.act(state)
            state, reward, done, _ = self.env.step(action)
            episode_return += reward * math.pow(gamma, t)
            if done:
                break
        return episode_return
def cem(agent, n_iterations=500, max_t=1000, gamma=1.0, print_every=10, pop_size=50, elite_frac=0.2, sigma=0.5):
    """PyTorch implementation of the cross-entropy method.

    Each iteration samples ``pop_size`` candidate weight vectors around the
    current best vector, evaluates each with ``agent.evaluate``, and replaces
    the best vector with the mean of the top-scoring candidates. The agent's
    state dict is saved to 'checkpoint.pth' after every iteration.

    Params
    ======
        agent (object): agent instance providing ``get_weights_dim()``,
            ``evaluate(weights, gamma, max_t)`` and ``state_dict()``
        n_iterations (int): maximum number of training iterations
        max_t (int): maximum number of timesteps per episode
        gamma (float): discount rate
        print_every (int): how often to print average score (over last 100 episodes)
        pop_size (int): size of population at each iteration
        elite_frac (float): percentage of top performers to use in update
        sigma (float): standard deviation of additive noise

    Returns
    =======
        list: score of the averaged elite weights after each iteration

    Raises
    ======
        ValueError: if ``pop_size`` is smaller than 1
    """
    if pop_size < 1:
        raise ValueError('pop_size must be at least 1, got {}'.format(pop_size))
    # int() truncates, so a small elite_frac could give 0; `[-0:]` would then
    # select the WHOLE population as "elite". Always keep at least one.
    n_elite = max(1, int(pop_size * elite_frac))
    weights_dim = agent.get_weights_dim()

    scores_deque = deque(maxlen=100)
    scores = []
    # Initialize the weight with random noise
    best_weight = sigma * np.random.randn(weights_dim)
    for i_iteration in range(1, n_iterations + 1):
        # Define the candidates and get the reward of each candidate
        weights_pop = [best_weight + (sigma * np.random.randn(weights_dim)) for _ in range(pop_size)]
        rewards = np.array([agent.evaluate(weights, gamma, max_t) for weights in weights_pop])
        # Select best candidates from collected rewards
        elite_idxs = rewards.argsort()[-n_elite:]
        elite_weights = [weights_pop[i] for i in elite_idxs]
        best_weight = np.array(elite_weights).mean(axis=0)
        # Score the averaged elite undiscounted, with evaluate()'s default max_t.
        # This also leaves best_weight loaded in the agent for the checkpoint.
        reward = agent.evaluate(best_weight, gamma=1.0)
        scores_deque.append(reward)
        scores.append(reward)
        torch.save(agent.state_dict(), 'checkpoint.pth')
        if i_iteration % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(i_iteration, np.mean(scores_deque)))
        if np.mean(scores_deque) >= 90.0:
            # The reported count is offset by the 100-score averaging window.
            print('\nEnvironment solved in {:d} iterations!\tAverage Score: {:.2f}'.format(i_iteration-100, np.mean(scores_deque)))
            break
    return scores
# Build the policy network, place it on the selected device, and train it with CEM.
agent = Agent(env)
agent = agent.to(device)
scores = cem(agent=agent)
Episode 10 Average Score: -1.44 Episode 20 Average Score: -3.98 Episode 30 Average Score: -4.18 Episode 40 Average Score: 2.57 Episode 50 Average Score: 18.74 Episode 60 Average Score: 29.35 Episode 70 Average Score: 38.69 Episode 80 Average Score: 45.65 Episode 90 Average Score: 47.98 Episode 100 Average Score: 52.56 Episode 110 Average Score: 62.09 Episode 120 Average Score: 72.28 Episode 130 Average Score: 82.21 Episode 140 Average Score: 89.48 Environment solved in 47 iterations! Average Score: 90.83
# Plot the score recorded after each CEM iteration (x axis starts at 1).
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(np.arange(1, len(scores) + 1), scores)
ax.set_ylabel('Score')
ax.set_xlabel('Episode #')
plt.show()
def show_video(env_name):
    """Embed the recorded video for ``env_name`` in the notebook output.

    Reads 'video/<env_name>.mp4', base64-encodes it, and displays it as an
    inline HTML <video> element. Prints "Could not find video" when that
    file does not exist.

    Params
    ======
        env_name (str): environment name used as the mp4 file stem
    """
    mp4 = 'video/{}.mp4'.format(env_name)
    try:
        # Read-only binary mode; the context manager closes the handle.
        with open(mp4, 'rb') as video_file:
            video = video_file.read()
    except FileNotFoundError:
        # Check the exact file we need, not just "any mp4 in video/".
        print("Could not find video")
        return
    encoded = base64.b64encode(video)
    display.display(HTML(data='''<video alt="test" autoplay
loop controls style="height: 400px;">
<source src="data:video/mp4;base64,{0}" type="video/mp4" />
</video>'''.format(encoded.decode('ascii'))))
def show_video_of_model(agent, env_name):
    """Record one episode of ``agent`` acting in ``env_name`` to 'video/<env_name>.mp4'.

    The agent's parameters are loaded from 'checkpoint.pth' before the
    episode starts. One frame is captured before every environment step, and
    the episode runs until the environment reports ``done``.

    Params
    ======
        agent: module providing ``load_state_dict`` and ``act(state)``
        env_name (str): Gym environment id; also used as the mp4 file stem
    """
    import os

    # Make sure the output directory exists before the recorder writes to it.
    os.makedirs('video', exist_ok=True)

    env = gym.make(env_name)
    try:
        vid = video_recorder.VideoRecorder(env, path="video/{}.mp4".format(env_name))
        try:
            agent.load_state_dict(torch.load('checkpoint.pth'))
            state = env.reset()
            done = False
            while not done:
                vid.capture_frame()
                action = agent.act(state)
                state, _, done, _ = env.step(action)
        finally:
            # Close the recorder explicitly so the video file is finalized
            # even if the episode raises.
            vid.close()
    finally:
        env.close()
# Start from a fresh agent; show_video_of_model loads its weights from
# checkpoint.pth before recording, and show_video then embeds the result.
agent = Agent(env)
agent = agent.to(device)
show_video_of_model(agent=agent, env_name='MountainCarContinuous-v0')
show_video(env_name='MountainCarContinuous-v0')
Note: When I tried to run the model with VideoRecorder on Linux, the video was not displayed correctly, but on Windows it works! Perhaps this is a bug in VideoRecorder.