// Register the shell magic class with the IJava kernel so "%sh" lines are available.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);

// Run the "asd" shell command (presumably starts the local Aerospike server daemon -- confirm for your environment).
%sh asd

// Add the Aerospike Java client (version 6.0.0) to the notebook classpath.
// A line magic with explicit coordinates is used because the cell magic
// "%%loadFromPOM" needs a POM XML body, which is not present here.
%maven com.aerospike:aerospike-client:6.0.0

import com.aerospike.client.AerospikeClient;

// Connect to the server on localhost:3000; "client" is shared by all later cells.
AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;

// Namespaces, sets and user-key prefix used by every example below.
final String Namespace1 = "test";
final String Namespace2 = "test2";
final String Set1 = "batch-ops";
final String Set2 = "batch-ops2";
final String KeyPrefix = "id-";

/**
 * Convenience function to truncate the test data in both namespaces.
 *
 * This is a best-effort cleanup: an AerospikeException raised by either
 * truncate call is deliberately ignored (a failure of the first call also
 * skips the second).
 */
void truncateTestData() {
    try {
        client.truncate(null, Namespace1, null, null);
        client.truncate(null, Namespace2, null, null);
    }
    catch (AerospikeException ignored) {
        // intentionally ignored: cleanup is best-effort before (re)populating data
    }
}

/**
 * Convenience function to (re)initialize the test data.
 *
 * After truncating, writes records "id-1".."id-3" into every combination of
 * the two namespaces and two sets. For record i:
 *   bin1 = i, bin2 = 10*i, bin3 = map {1:10, 2:20, ..., i:10*i}.
 */
void initializeTestData() {
    truncateTestData();
    WritePolicy wpolicy = new WritePolicy();
    wpolicy.sendKey = true;   // printRecords() prints key.userKey -- presumably needs the key stored; confirm
    for (int i = 1; i <= 3; i++) {
        for (String ns : Arrays.asList(Namespace1, Namespace2)) {
            for (String set : Arrays.asList(Set1, Set2)) {
                Key key = new Key(ns, set, KeyPrefix + i);
                Bin bin1 = new Bin("bin1", i);
                Bin bin2 = new Bin("bin2", 10 * i);
                HashMap<Integer, Integer> map = new HashMap<>();
                for (int j = 1; j <= i; j++) {
                    map.put(j, j * 10);
                }
                Bin bin3 = new Bin("bin3", map);
                client.put(wpolicy, key, bin1, bin2, bin3);
            }
        }
    }
}

// convenience function to print all records in a namespace and set
// (please note this is not an efficient implementation to scan all records across sets/namespaces.
// refer to set-index and scan documentation for additional pointers on this topic.)
import com.aerospike.client.Record;
import com.aerospike.client.ScanCallback;
import com.aerospike.client.policy.ScanPolicy;

/** Scan callback that prints the user key and bins of each record it receives. */
public class ScanParallel implements ScanCallback {
    @Override
    public void scanCallback(Key key, Record record) {
        System.out.printf("\tKey %s: %s\n", key.userKey, record.bins);
    }
}

/**
 * Prints every record in each of the four namespace/set combinations,
 * scanning one combination at a time.
 */
void printRecords() {
    System.out.println("Records in database:");
    String[] namespaces = {Namespace1, Namespace2};
    String[] setNames = {Set1, Set2};
    for (String namespace : namespaces) {
        for (String setName : setNames) {
            System.out.printf("Namespace: %s, set: %s: \n", namespace, setName);
            client.scanAll(null, namespace, setName, new ScanParallel());
        }
    }
}

// Populate the data set and show it.
initializeTestData();
System.out.print("Test data populated.\n");
printRecords();

import com.aerospike.client.policy.Policy;
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;
import com.aerospike.client.lua.LuaConfig;
import com.aerospike.client.lua.LuaCache;

// Location and names of the Lua UDF module used by the UDF examples.
LuaConfig.SourceDirectory = "../udf";
String UDFFile = "update_example.lua";
String UDFModule = "update_example";

