%%init_spark
launcher.jars = ["aerospike-spark-assembly-3.0.1.jar"]
launcher.master = "local[*]"

// NOTE(review): the three lines above look like a notebook `%%init_spark` cell magic rather than
// Scala; they presumably belong in their own cell -- confirm against the originating notebook.

// Seed host (host:port) of the Aerospike server.
val AS_HOST = "172.16.39.141:3000"

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import com.aerospike.client.policy.WritePolicy
import com.aerospike.spark.sql.AerospikeConnection
import org.apache.spark.sql.SparkSession
import com.aerospike.client.{AerospikeClient, AerospikeException, Bin, Key}

// Work on a copy of the running context's configuration and add the connector settings to it.
val conf = sc.getConf.clone()
conf.set("aerospike.seedhost", AS_HOST)
conf.set("aerospike.schema.flexible", "true") // by default it is always true

// ---------------------------------------------------------------------------
// Flexible vs. strict schema demo
// ---------------------------------------------------------------------------
val client = AerospikeConnection.getClient(conf)
val flexsetname = "flexschema"
val wp = new WritePolicy()
// TTL is expressed in seconds: 6000 s = 100 minutes.
// NOTE(review): the previous comment said "10 minutes", which would be 600 -- confirm the intended TTL.
wp.expiration = 6000

// Write 100 records into test.<flexsetname>. Bin "one" is always an integer; bin "two" is an
// integer for even keys and a string for odd keys, so the set mixes types in the same bin.
for (i <- 1 to 100) {
  val key = new Key("test", flexsetname, i)
  client.delete(null, key) // remove any record left under this key before writing it again
  val two = if (i % 2 == 0) new Bin("two", i) else new Bin("two", i.toString)
  client.put(wp, key, new Bin("one", i), two)
}

conf.set("aerospike.keyPath", "/etc/aerospike/features.conf")
conf.set("aerospike.namespace", "test")

// Close the existing session and build a new one from the augmented configuration.
spark.close()
val spark2 = SparkSession.builder().config(conf).master("local[2]").getOrCreate()

val flexibleSchema = StructType(
  Seq(
    StructField("one", IntegerType, true),
    StructField("two", IntegerType, true)
  )
)

spark2.read
  .format("aerospike")
  .schema(flexibleSchema)
  .option("aerospike.set", flexsetname)
  .load()
  .show()

// Please note that, with the flexible schema, a type mismatch yields null: for records with an odd
// value of `one`, bin `two` was written as a string, so it does not match IntegerType and is set to null.

// When strict matching is set, we will get an exception due to type mismatch with the schema provided.
// The read is wrapped in Try so the failure is captured instead of aborting the run; because the
// last call is show(), `df` holds a Try[Unit], not a DataFrame.
import scala.util.Try
val df = Try {
  spark2.sqlContext.read
    .format("aerospike")
    .schema(flexibleSchema)
    .option("aerospike.schema.flexible", "false")
    .option("aerospike.set", flexsetname)
    .load()
    .show()
}

// ---------------------------------------------------------------------------
// Create test data
// ---------------------------------------------------------------------------
val num_records = 1000
val rand = scala.util.Random

// Schema of input data
// val spark = SparkSession.builder().config(strictConf).master("local[*]").getOrCreate()
val schema: StructType = new StructType(
  Array(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false),
    StructField("salary", IntegerType, nullable = false)
  )
)

// One row per id in 1..num_records: (id, "name<id>", id % 100, random salary in [50000, 100000)).
// The rows are built as an immutable sequence so nothing can alter them after they are handed
// to parallelize.
val inputDF = {
  val rows = (1 to num_records).map { i =>
    Row(i, s"name$i", i % 100, 50000 + rand.nextInt(50000))
  }
  spark2.createDataFrame(spark2.sparkContext.parallelize(rows), schema)
}

inputDF.show(10)

// Write the sample data to Aerospike
inputDF.write
  .mode(SaveMode.Overwrite)
  .format("aerospike") // aerospike specific format
  .option("aerospike.writeset", "scala_input_data") // write to this set
  .option("aerospike.updateByKey", "id") // indicates which columns should be used for construction of primary key
  .save()

/*
 Aerospike DB needs a primary key for record insertion. Hence, you must identify the primary key
 column using, for example, .option("aerospike.updateByKey", "id"), where "id" is the name of the
 column that you'd like to be the primary key, while loading data from the DB.
 */
val insertDFWithSchema = spark2.sqlContext.read
  .format("aerospike")
  .schema(schema)
  .option("aerospike.updateByKey", "id") // required for sql inserts
  .option("aerospike.set", "scala_input_data")
  .load()

val sqlView = "inserttable"
insertDFWithSchema.createOrReplaceTempView(sqlView)

// V2 datasource doesn't allow insert into a view.
// Disabled example: SQL insert through the temp view, followed by a read-back of the inserted ids.
// // spark.sql(s"insert into $sqlView values (20000, 'insert_record1', 200, 23000), (20001, 'insert_record2', 201, 23001)")
// spark
// .sqlContext
// .read
// .format("aerospike")
// .schema(schema)
// .option("aerospike.seedhost",AS_HOST)
// .option("aerospike.featurekey", "/etc/aerospike/features.conf")
// .option ("aerospike.namespace", "test")
// .option("aerospike.set", "input_data").load.where("id >2000").show()

