key = "blobstorekey" acnt = "studiofeathrazuredevsto" container = "studio-feathrazure-dev-fs" mntpnt = "/mnt/studio-feathrazure-dev-fs" def mountStorageContainer(storageAccount, storageAccountKey, storageContainer, blobMountPoint): try: print("Mounting {0} to {1}:".format(storageContainer, blobMountPoint)) dbutils.fs.unmount(blobMountPoint) except Exception as e: print("....Container is not mounted; Attempting mounting now..") mountStatus = dbutils.fs.mount(source = "wasbs://{0}@{1}.blob.core.windows.net/".format(storageContainer, storageAccount), mount_point = blobMountPoint, extra_configs = {"fs.azure.account.key.{0}.blob.core.windows.net".format(storageAccount): storageAccountKey}) print("....Status of mount is: " + str(mountStatus)) print() mountStorageContainer(acnt,key,container,mntpnt) df = spark.read.format("csv").option("header", "true").load("/mnt/studio-feathrazure-dev-fs/data/customer360.csv") display(df) %pip install --force-reinstall git+https://github.com/feathr-ai/feathr.git@registry_fix#subdirectory=feathr_project pandavro scikit-learn import tempfile yaml_config = """ api_version: 1 project_config: project_name: 'customer360' required_environment_variables: - 'REDIS_PASSWORD' - 'ADLS_ACCOUNT' - 'ADLS_KEY' - 'BLOB_ACCOUNT' - 'BLOB_KEY' - 'DATABRICKS_WORKSPACE_TOKEN_VALUE ' offline_store: adls: adls_enabled: true wasb: wasb_enabled: true s3: s3_enabled: false s3_endpoint: 's3.amazonaws.com' jdbc: jdbc_enabled: false jdbc_database: '' jdbc_table: '' snowflake: snowflake_enabled: false url: ".snowflakecomputing.com" user: "" role: "" warehouse: "" spark_config: spark_cluster: 'databricks' spark_result_output_parts: '1' azure_synapse: dev_url: 'https://feathrazure.dev.azuresynapse.net' pool_name: 'spark3' workspace_dir: 'abfss://container@blobaccountname.dfs.core.windows.net/demo_data1/' executor_size: 'Small' executor_num: 1 databricks: workspace_instance_url: "https://.azuredatabricks.net/" workspace_token_value: "" config_template: '{"run_name":"","new_cluster":{"spark_version":"9.1.x-scala2.12","node_type_id":"Standard_D3_v2","num_workers":2,"spark_conf":{}},"libraries":[{"jar":""}],"spark_jar_task":{"main_class_name":"","parameters":[""]}}' work_dir: 'dbfs:/customer360' online_store: redis: host: '.redis.cache.windows.net' port: 6380 ssl_enabled: True feature_registry: api_endpoint: "https://.azurewebsites.net/api/v1" """ # write this configuration string to a temporary location and load it to Feathr tmp = tempfile.NamedTemporaryFile(mode='w', delete=False) with open(tmp.name, "w") as text_file: text_file.write(yaml_config) import glob import os import tempfile from datetime import datetime, timedelta from math import sqrt import pandas as pd import pandavro as pdx from feathr import FeathrClient from feathr import BOOLEAN, FLOAT, INT32, ValueType,STRING from feathr import Feature, DerivedFeature, FeatureAnchor from feathr import FeatureAnchor from feathr.client import FeathrClient from feathr import DerivedFeature from feathr import BackfillTime, MaterializationSettings from feathr import FeatureQuery, ObservationSettings from feathr import RedisSink from feathr import INPUT_CONTEXT, HdfsSource from feathr import WindowAggTransformation from feathr import TypedKey from sklearn.metrics import mean_squared_error from sklearn.model_selection import train_test_split import os os.environ['REDIS_PASSWORD'] = '' os.environ['ADLS_ACCOUNT'] = '' os.environ['ADLS_KEY'] = '' os.environ['BLOB_ACCOUNT'] = "" os.environ['BLOB_KEY'] = '' os.environ['DATABRICKS_WORKSPACE_TOKEN_VALUE'] = 
'' client = FeathrClient(config_path=tmp.name) batch_source = HdfsSource(name="cosmos_final_data", path="abfss://container@blobaccountname.dfs.core.windows.net/data/customer360.csv", event_timestamp_column="sales_order_dt", timestamp_format="yyyy-MM-dd") f_sales_cust_id = Feature(name = "f_sales_cust_id", feature_type = STRING, transform = "sales_cust_id" ) f_sales_tran_id = Feature(name = "f_sales_tran_id", feature_type = STRING, transform = "sales_tran_id" ) f_sales_order_id = Feature(name = "f_sales_order_id", feature_type = STRING, transform = "sales_order_id" ) f_sales_item_quantity = Feature(name = "f_sales_item_quantity", feature_type = INT32, transform = "cast_float(sales_item_quantity)" ) f_sales_order_dt = Feature(name = "f_sales_order_dt", feature_type = STRING, transform = "sales_order_dt" ) f_sales_sell_price = Feature(name = "f_sales_sell_price", feature_type = INT32, transform = "cast_float(sales_sell_price)" ) f_sales_discount_amt = Feature(name = "f_sales_discount_amt", feature_type = INT32, transform = "cast_float(sales_discount_amt)" ) f_payment_preference = Feature(name = "f_payment_preference", feature_type = STRING, transform = "payment_preference" ) features = [f_sales_cust_id, f_sales_tran_id, f_sales_order_id, f_sales_item_quantity, f_sales_order_dt, f_sales_sell_price, f_sales_discount_amt, f_payment_preference] request_anchor = FeatureAnchor(name="request_features", source=INPUT_CONTEXT, features=features) f_total_sales_amount = DerivedFeature(name = "f_total_sales_amount", feature_type = FLOAT, input_features = [f_sales_item_quantity,f_sales_sell_price], transform = "f_sales_item_quantity * f_sales_sell_price") f_total_sales_discount= DerivedFeature(name = "f_total_sales_discount", feature_type = FLOAT, input_features = [f_sales_item_quantity,f_sales_discount_amt], transform = "f_sales_item_quantity * f_sales_discount_amt") f_total_amount_paid= DerivedFeature(name = "f_total_amount_paid", feature_type = FLOAT, input_features = [f_sales_sell_price,f_sales_discount_amt], transform ="f_sales_sell_price - f_sales_discount_amt") customer_ID = TypedKey(key_column="sales_cust_id", key_column_type=ValueType.INT32, description="customer ID", full_name="cosmos.sales_cust_id") agg_features = [Feature(name="f_avg_customer_sales_amount", key=customer_ID, feature_type=FLOAT, transform=WindowAggTransformation(agg_expr="cast_float(sales_sell_price)", agg_func="AVG", window="1d")), Feature(name="f_avg_customer_discount_amount", key=customer_ID, feature_type=FLOAT, transform=WindowAggTransformation(agg_expr="cast_float(sales_discount_amt)", agg_func="AVG", window="1d")), Feature(name="f_avg_item_ordered_by_customer", key=customer_ID, feature_type=FLOAT, transform=WindowAggTransformation(agg_expr="cast_float(sales_item_quantity)", agg_func="AVG", window="1d"))] agg_anchor = FeatureAnchor(name="aggregationFeatures", source=batch_source, features=agg_features) client.build_features(anchor_list=[request_anchor,agg_anchor], derived_feature_list=[f_total_sales_amount, f_total_sales_discount,f_total_amount_paid]) client.register_features() client.list_registered_features(project_name="customer360") feature_query = FeatureQuery( feature_list=["f_avg_item_ordered_by_customer","f_avg_customer_discount_amount","f_avg_customer_sales_amount","f_total_sales_discount"], key=customer_ID) settings = ObservationSettings( observation_path="abfss://container@blobaccountname.dfs.core.windows.net/data/customer360.csv", event_timestamp_column="sales_order_dt", timestamp_format="yyyy-MM-dd") 
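# Before launching the join, a quick cross-check can catch typos in the query:
# every queried name should correspond to a feature built above. A minimal
# sketch, assuming only that Feature and DerivedFeature expose the name
# attribute set in their constructors.
built_names = {f.name for f in features + agg_features}
built_names |= {d.name for d in [f_total_sales_amount, f_total_sales_discount, f_total_amount_paid]}
queried = ["f_avg_item_ordered_by_customer", "f_avg_customer_discount_amount",
           "f_avg_customer_sales_amount", "f_total_sales_discount", "f_total_sales_amount"]
unknown = [n for n in queried if n not in built_names]
assert not unknown, f"Feature query references undefined features: {unknown}"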
# Run the point-in-time join and wait for the Spark job to finish.
client.get_offline_features(observation_settings=settings,
                            feature_query=feature_query,
                            output_path="abfss://container@blobaccountname.dfs.core.windows.net/data/output/output.avro")
client.wait_job_to_finish(timeout_sec=500)

# Read the joined output back through the blob mount (this mount must point at
# the same container that output_path above was written to).
path = '/mnt/studio-feathrazure-dev-fs/cosmos/output/output'
df = spark.read.format("avro").load(path)
df = df.toPandas()
display(df)

# Fit a simple OLS regression: predict total sales amount from total sales discount.
X = df['f_total_sales_discount']
y = df['f_total_sales_amount']
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.7, test_size=0.3, random_state=100)

# Add a constant to get an intercept
X_train_sm = sm.add_constant(X_train)

# Fit the regression line using OLS
lr = sm.OLS(y_train, X_train_sm).fit()

# Add a constant to X_test
X_test_sm = sm.add_constant(X_test)

# Predict the y values corresponding to X_test_sm
y_pred = lr.predict(X_test_sm)

# Evaluate on the test set. Note that 1 - R-squared is neither MAPE nor an error
# rate, so report R-squared and RMSE directly.
r_squared = r2_score(y_test, y_pred)
print("Model R-squared on the test set:")
print(r_squared)
print()
print("Model RMSE on the test set:")
print(sqrt(mean_squared_error(y_test, y_pred)))

# Materialize the aggregate features to Redis for online serving.
redisSink = RedisSink(table_name="Customer360")
settings = MaterializationSettings("cosmos_feathr_table",
                                   sinks=[redisSink],
                                   feature_names=["f_avg_item_ordered_by_customer",
                                                  "f_avg_customer_discount_amount"])
client.materialize_features(settings, allow_materialize_non_agg_feature=True)
client.wait_job_to_finish(timeout_sec=500)

# Fetch the materialized features for a single customer from Redis.
client.get_online_features(feature_table="Customer360",
                           key="KB-16240",
                           feature_names=['f_avg_item_ordered_by_customer'])
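# The same table can also be queried for several customers in one round trip.
# This sketch assumes the installed feathr build exposes the client's
# multi_get_online_features method; "KB-16241" is a hypothetical second
# customer ID used purely for illustration.
client.multi_get_online_features(feature_table="Customer360",
                                 keys=["KB-16240", "KB-16241"],
                                 feature_names=["f_avg_item_ordered_by_customer",
                                                "f_avg_customer_discount_amount"])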