Feature embedding is a way to translate a high-dimensional feature vector to a lower-dimensional vector, where the embedding can be learned and reused across models. In this example, we show how one can define feature embeddings in Feathr Feature Store via UDF (User Defined Function).
We use a sample hotel review dataset downloaded from the Azure-Samples repository. The original dataset can be found here.
For the embedding, a pre-trained HuggingFace Transformer model is used to encode texts into numerical values. The text embeddings can be used for many NLP problems such as detecting fake reviews, sentiment analysis, and finding similar hotels, but building such models is out of scope and thus we don't cover that in this notebook.
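To make the concept concrete, the short sketch below encodes a single sentence into an embedding vector. This is only an illustration and is not part of the Feathr job; it assumes the sentence-transformers package (and the paraphrase-MiniLM-L6-v2 model used later in this notebook) is available in your local environment.
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("paraphrase-MiniLM-L6-v2")
vector = model.encode("The room was clean and the staff were friendly.")
print(vector.shape)  # e.g. (384,) for this model
print(vector[:5])    # first few values of the dense embedding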
Note: You will need to install the sentence-transformers pip package to your Spark pool to use the embedding example.
First, install Feathr and other necessary packages to run this notebook.
# Install feathr from the latest code in the repo. You may use `pip install "feathr[notebook]"` as well.
#%pip install "git+https://github.com/feathr-ai/feathr.git#subdirectory=feathr_project&egg=feathr[notebook]"
from copy import deepcopy
import json
import os
import pandas as pd
from pyspark.sql import DataFrame
import feathr
from feathr import (
# dtype
FLOAT_VECTOR, ValueType,
# source
HdfsSource,
# client
FeathrClient,
# feature
Feature,
# anchor
FeatureAnchor,
# typed_key
TypedKey,
# query_feature_list
FeatureQuery,
# settings
ObservationSettings,
# feathr_configurations
SparkExecutionConfiguration,
)
from feathr.datasets.constants import HOTEL_REVIEWS_URL
from feathr.datasets.utils import maybe_download
from feathr.utils.config import DEFAULT_DATABRICKS_CLUSTER_CONFIG, generate_config
from feathr.utils.job_utils import get_result_df
from feathr.utils.platform import is_jupyter, is_databricks
print(f"Feathr version: {feathr.__version__}")
Notebook parameters:
RESOURCE_PREFIX = "" # TODO fill the value
PROJECT_NAME = "hotel_reviews_embedding"
REGISTRY_ENDPOINT = f"https://{RESOURCE_PREFIX}webapp.azurewebsites.net/api/v1"
# TODO fill in the following variables to use a Databricks cluster:
DATABRICKS_CLUSTER_ID = None # Set Databricks cluster id to use an existing cluster
if is_databricks():
# If this notebook is running on Databricks, its context can be used to retrieve token and instance URL
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
DATABRICKS_WORKSPACE_TOKEN_VALUE = ctx.apiToken().get()
SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = f"https://{ctx.tags().get('browserHostName').get()}"
else:
# TODO change the values if necessary
DATABRICKS_WORKSPACE_TOKEN_VALUE = os.environ.get("DATABRICKS_WORKSPACE_TOKEN_VALUE")
SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = os.environ.get("SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL")
# TODO Change the value if necessary
DATABRICKS_NODE_SIZE = "Standard_DS3_v2"
# We'll need an authentication credential to access Azure resources and register features
USE_CLI_AUTH = False # Set True to use interactive authentication
# If set True, register the features to Feathr registry.
REGISTER_FEATURES = False
# TODO fill in the values to use EnvironmentCredential for authentication (e.g. to run this notebook on Databricks).
AZURE_TENANT_ID = None
AZURE_CLIENT_ID = None
AZURE_CLIENT_SECRET = None
# Set True to delete the project output files at the end of this notebook.
CLEAN_UP = False
# Get an authentication credential to access Azure resources and register features
if USE_CLI_AUTH:
# Use AZ CLI interactive browser authentication
!az login --use-device-code
from azure.identity import AzureCliCredential
credential = AzureCliCredential(additionally_allowed_tenants=['*'],)
elif AZURE_TENANT_ID and AZURE_CLIENT_ID and AZURE_CLIENT_SECRET:
# Use Environment variable secret
from azure.identity import EnvironmentCredential
os.environ["AZURE_TENANT_ID"] = AZURE_TENANT_ID
os.environ["AZURE_CLIENT_ID"] = AZURE_CLIENT_ID
os.environ["AZURE_CLIENT_SECRET"] = AZURE_CLIENT_SECRET
credential = EnvironmentCredential()
else:
# Try to use the default credential
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential(
exclude_interactive_browser_credential=False,
additionally_allowed_tenants=['*'],
)
First, prepare the hotel review dataset.
# Use dbfs if the notebook is running on Databricks
if is_databricks():
WORKING_DIR = f"/dbfs/{PROJECT_NAME}"
else:
WORKING_DIR = PROJECT_NAME
data_filepath = f"{WORKING_DIR}/hotel_reviews_100_with_id.csv"
maybe_download(src_url=HOTEL_REVIEWS_URL, dst_filepath=data_filepath)
Since the review IDs are not included in our sample dataset, we assign incremental numbers to the ID column so that we can use them for feature joining later.
df = pd.read_csv(data_filepath)
df['reviews_id'] = df.index
# Verify the data
df.head(5)
# Save the updated data back to file so that we can use it later in this sample notebook.
df.to_csv(data_filepath, index=False)
databricks_cluster_config = deepcopy(DEFAULT_DATABRICKS_CLUSTER_CONFIG)
databricks_cluster_config["node_type_id"] = DATABRICKS_NODE_SIZE
databricks_config = {
"run_name": "FEATHR_FILL_IN",
"libraries": [
{"jar": "FEATHR_FILL_IN"},
# sentence-transformers pip package
{"pypi": {"package": "sentence-transformers"}},
],
"spark_jar_task": {
"main_class_name": "FEATHR_FILL_IN",
"parameters": ["FEATHR_FILL_IN"],
},
"new_cluster": databricks_cluster_config,
}
config_path = generate_config(
resource_prefix=RESOURCE_PREFIX,
project_name=PROJECT_NAME,
spark_config__spark_cluster="databricks",
# You may set an existing cluster id here, but Databricks recommends using new job clusters for greater reliability.
databricks_cluster_id=DATABRICKS_CLUSTER_ID,  # None creates a new job cluster
databricks_workspace_token_value=DATABRICKS_WORKSPACE_TOKEN_VALUE,
spark_config__databricks__work_dir=f"dbfs:/{PROJECT_NAME}",
spark_config__databricks__workspace_instance_url=SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL,
spark_config__databricks__config_template=json.dumps(databricks_config),
feature_registry__api_endpoint=REGISTRY_ENDPOINT,
)
with open(config_path, "r") as f:
print(f.read())
client = FeathrClient(
config_path=config_path,
credential=credential,
use_env_vars=False,
)
From the feature creator's point of view, we implement a feature embedding UDF, define the embedding output as a feature, and register the feature to the Feathr registry.
First, we set the data source path that our feature definition will use. This path will be used in the Feature Consumer Scenario later in this notebook when extracting the feature vectors.
if client.spark_runtime == "local":
data_source_path = data_filepath
# If the notebook is running on Databricks, convert to spark path format
elif client.spark_runtime == "databricks" and is_databricks():
data_source_path = data_filepath.replace("/dbfs", "dbfs:")
# Otherwise, upload the local file to the cloud storage (either dbfs or adls).
else:
data_source_path = client.feathr_spark_launcher.upload_or_get_cloud_path(data_filepath)
data_source_path
Create a feature embedding UDF. Here, we use a pre-trained Transformer model from HuggingFace.
def sentence_embedding(df: DataFrame) -> DataFrame:
"""Feathr data source UDF to generate sentence embeddings.
Args:
df: A Spark DataFrame with a column named "reviews_text" of type string.
Returns:
A Spark DataFrame with a column named "reviews_text_embedding" of type array<float>.
"""
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import ArrayType, FloatType
from sentence_transformers import SentenceTransformer
@pandas_udf(ArrayType(FloatType()))
def predict_batch_udf(data: pd.Series) -> pd.Series:
"""Pandas UDF transforming a pandas.Series of text into a pandas.Series of embeddings.
You may use iterator input and output instead, e.g. Iterator[pd.Series] -> Iterator[pd.Series]
"""
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
embedding = model.encode(data.to_list())
return pd.Series(embedding.tolist())
return df.withColumn("reviews_text_embedding", predict_batch_udf(col("reviews_text")))
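Before wiring the UDF into a Feathr source, you can optionally sanity-check it on a tiny Spark DataFrame. This is a minimal sketch, assuming a SparkSession named spark and the sentence-transformers package are available in the current environment (e.g. on Databricks); the sample row is made up for illustration.
# Optional sanity check of the UDF (illustrative only; assumes `spark` and sentence-transformers are available)
sample_df = spark.createDataFrame([(0, "Great location and very friendly staff.")], ["reviews_id", "reviews_text"])
sentence_embedding(sample_df).select("reviews_id", "reviews_text_embedding").show(truncate=40)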
hdfs_source = HdfsSource(
name="hotel_reviews",
path=data_source_path,
preprocessing=sentence_embedding,
)
# A key is required for features from a non-INPUT_CONTEXT source
key = TypedKey(
key_column="reviews_id",
key_column_type=ValueType.INT64,
description="Reviews ID",
full_name=f"{PROJECT_NAME}.review_id",
)
# The column 'reviews_text_embedding' will be generated by our UDF `sentence_embedding`.
# We use the column as the feature.
features = [
Feature(
name="f_reviews_text_embedding",
key=key,
feature_type=FLOAT_VECTOR,
transform="reviews_text_embedding",
),
]
feature_anchor = FeatureAnchor(
name="feature_anchor",
source=hdfs_source,
features=features,
)
client.build_features(
anchor_list=[feature_anchor],
)
if REGISTER_FEATURES:
try:
client.register_features()
except Exception as e:
print(e)
print(client.list_registered_features(project_name=PROJECT_NAME))
# You can get the actual features too by calling client.get_features_from_registry(PROJECT_NAME)
if REGISTER_FEATURES:
registered_features = client.get_features_from_registry(project_name=PROJECT_NAME)
else:
# Assume we get the registered features. This is for a notebook unit-test w/o the actual registration.
registered_features = {feat.name: feat for feat in features}
print("Features:")
for f_name, f in registered_features.items():
print(f"\t{f_name} (key: {f.key[0].key_column})")
feature_name = "f_reviews_text_embedding"
feature_key = registered_features[feature_name].key[0]
if client.spark_runtime == "databricks":
output_filepath = f"dbfs:/{PROJECT_NAME}/feature_embeddings.parquet"
else:
raise ValueError(
"This notebook is expected to use Databricks as the target Spark cluster. "
"To use other platforms, you will need to install the sentence-transformers pip package on your Spark cluster."
)
query = FeatureQuery(
feature_list=[feature_name],
key=feature_key,
)
settings = ObservationSettings(
observation_path=data_source_path,
)
client.get_offline_features(
observation_settings=settings,
feature_query=query,
# For more details, see https://feathr-ai.github.io/feathr/how-to-guides/feathr-job-configuration.html
execution_configurations=SparkExecutionConfiguration({
"spark.feathr.outputFormat": "parquet",
"spark.sql.execution.arrow.enabled": "true",
}),
output_path=output_filepath,
)
client.wait_job_to_finish(timeout_sec=5000)
result_df = get_result_df(client=client, res_url=output_filepath, data_format="parquet")
result_df[["name", "reviews_text", feature_name]].head(5)
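As a quick check that the embeddings are meaningful, you can use them directly, for example to find the review most similar to the first one by cosine similarity. The snippet below is a minimal sketch using numpy and is not part of the original pipeline.
import numpy as np
embeddings = np.stack(result_df[feature_name], axis=0)
embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)  # L2-normalize each embedding
similarities = embeddings @ embeddings[0]  # cosine similarity of every review to review 0
most_similar_idx = int(np.argsort(-similarities)[1])  # index 0 is the review itself, so take the next one
result_df.iloc[[0, most_similar_idx]][["name", "reviews_text"]]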
Let's visualize the feature values. Here, we use t-SNE (t-distributed Stochastic Neighbor Embedding) from scikit-learn to plot the vectors in 2D space.
import numpy as np
import plotly.graph_objs as go
from sklearn.manifold import TSNE
X = np.stack(result_df[feature_name], axis=0)
result = TSNE(
n_components=2,
init='random',
perplexity=10,
).fit_transform(X)
result[:10]
names = set(result_df['name'])
names
fig = go.Figure()
for name in names:
mask = result_df['name']==name
fig.add_trace(go.Scatter(
x=result[mask, 0],
y=result[mask, 1],
name=name,
textposition='top center',
mode='markers+text',
marker={
'size': 8,
'opacity': 0.8,
},
))
fig.update_layout(
margin={'l': 0, 'r': 0, 'b': 0, 't': 0},
showlegend=True,
autosize=False,
width=1000,
height=500,
)
fig.show()
# Clean up the output files. CAUTION: this may be dangerous if you reused the project name.
if CLEAN_UP:
import shutil
shutil.rmtree(WORKING_DIR, ignore_errors=False)