/**
 * Registers the Lua UDF module with the server: clears the client Lua cache,
 * removes the module currently registered under the same name (if any),
 * registers the file and waits for the registration task to complete.
 */
void registerUDF() {
    LuaCache.clearPackages();
    client.removeUdf(null, UDFFile);

    String clientPath = LuaConfig.SourceDirectory + "/" + UDFFile;
    RegisterTask registration = client.register(new Policy(), clientPath, UDFFile, Language.LUA);
    registration.waitTillComplete();

    System.out.printf("Registered the UDF module %s.", UDFFile);
}

registerUDF();

import com.aerospike.client.BatchRecord;
import com.aerospike.client.BatchResults;
import com.aerospike.client.ResultCode;
import com.aerospike.client.BatchWrite;
import com.aerospike.client.BatchDelete;
import com.aerospike.client.BatchUDF;
import com.aerospike.client.BatchRead;
import com.aerospike.client.policy.BatchPolicy;
import com.aerospike.client.policy.BatchDeletePolicy;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.cdt.MapPolicy;
import com.aerospike.client.cdt.MapReturnType;
import com.aerospike.client.cdt.ListReturnType;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.exp.ListExp;
import com.aerospike.client.exp.MapExp;
import com.aerospike.client.exp.ExpOperation;
import com.aerospike.client.exp.ExpReadFlags;
import com.aerospike.client.exp.ExpWriteFlags;
import com.aerospike.client.exp.Expression;

// Begin the batch-operate example from freshly initialized test data.
initializeTestData();

// Batch of 8 keys, 2 in each of these namespace/set combinations:
// (test, batch-ops), (test, batch-ops2), (test2, batch-ops), (test2, batch-ops2)
int NUM_KEYS = 8;
Key[] keys = new Key[NUM_KEYS];
for (int id = 1; id <= NUM_KEYS / 4; id++) {
    String userKey = KeyPrefix + id;
    int offset = id - 1;   // position of this id within each group of keys
    keys[offset]                    = new Key(Namespace1, Set1, userKey);
    keys[offset + NUM_KEYS / 4]     = new Key(Namespace1, Set2, userKey);
    keys[offset + 2 * NUM_KEYS / 4] = new Key(Namespace2, Set1, userKey);
    keys[offset + 3 * NUM_KEYS / 4] = new Key(Namespace2, Set2, userKey);
}

