This tutorial is Part 2 of how to implement SQL aggregate queries in Aerospike.
This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
In this notebook, we will see how specific aggregate statements in SQL can be implemented in Aerospike.
SQL is a widely known data access language. The examples in this notebook provide patterns for implementing specific SQL aggregate queries. You should be able to understand them and find them useful even without deep familiarity with SQL.
This notebook is the third in the SQL Operations series that consists of the following notebooks:
Part 1 of Aggregate functions describes simpler aggregate processing of a stream of records.
The specific topics and aggregate functions we discuss in this notebook include:
Stream Partitioning with GROUP BY
Additional aggregate functions
The purpose of this notebook is to illustrate Aerospike implementation for specific SQL operations. Check out Aerospike Presto Connector for ad-hoc SQL access to Aerospike data.
This tutorial assumes familiarity with the following topics:
All UDF functions for this notebook are placed in "aggregate_fns.lua" file under the "udf" subdirectory. If the subdirectory or file is not there, you may download the file from here and place it there using the notebook's File->Open followed by Upload/New menus.
You are encouraged to experiment with the Lua code in the module. Be sure to save your changes and then run the convenience function "registerUDF()" in a code cell for the changes to take effect.
This notebook requires that Aerospike Database is running.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
Install the Java client.
%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>
The test data has 1000 records with user-key "id-1" through "id-1000", two integer bins (fields) "bin1" (1-1000) and "bin2" (1001-2000), and one string bin "bin3" (random 5 values "A" through "E"), in the namespace "test" and set "sql-aggregate".
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
import java.util.Random;
String[] groups = {"A", "B", "C", "D", "E"};
Random rand = new Random(1);
AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");
String Namespace = "test";
String Set = "sql-aggregate";
WritePolicy wpolicy = new WritePolicy();
wpolicy.sendKey = true;
for (int i = 1; i <= 1000; i++) {
Key key = new Key(Namespace, Set, "id-"+i);
Bin bin1 = new Bin(new String("bin1"), i);
Bin bin2 = new Bin(new String("bin2"), 1000+i);
Bin bin3 = new Bin(new String("bin3"), groups[rand.nextInt(groups.length)]);
client.put(wpolicy, key, bin1, bin2, bin3);
}
System.out.format("Test data populated");;
Initialized the client and connected to the cluster. Test data populated
To use the query API with index based filter, a secondary index must exist on the filter bin. Here we create a numeric index on "bin1" in "sql-aggregate" set.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
String IndexName = "test_sql_aggregate_bin1_number_idx";
Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.
try {
IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,
"bin1", IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}
System.out.format("Created number index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, Set, "bin1");;
Created number index test_sql_aggregate_bin1_number_idx on ns=test set=sql-aggregate bin=bin1.
(This section is repeated for convenience from Part 1. Please skip to the the next section if you are familiar with the execution model.)
Processing aggregates in Aerospike involves processing a stream of records through a pipeline of operators on server as well as client.
Four types of operators are supported: Filter, Map, Aggregate, and Reduce. The operators work with one of the following data types as input and output: Record, Integer, String, Map (the data type, not to be confused with the Map operator), and List. Only the initial filter(s) and first non-filter operator in the pipeline can consume Record type.
The operators may appear any number of times and in any order in the pipeline.
The operator pipeline is typically processed in two phases: first phase on server nodes and the second phase on client.
Thus, the first reduce operation if specified in the pipeline is executed on all server nodes as well as on client. If there is no reduce operator in the pipeline, the application will receive the combined results returned from server nodes.
Post aggregation processing involves operators after the first reduce in the pipeline, usually for sorting, filtering, and final transformation, and takes place on the client side.
Aggregation processing in Aerospike is defined using User Defined Functions (UDFs). UDFs are written in Lua with arbitrary logic and are executed on both server and client as explained above. Since aggregates by definition involve multiple records, only stream UDFs are discussed below (versus record UDFs whose scope of execution is a single record).
A stream UDF specifies the pipeline of operators for processing aggregates. Different aggregates differ in their UDF functions, whereas the Aerospike APIs are the same to specify the aggregate processing.
The UDFs and logic are described in appropriate sections for each aggregate function below. For additional context and details, please refer to the documentation.
Note, all UDF functions for this notebook are assumed to be in "aggregate_fns.lua" file under "udf" directory. Please refer to "Working with UDF Module" section above.
Register the UDF with the server by executing the following code cell.
The registerUDF() function below can be run conveniently when the UDF is modified (you are encouraged to experiment with the UDF code). The function invalidates the cache, removes the currently registered module, and registers the latest version.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;
import com.aerospike.client.lua.LuaConfig;
import com.aerospike.client.lua.LuaCache;
LuaConfig.SourceDirectory = "../udf";
String UDFFile = "aggregate_fns.lua";
String UDFModule = "aggregate_fns";
void registerUDF() {
// clear the lua cache
LuaCache.clearPackages();
Policy policy = new Policy();
// remove the current module, if any
client.removeUdf(null, UDFFile);
RegisterTask task = client.register(policy, LuaConfig.SourceDirectory+"/"+UDFFile,
UDFFile, Language.LUA);
task.waitTillComplete();
System.out.format("Registered the UDF module %s.", UDFFile);;
}
registerUDF();
Registered the UDF module aggregate_fns.lua.
SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1
GROUP BY processing partitions the record stream into multiple partitions, one for each distinct value of the grouped-by bin. The aggregate operator in the pipeline outputs a nested map - an outer map of all partitions or distinct values of the grouped-by bin, and an inner map for each partition to maintain each partition's aggregates. The bins aggregated for each group, such as agg(bin2) in the above SQL statement, are stored within a group's map.
Reduce uses map-merge to merge partial aggregates. Since map-merge currently does not handle nested maps, merging at multiple levels have to be explicitly specified as shown.
The filter "inner-condition" can be specified on any bins in the record, and can be processed using a query predicate filter and/or a stream filter operator. This is as described in Part 1, and so the example below will omit the WHERE clause for simplicity.
SELECT bin1, SUM(bin2) FROM test.sql-aggregate GROUP BY bin1
We will implement a new UDF "groupby_with_sum" for this.
GROUPBY_WITH_SUM
It takes two bins: the bin to group-by and the bin to sum. The pipeline consists of map, aggregate, and reduce operators.
-- nested map merge for group-by sum/count; explicit map merge at each nested level local function merge_group_sum(a, b) local function merge_group(x, y) -- inner map merge return map.merge(x, y, add_values) end -- outer map merge return map.merge(a, b, merge_group) end -- aggregate for group-by sum -- creates a map for each distinct group value and adds the value tagged for a group to the group's sum local function group_sum(agg, groupval) if not agg[groupval["group"]] then agg[groupval["group"]] = map() end agg[groupval["group"]]["sum"] = (agg[groupval["group"]]["sum"] or 0) + (groupval["value"] or 0) return agg end -- group-by with sum function groupby_with_sum(stream, bin_grpby, bin_sum) local function rec_to_group_and_bin(rec) -- tag the group by bin_grpby value, return a map containing group and bin_sum value local ret = map() ret["group"] = rec[bin_grpby] local val = rec[bin_sum] if (not val or type(val) ~= "number") then val = 0 end ret["value"] = val return ret end return stream : map(rec_to_group_and_bin) : aggregate(map(), group_sum) : reduce(merge_group_sum) end
import com.aerospike.client.query.Statement;
import com.aerospike.client.Value;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.ResultSet;
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "groupby_with_sum", Value.get("bin3"), Value.get("bin2"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed GROUP BY with SUM.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Executed GROUP BY with SUM. Returned object: {A={sum=276830}, B={sum=296246}, C={sum=260563}, D={sum=332231}, E={sum=334630}}
SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition
Note the inner filter "inner-condition" can be specified using any bins in the record, whereas the outer filter and ORDER BY must use selected (aggregated) bins from the query. We will focus on the outer condition in the following example which outputs the count of distinct bin3 values in the range "B" and "E".
SELECT bin3, COUNT(*) FROM test.sql-aggreate GROUP BY bin3 HAVING "B" <= bin3 AND bin3 <= "E"
Processing for Having clause can be done by using a filter operator after reduce.
Here we implement a new UDF "groupby_with_count_having" for this.
GROUPBY_WITH_COUNT_HAVING
It takes the group-by bin and the range values for the groups. The pipeline consists of map, aggregate, reduce, and filter operators.
-- aggregate for group-by count -- creates a map for each distinct group value and increments the tagged group's count local function group_count(agg, group) if not agg[group] then agg[group] = map() end agg[group]["count"] = (agg[group]["count"] or 0) + ((group and 1) or 0) return agg end -- map function for group-by processing local function rec_to_group_closure(bin_grpby) local function rec_to_group(rec) -- returns group-by bin value in a record return rec[bin_grpby] end return rec_to_group end -- group-by having example: count(*) having low <= count <= high function groupby_with_count_having(stream, bin_grpby, having_range_low, having_range_high) local function process_having(stats) -- filters groups with count in the range local ret = map() for key, value in map.pairs(stats) do if (key >= having_range_low and key <= having_range_high) then ret[key] = value end end return ret end return stream : map(rec_to_group_closure(bin_grpby)) : aggregate(map(), group_count) : reduce(merge_group_sum) : map(process_having) end
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "groupby_with_count_having", Value.get("bin3"), Value.get("B"), Value.get("D"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed GROUP BY with COUNT and HAVING.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Executed GROUP BY with COUNT and HAVING. Returned object: {D={count=222}, B={count=196}, C={count=172}}
SELECT bin1, agg(bin2) FROM namespace.set WHERE inner-condition GROUP BY bin1 HAVING outer-condition ORDER BY bin
In the following example, the count of distinct bin3 values is produced in descending order.
SELECT bin3, COUNT(*) FROM test.sql-aggregate GROUP BY bin3 ORDER BY COUNT
Processing for Order By clause can be done by using a map operator at the end that outputs an ordered list.
The UDF "groupby_with_count_orderby" is very similar to the HAVING example.
GROUPBY_WITH_COUNT_ORDERBY
It takes two bins to group-by order-by. The pipeline consists of map, aggregate, reduce, and map operators.
-- group-by count(*) order-by count function groupby_with_count_orderby(stream, bin_grpby, bin_orderby) local function orderby(t, order) -- collect the keys local keys = {} for k in pairs(t) do keys[#keys+1] = k end -- sort by the order by passing the table and keys a, b, table.sort(keys, function(a,b) return order(t, a, b) end) -- return the iterator function local i = 0 return function() i = i + 1 if keys[i] then return keys[i], t[keys[i] ] end end end local function process_orderby(stats) -- uses lua table sort to sort aggregate map into a list -- list has k and v separately added for sorted entries local ret = list() local t = {} for k,v in map.pairs(stats) do t[k] = v end for k,v in orderby(t, function(t, a, b) return t[a][bin_orderby] < t[b][bin_orderby] end) do list.append(ret, k) list.append(ret, v) end return ret end return stream : map(rec_to_group) : aggregate(map(), group_count) : reduce(merge_group_count) : map(process_orderby) end
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "groupby_with_count_orderby", Value.get("bin3"), Value.get("count"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed GROUP BY with COUNT and ORDER BY.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Executed GROUP BY with COUNT and ORDER BY. Returned object: [C, {count=172}, A, {count=187}, B, {count=196}, D, {count=222}, E, {count=223}]
Let us see how DISTINCT, LIMIT, and TOP N can be processed. Only the first two appear in SQL syntax, and the third is a special case of a LIMIT query.
SELECT DISTINCT(bin) FROM namespace.set WHERE condition
DISTINCT can be processed by storing all values in a map (in the aggregate state) that is keyed on the value(s) of the bin(s) so only unique values are retained.
In the following example, distinct bin3 values are produced for records whose bin1 is in the range [101,200].
SELECT DISTINCT bin3 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200
The UDF "distinct" implements a single bin distinct.
DISTINCT
It takes the bin and returns its distinct values. The pipeline consists of map, aggregate, reduce, and map operators.
-- return map keys in a list local function map_to_list(values) local ret = list() for k in map.keys(values) do list.append(ret, k) end return ret end -- merge partial aggregate maps local function merge_values(a, b) return map.merge(a, b, function(v1, v2) return ((v1 or v2) and 1) or nil end) end -- map for distinct; using map unique keys local function distinct_values(agg, value) if value then agg[value] = 1 end return agg end -- distinct function distinct(stream, bin) local function rec_to_bin_value(rec) -- simply return bin value in rec return rec[bin] end return stream : map(rec_to_bin_value) : aggregate(map(), distinct_values) : reduce(merge_values) : map(map_to_list) end
import com.aerospike.client.query.Filter;
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
// range filter using the secondary index on bin1
stmt.setFilter(Filter.range("bin1", 101, 200));
stmt.setAggregateFunction(UDFModule, "distinct", Value.get("bin3"));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed DISTINCT.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Executed DISTINCT. Returned object: [A, C, B, E, D]
SELECT bin FROM namespace.set WHERE condition LIMIT N
In the following example, up to 10 values in bin2 are produced for records whose bin1 is in the range [101,200].
SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200 LIMIT 10
The UDF "limit" returns a single bin with an upper limit on number of results returned.
LIMIT
It takes the bin and max limit. and returns up to max number of bin values. The pipeline consists of aggregate and reduce.
function limit(stream, bin, max) local function list_limit(agg, rec) -- add to list if the list size is below the limit if list.size(agg) < max then local ret = map() ret[bin] = rec[bin] list.append(agg, ret) end return agg end local function list_merge_limit(a, b) local ret = list() list.concat(ret, list.take(a, max)) list.concat(ret, list.take(b, (max > list.size(ret) and max-list.size(ret)) or 0)) return ret end return stream : aggregate(list(), list_limit) : reduce(list_merge_limit) end
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
// range filter using the secondary index on bin1
stmt.setFilter(Filter.range("bin1", 101, 200));
stmt.setSetName(Set);
stmt.setAggregateFunction(UDFModule, "limit", Value.get("bin2"), Value.get(10));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed LIMIT N.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Executed LIMIT N. Returned object: [{bin2=1128}, {bin2=1160}, {bin2=1192}, {bin2=1129}, {bin2=1161}, {bin2=1193}, {bin2=1130}, {bin2=1162}, {bin2=1194}, {bin2=1131}]
SELECT bin FROM namespace.set WHERE condition ORDER BY bin DESC LIMIT N
TOP N can be processed by retaining top N values in a list in aggregate as well as reduce operators.
In the following example, top 10 values in bin2 are produced for records whose bin1 is in the range [101,200].
SELECT bin2 FROM test.sql-aggregate WHERE bin1 >= 101 bin1 <= 200 ORDER BY bin2 DESC LIMIT 10
The UDF "top_n" returns the top N values from a bin.
TOP_N
It takes the bin and N. and returns top N bin values. The pipeline consists of map, aggregate, reduce, and map.
-- top n function top_n(stream, bin, n) local function get_top_n(values) -- return top n values in a map as an ordered list -- uses lua table sort local t = {} local i = 1 for k in map.keys(values) do t[i] = k i = i + 1 end table.sort(t, function(a,b) return a > b end) local ret = list() local i = 0 for k, v in pairs(t) do list.append(ret, v) i = i + 1 if i == n then break end end return ret end local function top_n_values(agg, value) if value then agg[value] = 1 end -- if map size exceeds n*10, trim to top n if map.size(agg) > n*10 then local new_agg = map() local trimmed = trim_to_top_n(agg) for value in list.iterator(trimmed) do new_agg[value] = 1 end agg = new_agg end return agg end return stream : map(rec_to_bin_value_closure(bin)) : aggregate(map(), top_n_values) : reduce(merge_values) : map(get_top_n) end
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
// range filter using the secondary index on bin1
stmt.setFilter(Filter.range("bin1", 101, 200));
stmt.setAggregateFunction(UDFModule, "top_n", Value.get("bin2"), Value.get(5));
ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed TOP N.");
while (rs.next()) {
Object obj = rs.getObject();
System.out.format("Returned object: %s", obj.toString());
}
rs.close();
Executed TOP N. Returned object: [1200, 1199, 1198, 1197, 1196]
Many developers that are familiar with SQL would like to see how SQL operations translate to Aerospike. We looked at how to implement various aggregate statements. This should be generally useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously.
Remove tutorial data and close connection.
client.dropIndex(null, Namespace, Set, IndexName);
client.truncate(null, Namespace, null, null);
client.close();
System.out.println("Removed tutorial data and closed server connection.");
Removed tutorial data and closed server connection.
Here are some links for further exploration
Resources
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open, and select Upload.