PCA is a dimensionality reduction algorithm that works really well for datasets which have correlated columns. It combines the features of X in linear combination such that the new components capture the most information, or variance, of the data.
Unlike the single-GPU implementation, The MNMG PCA API currently requires a Dask cuDF Dataframe as input. transform()
also returns a Dask cuDF Dataframe. The Dask cuDF Dataframe API is very similar to the Dask DataFrame API, but underlying Dataframes are cuDF, rather than Pandas.
For information on converting your dataset to Dask cuDF format: https://rapidsai.github.io/projects/cudf/en/stable/dask-cudf.html#multi-gpu-with-dask-cudf
For more information about cuML's PCA implementation: https://rapidsai.github.io/projects/cuml/en/stable/api.html#cuml.dask.decomposition.PCA
import os
import numpy as np
import pandas as pd
import cudf as gd
from cuml.dask.common import to_dask_df
from cuml.dask.datasets import make_blobs
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
from dask_ml.decomposition import PCA as skPCA
from cuml.dask.decomposition import PCA as cumlPCA
We can use the LocalCUDACluster
to start a Dask cluster on a single machine with one worker mapped to each GPU. This is called one-process-per-GPU (OPG).
cluster = LocalCUDACluster(threads_per_worker=1)
client = Client(cluster)
n_samples = 2**18
n_features = 20
n_components = 2
whiten = False
random_state = 32
svd_solver = "full"
%%time
X_dcudf, _ = make_blobs(n_samples,
n_features,
centers=1,
cluster_std=0.01,
random_state=random_state)
wait(X_dcudf)
Dask-ML accepts a Dask.Array, instead of Dask.Dataframe, as input. Dask ML also wants to know the exact sizes of the partitions so we use the argument lengths=True
to get this information from the workers.
X_ddf = to_dask_df(X_dcudf).to_dask_array(lengths=True)
%%time
pca_sk = skPCA(n_components=n_components,
svd_solver=svd_solver,
whiten=whiten,
random_state=random_state)
result_sk = pca_sk.fit_transform(X_ddf)
%%time
pca_cuml = cumlPCA(n_components=n_components,
svd_solver=svd_solver,
whiten=whiten,
random_state=random_state)
result_cuml = pca_cuml.fit_transform(X_dcudf)
passed = np.allclose(pca_sk.singular_values_,
pca_cuml.singular_values_.to_array(),
atol=0.01)
print('compare pca: cuml vs sklearn singular_values_ {}'.format('equal' if passed else 'NOT equal'))
passed = np.allclose(pca_sk.explained_variance_,
pca_cuml.explained_variance_.to_array(),
atol=1e-6)
print('compare pca: cuml vs sklearn explained_variance_ {}'.format('equal' if passed else 'NOT equal'))
passed = np.allclose(pca_sk.explained_variance_ratio_,
pca_cuml.explained_variance_ratio_.to_array(),
atol=1e-6)
print('compare pca: cuml vs sklearn explained_variance_ratio_ {}'.format('equal' if passed else 'NOT equal'))
sk_components = np.abs(pca_sk.components_)
cuml_components = np.abs(np.asarray(pca_cuml.components_.as_gpu_matrix()))
passed = np.allclose(sk_components, cuml_components, atol=1e-3)
print('compare pca: cuml vs sklearn components_ {}'.format('equal' if passed else 'NOT equal'))
passed = np.allclose(result_sk, np.asarray(result_cuml.compute().as_gpu_matrix()), atol=1e-1)
print('compare pca: cuml vs sklearn transformed results %s'%('equal'if passed else 'NOT equal'))