In this example, we will train a model in Ray AIR using a scikit-learn classifier.
Let's start by installing our dependencies:
!pip install -qU "ray[tune]" sklearn
Then we need some imports:
from typing import Tuple
import ray
from ray.data import Dataset
from ray.train.batch_predictor import BatchPredictor
from ray.train.sklearn import SklearnPredictor
from ray.data.preprocessors import Chain, OrdinalEncoder, StandardScaler
from ray.air.result import Result
from ray.train.sklearn import SklearnTrainer
from ray.air.config import ScalingConfig
from sklearn.ensemble import RandomForestClassifier
try:
from cuml.ensemble import RandomForestClassifier as cuMLRandomForestClassifier
except ImportError:
cuMLRandomForestClassifier = None
Next, we define a function to load our train, validation, and test datasets. The test dataset is the validation split with the `target` label column dropped.
def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
    """Load the example CSV and derive the train, validation and test datasets.

    Returns:
        A ``(train, valid, test)`` tuple. ``train`` and ``valid`` are the two
        parts produced by ``train_test_split(test_size=0.3)``; ``test`` is
        ``valid`` with the ``"target"`` column dropped.
    """
    source_uri = "s3://anonymous@air-example-data/breast_cancer_with_categorical.csv"
    splits = ray.data.read_csv(source_uri).train_test_split(test_size=0.3)
    train_dataset, valid_dataset = splits
    # The test set is unlabeled: same rows as validation, minus the label column.
    return train_dataset, valid_dataset, valid_dataset.drop_columns(["target"])
The following function will create a Sklearn trainer, train it, and return the result.
def train_sklearn(num_cpus: int, use_gpu: bool = False) -> Result:
    """Fit a random forest classifier through ``SklearnTrainer`` and return the result.

    Args:
        num_cpus: CPUs requested for the trainer when running on CPU. Not used
            when ``use_gpu`` is true (the GPU path requests 1 CPU and 1 GPU).
        use_gpu: If true, use the cuML random forest instead of scikit-learn's.

    Returns:
        The ``Result`` returned by ``SklearnTrainer.fit()``; its metrics are
        also printed.

    Raises:
        RuntimeError: If ``use_gpu`` is true but cuML could not be imported.
    """
    if use_gpu and not cuMLRandomForestClassifier:
        raise RuntimeError("cuML must be installed for GPU enabled sklearn estimators.")

    train_dataset, valid_dataset, _ = prepare_data()

    # Preprocessing: ordinal-encode the categorical column, then standard-scale
    # a couple of numeric columns.
    encode_step = OrdinalEncoder(["categorical_column"])
    scale_step = StandardScaler(columns=["mean radius", "mean texture"])
    preprocessor = Chain(encode_step, scale_step)

    # Pick the resource request and the estimator class for the chosen device.
    trainer_resources = {"CPU": 1, "GPU": 1} if use_gpu else {"CPU": num_cpus}
    estimator_cls = cuMLRandomForestClassifier if use_gpu else RandomForestClassifier
    estimator = estimator_cls()

    fit_result = SklearnTrainer(
        estimator=estimator,
        label_column="target",
        datasets={"train": train_dataset, "valid": valid_dataset},
        preprocessor=preprocessor,
        cv=5,
        scaling_config=ScalingConfig(trainer_resources=trainer_resources),
    ).fit()
    print(fit_result.metrics)
    return fit_result
Once we have the result, we can do batch inference on the obtained model. Let's define a utility function for this.
def predict_sklearn(result: Result, use_gpu: bool = False) -> None:
    """Run batch inference with the trained model and print the predicted labels.

    Args:
        result: Training result whose ``checkpoint`` is loaded into a
            ``BatchPredictor`` backed by ``SklearnPredictor``.
        use_gpu: If true, request one GPU per prediction worker; otherwise zero.
    """
    # NOTE(review): prepare_data() re-reads and re-splits the CSV, so this test
    # set may not match the validation split used during training; confirm
    # whether train_test_split is deterministic here.
    _, _, test_dataset = prepare_data()
    batch_predictor = BatchPredictor.from_checkpoint(
        result.checkpoint, SklearnPredictor
    )
    predicted_labels = batch_predictor.predict(
        test_dataset,
        num_gpus_per_worker=int(use_gpu),
    ).map_batches(
        # Threshold the predictions at 0.5 and cast them to integer labels.
        lambda df: (df > 0.5).astype(int),
        batch_format="pandas",
    )
    # Plain string literal: the original f-string had no placeholders.
    print("PREDICTED LABELS")
    predicted_labels.show()
