#!/usr/bin/env python
# coding: utf-8

# # Policy Gradient with gym-MiniGrid
# > In this session, we will walk through a PyTorch implementation of Policy Gradient in the Gym-MiniGrid environment. Along the way, you will see how to implement Vanilla Policy Gradient (also known as REINFORCE) and test it on an open-source RL environment.
#
# - toc: true
# - badges: true
# - comments: true
# - author: Chanseok Kang
# - categories: [Python, PyTorch, Reinforcement_Learning]
# - image: images/Minigrid_sample.png

# ## Basic Jupyter Setting

# In[3]:


import numpy as np
import matplotlib.pyplot as plt
from pprint import pprint

get_ipython().run_line_magic('matplotlib', 'inline')
plt.rcParams['figure.figsize'] = (10.0, 8.0)  # set default size of plots
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'

# for auto-reloading external modules
# see http://stackoverflow.com/questions/1907993/autoreload-of-modules-in-ipython
get_ipython().run_line_magic('load_ext', 'autoreload')
get_ipython().run_line_magic('autoreload', '2')


# ## Setup the environment
# Gridworlds are widely used RL environments. [Gym-MiniGrid](https://github.com/maximecb/gym-minigrid) is a custom gridworld environment in the OpenAI [gym](https://github.com/openai/gym) style. Before diving into this environment, you need to install both packages:
# ```
# pip install gym
# pip install gym-minigrid
# ```
# First, let's look at some frames of MiniGrid.

# In[2]:


import gym
import gym_minigrid

env = gym.make('MiniGrid-Empty-5x5-v0')
env.reset()
before_img = env.render('rgb_array')

action = env.actions.forward
obs, reward, done, info = env.step(action)
after_img = env.render('rgb_array')

plt.imshow(np.concatenate([before_img, after_img], 1));


# This is an example of the `MiniGrid-Empty-5x5-v0` environment. There are some blank cells, gray walls that the agent cannot pass through, and a green cell that is the goal to reach. The ultimate goal in this environment (and in most RL problems) is to find the optimal policy, the one with the highest expected reward. In this case, a well-trained agent should find the optimal path to the goal.
#
# Let's move to the larger environment `MiniGrid-Empty-8x8-v0` and see what information we can get from it.

# In[11]:


# Make a new environment MiniGrid-Empty-8x8-v0
env = gym.make('MiniGrid-Empty-8x8-v0')

# Reset the environment
env.reset()

# Select the action right (sample action)
action = env.actions.right

# Take a step in the environment and store it in appropriate variables
obs, reward, done, info = env.step(action)

# Render the current state of the environment
img = env.render('rgb_array')

print('Observation:', obs)
print('Reward:', reward)
print('Done:', done)
print('Info:', info)
print('Image shape:', img.shape)
plt.imshow(img);


# As the agent takes an action, the environment (MiniGrid) changes in response to that action. If the agent wants to find the optimal path, it has to notice the difference between the current state and the next state while acting. To support this, the environment returns the next observation, the reward, and a terminal flag at every step.
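# Note that the raw observation returned by MiniGrid is not a flat array but a dictionary, whose `image` entry is the agent's egocentric, partially observable view of the grid. The minimal sketch below simply re-inspects the `obs` produced by the previous cell; the exact shape depends on MiniGrid's default view size.

# In[ ]:


# Minimal sketch: inspect the raw (dict-shaped) MiniGrid observation from the step above.
print(type(obs))
print(list(obs.keys()))
print('Partial view shape:', obs['image'].shape)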
# The helper function below renders a recorded episode as a video inside the Jupyter notebook.

# In[12]:


import base64
import glob
import io

from IPython.display import HTML
from IPython import display


def show_video():
    """Display the first .mp4 file found in ./video inline in the notebook."""
    mp4list = glob.glob('video/*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display.display(HTML(data='''<video alt="test" autoplay loop controls style="height: 400px;">
            <source src="data:video/mp4;base64,{0}" type="video/mp4" />
        </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")


# To make agent training easier, MiniGrid offers observation wrappers such as `FlatObsWrapper` that flatten the observation into a 1D array. Here we define our own fully observable variant that encodes the whole grid and flattens it.

# In[13]:


import gym
from gym import spaces
from gym_minigrid.minigrid import OBJECT_TO_IDX, COLOR_TO_IDX

max_env_steps = 50


class FlatObsWrapper(gym.core.ObservationWrapper):
    """Fully observable gridworld returning a flat grid encoding."""

    def __init__(self, env):
        super().__init__(env)

        # Since the outer walls are always present, we remove the left, right, top, and bottom walls
        # from the observation space of the agent. There are 3 channels, but for simplicity
        # we will deal with a flattened version of the state.
        self.observation_space = spaces.Box(
            low=0,
            high=255,
            shape=((self.env.width - 2) * (self.env.height - 2) * 3,),  # number of cells
            dtype='uint8'
        )
        self.unwrapped.max_steps = max_env_steps

    def observation(self, obs):
        # This method is called in the step() function to get the observation.
        # It encodes the full grid and places the agent in it.
        env = self.unwrapped
        full_grid = env.grid.encode()
        full_grid[env.agent_pos[0]][env.agent_pos[1]] = np.array([
            OBJECT_TO_IDX['agent'],
            COLOR_TO_IDX['red'],
            env.agent_dir
        ])
        full_grid = full_grid[1:-1, 1:-1]  # remove outer walls of the environment (for efficiency)
        flattened_grid = full_grid.ravel()
        return flattened_grid

    def render(self, *args, **kwargs):
        """This removes the default visualization of the partially observable field of view."""
        kwargs['highlight'] = False
        return self.unwrapped.render(*args, **kwargs)


# So it's time to run it with a sample action!

# In[14]:


# Convert the MiniGrid environment to one with a flat, fully observable state
env = FlatObsWrapper(gym.make('MiniGrid-Empty-8x8-v0'))

# Reset the environment
env.reset()

# Select the action right
action = env.actions.right

# Take a step in the environment and store it in appropriate variables
obs, reward, done, info = env.step(action)

# Render the current state of the environment
img = env.render('rgb_array')

print('Observation:', obs, ', Observation Shape: ', obs.shape)
print('Reward:', reward)
print('Done:', done)
print('Info:', info)
print('Image shape:', img.shape)
plt.imshow(img);


# As you can see, the observation is now a flat 1D array rather than the original dictionary-style observation. Using this observation, we will build a neural network that lets the agent interpret the state. Let's also check a video of random movement.

# In[15]:


from gym.wrappers import Monitor

# Monitor is a gym wrapper which helps with easy rendering of videos of the wrapped environment.
def wrap_env(env):
    env = Monitor(env, './video', force=True)
    return env


def gen_wrapped_env(env_name):
    return wrap_env(FlatObsWrapper(gym.make(env_name)))


# OpenAI Gym offers several utilities to help track training progress, and Monitor is one of those tools for logging history data. If we set the rendering option to `rgb_array`, the video data will be stored in the specified path (this may require additional tools such as ffmpeg).
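# Before testing a policy, a quick sanity check (a minimal sketch) confirms that the flattened observation has the expected `(width - 2) * (height - 2) * 3` entries; for the 8x8 grid that is 6 * 6 * 3 = 108.

# In[ ]:


# Sanity check (sketch): the flat observation size should match the wrapper's Box space.
check_env = FlatObsWrapper(gym.make('MiniGrid-Empty-8x8-v0'))
check_obs = check_env.reset()
expected = (check_env.unwrapped.width - 2) * (check_env.unwrapped.height - 2) * 3
print('Flattened observation size:', check_obs.shape[0], '(expected:', expected, ')')
assert check_obs.shape[0] == expected
check_env.close()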
# ## Test with Random Policy

# In[19]:


# Random agent - we only use it in this cell for demonstration
class RandPolicy:
    def __init__(self, action_space):
        self.action_space = action_space

    def act(self, *unused_args):
        return self.action_space.sample(), None


# First, we want to check the interaction between the environment and the agent. To do this, we define a Random Policy that simply samples a random action from the pre-defined action space, and then run it.
# > Note that the `pytorch_policy` flag is set to `False` by default. But to implement the policy gradient, gradient calculation is required, and PyTorch will be used for that.

# In[18]:


# This function plots videos of rollouts (episodes) of a given policy and environment
def log_policy_rollout(policy, env_name, pytorch_policy=False):
    # Create environment with flat observation
    env = gen_wrapped_env(env_name)

    # Initialize environment
    observation = env.reset()

    done = False
    episode_reward = 0
    episode_length = 0

    # Run until done == True
    while not done:
        # Take a step
        if pytorch_policy:
            observation = torch.tensor(observation, dtype=torch.float32)
            action = policy.act(observation)[0].data.cpu().numpy()
        else:
            action = policy.act(observation)[0]
        observation, reward, done, info = env.step(action)

        episode_reward += reward
        episode_length += 1

    print('Total reward:', episode_reward)
    print('Total length:', episode_length)

    env.close()
    show_video()


# Test that the logging function is working
test_env_name = 'MiniGrid-Empty-8x8-v0'
rand_policy = RandPolicy(FlatObsWrapper(gym.make(test_env_name)).action_space)
log_policy_rollout(rand_policy, test_env_name)


# That's how the agent behaves with a Random Policy. We can see that the Random Policy is not the optimal policy, since the agent (the red one) cannot reach the goal (or perhaps it would reach it eventually, after a very long time). To reach the goal reliably, a more intelligent policy is required. Intuitively, the agent needs to:
#
# - Remember the previous trajectory
# - When it enters an unknown cell, use that remembered experience to find the way to the goal

# ## Implement Rollout Buffer
# Before implementing Policy Gradient, we need a memory object to store previous trajectories and the information returned by the environment. It is sometimes called a "Replay Buffer" or a "Rollout Buffer"; on this page we will use the name RolloutBuffer. To implement it, we need to consider:
#
# - How many trajectory steps are stored in the buffer?
# - How do we add a trajectory to the buffer?
# - (From the Reinforcement Learning point of view) how do we calculate the future discounted return from the stored rewards?
# - (+) How do we sample from the stored trajectories efficiently?
#
# The third point, computing discounted returns, is worth a quick worked example (see the sketch right below) before looking at the full buffer.
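# A minimal sketch with toy numbers (not taken from the environment): returns are accumulated backwards through time, and the running return is cut wherever an episode ended, exactly as the buffer's `compute_returns` will do below.

# In[ ]:


import torch

# Hypothetical rewards and done flags for 5 stored steps (two episodes).
toy_rewards = torch.tensor([0.0, 0.0, 1.0, 0.0, 0.5])
toy_dones   = torch.tensor([0.0, 0.0, 1.0, 0.0, 1.0])
gamma = 0.99

toy_returns = torch.zeros(len(toy_rewards) + 1)
for step in reversed(range(len(toy_rewards))):
    # Discounted return; (1 - done) stops the accumulation across episode boundaries.
    toy_returns[step] = toy_rewards[step] + gamma * toy_returns[step + 1] * (1 - toy_dones[step])

print(toy_returns[:-1])  # e.g. the first return is 0.99^2 * 1.0 for the first episode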
# With these points in mind, here is the RolloutBuffer implementation!

# In[20]:


import torch
from torch.utils.data.sampler import BatchSampler, SubsetRandomSampler


class RolloutBuffer():
    def __init__(self, rollout_size, obs_size):
        self.rollout_size = rollout_size
        self.obs_size = obs_size
        self.reset()

    def insert(self, step, done, action, log_prob, reward, obs):
        # Store one transition at the given step index
        self.done[step].copy_(done)
        self.actions[step].copy_(action)
        self.log_probs[step].copy_(log_prob)
        self.rewards[step].copy_(reward)
        self.obs[step].copy_(obs)

    def reset(self):
        self.done = torch.zeros(self.rollout_size, 1)
        self.returns = torch.zeros(self.rollout_size + 1, 1, requires_grad=False)

        # Assuming Discrete Action Space
        self.actions = torch.zeros(self.rollout_size, 1, dtype=torch.int64)
        self.log_probs = torch.zeros(self.rollout_size, 1)
        self.rewards = torch.zeros(self.rollout_size, 1)
        self.obs = torch.zeros(self.rollout_size, self.obs_size)

    def compute_returns(self, gamma):
        # Compute returns only up to the last finished episode
        self.last_done = (self.done == 1).nonzero().max()
        self.returns[self.last_done + 1] = 0.

        # Accumulate discounted returns backwards in time
        for step in reversed(range(self.last_done + 1)):
            self.returns[step] = self.returns[step + 1] * \
                gamma * (1 - self.done[step]) + self.rewards[step]

    def batch_sampler(self, batch_size, get_old_log_probs=False):
        sampler = BatchSampler(
            SubsetRandomSampler(range(self.last_done)),
            batch_size,
            drop_last=True)
        for indices in sampler:
            if get_old_log_probs:
                yield self.actions[indices], self.returns[indices], self.obs[indices], self.log_probs[indices]
            else:
                yield self.actions[indices], self.returns[indices], self.obs[indices]


# There are a couple of things to notice here:
#
# - All information stored in the RolloutBuffer has the type `torch.Tensor`.
# - The returns are used as fixed targets when the loss is minimized, so the returns tensor is created with `requires_grad=False`; gradients only flow through the policy's log-probabilities.
# - It is inefficient to use all stored information at once to train the policy. To handle this, a sampling strategy is required; in this code, `BatchSampler` with `SubsetRandomSampler` is used.

# ## Construct Policy Network
#
# Now that we can store rollouts, we need a policy to collect them. In the following we build the policy class. The policy is instantiated as a small neural network with simple fully-connected layers, the `ActorNetwork`. The policy is the agent's strategy for generating actions (in practice, it outputs a probability distribution over actions).
# Of course, the important job of the policy is to update itself at each iteration. With PyTorch, we need to decide:
#
# - Which optimizer should we use?
# - How should we define the loss function?
#
# First, let's look at the gradient used in policy gradient methods:
#
# $$ \nabla J(\theta) = \mathbb{E}_{\pi}\big[ \nabla_{\theta} \log \pi_{\theta}(a, s) \; V_t(s) \big] $$
#
# Here, $\theta$ are the parameters of the policy network $\pi_{\theta}$ and $V_t(s)$ is the observed future discounted reward from state $s$ onwards, which should be **maximized**. We need to focus on this keyword, since neural network training **minimizes** a loss rather than maximizing an objective, so in practice we minimize the negative of this quantity. Either way, we need to calculate the gradient of $\log \pi_{\theta}(a, s)$ weighted by the returns and take its mean.
#
# In addition, there are approaches to enhance exploration. If we add an **entropy loss** to the overall loss, the policy is encouraged to take more diverse actions. In that case, the gradient becomes:
#
# $$ \nabla J(\theta) = \mathbb{E}_{\pi}\big[ \nabla_{\theta} \log \pi_{\theta}(a, s) \; V_t(s) \big] + \nabla_{\theta}\mathcal{H}\big[\pi_\theta(a, s)\big] $$
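# Since optimizers minimize, this objective is implemented as the negative of the expected return-weighted log-probability, plus a small negative-entropy term. The sketch below uses purely hypothetical tensors to show how that surrogate loss is formed and differentiated; the same pattern appears in the `Policy.update` method further down.

# In[ ]:


import torch

# Hypothetical per-sample quantities (made up for illustration only).
log_probs = torch.tensor([-1.2, -0.7, -2.3], requires_grad=True)  # log pi_theta(a, s)
returns   = torch.tensor([0.9, 0.5, 0.1])                          # V_t(s), treated as constants
entropy   = torch.tensor([1.0, 0.8, 1.1])                          # H[pi_theta(., s)]
entropy_coef = 0.001

# Minimizing this loss maximizes the policy-gradient objective plus the entropy bonus.
loss = -(log_probs * returns).mean() - entropy_coef * entropy.mean()
loss.backward()
print('loss:', loss.item())
print('d loss / d log_probs:', log_probs.grad)  # equals -returns / 3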
# And here is the implementation of the Actor Network (it's quite simple!):

# In[22]:


import torch
import torch.nn as nn


class ActorNetwork(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_dim):
        super().__init__()
        self.num_actions = num_actions
        self.fc = nn.Sequential(
            nn.Linear(num_inputs, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, num_actions)
        )

    def forward(self, state):
        x = self.fc(state)
        return x


# Below is the implementation of the Policy. We select the Adam optimizer.

# In[24]:


import torch.optim as optim
import torch.nn.functional as F
from torch.distributions.categorical import Categorical

from utils.utils import count_model_params


class Policy():
    def __init__(self, num_inputs, num_actions, hidden_dim, learning_rate,
                 batch_size, policy_epochs, entropy_coef=0.001):
        self.actor = ActorNetwork(num_inputs, num_actions, hidden_dim)
        self.optimizer = optim.Adam(self.actor.parameters(), lr=learning_rate)
        self.batch_size = batch_size
        self.policy_epochs = policy_epochs
        self.entropy_coef = entropy_coef

    def act(self, state):
        logits = self.actor(state)
        # To generate the probability of an action, we model the policy output as a categorical distribution.
        dist = Categorical(logits=logits)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action, log_prob

    def evaluate_actions(self, state, action):
        logits = self.actor(state)
        dist = Categorical(logits=logits)
        log_prob = dist.log_prob(action.squeeze(-1)).view(-1, 1)
        entropy = dist.entropy().view(-1, 1)
        return log_prob, entropy

    def update(self, rollouts):
        for epoch in range(self.policy_epochs):
            data = rollouts.batch_sampler(self.batch_size)

            for sample in data:
                actions_batch, returns_batch, obs_batch = sample
                log_probs_batch, entropy_batch = self.evaluate_actions(obs_batch, actions_batch)

                # Compute the mean loss for the policy update using
                # action log-probabilities and policy returns
                policy_loss = -(log_probs_batch * returns_batch).mean()

                # Compute the mean entropy for the policy update
                entropy_loss = -entropy_batch.mean()

                loss = policy_loss + self.entropy_coef * entropy_loss

                self.optimizer.zero_grad()
                loss.backward(retain_graph=False)
                self.optimizer.step()

    @property
    def num_params(self):
        return count_model_params(self.actor)
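# As a quick usage sketch (the sizes here are assumptions: 108 flat observation entries for the 8x8 grid and 7 MiniGrid actions), the policy maps a flat observation to a sampled action and its log-probability:

# In[ ]:


# Hypothetical sizes, just to exercise Policy.act() once.
sketch_policy = Policy(num_inputs=108, num_actions=7, hidden_dim=32,
                       learning_rate=1e-3, batch_size=32, policy_epochs=1)
dummy_obs = torch.rand(108)
sampled_action, sampled_log_prob = sketch_policy.act(dummy_obs)
print('action:', sampled_action.item(), 'log prob:', sampled_log_prob.item())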
# Finally, here is the training loop. It repeatedly collects rollouts with the current policy, computes the discounted returns, and updates the policy.

# In[36]:


from IPython.display import clear_output

from utils.utils import AverageMeter, plot_learning_curve

import time


def train(env, rollouts, policy, params, seed=123):
    # SETTING SEED: it is good practice to set seeds when running experiments to keep results comparable
    np.random.seed(seed)
    torch.manual_seed(seed)
    env.seed(seed)

    rollout_time, update_time = AverageMeter(), AverageMeter()  # Loggers
    rewards, success_rate = [], []

    print("Training model with {} parameters...".format(policy.num_params))

    # Training Loop
    for j in range(params.num_updates):
        ## Initialization
        avg_eps_reward, avg_success_rate = AverageMeter(), AverageMeter()
        done = False
        prev_obs = env.reset()
        prev_obs = torch.tensor(prev_obs, dtype=torch.float32)
        eps_reward = 0.
        start_time = time.time()

        ## Collect rollouts
        for step in range(rollouts.rollout_size):
            if done:
                # Store episode statistics
                avg_eps_reward.update(eps_reward)
                if 'success' in info:
                    avg_success_rate.update(int(info['success']))

                # Reset Environment
                obs = env.reset()
                obs = torch.tensor(obs, dtype=torch.float32)
                eps_reward = 0.
            else:
                obs = prev_obs

            action, log_prob = policy.act(obs)
            obs, reward, done, info = env.step(action)

            rollouts.insert(step, torch.tensor(done, dtype=torch.float32),
                            action, log_prob,
                            torch.tensor(reward, dtype=torch.float32),
                            prev_obs)

            prev_obs = torch.tensor(obs, dtype=torch.float32)
            eps_reward += reward

        # Use the rollout buffer's function to compute the returns for all stored rollout steps
        rollouts.compute_returns(params['discount'])
        rollout_done_time = time.time()

        # Call the policy's update function using the collected rollouts
        policy.update(rollouts)
        update_done_time = time.time()

        rollouts.reset()

        ## Log metrics
        rewards.append(avg_eps_reward.avg)
        if avg_success_rate.count > 0:
            success_rate.append(avg_success_rate.avg)
        rollout_time.update(rollout_done_time - start_time)
        update_time.update(update_done_time - rollout_done_time)

        print('it {}: avgR: {:.3f} -- rollout_time: {:.3f}sec -- update_time: {:.3f}sec'.format(
            j, avg_eps_reward.avg, rollout_time.avg, update_time.avg))

        if j % params.plotting_iters == 0 and j != 0:
            plot_learning_curve(rewards, success_rate, params.num_updates)
            log_policy_rollout(policy, params.env_name, pytorch_policy=True)

    clear_output()  # this removes all training outputs to keep the notebook clean, DON'T REMOVE THIS LINE!
    return rewards, success_rate


# In[37]:


from utils.utils import ParamDict

import copy


def instantiate(params_in, nonwrapped_env=None):
    params = copy.deepcopy(params_in)

    if nonwrapped_env is None:
        nonwrapped_env = gym.make(params.env_name)

    env = FlatObsWrapper(nonwrapped_env)
    obs_size = env.observation_space.shape[0]
    num_actions = env.action_space.n

    rollouts = RolloutBuffer(params.rollout_size, obs_size)

    policy_class = params.policy_params.pop('policy_class')
    policy = policy_class(obs_size, num_actions, **params.policy_params)

    return env, rollouts, policy


# In[38]:


# Hyperparameters
policy_params = ParamDict(
    policy_class = Policy,    # Policy class to use
    hidden_dim = 32,          # dimension of the hidden state in the actor network
    learning_rate = 1e-3,     # learning rate of the policy update
    batch_size = 1024,        # batch size for the policy update
    policy_epochs = 4,        # number of epochs per policy update
    entropy_coef = 0.001,     # hyperparameter to vary the contribution of the entropy loss
)
params = ParamDict(
    policy_params = policy_params,
    rollout_size = 2050,      # number of collected rollout steps per policy update
    num_updates = 50,         # number of training policy iterations
    discount = 0.99,          # discount factor
    plotting_iters = 10,      # interval for logging graphs and policy rollouts
    env_name = 'MiniGrid-Empty-5x5-v0',  # we are using a tiny environment here for testing
)


# In[39]:


env, rollouts, policy = instantiate(params)
rewards, success_rate = train(env, rollouts, policy, params)
print("Training completed!")


# In[40]:


# Final reward + policy plotting for easier evaluation
plot_learning_curve(rewards, success_rate, params.num_updates)
for _ in range(3):
    log_policy_rollout(policy, params.env_name, pytorch_policy=True)


# In[ ]:
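# As an optional sketch, the trained policy can also be scored numerically by averaging the episode reward over a handful of sampled rollouts, without recording any video:

# In[ ]:


# Optional sketch: average episode reward of the trained policy over a few rollouts.
eval_env = FlatObsWrapper(gym.make(params.env_name))
eval_rewards = []
for _ in range(10):
    eval_obs = torch.tensor(eval_env.reset(), dtype=torch.float32)
    eval_done, eval_eps_reward = False, 0.
    while not eval_done:
        eval_action, _ = policy.act(eval_obs)
        eval_obs, eval_reward, eval_done, _ = eval_env.step(eval_action.item())
        eval_obs = torch.tensor(eval_obs, dtype=torch.float32)
        eval_eps_reward += eval_reward
    eval_rewards.append(eval_eps_reward)
print('Average evaluation reward over 10 episodes:', np.mean(eval_rewards))
eval_env.close()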