This notebook is an adjunct to the Feature Store series of notebooks, and shows how to construct "pushdown expressions" for use with the Aerospike Spark Connector.
This notebook requires the Aerospike Database running locally with the Java kernel and the Aerospike Java Client. To create a Docker container that satisfies these requirements and holds a copy of the Aerospike notebooks, visit the Aerospike Notebooks Repo.
Aerospike expressions are filters or predicates used in a scan to select results. The Spark Connector allows the external "base64" representation of an expression to be specified. The expression is pushed down to the database for evaluation, so that exactly the matching records are returned in a Spark dataframe. Contrast this with the "where" predicate, where only part of the predicate may be pushed down and the rest is computed on Spark, which can result in a very large number of records being returned for further filtering on Spark.
Also, many Aerospike filter expressions cannot be specified using the "where" predicate (for example, record metadata-based predicates); in such cases expressions must be used.
Currently the external base64 representation of an expression is not available from the Python client, so we must use the Java client to obtain it. It can then be used in the Spark Connector's aerospike.pushdown.expressions option.
We will describe how the base64 representation of an expression is obtained using the Java client, with several examples. This notebook can be used to derive other pushdown expressions by following the prescribed pattern.
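To preview where the base64 string ultimately goes, here is a minimal sketch of a Spark read (assuming a SparkSession named spark with the connector on its classpath; the seed host, namespace, and set values are illustrative placeholders):

// Sketch only (assumes a Spark environment; not runnable in this Java-client notebook).
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

String base64Expr = "...";  // the string obtained from expr.getBase64(), as shown below
Dataset<Row> df = spark.read()
    .format("aerospike")
    .option("aerospike.seedhost", "localhost:3000")
    .option("aerospike.namespace", "test")
    .option("aerospike.set", "feature-metadata")
    .option("aerospike.pushdown.expressions", base64Expr)
    .load();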
The main topics in this notebook include:
- Writing a filter expression in Java
- Testing the expression
- Obtaining the base64 representation of the expression
- Using the base64 representation in the Spark Connector
This tutorial assumes familiarity with the following topics:
This notebook requires that the Aerospike Database is running.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
Install the Aerospike Java client, version 5.1.3 or higher, which has support for expressions.
%%loadFromPOM
<dependencies>
    <dependency>
        <groupId>com.aerospike</groupId>
        <artifactId>aerospike-client</artifactId>
        <version>5.1.3</version>
    </dependency>
</dependencies>
Initialize the client and connect to the cluster. Constants for the namespace test and the set feature-metadata used in the examples are defined below, together with the imports.
import com.aerospike.client.AerospikeClient;
AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");
Initialized the client and connected to the cluster.
Ensure your test data is populated in the database. The code examples below assume the data from the Model Training with Aerospike notebook. You can modify the namespace, set, and other parameters to suit your examples. You can also create other expressions on your own to use with the Spark Connector.
1. Write the filter expression in Java.
2. Test the expression.
3. Obtain the base64 representation of the expression.
4. Transfer the base64 representation for use in the Spark Connector.
Below are four code examples that illustrate the recipe described above.
// imports
import java.util.ArrayList;
import java.util.Arrays;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.exp.Expression;
import com.aerospike.client.exp.ListExp;
import com.aerospike.client.exp.MapExp;
import com.aerospike.client.cdt.ListReturnType;
import com.aerospike.client.cdt.MapReturnType;
import com.aerospike.client.query.RegexFlag;
import com.aerospike.client.query.Statement;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.Record;
// The examples assume the data from the Model Training notebook.
final String Namespace = "test";
final String Set = "feature-metadata";
// Test function: query the given set with the filter expression and print matching records.
void filterRecords(String namespace, String set, Expression expr) {
    Statement stmt = new Statement();
    stmt.setNamespace(namespace);
    stmt.setSetName(set);
    // Attach the filter expression to the query policy.
    QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);
    policy.filterExp = expr;
    RecordSet rs = client.query(policy, stmt);
    while (rs.next()) {
        Key key = rs.getKey();
        Record record = rs.getRecord();
        System.out.format("key=%s bins=%s\n", key.userKey, record.bins);
    }
    rs.close();
}
Filter records by one, none, all, any, or a specified number of matching elements in its list bin.
Assuming we want to filter records having one or more of the specific tags in the list bin tags, the logical expression would look something like:
Exp.gt(
    ListExp.getByValueList(ListReturnType.COUNT,
        Exp.val(specific_tags), Exp.listBin("tags")),
    Exp.val(0))
The outer expression checks that the value returned by the first argument is greater than 0. The first argument is the count of elements in the list bin tags that match values in the list specific_tags.
// 1. Write the filter expression in Java.
Expression expr = Exp.build(
Exp.gt(
ListExp.getByValueList(ListReturnType.COUNT,
Exp.val(new ArrayList<String>(Arrays.asList("label","f_tag1"))), Exp.listBin("tags")),
Exp.val(0)));
// 2. Test the expression.
System.out.println("Results of filter expression query (with either label or f_tag1 in the tags bin):");
filterRecords(Namespace, Set, expr);
Results of filter expression query (with either label or f_tag1 in the tags bin):
key=null bins={attrs={etype=etype1, f_attr1=v1}, description=f_desc1, fgname=fgname1, fid=fgname1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}
key=null bins={attrs={entity=cctxn}, description=Label indicating fraud or not, fgname=CC1, fid=CC1_Class, name=Class, tags=[label], type=integer}
key=null bins={attrs={f_attr1=1.0, f_attr3=three}, description=f_desc2, fgname=fg_name1, fid=fg_name1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}
key=null bins={attrs={etype=etype1, f_attr1=v2}, description=f_desc2, fgname=fgname1, fid=fgname1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}
key=null bins={attrs={f_attr1=1, f_attr2=two}, description=f_desc1, fgname=fg_name1, fid=fg_name1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}
// 3. Obtain the base64 representation of the expression.
System.out.format("Base64: %s\n", expr.getBase64());
// 4. Transfer the base64 representation for use in Spark Connector.
// You can use the base64 string in the Spark Connector's
// "option.aerospike.pushdown.expressions" option.
Base64: kwOVfwIAkxcFkn6SpgNsYWJlbKcDZl90YWcxk1EEpHRhZ3MA
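As a variation on step 1 (a sketch following the same pattern, not executed here), requiring all of the specific tags to be present can be expressed by comparing the match count for equality with the size of the tag list, assuming the list bin holds no duplicates:

// Variation (sketch, not executed): require ALL of the specified tags to be present.
ArrayList<String> specificTags = new ArrayList<String>(Arrays.asList("label", "f_tag1"));
Expression allTagsExpr = Exp.build(
    Exp.eq(
        ListExp.getByValueList(ListReturnType.COUNT,
            Exp.val(specificTags), Exp.listBin("tags")),
        Exp.val(specificTags.size())));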
Filter records by a key=value in its map bin.
Assuming we want to filter records having a key "k" with value "v" in the map bin attrs, the logical expression would look something like:
Exp.eq(
    MapExp.getByKey(MapReturnType.VALUE,
        Exp.Type.STRING, Exp.val("k"), Exp.mapBin("attrs")),
    Exp.val("v"))
// 1. Write the filter expression in Java.
Expression expr = Exp.build(
Exp.eq(MapExp.getByKey(MapReturnType.VALUE,
Exp.Type.STRING, Exp.val("f_attr1"), Exp.mapBin("attrs")),
Exp.val("v1")));
// 2. Test the expression.
System.out.println("Results of filter expression query (with f_attr1=v1 in the attrs bin):");
filterRecords(Namespace, Set, expr);
Results of filter expression query (with f_attr1=v1 in the attrs bin):
key=null bins={attrs={etype=etype1, f_attr1=v1}, description=f_desc1, fgname=fgname1, fid=fgname1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}
// 3. Obtain the base64 representation of the expression.
System.out.format("Base64: %s\n", expr.getBase64());
// 4. Transfer the base64 representation for use in Spark Connector.
// You can use the base64 string in the Spark Connector's
// "option.aerospike.pushdown.expressions" option.
Base64: kwGVfwMAk2EHqANmX2F0dHIxk1EFpWF0dHJzowN2MQ==
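A related variation (a sketch using the same getByKey API, not executed here) filters on the presence of a key regardless of its value, by counting the entries selected by the key:

// Variation (sketch, not executed): match records whose attrs map contains the
// key f_attr1 at all; the count is 0 or 1 for a single-key lookup.
Expression hasKeyExpr = Exp.build(
    Exp.gt(
        MapExp.getByKey(MapReturnType.COUNT,
            Exp.Type.INT, Exp.val("f_attr1"), Exp.mapBin("attrs")),
        Exp.val(0)));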
Filter records matching a pattern in its string bin.
Assuming we want to filter records matching a pattern of a prefix and a suffix in the string bin name, the logical expression would look something like:
Exp.regexCompare("^prefix.*suffix$", RegexFlag.ICASE, Exp.stringBin("name"))
// 1. Write the filter expression in Java.
Expression expr = Exp.build(
Exp.regexCompare("^C.*2$", RegexFlag.ICASE, Exp.stringBin("fid")));
// 2. Test the expression.
System.out.println("Results of filter expression query (with a value starting with a C and ending in a 2 in the fid bin):");
filterRecords(Namespace, Set, expr);
Results of filter expression query (with a value starting with a C and ending in a 2 in the fid bin):
key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V2, name=V2, tags=[pca], type=double}
key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V12, name=V12, tags=[pca], type=double}
key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V22, name=V22, tags=[pca], type=double}
// 3. Obtain the base64 representation of the expression.
System.out.format("Base64: %s\n", expr.getBase64());
// 4. Transfer the base64 representation for use in Spark Connector.
// You can use the base64 string in the Spark Connector's
// "option.aerospike.pushdown.expressions" option.
Base64: lAcCpl5DLioyJJNRA6NmaWQ=
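Note that RegexFlag values are bit flags, so multiple flags can be combined with a bitwise OR when needed (a sketch, not executed here):

// Sketch (not executed): combine regex flags with a bitwise OR.
Expression multiFlagExpr = Exp.build(
    Exp.regexCompare("^c.*2$", RegexFlag.ICASE | RegexFlag.NEWLINE, Exp.stringBin("fid")));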
Let's create a composite filter by OR'ing all the above filters. You can similarly assemble a variety of composite filters to suit your needs.
// 1. Write the filter expression in Java.
Expression expr = Exp.build(
Exp.or(
Exp.or(
Exp.gt(
ListExp.getByValueList(ListReturnType.COUNT,
Exp.val(new ArrayList<String>(Arrays.asList("label","f_tag1"))), Exp.listBin("tags")),
Exp.val(0)),
Exp.eq(MapExp.getByKey(MapReturnType.VALUE,
Exp.Type.STRING, Exp.val("f_attr1"), Exp.mapBin("attrs")),
Exp.val("v1"))),
Exp.regexCompare("^C.*2$", RegexFlag.ICASE, Exp.stringBin("fid"))));
// 2. Test the expression.
System.out.println("Results of filter expression query (OR'ing all the above expressions):");
filterRecords(Namespace, Set, expr);
Results of filter expression query (OR'ing all the above expressions):
key=null bins={attrs={entity=cctxn}, description=Label indicating fraud or not, fgname=CC1, fid=CC1_Class, name=Class, tags=[label], type=integer}
key=null bins={attrs={etype=etype1, f_attr1=v1}, description=f_desc1, fgname=fgname1, fid=fgname1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}
key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V2, name=V2, tags=[pca], type=double}
key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V12, name=V12, tags=[pca], type=double}
key=null bins={attrs={f_attr1=1.0, f_attr3=three}, description=f_desc2, fgname=fg_name1, fid=fg_name1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}
key=null bins={attrs={etype=etype1, f_attr1=v2}, description=f_desc2, fgname=fgname1, fid=fgname1_f_name2, name=f_name2, tags=[f_tag1, f_tag3], type=double}
key=null bins={attrs={f_attr1=1, f_attr2=two}, description=f_desc1, fgname=fg_name1, fid=fg_name1_f_name1, name=f_name1, tags=[f_tag1, f_tag2], type=integer}
key=null bins={attrs={entity=cctxn}, description=Transformed version of PCA, fgname=CC1, fid=CC1_V22, name=V22, tags=[pca], type=double}
// 3. Obtain the base64 representation of the expression.
System.out.format("Base64: %s\n", expr.getBase64());
// 4. Transfer the base64 representation for use in Spark Connector.
// You can use the base64 string in the Spark Connector's
// "option.aerospike.pushdown.expressions" option.
Base64: kxGTEZMDlX8CAJMXBZJ+kqYDbGFiZWynA2ZfdGFnMZNRBKR0YWdzAJMBlX8DAJNhB6gDZl9hdHRyMZNRBaVhdHRyc6MDdjGUBwKmXkMuKjIkk1EDo2ZpZA==
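Since Exp.or is variadic, the nesting above is optional; the same composite filter can be written flat (a sketch, not executed here):

// Equivalent flat form (sketch): Exp.or accepts any number of subexpressions.
Expression flatExpr = Exp.build(
    Exp.or(
        Exp.gt(
            ListExp.getByValueList(ListReturnType.COUNT,
                Exp.val(new ArrayList<String>(Arrays.asList("label", "f_tag1"))),
                Exp.listBin("tags")),
            Exp.val(0)),
        Exp.eq(
            MapExp.getByKey(MapReturnType.VALUE,
                Exp.Type.STRING, Exp.val("f_attr1"), Exp.mapBin("attrs")),
            Exp.val("v1")),
        Exp.regexCompare("^C.*2$", RegexFlag.ICASE, Exp.stringBin("fid"))));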
Following the pattern above and the examples in the resources at the end, you can create your own expressions to test and then use in the Spark Connector's pushdown option.
// 1. Write the filter expression in Java.
// 2. Test the expression.
// 3. Obtain the base64 representation of the expression.
// 4. Transfer the base64 representation for use in Spark Connector.
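For example, a record-metadata expression of the kind mentioned in the introduction (one that the "where" predicate cannot express) follows the same pattern. This sketch, not executed here, selects records updated within the last 24 hours; Exp.lastUpdate() evaluates to nanoseconds since the epoch:

// 1. Write the filter expression (record metadata: last update within 24 hours).
long dayAgoNanos = (System.currentTimeMillis() - 24L * 60 * 60 * 1000) * 1_000_000L;
Expression recentExpr = Exp.build(
    Exp.ge(Exp.lastUpdate(), Exp.val(dayAgoNanos)));
// 2-4. Test it with filterRecords() and obtain the base64 via recentExpr.getBase64().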
This tutorial described, with several examples, how the base64 representation of Aerospike expressions is obtained for use in the Aerospike Spark Connector.
Use of pushdown expressions is desirable and sometimes necessary because of the efficiency and unique functionality they provide.
Currently the external base64 representation of an expression is not available from the Python client, so we must use the Java client to obtain it. It can then be used in the Spark Connector's aerospike.pushdown.expressions option.
Use the resources listed below to write and test your expressions in this notebook (or any other Java client environment) for use with the Spark Connector.
Close the server connection.
client.close();
System.out.println("Closed server connection.");
Closed server connection.
Visit the Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, then click File->Open in the notebook menu and select Upload.