This tutorial is Part 1 of a guide to implementing SQL aggregate queries in Aerospike.
This notebook requires the Aerospike Database running locally with the Java kernel and the Aerospike Java Client. To create a Docker container that satisfies these requirements and holds a copy of the Aerospike notebooks, visit the Aerospike Notebooks Repo.
In this notebook, we will see how specific aggregate statements in SQL can be implemented in Aerospike.
SQL is a widely known data access language. The examples in this notebook provide patterns for implementing specific SQL aggregate queries. You should be able to understand them and find them useful even without deep familiarity with SQL.
This notebook is the second in the SQL Operations series that consists of the following notebooks:
The specific topics and aggregate functions we discuss include:
In Part 2, we describe GROUP BY processing and some additional aggregates.
The purpose of this notebook is to illustrate Aerospike implementations of specific SQL operations. Check out the Aerospike Presto Connector for ad-hoc SQL access to Aerospike data.
All UDF functions for this notebook are in the "aggregate_fns.lua" file under the "udf" subdirectory. If the subdirectory or file is not there, you may download the file from here and place it there using the notebook's File->Open followed by Upload/New menus.
You are encouraged to experiment with the Lua code in the module. Be sure to save your changes and then run the convenience function "registerUDF()" in a code cell for the changes to take effect.
This tutorial assumes familiarity with the following topics:
This notebook requires that the Aerospike Database is running.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
Install the Java client.
%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>
The test data has 1000 records with user keys "id-1" through "id-1000", two integer bins (fields) "bin1" (values 1-1000) and "bin2" (values 1001-2000), and one string bin "bin3" (one of five random values "A" through "E"), in the namespace "test" and set "sql-aggregate".
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import java.util.Random;
String[] groups = {"A", "B", "C", "D", "E"};
Random rand = new Random(1);
AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");
String Namespace = "test";
String Set = "sql-aggregate";
WritePolicy wpolicy = new WritePolicy();
wpolicy.sendKey = true;
for (int i = 1; i <= 1000; i++) {
Key key = new Key(Namespace, Set, "id-"+i);
Bin bin1 = new Bin("bin1", i);
Bin bin2 = new Bin("bin2", 1000+i);
Bin bin3 = new Bin(new String("bin3"), groups[rand.nextInt(groups.length)]);
client.put(wpolicy, key, bin1, bin2, bin3);
}
System.out.format("Test data popuated");;
Initialized the client and connected to the cluster. Test data populated
To use the query API with an index-based filter, a secondary index must exist on the filter bin. Here we create a numeric index on "bin1" in the "sql-aggregate" set.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
String IndexName = "test_sql_aggregate_bin1_number_idx";
Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.
try {
IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,
"bin1", IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}
System.out.format("Created number index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, Set, "bin1");;
Created number index test_sql_aggregate_bin1_number_idx on ns=test set=sql-aggregate bin=bin1.
Processing aggregates in Aerospike involves processing a stream of records through a pipeline of operators on the server as well as the client.
Four types of operators are supported: Filter, Map, Aggregate, and Reduce. The operators work with one of the following data types as input and output: Record, Integer, String, Map (the data type, not to be confused with the Map operator), and List. Only the initial filter(s) and first non-filter operator in the pipeline can consume Record type.
The operators may appear any number of times and in any order in the pipeline.
The operator pipeline is typically processed in two phases: the first phase on the server nodes and the second phase on the client.
Thus, the first reduce operator, if specified in the pipeline, is executed on all server nodes as well as on the client. If there is no reduce operator in the pipeline, the application receives the combined results returned by the server nodes.
Post-aggregation processing involves the operators after the first reduce in the pipeline, usually for sorting, filtering, and final transformation, and takes place on the client.
Aggregation processing in Aerospike is defined using User Defined Functions (UDFs). UDFs are written in Lua with arbitrary logic and are executed on both server and client as explained above. Since aggregates by definition involve multiple records, only stream UDFs are discussed below (versus record UDFs whose scope of execution is a single record).
A stream UDF specifies the pipeline of operators for processing aggregates. Different aggregates differ in their UDF functions, whereas the Aerospike APIs for specifying the aggregate processing are the same.
The UDFs and logic are described in appropriate sections for each aggregate function below. For additional context and details, please refer to the documentation.
SELECT aggregate(col) FROM namespace.set
Each aggregate differs in the UDF function that specifies its processing pipeline; the function is specified and executed with the following API operations for synchronous execution.
void Statement::setAggregateFunction(String udfModule, String udfFunction, Value... udfArgs)
ResultSet rs = Client::queryAggregate(QueryPolicy policy, Statement stmt);
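For orientation, here is a minimal sketch of this invocation pattern. It assumes the client connected above and the "aggregate_fns" module with its "sum" function, both of which are set up later in this notebook:
import com.aerospike.client.Value;
import com.aerospike.client.query.ResultSet;
import com.aerospike.client.query.Statement;
// Generic pattern: point a Statement at a namespace/set, name the UDF module,
// the stream function, and its arguments, then iterate the ResultSet.
Statement stmt = new Statement();
stmt.setNamespace("test");
stmt.setSetName("sql-aggregate");
stmt.setAggregateFunction("aggregate_fns", "sum", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);  // null uses the default QueryPolicy
while (rs.next()) {
    System.out.println(rs.getObject());  // the aggregate value(s)
}
rs.close();
The concrete COUNT, SUM, MIN, MAX, and AVERAGE/RANGE examples below all follow this pattern and differ only in the UDF function and its arguments.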
Simple aggregate computations are possible with a simple pipeline of map and reduce. Such simple computations can save the aggregation state in a single numeric or string value during stream processing. Examples include single aggregate computations for:
Let us separately implement COUNT and SUM aggregate functions on a single bin.
Note that all UDF functions for this notebook are assumed to be in the "aggregate_fns.lua" file under the "udf" directory. Please refer to the "Working with UDF Module" section above.
As explained above, the logic for aggregation resides in a stream UDF.
COUNT
Examine the following Lua code that implements COUNT. The pipeline consists of map and reduce operators.
-- count and sum reducer
local function add_values(val1, val2)
    return (val1 or 0) + (val2 or 0)
end

-- count mapper
-- note closures are used to access aggregate parameters such as bin
local function rec_to_count_closure(bin)
    local function rec_to_count(rec)
        -- if bin is specified: if bin exists in record return 1 else 0;
        -- if no bin is specified, return 1
        return (not bin and 1) or ((rec[bin] and 1) or 0)
    end
    return rec_to_count
end

-- count
function count(stream, bin)
    return stream : map(rec_to_count_closure(bin)) : reduce(add_values)
end
SUM
Examine the following Lua code that implements SUM. The pipeline consists of map and reduce operators.
-- mapper for various single bin aggregates
local function rec_to_bin_value_closure(bin)
    local function rec_to_bin_value(rec)
        -- if a numeric bin exists in record return its value; otherwise return nil
        local val = rec[bin]
        if (type(val) ~= "number") then val = nil end
        return val
    end
    return rec_to_bin_value
end

-- sum
function sum(stream, bin)
    return stream : map(rec_to_bin_value_closure(bin)) : reduce(add_values)
end
Register the UDF with the server by executing the following code cell.
The registerUDF() function below can be run conveniently when the UDF is modified (you are encouraged to experiment with the UDF code). The function invalidates the cache, removes the currently registered module, and registers the latest version.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;
import com.aerospike.client.lua.LuaConfig;
import com.aerospike.client.lua.LuaCache;
LuaConfig.SourceDirectory = "../udf";
String UDFFile = "aggregate_fns.lua";
String UDFModule = "aggregate_fns";
void registerUDF() {
// clear the lua cache
LuaCache.clearPackages();
Policy policy = new Policy();
// remove the current module, if any
client.removeUdf(null, UDFFile);
RegisterTask task = client.register(policy, LuaConfig.SourceDirectory+"/"+UDFFile,
UDFFile, Language.LUA);
task.waitTillComplete();
System.out.format("Registered the UDF module %s.", UDFFile);;
}
registerUDF();
Registered the UDF module aggregate_fns.lua.
SELECT COUNT(bin2) FROM test.sql-aggregate
SELECT SUM(bin2) FROM test.sql-aggregate
Here we will execute the "count" and "sum" functions on "bin2" over all 1000 records in the set. The expected sum of bin2 values (1001 + 1002 + ... + 2000 = 1000 × 3001/2) is 1500500.
import com.aerospike.client.query.Statement;
import com.aerospike.client.Value;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.ResultSet;
// COUNT
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "count", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed COUNT.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s\n", obj.toString());
}
rs.close();
// SUM
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "sum", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed SUM.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s\n", obj.toString());
}
rs.close();
Executed COUNT. Returned object: 1000 Executed SUM. Returned object: 1500500
SELECT agg(col) FROM namespace.set WHERE condition
The WHERE clause must be implemented using either the query's index filter or the UDF's stream filter. Let's implement this specific query:
SELECT SUM(bin2) FROM test.sql-aggregate WHERE bin1 >= 2 AND bin1 <= 7
Let's first use the query filter and then the UDF stream filter to illustrate. In both cases, the filter is 2 <= bin1 <= 7. The expected sum (1002 + 1003 + ... + 1007) is 6027.
import com.aerospike.client.query.Filter;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.exp.Exp;
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
// range filter using the secondary index on bin1
stmt.setFilter(Filter.range("bin1", 2, 7));
stmt.setAggregateFunction(UDFModule, "sum", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed SUM using the query filter.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Executed SUM using the query filter. Returned object: 6027
Now let's implement the range filter in the UDF.
SUM_RANGE
Examine the following Lua code that implements SUM with a range filter. It takes sum_bin, range_bin, and the range limits range_low and range_high. The pipeline consists of a filter followed by map and reduce operators.
-- range filter
local function range_filter_closure(range_bin, range_low, range_high)
    local function range_filter(rec)
        -- if bin value is in [low, high] return true, false otherwise
        local val = rec[range_bin]
        if (not val or type(val) ~= "number") then val = nil end
        return (val and (val >= range_low and val <= range_high)) or false
    end
    return range_filter
end

-- sum of range: sum(sum_bin) where range_bin in [range_low, range_high]
function sum_range(stream, sum_bin, range_bin, range_low, range_high)
    return stream : filter(range_filter_closure(range_bin, range_low, range_high))
                  : map(rec_to_bin_value_closure(sum_bin))
                  : reduce(add_values)
end
With the same range (2 <= bin1 <= 7), we expect the same results.
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "sum_range",
Value.get("bin2"), Value.get("bin1"), Value.get(2), Value.get(7));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed SUM-RANGE using the filter operator.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Executed SUM-RANGE using the filter operator. Returned object: 6027
Note that you cannot use expression filters with queryAggregate: they are silently ignored. Below, all records in the set are included in the sum even though the expression filter 2 <= bin1 <= 7 is specified in the policy.
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.exp.Exp;
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
QueryPolicy policy = new QueryPolicy(client.queryPolicyDefault);
policy.filterExp = Exp.build(
Exp.and(
Exp.ge(Exp.intBin("bin1"), Exp.val(2)),
Exp.le(Exp.intBin("bin1"), Exp.val(7))));
stmt.setAggregateFunction(UDFModule, "sum", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed SUM using expression filter 2 <= bin1 <=7");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Executed SUM using expression filter 2 <= bin1 <= 7 Returned object: 1500500
SELECT MIN(bin2) FROM test.sql-aggregate
SELECT MAX(bin2) FROM test.sql-aggregate
Examine the following Lua code that implements the aggregate functions MIN and MAX.
MIN
The pipeline consists of a simple map and reduce.
MAX is very similar to MIN above.
-- min reducer
local function get_min(val1, val2)
    local min = nil
    if val1 then
        if val2 then
            if val1 < val2 then min = val1 else min = val2 end
        else
            min = val1
        end
    else
        if val2 then min = val2 end
    end
    return min
end

-- min
function min(stream, bin)
    return stream : map(rec_to_bin_value_closure(bin)) : reduce(get_min)
end

-- max reducer
local function get_max(val1, val2)
    local max = nil
    if val1 then
        if val2 then
            if val1 > val2 then max = val1 else max = val2 end
        else
            max = val1
        end
    else
        if val2 then max = val2 end
    end
    return max
end

-- max
function max(stream, bin)
    return stream : map(rec_to_bin_value_closure(bin)) : reduce(get_max)
end
// MIN
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "min", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed MIN.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s\n", obj.toString());
}
rs.close();
// MAX
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "max", Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed MAX.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Executed MIN. Returned object: 1001 Executed MAX. Returned object: 2000
SELECT agg1(bin1), agg2(bin2), ... FROM namespace.set WHERE condition
The aggregate operator is used when you need to track more complex state during stream processing, for example, to compute multiple aggregates in one query, or to compute aggregates that need other aggregates for evaluation, such as AVERAGE (SUM/COUNT) and RANGE (MAX-MIN).
We will illustrate the aggregate operator with AVERAGE and RANGE computations over bin1 and bin2 respectively. The aggregate function computes the SUM, COUNT, MIN, and MAX of the appropriate bins, from which AVERAGE and RANGE are derived at the end.
SELECT AVERAGE(bin1), RANGE(bin2) FROM test.sql-aggregate
We will implement a new UDF "average_range" for this.
Note that the reducer function merges two partial stream aggregates into one by combining their "sum", "count", "min", and "max" values ("map merge"). The final phase of reduce happens on the client to arrive at the final aggregates. The final map operator is a client-only operation that takes the aggregate map as input and outputs the average and range values.
AVERAGE_RANGE
It takes the bins whose AVERAGE and RANGE are needed. The pipeline consists of map, aggregate, reduce, and map operators.
-- map function to compute average and range
local function compute_final_stats(stats)
    local ret = map()
    ret['AVERAGE'] = stats["sum"] / stats["count"]
    ret['RANGE'] = stats["max"] - stats["min"]
    return ret
end

-- merge partial stream maps into one
local function merge_stats(a, b)
    local ret = map()
    ret["sum"] = add_values(a["sum"], b["sum"])
    ret["count"] = add_values(a["count"], b["count"])
    ret["min"] = get_min(a["min"], b["min"])
    ret["max"] = get_max(a["max"], b["max"])
    return ret
end

-- aggregate operator to compute stream state for average_range
local function aggregate_stats(agg, val)
    agg["count"] = (agg["count"] or 0) + ((val["bin_avg"] and 1) or 0)
    agg["sum"] = (agg["sum"] or 0) + (val["bin_avg"] or 0)
    agg["min"] = get_min(agg["min"], val["bin_range"])
    agg["max"] = get_max(agg["max"], val["bin_range"])
    return agg
end

-- average_range
function average_range(stream, bin_avg, bin_range)
    local function rec_to_bins(rec)
        -- extract the values of the two bins in ret
        local ret = map()
        ret["bin_avg"] = rec[bin_avg]
        ret["bin_range"] = rec[bin_range]
        return ret
    end
    return stream : map(rec_to_bins)
                  : aggregate(map(), aggregate_stats)
                  : reduce(merge_stats)
                  : map(compute_final_stats)
end
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "average_range", Value.get("bin1"), Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed AVERAGE+RANGE.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Executed AVERAGE+RANGE. Returned object: {AVERAGE=500.5, RANGE=999}
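The returned object is a single map. If the individual values are needed rather than the printed string, one way to extract them is sketched below; it assumes (as the output above suggests) that the client surfaces the Lua map as a java.util.Map:
import java.util.Map;
// Re-run the query and read individual fields out of the returned map.
// The key names AVERAGE and RANGE match the Lua UDF above.
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "average_range", Value.get("bin1"), Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
while (rs.next()) {
    Map<?,?> stats = (Map<?,?>) rs.getObject();
    System.out.format("AVERAGE=%s RANGE=%s\n", stats.get("AVERAGE"), stats.get("RANGE"));
}
rs.close();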
Many developers who are familiar with SQL would like to see how SQL operations translate to Aerospike. We looked at how to implement various aggregate statements; this should be broadly useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously.
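As one illustration (a sketch of an application-side pattern, not a dedicated client API), the synchronous queryAggregate call can be offloaded to a background thread with CompletableFuture:
import java.util.concurrent.CompletableFuture;
// Run the synchronous aggregate query off the calling thread.
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "sum", Value.get("bin2"));
CompletableFuture<Object> futureSum = CompletableFuture.supplyAsync(() -> {
    ResultSet rs = client.queryAggregate(null, stmt);
    try {
        return rs.next() ? rs.getObject() : null;
    } finally {
        rs.close();
    }
});
futureSum.thenAccept(sum -> System.out.format("SUM (computed off-thread): %s\n", sum));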
Remove tutorial data and close connection.
client.dropIndex(null, Namespace, Set, IndexName);
client.truncate(null, Namespace, null, null);
client.close();
System.out.println("Removed tutorial data and closed server connection.");
Removed tutorial data and closed server connection.
Here are some links for further exploration.
Resources
Visit the Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, then click File->Open and select Upload.