import io.github.spencerpark.ijava.IJava; import io.github.spencerpark.jupyter.kernel.magic.common.Shell; IJava.getKernelInstance().getMagics().registerMagics(Shell.class); %sh asd %%loadFromPOM com.aerospike aerospike-client 5.0.0 import com.aerospike.client.AerospikeClient; import com.aerospike.client.Bin; import com.aerospike.client.Key; import com.aerospike.client.policy.WritePolicy; import java.util.Random; String[] groups = {"A", "B", "C", "D", "E"}; Random rand = new Random(1); AerospikeClient client = new AerospikeClient("localhost", 3000); System.out.println("Initialized the client and connected to the cluster."); String Namespace = "test"; String Set = "sql-aggregate"; WritePolicy wpolicy = new WritePolicy(); wpolicy.sendKey = true; for (int i = 1; i <= 1000; i++) { Key key = new Key(Namespace, Set, "id-"+i); Bin bin1 = new Bin(new String("bin1"), i); Bin bin2 = new Bin(new String("bin2"), 1000+i); Bin bin3 = new Bin(new String("bin3"), groups[rand.nextInt(groups.length)]); client.put(wpolicy, key, bin1, bin2, bin3); } System.out.format("Test data populated");; import com.aerospike.client.policy.Policy; import com.aerospike.client.query.IndexType; import com.aerospike.client.task.IndexTask; import com.aerospike.client.AerospikeException; import com.aerospike.client.ResultCode; String IndexName = "test_sql_aggregate_bin1_number_idx"; Policy policy = new Policy(); policy.socketTimeout = 0; // Do not timeout on index create. try { IndexTask task = client.createIndex(policy, Namespace, Set, IndexName, "bin1", IndexType.NUMERIC); task.waitTillComplete(); } catch (AerospikeException ae) { if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) { throw ae; } } System.out.format("Created number index %s on ns=%s set=%s bin=%s.", IndexName, Namespace, Set, "bin1");; import com.aerospike.client.policy.Policy; import com.aerospike.client.task.RegisterTask; import com.aerospike.client.Language; import com.aerospike.client.lua.LuaConfig; import com.aerospike.client.lua.LuaCache; LuaConfig.SourceDirectory = "../udf"; String UDFFile = "aggregate_fns.lua"; String UDFModule = "aggregate_fns"; void registerUDF() { // clear the lua cache LuaCache.clearPackages(); Policy policy = new Policy(); // remove the current module, if any client.removeUdf(null, UDFFile); RegisterTask task = client.register(policy, LuaConfig.SourceDirectory+"/"+UDFFile, UDFFile, Language.LUA); task.waitTillComplete(); System.out.format("Registered the UDF module %s.", UDFFile);; } registerUDF(); import com.aerospike.client.query.Statement; import com.aerospike.client.Value; import com.aerospike.client.query.RecordSet; import com.aerospike.client.query.ResultSet; Statement stmt = new Statement(); stmt.setNamespace(Namespace); stmt.setSetName(Set); stmt.setAggregateFunction(UDFModule, "groupby_with_sum", Value.get("bin3"), Value.get("bin2")); ResultSet rs = client.queryAggregate(null, stmt); System.out.println("Executed GROUP BY with SUM."); while (rs.next()) { Object obj = rs.getObject(); System.out.format("Returned object: %s", obj.toString()); } rs.close(); Statement stmt = new Statement(); stmt.setNamespace(Namespace); stmt.setSetName(Set); stmt.setAggregateFunction(UDFModule, "groupby_with_count_having", Value.get("bin3"), Value.get("B"), Value.get("D")); ResultSet rs = client.queryAggregate(null, stmt); System.out.println("Executed GROUP BY with COUNT and HAVING."); while (rs.next()) { Object obj = rs.getObject(); System.out.format("Returned object: %s", obj.toString()); } rs.close(); Statement stmt = new Statement(); stmt.setNamespace(Namespace); stmt.setSetName(Set); stmt.setAggregateFunction(UDFModule, "groupby_with_count_orderby", Value.get("bin3"), Value.get("count")); ResultSet rs = client.queryAggregate(null, stmt); System.out.println("Executed GROUP BY with COUNT and ORDER BY."); while (rs.next()) { Object obj = rs.getObject(); System.out.format("Returned object: %s", obj.toString()); } rs.close(); import com.aerospike.client.query.Filter; Statement stmt = new Statement(); stmt.setNamespace(Namespace); stmt.setSetName(Set); // range filter using the secondary index on bin1 stmt.setFilter(Filter.range("bin1", 101, 200)); stmt.setAggregateFunction(UDFModule, "distinct", Value.get("bin3")); ResultSet rs = client.queryAggregate(null, stmt); System.out.println("Executed DISTINCT."); while (rs.next()) { Object obj = rs.getObject(); System.out.format("Returned object: %s", obj.toString()); } rs.close(); Statement stmt = new Statement(); stmt.setNamespace(Namespace); // range filter using the secondary index on bin1 stmt.setFilter(Filter.range("bin1", 101, 200)); stmt.setSetName(Set); stmt.setAggregateFunction(UDFModule, "limit", Value.get("bin2"), Value.get(10)); ResultSet rs = client.queryAggregate(null, stmt); System.out.println("Executed LIMIT N."); while (rs.next()) { Object obj = rs.getObject(); System.out.format("Returned object: %s", obj.toString()); } rs.close(); Statement stmt = new Statement(); stmt.setNamespace(Namespace); stmt.setSetName(Set); // range filter using the secondary index on bin1 stmt.setFilter(Filter.range("bin1", 101, 200)); stmt.setAggregateFunction(UDFModule, "top_n", Value.get("bin2"), Value.get(5)); ResultSet rs = client.queryAggregate(null, stmt); System.out.println("Executed TOP N."); while (rs.next()) { Object obj = rs.getObject(); System.out.format("Returned object: %s", obj.toString()); } rs.close(); client.dropIndex(null, Namespace, Set, IndexName); client.truncate(null, Namespace, null, null); client.close(); System.out.println("Removed tutorial data and closed server connection.");