This notebook contains examples which are expected to be run with exactly 4 MPI processes; not because they wouldn't work otherwise, but simply because it's what their description assumes. For this, you need to:
mpi4py
and ipyparallel
; from the root Devito directory, runpip install -r requirements-mpi.txt
ipyparallel
MPI profile, by running our simple setup script. From the root directory, run./scripts/create_ipyparallel_mpi_profile.sh
We're finally ready to launch an ipyparallel cluster. Open a new terminal and run the following command
ipcluster start -n 4 --profile=mpi --engines=mpi
Wait until the logs report that engines have started successfully:
2022-05-20 11:57:31.730 [IPClusterStart] Starting ipcluster with [daemonize=False]
2022-05-20 11:57:32.754 [IPClusterStart] Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
2022-05-20 11:58:02.785 [IPClusterStart] Engines appear to have started successfully
Once the engines have started successfully, we can connect to the cluster
import ipyparallel as ipp
c = ipp.Client(profile='mpi')
In this tutorial, to run commands in parallel over the engines, we will use the %px line magic.
%%px --no-stream --group-outputs=engine
from mpi4py import MPI
print(f"Hi, I'm rank %d." % MPI.COMM_WORLD.rank)
[stdout:0] Hi, I'm rank 0. [stdout:1] Hi, I'm rank 1. [stdout:2] Hi, I'm rank 2. [stdout:3] Hi, I'm rank 3.
Distributed-memory parallelism via MPI is designed so that users can "think sequentially" for as much as possible. The few things requested to the user are:
mpirun -np X python ...
To enable MPI, users have two options. Either export the environment variable DEVITO_MPI=1
or, programmatically:
%%px
from devito import configuration
configuration['mpi'] = True
# Feel free to change the log level, and see more detailed logging
configuration['log-level'] = 'INFO'
%%px
# Keep generated code as simple as possible
configuration['language'] = 'C'
# Fix platform so that this notebook can have asserted output
# when tested by ``py.test --nbval" in any platform
configuration['platform'] = 'knl7210'
An Operator
will then generate MPI code, including sends/receives for halo exchanges. Below, we introduce a running example through which we explain how domain decomposition as well as data access (read/write) and distribution work. Performance optimizations are discussed in a later section.
Let's start by creating a TimeFunction
.
%%px
from devito import Grid, TimeFunction, Eq, Operator
grid = Grid(shape=(4, 4))
u = TimeFunction(name="u", grid=grid, space_order=2)
Domain decomposition is performed when creating a Grid
. Users may supply their own domain decomposition, but this is not shown in this notebook. Devito exploits the MPI Cartesian topology abstraction to logically split the Grid
over the available MPI processes. Since u
is defined over a decomposed Grid
, its data get distributed too.
%%px --no-stream --group-outputs=engine
print(u.data[0])
[stdout:0] [[0. 0.] [0. 0.]] [stdout:1] [[0. 0.] [0. 0.]] [stdout:2] [[0. 0.] [0. 0.]] [stdout:3] [[0. 0.] [0. 0.]]
Globally, u
consists of 2 time-buffer slices of 4x4 points -- this is what users "see". But locally, as shown above, each rank has got a 2x2 subdomain per time slice. The key point is: for the user, the fact that u.data
is distributed is completely abstracted away -- the perception is that of indexing into a classic NumPy array, regardless of whether MPI is enabled or not. All sort of NumPy indexing schemes (basic, slicing, etc.) are supported. For example, we can write into a slice-generated view of our data.
%%px
u.data[0, 1:-1, 1:-1] = 1.
%%px --no-stream --group-outputs=engine
print(u.data[0])
[stdout:0] [[0. 0.] [0. 1.]] [stdout:1] [[0. 0.] [1. 0.]] [stdout:2] [[0. 1.] [0. 0.]] [stdout:3] [[1. 0.] [0. 0.]]
The only limitation, currently, is that a data access cannot require a direct data exchange among two or more processes (e.g., the assignment u.data[0, 0, 0] = u.data[0, 3, 3]
will raise an exception unless both entries belong to the same MPI rank).
We can finally write out a trivial Operator
to try running something.
%%px --no-stream --group-outputs=engine
#NBVAL_IGNORE_OUTPUT
op = Operator(Eq(u.forward, u + 1))
summary = op.apply(time_M=0)
[stderr:0] Operator `Kernel` ran in 0.01 s
And we can now check again the (distributed) content of our u.data
, at u.data[1] since the computation ran for one timestep (writing to the next time-buffer slice).
%%px --no-stream --group-outputs=engine
print(u.data[1])
[stdout:0] [[1. 1.] [1. 2.]] [stdout:1] [[1. 1.] [2. 1.]] [stdout:2] [[1. 2.] [1. 1.]] [stdout:3] [[2. 1.] [1. 1.]]
Everything as expected. We could also peek at the generated code, because we may be curious to see what sort of MPI calls Devito has generated...
%%px --targets 0
print(op)
[stdout:0] #define _POSIX_C_SOURCE 200809L #define START(S) struct timeval start_ ## S , end_ ## S ; gettimeofday(&start_ ## S , NULL); #define STOP(S,T) gettimeofday(&end_ ## S, NULL); T->S += (double)(end_ ## S .tv_sec-start_ ## S.tv_sec)+(double)(end_ ## S .tv_usec-start_ ## S .tv_usec)/1000000; #include "stdlib.h" #include "math.h" #include "sys/time.h" #include "xmmintrin.h" #include "pmmintrin.h" #include "mpi.h" struct dataobj { void *restrict data; unsigned long * size; unsigned long * npsize; unsigned long * dsize; int * hsize; int * hofs; int * oofs; void * dmap; } ; struct profiler { double section0; } ; int Kernel(struct dataobj *restrict u_vec, const int time_M, const int time_m, const int x_M, const int x_m, const int y_M, const int y_m, struct profiler * timers) { float (*restrict u)[u_vec->size[1]][u_vec->size[2]] __attribute__ ((aligned (64))) = (float (*)[u_vec->size[1]][u_vec->size[2]]) u_vec->data; /* Flush denormal numbers to zero in hardware */ _MM_SET_DENORMALS_ZERO_MODE(_MM_DENORMALS_ZERO_ON); _MM_SET_FLUSH_ZERO_MODE(_MM_FLUSH_ZERO_ON); for (int time = time_m, t0 = (time)%(2), t1 = (time + 1)%(2); time <= time_M; time += 1, t0 = (time)%(2), t1 = (time + 1)%(2)) { START(section0) for (int x = x_m; x <= x_M; x += 1) { #pragma omp simd aligned(u:64) for (int y = y_m; y <= y_M; y += 1) { u[t1][x + 2][y + 2] = u[t0][x + 2][y + 2] + 1; } } STOP(section0,timers) } return 0; }
Hang on. There's nothing MPI-specific here! At least apart from the header file #include "mpi.h"
. What's going on? Well, it's simple. Devito was smart enough to realize that this trivial Operator
doesn't even need any sort of halo exchange -- the Eq
implements a pure "map computation" (i.e., fully parallel), so it can just let each MPI process do its job without ever synchronizing with halo exchanges. We might want try again with a proper stencil Eq
.
%%px
op = Operator(Eq(u.forward, u.dx + 1))
%%px --targets 0
print(op)
[stdout:0] #define _POSIX_C_SOURCE 200809L #define START(S) struct timeval start_ ## S , end_ ## S ; gettimeofday(&start_ ## S , NULL); #define STOP(S,T) gettimeofday(&end_ ## S, NULL); T->S += (double)(end_ ## S .tv_sec-start_ ## S.tv_sec)+(double)(end_ ## S .tv_usec-start_ ## S .tv_usec)/1000000; #include "stdlib.h" #include "math.h" #include "sys/time.h" #include "xmmintrin.h" #include "pmmintrin.h" #include "mpi.h" struct dataobj { void *restrict data; unsigned long * size; unsigned long * npsize; unsigned long * dsize; int * hsize; int * hofs; int * oofs; void * dmap; } ; struct neighborhood { int ll, lc, lr; int cl, cc, cr; int rl, rc, rr; } ; struct profiler { double section0; } ; static void sendrecv0(struct dataobj *restrict u_vec, const int x_size, const int y_size, int ogtime, int ogx, int ogy, int ostime, int osx, int osy, int fromrank, int torank, MPI_Comm comm); static void haloupdate0(struct dataobj *restrict u_vec, MPI_Comm comm, struct neighborhood * nb, int otime); static void gather0(float *restrict buf_vec, int bx_size, int by_size, struct dataobj *restrict u_vec, const int otime, const int ox, const int oy); static void scatter0(float *restrict buf_vec, int bx_size, int by_size, struct dataobj *restrict u_vec, const int otime, const int ox, const int oy); int Kernel(struct dataobj *restrict u_vec, const float h_x, const int time_M, const int time_m, const int x_M, const int x_m, const int y_M, const int y_m, MPI_Comm comm, struct neighborhood * nb, struct profiler * timers) { float (*restrict u)[u_vec->size[1]][u_vec->size[2]] __attribute__ ((aligned (64))) = (float (*)[u_vec->size[1]][u_vec->size[2]]) u_vec->data; /* Flush denormal numbers to zero in hardware */ _MM_SET_DENORMALS_ZERO_MODE(_MM_DENORMALS_ZERO_ON); _MM_SET_FLUSH_ZERO_MODE(_MM_FLUSH_ZERO_ON); float r0 = 1.0F/h_x; for (int time = time_m, t0 = (time)%(2), t1 = (time + 1)%(2); time <= time_M; time += 1, t0 = (time)%(2), t1 = (time + 1)%(2)) { START(section0) haloupdate0(u_vec,comm,nb,t0); for (int x = x_m; x <= x_M; x += 1) { #pragma omp simd aligned(u:64) for (int y = y_m; y <= y_M; y += 1) { u[t1][x + 2][y + 2] = -r0*u[t0][x + 2][y + 2] + r0*u[t0][x + 3][y + 2] + 1; } } STOP(section0,timers) } return 0; } static void sendrecv0(struct dataobj *restrict u_vec, const int x_size, const int y_size, int ogtime, int ogx, int ogy, int ostime, int osx, int osy, int fromrank, int torank, MPI_Comm comm) { MPI_Request rrecv; MPI_Request rsend; float *restrict bufg_vec __attribute__ ((aligned (64))); posix_memalign((void**)(&bufg_vec),64,x_size*y_size*sizeof(float)); float *restrict bufs_vec __attribute__ ((aligned (64))); posix_memalign((void**)(&bufs_vec),64,x_size*y_size*sizeof(float)); MPI_Irecv(bufs_vec,x_size*y_size,MPI_FLOAT,fromrank,13,comm,&(rrecv)); if (torank != MPI_PROC_NULL) { gather0(bufg_vec,x_size,y_size,u_vec,ogtime,ogx,ogy); } MPI_Isend(bufg_vec,x_size*y_size,MPI_FLOAT,torank,13,comm,&(rsend)); MPI_Wait(&(rsend),MPI_STATUS_IGNORE); MPI_Wait(&(rrecv),MPI_STATUS_IGNORE); if (fromrank != MPI_PROC_NULL) { scatter0(bufs_vec,x_size,y_size,u_vec,ostime,osx,osy); } free(bufg_vec); free(bufs_vec); } static void haloupdate0(struct dataobj *restrict u_vec, MPI_Comm comm, struct neighborhood * nb, int otime) { sendrecv0(u_vec,u_vec->hsize[3],u_vec->npsize[2],otime,u_vec->oofs[2],u_vec->hofs[4],otime,u_vec->hofs[3],u_vec->hofs[4],nb->rc,nb->lc,comm); } static void gather0(float *restrict buf_vec, int bx_size, int by_size, struct dataobj *restrict u_vec, const int otime, const int ox, const int oy) { float (*restrict buf)[bx_size][by_size] __attribute__ ((aligned (64))) = (float (*)[bx_size][by_size]) buf_vec; float (*restrict u)[u_vec->size[1]][u_vec->size[2]] __attribute__ ((aligned (64))) = (float (*)[u_vec->size[1]][u_vec->size[2]]) u_vec->data; const int x_m = 0; const int y_m = 0; const int x_M = bx_size - 1; const int y_M = by_size - 1; for (int x = x_m; x <= x_M; x += 1) { #pragma omp simd aligned(u:64) for (int y = y_m; y <= y_M; y += 1) { buf[0][x][y] = u[otime][x + ox][y + oy]; } } } static void scatter0(float *restrict buf_vec, int bx_size, int by_size, struct dataobj *restrict u_vec, const int otime, const int ox, const int oy) { float (*restrict buf)[bx_size][by_size] __attribute__ ((aligned (64))) = (float (*)[bx_size][by_size]) buf_vec; float (*restrict u)[u_vec->size[1]][u_vec->size[2]] __attribute__ ((aligned (64))) = (float (*)[u_vec->size[1]][u_vec->size[2]]) u_vec->data; const int x_m = 0; const int y_m = 0; const int x_M = bx_size - 1; const int y_M = by_size - 1; for (int x = x_m; x <= x_M; x += 1) { #pragma omp simd aligned(u:64) for (int y = y_m; y <= y_M; y += 1) { u[otime][x + ox][y + oy] = buf[0][x][y]; } } }
Uh-oh -- now the generated code looks more complicated than before, though it still is pretty much human-readable. We can spot the following routines:
haloupdate0
performs a blocking halo exchange, relying on three additional functions, gather0
, sendrecv0
, and scatter0
;gather0
copies the (generally non-contiguous) boundary data into a contiguous buffer;sendrecv0
takes the buffered data and sends it to one or more neighboring processes; then it waits until all data from the neighboring processes is received;scatter0
copies the received data into the proper array locations.This is the simplest halo exchange scheme available in Devito. There are a few, and some of them apply aggressive optimizations, as shown later on.
Before looking at other scenarios and performance optimizations, there is one last thing it is worth discussing -- the data_with_halo
view.
%%px --no-stream --group-outputs=engine
print(u.data_with_halo[0])
# Uncomment to see halo for the next (computed) timestep
# u.data_with_halo[1]
[stdout:0] [[0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 1.]] [stdout:1] [[0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [1. 0. 0. 0.]] [stdout:2] [[0. 0. 0. 1.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.]] [stdout:3] [[1. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.] [0. 0. 0. 0.]]
This is again a global data view. The shown with_halo is the "true" halo surrounding the physical domain (often referred to as the "outer halo"), not the halo used for the MPI halo exchanges (often referred to as "ghost region" or "inner halo"). A user can straightforwardly initialize the "true" halo region (which is typically read by a stencil Eq
when an Operator
iterates in proximity of the domain bounday).
Note: This "halo" is often encountered as "ghost cell area" in literature
%%px
u.data_with_halo[:] = 1.
%%px --no-stream --group-outputs=engine
# Note: Both time buffer slices are now printed
print(u.data_with_halo[:])
[stdout:0] [[[1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.]] [[1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.]]] [stdout:1] [[[1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.]] [[1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.]]] [stdout:2] [[[1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.]] [[1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.]]] [stdout:3] [[[1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.]] [[1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.] [1. 1. 1. 1.]]]
A SparseFunction
represents a sparse set of points which are generically unaligned with the Grid
. A sparse point could be anywhere within a grid, and is therefore attached some coordinates. Given a sparse point, Devito looks at its coordinates and, based on the domain decomposition, logically assigns it to a given MPI process; this is purely logical ownership, as in Python-land, before running an Operator, the sparse point physically lives on the MPI rank which created it. Within op.apply
, right before jumping to C-land, the sparse points are scattered to their logical owners; upon returning to Python-land, the sparse points are gathered back to their original location.
In the following example, we attempt injection of four sparse points into the neighboring grid points via linear interpolation.
%%px
from devito import Function, SparseFunction
grid = Grid(shape=(4, 4), extent=(3.0, 3.0))
x, y = grid.dimensions
f = Function(name='f', grid=grid)
coords = [(0.5, 0.5), (1.5, 2.5), (1.5, 1.5), (2.5, 1.5)]
sf = SparseFunction(name='sf', grid=grid, npoint=len(coords), coordinates=coords)
Let:
We show the global view, that is what the user "sees".
O --- O --- O --- O
| A | | |
O --- O --- O --- O
| | C | B |
O --- O --- O --- O
| | D | |
O --- O --- O --- O
And now the local view, that is what the MPI ranks own when jumping to C-land.
Rank 0 Rank 1
O --- O --- x x --- O --- O
| A | | | | |
O --- O --- x x --- O --- O
| | C | | C | B |
x --- x --- x x --- x --- x
Rank 2 Rank 3
x --- x --- x x --- x --- x
| | C | | C | B |
O --- O --- x x --- O --- O
| | D | | D | |
O --- O --- x x --- O --- O
We observe that the sparse points along the boundary of two or more MPI ranks are duplicated and thus redundantly computed over multiple processes. However, the contributions from these points to the neighboring halo points are naturally ditched, so the final result of the interpolation is as expected. Let's convince ourselves that this is the case. We assign a value of $5$ to each sparse point. Since we are using linear interpolation and all points are placed at the exact center of a grid quadrant, we expect that the contribution of each sparse point to a neighboring grid point will be $5 * 0.25 = 1.25$. Based on the global view above, we eventually expect f
to look like as follows:
1.25 --- 1.25 --- 0.00 --- 0.00
| | | |
1.25 --- 2.50 --- 2.50 --- 1.25
| | | |
0.00 --- 2.50 --- 3.75 --- 1.25
| | | |
0.00 --- 1.25 --- 1.25 --- 0.00
Let's check this out.
%%px --no-stream --group-outputs=engine
#NBVAL_IGNORE_OUTPUT
sf.data[:] = 5.
op = Operator(sf.inject(field=f, expr=sf))
summary = op.apply()
[stderr:0] Operator `Kernel` ran in 0.01 s
%%px --no-stream --group-outputs=engine
print(f.data)
[stdout:0] [[1.25 1.25] [1.25 2.5 ]] [stdout:1] [[0. 0. ] [2.5 1.25]] [stdout:2] [[0. 2.5 ] [0. 1.25]] [stdout:3] [[3.75 1.25] [1.25 0. ]]
The Devito compiler applies several optimizations before generating code.
Function
update and the data is not “dirty” yet.Additionally, the Devito compiler offers a few modes of different computation and communication strategies, each exhibiting superiority under specific conditions for a kernel, such as operational intensity, memory footprint, the number of utilized ranks, and the characteristics of the cluster’s interconnect. Some of the best patterns are namely basic
, diagonal
, and full
. Those have proven to be effective in improving the efficiency and scalability of computations, under several scnarios.
basic
: The basic pattern is the simplest among the methods presented in this section and targets CPUs and GPUs. This mode, illustrated in Figure 5a, involves blocking point-to-point (P2P) data exchanges perpendicular to the 2D and 3D planes of the Cartesian topology between MPI ranks. Forexample, each rank issues 4 in 2D and 6 communications in 3D. While this mode benefits from fewer communications, it may encounter synchronization bottlenecks during grid updates before computing the next timestep. This method allocates the memory needed to exchange halos in C-land before every communication, only adding negligible overhead.
diag2
: Compared to the basic
, this pattern also performs diagonal data exchanges, facilitating the communication of the corner points in our domains in a single step. This results in more communications, with 8 in 2D and 26 in 3D. Although it involves more communications, they are issuedin a single step, and the messages are smaller compared to basic. Compared to basic, this mode slightly benefits from preallocated buffers in python-land, eliminating the need to allocate data in C-land before every communication. The latter is why this version is not supported on GPUs since the mechanism of pre-allocating buffers on device memory still needs to be supported.
full
: This pattern leverages communication/computation overlap. The local-per-rank domain is logically decomposed into an inner (CORE) and an outer (OWNED/remainder) area. In a 3D example, the remainder areas take the form of faces and vector-like areas along the decomposed dimensions. The number of communications is the same as in the diagonal mode. This mode benefits from overlappingtwo steps: halo updating and the stencil computations in the CORE area. After this step, stencil updates are computed in the ``remainder” areas. In the ideal case, assuming that communication is perfectly hidden, the execution time should converge to the time needed to compute the CORE plus the time needed to compute the remainder. An important drawback of this mode is the slower GPts/s achieved at the remainder areas. The elements in the remainder are not contiguous; therefore, we have less efficient memory access patterns (strides) along vectorizable dimensions. These areas have lower cache utilization and vectorization efficiency.
Now, let's see the diag2
method:
%%px
configuration['mpi'] = 'diag2'
op = Operator(Eq(u.forward, u.dx + 1))
# Uncomment below to show code (it's quite verbose)
# print(op)
The body of the time-stepping loop has slightly changed compared to basic
:
Some differences are:
bufg
, bufs
are not allocated at C-land, as this already happens in Python-landncomms
communications which are not only vertical or horizontal, but also diagonal.This leads to more messages, but slightly smaller compared to basic
.
We could now peek at the generated code of the full
mode and see that things now look differently.
%%px
configuration['mpi'] = 'full'
op = Operator(Eq(u.forward, u.dx + 1))
# Uncomment below to show code (it's quite verbose)
# print(op)
The body of the time-stepping loop has changed, as it now implements a classic computation/communication overlap scheme:
haloupdate0
triggers non-blocking communications;compute0
executes the core domain region, that is the sub-region which doesn't require reading from halo data to be computed;halowait0
wait and terminates the non-blocking communications;remainder0
, which internally calls compute0
, computes the boundary region requiring the now up-to-date halo data.More information on Devito's MPI, can be found in this pre-print: Automated MPI-X code generation for scalable finite-difference solvers