In this notebook we investigate the use of pylops-distributed CG and CGLS solvers with distributed operators.
%load_ext autoreload
%autoreload 2
%matplotlib inline
import os
import warnings
warnings.filterwarnings('ignore')
import numpy as np
import matplotlib.pyplot as plt
import scipy as sp
import skfmm
import dask
import dask.array as da
import pylops
import pylops_distributed
from scipy.sparse.linalg.interface import MatrixLinearOperator, aslinearoperator
from scipy.linalg import lstsq, solve
from scipy.sparse.linalg import cg, lsqr
from dask import persist
from dask.distributed import Client, LocalCluster, performance_report
os.getenv('OMP_NUM_THREADS'), os.getenv('MKL_NUM_THREADS'), os.getenv('OPENBLAS_NUM_THREADS')
('1', '1', '1')
#nchunks = [2, 1]
#nchunks = [2, 2]
nchunks = [4, 4]
#client = pylops_distributed.utils.backend.dask(processes=True, threads_per_worker=1, n_workers=4)
#client = Client() # same as processes=True
client = pylops_distributed.utils.backend.dask(processes=False, threads_per_worker=1, n_workers=4)
client
[Dask Client / Cluster summary widget: 4 workers, 1 thread per worker]
Let's first try out the solver using plain numpy inputs (matrix and vector). Since no da.* routine is ever explicitly invoked when compute=False and client=None, the solver simply operates on numpy arrays.
n = 3000
niter = 1000 # premature stop (roughly n * 2 iterations would be needed for convergence)
x = np.ones(n)
np.random.seed(0)
A = np.random.randn(n, n)
A = np.dot(A.T, A)
#print('eigs', np.linalg.eig(A)[0])
Aop = MatrixLinearOperator(A)
#Aop = aslinearoperator(A)
y = Aop.matvec(x)
xinv_sp = cg(Aop, y, tol=0, maxiter=niter)[0]
xinv = pylops_distributed.optimization.cg.cg(Aop, y, np.zeros_like(x), tol=0, niter=niter)[0]
#print(xinv_sp)
#print(xinv)
plt.figure()
plt.plot(xinv_sp,'r', xinv, '--g', x,'b')
plt.ylim(0.9, 1.1);
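As a quick sanity check (a minimal sketch reusing the variables above), we can verify that the two solutions agree with each other and measure how far the premature stop leaves us from the true model:
# compare the scipy and pylops-distributed CG solutions (both plain numpy arrays here)
print('scipy vs pylops-distributed:', np.allclose(xinv_sp, xinv))
# relative error with respect to the true model x (non-zero because of the premature stop)
print('relative error:', np.linalg.norm(xinv - x) / np.linalg.norm(x))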
Let's now apply the forward operator using the LinearOperator interface
Ada = da.from_array(A, chunks=(n//nchunks[0], n//nchunks[1])).persist() # move the data to the workers once
x = da.ones(n) #, chunks=(n//nchunks[1]))
x0 = da.zeros(n) #, chunks=(n//nchunks[1]))
Aop = MatrixLinearOperator(A)
Adaop = pylops_distributed.MatrixMult(Ada, compute=(False, False))
Ada1op = pylops_distributed.MatrixMult(Ada, compute=(True, True))
# both of these take and return a numpy array
print(Aop.matvec(np.ones(n)), Ada1op.matvec(np.ones(n)))
# takes and returns a dask array
print(Adaop.matvec(np.ones(n)))
[-1913.40090063 -3343.24939275 -423.66494977 ... 477.63396435 134.07361261 1897.29151543] [-1913.40090063 -3343.24939275 -423.66494977 ... 477.63396435 134.07361261 1897.29151543]
dask.array<sum-aggregate, shape=(3000,), dtype=float64, chunksize=(750,), chunktype=numpy.ndarray>
Ada
[dask array repr: shape=(3000, 3000), chunksize=(750, 750), dtype=float64]
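For completeness, the adjoint can be applied through the same interface (a minimal sketch with the operators defined above; A is symmetric here, so the adjoint coincides with the forward):
# rmatvec applies the adjoint: Ada1op returns a numpy array, Adaop a lazy dask array
print(Ada1op.rmatvec(np.ones(n))[:5])
print(Adaop.rmatvec(np.ones(n)))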
And now the inverse problem, solved with different approaches to how dask is used
niter = 100
y = Aop * np.ones(n)
yy = Adaop * da.ones(n)
# scipy
xinv_sp = cg(Aop, y, maxiter=niter)[0]
# dask with compute at each iter
xinv = pylops_distributed.optimization.cg.cg(Adaop, yy, x0, tol=0, niter=niter, compute=True)[0]
# dask with persist at each iter
xinv1 = pylops_distributed.optimization.cg.cg(Adaop, yy, x0, tol=0, niter=niter, client=client)[0]
# dask with all graph computed in one go
xinv2 = pylops_distributed.optimization.cg.cg(Adaop, yy, x0, tol=0, niter=niter)[0]
# dask for only the matvec operation (anything else numpy)
xinv3 = pylops_distributed.optimization.cg.cg(Ada1op, y, np.zeros(n), tol=0, niter=niter)[0]
print(xinv_sp)
print(xinv)
print(xinv1.compute())
print(xinv2.compute())
print(xinv3)
[1.02463913 0.90384892 0.95533622 ... 0.9619385 0.97546219 1.11433985]
[1.02463913 0.90384892 0.95533622 ... 0.9619385 0.97546219 1.11433985]
[1.02463913 0.90384892 0.95533622 ... 0.9619385 0.97546219 1.11433985]
[1.02463913 0.90384892 0.95533622 ... 0.9619385 0.97546219 1.11433985]
[1.02463913 0.90384892 0.95533622 ... 0.9619385 0.97546219 1.11433985]
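All strategies should return the same solution; here is a quick sketch to confirm it numerically (the lazy results are brought back to numpy with .compute() before comparing):
# gather all solutions as numpy arrays and check agreement with the scipy result
sols = [xinv_sp, xinv, xinv1.compute(), xinv2.compute(), xinv3]
print(all(np.allclose(sols[0], s) for s in sols[1:]))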
#client.cancel(xinv1)
#client.cancel(xinv2)
# scipy
%timeit -n 1 -r 3 cg(Aop, y, maxiter=niter)[0]
588 ms ± 17.6 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
# dask with compute at each iter
#%timeit -n 1 -r 3 pylops_distributed.optimization.cg.cg(Adaop, yy, x0, tol=0, niter=niter, compute=True)[0]
# dask with persist at each iter
%timeit -n 1 -r 3 pylops_distributed.optimization.cg.cg(Adaop, yy, x0, tol=0, niter=niter, client=client)[0].compute()
23.4 s ± 46.6 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
# dask with all graph computed in one go
%timeit -n 1 -r 3 pylops_distributed.optimization.cg.cg(Adaop, yy, x0, tol=0, niter=niter)[0].compute()
7.7 s ± 60.6 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
# dask for only the matvec operation (anything else numpy)
%timeit -n 1 -r 3 pylops_distributed.optimization.cg.cg(Ada1op, y, np.zeros(n), tol=0, niter=niter)[0]
12.4 s ± 79.9 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
with performance_report(filename="dask-report-cg_compute.html"):
pylops_distributed.optimization.cg.cg(Adaop, yy, x0, tol=0, niter=niter, compute=True)[0]
with performance_report(filename="dask-report-cg_persist.html"):
pylops_distributed.optimization.cg.cg(Adaop, yy, x0, tol=0, niter=niter, client=client)[0].compute()
with performance_report(filename="dask-report-cg_postponed.html"):
pylops_distributed.optimization.cg.cg(Adaop, yy, x0, tol=0, niter=niter)[0].compute()
with performance_report(filename="dask-report-cg_onlymatvec.html"):
pylops_distributed.optimization.cg.cg(Ada1op, y, np.zeros(n), tol=0, niter=niter)[0]
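Let's now move to an overdetermined problem and invert it with the distributed CGLS solver.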
n, m = 1000, 50
x = da.ones(m, chunks=(m//nchunks[1]))
x0 = da.zeros(m, chunks=(m//nchunks[1]))
A = np.random.randn(n, m)
Ada = da.from_array(A, chunks=(n//nchunks[0], m//nchunks[1]))
Aop = pylops_distributed.MatrixMult(Ada, compute=(False, False))
y = Aop * x
xinv = pylops_distributed.optimization.cg.cgls(Aop, y, x0, m)[0]
xinv.compute()
array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])
#y.visualize(rankdir="LR")
#xinv.visualize(rankdir="LR")
%timeit -n 1 -r 3 pylops_distributed.optimization.cg.cgls(Aop, y, x0, tol=0, niter=20, compute=True)
9.66 s ± 44.4 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
%timeit -n 1 -r 3 pylops_distributed.optimization.cg.cgls(Aop, y, x0, tol=0, niter=20, client=client)[0].compute()
10.7 s ± 154 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
%timeit -n 1 -r 3 pylops_distributed.optimization.cg.cgls(Aop, y, x0, tol=0, niter=20)[0].compute()
7.13 s ± 81.5 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
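Finally, let's feed dask-backed operators to scipy's lsqr solver, comparing a plain MatrixLinearOperator wrapper with pylops_distributed.MatrixMult set to compute=(True, True).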
import numpy as np
import dask.array as da
import pylops_distributed
from scipy.sparse.linalg import lsqr
from scipy.sparse.linalg.interface import MatrixLinearOperator
n, m = 100, 100
A = da.random.random((n, m), chunks=(n//nchunks[0], m//nchunks[1]))
# dask for everything (suboptimal, as the solver is not written with dask
# in mind and brings data back to numpy here and there)
#Aop = pylops_distributed.MatrixMult(A)
# dask for only the matvec operation (anything else numpy)
Aop = MatrixLinearOperator(A)
Aop1 = pylops_distributed.MatrixMult(A, compute=(True, True))
y = Aop * np.ones(m)
xinv = lsqr(Aop1, y, iter_lim=n)[0]
y, xinv
(array([49.1663997 , 50.56491025, 46.33127647, 49.91828739, 52.6984893 , 55.47269824, 54.461011 , 52.8146886 , 48.5026307 , 40.83736659, 45.85238495, 46.39221617, 49.76043088, 53.63444822, 49.7066871 , 53.99700797, 45.55617584, 48.09187034, 48.2602893 , 49.9416108 , 52.82253291, 47.7674436 , 50.10957465, 55.37026497, 52.80309789, 48.1086633 , 50.16474483, 50.89657529, 47.04575089, 53.82232973, 48.26146657, 52.57065493, 47.23123338, 50.71375005, 49.81424411, 46.81939422, 50.03993458, 55.20772363, 47.13274771, 53.26142395, 45.3254019 , 51.07767304, 49.87357891, 52.15761628, 47.63258035, 49.54250868, 48.12370408, 51.44552688, 47.35080766, 48.34678681, 51.55254009, 52.59149541, 45.20839298, 48.7127196 , 57.0026704 , 45.35623158, 46.63943708, 47.36902913, 51.22398442, 53.34852573, 49.54704973, 56.50673386, 49.47172904, 47.59244381, 53.38177431, 53.96599057, 42.58664758, 48.64074614, 46.83191883, 47.62945896, 51.24305588, 46.71508471, 51.62726875, 49.67296692, 48.75013858, 52.52510135, 49.33720209, 49.0015986 , 48.7028421 , 48.77727954, 52.36310927, 52.80453925, 45.46823084, 52.41624985, 52.48094079, 48.99219033, 48.05087661, 46.97029063, 52.47243947, 48.14992253, 48.99236362, 50.62071484, 48.1088664 , 50.47942405, 47.35118824, 49.72057154, 49.25304843, 50.14191351, 54.58070386, 47.95908154]), array([1.00123791, 0.99965144, 1.00107446, 1.00019625, 0.99778706, 0.99754414, 1.00068116, 1.00158694, 1.0024654 , 1.00132535, 1.00100949, 1.00167346, 1.00064151, 1.00010104, 0.99952415, 1.00053871, 1.00158629, 0.99895122, 0.99933006, 0.9988364 , 0.99704597, 1.0008912 , 1.00265668, 0.99942655, 1.00071859, 1.00167863, 1.0002998 , 1.00219057, 0.99839334, 0.99903575, 1.0018371 , 0.9982381 , 1.00099775, 1.00054205, 1.00028819, 1.00025125, 1.00074573, 0.99930032, 1.00163945, 0.99947026, 1.00032072, 0.99847433, 0.99958077, 0.99807002, 1.00037942, 0.99795971, 1.00071912, 1.00075464, 1.00018163, 0.99888905, 0.99692533, 0.99999603, 0.99843641, 1.00224598, 1.00047645, 1.00055994, 1.00064719, 0.99808737, 1.00214344, 0.99980978, 0.99949129, 1.0015822 , 1.00251748, 0.99915141, 0.99975071, 1.00216164, 1.00048747, 0.99971042, 0.99776691, 0.99724029, 0.99966459, 0.99915097, 0.99944858, 1.00155234, 1.00058847, 0.99757651, 0.99716088, 1.00042966, 1.0023474 , 0.99943304, 0.99917602, 1.00312469, 0.99841468, 1.00041369, 0.99883748, 1.00100147, 1.0013569 , 0.99956794, 0.99783262, 0.9979013 , 0.99989043, 1.00178703, 1.00049715, 1.00007657, 0.99955769, 0.99727333, 0.9988159 , 1.00125449, 1.00050019, 0.99911199]))
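A quick look at the accuracy of the recovered model (a sketch using the xinv just computed):
# maximum absolute deviation of the recovered model from the true model of ones
print('max abs deviation from 1:', np.abs(xinv - 1).max())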
%%time
xinv = lsqr(Aop, y, iter_lim=n)[0]
CPU times: user 33.8 s, sys: 2.46 s, total: 36.3 s
Wall time: 35 s
%%time
xinv = lsqr(Aop1, y, iter_lim=n)[0]
CPU times: user 32.9 s, sys: 2.44 s, total: 35.3 s
Wall time: 34.1 s
with performance_report(filename="dask-report-lsqr_MatrixLinearOperator.html"):
lsqr(Aop, y, iter_lim=n)[0]
with performance_report(filename="dask-report-lsqr_pylopsMatrixMult.html"):
lsqr(Aop1, y, iter_lim=n)[0]
client.close()