Now we can run the training:
# Train on CPU: request 2 CPUs for the trainer and use scikit-learn's RandomForestClassifier.
result = train_sklearn(num_cpus=2, use_gpu=False)
2022-06-22 17:27:37,741 INFO services.py:1477 -- View the Ray dashboard at http://127.0.0.1:8269
2022-06-22 17:27:39,822 WARNING read_api.py:260 -- The number of blocks in this dataset (1) limits its parallelism to 1 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Map_Batches: 100%|██████████| 1/1 [00:00<00:00, 44.05it/s]
Trial name | status | loc | iter | total time (s) | fit_time |
---|---|---|---|---|---|
SklearnTrainer_9dec8_00000 | TERMINATED | 172.31.43.110:1492629 | 1 | 15.6842 | 2.31571 |
(SklearnTrainer pid=1492629) 2022-06-22 17:27:45,647 WARNING pool.py:591 -- The 'context' argument is not supported using ray. Please refer to the documentation for how to control ray initialization.
Result for SklearnTrainer_9dec8_00000: cv: fit_time: - 2.221003770828247 - 2.215489387512207 - 2.2075674533843994 - 2.222351312637329 - 2.312389612197876 fit_time_mean: 2.235760307312012 fit_time_std: 0.03866614559685742 score_time: - 0.022464990615844727 - 0.0230865478515625 - 0.02564835548400879 - 0.029137849807739258 - 0.021221637725830078 score_time_mean: 0.02431187629699707 score_time_std: 0.0028120522003997595 test_score: - 0.9625 - 0.9125 - 0.9875 - 1.0 - 0.9367088607594937 test_score_mean: 0.9598417721518986 test_score_std: 0.032128186960552516 date: 2022-06-22_17-27-59 done: false experiment_id: f8215019c10e4a81ba2187c38e875365 fit_time: 2.3157050609588623 hostname: ip-172-31-43-110 iterations_since_restore: 1 node_ip: 172.31.43.110 pid: 1492629 should_checkpoint: true time_since_restore: 15.684244871139526 time_this_iter_s: 15.684244871139526 time_total_s: 15.684244871139526 timestamp: 1655918879 timesteps_since_restore: 0 training_iteration: 1 trial_id: 9dec8_00000 valid: score_time: 0.03549623489379883 test_score: 0.9532163742690059 warmup_time: 0.0057866573333740234 Result for SklearnTrainer_9dec8_00000: cv: fit_time: - 2.221003770828247 - 2.215489387512207 - 2.2075674533843994 - 2.222351312637329 - 2.312389612197876 fit_time_mean: 2.235760307312012 fit_time_std: 0.03866614559685742 score_time: - 0.022464990615844727 - 0.0230865478515625 - 0.02564835548400879 - 0.029137849807739258 - 0.021221637725830078 score_time_mean: 0.02431187629699707 score_time_std: 0.0028120522003997595 test_score: - 0.9625 - 0.9125 - 0.9875 - 1.0 - 0.9367088607594937 test_score_mean: 0.9598417721518986 test_score_std: 0.032128186960552516 date: 2022-06-22_17-27-59 done: true experiment_id: f8215019c10e4a81ba2187c38e875365 experiment_tag: '0' fit_time: 2.3157050609588623 hostname: ip-172-31-43-110 iterations_since_restore: 1 node_ip: 172.31.43.110 pid: 1492629 should_checkpoint: true time_since_restore: 15.684244871139526 time_this_iter_s: 15.684244871139526 time_total_s: 
15.684244871139526 timestamp: 1655918879 timesteps_since_restore: 0 training_iteration: 1 trial_id: 9dec8_00000 valid: score_time: 0.03549623489379883 test_score: 0.9532163742690059 warmup_time: 0.0057866573333740234
2022-06-22 17:27:59,333 INFO tune.py:734 -- Total run time: 19.09 seconds (18.31 seconds for the tuning loop).
{'valid': {'score_time': 0.03549623489379883, 'test_score': 0.9532163742690059}, 'cv': {'fit_time': array([2.22100377, 2.21548939, 2.20756745, 2.22235131, 2.31238961]), 'score_time': array([0.02246499, 0.02308655, 0.02564836, 0.02913785, 0.02122164]), 'test_score': array([0.9625 , 0.9125 , 0.9875 , 1. , 0.93670886]), 'fit_time_mean': 2.235760307312012, 'fit_time_std': 0.03866614559685742, 'score_time_mean': 0.02431187629699707, 'score_time_std': 0.0028120522003997595, 'test_score_mean': 0.9598417721518986, 'test_score_std': 0.032128186960552516}, 'fit_time': 2.3157050609588623, 'time_this_iter_s': 15.684244871139526, 'should_checkpoint': True, 'done': True, 'timesteps_total': None, 'episodes_total': None, 'training_iteration': 1, 'trial_id': '9dec8_00000', 'experiment_id': 'f8215019c10e4a81ba2187c38e875365', 'date': '2022-06-22_17-27-59', 'timestamp': 1655918879, 'time_total_s': 15.684244871139526, 'pid': 1492629, 'hostname': 'ip-172-31-43-110', 'node_ip': '172.31.43.110', 'config': {}, 'time_since_restore': 15.684244871139526, 'timesteps_since_restore': 0, 'iterations_since_restore': 1, 'warmup_time': 0.0057866573333740234, 'experiment_tag': '0'}
And perform inference on the obtained model:
# Run batch inference on CPU (no GPUs per worker) using the checkpoint from `result`.
predict_sklearn(result, use_gpu=False)
2022-06-22 17:27:59,658 WARNING read_api.py:260 -- The number of blocks in this dataset (1) limits its parallelism to 1 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks. Map_Batches: 100%|██████████| 1/1 [00:00<00:00, 64.73it/s] Map Progress (1 actors 1 pending): 100%|██████████| 1/1 [00:01<00:00, 1.60s/it] Map_Batches: 100%|██████████| 1/1 [00:00<00:00, 71.41it/s]
PREDICTED LABELS {'predictions': 1} {'predictions': 1} {'predictions': 0} {'predictions': 1} {'predictions': 1} {'predictions': 1} {'predictions': 1} {'predictions': 1} {'predictions': 0} {'predictions': 1} {'predictions': 0} {'predictions': 1} {'predictions': 1} {'predictions': 1} {'predictions': 1} {'predictions': 0} {'predictions': 1} {'predictions': 1} {'predictions': 1} {'predictions': 0}