#!/usr/bin/env python
# coding: utf-8

# # RL with DQN for cart-pole task
# https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
#
# There is a leaderboard at https://gym.openai.com/envs/CartPole-v0/

# In[12]:


import gym
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import display, clear_output, HTML
from collections import namedtuple, deque
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T


# In[2]:


env = gym.make('CartPole-v1').unwrapped

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# `Transition` represents a single transition in the environment, mapping a (state, action) pair to its (next_state, reward) result.

# In[3]:


Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


# `ReplayMemory` is a fixed-size buffer holding the most recent `Transition`s. It also has a `sample()` method for drawing a random batch of transitions for training.

# In[4]:


class ReplayMemory:

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


# define the network

# In[5]:


class DQN(nn.Module):

    def __init__(self, h, w, outputs):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)

        # the size of the fully connected layer depends on the kernel size,
        # stride and input size, so compute it with this helper
        def conv2d_size_out(size, kernel_size=5, stride=2):
            return (size - (kernel_size - 1) - 1) // stride + 1

        convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
        convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
        linear_input_size = convw * convh * 32
        self.head = nn.Linear(linear_input_size, outputs)

    def forward(self, x):
        x = x.to(device)
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        return self.head(x.view(x.size(0), -1))
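

# A quick shape sanity check for the network (just a sketch: the throwaway name, the
# 3x40x90 input, which is roughly what `get_screen()` below produces, and the 2 actions
# are illustrative choices, not part of the original notebook)

# In[ ]:


_check_net = DQN(40, 90, 2).to(device)
_check_net.eval()  # eval mode so BatchNorm uses its running stats for a single sample
with torch.no_grad():
    print(_check_net(torch.zeros(1, 3, 40, 90)).shape)  # expect torch.Size([1, 2])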


# functions for extracting and processing renderings from the environment

# In[6]:


resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=T.InterpolationMode.BICUBIC),
                    T.ToTensor()])


def get_cart_location(screen_width):
    world_width = env.x_threshold * 2
    scale = screen_width / world_width
    return int(env.state[0] * scale + screen_width / 2.0)  # MIDDLE OF CART


def get_screen():
    # The screen returned by gym is 400x600x3, but can sometimes be larger,
    # e.g. 800x1200x3. Transpose it into torch order (CHW).
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    # Cart is in the lower half, so strip off the top and bottom of the screen
    _, screen_height, screen_width = screen.shape
    screen = screen[:, int(screen_height * 0.4):int(screen_height * 0.8)]
    view_width = int(screen_width * 0.6)
    cart_location = get_cart_location(screen_width)
    if cart_location < view_width // 2:
        slice_range = slice(view_width)
    elif cart_location > (screen_width - view_width // 2):
        slice_range = slice(-view_width, None)
    else:
        slice_range = slice(cart_location - view_width // 2,
                            cart_location + view_width // 2)
    # Strip off the edges, so that we have a square image centered on the cart
    screen = screen[:, :, slice_range]
    # Convert to float, rescale, convert to torch tensor
    # (this doesn't require a copy)
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    # Resize, and add a batch dimension (BCHW)
    return resize(screen).unsqueeze(0).to(device)


env.reset()
plt.figure()
plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(),
           interpolation='none')
plt.title('Example extracted screen')
plt.show()


# instantiate the model & optimizer, and define utility functions

# In[7]:


BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10

# Get screen size so that we can initialize layers correctly based on shape
# returned from OpenAI Gym. Typical dimensions at this point are close to 3x40x90,
# which is the result of a cropped and down-scaled render buffer in get_screen()
init_screen = get_screen()
_, _, screen_height, screen_width = init_screen.shape

# Get number of actions from gym action space
n_actions = env.action_space.n

policy_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()
print(f"number of parameters: {sum(p.numel() for p in policy_net.parameters())}")

optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)

steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    # epsilon-greedy policy: decay the exploration rate so that we move
    # slowly from exploration to exploitation
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        np.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        # action with the highest Q value
        with torch.no_grad():
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        # random action
        return torch.tensor([[random.randrange(n_actions)]], device=device,
                            dtype=torch.long)


episode_durations = []


def plot_durations(ax, fig):
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    ax.cla()
    ax.set_title('Training...')
    ax.set_xlabel('Episode')
    ax.set_ylabel('Duration')
    ax.plot(durations_t.numpy())
    # Take 100-episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        ax.plot(means.numpy())
    display(fig)
    clear_output(wait=True)
    plt.pause(0.01)  # pause a bit so that plots are updated


# # training loop
#
# Huber loss is used; it acts like mean squared error when the error is small and like mean absolute error when the error is large, which makes it more robust to outliers when the Q-value estimates are noisy.
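#
# With `F.smooth_l1_loss` and its default `beta = 1` (as used below), the loss on the TD error $\delta$ is
#
# $$\mathcal{L}(\delta) = \begin{cases} \frac{1}{2}\delta^{2} & \text{if } |\delta| < 1 \\ |\delta| - \frac{1}{2} & \text{otherwise} \end{cases}$$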
# ![](https://upload.wikimedia.org/wikipedia/commons/thumb/c/cc/Huber_loss.svg/450px-Huber_loss.svg.png)

# In[8]:


def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts a batch-array of Transitions
    # to a Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                            batch.next_state)),
                                  device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                       if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of the actions taken. These are the actions which would've been
    # taken for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    loss = F.smooth_l1_loss(state_action_values,
                            expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()


# In[9]:


num_episodes = 500

fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)

for i_episode in range(num_episodes):
    # Initialize the environment and state
    env.reset()
    last_screen = get_screen()
    current_screen = get_screen()
    state = current_screen - last_screen
    for t in count():
        # Select and perform an action
        action = select_action(state)
        _, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)

        # Observe new state
        last_screen = current_screen
        current_screen = get_screen()
        if not done:
            next_state = current_screen - last_screen
        else:
            next_state = None

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()
        if done:
            episode_durations.append(t + 1)
            plot_durations(ax, fig)
            break
    # Update the target network once in a while, copying all weights and biases
    # in DQN, and checkpoint the policy network
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())
        torch.save(policy_net.state_dict(), f"dqn_model_episode{i_episode}.pth")
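

# A rough progress check after training (just a sketch; the commonly quoted "solved" bar
# is an average duration of 195 over 100 consecutive episodes for CartPole-v0, 475 for v1)

# In[ ]:


if len(episode_durations) >= 100:
    print(f"mean duration over last 100 episodes: "
          f"{sum(episode_durations[-100:]) / 100:.1f}")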


# try out the policy from the learned model

# In[16]:


# load model
policy_net.load_state_dict(torch.load('dqn_model_episode470.pth'))
policy_net.eval()  # use the BatchNorm running statistics during evaluation

env.reset()
last_screen = get_screen()
current_screen = get_screen()
state = current_screen - last_screen

# %matplotlib ipympl
fig = plt.figure()
ims = []

while True:
    # select the greedy action from the learned Q function
    with torch.no_grad():
        action = policy_net(state).max(1)[1].view(1, 1)
    _, _, done, _ = env.step(action.item())

    last_screen = current_screen
    current_screen = get_screen()
    state = current_screen - last_screen

    # visualization
    ims.append([plt.imshow(env.render(mode='rgb_array'))])

    if done:
        break

print(f"frames: {len(ims)}")
ani = animation.ArtistAnimation(fig, ims, interval=100)
html = HTML(ani.to_jshtml())
display(html)
plt.close()


# works fairly well!
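
# To also keep the rollout as a file (just a sketch: the filename and fps are arbitrary,
# GIF export needs Pillow, and it may be safer to run this before the `plt.close()` above)

# In[ ]:


ani.save("cartpole_rollout.gif", writer=animation.PillowWriter(fps=10))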