// Perform the following operations on the keys.
// 1) Read: get bin1 // 2) Write: increment bin2 by 1 // 3) Read: get bin2 // 4) Write: add a map element (0, 0) to bin3 // 5) Read: get the largest value in the map bin3 // send the multi-key operate batch request BatchResults bresults = client.operate(null, null, keys, Operation.get("bin1"), // Op 1, single bin1 op Operation.add(new Bin("bin2", Value.get(1))), // Op 2, first bin2 op Operation.get("bin2"), // Op 3, second bin2 op MapOperation.put(MapPolicy.Default, "bin3", Value.get(0), Value.get(0)), // Op 4, first bin3 op MapOperation.getByRank("bin3", -1, MapReturnType.VALUE) // Op 5, second bin3 op ); // check if all operations succeeded if (bresults.status) { System.out.println("All batch operations succeeded."); } else { System.out.println("Some batch operations failed."); } // process the BatchResults returned from the batch operation for (int i = 0; i < bresults.records.length; i++) { BatchRecord br = bresults.records[i]; Record rec = br.record; if (br.resultCode == ResultCode.OK) { // check individual key status long bin1Val = rec.getLong("bin1"); // bin1 has one operation, op result directly accessible List bin2Results = rec.getList("bin2"); // bin2 and bin3 have multiple ops; access results through a list List bin3Results = rec.getList("bin3"); // note the result order within each list matches ops order for the bin System.out.format("Result[%d]: key: %s/%s/%s, bin1: %d, bin2: %d, bin3 size: %d, bin3 max val: %d\n", i, br.key.namespace, br.key.setName, br.key.userKey, bin1Val, (long)bin2Results.get(1), (long)bin3Results.get(0), (long)bin3Results.get(1)); } else { // error in individual key's operations System.out.format("Result[%d]: key: %s, error: %s\n", i, br.key, ResultCode.getResultString(br.resultCode)); } } printRecords(); // start with a clean initialized test data initializeTestData(); // create a batch of 8 keys, 2 in each of these namespace/set combinations: // (test, batch-ops), (test, batch-ops2), (test2, batch-ops), (test2, batch-ops2) int 
NUM_KEYS = 9; // one extra slot for a non-existent key Key[] keys = new Key[NUM_KEYS]; // add a non-existent key 0 to test the error path keys[0] = new Key(Namespace1, Set1, KeyPrefix + 0); // populate valid keys for (int i = 0; i < NUM_KEYS/4; i++) { keys[i+1] = new Key(Namespace1, Set1, KeyPrefix + (i+1)); keys[NUM_KEYS/4+i+1] = new Key(Namespace1, Set2, KeyPrefix + (i+1)); keys[2*NUM_KEYS/4+i+1] = new Key(Namespace2, Set1, KeyPrefix + (i+1)); keys[3*NUM_KEYS/4+i+1] = new Key(Namespace2, Set2, KeyPrefix + (i+1)); } // perform the UDF function "increment_and_get" on the keys. // the function takes the bin name and increment value as parameters. String UDFModule = "update_example"; String UDFFunction = "increment_and_get"; // send the multi-key execute batch request BatchPolicy bPolicy = new BatchPolicy(client.batchPolicyDefault); bPolicy.respondAllKeys = true; // set to true/false and observe effect BatchResults bresults = client.execute(bPolicy, null, keys, UDFModule, UDFFunction, Value.get("bin2"), Value.get(1)); // increment bin2 by 1 // check if all operations succeeded if (bresults.status) { System.out.println("All batch operations succeeded."); } else { System.out.println("Some batch operations failed."); } // process the BatchResults returned from the batch operation for (int i = 0; i < bresults.records.length; i++) { BatchRecord br = bresults.records[i]; Record rec = br.record; if (br.resultCode == ResultCode.OK) { // check individual key status HashMap udfMap = (HashMap)rec.getUDFResult(); // cast udf result to map returned by udf long bin2Val = (long)udfMap.get("bin2"); // extract bin2 value from map // cast to map System.out.format("Result[%d]: key: %s/%s/%s, bin2: %d\n", i, br.key.namespace, br.key.setName, br.key.userKey, bin2Val); } else { // error in individual key's operations System.out.format("Result[%d]: key: %s, error: %s\n", i, br.key, ResultCode.getResultString(br.resultCode)); } } printRecords() // start with a clean initialized test data 
initializeTestData(); // create a batch of 8 keys, 2 in each of these namespace/set combinations: // (test, batch-ops), (test, batch-ops2), (test2, batch-ops), (test2, batch-ops2) int NUM_KEYS = 9; // one extra slot for a non-existent key Key[] keys = new Key[NUM_KEYS]; // add a non-existent key 0 to test the error path keys[0] = new Key(Namespace1, Set1, KeyPrefix + 0); // add valid keys for (int i = 0; i < NUM_KEYS/4; i++) { keys[i+1] = new Key(Namespace1, Set1, KeyPrefix + (i+1)); keys[NUM_KEYS/4+i+1] = new Key(Namespace1, Set2, KeyPrefix + (i+1)); keys[2*NUM_KEYS/4+i+1] = new Key(Namespace2, Set1, KeyPrefix + (i+1)); keys[3*NUM_KEYS/4+i+1] = new Key(Namespace2, Set2, KeyPrefix + (i+1)); } // send the multi-key delete batch request BatchResults bresults = client.delete(null, null, keys); // check if all operations succeeded if (bresults.status) { System.out.println("All batch operations succeeded."); } else { System.out.println("Some batch operations failed."); } // process the BatchResults returned from the batch operation for (int i = 0; i < bresults.records.length; i++) { BatchRecord br = bresults.records[i]; Record rec = br.record; if (br.resultCode == ResultCode.OK) { // check individual key status System.out.format("Result[%d]: key: %s/%s/%s deleted.\n", i, br.key.namespace, br.key.setName, br.key.userKey); } else { // error in individual key's operations System.out.format("Result[%d]: key: %s, error: %s\n", i, br.key, ResultCode.getResultString(br.resultCode)); } } printRecords() // start with a clean initialized test data initializeTestData(); // batch records array - each batch record holds a key and operations array // a batch record can be BatchRead, BatchWrite, BatchDelete, and BatchUDF, each //. with specific restrictions on allowed operations. List batchRecords = new ArrayList(); // 1. Read only operations with BatchRead. 
// BatchRead on test/batch-ops/id-1: fetch bin1 and the bin3 map value stored under key 1.
Operation[] ops1 = Operation.array(
        Operation.get("bin1"),
        MapOperation.getByKey("bin3", Value.get(1), MapReturnType.VALUE));
