This notebook illustrates the use of Feathr Feature Store to create a model that predicts the fraud status of transactions based on the user account data and transaction data. The main focus of this notebook is to show how to use FeatureQuery with features that have different keys.
The sample fraud transaction datasets used in the notebook can be found here: https://github.com/microsoft/r-server-fraud-detection.
The outline of the notebook is as follows:
1. Initialize the Feathr client
2. Prepare the fraud transaction dataset
3. Define features on the account and transaction data
4. Build features and get offline features for model training
5. Train a fraud detection model
6. Materialize features to the online store and fetch online feature values
Prior to running the notebook, if you have not deployed all the required resources, please follow the deployment guide here: https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-arm.html
To run the cells below, your managed identity needs additional permissions to access the Key Vault and the Storage Account. You may run the following commands in the Cloud Shell to grant yourself the access.
userId=<email_id_of_account_requesting_access>
resource_prefix=<resource_prefix>
synapse_workspace_name="${resource_prefix}syws"
keyvault_name="${resource_prefix}kv"
objectId=$(az ad user show --id $userId --query id -o tsv)
az keyvault update --name $keyvault_name --enable-rbac-authorization false
az keyvault set-policy -n $keyvault_name --secret-permissions get list --object-id $objectId
az role assignment create --assignee $userId --role "Storage Blob Data Contributor"
az synapse role assignment create --workspace-name $synapse_workspace_name --role "Synapse Contributor" --assignee $userId
Uncomment the following cell and run it to install the Feathr Python package and the necessary dependencies.
# Install feathr from the latest codes in the repo. You may use `pip install feathr[notebook]` as well.
# %pip install "git+https://github.com/feathr-ai/feathr.git#subdirectory=feathr_project&egg=feathr[notebook]"
from datetime import datetime, timedelta
import os
from pathlib import Path
import numpy as np
import pandas as pd
import feathr
from feathr import (
FeathrClient,
STRING, BOOLEAN, FLOAT, INT32, ValueType,
Feature, DerivedFeature, FeatureAnchor,
BackfillTime, MaterializationSettings,
FeatureQuery, ObservationSettings,
RedisSink,
HdfsSource,
WindowAggTransformation,
TypedKey,
)
from feathr.datasets.constants import (
FRAUD_DETECTION_ACCOUNT_INFO_URL,
FRAUD_DETECTION_FRAUD_TRANSACTIONS_URL,
FRAUD_DETECTION_UNTAGGED_TRANSACTIONS_URL,
)
from feathr.datasets.utils import maybe_download
from feathr.utils.config import generate_config
from feathr.utils.job_utils import get_result_df
from feathr.utils.platform import is_databricks
print(f"Feathr version: {feathr.__version__}")
RESOURCE_PREFIX = "" # TODO fill the value used to deploy the resources via ARM template
PROJECT_NAME = "fraud_detection"
# Currently supported: 'azure_synapse', 'databricks', and 'local'
SPARK_CLUSTER = "local"
# TODO fill values to use databricks cluster:
DATABRICKS_CLUSTER_ID = None # Set Databricks cluster id to use an existing cluster
if is_databricks():
# If this notebook is running on Databricks, its context can be used to retrieve token and instance URL
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
DATABRICKS_WORKSPACE_TOKEN_VALUE = ctx.apiToken().get()
SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = f"https://{ctx.tags().get('browserHostName').get()}"
else:
DATABRICKS_WORKSPACE_TOKEN_VALUE = None # Set Databricks workspace token to use databricks
SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = None # Set Databricks workspace url to use databricks
# TODO fill values to use Azure Synapse cluster:
AZURE_SYNAPSE_SPARK_POOL = None # Set Azure Synapse Spark pool name
AZURE_SYNAPSE_URL = None # Set Azure Synapse workspace url to use Azure Synapse
ADLS_KEY = None # Set Azure Data Lake Storage key to use Azure Synapse
USE_CLI_AUTH = False # Set to True to use CLI authentication
# An existing Feathr config file path. If None, we'll generate a new config based on the constants in this cell.
FEATHR_CONFIG_PATH = None
# (For the notebook test pipeline) If true, use ScrapBook package to collect the results.
SCRAP_RESULTS = False
if SPARK_CLUSTER == "azure_synapse" and not os.environ.get("ADLS_KEY"):
os.environ["ADLS_KEY"] = ADLS_KEY
elif SPARK_CLUSTER == "databricks" and not os.environ.get("DATABRICKS_WORKSPACE_TOKEN_VALUE"):
os.environ["DATABRICKS_WORKSPACE_TOKEN_VALUE"] = DATABRICKS_WORKSPACE_TOKEN_VALUE
# Get an authentication credential to access Azure resources and register features
if USE_CLI_AUTH:
# Use Azure CLI authentication (device code flow)
!az login --use-device-code
from azure.identity import AzureCliCredential
credential = AzureCliCredential(additionally_allowed_tenants=['*'],)
elif "AZURE_TENANT_ID" in os.environ and "AZURE_CLIENT_ID" in os.environ and "AZURE_CLIENT_SECRET" in os.environ:
# Use service principal credentials from environment variables
from azure.identity import EnvironmentCredential
credential = EnvironmentCredential()
else:
# Try to use the default credential
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential(
exclude_interactive_browser_credential=False,
additionally_allowed_tenants=['*'],
)
# Redis password
if 'REDIS_PASSWORD' not in os.environ:
from azure.keyvault.secrets import SecretClient
vault_url = f"https://{RESOURCE_PREFIX}kv.vault.azure.net"
secret_client = SecretClient(vault_url=vault_url, credential=credential)
retrieved_secret = secret_client.get_secret('FEATHR-ONLINE-STORE-CONN').value
os.environ['REDIS_PASSWORD'] = retrieved_secret.split(",")[1].split("password=", 1)[1]
if FEATHR_CONFIG_PATH:
config_path = FEATHR_CONFIG_PATH
else:
config_path = generate_config(
resource_prefix=RESOURCE_PREFIX,
project_name=PROJECT_NAME,
spark_config__spark_cluster=SPARK_CLUSTER,
spark_config__azure_synapse__dev_url=AZURE_SYNAPSE_URL,
spark_config__azure_synapse__pool_name=AZURE_SYNAPSE_SPARK_POOL,
spark_config__databricks__workspace_instance_url=SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL,
databricks_cluster_id=DATABRICKS_CLUSTER_ID,
)
with open(config_path, 'r') as f:
print(f.read())
client = FeathrClient(config_path=config_path, credential=credential)
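As an optional sanity check (a minimal sketch, not part of the original flow), you can confirm which Spark runtime the client resolved from the config; client.spark_runtime is also used later in this notebook to decide how the source files are uploaded.
# Optional sanity check: confirm which Spark runtime the client resolved from the config.
print(f"Spark runtime: {client.spark_runtime}")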
We prepare the fraud detection dataset as follows: download the source datasets and tag each transaction with a label is_fraud, where is_fraud = 0 for a non-fraud transaction and 1 for a fraud transaction. To learn more about the fraud detection scenario, the dataset source we use, and how we tag the transactions, please see the dataset repository linked above.
# Use dbfs if the notebook is running on Databricks
if is_databricks():
WORKING_DIR = f"/dbfs/{PROJECT_NAME}"
else:
WORKING_DIR = PROJECT_NAME
# Download datasets
account_info_file_path = f"{WORKING_DIR}/account_info.csv"
fraud_transactions_file_path = f"{WORKING_DIR}/fraud_transactions.csv"
obs_transactions_file_path = f"{WORKING_DIR}/obs_transactions.csv"
maybe_download(
src_url=FRAUD_DETECTION_ACCOUNT_INFO_URL,
dst_filepath=account_info_file_path,
)
maybe_download(
src_url=FRAUD_DETECTION_FRAUD_TRANSACTIONS_URL,
dst_filepath=fraud_transactions_file_path,
)
maybe_download(
src_url=FRAUD_DETECTION_UNTAGGED_TRANSACTIONS_URL,
dst_filepath=obs_transactions_file_path,
)
# Load datasets
fraud_df = pd.read_csv(fraud_transactions_file_path)
obs_df = pd.read_csv(obs_transactions_file_path)
# Combine transactionDate and transactionTime into one column. E.g. "20130903", "013641" -> "20130903 013641"
fraud_df["timestamp"] = fraud_df["transactionDate"].astype(str) + " " + fraud_df["transactionTime"].astype(str).str.zfill(6)
obs_df["timestamp"] = obs_df["transactionDate"].astype(str) + " " + obs_df["transactionTime"].astype(str).str.zfill(6)
In this step, we compute, for each account, the timestamp range in which the fraudulent transactions happened by referencing the transaction-level fraud data. We then use that range to create the is_fraud labels for the untagged transaction data.
# For each user in the fraud transaction data, get the timestamp range during which the fraud transactions happened.
fraud_labels_df = fraud_df.groupby("accountID").agg({"timestamp": ['min', 'max']})
fraud_labels_df.columns = ["_".join(col) for col in fraud_labels_df.columns.values]
fraud_labels_df.head()
# Combine fraud and untagged transaction data to generate the tagged transaction data.
transactions_df = pd.concat([fraud_df, obs_df], ignore_index=True).merge(
fraud_labels_df,
on="accountID",
how="outer",
)
# Data cleaning
transactions_df.dropna(
subset=[
"accountID",
"transactionID",
"transactionAmount",
"localHour",
"timestamp",
],
inplace=True,
)
transactions_df.sort_values("timestamp", inplace=True)
transactions_df.drop_duplicates(inplace=True)
# is_fraud = 0 if the transaction is not fraud. Otherwise (if it is a fraud), is_fraud = 1.
transactions_df["is_fraud"] = np.logical_and(
transactions_df["timestamp_min"] <= transactions_df["timestamp"],
transactions_df["timestamp"] <= transactions_df["timestamp_max"],
).astype(int)
transactions_df.head()
transactions_df["is_fraud"].value_counts()
# Save the tagged transaction data to a file
transactions_file_path = f"{WORKING_DIR}/transactions.csv"
transactions_df.to_csv(transactions_file_path, index=False)
# Upload files to cloud if needed
if client.spark_runtime == "local":
# In local mode, we can use the same data paths as the source.
# If the notebook is running on Databricks, the file paths are already dbfs paths.
account_info_source_path = account_info_file_path
transactions_source_path = transactions_file_path
elif client.spark_runtime == "databricks" and is_databricks():
# If the notebook is running on databricks, we can use the same data path as the source.
account_info_source_path = account_info_file_path.replace("/dbfs", "dbfs:")
transactions_source_path = transactions_file_path.replace("/dbfs", "dbfs:")
else:
# Otherwise, upload the local file to the cloud storage (either dbfs or adls).
account_info_source_path = client.feathr_spark_launcher.upload_or_get_cloud_path(account_info_file_path)
transactions_source_path = client.feathr_spark_launcher.upload_or_get_cloud_path(transactions_file_path)
Now, we can define the following features: account features, transaction features, window-aggregated transaction features, and a derived feature.
Some important concepts include HdfsSource, TypedKey, Feature, FeatureAnchor, and DerivedFeature. Please refer to the Feathr documentation to learn more about the details.
Let's first check the account data.
# Check account data
pd.read_csv(account_info_file_path).head()
Here, we use accountCountry, isUserRegistered, numPaymentRejects1dPerUser, and accountAge as the account features.
def account_preprocessing(df):
"""Drop rows with missing values in the account info dataset."""
return df.select(
"accountID",
"accountCountry",
"isUserRegistered",
"numPaymentRejects1dPerUser",
"accountAge",
).dropna(subset=["accountID"])
account_info_source = HdfsSource(
name="account_data",
path=account_info_source_path,
preprocessing=account_preprocessing,
)
# Account features will be joined to observation data on accountID
account_id = TypedKey(
key_column="accountID",
key_column_type=ValueType.STRING,
description="account id",
)
account_features = [
Feature(
name="account_country_code",
key=account_id,
feature_type=STRING,
transform="accountCountry",
),
Feature(
name="is_user_registered",
key=account_id,
feature_type=BOOLEAN,
transform="isUserRegistered==TRUE",
),
Feature(
name="num_payment_rejects_1d_per_user",
key=account_id,
feature_type=INT32,
transform="numPaymentRejects1dPerUser",
),
Feature(
name="account_age",
key=account_id,
feature_type=INT32,
transform="accountAge",
),
]
account_anchor = FeatureAnchor(
name="account_features",
source=account_info_source,
features=account_features,
)
We already checked the transaction dataset when we tagged the fraud label is_fraud. So, let's jump to defining features.
def transaction_preprocessing(df):
"""Preprocess the transaction data."""
import pyspark.sql.functions as F
return df.withColumn("ipCountryCode", F.upper("ipCountryCode"))
transactions_source = HdfsSource(
name="transaction_data",
path=transactions_source_path,
event_timestamp_column="timestamp",
timestamp_format="yyyyMMdd HHmmss",
preprocessing=transaction_preprocessing,
)
# Transaction features will be joined to observation data on transactionID
transaction_id = TypedKey(
key_column="transactionID",
key_column_type=ValueType.STRING,
description="transaction id",
)
transaction_amount = Feature(
name="transaction_amount",
key=transaction_id,
feature_type=FLOAT,
transform="transactionAmount",
)
transaction_features = [
transaction_amount,
Feature(
name="transaction_country_code",
key=transaction_id,
feature_type=STRING,
transform="ipCountryCode",
),
Feature(
name="transaction_time",
key=transaction_id,
feature_type=FLOAT,
transform="localHour", # Local time of the transaction
),
Feature(
name="is_proxy_ip",
key=transaction_id,
feature_type=STRING, # [nan, True, False]
transform="isProxyIP",
),
Feature(
name="cvv_verify_result",
key=transaction_id,
feature_type=STRING, # [nan, 'M', 'P', 'N', 'X', 'U', 'S', 'Y']
transform="cvvVerifyResult",
),
]
transaction_feature_anchor = FeatureAnchor(
name="transaction_features",
source=transactions_source,
features=transaction_features,
)
# Average transaction amount over the past week
avg_transaction_amount = Feature(
name="avg_transaction_amount",
key=account_id,
feature_type=FLOAT,
transform=WindowAggTransformation(
agg_expr="cast_float(transactionAmount)", agg_func="AVG", window="7d"
),
)
agg_features = [
avg_transaction_amount,
# Number of transactions that took place in the past day
Feature(
name="num_transaction_count_in_day",
key=account_id,
feature_type=INT32,
transform=WindowAggTransformation(
agg_expr="transactionID", agg_func="COUNT", window="1d"
),
),
# Number of transactions that took place in the past week
Feature(
name="num_transaction_count_in_week",
key=account_id,
feature_type=INT32,
transform=WindowAggTransformation(
agg_expr="transactionID", agg_func="COUNT", window="7d"
),
),
# Total transaction amount in the past day
Feature(
name="total_transaction_amount_in_day",
key=account_id,
feature_type=FLOAT,
transform=WindowAggTransformation(
agg_expr="cast_float(transactionAmount)", agg_func="SUM", window="1d"
),
),
# Average transaction time (local hour) over the past week
Feature(
name="avg_transaction_time_in_week",
key=account_id,
feature_type=FLOAT,
transform=WindowAggTransformation(
agg_expr="cast_float(localHour)", agg_func="AVG", window="7d"
),
),
]
agg_anchor = FeatureAnchor(
name="transaction_agg_features",
source=transactions_source,
features=agg_features,
)
derived_features = [
DerivedFeature(
name="diff_between_current_and_avg_amount",
key=[transaction_id, account_id],
feature_type=FLOAT,
input_features=[transaction_amount, avg_transaction_amount],
transform="transaction_amount - avg_transaction_amount",
),
]
Now, let's build the features.
client.build_features(
anchor_list=[
account_anchor,
transaction_feature_anchor,
agg_anchor,
],
derived_feature_list=derived_features,
)
account_feature_names = [feat.name for feat in account_features] + [feat.name for feat in agg_features]
transactions_feature_names = [feat.name for feat in transaction_features]
derived_feature_names = [feat.name for feat in derived_features]
To extract the offline feature values from the features that have different keys, we use multiple FeatureQuery objects.
account_feature_query = FeatureQuery(
feature_list=account_feature_names,
key=account_id,
)
transactions_feature_query = FeatureQuery(
feature_list=transactions_feature_names,
key=transaction_id,
)
derived_feature_query = FeatureQuery(
feature_list=derived_feature_names,
key=[transaction_id, account_id],
)
settings = ObservationSettings(
observation_path=transactions_source_path,
event_timestamp_column="timestamp",
timestamp_format="yyyyMMdd HHmmss",
)
client.get_offline_features(
observation_settings=settings,
feature_query=[account_feature_query, transactions_feature_query, derived_feature_query],
output_path=transactions_source_path.rpartition("/")[0] + "/fraud_transactions_features.avro",
)
client.wait_job_to_finish(timeout_sec=5000)
df = get_result_df(client)[
account_feature_names
+ transactions_feature_names
+ derived_feature_names
+ ["is_fraud", "timestamp"]
]
df.head(5)
We use a Random Forest classifier to build the fraud detection model.
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.express as px
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
confusion_matrix,
f1_score,
precision_score,
recall_score,
PrecisionRecallDisplay,
)
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
df.describe().T
df.nunique()
# Plot only a sub-sample for simplicity
NUM_SAMPLES_TO_PLOT = 5000
fig = px.scatter_matrix(
df.sample(n=NUM_SAMPLES_TO_PLOT, random_state=42),
dimensions=df.columns[:-2], # exclude the label and timestamp
color="is_fraud",
labels={col:col.replace('_', ' ') for col in df.columns}, # remove underscore
)
fig.update_traces(diagonal_visible=False, showupperhalf=False, marker_size=3, marker_opacity=0.5)
fig.update_layout(
width=2000,
height=2000,
title={"text": "Scatter matrix for transaction dataset", "font_size": 20},
font_size=6,
)
fig.show()
n_train = int(len(df) * 0.7)
train_df = df.iloc[:n_train]
test_df = df.iloc[n_train:]
print(f"""Training set:
{train_df["is_fraud"].value_counts()}
Test set:
{test_df["is_fraud"].value_counts()}
""")
# Check that the time ranges of the training and test sets don't overlap
train_df["timestamp"].max(), test_df["timestamp"].min()
# Get labels as integers
y_train = train_df["is_fraud"].astype(int).to_numpy()
y_test = test_df["is_fraud"].astype(int).to_numpy()
# We convert categorical features into integer values by using one-hot-encoding and ordinal-encoding
categorical_feature_names = [
"account_country_code",
"transaction_country_code",
"cvv_verify_result",
]
ordinal_feature_names = [
"is_user_registered",
"is_proxy_ip",
]
one_hot_encoder = OneHotEncoder(sparse_output=False).fit(df[categorical_feature_names])
ordinal_encoder = OrdinalEncoder().fit(df[ordinal_feature_names])
ordinal_encoder.categories_
one_hot_encoder.categories_
X_train = np.concatenate(
(
one_hot_encoder.transform(train_df[categorical_feature_names]),
ordinal_encoder.transform(train_df[ordinal_feature_names]),
train_df.drop(categorical_feature_names + ordinal_feature_names + ["is_fraud", "timestamp"], axis="columns").fillna(0).to_numpy(),
),
axis=1,
)
X_test = np.concatenate(
(
one_hot_encoder.transform(test_df[categorical_feature_names]),
ordinal_encoder.transform(test_df[ordinal_feature_names]),
test_df.drop(categorical_feature_names + ordinal_feature_names + ["is_fraud", "timestamp"], axis="columns").fillna(0).to_numpy(),
),
axis=1,
)
clf = RandomForestClassifier(
n_estimators=50,
random_state=42,
).fit(X_train, y_train)
clf.score(X_test, y_test)
y_pred = clf.predict(X_test)
y_pred
y_prob = clf.predict_proba(X_test)
y_prob
To measure the performance, we use recall, precision, and F1 score, which handle imbalanced data better than plain accuracy.
display = PrecisionRecallDisplay.from_predictions(
y_test, y_prob[:, 1], name="RandomForestClassifier"
)
_ = display.ax_.set_title("Fraud Detection Precision-Recall Curve")
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
print(f"""Precision: {precision},
Recall: {recall},
F1: {f1}""")
confusion_matrix(y_test, y_pred)
numeric_feature_names = [name for name in train_df.columns if name not in set(categorical_feature_names + ordinal_feature_names + ["is_fraud", "timestamp"])]
numeric_feature_names
# The feature order in X is [one-hot-encoded categorical features, ordinal features, numeric features]
importances = clf.feature_importances_[-len(numeric_feature_names):]
std = np.std([tree.feature_importances_[-len(numeric_feature_names):] for tree in clf.estimators_], axis=0)
fig = px.bar(
pd.DataFrame([numeric_feature_names, importances, std], index=["Numeric features", "importances", "std"]).T,
y="Numeric features",
x="importances",
error_x="std",
orientation="h",
title="Importance of the numeric features",
)
fig.update_layout(showlegend=False, width=1000)
fig.update_xaxes(title_text="Mean decrease in impurity", range=[0, 0.5])
fig.update_yaxes(title_text="Numeric features")
fig.show()
feature_names = categorical_feature_names + ordinal_feature_names
categories = one_hot_encoder.categories_ + ordinal_encoder.categories_
start_i = 0
n_rows = len(feature_names)
fig = make_subplots(
rows=n_rows,
cols=1,
subplot_titles=[name.replace("_", " ") for name in feature_names],
x_title="Mean decrease in impurity",
)
for i in range(n_rows):
category = categories[i]
end_i = start_i + len(category)
fig.add_trace(
go.Bar(
x=clf.feature_importances_[start_i:end_i],
y=category,
width=0.2,
error_x=dict(
type="data",
array=np.std([tree.feature_importances_[start_i:end_i] for tree in clf.estimators_], axis=0),
),
orientation="h",
),
row=i+1,
col=1,
)
start_i = end_i
fig.update_layout(title="Importance of the categorical features", showlegend=False, width=1000, height=1000)
fig.update_xaxes(range=[0, 0.5])
fig.show()
Now, we materialize features to a RedisSink so that we can retrieve online features.
ACCOUNT_FEATURE_TABLE_NAME = "fraudDetectionAccountFeatures"
backfill_time = BackfillTime(
start=datetime(2013, 8, 4),
end=datetime(2013, 8, 4),
step=timedelta(days=1),
)
client.materialize_features(
MaterializationSettings(
ACCOUNT_FEATURE_TABLE_NAME,
backfill_time=backfill_time,
sinks=[RedisSink(table_name=ACCOUNT_FEATURE_TABLE_NAME)],
feature_names=account_feature_names[1:],
),
allow_materialize_non_agg_feature=True,
)
client.wait_job_to_finish(timeout_sec=5000)
materialized_feature_values = client.get_online_features(
ACCOUNT_FEATURE_TABLE_NAME,
key="A1055520452832600",
feature_names=account_feature_names[1:],
)
materialized_feature_values
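If you need the materialized feature values for several accounts at once, a batched lookup is also possible; a minimal sketch, assuming multi_get_online_features is available in your Feathr version and that any additional account IDs you pass exist in the materialized table:
# Optional: batched online retrieval for multiple account keys (extend the list as needed).
client.multi_get_online_features(
    ACCOUNT_FEATURE_TABLE_NAME,
    keys=["A1055520452832600"],
    feature_names=account_feature_names[1:],
)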
Scrap the results for the unit test pipeline.
if SCRAP_RESULTS:
import scrapbook as sb
sb.glue("materialized_feature_values", materialized_feature_values)
sb.glue("precision", precision)
sb.glue("recall", recall)
sb.glue("f1", f1)
# Clean up the output files. CAUTION: this may be dangerous if you "reused" the project name.
import shutil
shutil.rmtree(WORKING_DIR, ignore_errors=False)