Aerospike queries allow filtering based on a predicate, and User Defined Functions (UDFs) offer arbitrary server-side processing. This notebook illustrates how a query and a UDF can be combined in a useful pattern. Two examples are given: the first combines a query with a UDF aggregate function, and the second combines a query, a predicate expression, and a UDF update function. The code for the first example is also available in this repo.
This notebook requires Aerospike Database running locally, a Java kernel, and the Aerospike Java Client. To create a Docker container that satisfies these requirements and holds a copy of the Aerospike notebooks, visit the Aerospike Notebooks Repo.
Ensure that Aerospike Database is running.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
Here the Java client version 5.0.0 is installed.
%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>
The test data has ten records with user keys 1 through 10 in namespace "test" and set "demo". Each record has two bins (fields), "binint" and "binstr": "binint" is initialized to the user key, and "binstr" to a string of the form "(id). (name)".
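To make the shape of the data concrete, here is a minimal plain-Java model of the ten records, keyed by user key. It is only a sketch: the names `TestDataSketch`, `Bins`, and `build` are hypothetical, and nothing here uses the Aerospike client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of the ten test records (user key -> bins).
// Illustration only: it does not read from or write to Aerospike.
class TestDataSketch {
    static final String[] NAMES = {"1. Clark", "2. Keenan", "3. Smith", "4. Jones", "5. Clark",
            "6. Jones", "7. Iyer", "8. Smith", "9. Hernandez", "10. Smith"};

    // Stands in for one record's two bins.
    static final class Bins {
        final long binint;
        final String binstr;

        Bins(long binint, String binstr) {
            this.binint = binint;
            this.binstr = binstr;
        }

        @Override
        public String toString() {
            return "(binint:" + binint + "),(binstr:" + binstr + ")";
        }
    }

    // binint is set to the user key; binstr to the matching "(id). (name)" string.
    static Map<Integer, Bins> build() {
        Map<Integer, Bins> data = new LinkedHashMap<>();
        for (int key = 1; key <= NAMES.length; key++) {
            data.put(key, new Bins(key, NAMES[key - 1]));
        }
        return data;
    }

    public static void main(String[] args) {
        // One line per record, e.g. "3 -> (binint:3),(binstr:3. Smith)".
        build().forEach((key, bins) -> System.out.println(key + " -> " + bins));
    }
}
```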
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");
Initialized the client and connected to the cluster.
import com.aerospike.client.policy.ClientPolicy;
String Namespace = "test";
String Set = "demo";
String BinInt = "binint";
String BinStr = "binstr";
int NumRecords = 10;
String Names[] = {"1. Clark", "2. Keenan", "3. Smith", "4. Jones", "5. Clark",
"6. Jones", "7. Iyer", "8. Smith", "9. Hernandez", "10. Smith"};
ClientPolicy policy = new ClientPolicy();
for (int i = 1; i <= NumRecords; i++) {
Key key = new Key(Namespace, Set, i);
Bin bin1 = new Bin(BinInt, i);
Bin bin2 = new Bin(BinStr, Names[i-1]);
client.put(policy.writePolicyDefault, key, bin1, bin2);
}
System.out.format("Written %d records in ns=%s set=%s with userkeys 1-%d.",
NumRecords, Namespace, Set, NumRecords);
Written 10 records in ns=test set=demo with userkeys 1-10.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
String IndexName = "idx_numeric_test_demo_binint";
Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.
try {
IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,
BinInt, IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}
System.out.format("Created index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, Set, BinInt);
Created index idx_numeric_test_demo_binint on ns=test set=demo bin=binint.
In the first example, we demonstrate how an aggregate function (sum) is calculated over the stream of records returned by a query. First we create the aggregate function in a UDF module. A UDF is like a stored procedure that is executed on the server, on all nodes of the cluster. The partial results from each node are then combined on the client using the same UDF. For this reason, the UDF module must be registered with the server for the first phase of parallel processing across all nodes, and must also be available locally for the final phase of aggregation.
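The two phases can be simulated in plain Java to show why the same reducer runs in both places. This is a hedged sketch, not how Aerospike schedules work: it assumes a hypothetical round-robin spread of records over `nodeCount` nodes (a real cluster distributes records by partition), starts each partial sum at 0 for simplicity, and the names `TwoPhaseSumSketch` and `twoPhaseSum` are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.LongStream;

// Plain-Java simulation of two-phase aggregation (hypothetical names).
class TwoPhaseSumSketch {
    // Plays the role of the Lua reducer: combine two values into one.
    static final BinaryOperator<Long> REDUCER = Long::sum;

    static long twoPhaseSum(long[] binValues, int nodeCount) {
        // Phase 1: each simulated node reduces only the records it holds.
        // Assumption: round-robin placement, purely for illustration.
        List<Long> partials = new ArrayList<>();
        for (int node = 0; node < nodeCount; node++) {
            long partial = 0;
            for (int i = node; i < binValues.length; i += nodeCount) {
                partial = REDUCER.apply(partial, binValues[i]);
            }
            partials.add(partial);
        }
        // Phase 2: the client combines the per-node partials with the same reducer.
        return partials.stream().reduce(0L, REDUCER);
    }

    public static void main(String[] args) {
        long[] values = LongStream.rangeClosed(4, 7).toArray(); // binint values 4..7
        System.out.println(twoPhaseSum(values, 3)); // 22
    }
}
```

Because addition is associative and commutative, the final sum does not depend on how the records are split across nodes; a reducer without those properties would give results that depend on the node layout.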
Examine the following Lua code, which aggregates (reduces) a stream of records into the sum of a bin's values. Create the UDF module "sum_example.lua" in the "udf" directory.
-- sum_example.lua

local function reducer(val1,val2)
  return val1 + val2
end

function sum_single_bin(stream,name)
  local function mapper(rec)
    return rec[name]
  end
  return stream : map(mapper) : reduce(reducer)
end
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileWriter;
// Write the Lua source passed in as `code` to ./udf/<name>, creating the directory if needed.
void CreateUDFModule(String name, String code) {
try {
if (!Files.exists(Paths.get("./udf"))) {
Files.createDirectory(Paths.get("./udf"));
}
try (FileWriter fw = new FileWriter("./udf/" + name)) { // closes the file even on error
fw.write(code);
}
}
catch (Exception e) {
System.out.format("Failed to create Lua module %s, exception: %s.",
"udf/"+name, e);
}
}
// Create the UDF module "udf/sum_example.lua".
String luaCode =
"-- sum_example.lua" + "\n" +
"" + "\n" +
"local function reducer(val1,val2)" + "\n" +
" return val1 + val2" + "\n" +
"end" + "\n" +
"" + "\n" +
"function sum_single_bin(stream,name)" + "\n" +
" local function mapper(rec)" + "\n" +
" return rec[name]" + "\n" +
" end" + "\n" +
" return stream : map(mapper) : reduce(reducer)" + "\n" +
"end";
CreateUDFModule("sum_example.lua", luaCode);
System.out.format("Lua module %s created.", "udf/sum_example.lua");
Lua module udf/sum_example.lua created.
Register the Lua module for the aggregate function with the server.
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;
String UDFDir = "./udf";
String UDFFile = "sum_example.lua";
RegisterTask task = client.register(policy, UDFDir+"/"+UDFFile, UDFFile, Language.LUA);
task.waitTillComplete();
System.out.format("Registered the UDF module %s.", UDFFile);
Registered the UDF module sum_example.lua.
The query statement includes elements such as namespace, set, bins to retrieve, and filter or predicate.
import com.aerospike.client.query.Statement;
import com.aerospike.client.query.Filter;
import com.aerospike.client.Value;
int begin = 4;
int end = 7;
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setBinNames(BinInt, BinStr);
stmt.setFilter(Filter.range(BinInt, begin, end));
System.out.format("Query on ns=%s set=%s, with bin %s >= %d <= %d",
Namespace, Set, BinInt, begin, end);
Query on ns=test set=demo, with bin binint >= 4 <= 7
Include the aggregate processing and its parameters in the query statement.
String UDFModule = "sum_example";
String UDFFunction = "sum_single_bin";
stmt.setAggregateFunction(UDFModule, UDFFunction, Value.get(BinInt));
System.out.format("Aggregate function %s added for server processing.", UDFFunction);
Aggregate function sum_single_bin added for server processing.
Let's now execute the query.
import com.aerospike.client.query.ResultSet;
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed the query with UDF; got results.");
Executed the query with UDF; got results.
The expected sum for records 4 through 7 (inclusive) is 4+5+6+7 = 22.
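The expected value can be recomputed locally as a sanity check. The sketch below assumes, as in the test data, that `binint` equals the user key; `expectedSum` is a hypothetical helper that applies the inclusive range and sums the surviving values.

```java
import java.util.stream.IntStream;

// Local recomputation of the expected aggregate (hypothetical helper).
class ExpectedSumSketch {
    // Keep binint values in [begin, end] (the range filter is inclusive), then sum them.
    // Assumes binint == user key for keys 1..numRecords, as in the test data.
    static long expectedSum(int numRecords, int begin, int end) {
        return IntStream.rangeClosed(1, numRecords)
                .filter(v -> v >= begin && v <= end)
                .asLongStream()
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(expectedSum(10, 4, 7)); // 22
    }
}
```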
System.out.println("Processing results:");
try {
int expected = 22; // 4 + 5 + 6 + 7
int count = 0;
while (rs.next()) {
Object object = rs.getObject();
long sum;
if (object instanceof Long) {
sum = (Long)object; // reuse the value already fetched above
}
else {
System.out.println("Return value not a long: " + object);
continue;
}
if (expected == (int)sum) {
System.out.format("Sum matched! Value=%d.", expected);
}
else {
System.out.format("Sum mismatch: Expected %d. Received %d.", expected, (int)sum);
}
count++;
}
if (count == 0) {
System.out.println("Query failed. No records returned.");
}
}
finally {
rs.close();
}
Processing results: Sum matched! Value=22.
We will illustrate an update UDF function with a query and predicate expression.
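Let's say we want to multiply the value of bin "binint" by 5 in every record where "binint" is between 3 and 9 (inclusive) and "binstr" contains "smith" or "jones" (ignoring case).
Records with user keys 3, 4, 6, and 8 meet these conditions.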
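The selection can be checked locally with a small plain-Java sketch that applies both conditions to the test data. It assumes `binint` still equals the user key, and it uses `java.util.regex` as a stand-in for the server-side regex matching; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Local check of which records satisfy both conditions (hypothetical names).
class UpdateSelectionSketch {
    static final String[] NAMES = {"1. Clark", "2. Keenan", "3. Smith", "4. Jones", "5. Clark",
            "6. Jones", "7. Iyer", "8. Smith", "9. Hernandez", "10. Smith"};

    // Case-insensitive search for either surname anywhere in the string.
    static final Pattern SMITH_OR_JONES = Pattern.compile("smith|jones", Pattern.CASE_INSENSITIVE);

    static List<Integer> selectKeys(int begin, int end) {
        List<Integer> keys = new ArrayList<>();
        for (int key = 1; key <= NAMES.length; key++) {
            int binint = key; // binint was initialized to the user key
            String binstr = NAMES[key - 1];
            boolean inRange = binint >= begin && binint <= end;        // query range filter
            boolean nameMatch = SMITH_OR_JONES.matcher(binstr).find(); // string condition
            if (inRange && nameMatch) {
                keys.add(key);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(selectKeys(3, 9)); // [3, 4, 6, 8]
    }
}
```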
This update can be achieved in different ways using a combination of query, predicate expression, and UDF. For the purpose of this exercise, we use a query with the "between" predicate, a predicate expression for string comparison, and a UDF to update the integer bin.
Let's define them one at a time, starting with a new UDF.
Examine the code below. It multiplies a bin value by the input factor and updates the record.
-- update_example.lua

function multiplyBy(rec, binName, factor)
  rec[binName] = rec[binName] * factor
  aerospike:update(rec)
end
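For readers less familiar with Lua, a rough Java analogue of `multiplyBy` follows. It is illustrative only: a record is modeled as a `Map` from bin name to value, the in-place `put` stands in for the change that `aerospike:update(rec)` persists on the server, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Java analogue of the Lua record UDF multiplyBy (illustration, not server code).
class MultiplyBySketch {
    // Assumes the bin exists and holds a number; stores the product as a long.
    static void multiplyBy(Map<String, Object> rec, String binName, long factor) {
        long current = ((Number) rec.get(binName)).longValue();
        rec.put(binName, current * factor);
    }

    public static void main(String[] args) {
        Map<String, Object> rec = new HashMap<>();
        rec.put("binint", 4L);
        rec.put("binstr", "4. Jones");
        multiplyBy(rec, "binint", 5);
        System.out.println(rec.get("binint")); // 20
    }
}
```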
// Create the UDF module "udf/update_example.lua".
String luaCode =
"-- update_example.lua" + "\n" +
"" + "\n" +
"function multiplyBy(rec, binName, factor)" + "\n" +
" rec[binName] = rec[binName] * factor" + "\n" +
" aerospike:update(rec)" + "\n" +
"end";
CreateUDFModule("update_example.lua", luaCode);
System.out.format("Lua module %s created.", "udf/update_example.lua");
Lua module udf/update_example.lua created.
String UDFFile = "update_example.lua";
RegisterTask task = client.register(policy, UDFDir+"/"+UDFFile,
UDFFile, Language.LUA);
task.waitTillComplete();
System.out.format("Registered the UDF module %s.", UDFFile);
Registered the UDF module update_example.lua.
Specify the namespace, set, bins, and query filter.
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setBinNames(BinInt, BinStr);
int begin = 3;
int end = 9;
// Filter is evaluated using a secondary index and therefore can only reference an indexed bin.
stmt.setFilter(Filter.range(BinInt, begin, end));
System.out.format("Query on ns=%s set=%s, with bin %s >= %d <= %d",
Namespace, Set, BinInt, begin, end);
Query on ns=test set=demo, with bin binint >= 3 <= 9
In addition to the predicate in the query (which requires a secondary index), further filtering can be specified with a predicate expression. A predicate expression is specified as part of the request policy and does not require a secondary index. It is evaluated on each record selected by the query predicate, and only the records for which it evaluates to true are processed further (in this case, updated by the UDF).
Here the predicate expression checks whether the string bin contains either "smith" or "jones", ignoring case. It uses an OR expression that combines two regular-expression matches.
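The logic of this predicate expression can be mimicked locally. In this hedged sketch, `java.util.regex` with `Pattern.CASE_INSENSITIVE` stands in for `RegexFlag.ICASE`; the server does not use the Java regex library, so treat it as an illustration of the OR logic rather than an exact reproduction. The class and method names are hypothetical.

```java
import java.util.regex.Pattern;

// Local analogue of Exp.or(regexCompare(".*smith.*"), regexCompare(".*jones.*")) on binstr.
class PredicateExpressionSketch {
    static final Pattern SMITH = Pattern.compile(".*smith.*", Pattern.CASE_INSENSITIVE);
    static final Pattern JONES = Pattern.compile(".*jones.*", Pattern.CASE_INSENSITIVE);

    // True if either case-insensitive pattern matches the whole string.
    static boolean predicate(String binstr) {
        return SMITH.matcher(binstr).matches() || JONES.matcher(binstr).matches();
    }

    public static void main(String[] args) {
        String[] names = {"1. Clark", "2. Keenan", "3. Smith", "4. Jones", "5. Clark",
                "6. Jones", "7. Iyer", "8. Smith", "9. Hernandez", "10. Smith"};
        for (String name : names) {
            System.out.println(name + " -> " + predicate(name));
        }
    }
}
```

Note that "10. Smith" satisfies the expression but is still not updated, because the expression is only evaluated on records that the query's range filter (3 to 9) has already selected.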
// Predicate Expressions are applied on query results on server side.
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.query.RegexFlag;
WritePolicy policy = new WritePolicy(client.writePolicyDefault);
policy.filterExp = Exp.build(
Exp.or(
Exp.regexCompare(".*smith.*", RegexFlag.ICASE, Exp.stringBin(BinStr)),
Exp.regexCompare(".*jones.*", RegexFlag.ICASE, Exp.stringBin(BinStr))));
System.out.format("Predicate Expression: (binstr ilike '%%smith%%' || binstr ilike '%%jones%%')");
Predicate Expression: (binstr ilike '%smith%' || binstr ilike '%jones%')
// Execute the update UDF function on records that match the statement filter and policy filter.
// Records are not returned to the client. This asynchronous server call will return
// before the command is complete. The user can optionally wait for command completion
// by using the returned ExecuteTask instance.
import com.aerospike.client.task.ExecuteTask;
import com.aerospike.client.Value;
int MultiplicationFactor = 5;
ExecuteTask task = client.execute(policy, stmt, "update_example", "multiplyBy",
Value.get(BinInt), Value.get(MultiplicationFactor));
task.waitTillComplete(3000, 0); // poll time 3s, no timeout
System.out.format("Executed the query and filter expression and applied UDF update to records.");
Executed the query and filter expression and applied UDF update to records.
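The `waitTillComplete(3000, 0)` call above polls until the background update finishes. The sketch below shows a generic poll-until-done loop with the same two knobs (sleep interval and maximum timeout, where 0 means wait indefinitely). It is not the client's implementation: the status check is a hypothetical `BooleanSupplier`, and the real client may report a timeout differently (for example by throwing an exception rather than returning false).

```java
import java.util.function.BooleanSupplier;

// Generic poll-until-done loop (hypothetical helper, not the Aerospike client's code).
class PollSketch {
    // Returns true once isDone reports completion; returns false if maxTimeoutMillis
    // (when > 0) elapses first or the thread is interrupted. 0 means no timeout.
    static boolean waitTillDone(BooleanSupplier isDone, long sleepMillis, long maxTimeoutMillis) {
        long deadline = System.currentTimeMillis() + maxTimeoutMillis;
        while (!isDone.getAsBoolean()) {
            if (maxTimeoutMillis > 0 && System.currentTimeMillis() >= deadline) {
                return false; // gave up before the task finished
            }
            try {
                Thread.sleep(sleepMillis); // wait before polling the status again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] polls = {0};
        // Simulated task that reports completion on the third status check.
        boolean done = waitTillDone(() -> ++polls[0] >= 3, 10, 0);
        System.out.println(done + " after " + polls[0] + " polls"); // true after 3 polls
    }
}
```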
Remember that records 3, 4, 6, and 8 should have received the update; that is, their binint values should be multiplied by the specified factor (5).
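The expected values follow from simple arithmetic: each selected record's `binint` becomes key * 5, and the others keep their original value. A minimal sketch, assuming `binint` started equal to the user key and that exactly keys 3, 4, 6, and 8 are updated (the helper name is hypothetical):

```java
import java.util.Set;

// Expected binint after the update (hypothetical helper).
class ExpectedAfterUpdateSketch {
    // Assumes binint started equal to the user key.
    static long expectedBinint(int key, Set<Integer> updated, long factor) {
        return updated.contains(key) ? key * factor : key;
    }

    public static void main(String[] args) {
        Set<Integer> updated = Set.of(3, 4, 6, 8);
        for (int key = 1; key <= 10; key++) {
            System.out.println(key + " -> " + expectedBinint(key, updated, 5));
        }
    }
}
```

In the output below, the updated records also show generation 2 (`gen:2`) rather than 1, reflecting the additional write performed by the UDF.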
import com.aerospike.client.Record;
for (int i = 1; i <= NumRecords; i++) {
Key key = new Key(Namespace, Set, i);
Record record = client.get(null, key, BinInt, BinStr);
System.out.println(record);
}
(gen:1),(exp:351567276),(bins:(binint:1),(binstr:1. Clark))
(gen:1),(exp:351567276),(bins:(binint:2),(binstr:2. Keenan))
(gen:2),(exp:351567283),(bins:(binint:15),(binstr:3. Smith))
(gen:2),(exp:351567283),(bins:(binint:20),(binstr:4. Jones))
(gen:1),(exp:351567276),(bins:(binint:5),(binstr:5. Clark))
(gen:2),(exp:351567283),(bins:(binint:30),(binstr:6. Jones))
(gen:1),(exp:351567276),(bins:(binint:7),(binstr:7. Iyer))
(gen:2),(exp:351567283),(bins:(binint:40),(binstr:8. Smith))
(gen:1),(exp:351567276),(bins:(binint:9),(binstr:9. Hernandez))
(gen:1),(exp:351567276),(bins:(binint:10),(binstr:10. Smith))
client.dropIndex(null, Namespace, Set, IndexName);
client.close();
System.out.println("Index dropped and server connection closed.");
Index dropped and server connection closed.
Visit the Aerospike Notebooks Repo to run additional Aerospike notebooks. To run a different notebook, download it from the repo to your local machine, click File->Open, and select Upload.