This notebook outlines the end-to-end mathematical and computational modelling of an inverted double pendulum with the integration of the Proximal Policy Optimisation algorithm as the control system. This project serves as a baseline study into advanced autonomous control systems facilitating the control of multibody, variable mass dynamical systems, such as docking and berthing of spacecraft, and rocket trajectory stabilisation.
Produced by Mughees Asif, under the supervision of Dr. Angadh Nanjangud (Lecturer in Aerospace/Spacecraft Engineering @ Queen Mary, University of London). This project was awarded with the Institution of Mechanical Engineers (IMechE) Project Prize, in recognition, for the best third-year project for a finalist on an IMechE accredited programme.
@misc{asif_nanjangud_2021,
title = {Deep Reinforcement Learning for Robotic Systems},
url = {https://github.com/mughees-asif/dip},
journal = {GitHub},
publisher = {Mughees Asif},
author = {Asif, Mughees and Nanjangud, Angadh},
year = {2021},
month = {Apr}
}
Asif, M., and Nanjangud, A.. (2021). Deep Reinforcement Learning for Robotic Systems.
Asif, M., & Nanjangud, A.. (2021). Deep Reinforcement Learning for Robotic Systems.
1. Overview
2. Model
2.1. Description
3. Governing Equations of Motion
3.1. Library Imports
3.2. Variable Declaration
3.3. Kinetic and Potential Energy
3.4. The Lagrangian
3.5. The Euler-Lagrange Equations
3.6. Linearisation and Acceleration
4. Proximal Policy Optimisation
4.1. Overview
4.2. Mathematical Model
4.3. Neural Network
4.3.1. Actor
4.3.2. Critic
4.3.3. Agent
4.4. Environment
4.5. Test
5. Conclusion
5.1. Variations in initial angle conditions
SymPy
NumPy
Matplotlib
PyTorch
seeding.py
file locally in the same folder as the notebook.Proximal Policy Optimisation is a deep reinforcement learning algorithm developed by OpenAI. It has proven to be successful in a variety of tasks ranging from enabling robotic systems in complex environments, to developing proficiency in computer gaming by using stochastic mathematical modelling to simulate real-life decision making. For the purposes of this research, the algorithm will be implemented to vertically stablise an inverted double pendulum, which is widely used in industry as a benchmark to validate the veracity of next-generation intelligent algorithms.
Name | Symbol |
---|---|
Mass of the cart | $$m$$ |
Mass of the pendulums | $$M_1 = M_2 = M$$ |
Length of the pendulums | $$l_1 = l_2 = l$$ |
Angle of the first pendulum w.r.t the vertical (CCW+) |
$$\theta$$ |
Angle of the second pendulum w.r.t the first pendulum (CCW+) |
$$\phi$$ |
Moment of inertia for the pendulums | $$I_1 = I_2 = I$$ |
Horizontal cart position | $$x$$ |
Horizontal force applied to the cart | $$u$$ |
Gravitational constant | $$g$$ |
An inverted double pendulum is a characteristic example of a holonomic, non-linear and chaotic mechanical system that relies on the Butterfly Effect: highly dependent on the initial conditions.
# mathematical
from sympy import symbols, Function, cos, sin, Eq, linsolve
from sympy.physics.mechanics import init_vprinting
# computational
import matplotlib.pyplot as plt
import math
import time
import gym
import seeding
import numpy as np
import torch as T
import torch.nn as nn
import torch.optim as optim
from torch.distributions.categorical import Categorical
# format mathematical output
init_vprinting()
# initiliase variables
t = symbols('t') # time
m = symbols('m') # mass of the cart
l = symbols('l') # length of the pendulums, l_1 = l_2 = l
M = symbols('M') # mass of the pendulums, M_1 = M_2 = M
I = symbols('I') # moment of inertia
g = symbols('g') # gravitational constant
u = symbols('u') # force applied to the cart (horizontal component)
x = Function('x')(t) # |
Θ = Function('Θ')(t) # | --- functions of time `t`
Φ = Function('Φ')(t) # |
# cart
x_dot = x.diff(t) # velocity
# pendulum(s)
Θ_dot = Θ.diff(t) # |
Θ_ddot = Θ_dot.diff(t) # |
Φ_dot = Φ.diff(t) # |
Φ_ddot = Φ_dot.diff(t) # |
cos_theta = cos(Θ) # | --- experimental parameters
sin_theta = sin(Θ) # |
cos_thetaphi = cos(Θ - Φ) # |
cos_phi = cos(Φ) # |
sin_phi = sin(Φ) # |
# kinetic energy components
# cart - linear
k_1 = m*x_dot**2
# pendulum(s) - angular
k_2 = (M*x_dot*(x_dot + 2*l*Θ_dot*cos_theta) + Θ_dot**2*(M*(l**2)+I))
k_3 = (x_dot**2) + (l**2*Θ_dot**2) + (Φ_dot**2*(M*(l**2)+I) \
+ (M*l*Θ_dot*Φ_dot*cos_thetaphi) + \
(2*M*l*x_dot*((Θ_dot*cos_theta) + (Φ_dot*cos_phi))))
# total kinetic energy
K = 0.5*(k_1 + k_2 + k_3)
print('----\nThe kinetic energy, K, of the system:\n----')
K
---- The kinetic energy, K, of the system: ----
# total potential energy
P = M*g*l*((2*cos_theta) + cos_phi)
print('----\nThe potential energy, P, of the system:\n----')
P
---- The potential energy, P, of the system: ----
The action $S$ of the cart (movement; left, right) is mathematically defined as:
$$\because S = \int_{t_{0}}^{t_{1}} K - P \,dt$$and since, $\mathcal{L} = K - P,$
$$\therefore S = \int_{t_{0}}^{t_{1}} \mathcal{L} \;dt$$where,
\begin{equation*} \boldsymbol{\mathcal{L}=\frac{1}{2}\left[m{\dot{x}}^2+M\dot{x}\left(\dot{x}+2l\dot{\theta{}}\cos{\theta{}}\right)+{\dot{\theta{}}}^2\left(Ml^2+I\right)+{\dot{x}}^2+l^2{\dot{\theta{}}}^2+\\{\dot{\phi{}}}^2\left(Ml^2+I\right)+Ml\dot{\theta{}}\dot{\phi{}}\cos{\left(\theta{}-\phi{}\right)}+2Ml\dot{x}\left(\dot{\theta{}}\cos{\theta{}}+\dot{\phi{}}\cos{\phi{}}\right)\right] - Mgl(2\cos{\theta} + \cos{\phi})} \tag{9} \end{equation*}# the lagrangian
L = K - P
print('----\nThe Lagrangian of the system is:\n----')
L
---- The Lagrangian of the system is: ----
The standard Euler-Lagrange equation is:
$$\frac{d}{dt}\frac{\partial \mathcal{L}}{\partial \dot{x}} - \frac{\partial \mathcal{L}}{\partial x} = 0$$To introduce the generalised force $Q^{P}$ acting on the cart, the Lagrange-D'Alembert Principle is used:
\begin{equation} \frac{d}{dt}\frac{\partial \mathcal{L}}{\partial \dot{x}} - \frac{\partial \mathcal{L}}{\partial x} = Q^{P} \tag{10} \end{equation}Therefore, for a three-dimensional working system, the equations of motion can be derived as:
\begin{equation} \frac{d}{dt}\frac{\partial \mathcal{L}}{\partial \dot{x}} - \frac{\partial \mathcal{L}}{\partial x} = u \tag{11} \end{equation}\begin{equation} \frac{d}{dt}\frac{\partial \mathcal{L}}{\partial \dot{\theta}} - \frac{\partial \mathcal{L}}{\partial \theta} = 0 \tag{12} \end{equation}\begin{equation} \frac{d}{dt}\frac{\partial \mathcal{L}}{\partial \dot{\phi}} - \frac{\partial \mathcal{L}}{\partial \phi} = 0 \tag{13} \end{equation}# euler-lagrange formulation
"""
`expand()`: allows cancellation of like terms
`collect()`: collects common powers of a term in an expression
"""
euler_1 = Eq((L.diff(x_dot).diff(t) - L.diff(x)).simplify().expand().collect(x.diff(t, t)), u)
euler_2 = Eq((L.diff(Θ_dot).diff(t) - L.diff(Θ)).simplify().expand().collect(Θ.diff(t, t)), 0)
euler_3 = Eq((L.diff(Φ_dot).diff(t) - L.diff(Φ)).simplify().expand().collect(Φ.diff(t, t)), 0)
print('----\nThe Euler-Lagrange equations:\n----\n1.\n----')
euler_1
---- The Euler-Lagrange equations: ---- 1. ----
print('----\n2.\n----')
euler_2
---- 2. ----
print('----\n3.\n----')
euler_3
---- 3. ----
$$\sin(\theta)$$ | $$\approx$$ | $$\theta$$ |
$$\cos(\theta)$$ | $$=$$ | $$1$$ |
$$\dot\theta^{2}$$ | $$=$$ | $$0$$ |
$$\sin(\phi)$$ | $$\approx$$ | $$\phi$$ |
$$\cos(\phi)$$ | $$=$$ | $$1$$ |
$$\dot\phi^{2}$$ | $$=$$ | $$0$$ |
$$\sin(\theta - \phi)$$ | $$\approx$$ | $$\theta - \phi$$ |
$$\cos(\theta - \phi)$$ | $$=$$ | $$1$$ |
The pendulum will achieve equilibrium when vertical, i.e. $\theta=0$ & $\phi=0$. Using the above small-angle approximations to simiplify the derived differential equations of motion, and solving for all three accelerations.
# linearise the system
matrix = [
(sin_theta, Θ),
(cos_theta, 1),
(Θ_dot**2, 0),
(sin_phi, Φ),
(cos_phi, 1),
(Φ_dot**2, 0),
(sin(Θ - Φ), Θ - Φ),
(cos(Θ - Φ), 1)
]
linear_1 = euler_1.subs(matrix)
linear_2 = euler_2.subs(matrix)
linear_3 = euler_3.subs(matrix)
print('----\nThe linearised equations are:\n----\n1.\n----')
linear_1
---- The linearised equations are: ---- 1. ----
print('----\n2.\n----')
linear_2
---- 2. ----
print('----\n3.\n----')
linear_3
---- 3. ----
# simplify for linear and angular acceleration
final_equations = linsolve([linear_1, linear_2, linear_3], [x.diff(t, t), Θ.diff(t, t), Φ.diff(t, t)])
x_ddot = final_equations.args[0][0].expand().collect((Θ, Θ_dot, x, x_dot, Φ, Φ_dot, u, M, m, l, I)).simplify()
Θ_ddot = final_equations.args[0][1].expand().collect((Θ, Θ_dot, x, x_dot, Φ, Φ_dot, u, M, m, l, I)).simplify()
Φ_ddot = final_equations.args[0][2].expand().collect((Θ, Θ_dot, x, x_dot, Φ, Φ_dot, u, M, m, l, I)).simplify()
print('----\nAcceleration of the cart:\n----')
x_ddot
---- Acceleration of the cart: ----
print('----\nAcceleration of the first pendulum:\n----')
Θ_ddot
---- Acceleration of the first pendulum: ----
print('----\nAcceleration of the second pendulum:\n----')
Φ_ddot
---- Acceleration of the second pendulum: ----
1Referenced from OpenAI
- $r_t(\theta)\:\hat{A}^t$: Surrogate objective is the probability ratio between a new policy network and an older policy network.
- $\epsilon$: Hyper-parameter; usually with a value of 0.2.
- clip$(r_t(\theta),\:\:1-\epsilon,\:\:1+\epsilon)\:\hat{A}^t$: Clipped version of the surrogate objective, where the probability ratio is truncated.
$c_1L^{VF}(\theta)$: Determines desirability of the current state.
$c_2S[\pi_\theta](s_t)$: The entropy term using Gaussian Distribution.
1Referenced from Rich Sutton et al.
The input is a representation of the current state. One output is a probability distribution over moves — the actor. The other output represents the expected return from the current position — the critic.3
Actor-critic methods exhibit two significant advantages2:
1Code referenced from Machine Learning With Phil
2Referenced from Deep Learning and the Game of Go
3Referenced from Actor-Critic Methods
The “Actor” updates the policy distribution in the direction suggested by the Critic (such as with policy gradients).1
1Referenced from Understanding Actor Critic Methods and A2C
class ActorNetwork(nn.Module):
# constructor
def __init__(self, num_actions, input_dimensions, learning_rate_alpha,
fully_connected_layer_1_dimensions=256, fully_connected_layer_2_dimensions=256):
# call super-constructor
super(ActorNetwork, self).__init__()
# neural network setup
self.actor = nn.Sequential(
# linear layers unpack input_dimensions
nn.Linear(*input_dimensions, fully_connected_layer_1_dimensions),
# ReLU: applies the rectified linear unit function element-wise
nn.ReLU(),
nn.Linear(fully_connected_layer_1_dimensions, fully_connected_layer_2_dimensions),
nn.ReLU(),
nn.Linear(fully_connected_layer_2_dimensions, num_actions),
# softmax activation function: a mathematical function that converts a vector of numbers
# into a vector of probabilities, where the probabilities of each value are proportional to the
# relative scale of each value in the vector.
nn.Softmax(dim=-1)
)
# optimizer: an optimization algorithm that can be used instead of the classical stochastic
# gradient descent procedure to update network weights iterative based in training data
self.optimizer = optim.Adam(self.parameters(), lr=learning_rate_alpha)
# handle type of device
self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
self.to(self.device)
# pass state forward through the NN: calculate series of probabilities to draw from a distribution
# to get actual action. Use action to get log probabilities for the calculation of the two probablities
# for the learning function
def forward(self, state):
dist = self.actor(state)
dist = Categorical(dist)
return dist
The “Critic” estimates the value function. This could be the action-value (the Q value) or state-value (the V value).1
1Referenced from Understanding Actor Critic Methods and A2C
# [NOTE: See the above comments in the `ActorNetwork` for individual function explanation]
class CriticNetwork(nn.Module):
def __init__(self, input_dimensions, learning_rate_alpha, fully_connected_layer_1_dimensions=256,
fully_connected_layer_2_dimensions=256):
super(CriticNetwork, self).__init__()
self.critic = nn.Sequential(
nn.Linear(*input_dimensions, fully_connected_layer_1_dimensions),
nn.ReLU(),
nn.Linear(fully_connected_layer_1_dimensions, fully_connected_layer_2_dimensions),
nn.ReLU(),
nn.Linear(fully_connected_layer_2_dimensions, 1)
)
# same learning rate for both actor & critic -> actor is much more sensitive to the changes in the underlying
# parameters
self.optimizer = optim.Adam(self.parameters(), lr=learning_rate_alpha)
self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
self.to(self.device)
def forward(self, state):
value = self.critic(state)
return value
The agent is the component that makes the decision of what action to take.
The agent is allowed to use any observation from the environment, and any internal rules that it has. Those internal rules can be anything, but typically, it expects the current state to be provided by the environment, for that state to utilise the Markov chain, and then it processes that state using a policy function that decides what action to take.
In addition, to handle a reward signal (received from the environment) and optimise the agent towards maximising the expected reward in future, the agent will maintain some data which is influenced by the rewards it received in the past, and use that to construct a better policy.
1Referenced from Neil Slater
# storage class
class ExperienceCollector:
# constructor - init values to empty lists
def __init__(self, batch_size):
self.states_encountered = []
self.probability = []
self.values = []
self.actions = []
self.rewards = []
self.terminal_flag = []
self.batch_size = batch_size
# generate batches - defines the number of samples that will be propagated through the network
def generate_batches(self):
num_states = len(self.states_encountered)
batch_start = np.arange(0, num_states, self.batch_size)
idx = np.arange(num_states, dtype=np.int64)
np.random.shuffle(idx) # shuffle to handle stochastic gradient descent
batches = [idx[i:i+self.batch_size] for i in batch_start]
# NOTE: maintain return order
return np.array(self.states_encountered),\
np.array(self.actions),\
np.array(self.probability),\
np.array(self.values),\
np.array(self.rewards),\
np.array(self.terminal_flag),\
batches
# store results from previous state
def memory_storage(self, states_encountered, action, probability, values, reward, terminal_flag):
self.states_encountered.append(states_encountered)
self.actions.append(action)
self.probability.append(probability)
self.values.append(values)
self.rewards.append(reward)
self.terminal_flag.append(terminal_flag)
# clear memory after retrieving state
def memory_clear(self):
self.states_encountered = []
self.probability = []
self.actions = []
self.rewards = []
self.terminal_flag = []
self.values = []
# defines the agent
class Agent:
def __init__(self, num_actions, input_dimensions, gamma=0.99, learning_rate_alpha=3e-4, gae_lambda=0.95,
policy_clip=0.2, batch_size=64, num_epochs=10):
# save parameters
self.gamma = gamma
self.policy_clip = policy_clip
self.num_epochs = num_epochs
self.gae_lambda = gae_lambda
self.actor = ActorNetwork(num_actions, input_dimensions, learning_rate_alpha)
self.critic = CriticNetwork(input_dimensions, learning_rate_alpha)
self.memory = ExperienceCollector(batch_size)
# store memory; interface function
def interface_agent_memory(self, state, action, probability, values, reward, terminal_flag):
self.memory.memory_storage(state, action, probability, values, reward, terminal_flag)
# choosing an action
def action_choice(self, observation):
# convert numpy array to a tensor
state = T.tensor([observation], dtype=T.float).to(self.actor.device)
# distribution for choosing an action
dist = self.actor(state)
# value of the state
value = self.critic(state)
# sample distribution to get action
action = dist.sample()
# squeeze to eliminate batch dimensions
probability = T.squeeze(dist.log_prob(action)).item()
action = T.squeeze(action).item()
value = T.squeeze(value).item()
return action, probability, value
# learning from actions
def learn(self):
# iterate over the number of epochs
for _ in range(self.num_epochs):
state_array, action_array, old_probability_array, values_array,\
reward_array, terminal_flag_array, batches = \
self.memory.generate_batches()
values = values_array
# advantage
advantage = np.zeros(len(reward_array), dtype=np.float32)
# calculate advantage
for time_step in range(len(reward_array)-1):
discount = 1
advantage_time_step = 0
# from Schulman paper -> the advantage function
for k in range(time_step, len(reward_array)-1):
advantage_time_step += discount*(reward_array[k] + self.gamma*values[k+1]*\
(1-int(terminal_flag_array[k])) - values[k])
# multiplicative factor
discount *= self.gamma*self.gae_lambda
advantage[time_step] = advantage_time_step
# turn advantage into tensor
advantage = T.tensor(advantage).to(self.actor.device)
# convert values to a tensor
values = T.tensor(values).to(self.actor.device)
for batch in batches:
states = T.tensor(state_array[batch], dtype=T.float).to(self.actor.device)
old_probability = T.tensor(old_probability_array[batch]).to(self.actor.device)
actions = T.tensor(action_array[batch]).to(self.actor.device)
# pi(theta)_new: take states and pass to Actor to get the distribution for new probability
dist = self.actor(states)
critic_value = self.critic(states)
# new values of the state according to the Critic network
critic_value = T.squeeze(critic_value)
# calculate new probability
new_probability = dist.log_prob(actions)
# probability ratio; probabilities taken as exponential to get ratio
probability_ratio = new_probability.exp() / old_probability.exp()
# prob_ratio = (new_probs - old_probs).exp()
weighted_probability = advantage[batch] * probability_ratio
weighted_clipped_probability = T.clamp(probability_ratio, 1-self.policy_clip,
1+self.policy_clip)*advantage[batch]
# negative due to gradient ascent
actor_loss = -T.min(weighted_probability, weighted_clipped_probability).mean()
returns = advantage[batch] + values[batch]
critic_loss = (returns-critic_value)**2
critic_loss = critic_loss.mean()
total_loss = actor_loss + 0.5*critic_loss
# zero the gradients
self.actor.optimizer.zero_grad()
self.critic.optimizer.zero_grad()
# backpropagate total loss
total_loss.backward()
self.actor.optimizer.step()
self.critic.optimizer.step()
# at end of epochs clear memory
self.memory.memory_clear()
The environment is a modelled as a stochastic finite state machine with inputs (actions sent from the agent) and outputs (observations and rewards sent to the agent).1
The agent-environment boundary can be located at different places for different purposes. In practice, the agent-environment boundary is determined once one has selected particular states, actions, and rewards, and thus has identified a specific decision-making task of interest.2
1Referenced from A Brief Introduction to Reinforcement Learning
2Referenced from Reinforcement Learning:
An Introduction
The simplified acceleration of each component from Section 3.6: Linerisation and Acceleration:
\begin{equation} \ddot{x}=\frac{\begin{array}{l}-M^2{gl}^2\left[\theta{}\left(4I+Ml\left(4l-1\right)\right)+\phi{}\left(I+Ml\left(l-1\right)+l^2\right)\right]\\+u\left(I^2+Il^2\left(2M+1\right)+Ml^2\left(Ml^2-\frac{1}{4}M+l^2\right)\right)\end{array}}{X} \tag{2} \end{equation}\begin{equation} \ddot{\theta{}}=\frac{Ml\left[\begin{array}{l}-2Mgl\left(\frac{1}{4}\left(M+m+1\right)-Ml\right)\phi{}\\+2g\left(I\left(M+m+1\right)+Ml^2(m+1\right)\theta{}\\-2u\left(I+Ml\left(l-\frac{1}{4}\right)\right)\end{array}\right]}{X} \tag{3} \end{equation}\begin{equation} \ddot{\phi{}}=\frac{Ml\left[\begin{array}{l}-Mgl\left(-M\left(4l-1\right)+m+1\right)\theta{}\\+g\left(I\left(M+m+1\right)+Ml^2\left(m-3M+\left(\frac{m+1}{M}\right)+2\right)\right)\phi{}\\-u\left(I(I+1)+Ml\left(l-1\right)\right)\end{array}\right]}{X} \tag{4} \end{equation}where, \begin{equation} X=I\left[I+l^2+m\left(I+l^2\right)\right]-M^2l^2\left[M\left(2l\left(2l-1\right)+\frac{1}{4}\right)\\+\left(3I-l^2\left(m-1\right)+\frac{1}{4}\left(m+1\right)\right)\right]+M\left(I^2+l^2\left[I\left(2m+3\right)+l^2(m+1)\right]\right) \end{equation}
"""
Adapted from the classic cart-pole system implemented by Rich Sutton et al.
Original `CartPole-v0` environment available here: http://incompleteideas.net/sutton/book/code/pole.c
Permalink: https://perma.cc/C9ZM-652R
"""
class DoubleInvertedPendulum(gym.Env):
def __init__(self):
self.gravity = 9.81 # m/s^2
self.mass_cart = 1.0 # kg
self.masspole_1 = 0.1 # kg
self.masspole_2 = 0.1 # kg
self.mass_pole = (self.masspole_1 + self.masspole_2) # kg
self.lengthpole_1 = 0.25 # m
self.lengthpole_2 = 0.25 # m
self.length = (self.lengthpole_1 + self.lengthpole_2) # m
self.force = 10.0 # N
self.moment_inertia = (self.mass_pole*self.length**2) / 12 # kgm^2
self.tau = 0.02 # s
self.kinematics_integrator = 'euler'
self.theta = 0.05
self.phi = 0.05
# angle at which to fail the episode - ~ 0.2 rad, ~12 deg
self.theta_threshold_radians = 12 * 2 * math.pi / 360
self.phi_threshold_radians = 12 * 2 * math.pi / 360
# distance of cart to fail episode
self.x_threshold = 2.4
# angle limit set to 2 * theta_threshold_radians so failing observation
# is still within bounds.
high = np.array([self.x_threshold * 2,
np.finfo(np.float32).max,
self.theta_threshold_radians * 2,
np.finfo(np.float32).max,
self.phi_threshold_radians * 2,
np.finfo(np.float32).max],
dtype=np.float32)
self.action_space = gym.spaces.Discrete(2)
self.observation_space = gym.spaces.Box(-high, high, dtype=np.float32)
self.seed()
self.viewer = None
self.state = None
self.steps_beyond_done = None
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def step(self, action):
err_msg = "%r (%s) invalid" % (action, type(action))
assert self.action_space.contains(action), err_msg
x, x_dot, theta, theta_dot, phi, phi_dot = self.state
force = self.force if action == 1 else -self.force
#################################
# denominator
denominator_X = (self.moment_inertia**2) + (self.moment_inertia*self.length**2) + \
(self.moment_inertia*self.mass_cart*(self.moment_inertia+self.length**2)) - \
(self.mass_pole**3*self.length**2*(4*self.length**2-2*self.length+0.25)) - \
(self.mass_pole**2*self.length**2*(3*self.moment_inertia-self.length**2*self.mass_cart-self.length**2\
+0.25*self.mass_cart+0.25)) + \
(self.mass_pole*(self.moment_inertia**2+2*self.moment_inertia*self.length**2*self.mass_cart\
+3*self.moment_inertia*self.length**2+self.length**4*self.mass_cart+self.length**4))
# x-acceleration numerator
numerator_x_acc = (force*(self.moment_inertia**2+2*self.moment_inertia*self.mass_pole*self.length**2\
+self.moment_inertia*self.length**2+ \
self.mass_pole**2*self.length**4-0.25*self.mass_pole**2*self.length**2\
+self.mass_pole*self.length**4)) - \
(self.mass_pole**2*self.gravity*self.length**2*(4*self.moment_inertia+4*self.mass_pole**2\
-self.mass_pole*self.length)*self.theta) - \
(self.mass_pole**2*self.gravity*self.length**2*(self.moment_inertia+self.mass_pole*self.length**2\
-self.mass_pole*self.length+self.length**2)*self.phi)
# x-acceleration
x_acc = numerator_x_acc / denominator_X
# Θ-acceleration numerator
numerator_theta_acc = (self.mass_pole*self.length*(-self.mass_pole*self.gravity*self.length*\
(-2*self.mass_pole*self.length+0.5*self.mass_pole\
+0.5*self.mass_cart+0.2)*self.phi) + \
2*self.gravity*(self.moment_inertia*self.mass_pole+self.moment_inertia*self.mass_cart\
+self.moment_inertia+\
self.mass_pole*self.mass_cart*self.length**2+self.mass_pole*self.length**2)\
*self.theta - \
force*(2*self.moment_inertia+2*self.mass_pole*self.length**2-\
0.5*self.mass_pole*self.length))
# Θ-acceleration
theta_acc = numerator_theta_acc / denominator_X
# Φ-acceleration numerator
numerator_phi_acc = (self.mass_pole*self.length*(-self.mass_pole*self.gravity*self.length*\
(-4*self.mass_pole*self.length+self.mass_pole+self.mass_cart+1)\
*self.theta) + \
(self.gravity*(self.moment_inertia*self.mass_pole+self.moment_inertia*self.mass_cart\
+self.moment_inertia-3*self.mass_pole**2*self.length**2+\
self.mass_pole*self.length**2*self.mass_cart+2*self.mass_pole*self.length**2\
+self.length**2*self.mass_cart+self.length**2)*self.phi - \
force*(self.moment_inertia+self.mass_pole*self.length**2-self.mass_pole*self.length\
+self.moment_inertia**2)))
# Φ-acceleration
phi_acc = numerator_phi_acc / denominator_X
#################################
if self.kinematics_integrator == 'euler':
x = x + self.tau * x_dot
x_dot = x_dot + self.tau * x_acc
theta = theta + self.tau * theta_dot
phi = phi + self.tau * phi_dot
theta_dot = theta_dot + self.tau * theta_acc
phi_dot = phi_dot + self.tau * phi_acc
else: # semi-implicit euler
x_dot = x_dot + self.tau * x_acc
x = x + self.tau * x_dot
theta_dot = theta_dot + self.tau * theta_acc
theta = theta + self.tau * theta_dot
phi_dot = phi_dot + self.tau * phi_acc
phi = phi + self.tau * phi_dot
self.state = (x, x_dot, theta, theta_dot, phi, phi_dot)
done = bool(
x < -self.x_threshold
or x > self.x_threshold
or theta < -self.theta_threshold_radians
or theta > self.theta_threshold_radians
or phi < -self.phi_threshold_radians
or phi > self.phi_threshold_radians
)
if not done:
reward = 1.0
elif self.steps_beyond_done is None:
self.steps_beyond_done = 0
reward = 1.0
else:
self.steps_beyond_done += 1
reward = 0.0
return np.array(self.state), reward, done, {}
def reset(self):
self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(6,))
self.steps_beyond_done = None
return np.array(self.state)
# utility function to measure time taken by each test run
def exec_time(start, end):
diff_time = end - start
m, s = divmod(diff_time, 60)
h, m = divmod(m, 60)
s,m,h = int(round(s, 0)), int(round(m, 0)), int(round(h, 0))
print("Execution Time: " + "{0:02d}:{1:02d}:{2:02d}".format(h, m, s))
# start time of each test run
start = time.time()
# create environment
env = DoubleInvertedPendulum()
# number of samples processed before the model is updated
batch_size = 5
# a full training pass over the entire dataset such that each example has been seen once
num_epochs = 4
# controls the rate or speed at which the model learns
learning_rate_alpha = 3e-4
# create agent
agent = Agent(num_actions=env.action_space.n, batch_size=batch_size,
learning_rate_alpha=learning_rate_alpha, num_epochs=num_epochs,
input_dimensions=env.observation_space.shape)
# number of games
num_games = 100
# track best score: minimum score for the environment
best_score = env.reward_range[0]
# record score history
score_history = []
# a reward function that informs the agent how well its current actions and states are doing
learn_iters = 0
# track average score
average_score = 0
# number of steps means using one batch size of training data to train the model
num_steps = 0
for i in range(num_games):
observation = env.reset()
terminal_flag = False
score = 0
while not terminal_flag:
# choose action based on the current state of the environment
action, probability, value = agent.action_choice(observation)
# get information back from environment
observation_, reward, terminal_flag, info = env.step(action)
# update step
num_steps += 1
# update score based on current reward
score += reward
# store transition in the agent memory
agent.interface_agent_memory(observation, action, probability, value, reward, terminal_flag)
if num_steps % num_steps == 0:
agent.learn()
learn_iters += 1
observation = observation_
score_history.append(score)
average_score = np.mean(score_history[-100:])
if average_score > best_score:
best_score = average_score
# format output
if i+1 < 10:
print('episode: ', i+1, ' | score: %.0f' % score)
elif i+1 < 100:
print('episode: ', i+1, ' | score: %.0f' % score)
else:
print('episode: ', i+1, '| score: %.0f' % score)
episodes = [i+1 for i in range(len(score_history))]
# end time of each test run
end = time.time()
# display time taken by test run
print('---------')
exec_time(start,end)
print('---------')
print(f"Total reward for {num_games} unsupervised episodes: {sum(score_history)}")
print('---------')
# visualise learning
def plot_learning_curve(episode, scores):
running_avg = np.zeros(len(scores))
for i in range(len(running_avg)):
running_avg[i] = np.mean(scores[max(0, i-100):(i+1)])
plt.plot(episode, running_avg)
plt.title(f"Unsupervised learning for {num_games} games", fontweight='bold')
plt.xlabel('No. of games', fontsize=11)
plt.ylabel('Average reward / episode', fontsize=11)
plot_learning_curve(episodes, score_history)
Various starting angle combinations were tried. Additionally, different parameters were modulated to observe the agent's behaviour.
Parameter | Value |
---|---|
gravitational constant | 9.81 m/s2 |
mass of the cart | 1 kg |
mass of the pendulum(s) | 0.1 kg |
length of the pendulum(s) | 0.25 m |
force | 10 N |
moment of inertia | 3.33e-3 |
$$\theta$$ (rad) | $$\phi$$ (rad) | Total reward | Total time $$(min)$$ | Average Accuracy $$(\%)$$ |
---|---|---|---|---|
0.05 | 0.05 | 72,283 | 29.14 | 72.10 |
0.10 | 0.10 | 62,919 | 27.45 | 62.86 |
0.15 | 0.15 | 56,180 | 17.28 | 56.57 |
0.05 | 0.10 | 62,744 | 22.12 | 62.39 |
0.10 | 0.05 | 72,783 | 23.40 | 72.84 |
0.00 | 0.15 | 55,859 | 17.44 | 55.74 |
0.15 | 0.00 | 91,709 | 28.11 | 91.91 |