This notebook shows how to interact with the Online Feature Store in Hopsworks. The online feature store stores a subset of the feature data for real-time queries and is suited for serving client-facing models.
The online feature store contrasts with the offline feature store, which contains historical data. Offline feature data is stored in Hive, a storage engine suited for large-scale batch processing (such as training a machine learning model). The online feature store, on the other hand, uses MySQL Cluster as its backend, a storage engine suited for smaller datasets that need to be queried in real time.
import io.hops.util.Hops
import scala.collection.JavaConversions._
import collection.JavaConverters._
import org.apache.spark.sql.Row
import java.sql.Date
import java.sql.Timestamp
import spark.implicits._
import org.apache.spark.sql.types._
Starting Spark application
SparkSession available as 'spark'.
If your project's feature store has the online feature store enabled, each user gets a storage connector for accessing it. The storage connector can be retrieved with the utility method getOnlineFeaturestoreConnector()
in the Scala SDK. The storage connector includes the JDBC connection information: host, port, username, and password.
Hops.getOnlineFeaturestoreConnector.read
res1: io.hops.util.featurestore.dtos.storageconnector.FeaturestoreJdbcConnectorDTO = FeaturestoreJdbcConnectorDTO{connectionString='jdbc:mysql://10.0.2.15:3306/demo_featurestore_admin000', arguments='password=YTnQHFxNHwMlEpZboyJCgpZFSqyyKgQHXnUHJzSrVNhOslGRqKifTmzvRnhudipF,user=demo_featurestore_admin000_meb1'}
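The arguments field of the connector DTO packs the credentials into a single comma-separated string, as shown in the output above. If you need the individual values (for example, to open your own JDBC connection), a small helper can split them into a map. This is a sketch that assumes the key=value,key=value format shown above; the helper name is ours, not part of the SDK:

```scala
// Parse a connector arguments string of the form "password=...,user=..."
// into a key -> value map. Assumes keys and values contain no ',' and
// that each pair has exactly one '=' separating key from value.
def parseConnectorArguments(arguments: String): Map[String, String] =
  arguments
    .split(",")
    .map(_.split("=", 2))          // split each pair at the first '='
    .collect { case Array(k, v) => k.trim -> v.trim }
    .toMap

val creds = parseConnectorArguments("password=secret,user=demo_meb1")
// creds("user") == "demo_meb1", creds("password") == "secret"
```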
When a feature group has online feature serving enabled, its data is stored in both Hive (for historical queries) and MySQL Cluster (for online queries). To enable online serving for a feature group, simply call setOnline(true)
when creating it, as illustrated below.
val sampleData = Seq(
Row(1, Date.valueOf("2019-03-02"), 0.4151f, "Sweden"),
Row(2, Date.valueOf("2019-05-01"), 1.2151f, "Ireland"),
Row(3, Date.valueOf("2019-08-06"), 0.2151f, "Belgium"),
Row(4, Date.valueOf("2019-08-06"), 0.8151f, "Russia")
)
val schema =
scala.collection.immutable.List(
StructField("id", IntegerType, true),
StructField("date", DateType, true),
StructField("value", FloatType, true),
StructField("country", StringType, true)
)
val sampleDf = spark.createDataFrame(
spark.sparkContext.parallelize(sampleData),
StructType(schema)
)
sampleData: Seq[org.apache.spark.sql.Row] = List([1,2019-03-02,0.4151,Sweden], [2,2019-05-01,1.2151,Ireland], [3,2019-08-06,0.2151,Belgium], [4,2019-08-06,0.8151,Russia]) schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true)) sampleDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]
sampleDf.show(5)
+---+----------+------+-------+ | id| date| value|country| +---+----------+------+-------+ | 1|2019-03-02|0.4151| Sweden| | 2|2019-05-01|1.2151|Ireland| | 3|2019-08-06|0.2151|Belgium| | 4|2019-08-06|0.8151| Russia| +---+----------+------+-------+
(Hops.createFeaturegroup("test_online_fg_scala_sdk")
.setDataframe(sampleDf)
.setOnline(true)
.setPrimaryKey(List("id"))
.write())
When creating a feature group, the Spark dataframe is used to infer the data schema and the feature types. The schema is then used to create a Hive table (for offline data) and a MySQL table (for online data). If you want more control over the feature types of the MySQL table (e.g., the length of a VARCHAR column), you can pass the types in the optional argument onlineTypes, which takes a map of the form feature_name -> feature_type.
val onlineTypes = Map[String, String](
"value" -> "DECIMAL"
)
val sampleDf2 = sampleDf.withColumnRenamed(
"value", "value_test").withColumnRenamed(
"country", "country_test").withColumnRenamed(
"date", "date_test")
(Hops.createFeaturegroup("test_online_fg_scala_sdk_types")
.setDataframe(sampleDf2)
.setOnline(true)
.setPrimaryKey(List("id"))
.setOnlineTypes(onlineTypes)
.write())
onlineTypes: scala.collection.immutable.Map[String,String] = Map(value -> DECIMAL) sampleDf2: org.apache.spark.sql.DataFrame = [id: int, date_test: date ... 2 more fields]
The same methods used for reading the offline feature store can read from the online feature store by calling setOnline(true). However, NOTE: since the online feature store is meant for feature serving, it should be queried with primary-key lookups for the best performance. Full table scans against the online feature store are highly discouraged. If you find yourself frequently reading an entire feature group with getFeaturegroup(...).setOnline(true)
(a full table scan), you are probably better off using the offline feature store. The online feature store is intended for quick primary-key lookups, not data analysis.
To make the migration from the regular offline feature store API to the online feature store simple, each example below of reading from the online feature store is accompanied by an example of reading from the offline feature store.
Feature groups are stored as tables named featuregroupname_version:
Hive tables for offline features, and MySQL tables for online features.
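The table names used in the SQL queries below follow this convention. A trivial helper makes it explicit (the helper name is ours, not part of the SDK):

```scala
// Build the physical table name for a feature group:
// the feature group name suffixed with "_<version>".
def featuregroupTableName(name: String, version: Int): String =
  s"${name}_${version}"

val table = featuregroupTableName("test_online_fg_scala_sdk", 1)
// table == "test_online_fg_scala_sdk_1"
```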
//primary key lookup in MySQL
val df = (Hops.queryFeaturestore("SELECT country FROM test_online_fg_scala_sdk_1 WHERE id=1")
.setOnline(true).read)
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]
df.show(5)
+-------+ |country| +-------+ | Sweden| +-------+
//the same query against the offline feature store (Hive)
val df = (Hops.queryFeaturestore("SELECT country FROM test_online_fg_scala_sdk_1 WHERE id=1")
.setOnline(false).read)
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]
df.show(5)
+-------+ |country| +-------+ | Sweden| +-------+
val df = Hops.getFeaturegroup("test_online_fg_scala_sdk").setOnline(true).read
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, date: date ... 2 more fields]
df.show(5)
+---+----------+------+-------+ | id| date| value|country| +---+----------+------+-------+ | 1|2019-03-02|0.4151| Sweden| | 3|2019-08-06|0.2151|Belgium| | 4|2019-08-06|0.8151| Russia| | 2|2019-05-01|1.2151|Ireland| +---+----------+------+-------+
val df = Hops.getFeaturegroup("test_online_fg_scala_sdk").setOnline(false).read
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, date: date ... 2 more fields]
df.show(5)
+---+----------+------+-------+ | id| date| value|country| +---+----------+------+-------+ | 1|2019-03-02|0.4151| Sweden| | 2|2019-05-01|1.2151|Ireland| | 3|2019-08-06|0.2151|Belgium| | 4|2019-08-06|0.8151| Russia| +---+----------+------+-------+
val df = Hops.getFeature("country").setOnline(true).read
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]
df.show(5)
+-------+ |country| +-------+ | Sweden| |Belgium| | Russia| |Ireland| +-------+
val df = Hops.getFeature("country").setOnline(false).read
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string]
df.show(5)
+-------+ |country| +-------+ | Sweden| |Ireland| |Belgium| | Russia| +-------+
The features can potentially span multiple feature groups. As long as all the feature groups have online serving enabled, the feature store query planner will join the features on the fly.
val features = List("country", "date")
val df = Hops.getFeatures(features).setOnline(true).read
features: List[String] = List(country, date) df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string, date: date]
df.show(5)
+-------+----------+ |country| date| +-------+----------+ | Sweden|2019-03-02| |Belgium|2019-08-06| | Russia|2019-08-06| |Ireland|2019-05-01| +-------+----------+
val features = List("country", "date")
val df = Hops.getFeatures(features).setOnline(false).read
features: List[String] = List(country, date) df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [country: string, date: date]
df.show(5)
+-------+----------+ |country| date| +-------+----------+ | Sweden|2019-03-02| |Ireland|2019-05-01| |Belgium|2019-08-06| | Russia|2019-08-06| +-------+----------+
Hops.getFeaturegroups.setOnline(true).read
res1: java.util.List[String] = [test_online_fg_scala_sdk_types_1, online_featuregroup_test_types_1, test_online_fg_scala_sdk_1]
Hops.getFeaturesList.setOnline(true).read
res2: java.util.List[String] = [country_test, date_test, id, value_test, id, val_1_type_test, val_2_type_test, country, date, id, value]
By default, a feature group created with createFeaturegroup()
does not have online serving enabled; all data is stored in the offline feature group (Hive). To create a feature group with online serving, call setOnline(true)
on the createFeaturegroup()
builder (an example is provided at the beginning of this notebook).
If you want to enable online feature serving for a feature group dynamically, after the feature group has been created, you can use the API call enableFeaturegroupOnline
(this creates a MySQL table in the backend). Conversely, to disable online feature serving, use the API call disableFeaturegroupOnline
(this drops the MySQL table in the backend).
val sampleData = Seq(
Row(1, 0.4151f, 0.915f),
Row(2, 1.2151f, 0.151f),
Row(3, 0.2151f, 0.7511f),
Row(4, 0.8151f, 0.12541f)
)
val schema =
scala.collection.immutable.List(
StructField("id", IntegerType, true),
StructField("test_col_3", FloatType, true),
StructField("test_col_4", FloatType, true)
)
val sampleDf = spark.createDataFrame(
spark.sparkContext.parallelize(sampleData),
StructType(schema)
)
(Hops.createFeaturegroup("enable_online_features_test_scala_sdk")
.setDataframe(sampleDf)
.setPrimaryKey(List("id"))
.write())
sampleData: Seq[org.apache.spark.sql.Row] = List([1,0.4151,0.915], [2,1.2151,0.151], [3,0.2151,0.7511], [4,0.8151,0.12541]) schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(test_col_3,FloatType,true), StructField(test_col_4,FloatType,true)) sampleDf: org.apache.spark.sql.DataFrame = [id: int, test_col_3: float ... 1 more field]
Hops.enableFeaturegroupOnline("enable_online_features_test_scala_sdk").write
Hops.disableFeaturegroupOnline("enable_online_features_test_scala_sdk").write
When inserting data into a feature group you can control whether the data is written only to the offline feature group, only to the online feature group, or to both, using setOnline(true/false)
and setOffline(true/false)
:
val sampleData = Seq(
Row(5, Date.valueOf("2015-03-02"), 2.001f, "Iran"),
Row(6, Date.valueOf("2016-05-01"), 3.2171f, "Canada"))
val schema =
scala.collection.immutable.List(
StructField("id", IntegerType, true),
StructField("date", DateType, true),
StructField("value", FloatType, true),
StructField("country", StringType, true)
)
val sampleDf = spark.createDataFrame(
spark.sparkContext.parallelize(sampleData),
StructType(schema)
)
sampleData: Seq[org.apache.spark.sql.Row] = List([5,2015-03-02,2.001,Iran], [6,2016-05-01,3.2171,Canada]) schema: List[org.apache.spark.sql.types.StructField] = List(StructField(id,IntegerType,true), StructField(date,DateType,true), StructField(value,FloatType,true), StructField(country,StringType,true)) sampleDf: org.apache.spark.sql.DataFrame = [id: int, date: date ... 2 more fields]
(Hops.insertIntoFeaturegroup("test_online_fg_scala_sdk")
.setDataframe(sampleDf)
.setMode("append")
.setOnline(true)
.setOffline(false)
.write())
(Hops.insertIntoFeaturegroup("test_online_fg_scala_sdk")
.setDataframe(sampleDf)
.setMode("append")
.setOnline(false)
.setOffline(true)
.write())
(Hops.insertIntoFeaturegroup("test_online_fg_scala_sdk")
.setDataframe(sampleDf)
.setMode("overwrite")
.setOnline(true)
.setOffline(true)
.write())