// Query the temp view registered under `sqlView`.
spark2.sql(s"select * from $sqlView").show()

// ---------------------------------------------------------------------------
// Loading with and without a user-supplied schema
// ---------------------------------------------------------------------------

// Create a Spark DataFrame by using the connector's schema inference mechanism.
val loadedDFWithoutSchema = spark2.sqlContext.read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data") // read the data from this set
  .load()

loadedDFWithoutSchema.printSchema()
// Notice that the schema of the loaded data has some additional fields.
// When the connector infers the schema, it also adds internal metadata.

// Data can be loaded with a known schema as well.
val loadedDFWithSchema = spark2.sqlContext.read
  .format("aerospike")
  .schema(schema)
  .option("aerospike.set", "scala_input_data")
  .load()

loadedDFWithSchema.show(5)

// ---------------------------------------------------------------------------
// Nested (complex) data: schema definition, write, read-back
// ---------------------------------------------------------------------------
val complex_data_json = "resources/nested_data.json"

val alias = StructType(
  Seq(
    StructField("first_name", StringType, nullable = false),
    StructField("last_name", StringType, nullable = false)
  )
)

val name = StructType(
  Seq(
    StructField("first_name", StringType, nullable = false),
    StructField("aliases", ArrayType(alias), nullable = false)
  )
)

val street_adress = StructType(
  Seq(
    StructField("street_name", StringType, nullable = false),
    StructField("apt_number", IntegerType, nullable = false)
  )
)

val address = StructType(
  Seq(
    StructField("zip", LongType, nullable = false),
    StructField("street", street_adress, nullable = false),
    StructField("city", StringType, nullable = false)
  )
)

val workHistory = StructType(
  Seq(
    StructField("company_name", StringType, nullable = false),
    StructField("company_address", address, nullable = false),
    StructField("worked_from", StringType, nullable = false)
  )
)

// Top-level record: a person with a nested name, an SSN, and arrays of addresses and jobs.
val person = StructType(
  Seq(
    StructField("name", name, nullable = false, metadata = Metadata.empty),
    StructField("SSN", StringType, nullable = false, metadata = Metadata.empty),
    StructField("home_address", ArrayType(address), nullable = false),
    StructField("work_history", ArrayType(workHistory), nullable = false)
  )
)

// Read the JSON file with the explicit `person` schema.
val cmplx_data_with_schema = spark2.read.schema(person).json(complex_data_json)

cmplx_data_with_schema.printSchema()

cmplx_data_with_schema.write
  .mode(SaveMode.Overwrite)
  .format("aerospike") // aerospike specific format
  .option("aerospike.seedhost", AS_HOST) // db hostname, can be added multiple hosts, delimited with ":"
  .option("aerospike.namespace", "test") // use this namespace
  .option("aerospike.writeset", "scala_complex_input_data") // write to this set
  .option("aerospike.updateByKey", "name.first_name") // indicates which columns should be used for construction of primary key
  .save()

val loadedComplexDFWithSchema = spark2.sqlContext.read
  .format("aerospike")
  .option("aerospike.set", "scala_complex_input_data") // read the data from this set
  .schema(person)
  .load()

loadedComplexDFWithSchema.printSchema()
// Please note the difference in types of loaded data in both cases.
// With a schema, we exactly infer complex types.

// ---------------------------------------------------------------------------
// Primary-key (batch get) queries
// ---------------------------------------------------------------------------
val batchGet1 = spark2.sqlContext.read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.keyType", "int") // used to hint primary key (PK) type when schema is not provided
  .option("aerospike.log.level", "debug")
  .load()
  .where("__key = 829")

batchGet1.show()
// Please be aware that the Aerospike database supports only equality tests with PKs in a primary key query.
// So, a where clause with "__key >10" would result in a scan query!

// In this query we are doing *OR* between PK subqueries.
val somePrimaryKeys = (1 to 10).toSeq
val someMoreKeys = (12 to 14).toSeq

val batchGet2 = spark2.sqlContext.read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.keyType", "int") // used to hint primary key (PK) type when inferred without schema
  .load()
  .where(col("__key").isin(somePrimaryKeys: _*) || col("__key").isin(someMoreKeys: _*))

batchGet2.show(5)
// We should get 13 records in total (keys 1..10 plus 12..14); show(5) prints only the first five.
// Keys 1..10 used for the primary-key half of the predicate below.
// NOTE(review): this name is also declared earlier in the file; re-declaring a val is accepted by
// a REPL/notebook but not by a compiled script -- confirm how this file is meant to be run.
val somePrimaryKeys = (1 to 10).toSeq

// Predicate mixes a primary-key membership test with a filter on the `age` bin.
val scanQuery1 = spark2.sqlContext.read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.keyType", "int") // used to hint primary key (PK) type when inferred without schema
  .load()
  .where(col("__key").isin(somePrimaryKeys: _*).or(col("age").gt(50)))

scanQuery1.show()
// Since there is an OR between PKs and a bin, it will be treated as a scan query.
// Primary keys are not stored in bins (by default), hence only filters corresponding to bins are honored.

// Find all people who have had at least 5 jobs in the past:
// collect the company names out of work_history, count them, keep rows with more than 4.
loadedComplexDFWithSchema
  .withColumn("past_jobs", col("work_history.company_name"))
  .withColumn("num_jobs", size(col("past_jobs")))
  .where(col("num_jobs").gt(4))
  .show()