#!/usr/bin/env python
# coding: utf-8

# # Using Feature Embedding with Feathr Feature Store
#
# Feature embedding is a way to translate a high-dimensional feature vector into a lower-dimensional vector, where the embedding can be learned and reused across models. In this example, we show how to define feature embeddings in Feathr Feature Store via a **UDF (User Defined Function)**.
#
# We use a sample hotel review dataset downloaded from the [Azure-Samples repository](https://github.com/Azure-Samples/azure-search-python-samples/tree/main/AzureML-Custom-Skill/datasets). The original dataset can be found [here](https://www.kaggle.com/datasets/datafiniti/hotel-reviews).
#
# For the embedding, a pre-trained [HuggingFace Transformer model](https://huggingface.co/sentence-transformers) is used to encode texts into numerical values. The text embeddings can be used for many NLP problems such as detecting fake reviews, sentiment analysis, and finding similar hotels, but building such models is out of scope, so we don't cover that in this notebook.
#
# ## Prerequisite
# * Databricks: In this notebook, we use Databricks as the target Spark platform.
#     - You may use an Azure Synapse Spark pool instead by following [these instructions](https://github.com/feathr-ai/feathr/blob/main/docs/quickstart_synapse.md). Note that you'll need to install the `sentence-transformers` pip package on your Spark pool to run the embedding example.
# * Feature registry: We showcase using the feature registry later in this notebook. You may use the [ARM template](https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-arm.html) to deploy the necessary resources.
#
# First, install Feathr and other necessary packages to run this notebook.

# In[ ]:

# Install feathr from the latest code in the repo. You may use `pip install "feathr[notebook]"` as well.
#%pip install "git+https://github.com/feathr-ai/feathr.git#subdirectory=feathr_project&egg=feathr[notebook]"

# In[ ]:

from copy import deepcopy
import json
import os

import pandas as pd
from pyspark.sql import DataFrame

import feathr
from feathr import (
    # dtype
    FLOAT_VECTOR,
    ValueType,
    # source
    HdfsSource,
    # client
    FeathrClient,
    # feature
    Feature,
    # anchor
    FeatureAnchor,
    # typed_key
    TypedKey,
    # query_feature_list
    FeatureQuery,
    # settings
    ObservationSettings,
    # feathr_configurations
    SparkExecutionConfiguration,
)
from feathr.datasets.constants import HOTEL_REVIEWS_URL
from feathr.datasets.utils import maybe_download
from feathr.utils.config import DEFAULT_DATABRICKS_CLUSTER_CONFIG, generate_config
from feathr.utils.job_utils import get_result_df
from feathr.utils.platform import is_jupyter, is_databricks

print(f"Feathr version: {feathr.__version__}")

# Notebook parameters:

# In[ ]:

RESOURCE_PREFIX = ""  # TODO fill the value
PROJECT_NAME = "hotel_reviews_embedding"

REGISTRY_ENDPOINT = f"https://{RESOURCE_PREFIX}webapp.azurewebsites.net/api/v1"

# TODO fill values to the following variables to use a Databricks cluster:
DATABRICKS_CLUSTER_ID = None  # Set a Databricks cluster id to use an existing cluster
if is_databricks():
    # If this notebook is running on Databricks, its context can be used to retrieve the token and instance URL
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    DATABRICKS_WORKSPACE_TOKEN_VALUE = ctx.apiToken().get()
    SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = f"https://{ctx.tags().get('browserHostName').get()}"
else:
    # TODO change the values if necessary
    DATABRICKS_WORKSPACE_TOKEN_VALUE = os.environ.get("DATABRICKS_WORKSPACE_TOKEN_VALUE")
    SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL = os.environ.get("SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL")

# TODO change the value if necessary
DATABRICKS_NODE_SIZE = "Standard_DS3_v2"

# We'll need an authentication credential to access Azure resources and register features
USE_CLI_AUTH = False  # Set True to use interactive authentication

# If set True, register the features to the Feathr registry.
REGISTER_FEATURES = False

# TODO fill the values to use EnvironmentCredential for authentication (e.g. to run this notebook on Databricks).
AZURE_TENANT_ID = None
AZURE_CLIENT_ID = None
AZURE_CLIENT_SECRET = None

# Set True to delete the project output files at the end of this notebook.
CLEAN_UP = False

# In[ ]:

# Get an authentication credential to access Azure resources and register features
if USE_CLI_AUTH:
    # Use AZ CLI interactive browser authentication
    get_ipython().system('az login --use-device-code')
    from azure.identity import AzureCliCredential
    credential = AzureCliCredential(additionally_allowed_tenants=['*'],)
elif AZURE_TENANT_ID and AZURE_CLIENT_ID and AZURE_CLIENT_SECRET:
    # Use environment variable secrets
    from azure.identity import EnvironmentCredential
    os.environ["AZURE_TENANT_ID"] = AZURE_TENANT_ID
    os.environ["AZURE_CLIENT_ID"] = AZURE_CLIENT_ID
    os.environ["AZURE_CLIENT_SECRET"] = AZURE_CLIENT_SECRET
    credential = EnvironmentCredential()
else:
    # Try to use the default credential
    from azure.identity import DefaultAzureCredential
    credential = DefaultAzureCredential(
        exclude_interactive_browser_credential=False,
        additionally_allowed_tenants=['*'],
    )
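# Optionally, you can sanity-check the credential before using it. The cell below is a minimal sketch:
# it requests a token for the Azure Resource Manager scope (`https://management.azure.com/.default`),
# which is an assumed scope for this check -- adjust it if your environment targets a different resource.

# In[ ]:

# Optional: verify that the selected credential can issue a token.
# NOTE: the scope used here is an assumption for this sanity check.
try:
    _token = credential.get_token("https://management.azure.com/.default")
    print(f"Credential check passed. Token expires at (epoch seconds): {_token.expires_on}")
except Exception as e:
    print(f"Credential check failed: {e}")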
# ## Prepare Dataset
#
# First, prepare the hotel review dataset.

# In[ ]:

# Use dbfs if the notebook is running on Databricks
if is_databricks():
    WORKING_DIR = f"/dbfs/{PROJECT_NAME}"
else:
    WORKING_DIR = PROJECT_NAME

# In[ ]:

data_filepath = f"{WORKING_DIR}/hotel_reviews_100_with_id.csv"
maybe_download(src_url=HOTEL_REVIEWS_URL, dst_filepath=data_filepath)

# Since review IDs are not included in our sample dataset, we set incremental numbers to the ID column so that we can use them for feature joining later.

# In[ ]:

df = pd.read_csv(data_filepath)
df['reviews_id'] = df.index

# In[ ]:

# Verify the data
df.head(5)

# In[ ]:

# Save the updated data back to the file so that we can use it later in this sample notebook.
df.to_csv(data_filepath, index=False)

# ## Initialize Feathr Client

# In[ ]:

databricks_cluster_config = deepcopy(DEFAULT_DATABRICKS_CLUSTER_CONFIG)
databricks_cluster_config["node_type_id"] = DATABRICKS_NODE_SIZE

databricks_config = {
    "run_name": "FEATHR_FILL_IN",
    "libraries": [
        {"jar": "FEATHR_FILL_IN"},
        # sentence-transformers pip package
        {"pypi": {"package": "sentence-transformers"}},
    ],
    "spark_jar_task": {
        "main_class_name": "FEATHR_FILL_IN",
        "parameters": ["FEATHR_FILL_IN"],
    },
    "new_cluster": databricks_cluster_config,
}

config_path = generate_config(
    resource_prefix=RESOURCE_PREFIX,
    project_name=PROJECT_NAME,
    spark_config__spark_cluster="databricks",
    # You may set an existing cluster id here, but Databricks recommends using new job clusters for greater reliability.
    databricks_cluster_id=None,  # Set None to create a new job cluster
    databricks_workspace_token_value=DATABRICKS_WORKSPACE_TOKEN_VALUE,
    spark_config__databricks__work_dir=f"dbfs:/{PROJECT_NAME}",
    spark_config__databricks__workspace_instance_url=SPARK_CONFIG__DATABRICKS__WORKSPACE_INSTANCE_URL,
    spark_config__databricks__config_template=json.dumps(databricks_config),
    feature_registry__api_endpoint=REGISTRY_ENDPOINT,
)

with open(config_path, "r") as f:
    print(f.read())

# In[ ]:

client = FeathrClient(
    config_path=config_path,
    credential=credential,
    use_env_vars=False,
)

# ## Feature Creator Scenario
#
# From the feature creator's point of view, we implement a feature embedding UDF, define the embedding output as a feature, and register the feature to the Feathr registry.
#
# ### Create Features
#
# First, we set the data source path that our feature definition will use. This path will also be used in the **Feature Consumer Scenario** later in this notebook when extracting the feature vectors.

# In[ ]:

if client.spark_runtime == "local":
    data_source_path = data_filepath
# If the notebook is running on Databricks, convert to the Spark path format
elif client.spark_runtime == "databricks" and is_databricks():
    data_source_path = data_filepath.replace("/dbfs", "dbfs:")
# Otherwise, upload the local file to the cloud storage (either dbfs or adls).
else:
    data_source_path = client.feathr_spark_launcher.upload_or_get_cloud_path(data_filepath)

data_source_path

# Create the feature embedding UDF. Here, we will use a [pretrained Transformer model from HuggingFace](https://huggingface.co/sentence-transformers/paraphrase-MiniLM-L6-v2).

# In[ ]:

def sentence_embedding(df: DataFrame) -> DataFrame:
    """Feathr data source UDF to generate sentence embeddings.

    Args:
        df: A Spark DataFrame with a column named "reviews_text" of type string.

    Returns:
        A Spark DataFrame with a column named "reviews_text_embedding" of type array<float>.
    """
    import pandas as pd
    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import ArrayType, FloatType
    from sentence_transformers import SentenceTransformer

    @pandas_udf(ArrayType(FloatType()))
    def predict_batch_udf(data: pd.Series) -> pd.Series:
        """Pandas UDF transforming a pandas.Series of text into a pandas.Series of embeddings.
        You may use iterator input and output instead, e.g. Iterator[pd.Series] -> Iterator[pd.Series].
        """
        model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
        embedding = model.encode(data.to_list())
        return pd.Series(embedding.tolist())

    return df.withColumn("reviews_text_embedding", predict_batch_udf(col("reviews_text")))

# In[ ]:

hdfs_source = HdfsSource(
    name="hotel_reviews",
    path=data_source_path,
    preprocessing=sentence_embedding,
)

# A key is required for features from a non-INPUT_CONTEXT source
key = TypedKey(
    key_column="reviews_id",
    key_column_type=ValueType.INT64,
    description="Reviews ID",
    full_name=f"{PROJECT_NAME}.review_id",
)

# The column 'reviews_text_embedding' will be generated by our UDF `sentence_embedding`.
# We use the column as the feature.
features = [
    Feature(
        name="f_reviews_text_embedding",
        key=key,
        feature_type=FLOAT_VECTOR,
        transform="reviews_text_embedding",
    ),
]

feature_anchor = FeatureAnchor(
    name="feature_anchor",
    source=hdfs_source,
    features=features,
)

# In[ ]:

client.build_features(
    anchor_list=[feature_anchor],
)

# ### Register the Features

# In[ ]:

if REGISTER_FEATURES:
    try:
        client.register_features()
    except Exception as e:
        print(e)
    print(client.list_registered_features(project_name=PROJECT_NAME))
    # You can get the actual features too by calling client.get_features_from_registry(PROJECT_NAME)

# ## Feature Consumer Scenario
#
# From the feature consumer's point of view, we first get the registered features and then extract the feature vectors by using the feature definition.
#
# ### Get Registered Features

# In[ ]:

if REGISTER_FEATURES:
    registered_features = client.get_features_from_registry(project_name=PROJECT_NAME)
else:
    # Assume we got the registered features. This is for a notebook unit test without the actual registration.
    registered_features = {feat.name: feat for feat in features}

print("Features:")
for f_name, f in registered_features.items():
    print(f"\t{f_name} (key: {f.key[0].key_column})")

# ### Extract the Features

# In[ ]:

feature_name = "f_reviews_text_embedding"
feature_key = registered_features[feature_name].key[0]

if client.spark_runtime == "databricks":
    output_filepath = f"dbfs:/{PROJECT_NAME}/feature_embeddings.parquet"
else:
    raise ValueError(
        "This notebook is expected to use Databricks as the target Spark cluster. "
        "To use other platforms, you'll need to install the `sentence-transformers` pip package on your Spark cluster."
    )

# In[ ]:

query = FeatureQuery(
    feature_list=[feature_name],
    key=feature_key,
)

settings = ObservationSettings(
    observation_path=data_source_path,
)

client.get_offline_features(
    observation_settings=settings,
    feature_query=query,
    # For more details, see https://feathr-ai.github.io/feathr/how-to-guides/feathr-job-configuration.html
    execution_configurations=SparkExecutionConfiguration({
        "spark.feathr.outputFormat": "parquet",
        "spark.sql.execution.arrow.enabled": "true",
    }),
    output_path=output_filepath,
)

client.wait_job_to_finish(timeout_sec=5000)

# In[ ]:

result_df = get_result_df(client=client, res_url=output_filepath, data_format="parquet")
result_df[["name", "reviews_text", feature_name]].head(5)
# Let's visualize the feature values. Here, we use t-SNE (t-distributed Stochastic Neighbor Embedding) from [scikit-learn](https://scikit-learn.org/stable/modules/generated/sklearn.manifold.TSNE.html) to plot the vectors in 2D space.

# In[ ]:

import numpy as np
import plotly.graph_objs as go
from sklearn.manifold import TSNE

X = np.stack(result_df[feature_name], axis=0)

result = TSNE(
    n_components=2,
    init='random',
    perplexity=10,
).fit_transform(X)

result[:10]

# In[ ]:

names = set(result_df['name'])
names

# In[ ]:

fig = go.Figure()
for name in names:
    mask = result_df['name'] == name
    fig.add_trace(go.Scatter(
        x=result[mask, 0],
        y=result[mask, 1],
        name=name,
        textposition='top center',
        mode='markers+text',
        marker={
            'size': 8,
            'opacity': 0.8,
        },
    ))

fig.update_layout(
    margin={'l': 0, 'r': 0, 'b': 0, 't': 0},
    showlegend=True,
    autosize=False,
    width=1000,
    height=500,
)
fig.show()

# ### Cleanup

# In[ ]:

# Clean up the output files. CAUTION: this may be dangerous if you reused the project name.
if CLEAN_UP:
    import shutil
    shutil.rmtree(WORKING_DIR, ignore_errors=False)

# In[ ]: