// Notebook setup, test-data population and index-policy preparation, collapsed onto one line.
//
// In order, the statements on the line below:
//   1. Register the Shell magics class with the IJava kernel instance.
//   2. Invoke the notebook magics `%sh asd` and `%%loadFromPOM com.aerospike aerospike-client 5.0.0`.
//      NOTE(review): presumably these start the server and load the client dependency, and the
//      POM markup appears to have been stripped during export - confirm against the notebook.
//   3. Create an AerospikeClient for "localhost", port 3000, and print a confirmation.
//   4. Write NumRecords (10) records to namespace "test", set "demo", using user keys 1..NumRecords.
//      Each record gets an integer bin "binint" holding the key value and a string bin "binstr"
//      holding Names[i-1]; Names must therefore have at least NumRecords entries.
//   5. Declare IndexName and a Policy whose socketTimeout is 0; the createIndex call that
//      follows this line passes that policy.
//
// NOTE(review): `policy` is declared twice on this line (first ClientPolicy, then Policy), which
// only works where top-level redeclaration is permitted - presumably these were separate
// notebook cells; confirm before restructuring.
// The only `//` comment on the line is at its very end, so no statement is commented out.
import io.github.spencerpark.ijava.IJava; import io.github.spencerpark.jupyter.kernel.magic.common.Shell; IJava.getKernelInstance().getMagics().registerMagics(Shell.class); %sh asd %%loadFromPOM com.aerospike aerospike-client 5.0.0 import com.aerospike.client.AerospikeClient; import com.aerospike.client.Bin; import com.aerospike.client.Key; AerospikeClient client = new AerospikeClient("localhost", 3000); System.out.println("Initialized the client and connected to the cluster."); import com.aerospike.client.policy.ClientPolicy; String Namespace = "test"; String Set = "demo"; String BinInt = "binint"; String BinStr = "binstr"; int NumRecords = 10; String Names[] = {"1. Clark", "2. Keenan", "3. Smith", "4. Jones", "5. Clark", "6. Jones", "7. Iyer", "8. Smith", "9. Hernandez", "10. Smith"}; ClientPolicy policy = new ClientPolicy(); for (int i = 1; i <= NumRecords; i++) { Key key = new Key(Namespace, Set, i); Bin bin1 = new Bin(BinInt, i); Bin bin2 = new Bin(BinStr, Names[i-1]); client.put(policy.writePolicyDefault, key, bin1, bin2); } System.out.format("Written %d records in ns=%s set=%s with userkeys 1-%d.", NumRecords, Namespace, Set, NumRecords); import com.aerospike.client.policy.Policy; import com.aerospike.client.query.IndexType; import com.aerospike.client.task.IndexTask; import com.aerospike.client.AerospikeException; import com.aerospike.client.ResultCode; String IndexName = "idx_numeric_test_demo_binint"; Policy policy = new Policy(); policy.socketTimeout = 0; // Do not timeout on index create. 
// Create the numeric secondary index on BinInt and wait for the task to complete.
// An INDEX_ALREADY_EXISTS result is tolerated so the step can be re-run; any other
// AerospikeException is rethrown.
try {
    IndexTask task = client.createIndex(policy, Namespace, Set, IndexName, BinInt, IndexType.NUMERIC);
    task.waitTillComplete();
} catch (AerospikeException ae) {
    if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
        throw ae;
    }
}
System.out.format("Created index %s on ns=%s set=%s bin=%s.", IndexName, Namespace, Set, BinInt);

import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileWriter;

/**
 * Writes a Lua UDF module to the local "./udf" directory, creating the directory if needed.
 *
 * Failures are reported on standard output rather than propagated, so the caller
 * always continues.
 *
 * @param name file name of the module inside "./udf" (for example "sum_example.lua")
 * @param code Lua source text to write to the file
 */
void CreateUDFModule(String name, String code) {
    try {
        if (!Files.exists(Paths.get("./udf"))) {
            Files.createDirectory(Paths.get("./udf"));
        }
        // try-with-resources closes the writer even when write() throws.
        try (FileWriter fw = new FileWriter("./udf/" + name)) {
            // Write the text passed in by the caller, not whatever a global variable holds.
            fw.write(code);
        }
    } catch (Exception e) {
        System.out.format("Failed to create Lua module %s, exception: %s.", "udf/"+name, e);
    }
}

// Execute this cell to create UDF module "udf/sum_example.lua"
// To execute, first convert the cell type from markdown to code.

// Lua source of the stream UDF module. sum_single_bin maps every record to the value of
// the named bin and reduces the stream by adding the values together.
String luaCode = String.join("\n",
    "-- sum_example.lua",
    "",
    "local function reducer(val1,val2)",
    "    return val1 + val2",
    "end",
    "",
    "function sum_single_bin(stream,name)",
    "    local function mapper(rec)",
    "        return rec[name]",
    "    end",
    "    return stream : map(mapper) : reduce(reducer)",
    "end");

// Hand the source to CreateUDFModule under the file name sum_example.lua, then report.
CreateUDFModule("sum_example.lua", luaCode);
System.out.format("Lua module %s created.", "udf/sum_example.lua");
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;

