# !pip install feast==0.20.1 ray[air]>=1.13 xgboost_ray
In this example, we showcase how to use Ray AIR with the Feast feature store, leveraging both historical features for training a model and online features for inference.
The task is adapted from the Feast credit scoring tutorial. We train an XGBoost model and run predictions on an incoming loan request to see whether it is approved or rejected.
Let's first set up our workspace and prepare the data to work with.
! wget --no-check-certificate https://github.com/ray-project/air-sample-data/raw/main/air-feast-example.zip
! unzip air-feast-example.zip
%cd air-feast-example
--2022-09-12 19:24:21--  https://github.com/ray-project/air-sample-data/raw/main/air-feast-example.zip
Loaded CA certificate '/etc/ssl/certs/ca-certificates.crt'
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/ray-project/air-sample-data/main/air-feast-example.zip [following]
--2022-09-12 19:24:21--  https://raw.githubusercontent.com/ray-project/air-sample-data/main/air-feast-example.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 23715107 (23M) [application/zip]
Saving to: ‘air-feast-example.zip’
air-feast-example.z 100%[===================>]  22.62M  8.79MB/s    in 2.6s
2022-09-12 19:24:25 (8.79 MB/s) - ‘air-feast-example.zip’ saved [23715107/23715107]
Archive:  air-feast-example.zip
   creating: air-feast-example/
   creating: air-feast-example/feature_repo/
  inflating: air-feast-example/feature_repo/.DS_Store
 extracting: air-feast-example/feature_repo/__init__.py
  inflating: air-feast-example/feature_repo/features.py
   creating: air-feast-example/feature_repo/data/
  inflating: air-feast-example/feature_repo/data/.DS_Store
  inflating: air-feast-example/feature_repo/data/credit_history_sample.csv
  inflating: air-feast-example/feature_repo/data/zipcode_table_sample.csv
  inflating: air-feast-example/feature_repo/data/credit_history.parquet
  inflating: air-feast-example/feature_repo/data/zipcode_table.parquet
  inflating: air-feast-example/feature_repo/feature_store.yaml
  inflating: air-feast-example/.DS_Store
   creating: air-feast-example/data/
  inflating: air-feast-example/data/loan_table.parquet
  inflating: air-feast-example/data/loan_table_sample.csv
/home/ray/Desktop/workspace/ray/doc/source/ray-air/examples/air-feast-example
! ls
data feature_repo
There is already a feature repository set up in feature_repo/, so it isn't necessary to create a new one. If needed, a new feature repository can be created with the following command: feast init -t local feature_repo.
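The repository's configuration lives in feature_repo/feature_store.yaml. As a rough, hypothetical sketch (the project name and file paths here are illustrative and may differ from the actual file in this repo), a minimal local Feast configuration typically looks like:

```yaml
project: feature_repo          # hypothetical project name
registry: data/registry.db     # local file-based registry
provider: local
online_store:
  path: data/online_store.db   # SQLite-backed online store
```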
Now let's take a look at the schema in the Feast feature store, which is defined by feature_repo/features.py. There are two feature views: zipcode_features and credit_history, both generated from Parquet files: feature_repo/data/zipcode_table.parquet and feature_repo/data/credit_history.parquet.
!pygmentize feature_repo/features.py
from datetime import timedelta

from feast import Entity, Field, FeatureView, FileSource, ValueType
from feast.types import Float32, Int64, String

zipcode = Entity(name="zipcode", value_type=Int64)

zipcode_source = FileSource(
    path="feature_repo/data/zipcode_table.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

zipcode_features = FeatureView(
    name="zipcode_features",
    entities=["zipcode"],
    ttl=timedelta(days=3650),
    schema=[
        Field(name="city", dtype=String),
        Field(name="state", dtype=String),
        Field(name="location_type", dtype=String),
        Field(name="tax_returns_filed", dtype=Int64),
        Field(name="population", dtype=Int64),
        Field(name="total_wages", dtype=Int64),
    ],
    source=zipcode_source,
)

dob_ssn = Entity(
    name="dob_ssn",
    value_type=ValueType.STRING,
    description="Date of birth and last four digits of social security number",
)

credit_history_source = FileSource(
    path="feature_repo/data/credit_history.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

credit_history = FeatureView(
    name="credit_history",
    entities=["dob_ssn"],
    ttl=timedelta(days=90),
    schema=[
        Field(name="credit_card_due", dtype=Int64),
        Field(name="mortgage_due", dtype=Int64),
        Field(name="student_loan_due", dtype=Int64),
        Field(name="vehicle_loan_due", dtype=Int64),
        Field(name="hard_pulls", dtype=Int64),
        Field(name="missed_payments_2y", dtype=Int64),
        Field(name="missed_payments_1y", dtype=Int64),
        Field(name="missed_payments_6m", dtype=Int64),
        Field(name="bankruptcies", dtype=Int64),
    ],
    source=credit_history_source,
)
Deploy the feature store defined above by running feast apply from within the feature_repo/ folder.
! cd feature_repo && feast apply
Created entity zipcode
Created entity dob_ssn
Created feature view credit_history
Created feature view zipcode_features
Created sqlite table feature_repo_credit_history
Created sqlite table feature_repo_zipcode_features
import feast
fs = feast.FeatureStore(repo_path="feature_repo")
On top of the features in Feast, we also have labeled training data at data/loan_table.parquet. At training time, the loan table is passed into Feast as an entity dataframe for training-data generation. Feast intelligently joins the credit_history and zipcode_features tables to create feature vectors that augment the training data.
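To build intuition for what that join does, here is a tiny, hypothetical pandas sketch of a point-in-time join: for each entity row, pick the most recent feature value at or before the row's event_timestamp. This only illustrates the concept; it is not Feast's actual implementation, and the entity keys and values are made up.

```python
import pandas as pd

# Hypothetical entity dataframe: one entity ("a") observed at two times.
entity_df = pd.DataFrame({
    "dob_ssn": ["a", "a"],
    "event_timestamp": pd.to_datetime(["2021-01-10", "2021-03-10"]),
}).sort_values("event_timestamp")

# Hypothetical feature table with two historical feature values for "a".
features = pd.DataFrame({
    "dob_ssn": ["a", "a"],
    "event_timestamp": pd.to_datetime(["2021-01-01", "2021-03-01"]),
    "credit_card_due": [100, 250],
}).sort_values("event_timestamp")

# merge_asof's default "backward" direction takes the latest feature row
# whose timestamp is <= the entity row's timestamp, per entity key.
joined = pd.merge_asof(entity_df, features, on="event_timestamp", by="dob_ssn")
print(joined["credit_card_due"].tolist())  # [100, 250]
```

The key property is that each training row only sees feature values that existed at (or before) its own timestamp, which avoids leaking future information into training.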
import pandas as pd
loan_df = pd.read_parquet("data/loan_table.parquet")
display(loan_df)
loan_id | dob_ssn | zipcode | person_age | person_income | person_home_ownership | person_emp_length | loan_intent | loan_amnt | loan_int_rate | loan_status | event_timestamp | created_timestamp | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 10000 | 19530219_5179 | 76104 | 22 | 59000 | RENT | 123.0 | PERSONAL | 35000 | 16.02 | 1 | 2021-08-25 20:34:41.361000+00:00 | 2021-08-25 20:34:41.361000+00:00 |
1 | 10001 | 19520816_8737 | 70380 | 21 | 9600 | OWN | 5.0 | EDUCATION | 1000 | 11.14 | 0 | 2021-08-25 20:16:20.128000+00:00 | 2021-08-25 20:16:20.128000+00:00 |
2 | 10002 | 19860413_2537 | 97039 | 25 | 9600 | MORTGAGE | 1.0 | MEDICAL | 5500 | 12.87 | 1 | 2021-08-25 19:57:58.896000+00:00 | 2021-08-25 19:57:58.896000+00:00 |
3 | 10003 | 19760701_8090 | 63785 | 23 | 65500 | RENT | 4.0 | MEDICAL | 35000 | 15.23 | 1 | 2021-08-25 19:39:37.663000+00:00 | 2021-08-25 19:39:37.663000+00:00 |
4 | 10004 | 19830125_8297 | 82223 | 24 | 54400 | RENT | 8.0 | MEDICAL | 35000 | 14.27 | 1 | 2021-08-25 19:21:16.430000+00:00 | 2021-08-25 19:21:16.430000+00:00 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
28633 | 38633 | 19491126_1487 | 43205 | 57 | 53000 | MORTGAGE | 1.0 | PERSONAL | 5800 | 13.16 | 0 | 2020-08-25 21:48:06.292000+00:00 | 2020-08-25 21:48:06.292000+00:00 |
28634 | 38634 | 19681208_6537 | 24872 | 54 | 120000 | MORTGAGE | 4.0 | PERSONAL | 17625 | 7.49 | 0 | 2020-08-25 21:29:45.059000+00:00 | 2020-08-25 21:29:45.059000+00:00 |
28635 | 38635 | 19880422_2592 | 68826 | 65 | 76000 | RENT | 3.0 | HOMEIMPROVEMENT | 35000 | 10.99 | 1 | 2020-08-25 21:11:23.826000+00:00 | 2020-08-25 21:11:23.826000+00:00 |
28636 | 38636 | 19901017_6108 | 92014 | 56 | 150000 | MORTGAGE | 5.0 | PERSONAL | 15000 | 11.48 | 0 | 2020-08-25 20:53:02.594000+00:00 | 2020-08-25 20:53:02.594000+00:00 |
28637 | 38637 | 19960703_3449 | 69033 | 66 | 42000 | RENT | 2.0 | MEDICAL | 6475 | 9.99 | 0 | 2020-08-25 20:34:41.361000+00:00 | 2020-08-25 20:34:41.361000+00:00 |
28638 rows × 13 columns
feast_features = [
"zipcode_features:city",
"zipcode_features:state",
"zipcode_features:location_type",
"zipcode_features:tax_returns_filed",
"zipcode_features:population",
"zipcode_features:total_wages",
"credit_history:credit_card_due",
"credit_history:mortgage_due",
"credit_history:student_loan_due",
"credit_history:vehicle_loan_due",
"credit_history:hard_pulls",
"credit_history:missed_payments_2y",
"credit_history:missed_payments_1y",
"credit_history:missed_payments_6m",
"credit_history:bankruptcies",
]
loan_w_offline_feature = fs.get_historical_features(
entity_df=loan_df, features=feast_features
).to_df()
# Drop some unnecessary columns for simplicity
loan_w_offline_feature = loan_w_offline_feature.drop(["event_timestamp", "created_timestamp__", "loan_id", "zipcode", "dob_ssn"], axis=1)
Now let's take a look at the training data as it is augmented by Feast.
display(loan_w_offline_feature)
person_age | person_income | person_home_ownership | person_emp_length | loan_intent | loan_amnt | loan_int_rate | loan_status | city | state | ... | total_wages | credit_card_due | mortgage_due | student_loan_due | vehicle_loan_due | hard_pulls | missed_payments_2y | missed_payments_1y | missed_payments_6m | bankruptcies | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1358886 | 55 | 24543 | RENT | 3.0 | VENTURE | 4000 | 13.92 | 0 | SLIDELL | LA | ... | 315061217 | 1777 | 690650 | 46372 | 10439 | 5 | 1 | 2 | 1 | 0 |
1358815 | 58 | 20000 | RENT | 0.0 | EDUCATION | 4000 | 9.99 | 0 | CHOUTEAU | OK | ... | 59412230 | 1791 | 462670 | 19421 | 3583 | 8 | 7 | 1 | 0 | 2 |
1353348 | 64 | 24000 | RENT | 1.0 | MEDICAL | 3000 | 6.99 | 0 | BISMARCK | ND | ... | 469621263 | 5917 | 1780959 | 11835 | 27910 | 8 | 3 | 2 | 1 | 0 |
1354200 | 55 | 34000 | RENT | 0.0 | DEBTCONSOLIDATION | 12000 | 6.92 | 1 | SANTA BARBARA | CA | ... | 24537583 | 8091 | 364271 | 30248 | 22640 | 2 | 7 | 3 | 0 | 0 |
1354271 | 51 | 74628 | MORTGAGE | 3.0 | PERSONAL | 3000 | 13.49 | 0 | HUNTINGTON BEACH | CA | ... | 19749601 | 3679 | 1659968 | 37582 | 20284 | 0 | 1 | 0 | 0 | 0 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
674285 | 23 | 74000 | RENT | 3.0 | MEDICAL | 25000 | 10.36 | 1 | MANSFIELD | MO | ... | 33180988 | 5176 | 1089963 | 44642 | 2877 | 1 | 6 | 1 | 0 | 0 |
668250 | 21 | 200000 | MORTGAGE | 2.0 | DEBTCONSOLIDATION | 25000 | 13.99 | 0 | SALISBURY | MD | ... | 470634058 | 5297 | 1288915 | 22471 | 22630 | 0 | 5 | 2 | 1 | 0 |
668321 | 24 | 200000 | MORTGAGE | 3.0 | VENTURE | 24000 | 7.49 | 0 | STRUNK | KY | ... | 10067358 | 6549 | 22399 | 11806 | 13005 | 0 | 1 | 0 | 0 | 0 |
670025 | 23 | 215000 | MORTGAGE | 7.0 | MEDICAL | 35000 | 14.79 | 0 | HAWTHORN | PA | ... | 5956835 | 9079 | 876038 | 4556 | 21588 | 0 | 1 | 0 | 0 | 0 |
2034006 | 22 | 59000 | RENT | 123.0 | PERSONAL | 35000 | 16.02 | 1 | FORT WORTH | TX | ... | 142325465 | 8419 | 91803 | 22328 | 15078 | 0 | 1 | 0 | 0 | 0 |
28638 rows × 23 columns
# Convert into Train and Validation datasets.
import ray
loan_ds = ray.data.from_pandas(loan_w_offline_feature)
train_ds, validation_ds = loan_ds.split_proportionately([0.8])
2022-09-12 19:25:14,018 INFO worker.py:1508 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
A Preprocessor does last-mile processing on the Ray Datasets before they are fed into the model for training.
categorical_features = [
"person_home_ownership",
"loan_intent",
"city",
"state",
"location_type",
]
from ray.data.preprocessors import Chain, OrdinalEncoder, SimpleImputer
imputer = SimpleImputer(categorical_features, strategy="most_frequent")
encoder = OrdinalEncoder(columns=categorical_features)
chained_preprocessor = Chain(imputer, encoder)
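To build intuition for the encoding step, here is a small, hypothetical pandas sketch of what ordinal encoding does: each category becomes an integer code. This is illustrative only; Ray's OrdinalEncoder is fitted on the dataset and its exact integer mapping may differ.

```python
import pandas as pd

# A hypothetical slice of one categorical column.
df = pd.DataFrame({"person_home_ownership": ["RENT", "OWN", "MORTGAGE", "RENT"]})

# In spirit: categories -> integer codes (pandas assigns codes in sorted
# category order, so MORTGAGE=0, OWN=1, RENT=2 here).
codes = df["person_home_ownership"].astype("category").cat.codes
print(codes.tolist())  # [2, 1, 0, 2]
```

Chaining the imputer before the encoder matters: missing categorical values are first filled with the most frequent category, so the encoder never sees NaNs.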
Ray AIR provides a variety of Trainers that are integrated with popular machine learning frameworks. You can train a distributed model at scale, leveraging Ray, with the intuitive trainer.fit() API. The output is a Ray AIR Checkpoint, which seamlessly transfers the workload from training to prediction. Let's take a look!
LABEL = "loan_status"
CHECKPOINT_PATH = "checkpoint"
NUM_WORKERS = 1 # Change this based on the resources in the cluster.
from ray.train.xgboost import XGBoostTrainer
from ray.train import ScalingConfig
params = {
"tree_method": "approx",
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
}
trainer = XGBoostTrainer(
scaling_config=ScalingConfig(
num_workers=NUM_WORKERS,
use_gpu=False,
),
label_column=LABEL,
params=params,
datasets={"train": train_ds, "validation": validation_ds},
preprocessor=chained_preprocessor,
num_boost_round=100,
)
checkpoint = trainer.fit().checkpoint
# This saves the checkpoint onto disk
checkpoint.to_directory(CHECKPOINT_PATH)
Current time: | 2022-09-12 19:25:28 |
Running for: | 00:00:09.09 |
Memory: | 12.3/62.7 GiB |
Trial name | status | loc | iter | total time (s) | train-logloss | train-error | validation-logloss |
---|---|---|---|---|---|---|---|
XGBoostTrainer_4f411_00000 | TERMINATED | 10.108.96.251:348845 | 101 | 7.67137 | 0.0578837 | 0.0127019 | 0.225994 |
(XGBoostTrainer pid=348845) /home/ray/.pyenv/versions/mambaforge/envs/ray/lib/python3.9/site-packages/xgboost_ray/main.py:431: UserWarning: `num_actors` in `ray_params` is smaller than 2 (1). XGBoost will NOT be distributed!
(XGBoostTrainer pid=348845)   warnings.warn(
(_RemoteRayXGBoostActor pid=348922) [19:25:23] task [xgboost.ray]:140319682474864 got new rank 0
Trial name | date | done | episodes_total | experiment_id | experiment_tag | hostname | iterations_since_restore | node_ip | pid | time_since_restore | time_this_iter_s | time_total_s | timestamp | timesteps_since_restore | timesteps_total | train-error | train-logloss | training_iteration | trial_id | validation-error | validation-logloss | warmup_time |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
XGBoostTrainer_4f411_00000 | 2022-09-12_19-25-28 | True | 83cacc5068a84efc8998c269bc054088 | 0 | corvus | 101 | 10.108.96.251 | 348845 | 7.67137 | 1.01445 | 7.67137 | 1663035928 | 0 | 0.0127019 | 0.0578837 | 101 | 4f411_00000 | 0.0825768 | 0.225994 | 0.00293422 |
2022-09-12 19:25:28,422 INFO tune.py:762 -- Total run time: 9.86 seconds (9.09 seconds for the tuning loop).
'checkpoint'
from ray.train import Checkpoint
from ray.train.xgboost import XGBoostPredictor
predictor = XGBoostPredictor.from_checkpoint(Checkpoint.from_directory(CHECKPOINT_PATH))
import numpy as np
# Now let's do some prediction.
loan_request_dict = {
"zipcode": [76104],
"dob_ssn": ["19630621_4278"],
"person_age": [133],
"person_income": [59000],
"person_home_ownership": ["RENT"],
"person_emp_length": [123.0],
"loan_intent": ["PERSONAL"],
"loan_amnt": [35000],
"loan_int_rate": [16.02],
}
# Now augment the request with online features.
zipcode = loan_request_dict["zipcode"][0]
dob_ssn = loan_request_dict["dob_ssn"][0]
online_features = fs.get_online_features(
entity_rows=[{"zipcode": zipcode, "dob_ssn": dob_ssn}],
features=feast_features,
).to_dict()
loan_request_dict.update(online_features)
loan_request_df = pd.DataFrame.from_dict(loan_request_dict)
loan_request_df = loan_request_df.drop(["zipcode", "dob_ssn"], axis=1)
display(loan_request_df)
person_age | person_income | person_home_ownership | person_emp_length | loan_intent | loan_amnt | loan_int_rate | location_type | city | population | ... | total_wages | hard_pulls | bankruptcies | missed_payments_1y | mortgage_due | credit_card_due | missed_payments_2y | missed_payments_6m | student_loan_due | vehicle_loan_due | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 133 | 59000 | RENT | 123.0 | PERSONAL | 35000 | 16.02 | None | None | None | ... | None | None | None | None | None | None | None | None | None | None |
1 rows × 22 columns
# Run the request through our predictor using the `Predictor.predict()` API.
loan_result = np.round(predictor.predict(loan_request_df)["predictions"][0])
if loan_result == 0:
print("Loan approved!")
elif loan_result == 1:
print("Loan rejected!")
Loan rejected!
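A note on the rounding above: with the binary:logistic objective, the model outputs a probability, and np.round effectively applies a 0.5 decision threshold. A minimal sketch of that decision rule, with made-up probabilities:

```python
import numpy as np

# Hypothetical predicted probabilities from a binary:logistic model.
probs = np.array([0.12, 0.61])

# Rounding thresholds at 0.5: below -> 0 (approved), above -> 1 (rejected).
decisions = np.round(probs).astype(int)
print(decisions.tolist())  # [0, 1]
```

In a production setting you might tune this threshold rather than fixing it at 0.5, depending on the relative cost of false approvals versus false rejections.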