import io.github.spencerpark.ijava.IJava; import io.github.spencerpark.jupyter.kernel.magic.common.Shell; IJava.getKernelInstance().getMagics().registerMagics(Shell.class); %sh asd %%loadFromPOM com.aerospike aerospike-client 5.0.0 io.netty netty-all 4.1.53.Final compile final String Namespace = "test"; final String Set = "async-ops"; // truncate data, close client and event loops - called multiple times to initialize with different options // described in greater detail later void Cleanup() { try { client.truncate(null, Namespace, Set, null); } catch (AerospikeException e) { // ignore } client.close(); eventLoops.close(); }; import com.aerospike.client.async.EventPolicy; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import com.aerospike.client.async.NettyEventLoops; import com.aerospike.client.async.EventLoops; import com.aerospike.client.async.NioEventLoops; enum EventLoopType{DIRECT_NIO, NETTY_NIO, NETTY_EPOLL}; // a function to create event loops with specified parameters EventLoops InitializeEventLoops(EventLoopType eventLoopType, int numLoops, int commandsPerEventLoop, int maxCommandsInQueue) { EventPolicy eventPolicy = new EventPolicy(); eventPolicy.maxCommandsInProcess = commandsPerEventLoop; eventPolicy.maxCommandsInQueue = maxCommandsInQueue; EventLoops eventLoops = null; switch(eventLoopType) { case DIRECT_NIO: eventLoops = new NioEventLoops(eventPolicy, numLoops); break; case NETTY_NIO: NioEventLoopGroup nioGroup = new NioEventLoopGroup(numLoops); eventLoops = new NettyEventLoops(eventPolicy, nioGroup); break; case NETTY_EPOLL: EpollEventLoopGroup epollGroup = new EpollEventLoopGroup(numLoops); eventLoops = new NettyEventLoops(eventPolicy, epollGroup); break; default: System.out.println("Error: Invalid event loop type"); } return eventLoops; } // initialize event loops final int NumLoops = 2; final int CommandsPerEventLoop = 50; final int DelayQueueSize = 50; EventLoops eventLoops = InitializeEventLoops(EventLoopType.DIRECT_NIO, NumLoops, CommandsPerEventLoop, DelayQueueSize); System.out.format("Event loops initialized with num-loops: %s, commands-per-event-loop: %s, delay-queue-size: %s.\n", NumLoops, CommandsPerEventLoop, DelayQueueSize);; import com.aerospike.client.policy.ClientPolicy; import com.aerospike.client.Host; import com.aerospike.client.AerospikeClient; // a function to initialize the client with specified parameters AerospikeClient InitializeClient(EventLoops eventLoops, int numLoops, int commandsPerEventLoop, Host[] hosts) { ClientPolicy clientPolicy = new ClientPolicy(); clientPolicy.eventLoops = eventLoops; int concurrentMax = commandsPerEventLoop * numLoops; if (clientPolicy.maxConnsPerNode < concurrentMax) { clientPolicy.maxConnsPerNode = concurrentMax; } AerospikeClient client = new AerospikeClient(clientPolicy, hosts); return client; } // initialize the client Host[] hosts = Host.parseHosts("localhost", 3000); AerospikeClient client = InitializeClient(eventLoops, NumLoops, CommandsPerEventLoop, hosts); System.out.print("Client initialized.\n"); import com.aerospike.client.async.Throttles; // creates event loop throttles with specified parameters Throttles InitializeThrottles(int numLoops, int commandsPerEventLoop) { Throttles throttles = new Throttles(numLoops, commandsPerEventLoop); return throttles; } // initialize event loop throttles Throttles throttles = InitializeThrottles(NumLoops, CommandsPerEventLoop); System.out.format("Throttles initialized for %s loops with %s concurrent operations per loop.\n", NumLoops, CommandsPerEventLoop); // initialize the atomic integer to keep track of async operations count import java.util.concurrent.atomic.AtomicInteger; AtomicInteger asyncOpCount = new AtomicInteger(); System.out.format("Atomic operation count initialized.");; import com.aerospike.client.Key; import com.aerospike.client.listener.WriteListener; import com.aerospike.client.async.Monitor; import com.aerospike.client.AerospikeException; // write listener // - implements success and failure handlers // - releases a slot on success or failure for another insert to proceed // - signals completion through monitor on failure or when the write count reaches the expected final count // - prints progress every "progressFreq" records*/ class MyWriteListener implements WriteListener { private final Key key; private final int eventLoopIndex; private final int finalCount; private Monitor monitor; private final int progressFreq; public MyWriteListener(Key key, int eventLoopIndex, int finalCount, Monitor monitor, int progressFreq) { this.key = key; this.eventLoopIndex = eventLoopIndex; this.finalCount = finalCount; this.monitor = monitor; this.progressFreq = progressFreq; } // Write success callback. public void onSuccess(Key key) { // Write succeeded. throttles.addSlot(eventLoopIndex, 1); int currentCount = asyncOpCount.incrementAndGet(); if ( progressFreq > 0 && currentCount % progressFreq == 0) { System.out.format("Inserted %s records.\n", currentCount); } if (currentCount == finalCount) { monitor.notifyComplete(); } } // Error callback. public void onFailure(AerospikeException e) { System.out.format("Put failed: namespace=%s set=%s key=%s exception=%s\n", key.namespace, key.setName, key.userKey, e.getMessage()); monitor.notifyComplete(); } } System.out.print("Write listener defined."); import java.util.concurrent.TimeUnit; import com.aerospike.client.Bin; import com.aerospike.client.policy.WritePolicy; import com.aerospike.client.async.EventLoop; long InsertRecords(int numRecords, EventLoops eventLoops, Throttles throttles, int progressFreq) { long startTime = System.nanoTime(); Monitor monitor = new Monitor(); asyncOpCount.set(0); WritePolicy policy = new WritePolicy(); for (int i = 0; i < numRecords; i++) { Key key = new Key(Namespace, Set, "id-"+i); Bin bin1 = new Bin(new String("bin1"), i); Bin bin2 = new Bin(new String("bin2"), numRecords*10+i); EventLoop eventLoop = eventLoops.next(); int eventLoopIndex = eventLoop.getIndex(); if (throttles.waitForSlot(eventLoopIndex, 1)) { try { client.put(eventLoop, new MyWriteListener(key, eventLoopIndex, numRecords, monitor, progressFreq), policy, key, bin1, bin2); } catch (Exception e) { throttles.addSlot(eventLoopIndex, 1); } } } monitor.waitTillComplete(); long endTime = System.nanoTime(); return (endTime - startTime); } final int NumRecords = 100000; long elapsedTime = InsertRecords(NumRecords, eventLoops, throttles, NumRecords/4); System.out.format("Inserted %s records with %s event-loops and %s commands-per-loop in %s milliseconds.\n", NumRecords, NumLoops, CommandsPerEventLoop, elapsedTime/1000000);; // truncates database and closes client and event-loops Cleanup(); System.out.println("Removed data and closed client and event loops."); import com.aerospike.client.policy.ScanPolicy; import com.aerospike.client.listener.RecordSequenceListener; import com.aerospike.client.Record; import com.aerospike.client.exp.Exp; // Scan callback class ScanRecordSequenceListener implements RecordSequenceListener { private EventLoops eventLoops; private Throttles throttles; private Monitor scanMonitor; private AtomicInteger writeCount = new AtomicInteger(); private int scanCount = 0; private final int progressFreq; public ScanRecordSequenceListener(EventLoops eventLoops, Throttles throttles, Monitor scanMonitor, int progressFreq) { this.eventLoops = eventLoops; this.throttles = throttles; this.scanMonitor = scanMonitor; this.progressFreq = progressFreq; } public void onRecord(Key key, Record record) throws AerospikeException { ++scanCount; if ( progressFreq > 0 && scanCount % progressFreq == 0) { System.out.format("Scan returned %s records.\n", scanCount); } // submit async update operation with throttle EventLoop eventLoop = eventLoops.next(); int eventLoopIndex = eventLoop.getIndex(); if (throttles.waitForSlot(eventLoopIndex, 1)) { // throttle by waiting for an available slot try { WritePolicy policy = new WritePolicy(); Bin bin2 = new Bin(new String("bin2"), 1); client.add(eventLoop, new WriteListener() { // inline write listener public void onSuccess(final Key key) { // Write succeeded. throttles.addSlot(eventLoopIndex, 1); int currentCount = writeCount.incrementAndGet(); if ( progressFreq > 0 && currentCount % progressFreq == 0) { System.out.format("Processed %s records.\n", currentCount); } } public void onFailure(AerospikeException e) { System.out.format("Put failed: namespace=%s set=%s key=%s exception=%s\n", key.namespace, key.setName, key.userKey, e.getMessage()); throttles.addSlot(eventLoopIndex, 1); int currentCount = writeCount.incrementAndGet(); if ( progressFreq > 0 && currentCount % progressFreq == 0) { System.out.format("Processed %s records.\n", currentCount); } } }, policy, key, bin2); } catch (Exception e) { System.out.format("Error: exception in write listener - %s", e.getMessage()); } } } public void onSuccess() { if (scanCount != writeCount.get()) { // give the last write some time to finish try { Thread.sleep(100); } catch(InterruptedException e) { System.out.format("Error: exception - %s", e); } } scanMonitor.notifyComplete(); } public void onFailure(AerospikeException e) { System.out.format("Error: scan failed with exception - %s", e); scanMonitor.notifyComplete(); } } // cleanup prior state Cleanup(); // initialize data, event loops and client int numRecords = 100000; int numLoops = 2; int commandsPerLoop = 25; int delayQueueSize = 0; eventLoops = InitializeEventLoops(EventLoopType.DIRECT_NIO, numLoops, commandsPerLoop, delayQueueSize); client = InitializeClient(eventLoops, numLoops, commandsPerLoop, hosts); throttles = InitializeThrottles(numLoops, commandsPerLoop); InsertRecords(numRecords, eventLoops, throttles, 0); System.out.format("Inserted %s records.\n", numRecords); EventLoop eventLoop = eventLoops.next(); Monitor scanMonitor = new Monitor(); int progressFreq = 100; // issue async scan that in turn issues async update on each returned record ScanPolicy policy = new ScanPolicy(); policy.filterExp = Exp.build( Exp.and( Exp.le(Exp.intBin("bin1"), Exp.val(1000)), Exp.ge(Exp.intBin("bin1"), Exp.val(1)))); client.scanAll(eventLoop, new ScanRecordSequenceListener(eventLoops, throttles, scanMonitor, progressFreq), policy, Namespace, Set); scanMonitor.waitTillComplete(); System.out.format("Done: nested async scan and update");; // clean up the current state Cleanup(); // initialize data, event loops and client int numRecords = 100; int numLoops = 2; int commandsPerLoop = 25; int delayQueueSize = 20; int noThrottle = 10000; //effectively no throttle eventLoops = InitializeEventLoops(EventLoopType.DIRECT_NIO, numLoops, commandsPerLoop, delayQueueSize); client = InitializeClient(eventLoops, numLoops, commandsPerLoop, hosts); throttles = InitializeThrottles(numLoops, noThrottle); // attempt to insert records above the available slots and delay queue capacity long elapsedTime = InsertRecords(numRecords, eventLoops, throttles, 0); System.out.format("%s ops/ms with event-loops: %s and commands-per-loop: %s.\n", numRecords/(elapsedTime/1000000), numLoops, commandsPerLoop);; // Throughput with parameterized async insertion int numRecords = 100000; EventLoopType[] eventLoopOptions = {EventLoopType.DIRECT_NIO, EventLoopType.NETTY_NIO, EventLoopType.NETTY_EPOLL}; int[] numLoopsOptions = {2, 4, 8}; int[] commandsPerLoopOptions = {50, 100, 200}; for (EventLoopType eventLoopType: eventLoopOptions) { for (int numLoops: numLoopsOptions) { for (int commandsPerLoop: commandsPerLoopOptions) { Cleanup(); eventLoops = InitializeEventLoops(eventLoopType, numLoops, commandsPerLoop, 0); client = InitializeClient(eventLoops, numLoops, commandsPerLoop, hosts); throttles = InitializeThrottles(numLoops, commandsPerLoop); long elapsedTime = InsertRecords(numRecords, eventLoops, throttles, 0); System.out.format("%s ops/ms with %s %s event-loops and %s commands-per-loop.\n", numRecords/(elapsedTime/1000000), numLoops, eventLoopType, commandsPerLoop); } } } System.out.println("Done.");; Cleanup(); System.out.println("Removed tutorial data and closed server connection.");