Fugue is a low-code unified interface for different computing frameworks such as Spark, Dask and Pandas. PyCaret uses Fugue to support distributed computing scenarios.
Let's start with the most standard example. The code is exactly the same as the local version; there is no magic.
from pycaret.datasets import get_data
from pycaret.classification import *
setup(data=get_data("juice", verbose=False), target = 'Purchase', n_jobs=1)
test_models = models().index.tolist()[:5]
 | Description | Value
---|---|---|
0 | Session id | 4292 |
1 | Target | Purchase |
2 | Target type | Binary |
3 | Target mapping | CH: 0, MM: 1 |
4 | Original data shape | (1070, 19) |
5 | Transformed data shape | (1070, 19) |
6 | Transformed train set shape | (748, 19) |
7 | Transformed test set shape | (322, 19) |
8 | Ordinal features | 1 |
9 | Numeric features | 17 |
10 | Categorical features | 1 |
11 | Preprocess | True |
12 | Imputation type | simple |
13 | Numeric imputation | mean |
14 | Categorical imputation | constant |
15 | Maximum one-hot encoding | 5 |
16 | Encoding method | None |
17 | Fold Generator | StratifiedKFold |
18 | Fold Number | 10 |
19 | CPU Jobs | 1 |
20 | Use GPU | False |
21 | Log Experiment | False |
22 | Experiment Name | clf-default-name |
23 | USI | 9c46 |
compare_models is also exactly the same if you don't want to use a distributed system:
compare_models(include=test_models, n_select=2)
 | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | TT (Sec)
---|---|---|---|---|---|---|---|---|---|
lr | Logistic Regression | 0.8330 | 0.8975 | 0.7532 | 0.8097 | 0.7791 | 0.6451 | 0.6475 | 0.3270 |
dt | Decision Tree Classifier | 0.7715 | 0.7625 | 0.7224 | 0.7058 | 0.7106 | 0.5224 | 0.5256 | 0.0780 |
nb | Naive Bayes | 0.7608 | 0.8337 | 0.7802 | 0.6693 | 0.7179 | 0.5129 | 0.5206 | 0.0780 |
knn | K Neighbors Classifier | 0.7594 | 0.7989 | 0.6093 | 0.7323 | 0.6620 | 0.4782 | 0.4856 | 0.1080 |
svm | SVM - Linear Kernel | 0.4881 | 0.0000 | 0.7590 | 0.3346 | 0.4628 | 0.0615 | 0.1061 | 0.0590 |
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True, intercept_scaling=1, l1_ratio=None, max_iter=1000, multi_class='auto', n_jobs=None, penalty='l2', random_state=4292, solver='lbfgs', tol=0.0001, verbose=0, warm_start=False), DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini', max_depth=None, max_features=None, max_leaf_nodes=None, min_impurity_decrease=0.0, min_samples_leaf=1, min_samples_split=2, min_weight_fraction_leaf=0.0, random_state=4292, splitter='best')]
Now let's make it distributed, as a toy case, on Dask. The only change is the additional parallel parameter:
from pycaret.parallel import FugueBackend
compare_models(include=test_models, n_select=2, parallel=FugueBackend("dask"))
 | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | TT (Sec)
---|---|---|---|---|---|---|---|---|---|
lr | Logistic Regression | 0.8330 | 0.8975 | 0.7532 | 0.8097 | 0.7791 | 0.6451 | 0.6475 | 0.214 |
dt | Decision Tree Classifier | 0.7715 | 0.7625 | 0.7224 | 0.7058 | 0.7106 | 0.5224 | 0.5256 | 0.078 |
nb | Naive Bayes | 0.7608 | 0.8337 | 0.7802 | 0.6693 | 0.7179 | 0.5129 | 0.5206 | 0.209 |
knn | K Neighbors Classifier | 0.7594 | 0.7989 | 0.6093 | 0.7323 | 0.6620 | 0.4782 | 0.4856 | 0.134 |
svm | SVM - Linear Kernel | 0.4881 | 0.0000 | 0.7590 | 0.3346 | 0.4628 | 0.0615 | 0.1061 | 0.058 |
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True, intercept_scaling=1, l1_ratio=None, max_iter=1000, multi_class='auto', n_jobs=None, penalty='l2', random_state=4292, solver='lbfgs', tol=0.0001, verbose=0, warm_start=False), DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini', max_depth=None, max_features=None, max_leaf_nodes=None, min_impurity_decrease=0.0, min_samples_leaf=1, min_samples_split=2, min_weight_fraction_leaf=0.0, random_state=4292, splitter='best')]
In order to use Spark as the execution engine, you must have access to a Spark cluster and a SparkSession. Let's initialize a local Spark session:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Now just pass this session object to the parallel parameter and the comparison runs on Spark. Keep in mind this is a toy case; in a real situation, you need a SparkSession pointing to an actual Spark cluster to enjoy the power of Spark.
compare_models(include=test_models, n_select=2, parallel=FugueBackend(spark))
 | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | TT (Sec)
---|---|---|---|---|---|---|---|---|---|
lr | Logistic Regression | 0.8330 | 0.8975 | 0.7532 | 0.8097 | 0.7791 | 0.6451 | 0.6475 | 0.678 |
dt | Decision Tree Classifier | 0.7715 | 0.7625 | 0.7224 | 0.7058 | 0.7106 | 0.5224 | 0.5256 | 0.208 |
nb | Naive Bayes | 0.7608 | 0.8337 | 0.7802 | 0.6693 | 0.7179 | 0.5129 | 0.5206 | 0.213 |
knn | K Neighbors Classifier | 0.7594 | 0.7989 | 0.6093 | 0.7323 | 0.6620 | 0.4782 | 0.4856 | 0.573 |
svm | SVM - Linear Kernel | 0.4881 | 0.0000 | 0.7590 | 0.3346 | 0.4628 | 0.0615 | 0.1061 | 0.059 |
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True, intercept_scaling=1, l1_ratio=None, max_iter=1000, multi_class='auto', n_jobs=None, penalty='l2', random_state=4292, solver='lbfgs', tol=0.0001, verbose=0, warm_start=False), DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini', max_depth=None, max_features=None, max_leaf_nodes=None, min_impurity_decrease=0.0, min_samples_leaf=1, min_samples_split=2, min_weight_fraction_leaf=0.0, random_state=4292, splitter='best')]
In the end, you can call pull to get the metrics table:
pull()
 | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | TT (Sec)
---|---|---|---|---|---|---|---|---|---|
lr | Logistic Regression | 0.8330 | 0.8975 | 0.7532 | 0.8097 | 0.7791 | 0.6451 | 0.6475 | 0.678 |
dt | Decision Tree Classifier | 0.7715 | 0.7625 | 0.7224 | 0.7058 | 0.7106 | 0.5224 | 0.5256 | 0.208 |
nb | Naive Bayes | 0.7608 | 0.8337 | 0.7802 | 0.6693 | 0.7179 | 0.5129 | 0.5206 | 0.213 |
knn | K Neighbors Classifier | 0.7594 | 0.7989 | 0.6093 | 0.7323 | 0.6620 | 0.4782 | 0.4856 | 0.573 |
svm | SVM - Linear Kernel | 0.4881 | 0.0000 | 0.7590 | 0.3346 | 0.4628 | 0.0615 | 0.1061 | 0.059 |
Regression follows the same pattern as classification.
from pycaret.datasets import get_data
from pycaret.regression import *
setup(data=get_data("insurance", verbose=False), target = 'charges', n_jobs=1)
test_models = models().index.tolist()[:5]
 | Description | Value
---|---|---|
0 | Session id | 3514 |
1 | Target | charges |
2 | Target type | Regression |
3 | Data shape | (1338, 10) |
4 | Train data shape | (936, 10) |
5 | Test data shape | (402, 10) |
6 | Ordinal features | 2 |
7 | Numeric features | 3 |
8 | Categorical features | 3 |
9 | Preprocess | True |
10 | Imputation type | simple |
11 | Numeric imputation | mean |
12 | Categorical imputation | constant |
13 | Maximum one-hot encoding | 5 |
14 | Encoding method | None |
15 | Fold Generator | KFold |
16 | Fold Number | 10 |
17 | CPU Jobs | 1 |
18 | Use GPU | False |
19 | Log Experiment | False |
20 | Experiment Name | reg-default-name |
21 | USI | 478f |
compare_models is also exactly the same if you don't want to use a distributed system:
compare_models(include=test_models, n_select=2, sort="MAE")
 | Model | MAE | MSE | RMSE | R2 | RMSLE | MAPE | TT (Sec)
---|---|---|---|---|---|---|---|---|
lar | Least Angle Regression | 4215.3750 | 36942784.9091 | 6056.6512 | 0.7412 | 0.5944 | 0.4301 | 0.0540 |
lr | Linear Regression | 4216.0692 | 36946939.1774 | 6057.0115 | 0.7412 | 0.5956 | 0.4303 | 0.1540 |
lasso | Lasso Regression | 4216.0766 | 36944721.4684 | 6056.8051 | 0.7412 | 0.5943 | 0.4303 | 0.0590 |
ridge | Ridge Regression | 4226.7264 | 36949983.8412 | 6057.1250 | 0.7413 | 0.5923 | 0.4319 | 0.0550 |
en | Elastic Net | 7260.0035 | 90321787.1218 | 9448.8041 | 0.3861 | 0.7217 | 0.8981 | 0.0540 |
[Lars(copy_X=True, eps=2.220446049250313e-16, fit_intercept=True, fit_path=True, jitter=None, n_nonzero_coefs=500, normalize='deprecated', precompute='auto', random_state=3514, verbose=False), LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1, normalize='deprecated', positive=False)]
Now let's make it distributed, as a toy case, on Dask. The only change is the additional parallel parameter:
from pycaret.parallel import FugueBackend
compare_models(include=test_models, n_select=2, sort="MAE", parallel=FugueBackend("dask"))
 | Model | MAE | MSE | RMSE | R2 | RMSLE | MAPE | TT (Sec)
---|---|---|---|---|---|---|---|---|
lar | Least Angle Regression | 4215.3750 | 3.694278e+07 | 6056.6512 | 0.7412 | 0.5944 | 0.4301 | 0.055 |
lr | Linear Regression | 4216.0692 | 3.694694e+07 | 6057.0115 | 0.7412 | 0.5956 | 0.4303 | 0.054 |
lasso | Lasso Regression | 4216.0766 | 3.694472e+07 | 6056.8051 | 0.7412 | 0.5943 | 0.4303 | 0.056 |
ridge | Ridge Regression | 4226.7264 | 3.694998e+07 | 6057.1250 | 0.7413 | 0.5923 | 0.4319 | 0.111 |
en | Elastic Net | 7260.0035 | 9.032179e+07 | 9448.8041 | 0.3861 | 0.7217 | 0.8981 | 0.236 |
[Lars(copy_X=True, eps=2.220446049250313e-16, fit_intercept=True, fit_path=True, jitter=None, n_nonzero_coefs=500, normalize='deprecated', precompute='auto', random_state=3514, verbose=False), LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1, normalize='deprecated', positive=False)]
In order to use Spark as the execution engine, you must have access to a Spark cluster and a SparkSession. Let's initialize a local Spark session:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Now just pass this session object to the parallel parameter and the comparison runs on Spark. Keep in mind this is a toy case; in a real situation, you need a SparkSession pointing to an actual Spark cluster to enjoy the power of Spark.
compare_models(include=test_models, n_select=2, sort="MAE", parallel=FugueBackend(spark))
 | Model | MAE | MSE | RMSE | R2 | RMSLE | MAPE | TT (Sec)
---|---|---|---|---|---|---|---|---|
lar | Least Angle Regression | 4215.3750 | 3.694278e+07 | 6056.6512 | 0.7412 | 0.5944 | 0.4301 | 0.098 |
lr | Linear Regression | 4216.0692 | 3.694694e+07 | 6057.0115 | 0.7412 | 0.5956 | 0.4303 | 0.100 |
lasso | Lasso Regression | 4216.0766 | 3.694472e+07 | 6056.8051 | 0.7412 | 0.5943 | 0.4303 | 0.094 |
ridge | Ridge Regression | 4226.7264 | 3.694998e+07 | 6057.1250 | 0.7413 | 0.5923 | 0.4319 | 0.053 |
en | Elastic Net | 7260.0035 | 9.032179e+07 | 9448.8041 | 0.3861 | 0.7217 | 0.8981 | 0.092 |
[Lars(copy_X=True, eps=2.220446049250313e-16, fit_intercept=True, fit_path=True, jitter=None, n_nonzero_coefs=500, normalize='deprecated', precompute='auto', random_state=3514, verbose=False), LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1, normalize='deprecated', positive=False)]
In the end, you can call pull to get the metrics table:
pull()
 | Model | MAE | MSE | RMSE | R2 | RMSLE | MAPE | TT (Sec)
---|---|---|---|---|---|---|---|---|
lar | Least Angle Regression | 4215.3750 | 3.694278e+07 | 6056.6512 | 0.7412 | 0.5944 | 0.4301 | 0.098 |
lr | Linear Regression | 4216.0692 | 3.694694e+07 | 6057.0115 | 0.7412 | 0.5956 | 0.4303 | 0.100 |
lasso | Lasso Regression | 4216.0766 | 3.694472e+07 | 6056.8051 | 0.7412 | 0.5943 | 0.4303 | 0.094 |
ridge | Ridge Regression | 4226.7264 | 3.694998e+07 | 6057.1250 | 0.7413 | 0.5923 | 0.4319 | 0.053 |
en | Elastic Net | 7260.0035 | 9.032179e+07 | 9448.8041 | 0.3861 | 0.7217 | 0.8981 | 0.092 |
As you can see, the results from the distributed versions can differ from the local versions. In later sections, we will show how to make them identical.
Time series forecasting follows the same pattern as classification.
from pycaret.datasets import get_data
from pycaret.time_series import *
exp = TSForecastingExperiment()
exp.setup(data=get_data('airline', verbose=False), fh=12, fold=3, fig_kwargs={'renderer': 'notebook'}, session_id=42)
test_models = exp.models().index.tolist()[:5]
 | Description | Value
---|---|---|
0 | session_id | 42 |
1 | Target | Number of airline passengers |
2 | Approach | Univariate |
3 | Exogenous Variables | Not Present |
4 | Original data shape | (144, 1) |
5 | Transformed data shape | (144, 1) |
6 | Transformed train set shape | (132, 1) |
7 | Transformed test set shape | (12, 1) |
8 | Rows with missing values | 0.0% |
9 | Fold Generator | ExpandingWindowSplitter |
10 | Fold Number | 3 |
11 | Enforce Prediction Interval | False |
12 | Seasonal Period(s) Tested | 12 |
13 | Seasonality Present | True |
14 | Seasonalities Detected | [12] |
15 | Primary Seasonality | 12 |
16 | Target Strictly Positive | True |
17 | Target White Noise | No |
18 | Recommended d | 1 |
19 | Recommended Seasonal D | 1 |
20 | Preprocess | False |
21 | CPU Jobs | -1 |
22 | Use GPU | False |
23 | Log Experiment | False |
24 | Experiment Name | ts-default-name |
25 | USI | 49cf |
best_baseline_models = exp.compare_models(include=test_models, n_select=3)
best_baseline_models
 | Model | MASE | RMSSE | MAE | RMSE | MAPE | SMAPE | R2 | TT (Sec)
---|---|---|---|---|---|---|---|---|---|
arima | ARIMA | 0.6830 | 0.6735 | 20.0069 | 22.2199 | 0.0501 | 0.0507 | 0.8677 | 0.3200 |
snaive | Seasonal Naive Forecaster | 1.1479 | 1.0945 | 33.3611 | 35.9139 | 0.0832 | 0.0879 | 0.6072 | 0.0200 |
polytrend | Polynomial Trend Forecaster | 1.6523 | 1.9202 | 48.6301 | 63.4299 | 0.1170 | 0.1216 | -0.0784 | 0.0167 |
naive | Naive Forecaster | 2.3599 | 2.7612 | 69.0278 | 91.0322 | 0.1569 | 0.1792 | -1.2216 | 1.0600 |
grand_means | Grand Means Forecaster | 5.5306 | 5.2596 | 162.4117 | 173.6492 | 0.4000 | 0.5075 | -7.0462 | 1.2700 |
[ARIMA(maxiter=50, method='lbfgs', order=(1, 0, 0), out_of_sample_size=0, scoring='mse', scoring_args=None, seasonal_order=(0, 1, 0, 12), start_params=None, suppress_warnings=False, trend=None, with_intercept=True), NaiveForecaster(sp=12, strategy='last', window_length=None), PolynomialTrendForecaster(degree=1, regressor=None, with_intercept=True)]
from pycaret.parallel import FugueBackend
best_baseline_models = exp.compare_models(include=test_models, n_select=3, parallel=FugueBackend("dask"))
best_baseline_models
 | Model | MASE | RMSSE | MAE | RMSE | MAPE | SMAPE | R2 | TT (Sec)
---|---|---|---|---|---|---|---|---|---|
arima | ARIMA | 0.683 | 0.6735 | 20.0069 | 22.2199 | 0.0501 | 0.0507 | 0.8677 | 0.1267 |
snaive | Seasonal Naive Forecaster | 1.1479 | 1.0945 | 33.3611 | 35.9139 | 0.0832 | 0.0879 | 0.6072 | 0.0367 |
polytrend | Polynomial Trend Forecaster | 1.6523 | 1.9202 | 48.6301 | 63.4299 | 0.117 | 0.1216 | -0.0784 | 0.0133 |
naive | Naive Forecaster | 2.3599 | 2.7612 | 69.0278 | 91.0322 | 0.1569 | 0.1792 | -1.2216 | 0.0200 |
grand_means | Grand Means Forecaster | 5.5306 | 5.2596 | 162.4117 | 173.6492 | 0.4 | 0.5075 | -7.0462 | 0.0233 |
[ARIMA(maxiter=50, method='lbfgs', order=(1, 0, 0), out_of_sample_size=0, scoring='mse', scoring_args=None, seasonal_order=(0, 1, 0, 12), start_params=None, suppress_warnings=False, trend=None, with_intercept=True), NaiveForecaster(sp=12, strategy='last', window_length=None), PolynomialTrendForecaster(degree=1, regressor=None, with_intercept=True)]
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pycaret.parallel import FugueBackend
best_baseline_models = exp.compare_models(include=test_models[:2], n_select=3, parallel=FugueBackend(spark))
best_baseline_models
 | Model | MASE | RMSSE | MAE | RMSE | MAPE | SMAPE | R2 | TT (Sec)
---|---|---|---|---|---|---|---|---|---|
naive | Naive Forecaster | 2.3599 | 2.7612 | 69.0278 | 91.0322 | 0.1569 | 0.1792 | -1.2216 | 2.5600 |
grand_means | Grand Means Forecaster | 5.5306 | 5.2596 | 162.4117 | 173.6492 | 0.4 | 0.5075 | -7.0462 | 2.5267 |
[NaiveForecaster(sp=1, strategy='last', window_length=None), NaiveForecaster(sp=1, strategy='mean', window_length=None)]
exp.pull()
 | Model | MASE | RMSSE | MAE | RMSE | MAPE | SMAPE | R2 | TT (Sec)
---|---|---|---|---|---|---|---|---|---|
naive | Naive Forecaster | 2.3599 | 2.7612 | 69.0278 | 91.0322 | 0.1569 | 0.1792 | -1.2216 | 2.5600 |
grand_means | Grand Means Forecaster | 5.5306 | 5.2596 | 162.4117 | 173.6492 | 0.4 | 0.5075 | -7.0462 | 2.5267 |
The above examples are pure toys; to make things work well in a distributed system, you must be careful about a few things:

- If you directly provide a dataframe in setup, this dataset will need to be sent to all worker nodes. If the dataframe is 1 GB and you have 100 workers, your driver machine may need to send out up to 100 GB of data (depending on the specific framework's implementation), and this data transfer becomes a bottleneck by itself. Instead, if you provide a function (a lambda, as in the setup call below), the local compute scenario does not change, but the driver only sends the function reference to the workers, and each worker is responsible for loading the data by itself, so there is no heavy traffic on the driver side.
- You should always set session_id to make the distributed computation deterministic.
- Be explicit about n_jobs when you run anything distributed, so you do not overuse local or remote resources. This also avoids resource contention and makes the computation faster.
from pycaret.datasets import get_data
from pycaret.classification import *
setup(data_func=lambda: get_data("juice", verbose=False, profile=False), target = 'Purchase', session_id=0, n_jobs=1);
 | Description | Value
---|---|---|
0 | Session id | 0 |
1 | Target | Purchase |
2 | Target type | Binary |
3 | Target mapping | CH: 0, MM: 1 |
4 | Original data shape | (1070, 19) |
5 | Transformed data shape | (1070, 19) |
6 | Transformed train set shape | (748, 19) |
7 | Transformed test set shape | (322, 19) |
8 | Ordinal features | 1 |
9 | Numeric features | 17 |
10 | Categorical features | 1 |
11 | Preprocess | True |
12 | Imputation type | simple |
13 | Numeric imputation | mean |
14 | Categorical imputation | constant |
15 | Maximum one-hot encoding | 5 |
16 | Encoding method | None |
17 | Fold Generator | StratifiedKFold |
18 | Fold Number | 10 |
19 | CPU Jobs | 1 |
20 | Use GPU | False |
21 | Log Experiment | False |
22 | Experiment Name | clf-default-name |
23 | USI | ae18 |
The batch_size parameter helps adjust the trade-off between load balance and overhead. For each batch, setup will be called only once. So:
Choice | Load Balance | Overhead | Best Scenario |
---|---|---|---|
Smaller batch size | Better | Worse | training time >> data loading time or models ~= workers |
Larger batch size | Worse | Better | training time << data loading time or models >> workers |
The default value is 1, meaning we want the best load balance.
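For illustration, here is a minimal sketch of requesting larger batches. It only reuses the batch_size keyword of FugueBackend shown in the display_remote example below; the value 3 is arbitrary.

```python
from pycaret.parallel import FugueBackend

# Group 3 models into each distributed task to reduce per-task overhead
# (arbitrary illustrative value; the default of 1 favors load balance).
be = FugueBackend("dask", batch_size=3)
compare_models(n_select=2, parallel=be)
```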
In development, you can enable visual output with display_remote=True, but then you must also enable the Fugue callback so that the driver can monitor worker progress. It is recommended to turn the display off in production.
from pycaret.parallel import FugueBackend
fconf = {
"fugue.rpc.server": "fugue.rpc.flask.FlaskRPCServer", # keep this value
"fugue.rpc.flask_server.host": "0.0.0.0", # the driver ip address workers can access
"fugue.rpc.flask_server.port": "3333", # the open port on the dirver
"fugue.rpc.flask_server.timeout": "2 sec", # the timeout for worker to talk to driver
}
be = FugueBackend("dask", fconf, display_remote=True, batch_size=3, top_only=False)
compare_models(n_select=2, parallel=be)
 | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | DUMMY | DUMMY2 | TT (Sec)
---|---|---|---|---|---|---|---|---|---|---|---|
ridge | Ridge Classifier | 0.8383 | 0.0000 | 0.7802 | 0.8085 | 0.7896 | 0.6585 | 0.6637 | 0.0 | 0.0 | 0.099 |
lda | Linear Discriminant Analysis | 0.8329 | 0.8986 | 0.7701 | 0.8044 | 0.7824 | 0.6472 | 0.6522 | 0.0 | 1.0 | 0.132 |
lr | Logistic Regression | 0.8303 | 0.8959 | 0.7530 | 0.8053 | 0.7748 | 0.6391 | 0.6433 | 0.0 | 1.0 | 0.271 |
gbc | Gradient Boosting Classifier | 0.8195 | 0.8982 | 0.7562 | 0.7870 | 0.7656 | 0.6193 | 0.6260 | 0.0 | 1.0 | 0.263 |
lightgbm | Light Gradient Boosting Machine | 0.8047 | 0.8828 | 0.7492 | 0.7585 | 0.7482 | 0.5893 | 0.5950 | 0.0 | 1.0 | 0.128 |
ada | Ada Boost Classifier | 0.7968 | 0.8789 | 0.7326 | 0.7499 | 0.7388 | 0.5727 | 0.5751 | 0.0 | 1.0 | 0.178 |
rf | Random Forest Classifier | 0.7955 | 0.8731 | 0.7256 | 0.7500 | 0.7338 | 0.5682 | 0.5727 | 0.0 | 1.0 | 0.243 |
dt | Decision Tree Classifier | 0.7795 | 0.7711 | 0.7328 | 0.7168 | 0.7201 | 0.5389 | 0.5441 | 0.0 | 1.0 | 0.082 |
et | Extra Trees Classifier | 0.7714 | 0.8479 | 0.6951 | 0.7213 | 0.7038 | 0.5183 | 0.5225 | 0.0 | 1.0 | 0.214 |
nb | Naive Bayes | 0.7621 | 0.8255 | 0.7255 | 0.6825 | 0.7009 | 0.5039 | 0.5074 | 0.0 | 1.0 | 0.080 |
knn | K Neighbors Classifier | 0.7528 | 0.8053 | 0.6231 | 0.7208 | 0.6642 | 0.4703 | 0.4770 | 0.0 | 1.0 | 0.083 |
qda | Quadratic Discriminant Analysis | 0.6510 | 0.6349 | 0.4546 | 0.7617 | 0.4426 | 0.2377 | 0.3086 | 0.0 | 1.0 | 0.077 |
dummy | Dummy Classifier | 0.6096 | 0.5000 | 0.0000 | 0.0000 | 0.0000 | 0.0000 | 0.0000 | 0.0 | 1.0 | 0.072 |
svm | SVM - Linear Kernel | 0.5677 | 0.0000 | 0.2690 | 0.2077 | 0.1901 | 0.0290 | 0.0396 | 0.0 | 0.0 | 0.201 |
[RidgeClassifier(alpha=1.0, class_weight=None, copy_X=True, fit_intercept=True, max_iter=None, normalize='deprecated', positive=False, random_state=0, solver='auto', tol=0.001), LinearDiscriminantAnalysis(covariance_estimator=None, n_components=None, priors=None, shrinkage=None, solver='svd', store_covariance=False, tol=0.0001)]
You can add custom metrics like before. But in order to make the scorer distributable, it must be serializable. A plain function should be fine, but if the function uses global variables that are not serializable (for example an RLock object), it can cause issues. So try to make the custom function independent of global variables.
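As a hypothetical illustration of the pattern to avoid (the RLock here only stands in for any non-serializable global):

```python
import threading

_lock = threading.RLock()  # an RLock cannot be pickled

def bad_score(y_true, y_pred, axis=0):
    # Referencing the non-serializable global above may prevent this scorer
    # from being shipped to workers; keep custom metrics self-contained instead.
    with _lock:
        return 0.0
```

The metric defined in the next cell avoids this by using only its arguments: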
def score_dummy(y_true, y_pred, axis=0):
return 0.0
add_metric(id = 'mydummy',
name = 'DUMMY',
score_func = score_dummy,
target = 'pred',
greater_is_better = False,
)
Name                                                              DUMMY
Display Name                                                      DUMMY
Score Function                 <function score_dummy at 0x7f8aa0dc0ca0>
Scorer                make_scorer(score_dummy, greater_is_better=False)
Target                                                             pred
Args                                                                 {}
Greater is Better                                                 False
Multiclass                                                         True
Custom                                                             True
Name: mydummy, dtype: object
Adding a method of a class instance is also fine, but make sure all member variables of the class are serializable.
test_models = models().index.tolist()[:5]
compare_models(include=test_models, n_select=2, sort="DUMMY", parallel=FugueBackend("dask"))
 | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | DUMMY | TT (Sec)
---|---|---|---|---|---|---|---|---|---|---|
dt | Decision Tree Classifier | 0.7795 | 0.7711 | 0.7328 | 0.7168 | 0.7201 | 0.5389 | 0.5441 | 0.0 | 0.240 |
lr | Logistic Regression | 0.8303 | 0.8959 | 0.7530 | 0.8053 | 0.7748 | 0.6391 | 0.6433 | 0.0 | 0.306 |
nb | Naive Bayes | 0.7621 | 0.8255 | 0.7255 | 0.6825 | 0.7009 | 0.5039 | 0.5074 | 0.0 | 0.130 |
knn | K Neighbors Classifier | 0.7528 | 0.8053 | 0.6231 | 0.7208 | 0.6642 | 0.4703 | 0.4770 | 0.0 | 0.097 |
svm | SVM - Linear Kernel | 0.5677 | 0.0000 | 0.2690 | 0.2077 | 0.1901 | 0.0290 | 0.0396 | 0.0 | 0.102 |
[DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini', max_depth=None, max_features=None, max_leaf_nodes=None, min_impurity_decrease=0.0, min_samples_leaf=1, min_samples_split=2, min_weight_fraction_leaf=0.0, random_state=0, splitter='best'), LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True, intercept_scaling=1, l1_ratio=None, max_iter=1000, multi_class='auto', n_jobs=None, penalty='l2', random_state=0, solver='lbfgs', tol=0.0001, verbose=0, warm_start=False)]
pull()
 | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | DUMMY | TT (Sec)
---|---|---|---|---|---|---|---|---|---|---|
dt | Decision Tree Classifier | 0.7795 | 0.7711 | 0.7328 | 0.7168 | 0.7201 | 0.5389 | 0.5441 | 0.0 | 0.240 |
lr | Logistic Regression | 0.8303 | 0.8959 | 0.7530 | 0.8053 | 0.7748 | 0.6391 | 0.6433 | 0.0 | 0.306 |
nb | Naive Bayes | 0.7621 | 0.8255 | 0.7255 | 0.6825 | 0.7009 | 0.5039 | 0.5074 | 0.0 | 0.130 |
knn | K Neighbors Classifier | 0.7528 | 0.8053 | 0.6231 | 0.7208 | 0.6642 | 0.4703 | 0.4770 | 0.0 | 0.097 |
svm | SVM - Linear Kernel | 0.5677 | 0.0000 | 0.2690 | 0.2077 | 0.1901 | 0.0290 | 0.0396 | 0.0 | 0.102 |
class Scores:
def score_dummy2(self, y_true, y_prob, axis=0):
return 1.0
scores = Scores()
add_metric(id = 'mydummy2',
name = 'DUMMY2',
score_func = scores.score_dummy2,
target = 'pred_proba',
greater_is_better = True,
)
Name                                                             DUMMY2
Display Name                                                     DUMMY2
Score Function        <bound method Scores.score_dummy2 of <__main__...
Scorer                make_scorer(score_dummy2, needs_proba=True, er...
Target                                                       pred_proba
Args                                                                 {}
Greater is Better                                                  True
Multiclass                                                         True
Custom                                                             True
Name: mydummy2, dtype: object
compare_models(include=test_models, n_select=2, sort="DUMMY2", parallel=FugueBackend("dask"))
 | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | DUMMY | DUMMY2 | TT (Sec)
---|---|---|---|---|---|---|---|---|---|---|---|
dt | Decision Tree Classifier | 0.7795 | 0.7711 | 0.7328 | 0.7168 | 0.7201 | 0.5389 | 0.5441 | 0.0 | 1.0 | 0.237 |
lr | Logistic Regression | 0.8303 | 0.8959 | 0.7530 | 0.8053 | 0.7748 | 0.6391 | 0.6433 | 0.0 | 1.0 | 0.399 |
nb | Naive Bayes | 0.7621 | 0.8255 | 0.7255 | 0.6825 | 0.7009 | 0.5039 | 0.5074 | 0.0 | 1.0 | 0.077 |
knn | K Neighbors Classifier | 0.7528 | 0.8053 | 0.6231 | 0.7208 | 0.6642 | 0.4703 | 0.4770 | 0.0 | 1.0 | 0.082 |
svm | SVM - Linear Kernel | 0.5677 | 0.0000 | 0.2690 | 0.2077 | 0.1901 | 0.0290 | 0.0396 | 0.0 | 0.0 | 0.104 |
[DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini', max_depth=None, max_features=None, max_leaf_nodes=None, min_impurity_decrease=0.0, min_samples_leaf=1, min_samples_split=2, min_weight_fraction_leaf=0.0, random_state=0, splitter='best'), LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True, intercept_scaling=1, l1_ratio=None, max_iter=1000, multi_class='auto', n_jobs=None, penalty='l2', random_state=0, solver='lbfgs', tol=0.0001, verbose=0, warm_start=False)]
pull()
 | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | DUMMY | DUMMY2 | TT (Sec)
---|---|---|---|---|---|---|---|---|---|---|---|
dt | Decision Tree Classifier | 0.7795 | 0.7711 | 0.7328 | 0.7168 | 0.7201 | 0.5389 | 0.5441 | 0.0 | 1.0 | 0.237 |
lr | Logistic Regression | 0.8303 | 0.8959 | 0.7530 | 0.8053 | 0.7748 | 0.6391 | 0.6433 | 0.0 | 1.0 | 0.399 |
nb | Naive Bayes | 0.7621 | 0.8255 | 0.7255 | 0.6825 | 0.7009 | 0.5039 | 0.5074 | 0.0 | 1.0 | 0.077 |
knn | K Neighbors Classifier | 0.7528 | 0.8053 | 0.6231 | 0.7208 | 0.6642 | 0.4703 | 0.4770 | 0.0 | 1.0 | 0.082 |
svm | SVM - Linear Kernel | 0.5677 | 0.0000 | 0.2690 | 0.2077 | 0.1901 | 0.0290 | 0.0396 | 0.0 | 0.0 | 0.104 |
It is highly recommended to have only one worker on each Spark executor so the worker can fully utilize all CPUs (set spark.task.cpus). When you do this, you should also explicitly set n_jobs in setup to the number of CPUs of each executor.
executor_cores = 4
spark = SparkSession.builder.config("spark.task.cpus", executor_cores).config("spark.executor.cores", executor_cores).getOrCreate()
setup(data=get_data("juice", verbose=False, profile=False), target = 'Purchase', session_id=0, n_jobs=executor_cores)
compare_models(n_select=2, parallel=FugueBackend(spark))
On Databricks, spark is the pre-defined variable representing the SparkSession, and there is no difference in how it is used. You do exactly the same thing as before:
compare_models(parallel=FugueBackend(spark))
But on Databricks, visualization is difficult, so it may be a good idea to do two things (see the sketch after this list):

- Set verbose to False in setup
- Set display_remote to False in FugueBackend
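A minimal sketch of those two settings, reusing the data and backend keywords from the earlier examples:

```python
from pycaret.datasets import get_data
from pycaret.classification import *
from pycaret.parallel import FugueBackend

# Suppress driver-side display both in setup and in the backend
setup(data_func=lambda: get_data("juice", verbose=False), target="Purchase",
      session_id=0, n_jobs=1, verbose=False)                          # verbose=False in setup
compare_models(parallel=FugueBackend(spark, display_remote=False))    # display_remote=False
```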
Dask has pseudo-distributed modes such as the default (multi-thread) mode and the multi-process mode. The default mode works fine (although the tasks actually run sequentially), while the multi-process mode does not currently work for PyCaret because it interferes with PyCaret's global variables. On the other hand, any Spark execution mode will just work fine.
For practical use on non-trivial data and models, local parallelization (the easiest way is to use local Dask as the backend, as shown above) normally has no performance advantage, because it is very easy to overload the CPUs during training and increase resource contention. The value of local parallelization is to verify the code and give you confidence that the distributed environment will produce the expected result in much less time.
Distributed systems are powerful, but you must follow some good practices to use them:

- In compare_models, limit the models you want to try to a small number of cheap models; once you verify they work, you can switch to a larger model collection.
- Move step by step from parallel=None to parallel=FugueBackend() to parallel=FugueBackend(spark), as sketched below. In the second step, you can use a local SparkSession or local Dask.
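As an illustration of that progression, using the same calls that appear earlier in this notebook:

```python
from pycaret.parallel import FugueBackend

# 1) Verify the workflow locally
compare_models(include=test_models, n_select=2)
# 2) Verify it with a local backend (local Dask here, or a local SparkSession)
compare_models(include=test_models, n_select=2, parallel=FugueBackend("dask"))
# 3) Scale out with a SparkSession that points to a real cluster
compare_models(include=test_models, n_select=2, parallel=FugueBackend(spark))
```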