Key readKey = new Key(Namespace1, Set1, KeyPrefix + 1);
batchRecords.add(new BatchRead(readKey, ops1));

// 2. Read-Write operations with BatchWrite.
Operation[] ops2 = Operation.array(
        Operation.add(new Bin("bin2", Value.get(1))),
        Operation.get("bin2"),
        MapOperation.put(MapPolicy.Default, "bin3", Value.get(0), Value.get(0)),
        Operation.get("bin3"));
Key writeKey = new Key(Namespace1, Set1, KeyPrefix + 2);
batchRecords.add(new BatchWrite(writeKey, ops2));

// 3. Delete with BatchDelete.
Key deleteKey = new Key(Namespace1, Set1, KeyPrefix + 3);
batchRecords.add(new BatchDelete(deleteKey));

// 4. Read-Write-Delete with BatchWrite.
Operation[] ops4 = Operation.array(
        Operation.add(new Bin("bin2", Value.get(1))),
        Operation.get("bin2"),
        MapOperation.put(MapPolicy.Default, "bin3", Value.get(0), Value.get(0)),
        Operation.get("bin3"),
        Operation.delete());
Key writeDeleteKey = new Key(Namespace2, Set1, KeyPrefix + 1);
batchRecords.add(new BatchWrite(writeDeleteKey, ops4));

// 5. UDF execution with BatchUDF.
Value[] udfArgs = new Value[] {Value.get("bin2"), Value.get(1)};
Key udfKey = new Key(Namespace2, Set1, KeyPrefix + 2);
batchRecords.add(new BatchUDF(udfKey, UDFModule, UDFFunction, udfArgs));

// 6. Non-existent key operation.
Key missingKey = new Key(Namespace1, Set1, KeyPrefix + 0);   // key 0 does not exist
batchRecords.add(new BatchRead(missingKey, ops1));

// execute the batch
BatchPolicy bPolicy = new BatchPolicy(client.batchPolicyDefault);
bPolicy.respondAllKeys = false;   // note key-not-found does not stop batch execution
try {
    boolean allSucceeded = client.operate(bPolicy, batchRecords);
    System.out.println(allSucceeded
            ? "All batch operations succeeded."
            : "Some batch operations failed.");
}
catch (AerospikeException batchError) {
    System.out.printf("%s", batchError);
}

// get and show results
// 1. Read-Only operations with BatchRead.
// The variables i, batchRec, rec and key are reused for every result below.
// The explicit cast lets the element be read back as a BatchRecord even when
// batchRecords is declared without an element type.
int i = 0;
BatchRecord batchRec = (BatchRecord) batchRecords.get(i);
Record rec = batchRec.record;
Key key = batchRec.key;
if (batchRec.resultCode == ResultCode.OK) {
    Object v1 = rec.getValue("bin1");
    Object v2 = rec.getValue("bin3");
    System.out.format("Result[%d]: key %s/%s/%s, bin1: %s, bin3[1]: %s\n",
            i, key.namespace, key.setName, key.userKey, v1, v2);
}
else {
    System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}

