This tutorial describes how SQL change operation equivalents can be performed in Aerospike.
This notebook requires the Aerospike Database running locally with Java kernel and Aerospike Java Client. To create a Docker container that satisfies the requirements and holds a copy of Aerospike notebooks, visit the Aerospike Notebooks Repo.
In this notebook, we will see how specific modify statements in SQL can be implemented in Aerospike.
SQL is a widely known data access language. The examples in this notebook provide patterns for implementing specific SQL change equivalents in Aerospike. You should be able to understand them and find them useful even without a deep familiarity with SQL.
This notebook is the third in the SQL Operations series that consists of the following notebooks:
The specific topics and aggregate functions we discuss in this notebook include:
The purpose of this notebook is to illustrate Aerospike implementation for specific SQL operations. Check out Aerospike Presto Connector for ad-hoc SQL access to Aerospike data.
This tutorial assumes familiarity with the following topics:
This notebook requires that Aerospike Database is running.
import io.github.spencerpark.ijava.IJava;
import io.github.spencerpark.jupyter.kernel.magic.common.Shell;
IJava.getKernelInstance().getMagics().registerMagics(Shell.class);
%sh asd
Install the Java client.
%%loadFromPOM
<dependencies>
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>
The test data has 10 records with user-key "id-1" through "id-10", two integer bins (fields) "bin1" (1-10) and "bin2" (1001-1010) in the namespace "test" and set "sql-update".
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;
AerospikeClient client = new AerospikeClient("localhost", 3000);
System.out.println("Initialized the client and connected to the cluster.");
String Namespace = "test";
String Set = "sql-update";
Initialized the client and connected to the cluster.
In this tutorial, you will be executing many shell commands including Aerospike tools like asadm and aql in a terminal. Open a terminal tab by selecting File->Open from the notebook menu, and then New->Terminal.
Aerospike is a distributed key-value database with support for the document-oriented data model. An Aerospike Database can hold multiple "namespaces" which are equivalent to databases or schemas in the relational model. A namespace hold records (rows), organized in sets (tables) and are accessed using a unique key that serves as the record id. A record can contain one or more bins (columns), and a bin can hold a value of different data types.
The following simple data types are available in Aerospike.
In addition, a bin can hold Collection Data Type (CDTs) such as List and Map, as well as complex data types such as Geospatial and HyperLogLog.
Sets and records do not conform to a schema. A set can have records with different bins, and a record can have different bins at different times. Moreover a bin is not limited to a specific type, and can hold multiple data types in different records. Type specific operations apply only to the matching bins of the right type.
For a detailed description of the data model see the Data Model overview.
There is no API to create a namespace. A namespace is added through the config and requires a server restart.
Review the config file at /etc/aerospike/aerospike.conf by opening a terminal window through File->Open menu section followed up New->Terminal.
cat /etc/aerospike/aerospike.conf
As you can see, there is one namespace "test" configured in the server.
You can create a simple namespace "create-test" by following the following steps.
cp /etc/aerospike/aerospike.conf ~/notebooks/java/aerospike.conf
namespace create-test { memory-size 1G }
/etc/init.d/aerospike stop; asd --config-file ~/notebooks/java/aerospike.conf
asadm -e "info namespace"
After the new namespace is successfully created, insert a few records by executing the following cell.
// define a convenience functions to examine all records in a namespace and set
import com.aerospike.client.Record;
import com.aerospike.client.ScanCallback;
import com.aerospike.client.policy.ScanPolicy;
public class ScanParallel implements ScanCallback {
public void scanCallback(Key key, Record record) {
System.out.format("namespace=%s set=%s key=%s bins=%s\n", key.namespace, key.setName, key.userKey, record.bins);
}
}
void examineRecords(String ns, String set) {
client.scanAll(null, ns, set, new ScanParallel());
}
// insert records in the new namespace and verify by reading back
String NamespaceCreateTest = "create-test";
String SetCreateTest = "test-set";
WritePolicy wpolicy = new WritePolicy();
wpolicy.sendKey = true;
for (int i = 1; i <= 10; i++) {
Key key = new Key(NamespaceCreateTest, SetCreateTest, "id-"+i);
Bin bin1 = new Bin(new String("bin1"), i);
Bin bin2 = new Bin(new String("bin2"), 1000+i);
client.put(wpolicy, key, bin1, bin2);
}
System.out.format("Inserted 10 records in ns=%s set=%s.\n", NamespaceCreateTest, SetCreateTest);
System.out.format("Retrieving all records in ns=%s.\n", NamespaceCreateTest);
examineRecords(NamespaceCreateTest, null); // null value for a set returns all records in the namespace
Inserted 10 records in ns=create-test set=test-set. Retrieving all records in ns=create-test. namespace=create-test set=test-set key=id-10 bins={bin1=10, bin2=1010} namespace=create-test set=test-set key=id-5 bins={bin1=5, bin2=1005} namespace=create-test set=test-set key=id-1 bins={bin1=1, bin2=1001} namespace=create-test set=test-set key=id-7 bins={bin1=7, bin2=1007} namespace=create-test set=test-set key=id-3 bins={bin1=3, bin2=1003} namespace=create-test set=test-set key=id-8 bins={bin1=8, bin2=1008} namespace=create-test set=test-set key=id-9 bins={bin1=9, bin2=1009} namespace=create-test set=test-set key=id-4 bins={bin1=4, bin2=1004} namespace=create-test set=test-set key=id-2 bins={bin1=2, bin2=1002} namespace=create-test set=test-set key=id-6 bins={bin1=6, bin2=1006}
The truncate API removes all records in a set or the entire namespace. (If the namespace has records in a "null" set, those must be removed through iterating over them using a UDF.)
truncate(InfoPolicy policy, String ns, String set, Calendar beforeLastUpdate)
The following cell truncates all sets in the namespace "create-test", and verifies there are no records in the namespace after truncation.
// truncate the namespace
client.truncate(null, NamespaceCreateTest, null, null); // null set value truncates all sets in the namespace
// examine records in the namespace - should be empty
System.out.format("Retrieving all records in namespace %s.\n", NamespaceCreateTest);
examineRecords(NamespaceCreateTest, null); // null set value returns all records in the namespace
System.out.format("Done.");;
Retrieving all records in namespace create-test. Done.
There is no API to delete a namespace. A namespace has one or more dedicated storage devices, and they must be wiped clean to delete the namespace. Since "create-test" is an in-memory namespace, you can remove it from the config and restart the server to delete it. Verify deletion with the asadm command "info namespace".
/etc/init.d/aerospike restart
asadm -e "info namespace"
Any operation on a deleted namespace will return an error.
import com.aerospike.client.AerospikeException;
try {
examineRecords(NamespaceCreateTest, null); // null value for a set returns all records in the namespace
}
catch (AerospikeException ae) {
System.out.format("Error in retrieving records from namespace %s: %s.\n", NamespaceCreateTest, ae.getMessage());
}
Error in retrieving records from namespace create-test: Error 20,1,30000,0,5,BB9020011AC4202 127.0.0.1 3000: Namespace not found.
There is no explicit operation to create a set. A set is created when the first record is inserted in the set.
Create a new set "create-set" in the namespace "test" by inserting a record.
String SetCreateTest = "create-set";
Key key = new Key(Namespace, SetCreateTest, "id-100");
Bin bin1 = new Bin(new String("bin1"), 100);
Bin bin2 = new Bin(new String("bin2"), 1000+100);
client.put(wpolicy, key, bin1, bin2);
Examine the sets in the namespace by issuing the following command in the terminal tab.
aql -c "show sets"
A set is schemaless, and can hold records that have different schemas or bins. A bin has no type associated with it, and can hold values of any type. Further, there are no constraints on bin values such as not null unique, default, primary/foreign key, and so on. Therefore ALTER operation on a set to modify its schema is not needed.
The following cell populates "create-set" with two records with different schemas.
Key key = new Key(Namespace, SetCreateTest, "id-101");
Bin bin1 = new Bin(new String("bin1"), "string");
Bin bin3 = new Bin(new String("bin3"), "new bin");
client.put(wpolicy, key, bin1, bin3);
Key key = new Key(Namespace, SetCreateTest, "id-102");
Bin bin1 = new Bin(new String("bin1"), 111.222);
Bin bin2 = new Bin(new String("bin2"), 0.1);
Bin bin4 = new Bin(new String("bin4"), "");
client.put(wpolicy, key, bin1, bin2, bin4);
examineRecords(Namespace, SetCreateTest);
namespace=test set=create-set key=id-101 bins={bin1=string, bin3=new bin} namespace=test set=create-set key=id-100 bins={bin1=100, bin2=1100} namespace=test set=create-set key=id-102 bins={bin1=111.222, bin2=0.1, bin4=}
All records in a set can be truncated using the truncate API:
truncate(InfoPolicy policy, String ns, String set, Calendar beforeLastUpdate)
The following truncates all records in set "create-set" in namespace "create-test".
client.truncate(null, Namespace, SetCreateTest, null);
Verify all records in the set have been deleted.
System.out.format("Retrieving all records in set=%s.\n", SetCreateTest);
examineRecords(Namespace, SetCreateTest);
System.out.format("Done.");;
Retrieving all records in set=create-set. Done.
There is no notion of deleting a set as a set is just a name that a record is tagged with. The namespace must be deleted to remove the set name.
An index is created on a bin for a specific value type. Integer, string, and geojson types are currently supported for indexing. An index supports equality and range queries for integer values, equality queries on string values, and containment queries on geojson values.
createIndex(Policy policy, String namespace, String setName, String indexName, String binName, IndexType indexType)
An index can also be created on a collection type value (List and Map) for fast access within the collection by item value (List), or key and range values (Map).
Below we illustrate creating an integer index on bin1.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
String IndexName = "test_sql_update_bin1_number_idx";
Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.
try {
IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,
"bin1", IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}
System.out.format("Created number index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, Set, "bin1");;
Created number index test_sql_update_bin1_number_idx on ns=test set=sql-update bin=bin1.
Examine the new index in the namespace by issuing the following command in the terminal tab.
aql -c "show indexes"
The API dropIndex deletes an index.
dropIndex(Policy policy, String namespace, String setName, String indexName)
client.dropIndex(null, Namespace, Set, IndexName);
System.out.format("Dropped index %s on ns=%s set=%s bin=%s.",
IndexName, Namespace, Set, "bin1");;
Dropped index test_sql_update_bin1_number_idx on ns=test set=sql-update bin=bin1.
Examine the index is dropped by issuing the following command in the terminal tab.
aql -c "show indexes"
INSERT INTO namespace.set VALUES (id=key, bin=value, ...) UPDATE namespace.set SET (bin=value, ...) WHERE id=key DELETE FROM namespace.set WHERE id=key
The put operation handles Create (Insert) and Update. The "record-exists-action" specified within the write-policy defines the operation semantics when the record already exists, with the following variants:
Note, there is no replace operation in SQL.
put(WritePolicy policy, Key key, Bin... bins)
Below we illustrate various record-exists-action options for create/update/replace of a single record.
import com.aerospike.client.policy.RecordExistsAction;
Key key = new Key(Namespace, Set, "id-100");
Bin bin1 = new Bin(new String("bin1"), 100);
Bin bin2 = new Bin(new String("bin2"), 1000+100);
// ensure the record doesn't exist by removing it
try {
client.delete(wpolicy, key);
}
catch (AerospikeException ae) {
// ignore if doesn't exist
}
// update-only must fail since the record doesn't exist
try {
wpolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
client.put(wpolicy, key, bin1, bin2);
System.out.format("update-only succeeded");
}
catch (AerospikeException ae) {
System.out.format("Error on update-only: %s.\n", ae.getMessage());
}
// replace-only must fail since the record doesn't exist
try {
wpolicy.recordExistsAction = RecordExistsAction.REPLACE_ONLY;
client.put(wpolicy, key, bin1, bin2);
System.out.format("replace-only succeeded");
}
catch (AerospikeException ae) {
System.out.format("Error on replace-only: %s.\n", ae.getMessage());
}
// create-only should succeed since the record doesn't exist
try {
wpolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
client.put(wpolicy, key, bin1, bin2);
Record record = client.get(null, key);
System.out.format("Create-only succeeded: key=%s bins=%s\n", key.userKey, record.bins);
}
catch (AerospikeException ae) {
System.out.format("Error on create-only: %s.\n", ae.getMessage());
}
// update should succeed irrespective of the record's existence
try {
Bin bin3 = new Bin(new String("bin3"), "new bin");
wpolicy.recordExistsAction = RecordExistsAction.UPDATE;
client.put(wpolicy, key, bin3);
Record record = client.get(null, key);
System.out.format("Update succeeded: key=%s bins=%s\n", key.userKey, record.bins);
}
catch (AerospikeException ae) {
System.out.format("Error: %s.\n", ae.getMessage());
}
// replace should succeed irrespective of the record's existence
try {
Bin bin4 = new Bin(new String("bin4"), "another bin");
wpolicy.recordExistsAction = RecordExistsAction.REPLACE;
client.put(wpolicy, key, bin4);
Record record = client.get(null, key);
System.out.format("Replace succeeded: key=%s bins=%s\n", key.userKey, record.bins);
}
catch (AerospikeException ae) {
System.out.format("Error: %s.\n", ae.getMessage());
}
Error on update-only: Error 2,1,30000,0,0,BB9020011AC4202 127.0.0.1 3000: Key not found. Error on replace-only: Error 2,1,30000,0,0,BB9020011AC4202 127.0.0.1 3000: Key not found. Create-only succeeded: key=id-100 bins={bin1=100, bin2=1100} Update succeeded: key=id-100 bins={bin1=100, bin2=1100, bin3=new bin} Replace succeeded: key=id-100 bins={bin4=another bin}
Aerospike allows many type specific update operations. For integer and string types, they include:
UPDATE namespace.set SET (bin = bin + intval) WHERE id=key add(WritePolicy policy, Key key, Bin... bins) UPDATE namespace.set SET (bin = bin + strval) WHERE id=key append(WritePolicy policy, Key key, Bin... bins) UPDATE namespace.set SET (bin = strval + bin) WHERE id=key prepend(WritePolicy policy, Key key, Bin... bins)
Key key = new Key(Namespace, Set, "id-100");
Bin bin1 = new Bin(new String("bin1"), 100);
Bin bin2 = new Bin(new String("bin2"), "John");
wpolicy.recordExistsAction = RecordExistsAction.REPLACE;
client.put(wpolicy, key, bin1, bin2);
Record record = client.get(null, key);
System.out.format("Record: key=%s bins=%s\n", key.userKey, record.bins);
// add
bin1 = new Bin(new String("bin1"), 1);
client.add(null, key, bin1);
Record record = client.get(null, key);
System.out.format("After add: key=%s bins=%s\n", key.userKey, record.bins);
// append
bin2 = new Bin(new String("bin2"), " Doe");
client.append(null, key, bin2);
Record record = client.get(null, key);
System.out.format("After append: key=%s bins=%s\n", key.userKey, record.bins);
//prepend
bin2 = new Bin(new String("bin2"), "Mr ");
client.prepend(null, key, bin2);
Record record = client.get(null, key);
System.out.format("After prepend: key=%s bins=%s\n", key.userKey, record.bins);;
Record: key=id-100 bins={bin1=100, bin2=John} After add: key=id-100 bins={bin1=101, bin2=John} After append: key=id-100 bins={bin1=101, bin2=John Doe} After prepend: key=id-100 bins={bin1=101, bin2=Mr John Doe}
In addition, List, Map, Geospatial, HyperLogLog, and Blob types allow a wide range of update operations. Refer to the tutorials on List and Map, and documentation.
DELETE FROM namespace.set WHERE id=key delete(WritePolicy policy, Key key)
Create a record, delete it, and verify deletion.
// create a record
Key key = new Key(Namespace, Set, "id-100");
Bin bin1 = new Bin(new String("bin1"), 100);
Bin bin2 = new Bin(new String("bin2"), 1000+100);
client.put(wpolicy, key, bin1, bin2);
Record record = client.get(null, key);
System.out.format("Record: key=%s bins=%s\n", key.userKey, record.bins);
// delete the record
client.delete(wpolicy, key);
System.out.format("Record deleted: key=%s\n", key.userKey);
// verify it's deleted
Record record = client.get(null, key);
if (record == null) {
System.out.format("No record found for key=%s \n", key.userKey);;
}
else {
System.out.format("Record: key=%s bins=%s\n", key.userKey, record.bins);;
}
Record: key=id-100 bins={bin1=100, bin2=1100} Record deleted: key=id-100 No record found for key=id-100
Multiple single record updates are possible through operate.
UPDATE namespace.set SET (bin=value, ...) WHERE id=key operate(WritePolicy policy, Key key, Operation... operations)
Note, the operate function allows read (SELECT) operations to be included in the list, and executes and returns the results in the order they are specified. See this tutorial that illustrates multi-ops.
The following code creates a record with an integer and a string bin, and then in one operate call increments the integer bin, appends to the string bin, and reads the updated record back.
import com.aerospike.client.Operation;
// create a record
Key key = new Key(Namespace, Set, "id-100");
Bin bin1 = new Bin(new String("bin1"), 100);
Bin bin2 = new Bin(new String("bin2"), "John");
client.put(wpolicy, key, bin1, bin2);
Record record = client.get(null, key);
System.out.format("Record: key=%s bins=%s\n", key.userKey, record.bins);
// Add integer, update string, read record in one operate call
bin1 = new Bin(new String("bin1"), 1);
bin2 = new Bin(new String("bin2"), " Doe");
record = client.operate(null, key, Operation.add(bin1), Operation.append(bin2), Operation.get());
System.out.format("Operate results: key=%s bins=%s\n", key.userKey, record.bins);;
Record: key=id-100 bins={bin1=100, bin2=John} Operate results: key=id-100 bins={bin1=101, bin2=John Doe}
UPDATE namespace.set SET (bin=value, ...) WHERE condition DELETE FROM namespace.set WHERE condition
Multi-record updates and deletes are possibly by specifying the WHERE condition using the query filter (specified in a Statement object) and expression filter (specified in the write policy). If both filters are specified, the two are ANDed. The query filter operates on one bin and requires a supporting secondary index to exist on the bin. The remaining parts of the condition must be specified in the expression filter. For best performance, use the most selective condition in a query filter when possible.
Note, there is no API for batch insertion of multiple records.
Updates and deletes can be done in two ways:
The list can be specified in two ways:
Statement::setOps(Operation[] ops] AerospikeClient::execute(WritePolicy policy, Statement statement)
AerospikeClient::execute(WritePolicy policy, Statement statement, Operation[] ops)
Note:
First we will populate data, create secondary index, and define a convenience function, and then illustrate how to execute bin updates using the two methods.
// populate data, create secondary index, and define a convenience function
// add records with keys "id-1" to "id-10" and
// bins bin1 (integer values 1-10) and bin2 (integer values 1000-1010).
WritePolicy wpolicy = new WritePolicy();
wpolicy.sendKey = true;
for (int i = 1; i <= 10; i++) {
Key key = new Key(Namespace, Set, "id-"+i);
Bin bin1 = new Bin(new String("bin1"), i);
Bin bin2 = new Bin(new String("bin2"), 1000+i);
client.put(wpolicy, key, bin1, bin2);
}
System.out.format("Test data populated.\n");;
// create an integer secondary index on bin1
import com.aerospike.client.policy.Policy;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.task.IndexTask;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.ResultCode;
String IndexName = "test_sql_update_bin1_number_idx";
Policy policy = new Policy();
policy.socketTimeout = 0; // Do not timeout on index create.
try {
IndexTask task = client.createIndex(policy, Namespace, Set, IndexName,
"bin1", IndexType.NUMERIC);
task.waitTillComplete();
}
catch (AerospikeException ae) {
if (ae.getResultCode() != ResultCode.INDEX_ALREADY_EXISTS) {
throw ae;
}
}
System.out.format("Created number index %s on ns=%s set=%s bin=%s.\n",
IndexName, Namespace, Set, "bin1");;
// define convenience functions
import com.aerospike.client.query.Statement;
import com.aerospike.client.query.Filter;
import com.aerospike.client.Record;
void printRecordRange(String header, String namespace, String set, String bin, int rangeLow, int rangeHigh) {
System.out.format("%s\n", header);
Statement stmt = new Statement();
stmt.setNamespace(namespace);
stmt.setSetName(set);
stmt.setFilter(Filter.range(bin, rangeLow, rangeHigh));
RecordSet rs = client.query(null, stmt);
while (rs.next()) {
Key key = rs.getKey();
Record record = rs.getRecord();
System.out.format("key=%s bins=%s\n", key.userKey, record.bins);
}
rs.close();
}
Test data populated. Created number index test_sql_update_bin1_number_idx on ns=test set=sql-update bin=bin1.
import com.aerospike.client.Value;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.exp.Exp;
import com.aerospike.client.Operation;
import com.aerospike.client.task.ExecuteTask;
// 1. list of updates specified in Statement
printRecordRange("initial state:", Namespace, Set, "bin1", 4, 7);
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setFilter(Filter.range("bin1", 4, 7));
Operation ops[] = { new Operation(Operation.Type.ADD, "bin2", Value.get(1)),
Operation.put(new Bin("bin3", "new1"))};
stmt.setOperations(ops);
ExecuteTask task = client.execute(null, stmt);
task.waitTillComplete(3000, 3000);
printRecordRange("After executing updates in Statement with filter:", Namespace, Set, "bin1", 4, 7);
// 2. list of updates as argument to execute
printRecordRange("initial state:", Namespace, Set, "bin1", 4, 7);
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setFilter(Filter.range("bin1", 4, 7));
ExecuteTask task = client.execute(null, stmt, new Operation(Operation.Type.ADD, "bin2", Value.get(1)),
Operation.put(new Bin("bin3", "new2")));
task.waitTillComplete(3000, 3000);
printRecordRange("After executing updates in Ops list argument:", Namespace, Set, "bin1", 4, 7);
// 3. updates in Statement as well as in the execute argument - Statement operations are ignored
printRecordRange("initial state:", Namespace, Set, "bin1", 4, 7);
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
stmt.setFilter(Filter.range("bin1", 4, 7));
Operation ops[] = { new Operation(Operation.Type.ADD, "bin2", Value.get(1))};
stmt.setOperations(ops);
WritePolicy policy = new WritePolicy();
policy.maxRetries = 0;
ExecuteTask task = client.execute(policy, stmt, Operation.put(new Bin("bin3", "new3")));
task.waitTillComplete(3000, 3000);
printRecordRange("After executing updates in Statement and Ops list argument (Statement updates are ignored):",
Namespace, Set, "bin1", 4, 7);
initial state: key=id-4 bins={bin1=4, bin2=1004} key=id-5 bins={bin1=5, bin2=1005} key=id-6 bins={bin1=6, bin2=1006} key=id-7 bins={bin1=7, bin2=1007} After executing updates in Statement with filter: key=id-4 bins={bin1=4, bin2=1005, bin3=new1} key=id-5 bins={bin1=5, bin2=1006, bin3=new1} key=id-6 bins={bin1=6, bin2=1007, bin3=new1} key=id-7 bins={bin1=7, bin2=1008, bin3=new1} initial state: key=id-4 bins={bin1=4, bin2=1005, bin3=new1} key=id-5 bins={bin1=5, bin2=1006, bin3=new1} key=id-6 bins={bin1=6, bin2=1007, bin3=new1} key=id-7 bins={bin1=7, bin2=1008, bin3=new1} After executing updates in Ops list argument: key=id-4 bins={bin1=4, bin2=1006, bin3=new2} key=id-5 bins={bin1=5, bin2=1007, bin3=new2} key=id-6 bins={bin1=6, bin2=1008, bin3=new2} key=id-7 bins={bin1=7, bin2=1009, bin3=new2} initial state: key=id-4 bins={bin1=4, bin2=1006, bin3=new2} key=id-5 bins={bin1=5, bin2=1007, bin3=new2} key=id-6 bins={bin1=6, bin2=1008, bin3=new2} key=id-7 bins={bin1=7, bin2=1009, bin3=new2} After executing updates in Statement and Ops list argument (Statement updates are ignored): key=id-4 bins={bin1=4, bin2=1006, bin3=new3} key=id-5 bins={bin1=5, bin2=1007, bin3=new3} key=id-6 bins={bin1=6, bin2=1008, bin3=new3} key=id-7 bins={bin1=7, bin2=1009, bin3=new3}
A record oriented User Defined Function (UDF) can be used for a single record or multi-record updates.
UPDATE namespace.set SET (bin=fn(args), ...) WHERE condition Single Record: Object execute(WritePolicy policy, Key key, String packageName, String functionName, Value... functionArgs) Multi-Record: ExecuteTask execute(WritePolicy policy, Statement statement, String packageName, String functionName, Value... functionArgs)
A multi-record update is illustrated below. The Statement object is used to specify a predicate or query filter to identify records. Any updates specified in Statement are ignored. The WHERE condition can also be specified using an expression filter as shown below.
Note:
Examine the following Lua code that updates two bins by adding and appending the values provided.
-- update the specified bins by adding and appending the values provided function add_append(rec, binName1, addVal, binName2, appendVal) rec[binName1] = rec[binName1] + addVal rec[binName2] = rec[binName2] .. appendVal aerospike:update(rec) end
Note, the UDF function is expected to be in "update_example.lua" file under "udf" directory.
Register the UDF with the server by executing the following code cell. The function invalidates the cache, removes the currently registered module, and registers the latest version.
import com.aerospike.client.policy.Policy;
import com.aerospike.client.task.RegisterTask;
import com.aerospike.client.Language;
import com.aerospike.client.lua.LuaConfig;
import com.aerospike.client.lua.LuaCache;
LuaConfig.SourceDirectory = "../udf";
String UDFFile = "update_example.lua";
String UDFModule = "update_example";
void registerUDF() {
// clear the lua cache
LuaCache.clearPackages();
Policy policy = new Policy();
// remove the current module, if any
client.removeUdf(null, UDFFile);
RegisterTask task = client.register(policy, LuaConfig.SourceDirectory+"/"+UDFFile,
UDFFile, Language.LUA);
task.waitTillComplete();
System.out.format("Registered the UDF module %s.", UDFFile);;
}
registerUDF();
Registered the UDF module update_example.lua.
Run the UDF on multiple records using an expression filter to specify records with bin1 values in the range 4-7 (both inclusive).
printRecordRange("initial state:", Namespace, Set, "bin1", 4, 7);
Statement stmt = new Statement();
stmt.setNamespace(Namespace);
stmt.setSetName(Set);
WritePolicy policy = new WritePolicy();
policy.filterExp = Exp.build(
Exp.and(
Exp.ge(Exp.intBin("bin1"), Exp.val(4)),
Exp.le(Exp.intBin("bin1"), Exp.val(7))));
policy.maxRetries = 0;
ExecuteTask task = client.execute(policy, stmt, UDFModule, "add_append",
Value.get("bin2"), Value.get(1), Value.get("bin3"), Value.get("!!!"));
task.waitTillComplete(3000, 3000);
printRecordRange("After executing UDF add_append:",
Namespace, Set, "bin1", 4, 7);
initial state: key=id-4 bins={bin1=4, bin2=1006, bin3=new3} key=id-5 bins={bin1=5, bin2=1007, bin3=new3} key=id-6 bins={bin1=6, bin2=1008, bin3=new3} key=id-7 bins={bin1=7, bin2=1009, bin3=new3} After executing UDF add_append: key=id-4 bins={bin1=4, bin2=1007, bin3=new3!!!} key=id-5 bins={bin1=5, bin2=1008, bin3=new3!!!} key=id-6 bins={bin1=6, bin2=1009, bin3=new3!!!} key=id-7 bins={bin1=7, bin2=1010, bin3=new3!!!}
Updates by default are applied in memory, but can be made more durable or persistent by specifying "commit-to-device" option in the namespace configuration.
A record by default is created never to expire, but a different time-to-live (ttl) can be specified. An expired record is automatically removed and its space reclaimed, thus relieving the lifecycle management burden in applications where it makes sense.
Use of collection data types (CDTs) like List and Map to store objects or records is a common pattern in Aerospike and is recommended for performance. Refer to this tutorial on modeling with lists.
Many developers that are familiar with SQL would like to see how SQL operations translate to Aerospike. We looked at how to implement various modification statements. This should be generally useful irrespective of the reader's SQL knowledge. While the examples here use synchronous execution, many operations can also be performed asynchronously.
Remove tutorial data and close connection.
client.dropIndex(null, Namespace, Set, IndexName);
client.truncate(null, Namespace, null, null);
client.close();
System.out.println("Removed tutorial data and closed server connection.");
Removed tutorial data and closed server connection.
Here are some links for further exploration
Resources
Visit Aerospike notebooks repo to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload.