#!/usr/bin/env python # coding: utf-8 # # *checkpoint_schedule* application: Adjoint-Based Gradient with the Burger's Equation # # This example shows adjoint-based gradient computation using the *checkpointing_schedules* package. We initially define the adjoint-based gradient problem and then present the forward and adjoint solvers prescribed by the *checkpointing_schedules* package. # # ## Defining the application # # Let us consider a one-dimensional (1D) problem aiming to compute the gradient/sensitivity of the kinetic energy at a time $t=\tau$ with respect to an initial condition $u_0$. Then, our functional can be expressed as: # $$ # I(u(x, \tau, u_0)) = \int_\Omega \frac{1}{2} u(x, \tau, u_0)u(x, \tau, u_0) \, dx. # \tag{1} # $$ # # In the current case, the velocity variable $u = u(x, t)$ is governed by the 1D viscous Burgers equation, a non-linear equation for the advection and diffusion of momentum: # # $$ # \frac{\partial u}{\partial t} + u \frac{\partial u}{\partial x} - \nu \frac{\partial^2 u}{\partial x^2} = 0. # \tag{2} # $$ # # Here, $x \in [0, L]$ is the space variable, and $t \in \mathbb{R}^{+}$ represents the time variable. The boundary condition is $u(0, t) = u(L, t) = 0$, where $L$ is the length of the 1D domain. The initial condition is given by $u_0 = \sin(\pi x)$. # # The control parameter is the initial condition $u_0$. Hence, the objective is to compute the adjoint-based gradient of the functional $I(u_0)$ with respect to $u_0$. # # This example uses the discrete adjoint formulation, meaning the adjoint system is derived after discretization of the forward problem. # ## Adjoint-based gradient # We seek to compute the sensitivity/gradient $\mathrm{d}\widehat{I}/\mathrm{d}u_0$, where $\widehat{I} = I(u_0)$ is the reduced functional. On applying the chain rule, we have the below expression on considering the functional $I$ as in Eq. (1). # $$\frac{\mathrm{d}\widehat{I}}{\mathrm{d}u_0} = \frac{\partial I}{\partial u} \frac{\mathrm{d}u}{\mathrm{d}u_0}. \tag{3}$$ # Computing $\partial J/\partial u$ is straightforward since the functional (Eq. (1)) is written in terms of the solution variable $u$. In contrast, the Jacobian $\mathrm{d}u/\mathrm{d}u_0$ involves an expensive computational operation that is avoided by using the adjoint method. # # The strategy employed for the adjoint derivation is based on [1] (section 2.3.3), in which the relation between the functional $I$ and the forward equation $F(u, u_0)$ is reached by taking the total derivative of $F(u, u_0)$ with respect to $u_0$, yielding in a relationship that provides the Jacobian $\mathrm{d}u/\mathrm{d}u_0$, as shown below. # \begin{align*} # \frac{\mathrm{d}}{\mathrm{d}u_0} F(u, u_0) &= \frac{\partial F(u, u_0)}{\partial u} \frac{\mathrm{d}u}{\mathrm{d}u_0} + \frac{\partial F(u, u_0)}{\partial u_0} = 0 \\ # \\ # \implies \frac{\mathrm{d}u}{\mathrm{d}u_0} &= - \left(\frac{\partial F(u, u_0)}{\partial u}\right)^{-1} \frac{\partial F(u, u_0)}{\partial u_0}. # \end{align*} # Substituting $\mathrm{d}u/\mathrm{d}u_0$ into Eq. (3), we have: # $$\frac{\mathrm{d}\widehat{I}}{\mathrm{d}u_0} = - \frac{\partial I}{\partial u} \left(\frac{\partial F(u, u_0)}{\partial u}\right)^{-1} \frac{\partial F(u, u_0)}{\partial u_0}.$$ # # Taking the adjoint (Hermitian transpose) of the above equation: # $$\frac{\mathrm{d}\widehat{I}}{\mathrm{d}u_0}^* = - \frac{\partial F}{\partial u_0}^* \frac{\partial F}{\partial u}^{-*} \frac{\partial I}{\partial u}^{*}$$ # # Next, we define: # $$\lambda = \left(\frac{\partial F(u, u_0)}{\partial u}\right)^{-*} \frac{\partial I}{\partial u}^* # \implies \left(\frac{\partial F(u, u_0)}{\partial u}\right)^{*} \lambda = \frac{\partial I}{\partial u}^*,$$ # which is the adjoint system. Finally, we obtain the following expression for the adjoint-based gradient computation: # $$\frac{\mathrm{d}\widehat{I}}{\mathrm{d} u_0}^* = - \frac{\partial F}{\partial u_0}^* \lambda.$$ # In this current case, Burger's equation is written in weak form and discretized in time with the backward finite difference method, we have: # $$ F(u^{n+1}, u^n, v) = \int_{\Omega} u^{n + 1} \cdot v - u^{n} \cdot v + \Delta t \left(u^{n + 1} \frac{\partial u^{n + 1}}{\partial x} \cdot v + \nu \frac{\partial u^{n + 1}}{\partial x} \cdot \frac{\partial v}{\partial x}\right) \, dx = 0 \quad \forall v\in V \tag{4},$$ # where $v \in V$ is an arbitrary test function. # # The time sequence of the adjoint system for the functional (Eq. (1)) is given by: # $$ # \begin{align*} # \lambda^{N+1} &= \frac{\partial I}{\partial u^{N + 1}}^{\ast}\\ # \\ # \frac{\partial F}{\partial u^{N + 1}}^* \lambda^{N} &= \lambda^{N+1} # \\ # \frac{\partial F}{\partial u^{N}}^* \lambda^{N - 1} &= \lambda^{N} # \\ # \vdots \\ # \frac{\partial F}{\partial u^{1}}^* \lambda^{0} &= \lambda^{1} # \end{align*} # \tag{5} # $$ # where $\lambda^{n}$ is the adjoint variable at time step $n$. # ## Burger's equation solver # We define a `BurgersSolver` class to compute the `forward` and `adjoint` solutions, and to handle storing of forward and adjoint data. # # The Burger's equation is discretised using the Finite Element Method (FEM). We use continuous first-order Lagrange basis functions to discretise the spatial domain. `BurgersSolver` uses the SymPy Python library to assemble the matrices from the weak form of the Burger's equation. # In[1]: import numpy as np import os import tempfile from scipy.sparse.linalg import spsolve from scipy.sparse import lil_matrix import copy from sympy import diff, integrate, symbols, lambdify from sympy.matrices import SparseMatrix, Matrix from checkpoint_schedules import StorageType class BurgersSolver: """A solver for the forward and adjoint one dimensional Burger's equation. Parameters ---------- model : dict The model parameters containing the essential information to solve the Burger's equation. init_condition : array The initial condition used to solve the forward Burger's equation. mesh : array The spatial mesh. """ def __init__(self, model, forward_initial_condition, mesh): self.model = dict(model) self.mesh = mesh self.forward_work_memory = {StorageType.WORK: {}} self.forward_work_memory[StorageType.WORK][0] = copy.deepcopy(forward_initial_condition) self.forward_final_solution = None self.initial_condition = copy.deepcopy(forward_initial_condition) self.adjoint_work_memory = {StorageType.WORK: {}} self.restart_forward = {StorageType.RAM: {}, StorageType.DISK: {}} self.adjoint_dependency = {StorageType.WORK: {}, StorageType.RAM: {}, StorageType.DISK: {}} self.mode = "forward" self._initialize_matrices() def forward(self, n0, n1, storage=None, write_adj_deps=False, write_ics=False): """Advance the forward solver. Parameters ---------- n0 : int Initial time step. n1 : int Final time step. storage : StorageType, optional The storage type, which can be StorageType.RAM, StorageType.DISK, StorageType.WORK, or StorageType.NONE. write_adj_deps : bool, optional Whether the adjoint dependency data will be stored. write_ics : bool, optional Whether the forward restart data will be stored. """ # Get the initial condition u = self.forward_work_memory[StorageType.WORK].pop(n0) if write_ics: self._store_data(u, n0, storage, write_adj_deps, write_ics) for step in range(n0, min(n1, self.model["max_n"])): if write_adj_deps: self._store_data(u, step, storage, write_adj_deps, write_ics) def _residual(u_new): # C is a vector containing the nonlinear term. C = np.array(self._C(*u_new)) residual = (self._M + self.model["dt"] * self._K).dot(u_new) \ + self.model["dt"] * C - self._M.dot(u) residual[0] = u_new[0] residual[-1] = u_new[-1] return residual u_new = self._solve_newton(u, _residual) u = copy.deepcopy(u_new) step += 1 if step == self.model["max_n"]: self.forward_final_solution = copy.deepcopy(u_new) if write_adj_deps: self.adjoint_dependency[StorageType.WORK][step] = copy.deepcopy(u_new) if (not write_adj_deps or (self.mode == "forward" and step < (self.model["max_n"])) ): self.forward_work_memory[StorageType.WORK][step] = copy.deepcopy(u_new) def adjoint(self, n0, n1, clear_adj_deps): """Advance the adjoint solver. Parameters --------- n0 : int Initial time step. n1 : int Final time step. clear_adj_deps : bool If `True`, the adjoint dependency data will be cleared. """ self.mode = "adjoint" if n1 == self.model["max_n"]: self._initialize_adjoint() u_adj = self.adjoint_work_memory[StorageType.WORK].pop(n1) M = copy.deepcopy(self._M) M[0, 1] = M[-1, -2] = 0 M[0, 0] = M[-1, -1] = 1 for step in range(n1, n0, - 1): J_T = self._jacobian(self.adjoint_dependency[StorageType.WORK][step]).T u_adj_coef = spsolve(J_T, u_adj) u_adj_coef[0] = u_adj_coef[-1] = 0 u_adj_new = M.T.dot(u_adj_coef) u_adj = copy.deepcopy(u_adj_new) if clear_adj_deps: del self.adjoint_dependency[StorageType.WORK][step] step -= 1 self.adjoint_work_memory[StorageType.WORK][step] = copy.deepcopy(u_adj_new) def gradient(self): """Compute the adjoint-based gradient. Returns ------- array The adjoint-based gradient. """ return self.adjoint_work_memory[StorageType.WORK][0] def functional(self): """Compute the functional. """ h = self.model["lx"] / (self.model["nx"] - 1) return 0.5 * np.sum(self.forward_final_solution ** 2) * h def copy_data(self, step, from_storage, to_storage, move=False): """Copy data from one storage to another. Parameters ---------- step : int The time step. from_storage : StorageType The storage type from which the data will be copied. to_storage : StorageType The storage type to which the data will be copied. move : bool, optional Whether the data will be moved or not. If `True`, the data will be removed from the `from_storage`. """ if from_storage == StorageType.DISK: if step in self.adjoint_dependency[StorageType.DISK]: file_name = self.adjoint_dependency[StorageType.DISK][step] with open(file_name, "rb") as f: self.adjoint_dependency[to_storage][step] = np.load(f) if step in self.restart_forward[StorageType.DISK]: file_name = self.restart_forward[StorageType.DISK][step] with open(file_name, "rb") as f: self.forward_work_memory[to_storage][step] = np.load(f) if move: os.remove(file_name) elif from_storage == StorageType.RAM: self.forward_work_memory[to_storage][step] = \ copy.deepcopy(self.restart_forward[from_storage][step]) if move: if step in self.adjoint_dependency[from_storage]: del self.adjoint_dependency[from_storage][step] if step in self.restart_forward[from_storage]: del self.restart_forward[from_storage][step] else: raise ValueError("This `StorageType` is not supported.") def _initialize_adjoint(self): self.adjoint_work_memory[StorageType.WORK][self.model["max_n"]] = self._functional_derivative() def _functional_derivative(self): phi = self._basis_function() # v = φ_0 + φ_1. v = phi[0] + phi[1] # Test function. h = self.model["lx"] / (self.model["nx"] - 1) dI_du = np.zeros(self.model["nx"]) for i in range(1, self.model["nx"] - 1): # dI_du = h * ∫ u * v d η, η = 0 to 1. dI_du[i] = h * integrate(self.forward_final_solution[i] * v, ("eta", 0, 1)) return dI_du def _initialize_matrices(self): # Initialize the mass and stiffness matrices. self._M = self._mass_matrix() self._K = self._stiffness_matrix() u = symbols("u_:{0}".format(len(self.initial_condition))) # Initialize the advection matrix. self._C = lambdify(u, self._advection_matrix_action(u)[:], 'numpy') # Initialize the Jacobian of the advection matrix. self._J_C = lambdify(u, self._jacobian_advection(u), 'numpy') def _jacobian(self, u): C_J = lil_matrix(self._J_C(*u)) # The Jacobian of the advection matrix. Jac = self._M + self.model["dt"] * (self._K + C_J) Jac[0, 1] = Jac[-1, -2] = 0 Jac[0, 0] = Jac[-1, -1] = 1 return Jac def _solve_newton(self, u_prev, residual, tol=1e-8, max_iter=50): i = 0 u = copy.deepcopy(u_prev) u[0] = u[-1] = 0 while i < max_iter: Jac = self._jacobian(u) delta_u = spsolve(Jac, residual(u)) u_new = u - delta_u u = copy.deepcopy(u_new) i += 1 if np.linalg.norm(delta_u) < tol: break if i == max_iter: print("Newton's method did not converge in {} iterations.".format(max_iter)) return u_new def _basis_function(self): eta = symbols("eta") # φ_0 = 1 - η, φ_1 = η. # φ_0 and φ_1 are the basis functions. # η = (x - x0) / h, where h is the mesh spacing. return [1 - eta, eta] def _mass_matrix(self): h = self.model["lx"] / (self.model["nx"] - 1) phi = self._basis_function() M_local = np.zeros((2, 2)) for i in range(2): for j in range(2): # M_(i,j) = h * ∫ φ_i * φ_j dη, η = 0 to 1. M_local[i, j] = h * integrate(phi[i] * phi[j], ("eta", 0, 1)) return self._assemble_global_matrix(M_local) def _assemble_global_matrix(self, local_matrix): num_nodes = self.model["nx"] global_matrix = lil_matrix((num_nodes, num_nodes)) global_matrix[0, 0] = local_matrix[0, 0] global_matrix[num_nodes - 1, num_nodes - 1] = local_matrix[1, 1] global_matrix[0, 1] = local_matrix[0, 1] global_matrix[num_nodes - 1, num_nodes - 2] = local_matrix[1, 0] for i in range(1, num_nodes - 1): global_matrix[i, i - 1] = local_matrix[1, 0] global_matrix[i, i + 1] = local_matrix[0, 1] global_matrix[i, i] = local_matrix[1, 1] + local_matrix[0, 0] return global_matrix def _stiffness_matrix(self): # 1D mesh is uniform. Thus, the mesh spacing is constant. h = self.model["lx"] / (self.model["nx"] - 1) b = self.model["nu"] / h phi = self._basis_function() dphi_deta = [diff(phi[i], "eta") for i in range(2)] K_local = np.zeros((2, 2)) for i in range(2): for j in range(2): # K_(i,j) = μ/h * ∫ dφ_i/dη * dφ_j/dη dη, η = 0 to 1. K_local[i, j] = b * integrate(dphi_deta[i] * dphi_deta[j], ("eta", 0, 1)) return self._assemble_global_matrix(K_local) def _advection_matrix_action(self, u): num_nodes = self.model["nx"] # u0 and u1 are the coefficients of the basis functions at the nodes. u0, u1 = symbols("u0 u1") coefficients = Matrix([u0, u1]) phi = self._basis_function() u_ = phi[0] * u0 + phi[1] * u1 dphi_deta = [diff(phi[i], "eta") for i in range(2)] # nonlinear_term = u * du/dη, where u = φ_0 * u0 + φ_1 * u1 # and du/dη = dφ_0/dη * u0 + dφ_1/dη * u1. nonlinear_term = u_ * coefficients.dot(dphi_deta) # local_vector_i = ∫ (u * du/dη) * φ_i dη, η = 0 to 1. local_vector = Matrix([integrate(nonlinear_term * phi[i], ("eta", 0, 1)) for i in range(2)]) local_vector_func = lambdify((u0, u1), local_vector, 'numpy') C = Matrix.zeros(1, num_nodes) C[0] = local_vector_func(u[0], u[1])[0] C[num_nodes - 1] = local_vector_func(u[num_nodes - 2], u[num_nodes - 1])[1] for i in range(1, num_nodes - 1): C[i] = local_vector_func(u[i], u[i + 1])[0] + local_vector_func(u[i - 1], u[i])[1] return C def _jacobian_advection(self, u): num_nodes = self.model["nx"] # u0 and u1 are the coefficients of the basis functions at the nodes. u0, u1 = symbols("u0 u1") phi = self._basis_function() coefficients = Matrix([u0, u1]) dphi_deta = [diff(phi[i], "eta") for i in range(2)] # u_ = φ_0 * u0 + φ_1 * u1. u_ = phi[0] * u0 + phi[1] * u1 Jc = SparseMatrix.zeros(num_nodes, num_nodes) local_matrix = Matrix.zeros(2, 2) for i in range(2): for j in range(2): # linearised term_0 = φ_j * du/dη, where du/dη = dφ_0/dη * u0 + dφ_1/dη * u1. term_0 = phi[j] * coefficients.dot(dphi_deta) # linearised term_1 = u_ * dφ_j/dη where u_ = φ_0 * u0 + φ_1 * u1. term_1 = u_ * dphi_deta[j] # local_matrix_(i,j) = ∫ (φ_j * du/dη + u_ * dφ_j/dη) * φ_i dη, η = 0 to 1. local_matrix[i, j] = integrate((term_0 + term_1) * phi[i], ("eta", 0, 1)) local_matrix_func = lambdify((u0, u1), local_matrix, 'numpy') Jc[0, 0] = local_matrix_func(u[0], u[1])[0, 0] Jc[0, 1] = local_matrix_func(u[0], u[1])[0, 1] Jc[num_nodes - 1, num_nodes - 2] = local_matrix_func(u[num_nodes - 2], u[num_nodes - 1])[1, 0] Jc[num_nodes - 1, num_nodes - 1] = local_matrix_func(u[num_nodes - 2], u[num_nodes - 1])[1, 1] for i in range(1, num_nodes - 1): Jc[i, i - 1] = local_matrix_func(u[i - 1], u[i])[1, 0] Jc[i, i] = (local_matrix_func(u[i - 1], u[i])[1, 1] + local_matrix_func(u[i], u[i + 1])[0, 0]) Jc[i, i + 1] = local_matrix_func(u[i], u[i + 1])[0, 1] return Jc def _store_data(self, data, step, storage, write_adj_deps, write_ics): if storage == StorageType.DISK: self._store_on_disk(data, step, write_adj_deps) elif write_adj_deps: self.adjoint_dependency[storage][step] = copy.deepcopy(data) elif write_ics: self.restart_forward[storage][step] = copy.deepcopy(data) def _store_on_disk(self, data, step, adj_deps): if adj_deps: tmp_adj_deps_directory = tempfile.gettempdir() file_name = os.path.join(tmp_adj_deps_directory, "fwd_" + str(step) + ".npy") self.adjoint_dependency[StorageType.DISK][step] = file_name np.save(file_name, data) else: tmp_rest_forward_directory = tempfile.gettempdir() file_name = os.path.join(tmp_rest_forward_directory, "fwd_" + str(step) + ".npy") self.restart_forward[StorageType.DISK][step] = file_name np.save(file_name, data) def _clean_disk(self): if len(self.adjoint_dependency[StorageType.DISK]) > 0: for step in self.adjoint_dependency[StorageType.DISK]: self.adjoint_dependency[StorageType.DISK][step].close() if len(self.restart_forward[StorageType.DISK]) > 0: for step in self.restart_forward[StorageType.DISK]: self.restart_forward[StorageType.DISK][step].close() # ## Checkpointing # The adjoint PDE is then solved in a reverse time order and it depends of the forward data. Storing the entire forward solution uses storage linear linear in the number of time steps, which can exhaust the available storage. To overcome this kind of problem, checkpointing algorithms are used to reduce the memory usage. # # To employ a checkpointing method in the adjoint-based sensitivity calculation, we define a `CheckpointingManager` to manage the execution of forward and adjoint models. The `CheckpointingManager` defines the `execute` method, which performs each action specified in a provided checkpoint schedule (`_schedule`). Within the `execute` method, we have the single-dispatch generic `action` function that is overloaded by particular functions. For instance, if the `checkpoint_schedule` action is `Forward`, the `action_forward` function is called, and this can advance the forward calculation. In this particular example, the forward solver is executed by calling `self.solver.forward`. Here, `self.solver` is an attribute of `CheckpointingManager`. Similarly, the adjoint solver is executed by calling `self.solver.adjoint` within the `action_reverse` function. # # In[2]: import functools, sys from checkpoint_schedules import Forward, Reverse, Copy, Move, EndForward, EndReverse class CheckpointingManager: """Manage the forward and adjoint solvers. Attributes ---------- schedule : CheckpointSchedule The schedule created by `checkpoint_schedules` package. solver : object A solver object used to solve the forward and adjoint solvers. Notes ----- The `solver` object contains methods to execute the forward and adjoint. In addition, it contains methods to copy data from one storage to another, and to set the initial condition for the adjoint. """ def __init__(self, schedule, solver): self.max_n = sys.maxsize self.solver = solver self.reverse_step = 0 self._schedule = schedule def execute(self): """Execute forward and adjoint using checkpointing. """ @functools.singledispatch def action(cp_action): raise TypeError("Unexpected action") @action.register(Forward) def action_forward(cp_action): n1 = cp_action.n1 self.solver.forward(cp_action.n0, n1, storage=cp_action.storage, write_adj_deps=cp_action.write_adj_deps, write_ics=cp_action.write_ics) if n1 >= self.solver.model["max_n"]: n1 = min(n1, self.solver.model["max_n"]) self._schedule.finalize(n1) @action.register(Reverse) def action_reverse(cp_action): self.solver.adjoint(cp_action.n0, cp_action.n1, cp_action.clear_adj_deps) self.reverse_step += cp_action.n1 - cp_action.n0 @action.register(Copy) def action_copy(cp_action): self.solver.copy_data(cp_action.n, cp_action.from_storage, cp_action.to_storage, move=False) @action.register(Move) def action_move(cp_action): self.solver.copy_data(cp_action.n, cp_action.from_storage, cp_action.to_storage, move=True) @action.register(EndForward) def action_end_forward(cp_action): if self._schedule.max_n is None: self._schedule._max_n = self.max_n @action.register(EndReverse) def action_end_reverse(cp_action): self.solver._clean_disk() if self._schedule.max_n != self.reverse_step: raise ValueError("The number of steps in the reverse phase" "is different from the number of steps in the" "forward phase.") self.reverse_step = 0 for _, cp_action in enumerate(self._schedule): action(cp_action) if isinstance(cp_action, EndReverse): break # ## Adjoint-based sensitivity computations # # The purpose of this adjoint-based sensitivity computation is to use different checkpointing approaches available in the `checkpoint_schedules` package and verify the consistency of the results. We employ a fundamental tool used in verification of gradients, which is the # *second order Taylor remainder convergence test*. Thus, let us consider the functional $I(u_0)$ and let $\nabla_{u_0}$ be its gradient with respect to the control parameter $u_0$. Let $u$ be the solution of the forward problem, and let $\delta u$ be a perturbation to $u$. Then the *Taylor remainder convergence test* is based on the expression: # $$ \left|I(u + \epsilon \delta u) - I(u) - \nabla_{u_0} I \cdot \epsilon \delta u\right| \rightarrow 0 \quad \mathrm{at} \ O(\epsilon^2).$$ # # In the next sections, we present the results of the adjoint-based sensitivity computation using the `checkpoint_schedules` package and the *Taylor remainder convergence test*. It is expected the convergence rate of the *Taylor remainder convergence test* is approximately $2$ in every checkpointing approach. # # Below, we define the `model` dictionary containing the parameters required for the forward and adjoint solvers. The `model` dictionary is then passed to the `BurgersSolver` class. Additionally, we set up the 1D mesh and the initial condition for the forward Burgers' solver. # In[3]: model = {"lx": 1, # domain length "nx": 80, # number of nodes "dt": 0.001, # time step "nu": 0.01, # viscosity "max_n": 200, # total steps "chk_ram": 50, # number of checkpoints in RAM "chk_disk": 50, # number of checkpoints on disk } mesh = np.linspace(0, model["lx"], model["nx"]) # create the spatial grid u0 = np.sin(np.pi*mesh) # initial condition # In[4]: def taylor_remainder_test(gradient, J, u0, h): epsilons = [0.001 / 2**i for i in range(4)] E = [] h0 = model["lx"] / (model["nx"] - 1) dJdm = np.dot(gradient, h) for eps in epsilons: up = u0 + eps*h burgers = BurgersSolver(model, up, mesh) burgers.forward(0, model["max_n"]) Jp = burgers.functional() E.append(abs(Jp - J - dJdm * eps)) print("Computed residuals: {}".format(E)) return E, epsilons def convergence_rates(E_values, eps_values, show=True): r = [] for i in range(1, len(eps_values)): r.append(np.log(E_values[i] / E_values[i - 1]) / np.log(eps_values[i] / eps_values[i - 1])) if show: print("Computed convergence rates: {}".format(r)) return r # We initially compute the adjoint-based gradient using `SingleMemoryStorageSchedule` checkpointing approach. The `SingleMemoryStorageSchedule` stores the forward data of each time-step in working memory. As explained in the [notebook with illustrative example](https://nbviewer.org/github/firedrakeproject/checkpoint_schedules/blob/main/docs/notebooks/tutorial.ipynb), this schedule does not require the maximal step (`model["max_n"]`). # In[5]: from checkpoint_schedules import SingleMemoryStorageSchedule schedule = SingleMemoryStorageSchedule() burger = BurgersSolver(model, u0, mesh) manager = CheckpointingManager(schedule, burger) # Create the checkpointing manager. manager.execute() # execute the checkpointing schedule using `SingleMemoryStorageSchedule` schedule. delta_u = np.ones(model["nx"]) E, epsilon = taylor_remainder_test(burger.gradient(), burger.functional(), u0, delta_u) convergence_rates(E, epsilon) # The following examples use `Revolve` [2] and `HRevolve` schedules [3]. The `Revolve` algorithm requires the definition of the maximal step `model["max_n"]` before the execution of the forward solver, and also the specification of the number of checkpoints stored in memory (`model["max_n"]`). Whereas `Revolve` allows only the storage in memory, the `HRevolve` algorithm allows both disk and memory storage. The values `model["max_n"]`, `model["chk_ram"]` and `model["chk_dins"]` are required to define the `HRevolve` schedule. # In[6]: from checkpoint_schedules import Revolve burger = BurgersSolver(model, u0, mesh) schedule = Revolve(model["max_n"], model["chk_ram"]) manager = CheckpointingManager(schedule, burger) manager.execute() E, epsilon = taylor_remainder_test(burger.gradient(), burger.functional(), u0, delta_u) convergence_rates(E, epsilon) # In[7]: from checkpoint_schedules import HRevolve burger = BurgersSolver(model, u0, mesh) schedule = HRevolve(model["max_n"], model["chk_ram"], model["chk_disk"]) manager = CheckpointingManager(schedule, burger) manager.execute() E, epsilon = taylor_remainder_test(burger.gradient(), burger.functional(), u0, delta_u) convergence_rates(E, epsilon) # Feel free to explore alternative schedules provided by the `checkpoint_schedules` package. Our [notebook with illustrative example](https://nbviewer.org/github/firedrakeproject/checkpoint_schedules/blob/main/docs/notebooks/tutorial.ipynb) offers a demonstration of their usage. You need follow the steps outlined in the above code: instantiate a `BurgersSolver` object, define the `schedule`, create a `CheckpointingManager` object, and then execute the solvers using the `execute` method. # ### References # # 1. Gunzburger, Max D. Perspectives in flow control and optimization. Society for Industrial and Applied Mathematics, 2002. doi: https://doi.org/10.1137/1.9780898718720 # 2. Griewank, A., & Walther, A. (2000). Algorithm 799: revolve: an implementation of checkpointing for the reverse or adjoint mode of computational differentiation. ACM Transactions on Mathematical Software (TOMS), 26(1), 19-45., doi: https://doi.org/10.1145/347837.347846. # 3. Herrmann, J. and Pallez (Aupy), G. (2020). H-Revolve: a framework for adjoint computation on synchronous hierarchical platforms. ACM Transactions on Mathematical Software (TOMS), 46(2), 1-25. DOI: https://doi.org/10.1145/3378672. #