// 2. Read-Write operations with BatchWrite.
i = 1;
batchRec = (BatchRecord) batchRecords.get(i);
rec = batchRec.record;
key = batchRec.key;
if (batchRec.resultCode == ResultCode.OK) {
    Object v1 = rec.getValue("bin2");
    Object v2 = rec.getValue("bin3");
    System.out.format("Result[%d]: key %s/%s/%s, bin2 results: %s, bin3 results: %s\n",
            i, key.namespace, key.setName, key.userKey, v1, v2);
}
else {
    System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}

// 3. Delete with BatchDelete.
i = 2;
batchRec = (BatchRecord) batchRecords.get(i);
rec = batchRec.record;
key = batchRec.key;
if (batchRec.resultCode == ResultCode.OK) {
    System.out.format("Result[%d]: key %s/%s/%s, deleted.\n",
            i, key.namespace, key.setName, key.userKey);
}
else {
    System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}

// 4. Read-Write-Delete with BatchWrite.
i = 3;
batchRec = (BatchRecord) batchRecords.get(i);
rec = batchRec.record;
key = batchRec.key;
if (batchRec.resultCode == ResultCode.OK) {
    Object v1 = rec.getValue("bin2");
    Object v2 = rec.getValue("bin3");
    System.out.format("Result[%d]: key %s/%s/%s (deleted), bin2 results: %s, bin3 results: %s\n",
            i, key.namespace, key.setName, key.userKey, v1, v2);
}
else {
    System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}

