This example shows adjoint-based gradient computation using the `checkpoint_schedules` package. We first define the adjoint-based gradient problem and then present the forward and adjoint solvers driven by the schedules that the `checkpoint_schedules` package provides.
Let us consider a one-dimensional (1D) problem aiming to compute the gradient/sensitivity of the kinetic energy at a time $t=\tau$ with respect to an initial condition $u_0$. Then, our functional can be expressed as: $$ I(u(x, \tau, u_0)) = \int_\Omega \frac{1}{2} u(x, \tau, u_0)u(x, \tau, u_0) \, dx. \tag{1} $$
In the current case, the velocity variable $u = u(x, t)$ is governed by the 1D viscous Burgers equation, a non-linear equation for the advection and diffusion of momentum:
$$ \frac{\partial u}{\partial t} + u \frac{\partial u}{\partial x} - \nu \frac{\partial^2 u}{\partial x^2} = 0. \tag{2} $$
Here, $x \in [0, L]$ is the space variable, and $t \in \mathbb{R}^{+}$ represents the time variable. The boundary condition is $u(0, t) = u(L, t) = 0$, where $L$ is the length of the 1D domain. The initial condition is given by $u_0 = \sin(\pi x)$.
The control parameter is the initial condition $u_0$. Hence, the objective is to compute the adjoint-based gradient of the functional $I(u_0)$ with respect to $u_0$.
This example uses the discrete adjoint formulation, meaning the adjoint system is derived after discretization of the forward problem.
We seek to compute the sensitivity/gradient $\mathrm{d}\widehat{I}/\mathrm{d}u_0$, where $\widehat{I} = I(u_0)$ is the reduced functional. Applying the chain rule to the functional $I$ in Eq. (1), we have: $$\frac{\mathrm{d}\widehat{I}}{\mathrm{d}u_0} = \frac{\partial I}{\partial u} \frac{\mathrm{d}u}{\mathrm{d}u_0}. \tag{3}$$ Computing $\partial I/\partial u$ is straightforward since the functional (Eq. (1)) is written in terms of the solution variable $u$. In contrast, the Jacobian $\mathrm{d}u/\mathrm{d}u_0$ involves an expensive computation that the adjoint method avoids.
The strategy employed for the adjoint derivation is based on [1] (section 2.3.3), in which the relation between the functional $I$ and the forward equation $F(u, u_0)$ is obtained by taking the total derivative of $F(u, u_0)$ with respect to $u_0$, yielding a relationship that provides the Jacobian $\mathrm{d}u/\mathrm{d}u_0$, as shown below. \begin{align*} \frac{\mathrm{d}}{\mathrm{d}u_0} F(u, u_0) &= \frac{\partial F(u, u_0)}{\partial u} \frac{\mathrm{d}u}{\mathrm{d}u_0} + \frac{\partial F(u, u_0)}{\partial u_0} = 0 \\ \\ \implies \frac{\mathrm{d}u}{\mathrm{d}u_0} &= - \left(\frac{\partial F(u, u_0)}{\partial u}\right)^{-1} \frac{\partial F(u, u_0)}{\partial u_0}. \end{align*} Substituting $\mathrm{d}u/\mathrm{d}u_0$ into Eq. (3), we have: $$\frac{\mathrm{d}\widehat{I}}{\mathrm{d}u_0} = - \frac{\partial I}{\partial u} \left(\frac{\partial F(u, u_0)}{\partial u}\right)^{-1} \frac{\partial F(u, u_0)}{\partial u_0}.$$
Taking the adjoint (Hermitian transpose) of the above equation: $$\frac{\mathrm{d}\widehat{I}}{\mathrm{d}u_0}^* = - \frac{\partial F}{\partial u_0}^* \frac{\partial F}{\partial u}^{-*} \frac{\partial I}{\partial u}^{*}$$
Next, we define: $$\lambda = \left(\frac{\partial F(u, u_0)}{\partial u}\right)^{-*} \frac{\partial I}{\partial u}^* \implies \left(\frac{\partial F(u, u_0)}{\partial u}\right)^{*} \lambda = \frac{\partial I}{\partial u}^*,$$ which is the adjoint system. Finally, we obtain the following expression for the adjoint-based gradient computation: $$\frac{\mathrm{d}\widehat{I}}{\mathrm{d} u_0}^* = - \frac{\partial F}{\partial u_0}^* \lambda.$$ In the current case, the Burgers equation is written in weak form and discretised in time with the implicit (backward Euler) method, which gives: $$ F(u^{n+1}, u^n, v) = \int_{\Omega} u^{n + 1} \cdot v - u^{n} \cdot v + \Delta t \left(u^{n + 1} \frac{\partial u^{n + 1}}{\partial x} \cdot v + \nu \frac{\partial u^{n + 1}}{\partial x} \cdot \frac{\partial v}{\partial x}\right) \, dx = 0 \quad \forall v\in V, \tag{4}$$ where $v \in V$ is an arbitrary test function.
The time sequence of the adjoint system for the functional (Eq. (1)) is given by: $$ \begin{align*} \lambda^{N+1} &= \frac{\partial I}{\partial u^{N + 1}}^{\ast}\\ \\ \frac{\partial F}{\partial u^{N + 1}}^* \lambda^{N} &= \lambda^{N+1} \\ \frac{\partial F}{\partial u^{N}}^* \lambda^{N - 1} &= \lambda^{N} \\ \vdots \\ \frac{\partial F}{\partial u^{1}}^* \lambda^{0} &= \lambda^{1} \end{align*} \tag{5} $$ where $\lambda^{n}$ is the adjoint variable at time step $n$.
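Concretely, Eq. (5) prescribes a single backward loop: seed the adjoint with the functional derivative at the final step, then repeatedly solve the transposed (adjoint) Jacobian system. The following is only a schematic sketch of this recursion; `jacobian` and `solve` are placeholders, and the actual solver is defined below.

def reverse_sweep(u, lam, N, jacobian, solve):
    # u[n] holds the stored forward solution at step n; lam enters as
    # lam^{N+1} = (dI/du^{N+1})* and leaves as lam^0.
    for n in range(N + 1, 0, -1):
        # Solve (dF/du^n)* lam^{n-1} = lam^n for lam^{n-1}.
        lam = solve(jacobian(u[n]).T, lam)
    return lam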
We define a `BurgersSolver` class to compute the forward and adjoint solutions, and to handle the storage of forward and adjoint data. The Burgers equation is discretised using the finite element method (FEM), with continuous first-order Lagrange basis functions discretising the spatial domain. `BurgersSolver` uses the SymPy Python library to assemble the matrices from the weak form of the Burgers equation.
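To illustrate the SymPy-based assembly before presenting the full solver, here is a minimal sketch computing the local (element) mass matrix on the reference element $\eta \in [0, 1]$ with the hat functions $\phi_0 = 1 - \eta$ and $\phi_1 = \eta$; all names are local to this sketch.

from sympy import integrate, symbols

eta, h = symbols("eta h")
phi = [1 - eta, eta]  # First-order Lagrange (hat) basis functions.
# M_(i,j) = h * integral of phi_i * phi_j over the reference element.
M_local = [[h * integrate(phi[i] * phi[j], (eta, 0, 1))
            for j in range(2)] for i in range(2)]
print(M_local)  # [[h/3, h/6], [h/6, h/3]]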
import numpy as np
import os
import tempfile
from scipy.sparse.linalg import spsolve
from scipy.sparse import lil_matrix
import copy
from sympy import diff, integrate, symbols, lambdify
from sympy.matrices import SparseMatrix, Matrix
from checkpoint_schedules import StorageType
class BurgersSolver:
"""A solver for the forward and adjoint one dimensional Burger's equation.
Parameters
----------
    model : dict
        The model parameters containing the essential information to solve
        the Burgers equation.
    forward_initial_condition : array
        The initial condition used to solve the forward Burgers equation.
mesh : array
The spatial mesh.
"""
def __init__(self, model, forward_initial_condition, mesh):
self.model = dict(model)
self.mesh = mesh
self.forward_work_memory = {StorageType.WORK: {}}
self.forward_work_memory[StorageType.WORK][0] = copy.deepcopy(forward_initial_condition)
self.forward_final_solution = None
self.initial_condition = copy.deepcopy(forward_initial_condition)
self.adjoint_work_memory = {StorageType.WORK: {}}
self.restart_forward = {StorageType.RAM: {}, StorageType.DISK: {}}
self.adjoint_dependency = {StorageType.WORK: {}, StorageType.RAM: {}, StorageType.DISK: {}}
self.mode = "forward"
self._initialize_matrices()
def forward(self, n0, n1, storage=None, write_adj_deps=False, write_ics=False):
"""Advance the forward solver.
Parameters
----------
n0 : int
Initial time step.
n1 : int
Final time step.
storage : StorageType, optional
The storage type, which can be StorageType.RAM, StorageType.DISK,
StorageType.WORK, or StorageType.NONE.
write_adj_deps : bool, optional
Whether the adjoint dependency data will be stored.
write_ics : bool, optional
Whether the forward restart data will be stored.
"""
# Get the initial condition
u = self.forward_work_memory[StorageType.WORK].pop(n0)
if write_ics:
self._store_data(u, n0, storage, write_adj_deps, write_ics)
for step in range(n0, min(n1, self.model["max_n"])):
if write_adj_deps:
self._store_data(u, step, storage, write_adj_deps, write_ics)
def _residual(u_new):
# C is a vector containing the nonlinear term.
C = np.array(self._C(*u_new))
residual = (self._M + self.model["dt"] * self._K).dot(u_new) \
+ self.model["dt"] * C - self._M.dot(u)
residual[0] = u_new[0]
residual[-1] = u_new[-1]
return residual
u_new = self._solve_newton(u, _residual)
u = copy.deepcopy(u_new)
            step += 1  # `step` now indexes the newly computed solution u_new.
if step == self.model["max_n"]:
self.forward_final_solution = copy.deepcopy(u_new)
if write_adj_deps:
self.adjoint_dependency[StorageType.WORK][step] = copy.deepcopy(u_new)
if (not write_adj_deps
or (self.mode == "forward" and step < (self.model["max_n"]))
):
self.forward_work_memory[StorageType.WORK][step] = copy.deepcopy(u_new)
def adjoint(self, n0, n1, clear_adj_deps):
"""Advance the adjoint solver.
Parameters
        ----------
n0 : int
Initial time step.
n1 : int
Final time step.
clear_adj_deps : bool
If `True`, the adjoint dependency data will be cleared.
"""
self.mode = "adjoint"
if n1 == self.model["max_n"]:
self._initialize_adjoint()
u_adj = self.adjoint_work_memory[StorageType.WORK].pop(n1)
M = copy.deepcopy(self._M)
M[0, 1] = M[-1, -2] = 0
M[0, 0] = M[-1, -1] = 1
for step in range(n1, n0, - 1):
J_T = self._jacobian(self.adjoint_dependency[StorageType.WORK][step]).T
u_adj_coef = spsolve(J_T, u_adj)
u_adj_coef[0] = u_adj_coef[-1] = 0
u_adj_new = M.T.dot(u_adj_coef)
u_adj = copy.deepcopy(u_adj_new)
if clear_adj_deps:
del self.adjoint_dependency[StorageType.WORK][step]
step -= 1
self.adjoint_work_memory[StorageType.WORK][step] = copy.deepcopy(u_adj_new)
def gradient(self):
"""Compute the adjoint-based gradient.
Returns
-------
array
The adjoint-based gradient.
"""
return self.adjoint_work_memory[StorageType.WORK][0]
def functional(self):
"""Compute the functional.
"""
h = self.model["lx"] / (self.model["nx"] - 1)
return 0.5 * np.sum(self.forward_final_solution ** 2) * h
def copy_data(self, step, from_storage, to_storage, move=False):
"""Copy data from one storage to another.
Parameters
----------
step : int
The time step.
from_storage : StorageType
The storage type from which the data will be copied.
to_storage : StorageType
The storage type to which the data will be copied.
move : bool, optional
Whether the data will be moved or not. If `True`, the data will be
removed from the `from_storage`.
"""
if from_storage == StorageType.DISK:
if step in self.adjoint_dependency[StorageType.DISK]:
file_name = self.adjoint_dependency[StorageType.DISK][step]
with open(file_name, "rb") as f:
self.adjoint_dependency[to_storage][step] = np.load(f)
if step in self.restart_forward[StorageType.DISK]:
file_name = self.restart_forward[StorageType.DISK][step]
with open(file_name, "rb") as f:
self.forward_work_memory[to_storage][step] = np.load(f)
if move:
os.remove(file_name)
elif from_storage == StorageType.RAM:
self.forward_work_memory[to_storage][step] = \
copy.deepcopy(self.restart_forward[from_storage][step])
if move:
if step in self.adjoint_dependency[from_storage]:
del self.adjoint_dependency[from_storage][step]
if step in self.restart_forward[from_storage]:
del self.restart_forward[from_storage][step]
else:
raise ValueError("This `StorageType` is not supported.")
def _initialize_adjoint(self):
self.adjoint_work_memory[StorageType.WORK][self.model["max_n"]] = self._functional_derivative()
def _functional_derivative(self):
phi = self._basis_function()
# v = φ_0 + φ_1.
v = phi[0] + phi[1] # Test function.
h = self.model["lx"] / (self.model["nx"] - 1)
dI_du = np.zeros(self.model["nx"])
for i in range(1, self.model["nx"] - 1):
# dI_du = h * ∫ u * v d η, η = 0 to 1.
dI_du[i] = h * integrate(self.forward_final_solution[i] * v, ("eta", 0, 1))
return dI_du
def _initialize_matrices(self):
# Initialize the mass and stiffness matrices.
self._M = self._mass_matrix()
self._K = self._stiffness_matrix()
u = symbols("u_:{0}".format(len(self.initial_condition)))
# Initialize the advection matrix.
self._C = lambdify(u, self._advection_matrix_action(u)[:], 'numpy')
# Initialize the Jacobian of the advection matrix.
self._J_C = lambdify(u, self._jacobian_advection(u), 'numpy')
def _jacobian(self, u):
C_J = lil_matrix(self._J_C(*u)) # The Jacobian of the advection matrix.
Jac = self._M + self.model["dt"] * (self._K + C_J)
Jac[0, 1] = Jac[-1, -2] = 0
Jac[0, 0] = Jac[-1, -1] = 1
return Jac
def _solve_newton(self, u_prev, residual, tol=1e-8, max_iter=50):
i = 0
u = copy.deepcopy(u_prev)
u[0] = u[-1] = 0
while i < max_iter:
Jac = self._jacobian(u)
delta_u = spsolve(Jac, residual(u))
u_new = u - delta_u
u = copy.deepcopy(u_new)
i += 1
if np.linalg.norm(delta_u) < tol:
break
if i == max_iter:
print("Newton's method did not converge in {} iterations.".format(max_iter))
return u_new
def _basis_function(self):
eta = symbols("eta")
# φ_0 = 1 - η, φ_1 = η.
# φ_0 and φ_1 are the basis functions.
# η = (x - x0) / h, where h is the mesh spacing.
return [1 - eta, eta]
def _mass_matrix(self):
h = self.model["lx"] / (self.model["nx"] - 1)
phi = self._basis_function()
M_local = np.zeros((2, 2))
for i in range(2):
for j in range(2):
# M_(i,j) = h * ∫ φ_i * φ_j dη, η = 0 to 1.
M_local[i, j] = h * integrate(phi[i] * phi[j], ("eta", 0, 1))
return self._assemble_global_matrix(M_local)
def _assemble_global_matrix(self, local_matrix):
num_nodes = self.model["nx"]
global_matrix = lil_matrix((num_nodes, num_nodes))
global_matrix[0, 0] = local_matrix[0, 0]
global_matrix[num_nodes - 1, num_nodes - 1] = local_matrix[1, 1]
global_matrix[0, 1] = local_matrix[0, 1]
global_matrix[num_nodes - 1, num_nodes - 2] = local_matrix[1, 0]
for i in range(1, num_nodes - 1):
global_matrix[i, i - 1] = local_matrix[1, 0]
global_matrix[i, i + 1] = local_matrix[0, 1]
global_matrix[i, i] = local_matrix[1, 1] + local_matrix[0, 0]
return global_matrix
def _stiffness_matrix(self):
# 1D mesh is uniform. Thus, the mesh spacing is constant.
h = self.model["lx"] / (self.model["nx"] - 1)
b = self.model["nu"] / h
phi = self._basis_function()
dphi_deta = [diff(phi[i], "eta") for i in range(2)]
K_local = np.zeros((2, 2))
for i in range(2):
for j in range(2):
# K_(i,j) = μ/h * ∫ dφ_i/dη * dφ_j/dη dη, η = 0 to 1.
K_local[i, j] = b * integrate(dphi_deta[i] * dphi_deta[j], ("eta", 0, 1))
return self._assemble_global_matrix(K_local)
def _advection_matrix_action(self, u):
num_nodes = self.model["nx"]
# u0 and u1 are the coefficients of the basis functions at the nodes.
u0, u1 = symbols("u0 u1")
coefficients = Matrix([u0, u1])
phi = self._basis_function()
u_ = phi[0] * u0 + phi[1] * u1
dphi_deta = [diff(phi[i], "eta") for i in range(2)]
# nonlinear_term = u * du/dη, where u = φ_0 * u0 + φ_1 * u1
# and du/dη = dφ_0/dη * u0 + dφ_1/dη * u1.
nonlinear_term = u_ * coefficients.dot(dphi_deta)
# local_vector_i = ∫ (u * du/dη) * φ_i dη, η = 0 to 1.
local_vector = Matrix([integrate(nonlinear_term * phi[i], ("eta", 0, 1)) for i in range(2)])
local_vector_func = lambdify((u0, u1), local_vector, 'numpy')
C = Matrix.zeros(1, num_nodes)
C[0] = local_vector_func(u[0], u[1])[0]
C[num_nodes - 1] = local_vector_func(u[num_nodes - 2], u[num_nodes - 1])[1]
for i in range(1, num_nodes - 1):
C[i] = local_vector_func(u[i], u[i + 1])[0] + local_vector_func(u[i - 1], u[i])[1]
return C
def _jacobian_advection(self, u):
num_nodes = self.model["nx"]
# u0 and u1 are the coefficients of the basis functions at the nodes.
u0, u1 = symbols("u0 u1")
phi = self._basis_function()
coefficients = Matrix([u0, u1])
dphi_deta = [diff(phi[i], "eta") for i in range(2)]
# u_ = φ_0 * u0 + φ_1 * u1.
u_ = phi[0] * u0 + phi[1] * u1
Jc = SparseMatrix.zeros(num_nodes, num_nodes)
local_matrix = Matrix.zeros(2, 2)
for i in range(2):
for j in range(2):
# linearised term_0 = φ_j * du/dη, where du/dη = dφ_0/dη * u0 + dφ_1/dη * u1.
term_0 = phi[j] * coefficients.dot(dphi_deta)
# linearised term_1 = u_ * dφ_j/dη where u_ = φ_0 * u0 + φ_1 * u1.
term_1 = u_ * dphi_deta[j]
# local_matrix_(i,j) = ∫ (φ_j * du/dη + u_ * dφ_j/dη) * φ_i dη, η = 0 to 1.
local_matrix[i, j] = integrate((term_0 + term_1) * phi[i], ("eta", 0, 1))
local_matrix_func = lambdify((u0, u1), local_matrix, 'numpy')
Jc[0, 0] = local_matrix_func(u[0], u[1])[0, 0]
Jc[0, 1] = local_matrix_func(u[0], u[1])[0, 1]
Jc[num_nodes - 1, num_nodes - 2] = local_matrix_func(u[num_nodes - 2], u[num_nodes - 1])[1, 0]
Jc[num_nodes - 1, num_nodes - 1] = local_matrix_func(u[num_nodes - 2], u[num_nodes - 1])[1, 1]
for i in range(1, num_nodes - 1):
Jc[i, i - 1] = local_matrix_func(u[i - 1], u[i])[1, 0]
Jc[i, i] = (local_matrix_func(u[i - 1], u[i])[1, 1]
+ local_matrix_func(u[i], u[i + 1])[0, 0])
Jc[i, i + 1] = local_matrix_func(u[i], u[i + 1])[0, 1]
return Jc
def _store_data(self, data, step, storage, write_adj_deps, write_ics):
if storage == StorageType.DISK:
self._store_on_disk(data, step, write_adj_deps)
elif write_adj_deps:
self.adjoint_dependency[storage][step] = copy.deepcopy(data)
elif write_ics:
self.restart_forward[storage][step] = copy.deepcopy(data)
def _store_on_disk(self, data, step, adj_deps):
if adj_deps:
tmp_adj_deps_directory = tempfile.gettempdir()
file_name = os.path.join(tmp_adj_deps_directory, "fwd_" + str(step) + ".npy")
self.adjoint_dependency[StorageType.DISK][step] = file_name
np.save(file_name, data)
else:
tmp_rest_forward_directory = tempfile.gettempdir()
file_name = os.path.join(tmp_rest_forward_directory, "fwd_" + str(step) + ".npy")
self.restart_forward[StorageType.DISK][step] = file_name
np.save(file_name, data)
    def _clean_disk(self):
        # Remove any checkpoint files that remain on disk.
        for file_name in self.adjoint_dependency[StorageType.DISK].values():
            if os.path.exists(file_name):
                os.remove(file_name)
        for file_name in self.restart_forward[StorageType.DISK].values():
            if os.path.exists(file_name):
                os.remove(file_name)
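As a quick sanity check, `BurgersSolver` can also be driven directly, keeping every forward step in working memory; this is only an illustrative sketch (the parameter values here are arbitrary), not part of the checkpointing workflow below.

# Illustrative run of BurgersSolver without any checkpointing schedule.
demo_model = {"lx": 1, "nx": 80, "dt": 0.001, "nu": 0.01, "max_n": 200}
demo_mesh = np.linspace(0, demo_model["lx"], demo_model["nx"])
demo_u0 = np.sin(np.pi * demo_mesh)
demo_solver = BurgersSolver(demo_model, demo_u0, demo_mesh)
# Store every forward step as an adjoint dependency in working memory.
demo_solver.forward(0, demo_model["max_n"], storage=StorageType.WORK,
                    write_adj_deps=True, write_ics=False)
# Reverse sweep over all steps, keeping the stored dependencies.
demo_solver.adjoint(0, demo_model["max_n"], clear_adj_deps=False)
print(demo_solver.functional(), demo_solver.gradient()[:3])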
The adjoint PDE is solved in reverse time order and depends on the forward data. Storing the entire forward solution requires storage linear in the number of time steps, which can exhaust the available memory. To overcome this problem, checkpointing algorithms are used to reduce the memory usage.
To employ a checkpointing method in the adjoint-based sensitivity calculation, we define a `CheckpointingManager` to manage the execution of the forward and adjoint models. The `CheckpointingManager` defines the `execute` method, which performs each action specified in a provided checkpoint schedule (`_schedule`). Within the `execute` method, we have the single-dispatch generic `action` function, which is overloaded by particular functions. For instance, if the `checkpoint_schedules` action is `Forward`, the `action_forward` function is called, which advances the forward calculation. In this particular example, the forward solver is executed by calling `self.solver.forward`, where `self.solver` is an attribute of `CheckpointingManager`. Similarly, the adjoint solver is executed by calling `self.solver.adjoint` within the `action_reverse` function.
import functools
import sys
from checkpoint_schedules import Forward, Reverse, Copy, Move, EndForward, EndReverse
class CheckpointingManager:
"""Manage the forward and adjoint solvers.
Attributes
----------
schedule : CheckpointSchedule
The schedule created by `checkpoint_schedules` package.
    solver : object
        A solver object used to execute the forward and adjoint computations.
Notes
-----
The `solver` object contains methods to execute the forward and adjoint. In
addition, it contains methods to copy data from one storage to another, and
to set the initial condition for the adjoint.
"""
def __init__(self, schedule, solver):
self.max_n = sys.maxsize
self.solver = solver
self.reverse_step = 0
self._schedule = schedule
def execute(self):
"""Execute forward and adjoint using checkpointing.
"""
@functools.singledispatch
def action(cp_action):
raise TypeError("Unexpected action")
@action.register(Forward)
def action_forward(cp_action):
n1 = cp_action.n1
self.solver.forward(cp_action.n0, n1, storage=cp_action.storage,
write_adj_deps=cp_action.write_adj_deps,
write_ics=cp_action.write_ics)
if n1 >= self.solver.model["max_n"]:
n1 = min(n1, self.solver.model["max_n"])
self._schedule.finalize(n1)
@action.register(Reverse)
def action_reverse(cp_action):
self.solver.adjoint(cp_action.n0, cp_action.n1, cp_action.clear_adj_deps)
self.reverse_step += cp_action.n1 - cp_action.n0
@action.register(Copy)
def action_copy(cp_action):
self.solver.copy_data(cp_action.n, cp_action.from_storage,
cp_action.to_storage, move=False)
@action.register(Move)
def action_move(cp_action):
self.solver.copy_data(cp_action.n, cp_action.from_storage,
cp_action.to_storage, move=True)
@action.register(EndForward)
def action_end_forward(cp_action):
if self._schedule.max_n is None:
self._schedule._max_n = self.max_n
@action.register(EndReverse)
def action_end_reverse(cp_action):
self.solver._clean_disk()
            if self._schedule.max_n != self.reverse_step:
                raise ValueError("The number of steps in the reverse phase "
                                 "is different from the number of steps in "
                                 "the forward phase.")
self.reverse_step = 0
        for cp_action in self._schedule:
action(cp_action)
if isinstance(cp_action, EndReverse):
break
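Before wiring a schedule to the solvers, it can be instructive to inspect the raw action stream a schedule emits. The sketch below iterates a small, arbitrary `HRevolve` schedule exactly as `CheckpointingManager.execute` does, printing only the action names.

from itertools import islice
from checkpoint_schedules import HRevolve

demo_schedule = HRevolve(10, 2, 2)  # 10 steps, 2 RAM and 2 disk checkpoints.
for cp_action in islice(demo_schedule, 6):
    print(type(cp_action).__name__)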
The purpose of this adjoint-based sensitivity computation is to exercise different checkpointing approaches available in the `checkpoint_schedules` package and verify the consistency of the results. We employ a fundamental tool for the verification of gradients: the second-order Taylor remainder convergence test. Consider the functional $I(u_0)$ and let $\nabla_{u_0} I$ be its gradient with respect to the control parameter $u_0$. For a perturbation $\delta u_0$ of the initial condition, the Taylor remainder convergence test is based on the expression:
$$ \left|I(u_0 + \epsilon \delta u_0) - I(u_0) - \epsilon \nabla_{u_0} I \cdot \delta u_0\right| \rightarrow 0 \quad \text{at } O(\epsilon^2).$$
In the next sections, we present the results of the adjoint-based sensitivity computation using the `checkpoint_schedules` package together with the Taylor remainder convergence test. The convergence rate is expected to be approximately $2$ for every checkpointing approach.
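Since the remainder behaves as $O(\epsilon^2)$, halving $\epsilon$ should reduce it by roughly a factor of four, and the observed rate between two successive perturbation sizes is estimated as $$ r_i = \frac{\ln\left(E_i/E_{i-1}\right)}{\ln\left(\epsilon_i/\epsilon_{i-1}\right)} \approx 2, $$ which is exactly what the `convergence_rates` helper defined below computes.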
Below, we define the `model` dictionary containing the parameters required for the forward and adjoint solvers. The `model` dictionary is then passed to the `BurgersSolver` class. Additionally, we set up the 1D mesh and the initial condition for the forward Burgers solver.
model = {"lx": 1, # domain length
"nx": 80, # number of nodes
"dt": 0.001, # time step
"nu": 0.01, # viscosity
"max_n": 200, # total steps
"chk_ram": 50, # number of checkpoints in RAM
"chk_disk": 50, # number of checkpoints on disk
}
mesh = np.linspace(0, model["lx"], model["nx"]) # create the spatial grid
u0 = np.sin(np.pi*mesh) # initial condition
def taylor_remainder_test(gradient, J, u0, h):
    # Evaluate the Taylor remainders |I(u0 + eps*h) - I(u0) - eps * dI/du0 . h|
    # for a sequence of halved perturbation sizes eps.
    epsilons = [0.001 / 2**i for i in range(4)]
E = []
dJdm = np.dot(gradient, h)
for eps in epsilons:
up = u0 + eps*h
burgers = BurgersSolver(model, up, mesh)
burgers.forward(0, model["max_n"])
Jp = burgers.functional()
E.append(abs(Jp - J - dJdm * eps))
print("Computed residuals: {}".format(E))
return E, epsilons
def convergence_rates(E_values, eps_values, show=True):
r = []
for i in range(1, len(eps_values)):
r.append(np.log(E_values[i] / E_values[i - 1])
/ np.log(eps_values[i] / eps_values[i - 1]))
if show:
print("Computed convergence rates: {}".format(r))
return r
We initially compute the adjoint-based gradient using the `SingleMemoryStorageSchedule` checkpointing approach, which stores the forward data of every time step in working memory. As explained in the notebook with the illustrative example, this schedule does not require the maximal step (`model["max_n"]`) to be known in advance.
from checkpoint_schedules import SingleMemoryStorageSchedule
schedule = SingleMemoryStorageSchedule()
burger = BurgersSolver(model, u0, mesh)
manager = CheckpointingManager(schedule, burger) # Create the checkpointing manager.
manager.execute() # execute the checkpointing schedule using `SingleMemoryStorageSchedule` schedule.
delta_u = np.ones(model["nx"])
E, epsilon = taylor_remainder_test(burger.gradient(), burger.functional(), u0, delta_u)
convergence_rates(E, epsilon)
Computed residuals: [3.9314210254935353e-07]
Computed residuals: [3.9314210254935353e-07, 9.829555822570522e-08]
Computed residuals: [3.9314210254935353e-07, 9.829555822570522e-08, 2.4575142825995593e-08]
Computed residuals: [3.9314210254935353e-07, 9.829555822570522e-08, 2.4575142825995593e-08, 6.143942141015931e-09]
Computed convergence rates: [1.9998527430492066, 1.9999264243585173, 1.9999632662271236]
[1.9998527430492066, 1.9999264243585173, 1.9999632662271236]
The following examples use the `Revolve` [2] and `HRevolve` [3] schedules. The `Revolve` algorithm requires the definition of the maximal step `model["max_n"]` before the execution of the forward solver, and also the specification of the number of checkpoints stored in memory (`model["chk_ram"]`). Whereas `Revolve` only allows storage in memory, the `HRevolve` algorithm allows both memory and disk storage. The values `model["max_n"]`, `model["chk_ram"]` and `model["chk_disk"]` are required to define the `HRevolve` schedule.
from checkpoint_schedules import Revolve
burger = BurgersSolver(model, u0, mesh)
schedule = Revolve(model["max_n"], model["chk_ram"])
manager = CheckpointingManager(schedule, burger)
manager.execute()
E, epsilon = taylor_remainder_test(burger.gradient(), burger.functional(), u0, delta_u)
convergence_rates(E, epsilon)
Computed residuals: [3.9314210254935353e-07]
Computed residuals: [3.9314210254935353e-07, 9.829555822570522e-08]
Computed residuals: [3.9314210254935353e-07, 9.829555822570522e-08, 2.4575142825995593e-08]
Computed residuals: [3.9314210254935353e-07, 9.829555822570522e-08, 2.4575142825995593e-08, 6.143942141015931e-09]
Computed convergence rates: [1.9998527430492066, 1.9999264243585173, 1.9999632662271236]
[1.9998527430492066, 1.9999264243585173, 1.9999632662271236]
from checkpoint_schedules import HRevolve
burger = BurgersSolver(model, u0, mesh)
schedule = HRevolve(model["max_n"], model["chk_ram"], model["chk_disk"])
manager = CheckpointingManager(schedule, burger)
manager.execute()
E, epsilon = taylor_remainder_test(burger.gradient(), burger.functional(), u0, delta_u)
convergence_rates(E, epsilon)
Computed residuals: [3.9314210254935353e-07]
Computed residuals: [3.9314210254935353e-07, 9.829555822570522e-08]
Computed residuals: [3.9314210254935353e-07, 9.829555822570522e-08, 2.4575142825995593e-08]
Computed residuals: [3.9314210254935353e-07, 9.829555822570522e-08, 2.4575142825995593e-08, 6.143942141015931e-09]
Computed convergence rates: [1.9998527430492066, 1.9999264243585173, 1.9999632662271236]
[1.9998527430492066, 1.9999264243585173, 1.9999632662271236]
Feel free to explore alternative schedules provided by the `checkpoint_schedules` package; our notebook with the illustrative example demonstrates their usage. You need only follow the steps outlined in the code above: instantiate a `BurgersSolver` object, define the `schedule`, create a `CheckpointingManager` object, and then execute the solvers using the `execute` method.
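For instance, a small driver such as the sketch below (it reuses only objects already defined in this notebook) makes swapping schedules a one-liner.

# A sketch of a reusable driver: pass a zero-argument schedule factory and
# get back the functional value and gradient computed under that schedule.
def run_with_schedule(schedule_factory):
    solver = BurgersSolver(model, u0, mesh)
    manager = CheckpointingManager(schedule_factory(), solver)
    manager.execute()
    return solver.functional(), solver.gradient()

J, dJdu0 = run_with_schedule(
    lambda: HRevolve(model["max_n"], model["chk_ram"], model["chk_disk"]))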