This notebook is the first in the series of notebooks that show how Aerospike can be used as a feature store.
This notebook requires the Aerospike Database and Spark running locally with Aerospike Spark Connector. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
This notebook demonstrates how Aerospike can be used as a Feature Store for Machine Learning applications on Spark using the Aerospike Spark Connector. It is part of the Feature Store series of notebooks, and focuses on the data model and Feature Engineering aspects of a Feature Store. The subsequent notebooks will describe the Model Training and Model Serving aspects of an ML application that uses a Feature Store.
This notebook is organized in two parts:
This tutorial assumes familiarity with the following topics:
Set up Aerospike Server, Spark Server, and Spark Connector.
This notebook requires that the Aerospike Database is running.
!asd >& /dev/null
!pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"
Aerospike database is running!
We will be using Spark functionality in this notebook.
# directory where spark notebook requisites are installed
SPARK_NB_DIR = '/opt/spark-nb'
SPARK_DIR = 'spark-dir-link'
SPARK_HOME = SPARK_NB_DIR + '/' + SPARK_DIR
AEROSPIKE_JAR = 'aerospike-jar-link'
AEROSPIKE_JAR_PATH = SPARK_NB_DIR + '/' + AEROSPIKE_JAR
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST ="localhost"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
# Locate the Spark installation using the SPARK_HOME parameter.
import findspark
findspark.init(SPARK_HOME)
# Specify the Aerospike Spark Connector jar in the command used to interact with Aerospike.
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
Please visit Configuring Aerospike Connect for Spark for more information about the properties used on this page.
# imports
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
sc = SparkContext.getOrCreate()
conf=sc._conf.setAll([("aerospike.namespace",AS_NAMESPACE),("aerospike.seedhost",AS_CONNECTION_STRING)])
sc.stop()
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
sqlContext = SQLContext(sc)
Aerospike can be used as a Feature Store in cases where the broader capabilities of a commercial Feature Store are not warranted, and instead it is critical to have:
An Online Feature Store provides access to features in real time, and as such requires speed and, for many deployments, scalability. The Online Store only stores the latest version of feature values.
Aerospike is purpose built for the performance and scale requirements of an Online Store.
An Offline Store requires historic data to be maintained for “time travel” use cases, that is, to create datasets at a particular point in time for training and other needs. A time series of feature values can be maintained in Aerospike as a map (timestamp->value) for each feature. Aerospike's native APIs allow efficient implementation of an Offline Store.
An essential requirement for a Feature Store is to maintain Online-Offline consistency to ensure that the models that were trained with the Offline (historic) data remain valid in the production (Online) environment. Typically this means convenient, efficient, and timely sync of data from the Offline to the Online Store. Sometimes a reverse sync from the Online to the Offline Store may be required if only the Online Store may be updated.
Synchronization in either direction between Online and Offline Stores can be achieved through various streaming connectors that Aerospike provides (such as for Kafka and Pulsar).
It is also necessary to be able to access a specific version "as of" a given time (the latest version being a special case) for various use cases, using the appropriate client library or connector.
Our focus in this Feature Store series will be primarily on the Online Store.
The key design criteria for a Feature Store with Aerospike are access and storage efficiency. (The other key criterion, Offline-Online synchronization, will not be discussed further here. For our purpose, we assume it can be handled with an appropriate change notification and update setup between the Offline and Online Stores.)
We will assume the following object model which is typical in a Feature Store:
- Feature Values are stored for Entities such as users, credit card transactions, and sensors.
- Features are organized in Feature Groups. A Feature Group signifies the set of features that share common data sources and pipeline, and are created and updated together.
- A Dataset is a snapshot of specified Features across a set of Entity instances. The snapshot is stored outside the Feature Store, whereas the Feature Store holds the metadata such as the snapshot query and location.

There are multiple ways to store the latest Feature Values, each with its trade-offs. Here are two possible options:
1. One Entity instance record holds all Feature Values across multiple Feature Groups.
   - All feature values for an instance are stored in a single record in a set named after the Entity type (e.g., "user-features").
   - Accessing a Feature Group's Features entails listing the Feature bins. Accessing a Feature vector at Model Serving time entails enumerating the model's Feature bins across Feature Groups. To avoid Feature name collisions across Feature Groups, a bin name needs to include both the Feature Group and Feature name, such as Feature-Group:Feature.
   - Updates cannot be made atomically across Feature Groups without additional mechanisms.
2. Multiple Entity instance records, one for each Feature Group.
   - Each Feature Group's values are stored in its own set named in the form Entity:FeatureGroup-features, such as "user:fg1-features".
   - Accessing a Feature Group's Features entails listing the Feature bins. Accessing a Feature vector at Model Serving time entails multiple requests over Feature Groups. Bin names are the same as Feature names since they are unique within a Feature Group.
   - Accommodates more Features if they may exceed the record size limit.
   - Reduces contention on a Feature Group since each is updated separately.
   - No atomic access to Features across Feature Groups.

For the Offline Store, the design requires multiple versions of Feature Values to be stored, each with its timestamp. One way of modeling such a time series of feature values in Aerospike is to store each feature as a map timestamp->value.
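As a minimal sketch of this idea (the set name, bin name, and values below are hypothetical; the write pattern simply mirrors the connector usage later in this notebook), a feature's history could be written as a map-typed column keyed by timestamp:
# Hypothetical sketch: store one feature's history as a map of timestamp -> value.
from pyspark.sql.types import StructType, StructField, StringType, MapType, LongType, DoubleType
hist_schema = StructType([StructField("TxnId", StringType(), False),
                          StructField("CC1_Amount_hist", MapType(LongType(), DoubleType()), True)])
hist_df = spark.createDataFrame([("123", {1609459200: 120.50, 1612137600: 99.00})], hist_schema)
hist_df.write \
    .mode('overwrite') \
    .format("aerospike") \
    .option("aerospike.writeset", "cctxn-history") \
    .option("aerospike.updateByKey", "TxnId") \
    .save()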
With historic versions stored in it, the Offline Store has to account for the data size problem, both in terms of individual record size and overall database size. Record size can be bounded by splitting an entity's history across multiple records, each keyed in the form entity-key:duration-id like "123:202101", for example, for a record holding feature values during January 2021 for a user with id 123.
Before we dive into the implementation, here are some relevant performance and scalability aspects of using the Spark Connector.
Random access using the primary key must qualify on the __key bin, which gives direct access through the primary index. Often the unique user key is duplicated in another bin for convenience, but an equality predicate on such a bin results in a scan. This is shown below in the single object load methods.
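For example, the two reads sketched below (the set and bin names are assumed for illustration) differ only in the predicate: the first qualifies on __key and is served by the primary index, while the second qualifies on an ordinary bin and results in a scan of the set.
# Sketch only: "user-features" and the "name" bin are assumptions for illustration.
from pyspark.sql.types import StructType, StructField, StringType
probe_schema = StructType([StructField("__key", StringType(), False),
                           StructField("name", StringType(), True)])
by_key = spark.read \
    .format("aerospike") \
    .schema(probe_schema) \
    .option("aerospike.set", "user-features") \
    .load().where("__key = 'user1'")    # direct access through the primary index
by_bin = spark.read \
    .format("aerospike") \
    .schema(probe_schema) \
    .option("aerospike.set", "user-features") \
    .load().where("name = 'user1'")     # equality on a non-key bin scans the set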
A set index should be enabled on a set for faster scans over the set. Data exploration as well as dataset creation operations benefit because only the records in the set are scanned rather than the entire namespace, resulting in significant speedup since the namespace can be very large compared to, say, the metadata sets. In the following implementation, we enable set indexes on all sets.
To minimize the amount of data retrieved from the database, query predicates must be "pushed down", or in other words, processed in the database and not on Spark. We will illustrate how expression pushdown is implemented with examples in the Model Training notebook.
Several tuning parameters such as partition factor and compression are available for optimizing performance with the Spark Connector. The implementation here does not make use of them, but you are encouraged to explore them elsewhere.
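As an illustration only (the option name aerospike.partition.factor is assumed here; verify it and the compression-related settings against the connector configuration docs), a read could be tuned like this:
# Sketch: tune how a read is split into Spark partitions (assumed to be 2^factor partitions).
from pyspark.sql.types import StructType, StructField, StringType
tuned_df = spark.read \
    .format("aerospike") \
    .schema(StructType([StructField("__key", StringType(), False)])) \
    .option("aerospike.set", "fg-metadata") \
    .option("aerospike.partition.factor", 8) \
    .load()    # lazy; no data is read until an action is invoked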
While Aerospike provides secondary indexes, the Spark Connector currently does not leverage them. This performance enhancement is planned in the future.
To make it concrete, let us define and implement a simple Feature Store.
The main objects in a Feature Store as discussed above are Feature Group, Feature, Entity, and Dataset. These are explained below.
Machine Learning features are grouped by the source of data and processing pipeline. A Feature Group includes many features, and has the following attributes that are useful during feature exploration:
A feature group is stored in the set "fg-metadata".
A Feature consists of the following attributes:
A feature is stored in the set "feature-metadata".
Features are computed and stored for an Entity such as a user, credit card, or sensor. Feature values are stored per Entity instance. Features in multiple feature groups for an Entity type are combined in one Entity record. A feature values record for an Entity has these attributes:
Entity records for an entity type are stored in an entity-type specific set "entitytype-features". For example, the set "user-features" holds features for user instances, and "cc-features" holds features for credit card instances.
A Dataset is a subset of features and entities selected to train an ML model. A Dataset object holds the selected features and entity instance definitions. The actual copy of entities is stored outside the feature store (for instance, in a file system). A dataset record has the following attributes.
Datasets are stored in the set "dataset-metadata".
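The Dataset implementation itself is deferred to the Model Training notebook; purely as an assumed sketch (the attribute names below are not the final ones), its metadata record could be described by a schema such as:
# Assumed sketch of a dataset metadata schema; the actual attributes are defined
# in the Model Training notebook.
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
dataset_schema = StructType([StructField("name", StringType(), False),
                             StructField("description", StringType(), True),
                             StructField("features", ArrayType(StringType()), True),  # selected feature ids
                             StructField("query", StringType(), True),                # entity selection predicate
                             StructField("location", StringType(), True)])            # where the snapshot is stored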
The following operations are implemented for the different use scenarios:
Feature Engineering
Model Training
Model Serving
The following code is a simple implementation of the above operations. These operations will be illustrated in this and follow-up notebooks.
import copy
class FeatureGroup:
schema = StructType([StructField("name", StringType(), False),
StructField("description", StringType(), True),
StructField("source", StringType(), True),
StructField("attrs", MapType(StringType(), StringType()), True),
StructField("tags", ArrayType(StringType()), True)])
def __init__(self, name, description, source, attrs, tags):
self.name = name
self.description = description
self.source = source
self.attrs = attrs
self.tags = tags
return
def __str__(self):
return str(self.__class__) + ": " + str(self.__dict__)
def save(self):
inputBuf = [(self.name, self.description, self.source, self.attrs, self.tags)]
inputRDD = spark.sparkContext.parallelize(inputBuf)
inputDF = spark.createDataFrame(inputRDD, FeatureGroup.schema)
#Write the data frame to Aerospike, the name field is used as the key
inputDF.write \
.mode('overwrite') \
.format("aerospike") \
.option("aerospike.writeset", "fg-metadata")\
.option("aerospike.updateByKey", "name") \
.save()
return
def load(name):
fg = None
schema = copy.deepcopy(FeatureGroup.schema)
schema.add("__key", StringType(), False)
fgdf = spark.read \
.format("aerospike") \
.option("aerospike.set", "fg-metadata") \
.schema(schema) \
.load().where("__key = \"" + name + "\"")
if fgdf.count() > 0:
fgtuple = fgdf.collect()[0]
fg = FeatureGroup(*fgtuple[:-1])
return fg
def query(predicate): #returns a dataframe
fg_df = spark.read \
.format("aerospike") \
.schema(FeatureGroup.schema) \
.option("aerospike.set", "fg-metadata") \
.load().where(predicate)
return fg_df
# Enable set index on fg-metadata
!asinfo -v "set-config:context=namespace;id=test;set=fg-metadata;enable-index=true"
ok
# test feature group
# test save and load
# save
fg1 = FeatureGroup("fg_name1", "fg_desc1", "fg_source1",
{"fg_attr1":"1", "fg_attr2":"two"}, ["fg_tag1", "fg_tag2"])
fg1.save()
# load
fg2 = FeatureGroup.load("fg_name1")
print(fg2, '\n')
# test query
print("Feature Groups with name ending with '_name1' and having attribute 'fg_tag1'='1':")
df = FeatureGroup.query("name like '%_name1' and attrs.fg_attr1 == '1'")
df.show()
<class '__main__.FeatureGroup'>: {'name': 'fg_name1', 'description': 'fg_desc1', 'source': 'fg_source1', 'attrs': {'fg_attr2': 'two', 'fg_attr1': '1'}, 'tags': ['fg_tag1', 'fg_tag2']} Feature Groups with name ending with '_name1' and having attribute 'fg_attr1'='1': +--------+-----------+----------+--------------------+------------------+ | name|description| source| attrs| tags| +--------+-----------+----------+--------------------+------------------+ |fg_name1| fg_desc1|fg_source1|{fg_attr1 -> 1, f...|[fg_tag1, fg_tag2]| +--------+-----------+----------+--------------------+------------------+
class Feature:
schema = StructType([StructField("fid", StringType(), False),
StructField("fgname", StringType(), False),
StructField("name", StringType(), False),
StructField("type", StringType(), False),
StructField("description", StringType(), True),
StructField("attrs", MapType(StringType(), StringType()), True),
StructField("tags", ArrayType(StringType()), True)])
def __init__(self, fgname, name, ftype, description, attrs, tags):
self.fid = fgname + '_' + name
self.fgname = fgname
self.name = name
self.ftype = ftype
self.description = description
self.attrs = attrs
self.tags = tags
return
def __str__(self):
return str(self.__class__) + ": " + str(self.__dict__)
def save(self):
inputBuf = [(self.fid, self.fgname, self.name, self.ftype, self.description, self.attrs, self.tags)]
inputRDD = spark.sparkContext.parallelize(inputBuf)
inputDF = spark.createDataFrame(inputRDD, Feature.schema)
#Write the data frame to Aerospike, the fid field is used as the key
inputDF.write \
.mode('overwrite') \
.format("aerospike") \
.option("aerospike.writeset", "feature-metadata")\
.option("aerospike.updateByKey", "fid") \
.save()
return
def load(fgname, name):
f = None
schema = copy.deepcopy(Feature.schema)
schema.add("__key", StringType(), False)
f_df = spark.read \
.format("aerospike") \
.schema(schema) \
.option("aerospike.set", "feature-metadata") \
.load().where("__key = \"" + fgname+'_'+name + "\"")
if f_df.count() > 0:
f_tuple = f_df.collect()[0]
f = Feature(*f_tuple[1:-1])
return f
def query(predicate): #returns a dataframe
f_df = spark.read \
.format("aerospike") \
.schema(Feature.schema) \
.option("aerospike.set", "feature-metadata") \
.load().where(predicate)
return f_df
# Enable set index on feature-metadata
!asinfo -v "set-config:context=namespace;id=test;set=feature-metadata;enable-index=true"
ok
# test Feature
# test save and load
# save
feature1 = Feature("fg_name1", "f_name1", "integer", "f_desc1",
{"f_attr1":"1", "f_attr2":"two"}, ["f_tag1", "f_tag2"])
feature1.save()
# load
f1 = Feature.load("fg_name1", "f_name1")
print(f1, '\n')
# test query
feature2 = Feature("fg_name1", "f_name2", "double", "f_desc2",
{"f_attr1":"1.0", "f_attr3":"three"}, ["f_tag1", "f_tag3"])
feature2.save()
print("Features having name starting with 'f_name' and tagged with 'f_tag1':")
f_df = Feature.query("name like 'f_name%' and array_contains(tags, 'f_tag1')")
f_df.show()
<class '__main__.Feature'>: {'fid': 'fg_name1_f_name1', 'fgname': 'fg_name1', 'name': 'f_name1', 'ftype': 'integer', 'description': 'f_desc1', 'attrs': {'f_attr2': 'two', 'f_attr1': '1'}, 'tags': ['f_tag1', 'f_tag2']} Features having name starting with 'f_name' and tagged with 'f_tag1': +----------------+--------+-------+-------+-----------+--------------------+----------------+ | fid| fgname| name| type|description| attrs| tags| +----------------+--------+-------+-------+-----------+--------------------+----------------+ |fg_name1_f_name2|fg_name1|f_name2| double| f_desc2|{f_attr1 -> 1.0, ...|[f_tag1, f_tag3]| |fg_name1_f_name1|fg_name1|f_name1|integer| f_desc1|{f_attr1 -> 1, f_...|[f_tag1, f_tag2]| +----------------+--------+-------+-------+-----------+--------------------+----------------+
class Entity:
def __init__(self, etype, record, id_col):
# record is an array of triples (name, type, value)
self.etype = etype
self.record = record
self.id_col = id_col
return
def __str__(self):
return str(self.__class__) + ": " + str(self.__dict__)
def get_schema(record):
schema = StructType()
for f in record:
schema.add(f[0], f[1], True)
return schema
def get_id_type(schema, id_col):
return schema[id_col].dataType.typeName()
def save(self, schema):
fvalues = [f[2] for f in self.record]
inputBuf = [tuple(fvalues)]
inputRDD = spark.sparkContext.parallelize(inputBuf)
inputDF = spark.createDataFrame(inputRDD, schema)
#Write the data frame to Aerospike, the id_col field is used as the key
inputDF.write \
.mode('overwrite') \
.format("aerospike") \
.option("aerospike.writeset", self.etype+'-features')\
.option("aerospike.updateByKey", self.id_col) \
.save()
return
def load(etype, eid, schema, id_col):
ent = None
ent_df = spark.read \
.format("aerospike") \
.schema(schema) \
.option("aerospike.set", etype+'-features') \
.option("aerospike.keyType", "string") \
.load().where(id_col + " = \"" + eid + "\"")
if ent_df.count() > 0:
ent_tuple = ent_df.collect()[0]
record = [(schema[i].name, schema[i].dataType.typeName(), fv) for i, fv in enumerate(ent_tuple)]
ent = Entity(etype, record, id_col)
return ent
def saveDF(df, etype, id_col): # save a dataframe
# df: dataframe consisting of feature values
# etype: entity type (such as user or sensor)
# id_col: column name that holds the primary key
#Write the data frame to Aerospike, the column in id_col is used as the key
df.write \
.mode('overwrite') \
.format("aerospike") \
.option("aerospike.writeset", etype+'-features')\
.option("aerospike.updateByKey", id_col) \
.save()
return
def query(etype, predicate, schema, id_col): #returns a dataframe
ent_df = spark.read \
.format("aerospike") \
.schema(schema) \
.option("aerospike.set", etype+'-features') \
.option("aerospike.keyType", Entity.get_id_type(schema, id_col)) \
.load().where(predicate)
return ent_df
def get_feature_vector(etype, eid, feature_list): # elements in feature_list are in "fg_name|name" form
# deferred to Model Serving notebook
pass
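Although get_feature_vector is deferred to the Model Serving notebook, one possible sketch (assuming string entity keys and double-typed features addressed by their bin names, e.g. "CC1_Amount") could read the entity record by its primary key:
# A possible sketch only; the actual implementation appears in the Model Serving notebook.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
def get_feature_vector_sketch(etype, eid, feature_bins):
    schema = StructType([StructField("__key", StringType(), False)])
    for fbin in feature_bins:
        schema.add(fbin, DoubleType(), True)          # assumes double-typed features
    df = spark.read \
        .format("aerospike") \
        .schema(schema) \
        .option("aerospike.set", etype + '-features') \
        .option("aerospike.keyType", "string") \
        .load().where("__key = \"" + eid + "\"")      # primary-index access by key
    rows = df.collect()
    return list(rows[0][1:]) if rows else None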
# Enable set index on cctxn-features
!asinfo -v "set-config:context=namespace;id=test;set=cctxn-features;enable-index=true"
ok
# test Entity
# test save and load
# save
features1 = [('fg1:f_name1', IntegerType(), 1), ('fg1:f_name2', DoubleType(), 2.0), ('fg1:f_name3', StringType(), 'three')]
record1 = [('eid', StringType(), 'eid1')] + features1
ent1 = Entity('entity_type1', record1, 'eid')
schema = Entity.get_schema(record1)
ent1.save(schema);
# load
e1 = Entity.load('entity_type1', 'eid1', schema, 'eid')
print(e1, '\n')
# test query
features2 = [('fg1:f_name1', IntegerType(), 10), ('fg1:f_name2', DoubleType(), 20.0), ('fg1:f_name3', StringType(), 'thirty')]
record2 = [('eid', StringType(), 'eid2')] + features2
ent2 = Entity('entity_type2', record2, 'eid')
ent2.save(schema);
# query 1
print("Instances of entity type entity_type1 with eid ending in 1:")
instances = Entity.query('entity_type1', 'eid like "%1"', schema, 'eid')
instances.show()
# query 2
print("Instances of entity type entity_type2 with eid in ['eid2']:")
instances = Entity.query('entity_type2', 'eid in ("eid2")', schema, 'eid')
instances.show()
<class '__main__.Entity'>: {'etype': 'entity_type1', 'record': [('eid', 'string', 'eid1'), ('fg1:f_name1', 'integer', 1), ('fg1:f_name2', 'double', 2.0), ('fg1:f_name3', 'string', 'three')], 'id_col': 'eid'} Instances of entity type entity_type1 with eid ending in 1: +----+-----------+-----------+-----------+ | eid|fg1:f_name1|fg1:f_name2|fg1:f_name3| +----+-----------+-----------+-----------+ |eid1| 1| 2.0| three| +----+-----------+-----------+-----------+ Instances of entity type entity_type2 with eid in ['eid2']: +----+-----------+-----------+-----------+ | eid|fg1:f_name1|fg1:f_name2|fg1:f_name3| +----+-----------+-----------+-----------+ |eid2| 10| 20.0| thirty| +----+-----------+-----------+-----------+
# test save dataframe
from pyspark.sql import SparkSession, Row
data = [{"ID": 'eid_1', "fg2_feature_1": 1, "fg2_feature_2": 12.40},
{"ID": 'eid_2', "fg2_feature_1": 2, "fg2_feature_2": 30.10},
{"ID": 'eid_3', "fg2_feature_1": 3, "fg2_feature_2": 100.01}
]
# create and save a dataframe
df = spark.createDataFrame([Row(**i) for i in data])
df.show()
Entity.saveDF(df, "etype_1", "ID")
+-----+-------------+-------------+ | ID|fg2_feature_1|fg2_feature_2| +-----+-------------+-------------+ |eid_1| 1| 12.4| |eid_2| 2| 30.1| |eid_3| 3| 100.01| +-----+-------------+-------------+
# test query operation that returns a dataframe
# need to define a schema first
schema = StructType([
StructField('ID', StringType(), False),
StructField('fg2_feature_1', IntegerType(), True),
StructField('fg2_feature_2', DoubleType(), True)])
queryDF = Entity.query('etype_1', 'ID in ("eid_1", "eid_3")', schema, 'ID')
queryDF.show()
+-----+-------------+-------------+ | ID|fg2_feature_1|fg2_feature_2| +-----+-------------+-------------+ |eid_1| 1| 12.4| |eid_3| 3| 100.01| +-----+-------------+-------------+
# Deferred to the second notebook in this series on Model Training.
Let us now see how the feature store objects and operations can be leveraged through the various phases of the ML flow. In this notebook, the focus is on Feature Engineering. Future notebooks will look into Model Training and Model Serving scenarios.
The demo data is abridged from its original version from here. It represents real transactions by European cardholders in 2013. The original dataset has close to 300K transactions, whereas the abridged version used here contains about a thousand records.
We will illustrate use of the feature store for Feature Engineering through the following sequence:
Read and examine the columns and rows.
The data contains PCA-transformed features ("V1"-"V28"); in all there are 29 feature columns and 1 label ("Class") column.
import pandas as pd
data=pd.read_csv("resources/creditcard_small.csv")
data.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 984 entries, 0 to 983 Data columns (total 32 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 Unnamed: 0 984 non-null int64 1 Time 984 non-null float64 2 V1 984 non-null float64 3 V2 984 non-null float64 4 V3 984 non-null float64 5 V4 984 non-null float64 6 V5 984 non-null float64 7 V6 984 non-null float64 8 V7 984 non-null float64 9 V8 984 non-null float64 10 V9 984 non-null float64 11 V10 984 non-null float64 12 V11 984 non-null float64 13 V12 984 non-null float64 14 V13 984 non-null float64 15 V14 984 non-null float64 16 V15 984 non-null float64 17 V16 984 non-null float64 18 V17 984 non-null float64 19 V18 984 non-null float64 20 V19 984 non-null float64 21 V20 984 non-null float64 22 V21 984 non-null float64 23 V22 984 non-null float64 24 V23 984 non-null float64 25 V24 984 non-null float64 26 V25 984 non-null float64 27 V26 984 non-null float64 28 V27 984 non-null float64 29 V28 984 non-null float64 30 Amount 984 non-null float64 31 Class 984 non-null int64 dtypes: float64(30), int64(2) memory usage: 246.1 KB
The data contains all fraudulent transactions from the original dataset, and the same number of randomly selected non-fraudulent transactions.
total = len(data)
normal = len(data[data.Class == 0])
fraudulent = len(data[data.Class == 1])
fraud_percentage = round(fraudulent/total*100, 2)
print('Total number of transactions: {}'.format(total))
print('Number of normal transactions {}'.format(normal))
print('Number of fraudulent transactions: {}'.format(fraudulent))
Total number of transactions: 984 Number of normal transactions 492 Number of fraudulent transactions: 492
Here is how the data looks:
data.head()
Unnamed: 0 | Time | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | ... | V21 | V22 | V23 | V24 | V25 | V26 | V27 | V28 | Amount | Class | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 121 | 77.0 | -0.427191 | 0.745708 | 1.761811 | -0.165130 | 0.058298 | -0.213413 | 0.647323 | 0.073464 | ... | -0.201681 | -0.432070 | 0.013164 | 0.161606 | -0.401310 | 0.047423 | 0.102549 | -0.116571 | 9.12 | 0 |
1 | 248296 | 153875.0 | -0.613696 | 3.698772 | -5.534941 | 5.620486 | 1.649263 | -2.335145 | -0.907188 | 0.706362 | ... | 0.319261 | -0.471379 | -0.075890 | -0.667909 | -0.642848 | 0.070600 | 0.488410 | 0.292345 | 0.00 | 1 |
2 | 239 | 160.0 | 1.171439 | 0.474974 | 0.011761 | 1.264303 | 0.116234 | -0.865986 | 0.554393 | -0.276375 | ... | 0.070051 | 0.278843 | -0.097491 | 0.426278 | 0.744938 | -0.274728 | 0.008472 | 0.015492 | 20.00 | 0 |
3 | 239501 | 150139.0 | -6.682832 | -2.714268 | -5.774530 | 1.449792 | -0.661836 | -1.148650 | 0.849686 | 0.433427 | ... | 0.220526 | 1.187013 | 0.335821 | 0.215683 | 0.803110 | 0.044033 | -0.054988 | 0.082337 | 237.26 | 1 |
4 | 143336 | 85285.0 | -6.713407 | 3.921104 | -9.746678 | 5.148263 | -5.151563 | -2.099389 | -5.937767 | 3.578780 | ... | 0.954272 | -0.451086 | 0.127214 | -0.339450 | 0.394096 | 1.075295 | 1.649906 | -0.394905 | 252.92 | 1 |
5 rows × 32 columns
We will perform some simple data transformations:
# rename the index column from the original dataset
data = data.rename(columns={"Unnamed: 0": "OldIdx"})
# order the rows by timestamp (original index order)
data = data.sort_values("OldIdx")
data.reset_index(drop=True, inplace=True)
data
OldIdx | Time | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | ... | V21 | V22 | V23 | V24 | V25 | V26 | V27 | V28 | Amount | Class | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | 0.0 | -1.359807 | -0.072781 | 2.536347 | 1.378155 | -0.338321 | 0.462388 | 0.239599 | 0.098698 | ... | -0.018307 | 0.277838 | -0.110474 | 0.066928 | 0.128539 | -0.189115 | 0.133558 | -0.021053 | 149.62 | 0 |
1 | 1 | 0.0 | 1.191857 | 0.266151 | 0.166480 | 0.448154 | 0.060018 | -0.082361 | -0.078803 | 0.085102 | ... | -0.225775 | -0.638672 | 0.101288 | -0.339846 | 0.167170 | 0.125895 | -0.008983 | 0.014724 | 2.69 | 0 |
2 | 2 | 1.0 | -1.358354 | -1.340163 | 1.773209 | 0.379780 | -0.503198 | 1.800499 | 0.791461 | 0.247676 | ... | 0.247998 | 0.771679 | 0.909412 | -0.689281 | -0.327642 | -0.139097 | -0.055353 | -0.059752 | 378.66 | 0 |
3 | 3 | 1.0 | -0.966272 | -0.185226 | 1.792993 | -0.863291 | -0.010309 | 1.247203 | 0.237609 | 0.377436 | ... | -0.108300 | 0.005274 | -0.190321 | -1.175575 | 0.647376 | -0.221929 | 0.062723 | 0.061458 | 123.50 | 0 |
4 | 4 | 2.0 | -1.158233 | 0.877737 | 1.548718 | 0.403034 | -0.407193 | 0.095921 | 0.592941 | -0.270533 | ... | -0.009431 | 0.798278 | -0.137458 | 0.141267 | -0.206010 | 0.502292 | 0.219422 | 0.215153 | 69.99 | 0 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
979 | 279863 | 169142.0 | -1.927883 | 1.125653 | -4.518331 | 1.749293 | -1.566487 | -2.010494 | -0.882850 | 0.697211 | ... | 0.778584 | -0.319189 | 0.639419 | -0.294885 | 0.537503 | 0.788395 | 0.292680 | 0.147968 | 390.00 | 1 |
980 | 280143 | 169347.0 | 1.378559 | 1.289381 | -5.004247 | 1.411850 | 0.442581 | -1.326536 | -1.413170 | 0.248525 | ... | 0.370612 | 0.028234 | -0.145640 | -0.081049 | 0.521875 | 0.739467 | 0.389152 | 0.186637 | 0.76 | 1 |
981 | 280149 | 169351.0 | -0.676143 | 1.126366 | -2.213700 | 0.468308 | -1.120541 | -0.003346 | -2.234739 | 1.210158 | ... | 0.751826 | 0.834108 | 0.190944 | 0.032070 | -0.739695 | 0.471111 | 0.385107 | 0.194361 | 77.89 | 1 |
982 | 281144 | 169966.0 | -3.113832 | 0.585864 | -5.399730 | 1.817092 | -0.840618 | -2.943548 | -2.208002 | 1.058733 | ... | 0.583276 | -0.269209 | -0.456108 | -0.183659 | -0.328168 | 0.606116 | 0.884876 | -0.253700 | 245.00 | 1 |
983 | 281674 | 170348.0 | 1.991976 | 0.158476 | -2.583441 | 0.408670 | 1.151147 | -0.096695 | 0.223050 | -0.068384 | ... | -0.164350 | -0.295135 | -0.072173 | -0.450261 | 0.313267 | -0.289617 | 0.002988 | -0.015309 | 42.53 | 1 |
984 rows × 32 columns
# rename the columns to include the feature group prefix "CC1"
curr_columns = data.columns
data = data.rename(columns=dict(zip(curr_columns, ["CC1_"+c for c in curr_columns])))
data
CC1_OldIdx | CC1_Time | CC1_V1 | CC1_V2 | CC1_V3 | CC1_V4 | CC1_V5 | CC1_V6 | CC1_V7 | CC1_V8 | ... | CC1_V21 | CC1_V22 | CC1_V23 | CC1_V24 | CC1_V25 | CC1_V26 | CC1_V27 | CC1_V28 | CC1_Amount | CC1_Class | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | 0.0 | -1.359807 | -0.072781 | 2.536347 | 1.378155 | -0.338321 | 0.462388 | 0.239599 | 0.098698 | ... | -0.018307 | 0.277838 | -0.110474 | 0.066928 | 0.128539 | -0.189115 | 0.133558 | -0.021053 | 149.62 | 0 |
1 | 1 | 0.0 | 1.191857 | 0.266151 | 0.166480 | 0.448154 | 0.060018 | -0.082361 | -0.078803 | 0.085102 | ... | -0.225775 | -0.638672 | 0.101288 | -0.339846 | 0.167170 | 0.125895 | -0.008983 | 0.014724 | 2.69 | 0 |
2 | 2 | 1.0 | -1.358354 | -1.340163 | 1.773209 | 0.379780 | -0.503198 | 1.800499 | 0.791461 | 0.247676 | ... | 0.247998 | 0.771679 | 0.909412 | -0.689281 | -0.327642 | -0.139097 | -0.055353 | -0.059752 | 378.66 | 0 |
3 | 3 | 1.0 | -0.966272 | -0.185226 | 1.792993 | -0.863291 | -0.010309 | 1.247203 | 0.237609 | 0.377436 | ... | -0.108300 | 0.005274 | -0.190321 | -1.175575 | 0.647376 | -0.221929 | 0.062723 | 0.061458 | 123.50 | 0 |
4 | 4 | 2.0 | -1.158233 | 0.877737 | 1.548718 | 0.403034 | -0.407193 | 0.095921 | 0.592941 | -0.270533 | ... | -0.009431 | 0.798278 | -0.137458 | 0.141267 | -0.206010 | 0.502292 | 0.219422 | 0.215153 | 69.99 | 0 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
979 | 279863 | 169142.0 | -1.927883 | 1.125653 | -4.518331 | 1.749293 | -1.566487 | -2.010494 | -0.882850 | 0.697211 | ... | 0.778584 | -0.319189 | 0.639419 | -0.294885 | 0.537503 | 0.788395 | 0.292680 | 0.147968 | 390.00 | 1 |
980 | 280143 | 169347.0 | 1.378559 | 1.289381 | -5.004247 | 1.411850 | 0.442581 | -1.326536 | -1.413170 | 0.248525 | ... | 0.370612 | 0.028234 | -0.145640 | -0.081049 | 0.521875 | 0.739467 | 0.389152 | 0.186637 | 0.76 | 1 |
981 | 280149 | 169351.0 | -0.676143 | 1.126366 | -2.213700 | 0.468308 | -1.120541 | -0.003346 | -2.234739 | 1.210158 | ... | 0.751826 | 0.834108 | 0.190944 | 0.032070 | -0.739695 | 0.471111 | 0.385107 | 0.194361 | 77.89 | 1 |
982 | 281144 | 169966.0 | -3.113832 | 0.585864 | -5.399730 | 1.817092 | -0.840618 | -2.943548 | -2.208002 | 1.058733 | ... | 0.583276 | -0.269209 | -0.456108 | -0.183659 | -0.328168 | 0.606116 | 0.884876 | -0.253700 | 245.00 | 1 |
983 | 281674 | 170348.0 | 1.991976 | 0.158476 | -2.583441 | 0.408670 | 1.151147 | -0.096695 | 0.223050 | -0.068384 | ... | -0.164350 | -0.295135 | -0.072173 | -0.450261 | 0.313267 | -0.289617 | 0.002988 | -0.015309 | 42.53 | 1 |
984 rows × 32 columns
Create a Spark dataframe to save data to Aerospike efficiently through the Aerospike Spark Connector.
sparkDF = spark.createDataFrame(data)
sparkDF.printSchema()
sparkDF.toPandas()
root |-- CC1_OldIdx: long (nullable = true) |-- CC1_Time: double (nullable = true) |-- CC1_V1: double (nullable = true) |-- CC1_V2: double (nullable = true) |-- CC1_V3: double (nullable = true) |-- CC1_V4: double (nullable = true) |-- CC1_V5: double (nullable = true) |-- CC1_V6: double (nullable = true) |-- CC1_V7: double (nullable = true) |-- CC1_V8: double (nullable = true) |-- CC1_V9: double (nullable = true) |-- CC1_V10: double (nullable = true) |-- CC1_V11: double (nullable = true) |-- CC1_V12: double (nullable = true) |-- CC1_V13: double (nullable = true) |-- CC1_V14: double (nullable = true) |-- CC1_V15: double (nullable = true) |-- CC1_V16: double (nullable = true) |-- CC1_V17: double (nullable = true) |-- CC1_V18: double (nullable = true) |-- CC1_V19: double (nullable = true) |-- CC1_V20: double (nullable = true) |-- CC1_V21: double (nullable = true) |-- CC1_V22: double (nullable = true) |-- CC1_V23: double (nullable = true) |-- CC1_V24: double (nullable = true) |-- CC1_V25: double (nullable = true) |-- CC1_V26: double (nullable = true) |-- CC1_V27: double (nullable = true) |-- CC1_V28: double (nullable = true) |-- CC1_Amount: double (nullable = true) |-- CC1_Class: long (nullable = true)
CC1_OldIdx | CC1_Time | CC1_V1 | CC1_V2 | CC1_V3 | CC1_V4 | CC1_V5 | CC1_V6 | CC1_V7 | CC1_V8 | ... | CC1_V21 | CC1_V22 | CC1_V23 | CC1_V24 | CC1_V25 | CC1_V26 | CC1_V27 | CC1_V28 | CC1_Amount | CC1_Class | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | 0.0 | -1.359807 | -0.072781 | 2.536347 | 1.378155 | -0.338321 | 0.462388 | 0.239599 | 0.098698 | ... | -0.018307 | 0.277838 | -0.110474 | 0.066928 | 0.128539 | -0.189115 | 0.133558 | -0.021053 | 149.62 | 0 |
1 | 1 | 0.0 | 1.191857 | 0.266151 | 0.166480 | 0.448154 | 0.060018 | -0.082361 | -0.078803 | 0.085102 | ... | -0.225775 | -0.638672 | 0.101288 | -0.339846 | 0.167170 | 0.125895 | -0.008983 | 0.014724 | 2.69 | 0 |
2 | 2 | 1.0 | -1.358354 | -1.340163 | 1.773209 | 0.379780 | -0.503198 | 1.800499 | 0.791461 | 0.247676 | ... | 0.247998 | 0.771679 | 0.909412 | -0.689281 | -0.327642 | -0.139097 | -0.055353 | -0.059752 | 378.66 | 0 |
3 | 3 | 1.0 | -0.966272 | -0.185226 | 1.792993 | -0.863291 | -0.010309 | 1.247203 | 0.237609 | 0.377436 | ... | -0.108300 | 0.005274 | -0.190321 | -1.175575 | 0.647376 | -0.221929 | 0.062723 | 0.061458 | 123.50 | 0 |
4 | 4 | 2.0 | -1.158233 | 0.877737 | 1.548718 | 0.403034 | -0.407193 | 0.095921 | 0.592941 | -0.270533 | ... | -0.009431 | 0.798278 | -0.137458 | 0.141267 | -0.206010 | 0.502292 | 0.219422 | 0.215153 | 69.99 | 0 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
979 | 279863 | 169142.0 | -1.927883 | 1.125653 | -4.518331 | 1.749293 | -1.566487 | -2.010494 | -0.882850 | 0.697211 | ... | 0.778584 | -0.319189 | 0.639419 | -0.294885 | 0.537503 | 0.788395 | 0.292680 | 0.147968 | 390.00 | 1 |
980 | 280143 | 169347.0 | 1.378559 | 1.289381 | -5.004247 | 1.411850 | 0.442581 | -1.326536 | -1.413170 | 0.248525 | ... | 0.370612 | 0.028234 | -0.145640 | -0.081049 | 0.521875 | 0.739467 | 0.389152 | 0.186637 | 0.76 | 1 |
981 | 280149 | 169351.0 | -0.676143 | 1.126366 | -2.213700 | 0.468308 | -1.120541 | -0.003346 | -2.234739 | 1.210158 | ... | 0.751826 | 0.834108 | 0.190944 | 0.032070 | -0.739695 | 0.471111 | 0.385107 | 0.194361 | 77.89 | 1 |
982 | 281144 | 169966.0 | -3.113832 | 0.585864 | -5.399730 | 1.817092 | -0.840618 | -2.943548 | -2.208002 | 1.058733 | ... | 0.583276 | -0.269209 | -0.456108 | -0.183659 | -0.328168 | 0.606116 | 0.884876 | -0.253700 | 245.00 | 1 |
983 | 281674 | 170348.0 | 1.991976 | 0.158476 | -2.583441 | 0.408670 | 1.151147 | -0.096695 | 0.223050 | -0.068384 | ... | -0.164350 | -0.295135 | -0.072173 | -0.450261 | 0.313267 | -0.289617 | 0.002988 | -0.015309 | 42.53 | 1 |
984 rows × 32 columns
Add an identifier column TxnId. Generate and assign a unique value to it.
# add a new column TxnId for transaction id
# define a udf to update the new TxnId column
import pyspark.sql.functions as F
curr_txn_id = 1
def get_txn_id():
global curr_txn_id
txn_id = str(curr_txn_id)
curr_txn_id += 1
return txn_id
txn_id_udf = F.UserDefinedFunction(get_txn_id, StringType())
sparkDF = sparkDF.withColumn('TxnId', txn_id_udf())
# select the needed columns
sparkDF = sparkDF.select(['TxnId','CC1_Class','CC1_Amount']+['CC1_V'+str(i) for i in range(1,29)])
sparkDF.show(3, truncate=3)
+-----+---------+----------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+ |TxnId|CC1_Class|CC1_Amount|CC1_V1|CC1_V2|CC1_V3|CC1_V4|CC1_V5|CC1_V6|CC1_V7|CC1_V8|CC1_V9|CC1_V10|CC1_V11|CC1_V12|CC1_V13|CC1_V14|CC1_V15|CC1_V16|CC1_V17|CC1_V18|CC1_V19|CC1_V20|CC1_V21|CC1_V22|CC1_V23|CC1_V24|CC1_V25|CC1_V26|CC1_V27|CC1_V28| +-----+---------+----------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+ | 1| 0| 149| -1.| -0.| 2.5| 1.3| -0.| 0.4| 0.2| 0.0| 0.3| 0.0| -0.| -0.| -0.| -0.| 1.4| -0.| 0.2| 0.0| 0.4| 0.2| -0.| 0.2| -0.| 0.0| 0.1| -0.| 0.1| -0.| | 2| 0| 2.6| 1.1| 0.2| 0.1| 0.4| 0.0| -0.| -0.| 0.0| -0.| -0.| 1.6| 1.0| 0.4| -0.| 0.6| 0.4| -0.| -0.| -0.| -0.| -0.| -0.| 0.1| -0.| 0.1| 0.1| -0.| 0.0| | 3| 0| 378| -1.| -1.| 1.7| 0.3| -0.| 1.8| 0.7| 0.2| -1.| 0.2| 0.6| 0.0| 0.7| -0.| 2.3| -2.| 1.1| -0.| -2.| 0.5| 0.2| 0.7| 0.9| -0.| -0.| -0.| -0.| -0.| +-----+---------+----------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+ only showing top 3 rows
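Note that the global-counter UDF above is adequate in this local, single-process setting; in a distributed Spark job a safer alternative (values unique across partitions, though not consecutive) is the built-in monotonically_increasing_id, for example:
# Alternative transaction-id generation that is safe across partitions (sketch, not applied here)
from pyspark.sql.functions import monotonically_increasing_id
alt_df = sparkDF.withColumn('TxnId', (monotonically_increasing_id() + 1).cast('string'))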
We are ready to save the features in the Spark dataframe to Aerospike.
Of the following three steps, the first two are needed only the first time, when the feature group and features are created. Subsequent updates use only the third step.
# 1. Create a feature group.
FG_NAME = 'CC1'
FG_DESCRIPTION = 'Credit card transaction data'
FG_SOURCE = 'European cardholder dataset from Kaggle'
fg = FeatureGroup(FG_NAME, FG_DESCRIPTION, FG_SOURCE,
attrs={'entity':'cctxn', 'class':'fraud'}, tags=['kaggle', 'demo'])
fg.save()
# 2. Create feature metadata
FEATURE_AMOUNT = 'Amount'
f = Feature(FG_NAME, FEATURE_AMOUNT, 'double', "Transaction amount",
attrs={'entity':'cctxn'}, tags=['usd'])
f.save()
FEATURE_CLASS = 'Class'
f = Feature(FG_NAME, FEATURE_CLASS, 'integer', "Label indicating fraud or not",
attrs={'entity':'cctxn'}, tags=['label'])
f.save()
FEATURE_PCA_XFORM = "V"
for i in range(1,29):
f = Feature(FG_NAME, FEATURE_PCA_XFORM+str(i), 'double', "Transformed version of PCA",
attrs={'entity':'cctxn'}, tags=['pca'])
f.save()
# 3. Save feature values in entity records
ENTITY_TYPE = 'cctxn'
ID_COLUMN = 'TxnId'
Entity.saveDF(sparkDF, ENTITY_TYPE, ID_COLUMN)
print('Features stored to Feature Store.')
Features stored to Feature Store.
Let's issue a query against the saved features to get fraudulent transactions with amount greater than $2000. There appears to be only one such transaction in the demo data.
schema = StructType([StructField(ID_COLUMN, StringType(), False),
StructField(FG_NAME+'_'+FEATURE_CLASS, IntegerType(), False),
StructField(FG_NAME+'_'+FEATURE_AMOUNT, DoubleType(), False)])
for i in range(1,29):
schema.add(FG_NAME+'_'+FEATURE_PCA_XFORM+str(i), DoubleType(), True)
queryDF = Entity.query(ENTITY_TYPE, 'CC1_Class == 1 and CC1_Amount > 2000', schema, ID_COLUMN)
queryDF.show(5, truncate=7)
+-----+---------+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+ |TxnId|CC1_Class|CC1_Amount| CC1_V1| CC1_V2| CC1_V3| CC1_V4| CC1_V5| CC1_V6| CC1_V7| CC1_V8| CC1_V9|CC1_V10|CC1_V11|CC1_V12|CC1_V13|CC1_V14|CC1_V15|CC1_V16|CC1_V17|CC1_V18|CC1_V19|CC1_V20|CC1_V21|CC1_V22|CC1_V23|CC1_V24|CC1_V25|CC1_V26|CC1_V27|CC1_V28| +-----+---------+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+ | 362| 1| 2125.87|-2.0...|-7.1...|-4.0...|1.30...|-2.0...|-0.0...|2.88...|-0.7...|1.46...|-1.5...|-1.3...|-0.2...|-1.5...|1.07...|0.38...|-0.6...|0.09...|0.33...|0.05...|3.97...|1.24...|-1.0...|-1.8...|0.65...|-0.4...|-0.8...|-0.3...|0.31...| +-----+---------+----------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
Examine the data through aql with the following command:
!aql -c "select * from test.cctxn-features where PK='362'"
select * from test.cctxn-features where PK='362' +-------+------------+-----------+-------------------+-------------------+-------------------+--------------------+-------------------+-----------------+-------------------+--------------------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+-----------------+-------+ | PK | CC1_Amount | CC1_Class | CC1_V1 | CC1_V10 | CC1_V11 | CC1_V12 | CC1_V13 | CC1_V14 | CC1_V15 | CC1_V16 | CC1_V17 | CC1_V18 | CC1_V19 | CC1_V2 | CC1_V20 | CC1_V21 | CC1_V22 | CC1_V23 | CC1_V24 | CC1_V25 | CC1_V26 | CC1_V27 | CC1_V28 | CC1_V3 | CC1_V4 | CC1_V5 | CC1_V6 | CC1_V7 | CC1_V8 | CC1_V9 | TxnId | +-------+------------+-----------+-------------------+-------------------+-------------------+--------------------+-------------------+-----------------+-------------------+--------------------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+-----------------+-------+ | "362" | 2125.87 | 1 | -2.00345953080582 | -1.53160798206082 | -1.39432826167269 | -0.220718797789479 | -1.53099043146804 | 1.0752476539262 | 0.388383209307268 | -0.660655352312646 | 0.0933209955444861 | 0.335742221574637 | 0.0575510393537501 | -7.15904171709445 | 3.97321702726744 | 1.24428677489095 | -1.01523228673153 | -1.80098486605048 | 0.657585626965743 | -0.435617246752788 | -0.894508922176968 | -0.39755738695085 | 0.314261714087509 | -4.05097631587393 | 1.30957974749918 | -2.05810158798669 | -0.0986209270722274 | 2.88008272715204 | -0.727484046608914 | 1.4603805509699 | "362" | +-------+------------+-----------+-------------------+-------------------+-------------------+--------------------+-------------------+-----------------+-------------------+--------------------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+-----------------+-------+ 1 row in set (0.000 secs) OK
Let's look at another example of feature store use for data engineering, which is taken from another Aerospike Spark notebook.
We will illustrate use of the feature store for Feature Engineering through the following sequence:
# We create age vs salary data, using three different Gaussian distributions
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import math
# Make sure we get the same results every time this workbook is run
# Otherwise we are occasionally exposed to results not working out as expected
np.random.seed(12345)
# Create covariance matrix from std devs + correlation
def covariance_matrix(std_dev_1,std_dev_2,correlation):
return [[std_dev_1 ** 2, correlation * std_dev_1 * std_dev_2],
[correlation * std_dev_1 * std_dev_2, std_dev_2 ** 2]]
# Return a bivariate sample given means/std dev/correlation
def age_salary_sample(distribution_params,sample_size):
mean = [distribution_params["age_mean"], distribution_params["salary_mean"]]
cov = covariance_matrix(distribution_params["age_std_dev"],distribution_params["salary_std_dev"],
distribution_params["age_salary_correlation"])
return np.random.multivariate_normal(mean, cov, sample_size).T
# Define the characteristics of our age/salary distribution
age_salary_distribution_1 = {"age_mean":25,"salary_mean":50000,
"age_std_dev":1,"salary_std_dev":5000,"age_salary_correlation":0.3}
age_salary_distribution_2 = {"age_mean":45,"salary_mean":80000,
"age_std_dev":4,"salary_std_dev":8000,"age_salary_correlation":0.7}
age_salary_distribution_3 = {"age_mean":35,"salary_mean":70000,
"age_std_dev":2,"salary_std_dev":9000,"age_salary_correlation":0.1}
distribution_data = [age_salary_distribution_1,age_salary_distribution_2,age_salary_distribution_3]
# Sample age/salary data for each distributions
sample_size_1 = 100;
sample_size_2 = 120;
sample_size_3 = 80;
sample_sizes = [sample_size_1,sample_size_2,sample_size_3]
group_1_ages,group_1_salaries = age_salary_sample(age_salary_distribution_1,sample_size=sample_size_1)
group_2_ages,group_2_salaries = age_salary_sample(age_salary_distribution_2,sample_size=sample_size_2)
group_3_ages,group_3_salaries = age_salary_sample(age_salary_distribution_3,sample_size=sample_size_3)
ages=np.concatenate([group_1_ages,group_2_ages,group_3_ages])
salaries=np.concatenate([group_1_salaries,group_2_salaries,group_3_salaries])
print("Data created")
Data created
Display simulated age/salary data.
# Plot the sample data
group_1_colour, group_2_colour, group_3_colour ='red','blue', 'pink'
plt.xlabel('Age',fontsize=10)
plt.ylabel("Salary",fontsize=10)
plt.scatter(group_1_ages,group_1_salaries,c=group_1_colour,label="Group 1")
plt.scatter(group_2_ages,group_2_salaries,c=group_2_colour,label="Group 2")
plt.scatter(group_3_ages,group_3_salaries,c=group_3_colour,label="Group 3")
plt.legend(loc='upper left')
plt.show()
# Turn the above records into a Data Frame
# First of all, create an array of arrays
inputBuf = []
for i in range(0, len(ages)) :
id = i + 1 # Avoid counting from zero
name = "Individual: {:03d}".format(id)
# Note we need to make sure values are typed correctly
# salary will have type numpy.float64 - if it is not cast as below, an error will be thrown
age = float(ages[i])
salary = int(salaries[i])
inputBuf.append((id, name,age,salary))
# Convert to an RDD
inputRDD = spark.sparkContext.parallelize(inputBuf)
# Convert to a data frame using a schema
# Note the feature group SAL is prefixed to each feature column
schema = StructType([
StructField("id", IntegerType(), True),
StructField("SAL_name", StringType(), True),
StructField("SAL_age", DoubleType(), True),
StructField("SAL_salary",IntegerType(), True)
])
inputDF=spark.createDataFrame(inputRDD,schema)
inputDF.show(5)
+---+---------------+------------------+----------+ | id| SAL_name| SAL_age|SAL_salary| +---+---------------+------------------+----------+ | 1|Individual: 001| 25.39547052370498| 48976| | 2|Individual: 002|24.314035458986748| 47402| | 3|Individual: 003|26.918958635987888| 59828| | 4|Individual: 004| 25.29664106310324| 50464| | 5|Individual: 005|26.419729731447458| 53845| +---+---------------+------------------+----------+ only showing top 5 rows
# 1. Create a feature group.
FG_NAME = 'SAL'
FG_DESCRIPTION = 'Age salary data'
FG_SOURCE = 'Generated demo data'
fg = FeatureGroup(FG_NAME, FG_DESCRIPTION, FG_SOURCE,
attrs={'access':'all'}, tags=['test'])
fg.save()
# 2. Create features metadata.
FEATURE_NAME = 'name'
f = Feature(FG_NAME, FEATURE_NAME, "string", "Name of the person",
attrs={'unique':'no'}, tags=['test'])
f.save()
FEATURE_AGE = 'age'
f = Feature(FG_NAME, FEATURE_AGE, "double", "Age of the person",
attrs={'range':'0-100'}, tags=['test'])
f.save()
FEATURE_SALARY = 'salary'
f = Feature(FG_NAME, FEATURE_SALARY, 'integer', "Salary of the person",
attrs={'range':'20-999K'}, tags=['test'])
f.save()
# 3. Save feature values in entity records.
ENTITY_TYPE = 'user'
ID_COLUMN = 'id'
Entity.saveDF(inputDF, ENTITY_TYPE, ID_COLUMN)
print('Features stored to Feature Store.')
Features stored to Feature Store.
Issue a query against the saved features to get a dataframe with records in age group 25 to 30. There are 50 such users in the dataset.
queryDF = Entity.query(ENTITY_TYPE, '`SAL_age` between 25 and 30', schema, ID_COLUMN)
print("Number of users of age between 25 and 30: {}".format(queryDF.count()))
queryDF.show(5)
Number of users of age between 25 and 30: 50 +---+---------------+------------------+----------+ | id| SAL_name| SAL_age|SAL_salary| +---+---------------+------------------+----------+ | 85|Individual: 085|26.288287033377657| 59603| | 1|Individual: 001| 25.39547052370498| 48976| | 14|Individual: 014| 25.59043077849547| 51513| | 79|Individual: 079|25.887490702675926| 48162| | 4|Individual: 004| 25.29664106310324| 50464| +---+---------------+------------------+----------+ only showing top 5 rows
Examine the data through aql with the following command:
!aql -c "select * from test.user-features where PK=85"
select * from test.user-features where PK=85 +----+----+-------------------+-------------------+------------+ | PK | id | SAL_age | SAL_name | SAL_salary | +----+----+-------------------+-------------------+------------+ | 85 | 85 | 26.28828703337766 | "Individual: 085" | 59603 | +----+----+-------------------+-------------------+------------+ 1 row in set (0.001 secs) OK
In this notebook, we explored how Aerospike can be used as a Feature Store for ML applications. Specifically, we showed how features engineered using the Spark platform can be efficiently stored in Aerospike feature store via the Aerospike Spark Connector. We implemented a simple example feature store interface that leverages the Aerospike Spark connector capabilities for this purpose. We used the API to save and query features created in two data engineering examples.
This is the first in a series of notebooks on how Aerospike can be used as a feature store. Subsequent notebooks in this series will explore use of the Aerospike Feature Store for Model Training and Model Serving.
Close the spark session, and remove the tutorial data by executing the cell below.
try:
spark.stop()
except:
pass # ignore
# To remove all data in the namespace test, uncomment the following line and run:
#!aql -c "truncate test"
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.