// Register the locally written sum_example module with the server and wait for completion.
String UDFDir = "./udf";
String UDFFile = "sum_example.lua";
RegisterTask task = client.register(policy, UDFDir+"/"+UDFFile, UDFFile, Language.LUA);
task.waitTillComplete();
System.out.format("Registered the UDF module %s.", UDFFile);

import com.aerospike.client.query.Statement;
import com.aerospike.client.query.Filter;
import com.aerospike.client.Value;

// Build a query over the set, restricted to BinInt values in [begin, end].
int begin = 4;
int end = 7;
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setBinNames(BinInt, BinStr);
stmt.setFilter(Filter.range(BinInt, begin, end));
System.out.format("Query on ns=%s set=%s, with bin %s >= %d <= %d", Namespace, Set, BinInt, begin, end);

// Attach the aggregation function, passing the bin name as its argument.
String UDFModule = "sum_example";
String UDFFunction = "sum_single_bin";
stmt.setAggregateFunction(UDFModule, UDFFunction, Value.get(BinInt));
System.out.format("Aggregate function %s added for server processing.", UDFFunction);

import com.aerospike.client.query.ResultSet;

ResultSet rs = client.queryAggregate(null, stmt);
System.out.println("Executed the query with UDF; got results.");
System.out.println("Processing results:");
try {
    final int expectedSum = 22; // 4 + 5 + 6 + 7
    int resultCount = 0;
    while (rs.next()) {
        Object value = rs.getObject();
        // Anything that is not a Long is reported and skipped without being counted.
        if (!(value instanceof Long)) {
            System.out.println("Return value not a long: " + value);
            continue;
        }
        long actualSum = (Long) rs.getObject();
        if (expectedSum != (int) actualSum) {
            System.out.format("Sum mismatch: Expected %d. Received %d.", expectedSum, (int) actualSum);
        } else {
            System.out.format("Sum matched! Value=%d.", expectedSum);
        }
        resultCount++;
    }
    if (resultCount == 0) {
        System.out.println("Query failed. No records returned.");
    }
} finally {
    // The result set is closed whether or not processing succeeded.
    rs.close();
}

// Execute this cell to create UDF module "udf/update_example.lua"
// To execute, first convert the cell type from markdown to code.

// Lua source of the record UDF module. multiplyBy multiplies the named bin by the given
// factor and then calls aerospike:update on the record.
String luaCode = String.join("\n",
    "-- update_example.lua",
    "",
    "function multiplyBy(rec, binName, factor)",
    "    rec[binName] = rec[binName] * factor",
    "    aerospike:update(rec)",
    "end");

// Hand the source to CreateUDFModule under the file name update_example.lua, then report.
CreateUDFModule("update_example.lua", luaCode);
System.out.format("Lua module %s created.", "udf/update_example.lua");
// Register the update_example module with the server and wait for completion.
String UDFFile = "update_example.lua";
RegisterTask task = client.register(policy, UDFDir+"/"+UDFFile, UDFFile, Language.LUA);
task.waitTillComplete();
System.out.format("Registered the UDF module %s.", UDFFile);

// Build the statement that selects the records the update UDF is applied to.
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setBinNames(BinInt, BinStr);
int begin = 3;
int end = 9;
// Filter is evaluated using a secondary index and therefore can only reference an indexed bin.
stmt.setFilter(Filter.range(BinInt, begin, end));
System.out.format("Query on ns=%s set=%s, with bin %s >= %d <= %d", Namespace, Set, BinInt, begin, end);

// Predicate Expressions are applied on query results on server side.
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.query.RegexFlag;

// Start from the client's default write policy and add a filter expression that matches
// records whose BinStr value contains "smith" or "jones", ignoring case.
WritePolicy policy = new WritePolicy(client.writePolicyDefault);
policy.filterExp = Exp.build(
    Exp.or(
        Exp.regexCompare(".*smith.*", RegexFlag.ICASE, Exp.stringBin(BinStr)),
        Exp.regexCompare(".*jones.*", RegexFlag.ICASE, Exp.stringBin(BinStr))));
System.out.format("Predicate Expression: (valstr ilike '%%smith%%' || valstr ilike '%%jones%%')");

// Execute the update UDF function on records that match the statement filter and policy filter.
// Records are not returned to the client. This asynchronous server call will return
// before the command is complete. The user can optionally wait for command completion
// by using the returned ExecuteTask instance.
import com.aerospike.client.task.ExecuteTask;
import com.aerospike.client.Value;

// Run multiplyBy from the update_example module on every record selected by stmt and the
// policy's filter expression, passing the bin name and the multiplication factor.
int MultiplicationFactor = 5;
ExecuteTask task = client.execute(policy, stmt, "update_example", "multiplyBy", Value.get(BinInt), Value.get(MultiplicationFactor));
task.waitTillComplete(3000, 0); // poll time 3s, no timeout
System.out.format("Executed the query and filter expression and applied UDF update to records.");

import com.aerospike.client.Record;

// Read back every record by user key and print it so the effect of the update is visible.
for (int i = 1; i <= NumRecords; i++) {
    Key key = new Key(Namespace, Set, i);
    Record record = client.get(null, key, BinInt, BinStr);
    System.out.println(record);
}

// Clean up: drop the secondary index and close the connection to the cluster.
client.dropIndex(null, Namespace, Set, IndexName);
client.close();
System.out.println("Index dropped and server connection closed.");