// 5. UDF execution with BatchUDF.
i = 4; batchRec = batchRecords.get(i); rec = batchRec.record; key = batchRec.key; if (batchRec.resultCode == ResultCode.OK) { HashMap udfMap = (HashMap)rec.getUDFResult(); // cast udf result to map returned by udf long bin2Val = (long)udfMap.get("bin2"); // extract bin2 value from map // cast to map System.out.format("Result[%d]: key %s/%s/%s, bin2: %s\n", i, key.namespace, key.setName, key.userKey, bin2Val); } else { System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode)); } // 6. Non-existent key operation. i = 5; batchRec = batchRecords.get(i); rec = batchRec.record; key = batchRec.key; if (batchRec.resultCode == ResultCode.OK) { Object v1 = rec.getValue("bin1"); Object v2 = rec.getValue("bin3"); System.out.format("Result[%d]: key %s/%s/%s, bin1: %s, bin3[1]: %s\n", i, key.namespace, key.setName, key.userKey, v1, v2); } else { System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode)); } printRecords(); // start with a clean initialized test data initializeTestData(); // create a batch of 3 keys in (test, batch-ops) int NUM_KEYS = 3; Key[] keys = new Key[NUM_KEYS]; // add keys for (int i = 0; i < NUM_KEYS; i++) { keys[i] = new Key(Namespace1, Set1, KeyPrefix + (i+1)); } // create write and read expressions // new list = list of values from bin3 map - bin2 value Expression writeExp = Exp.build( ListExp.removeByValue(Exp.intBin("bin2"), MapExp.getByIndexRange(MapReturnType.VALUE, Exp.val(0), Exp.val(100), Exp.mapBin("bin3")))); // min(bin4) - bin1 Expression readExp = Exp.build( Exp.sub( ListExp.getByRank(ListReturnType.VALUE, Exp.Type.INT, Exp.val(0), Exp.listBin("bin4")), Exp.intBin("bin1"))); // send the multi-key operate batch request with write and read expressions BatchResults bresults = client.operate(null, null, keys, ExpOperation.write("bin4", writeExp, ExpWriteFlags.DEFAULT), ExpOperation.read("read-exp", readExp, ExpReadFlags.DEFAULT)); // check if all operations succeeded if 
(bresults.status) { System.out.println("All batch operations succeeded."); } else { System.out.println("Some batch operations failed."); } // process the BatchResults returned from the batch operation for (int i = 0; i < bresults.records.length; i++) { BatchRecord br = bresults.records[i]; Record rec = br.record; if (br.resultCode == ResultCode.OK) { // check individual key status Object wResult = rec.getValue("bin4"); // get op result for bin4 Object rResult = rec.getValue("read-exp"); // get op result for read-exp System.out.format("Result[%d]: key: %s/%s/%s, write-exp result: %s, read-exp: %s\n", i, br.key.namespace, br.key.setName, br.key.userKey, wResult, rResult); } else { // error in individual key's operations System.out.format("Result[%d]: key: %s, error: %s\n", i, br.key, ResultCode.getResultString(br.resultCode)); } } printRecords(); // start with a clean initialized test data initializeTestData(); // expression filter 5 <= bin2 <= 25 BatchPolicy bPolicy = new BatchPolicy(client.batchPolicyDefault); bPolicy.filterExp = Exp.build( // set the filter in batch policy Exp.and( Exp.ge(Exp.intBin("bin2"), Exp.val(5)), Exp.le(Exp.intBin("bin2"), Exp.val(25)))); // expression filter 15 <= bin2 <= 35 BatchDeletePolicy bdPolicy = new BatchDeletePolicy(client.batchDeletePolicyDefault); bdPolicy.filterExp = Exp.build( // is ignored Exp.and( Exp.ge(Exp.intBin("bin2"), Exp.val(15)), Exp.le(Exp.intBin("bin2"), Exp.val(35)))); // create a batch of 3 keys in (test, batch-ops) int NUM_KEYS = 3; Key[] keys = new Key[NUM_KEYS]; // add keys for (int i = 0; i < NUM_KEYS; i++) { keys[i] = new Key(Namespace1, Set1, KeyPrefix + (i+1)); } // send the multi-key delete batch request BatchResults bresults = client.delete(bPolicy, bdPolicy, keys); // check if all operations succeeded if (bresults.status) { System.out.println("All batch operations succeeded."); } else { System.out.println("Some batch operations failed."); } // process the BatchResults returned from the batch operation 
for (int i = 0; i < bresults.records.length; i++) {
    BatchRecord br = bresults.records[i];
    if (br.resultCode == ResultCode.OK) {   // check individual key status
        System.out.format("Result[%d]: key: %s/%s/%s deleted.\n",
                i, br.key.namespace, br.key.setName, br.key.userKey);
    }
    else {   // error in individual key's operations
        System.out.format("Result[%d]: key: %s, error: %s\n", i, br.key, ResultCode.getResultString(br.resultCode));
    }
}

// start with a clean initialized test data
initializeTestData();

// batch records list - each batch record holds a key and operations array.
// All four entries below target the same record (test, batch-ops, id-1).
// The element type is declared so results can be read back as BatchRecord.
List<BatchRecord> batchRecords = new ArrayList<>();

// 1. write+read
Operation[] ops1 = Operation.array(
        Operation.add(new Bin("bin2", Value.get(1))),
        Operation.get("bin2"));
batchRecords.add(new BatchWrite(new Key(Namespace1, Set1, KeyPrefix + 1), ops1));

// 2. read
Operation[] ops2 = Operation.array(
        Operation.get("bin2"));
batchRecords.add(new BatchRead(new Key(Namespace1, Set1, KeyPrefix + 1), ops2));

// 3. UDF write+read
batchRecords.add(new BatchUDF(new Key(Namespace1, Set1, KeyPrefix + 1), UDFModule, UDFFunction,
        new Value[] {Value.get("bin2"), Value.get(1)}));

// 4. read
Operation[] ops4 = Operation.array(
        Operation.get("bin2"));
batchRecords.add(new BatchRead(new Key(Namespace1, Set1, KeyPrefix + 1), ops4));

// execute the batch
BatchPolicy bPolicy = new BatchPolicy(client.batchPolicyDefault);
bPolicy.allowInline = false;   // set true or false and examine results
try {
    client.operate(bPolicy, batchRecords);
}
catch (AerospikeException e) {
    System.out.format("%s", e);
}

// get and show results
// 1. write+read
int i = 0;
BatchRecord batchRec = batchRecords.get(i);
Record rec = batchRec.record;
if (batchRec.resultCode == ResultCode.OK) {
    Object v1 = rec.getValue("bin2");
    System.out.format("Result[%d]: bin2: %s\n", i, v1);
}
else {
    System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}

// 2. read
i = 1;
batchRec = batchRecords.get(i);
rec = batchRec.record;
if (batchRec.resultCode == ResultCode.OK) {
    Object v1 = rec.getValue("bin2");
    System.out.format("Result[%d]: bin2: %s\n", i, v1);
}
else {
    System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}

// 3. UDF write+read
i = 2;
batchRec = batchRecords.get(i);
rec = batchRec.record;
if (batchRec.resultCode == ResultCode.OK) {
    HashMap<?, ?> udfMap = (HashMap<?, ?>) rec.getUDFResult();   // cast udf result to map returned by udf
    Object v1 = udfMap.get("bin2");                              // extract bin2 value from map
    System.out.format("Result[%d]: bin2: %s\n", i, v1);
}
else {
    System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}

// 4. read
i = 3;
batchRec = batchRecords.get(i);
rec = batchRec.record;
if (batchRec.resultCode == ResultCode.OK) {
    Object v1 = rec.getValue("bin2");
    System.out.format("Result[%d]: bin2: %s\n", i, v1);
}
else {
    System.out.format("Result[%d]: error: %s\n", i, ResultCode.getResultString(batchRec.resultCode));
}

// start with a clean initialized test data
initializeTestData();

// batch records list - each batch record holds a key and operations array
List<BatchRecord> batchRecords = new ArrayList<>();

// create a batch of 100 - on same record in (test, batch-ops)
int NUM_ITERS = 100;

// create the write expressions
// each one increments bin2 by 1 if bin2 == expected value, otherwise by 0;
// the expected value starts at 10 and grows by 1 per batch entry
// (Exp.unknown() can be substituted for the Exp.val(0) branch to experiment)
int expectedBinVal = 10;
for (int i = 0; i < NUM_ITERS; i++) {
    Expression writeExp = Exp.build(
            Exp.add(
                    Exp.intBin("bin2"),
                    Exp.cond(
                            Exp.eq(Exp.intBin("bin2"), Exp.val(expectedBinVal)), Exp.val(1),
                            Exp.val(0))));
    Operation[] ops = Operation.array(
            ExpOperation.write("bin2", writeExp, ExpWriteFlags.DEFAULT));
    batchRecords.add(new BatchWrite(new Key(Namespace1, Set1, KeyPrefix + 1), ops));
    expectedBinVal++;
}

// execute the batch
BatchPolicy bPolicy = new BatchPolicy(client.batchPolicyDefault);
bPolicy.respondAllKeys = false;
bPolicy.allowInline = false;   // set true or false and examine results
System.out.printf("Batch of %d records, with flags allowInline=%b.\n", NUM_ITERS, bPolicy.allowInline);

// Run the batch; a failure is printed rather than propagated.
try {
    client.operate(bPolicy, batchRecords);
}
catch (AerospikeException batchError) {
    System.out.printf("%s\n", batchError);
}
System.out.println("Done.");
printRecords();

// Clean up: truncate both tutorial namespaces, then close the client connection.
for (String namespace : new String[] {Namespace1, Namespace2}) {
    client.truncate(null, namespace, null, null);
}
client.close();
System.out.println("Removed tutorial data and closed server connection.");