%%init_spark
launcher.jars = ["aerospike-spark-assembly-3.0.1.jar"]
launcher.master = "local[*]"
//Specify the Seed Host of the Aerospike Server
val AS_HOST = "172.16.39.141:3000"
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
Aerospike is schemaless; Spark, however, adheres to a schema. Once the schema is decided (either inferred or provided), the data within the bins must honor those types.
To infer the schema, the connector samples a set of records (configurable through aerospike.schema.scan) to decide the names of the bins/columns and their types. This implies that the derived schema depends entirely on the sampled records.
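For illustration, a minimal read that tunes the sampling size could look like the following (the set name and sample count are placeholders, and connection options such as seed host and namespace are assumed to be configured elsewhere):
val inferredDF = spark.read
  .format("aerospike")
  .option("aerospike.set", "some_set") // hypothetical set name
  .option("aerospike.schema.scan", "1000") // number of records sampled for schema inference (illustrative value)
  .load()
inferredDF.printSchema() // inspect the inferred bin names and types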
Note that __key was not part of the provided schema. So how can one query using __key? We can simply add __key to the provided schema with the appropriate type. Similarly, we can add __gen, __ttl, and so on (a sketch follows the schema below).
val schemaWithPK: StructType = new StructType(Array(
StructField("__key",IntegerType, nullable = false),
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false),
StructField("salary",IntegerType, nullable = false)))
We recommend that you provide a schema for queries that involve complex data types (CDTs) such as lists, maps, and mixed types. Using schema inference for CDTs may cause unexpected issues.
Spark assumes that the underlying data store (Aerospike in this case) follows a strict schema for all the records within a table. However, Aerospike is a NoSQL database and is schemaless, so a single bin (mapped to a column) within a set (mapped to a table) could technically hold values of multiple Aerospike-supported types. The Spark connector reconciles this incompatibility with the help of a few rules; choose the configuration that suits your use case. The strict configuration (aerospike.schema.flexible = false) can be used when you have modeled your data in Aerospike to adhere to a strict schema, i.e. each record within the set has the same schema.
With aerospike.schema.flexible set to true (the default), if none of the column types in the user-specified schema match the bin types of a record in Aerospike, a record with NULLs is returned in the result set.
Use filter() in Spark to deal with such NULL records, e.g. df.filter("gender is null").show(false) to inspect them, where df is a DataFrame and gender is a column from the user-specified schema that did not match the underlying bin; a short sketch follows below.
If the mismatch is limited to a subset of the columns in the user-specified schema, NULL is returned only for those columns in the result set. Note: there is currently no way to tell apart a NULL caused by a missing value in the original data from a NULL caused by a type mismatch, so treat all NULLs as missing values. Columns that are not part of the schema are automatically filtered out of the result set by the connector.
Please note that if any field is set to non-nullable (nullable = false), your query will error out if there is a type mismatch between an Aerospike bin and the column type specified in the user-specified schema.
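As a concrete sketch of the filtering mentioned above (df and the column name are hypothetical placeholders):
// Keep only the rows whose bin matched the user-specified type; remember that a NULL
// here is indistinguishable from a genuinely missing value.
val nonNullRows = df.filter(col("gender").isNotNull)
nonNullRows.show(false)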
import com.aerospike.client.policy.WritePolicy
import com.aerospike.spark.sql.AerospikeConnection
import org.apache.spark.sql.SparkSession
import com.aerospike.client.{AerospikeClient, AerospikeException, Bin, Key}
val conf = sc.getConf.clone();
conf.set("aerospike.seedhost" , AS_HOST)
conf.set("aerospike.schema.flexible" , "true") //by default it is always true
val client = AerospikeConnection.getClient(conf)
val flexsetname = "flexschema"
val wp = new WritePolicy()
wp.expiration = 6000 // expire the data in 6000 seconds (100 minutes)
for (i <- 1 to 100) {
  val key = new Key("test", flexsetname, i)
  client.delete(null, key)
  if (i % 2 == 0) {
    // even keys: both bins are written as integers
    client.put(wp, key, new Bin("one", i.toInt), new Bin("two", i.toInt))
  } else {
    // odd keys: bin "two" is written as a string, creating a type mismatch
    client.put(wp, key, new Bin("one", i.toInt), new Bin("two", i.toString))
  }
}
conf.set("aerospike.keyPath", "/etc/aerospike/features.conf")
conf.set("aerospike.namespace", "test")
spark.close()
val spark2= SparkSession.builder().config(conf).master("local[2]").getOrCreate()
val flexibleSchema= StructType (
Seq(
StructField("one", IntegerType, true ),
StructField("two", IntegerType, true )
)
)
spark2.read.format("aerospike").schema(flexibleSchema).option("aerospike.set", flexsetname).load().show()
//Note that, due to the type mismatch, the column `two` comes back null for the records with odd values of `one` (those records stored `two` as a string).
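As a quick follow-up sketch, we can re-read the set and count how many records came back with `two` nulled out due to the mismatch:
val flexDF = spark2.read
  .format("aerospike")
  .schema(flexibleSchema)
  .option("aerospike.set", flexsetname)
  .load()
// Records written with a string in bin "two" (odd keys) do not match IntegerType and surface as null.
val nullCount = flexDF.filter(col("two").isNull).count()
println(s"records with null two: $nullCount")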
With strict matching (aerospike.schema.flexible = false), if a mismatch between the user-specified schema and the schema of a record in Aerospike is detected at the bin/column level, your query will error out.
//When strict matching is set, we get an exception due to the type mismatch with the provided schema.
import scala.util.Try
val df = Try{
spark2.sqlContext.read.
format("aerospike").
schema(flexibleSchema).
option("aerospike.schema.flexible", "false").
option("aerospike.set", flexsetname).
load().show()
}
//Create test data
val num_records=1000
val rand = scala.util.Random
//schema of input data
// val spark = SparkSession.builder().config(strictConf).master("local[*]").getOrCreate()
val schema: StructType = new StructType(
Array(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false),
StructField("salary",IntegerType, nullable = false)
))
val inputDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to num_records) {
    val name = "name" + i
    val age = i % 100
    val salary = 50000 + rand.nextInt(50000)
    val id = i
    val r = Row(id, name, age, salary)
    inputBuf.append(r)
  }
  val inputRDD = spark2.sparkContext.parallelize(inputBuf.toSeq)
  spark2.createDataFrame(inputRDD, schema)
}
inputDF.show(10)
//Write the Sample Data to Aerospike
inputDF.write.mode(SaveMode.Overwrite)
.format("aerospike") //aerospike specific format
.option("aerospike.writeset", "scala_input_data") //write to this set
.option("aerospike.updateByKey", "id") //indicates which columns should be used for construction of primary key
.save()
/*
Aerospike needs a primary key for record insertion. Hence, you must identify the primary key column
using, for example, .option("aerospike.updateByKey", "id"), where "id" is the name of the column
that you'd like to use as the primary key when writing the DataFrame to the database.
*/
val insertDFWithSchema=spark2
.sqlContext
.read
.format("aerospike")
.schema(schema)
.option("aerospike.updateByKey", "id") //required for sql inserts
.option("aerospike.set", "scala_input_data")
.load()
val sqlView="inserttable"
insertDFWithSchema.createOrReplaceTempView(sqlView)
//
//V2 datasource doesn't allow insert into a view.
//
// spark.sql(s"insert into $sqlView values (20000, 'insert_record1', 200, 23000), (20001, 'insert_record2', 201, 23001)")
// spark
// .sqlContext
// .read
// .format("aerospike")
// .schema(schema)
// .option("aerospike.seedhost",AS_HOST)
// .option("aerospike.featurekey", "/etc/aerospike/features.conf")
// .option ("aerospike.namespace", "test")
// .option("aerospike.set", "input_data").load.where("id >2000").show()
spark2.sql(s"select * from $sqlView").show()
// Create a Spark DataFrame by using the Connector Schema inference mechanism
val loadedDFWithoutSchema=spark2
.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data") //read the data from this set
.load
loadedDFWithoutSchema.printSchema()
//Notice that the schema of the loaded data has some additional fields.
// When the connector infers the schema, it also adds internal metadata columns.
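As a sketch, these metadata columns can be selected like ordinary columns; the names below follow the notation used elsewhere in this notebook (__key, __ttl) and should be verified against the printSchema() output above:
loadedDFWithoutSchema.select("__key", "__ttl").show(5) // inspect connector-added metadata (column names assumed)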
//Data can also be loaded with a known schema.
val loadedDFWithSchema=spark2
.sqlContext
.read
.format("aerospike")
.schema(schema)
.option("aerospike.set", "scala_input_data").load
loadedDFWithSchema.show(5)
val complex_data_json="resources/nested_data.json"
val alias= StructType(List(
StructField("first_name",StringType, false),
StructField("last_name",StringType, false)))
val name= StructType(List(
StructField("first_name",StringType, false),
StructField("aliases",ArrayType(alias), false )
))
val street_address = StructType(List(
StructField("street_name", StringType, false),
StructField("apt_number" , IntegerType, false)))
val address = StructType( List(
StructField ("zip" , LongType, false),
StructField("street", street_adress, false),
StructField("city", StringType, false)))
val workHistory = StructType(List(
StructField ("company_name" , StringType, false),
StructField( "company_address" , address, false),
StructField("worked_from", StringType, false)))
val person= StructType ( List(
StructField("name" , name, false, Metadata.empty),
StructField("SSN", StringType, false,Metadata.empty),
StructField("home_address", ArrayType(address), false),
StructField("work_history", ArrayType(workHistory), false)))
val cmplx_data_with_schema=spark2.read.schema(person).json(complex_data_json)
cmplx_data_with_schema.printSchema()
cmplx_data_with_schema.write.mode(SaveMode.Overwrite)
.format("aerospike") //aerospike specific format
.option("aerospike.seedhost", AS_HOST) //db hostname, can be added multiple hosts, delimited with ":"
.option("aerospike.namespace", "test") //use this namespace
.option("aerospike.writeset", "scala_complex_input_data") //write to this set
.option("aerospike.updateByKey", "name.first_name") //indicates which columns should be used for construction of primary key
.save()
val loadedComplexDFWithSchema=spark2
.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_complex_input_data") //read the data from this set
.schema(person)
.load
loadedComplexDFWithSchema.printSchema()
//Note the difference in the types of the loaded data in the two cases. With a provided schema, the complex types are mapped exactly as specified.
The connector performs a batch get (primary key) query when the where clause contains an equality condition on __key, with no OR between two bins. In the case of batch get queries we can also apply filters on metadata columns such as __gen or __ttl. To do so, these columns must be exposed through the schema (if a schema is provided).
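A hedged sketch of that pattern: expose __key and __gen in a user-provided schema so that the batch get can also filter on the metadata column (the schema name and the integer types here are assumptions):
val schemaWithMeta: StructType = new StructType(Array(
  StructField("__key", IntegerType, nullable = false),
  StructField("__gen", IntegerType, nullable = true), // assumed IntegerType
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = false)))
spark2.read
  .format("aerospike")
  .schema(schemaWithMeta)
  .option("aerospike.set", "scala_input_data")
  .load()
  .where("__key = 829 and __gen >= 1") // equality on __key keeps this a batch get; the __gen filter is applied on top
  .show()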
val batchGet1= spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.option("aerospike.keyType", "int") //used to hint primary key(PK) type when schema is not provided.
.option("aerospike.log.level", "debug")
.load.where("__key = 829")
batchGet1.show()
//Please be aware that the Aerospike database supports only equality tests on PKs in primary key queries.
//So, a where clause such as "__key > 10" would result in a scan query!
//In this query we are doing an *OR* between PK subqueries
val somePrimaryKeys= 1.to(10).toSeq
val someMoreKeys= 12.to(14).toSeq
val batchGet2= spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.option("aerospike.keyType", "int") //used to hint primary key(PK) type when inferred without schema.
.load.where((col("__key") isin (somePrimaryKeys:_*)) || ( col("__key") isin (someMoreKeys:_*) ))
batchGet2.show(5)
//We should get 13 records in total.
val somePrimaryKeys= 1.to(10).toSeq
val scanQuery1= spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.option("aerospike.keyType", "int") //used to hint primary key(PK) type when inferred without schema.
.load.where((col("__key") isin (somePrimaryKeys:_*)) || ( col("age") >50 ))
scanQuery1.show()
//Since there is an OR between the PKs and a bin, this is treated as a scan query.
//Primary keys are not stored in bins (by default), hence only filters corresponding to bins are honored.
//Find all people who have held at least 5 jobs in the past.
loadedComplexDFWithSchema
.withColumn("past_jobs", col("work_history.company_name"))
.withColumn("num_jobs", size(col("past_jobs")))
.where(col("num_jobs") >4).show()
aerospike.partition.factor: number of logical aerospike partitions [0-15]
aerospike.maxthreadcount: maximum number of threads to use for writing data into Aerospike
aerospike.compression: compression of java client-server communication
aerospike.batchMax: maximum number of records per read request (default 5000)
aerospike.recordspersecond: same as the java client
aerospike.keyType: primary key type hint for schema inference. Always set it properly if the primary key type is not string.
See https://www.aerospike.com/docs/connect/processing/spark/reference.html for a detailed description of the above properties.
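An illustrative read that combines several of these properties (the option values below are placeholders, not recommendations):
val tunedDF = spark2.read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.partition.factor", "8") // logical partition factor [0-15]
  .option("aerospike.batchMax", "5000") // maximum number of records per read request
  .option("aerospike.recordspersecond", "10000") // throttle, as in the java client
  .option("aerospike.keyType", "int") // primary key type hint when the schema is inferred
  .load()
tunedDF.show(5)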