import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
%%loadFromPOM
// To execute, first convert the cell type from markdown to code. String luaCode = "-- sum_example.lua" + "\n" + "" + "\n" + "local function reducer(val1,val2)" + "\n" + " return val1 + val2" + "\n" + "end" + "\n" + "" + "\n" + "function sum_single_bin(stream,name)" + "\n" + " local function mapper(rec)" + "\n" + " return rec[name]" + "\n" + " end" + "\n" + " return stream : map(mapper) : reduce(reducer)" + "\n" + "end"; CreateUDFModule("sum_example.lua", luaCode); System.out.format("Lua module %s created.", "udf/sum_example.lua"); //import com.aerospike.client.task.RegisterTask; import com.aerospike.client.Language; String UDFDir = "./udf"; String UDFFile = "sum_example.lua"; RegisterTask task = client.register(policy, UDFDir+"/"+UDFFile, UDFFile, Language.LUA); task.waitTillComplete(); System.out.format("Registered the UDF module %s.", UDFFile); import com.aerospike.client.query.Statement; import com.aerospike.client.query.Filter; import com.aerospike.client.Value; int begin = 4; int end = 7; Statement stmt = new Statement(); stmt.setNamespace(Namespace); stmt.setSetName(Set); stmt.setBinNames(BinInt, BinStr); stmt.setFilter(Filter.range(BinInt, begin, end)); System.out.format("Query on ns=%s set=%s, with bin %s >= %d <= %d", Namespace, Set, BinInt, begin, end); String UDFModule = "sum_example"; String UDFFunction = "sum_single_bin"; stmt.setAggregateFunction(UDFModule, UDFFunction, Value.get(BinInt)); System.out.format("Aggregate function %s added for server processing.", UDFFunction); import com.aerospike.client.query.ResultSet; ResultSet rs = client.queryAggregate(null, stmt); System.out.println("Executed the query with UDF; got results."); System.out.println("Processing results:"); try { int expected = 22; // 4 + 5 + 6 + 7 int count = 0; while (rs.next()) { Object object = rs.getObject(); long sum; if (object instanceof Long) { sum = (Long)rs.getObject(); } else { System.out.println("Return value not a long: " + object); continue; } if (expected == (int)sum) { System.out.format("Sum matched! Value=%d.", expected); } else { System.out.format("Sum mismatch: Expected %d. Received %d.", expected, (int)sum); } count++; } if (count == 0) { System.out.println("Query failed. No records returned."); } } finally { rs.close(); } // Execute this cell to create UDF module "udf/update_example.lua"
// To execute, first convert the cell type from markdown to code. String luaCode = "-- update_example.lua" + "\n" + "" + "\n" + "function multiplyBy(rec, binName, factor)" + "\n" + " rec[binName] = rec[binName] * factor" + "\n" + " aerospike:update(rec)" + "\n" + "end"; CreateUDFModule("update_example.lua", luaCode); System.out.format("Lua module %s created.", "udf/update_example.lua"); //String UDFFile = "update_example.lua"; RegisterTask task = client.register(policy, UDFDir+"/"+UDFFile, UDFFile, Language.LUA); task.waitTillComplete(); System.out.format("Registered the UDF module %s.", UDFFile); Statement stmt = new Statement(); stmt.setNamespace(Namespace); stmt.setSetName(Set); stmt.setBinNames(BinInt, BinStr); int begin = 3; int end = 9; // Filter is evaluated using a secondary index and therefore can only reference an indexed bin. stmt.setFilter(Filter.range(BinInt, begin, end)); System.out.format("Query on ns=%s set=%s, with bin %s >= %d <= %d", Namespace, Set, BinInt, begin, end); // Predicate Expressions are applied on query results on server side. import com.aerospike.client.policy.WritePolicy; import com.aerospike.client.exp.Exp; import com.aerospike.client.query.RegexFlag; WritePolicy policy = new WritePolicy(client.writePolicyDefault); policy.filterExp = Exp.build( Exp.or( Exp.regexCompare(".*smith.*", RegexFlag.ICASE, Exp.stringBin(BinStr)), Exp.regexCompare(".*jones.*", RegexFlag.ICASE, Exp.stringBin(BinStr)))); System.out.format("Predicate Expression: (valstr ilike '%%smith%%' || valstr ilike '%%jones%%')"); // Execute the update UDF function on records that match the statement filter and policy filter. // Records are not returned to the client. This asynchronous server call will return // before the command is complete. The user can optionally wait for command completion // by using the returned ExecuteTask instance. import com.aerospike.client.task.ExecuteTask; import com.aerospike.client.Value; int MultiplicationFactor = 5; ExecuteTask task = client.execute(policy, stmt, "update_example", "multiplyBy", Value.get(BinInt), Value.get(MultiplicationFactor)); task.waitTillComplete(3000, 0); // poll time 3s, no timeout System.out.format("Executed the query and filter expression and applied UDF update to records."); import com.aerospike.client.Record; for (int i = 1; i <= NumRecords; i++) { Key key = new Key(Namespace, Set, i); Record record = client.get(null, key, BinInt, BinStr); System.out.println(record); } client.dropIndex(null, Namespace, Set, IndexName); client.close(); System.out.println("Index dropped and server connection closed.");