%%init_spark
launcher.jars = ["/opt/spark-nb/aerospike-jar-link"]
launcher.master = "local[1]"

// Specify the seed host of the Aerospike server
val AS_HOST = "127.0.0.1:3000"

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession

// Create test data
val conf = sc.getConf.clone()
conf.set("aerospike.seedhost", AS_HOST)
conf.set("aerospike.namespace", "test")
spark.close()
val spark2 = SparkSession.builder().config(conf).master("local[2]").getOrCreate()

val num_records = 1000
val rand = scala.util.Random
val schema: StructType = new StructType(Array(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("salary", IntegerType, nullable = false)
))

val inputDF = {
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to num_records) {
    val name = "name" + i
    val age = i % 100
    val salary = 50000 + rand.nextInt(50000)
    val id = i
    val r = Row(id, name, age, salary)
    inputBuf.append(r)
  }
  val inputRDD = spark2.sparkContext.parallelize(inputBuf.toSeq)
  spark2.createDataFrame(inputRDD, schema)
}
inputDF.show(10)

// Write the sample data to Aerospike
inputDF.write.mode(SaveMode.Overwrite)
  .format("aerospike")                               // Aerospike-specific format
  .option("aerospike.writeset", "scala_input_data")  // write to this set
  .option("aerospike.updateByKey", "id")             // column used to construct the primary key
  .option("aerospike.sendKey", "true")
  .save()

/*
Aerospike needs a primary key for record insertion, so you must identify the primary key
column, for example with .option("aerospike.updateByKey", "id"), where "id" is the name of
the column you'd like to use as the primary key when loading data from the database.
*/
val insertDFWithSchema = spark2
  .sqlContext
  .read
  .format("aerospike")
  .schema(schema)
  .option("aerospike.set", "scala_input_data")
  .load()

val sqlView = "inserttable"
insertDFWithSchema.createOrReplaceTempView(sqlView)
// The V2 datasource doesn't allow insert into a view.
spark2.sql(s"select * from $sqlView").show()

// Create a Spark DataFrame using the connector's schema-inference mechanism
val loadedDFWithoutSchema = spark2
  .sqlContext
  .read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data")  // read the data from this set
  .load
loadedDFWithoutSchema.printSchema()
// Notice that the schema of the loaded data has some additional fields:
// when the connector infers the schema, it also adds internal metadata columns.

spark2.sparkContext.getConf.getAll.foreach(println _)

// Data can also be loaded with a known schema.
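// A minimal sketch (an addition, not part of the original notebook): one way to compare the
// inferred and explicit schemas is to select only the user-defined bins from the
// schema-inferred DataFrame, which drops the connector's internal metadata columns
// without hardcoding their names.
val userBins = schema.fieldNames
val binsOnlyDF = loadedDFWithoutSchema.select(userBins.map(col): _*)
binsOnlyDF.printSchema()  // should now list only id, name, age, and salary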
val loadedDFWithSchema = spark2
  .sqlContext
  .read
  .format("aerospike")
  .schema(schema)
  .option("aerospike.seedhost", AS_HOST)
  .option("aerospike.set", "scala_input_data")
  .load
loadedDFWithSchema.show(5)

val complex_data_json = "resources/nested_data.json"

val alias = StructType(List(
  StructField("first_name", StringType, false),
  StructField("last_name", StringType, false)))

val name = StructType(List(
  StructField("first_name", StringType, false),
  StructField("aliases", ArrayType(alias), false)))

val street_address = StructType(List(
  StructField("street_name", StringType, false),
  StructField("apt_number", IntegerType, false)))

val address = StructType(List(
  StructField("zip", LongType, false),
  StructField("street", street_address, false),
  StructField("city", StringType, false)))

val workHistory = StructType(List(
  StructField("company_name", StringType, false),
  StructField("company_address", address, false),
  StructField("worked_from", StringType, false)))

val person = StructType(List(
  StructField("name", name, false, Metadata.empty),
  StructField("SSN", StringType, false, Metadata.empty),
  StructField("home_address", ArrayType(address), false),
  StructField("work_history", ArrayType(workHistory), false)))

val cmplx_data_with_schema = spark2.read.schema(person).json(complex_data_json)
cmplx_data_with_schema.printSchema()

cmplx_data_with_schema.write.mode(SaveMode.Overwrite)
  .format("aerospike")                                       // Aerospike-specific format
  .option("aerospike.seedhost", AS_HOST)                     // seed host(s); multiple hosts can be added, delimited with ":"
  .option("aerospike.namespace", "test")                     // use this namespace
  .option("aerospike.writeset", "scala_complex_input_data")  // write to this set
  .option("aerospike.updateByKey", "SSN")                    // column used to construct the primary key
  .save()

val loadedComplexDFWithSchema = spark2
  .sqlContext
  .read
  .format("aerospike")
  .option("aerospike.seedhost", AS_HOST)
  .option("aerospike.set", "scala_complex_input_data")  // read the data from this set
  .schema(person)
  .load
loadedComplexDFWithSchema.show(2)
loadedComplexDFWithSchema.printSchema()
loadedComplexDFWithSchema.cache()
// Note the difference in the types of the loaded data in the two cases:
// with an explicit schema, complex types are loaded exactly as specified.

val batchGet1 = spark2.sqlContext
  .read
  .format("aerospike")
  .option("aerospike.seedhost", AS_HOST)
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.keyType", "int")  // hints the primary key (PK) type when no schema is provided
  .load
  .where("__key = 829")
batchGet1.show()
// Be aware that the Aerospike database supports only equality tests on PKs in a primary key query,
// so a where clause such as "__key > 10" results in a scan query!

// In this query we perform an *OR* between PK subqueries
val somePrimaryKeys = 1.to(10).toSeq
val someMoreKeys = 12.to(14).toSeq
val batchGet2 = spark2.sqlContext
  .read
  .format("aerospike")
  .option("aerospike.seedhost", AS_HOST)
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.keyType", "int")  // hints the primary key (PK) type when the schema is inferred
  .load
  .where((col("__key") isin (somePrimaryKeys: _*)) || (col("__key") isin (someMoreKeys: _*)))
batchGet2.show(15)
// We should get 13 records in total.
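// A minimal sketch (an addition to the original flow): a range predicate on __key cannot be
// pushed down as a batch of PK lookups, so, per the note above, the connector falls back to
// scanning the set and Spark applies the filter to the returned rows.
val rangeOnKeyDF = spark2.sqlContext
  .read
  .format("aerospike")
  .option("aerospike.seedhost", AS_HOST)
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.keyType", "int")
  .load
  .where(col("__key") > 10 && col("__key") <= 20)
rangeOnKeyDF.show()  // served by a scan rather than a batch get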
val somePrimaryKeys = 1.to(10).toSeq
val scanQuery1 = spark2.sqlContext
  .read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.seedhost", AS_HOST)
  .option("aerospike.keyType", "int")  // hints the primary key (PK) type when the schema is inferred
  .load
  .where((col("__key") isin (somePrimaryKeys: _*)) || (col("age") > 50))
scanQuery1.show()
// Since there is an OR between a PK predicate and a bin predicate, this is treated as a scan query.
// Primary keys are not stored in bins (by default), hence only filters on bins are honored.

// number_of_spark_partitions (num_sp) = 2^{aerospike.partition.factor}
// total number of records = Math.ceil((float)aerospike.sample.size / num_sp) * num_sp
// Use a lower partition factor for more accurate sampling.
val setname = "scala_input_data"
val sample_size = 101

val df3 = spark2.read.format("aerospike")
  .option("aerospike.partition.factor", "2")
  .option("aerospike.seedhost", AS_HOST)
  .option("aerospike.set", setname)
  .option("aerospike.sample.size", "101")  // samples approximately the specified number of records
  .load()

val df4 = spark2.read.format("aerospike")
  .option("aerospike.seedhost", AS_HOST)
  .option("aerospike.partition.factor", "6")
  .option("aerospike.set", setname)
  .option("aerospike.sample.size", "101")  // samples approximately the specified number of records
  .load()

// Notice that more records are read than requested because of the partitioning logic tied to the
// partition factor, as described above; use Spark's limit() to return the desired number of records.
val count3 = df3.count()
val count4 = df4.count()

// Note how limit() returns only 101 records from df4, which has 128 records.
val dfWithLimit = df4.limit(101)
val limitCount = dfWithLimit.count()

val pushdownset = "scala_input_data"  // we are reusing the set created above
import com.aerospike.spark.utility.AerospikePushdownExpressions
// Dynamic expressions can be constructed only when the library is unshaded.
// id % 5 == 0
// Equivalent Exp: Exp.eq(Exp.mod(Exp.intBin("id"), Exp.`val`(5)), Exp.`val`(0))
// This can be done only with the unshaded connector:
// val expIntBin = AerospikePushdownExpressions.intBin("id")  // "id" is the name of the column
// val expMODIntBinEqualToZero = AerospikePushdownExpressions.eq(
//   AerospikePushdownExpressions.mod(expIntBin, AerospikePushdownExpressions.`val`(5)),
//   AerospikePushdownExpressions.`val`(0))
// val expMODIntBinToBase64 = AerospikePushdownExpressions.build(expMODIntBinEqualToZero).getBase64
// The expression is passed as a base64-encoded Expression object.
val expMODIntBinToBase64 = "kwGTGpNRAqJpZAUA"

val pushDownDF = spark2.sqlContext
  .read
  .format("aerospike")
  .schema(schema)
  .option("aerospike.seedhost", AS_HOST)
  .option("aerospike.set", pushdownset)
  .option("aerospike.pushdown.expressions", expMODIntBinToBase64)
  .load()
pushDownDF.count()  // should return 200, because there are 200 records whose id bin is divisible by 5

val outputSchema = StructType(List(
  StructField("name", name, false),
  StructField("SSN", StringType, false),
  StructField("home_address", ArrayType(address), false)))

import spark2.implicits._
// Create a set of PKs whose records you'd like to look up in the Aerospike database
val ssns = Seq("825-55-3247", "289-18-1554", "756-46-4088", "525-31-0299", "456-45-2200", "200-71-7765")
val ssnDF = ssns.toDF("SSN")

import com.aerospike.spark._  // imports the aerolookup functionality
// scala_complex_input_data is the Aerospike set used to look up the keys stored in ssnDF
val outputDF = aerolookup(ssnDF, "SSN", "scala_complex_input_data", outputSchema, "test")
outputDF.show(100)
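// A minimal sketch (an addition; the key values assume the 1000 generated records above, and
// that aerolookup accepts an integer key column the same way it accepts the string SSN column):
// looking up records of the simple set by the "id" key, reusing the explicit schema defined earlier.
val idKeyDF = Seq(1, 5, 10, 999).toDF("id")
val simpleLookupDF = aerolookup(idKeyDF, "id", "scala_input_data", schema, "test")
simpleLookupDF.show()  // expect 4 records if those ids were written above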
import com.aerospike.spark._
PythonUtil.sindexList("test")

val siBins = Seq("int", "str", "arr")
val siSet = "scala_siset"
val siSchema: StructType = new StructType(Array(
  StructField(siBins(0), IntegerType, nullable = false),
  StructField(siBins(1), StringType, nullable = false),
  StructField(siBins(2), ArrayType(IntegerType), nullable = false)
))

val siDF = {
  val siRecords = 50
  val inputBuf = new ArrayBuffer[Row]()
  for (i <- 1 to siRecords) {
    val str = "name" + i
    val arr = i until i + 3
    val int = i
    val r = Row(int, str, arr)
    inputBuf.append(r)
  }
  val inputRDD = spark2.sparkContext.parallelize(inputBuf.toSeq)
  spark2.createDataFrame(inputRDD, siSchema)
}

// Write the secondary-index test data to Aerospike
siDF.write.mode("overwrite").format("aerospike")
  .option("aerospike.writeset", siSet)
  .option("aerospike.updateByKey", siBins(0))
  .save()

import scala.util.Try
val num_idx = "scala_id_idx"
val str_idx = "scala_name_idx"
val arr_idx = "scala_arr_idx"
val StringIndexType = com.aerospike.client.query.IndexType.STRING
val NumericIndexType = com.aerospike.client.query.IndexType.NUMERIC
val indices = Seq(num_idx, str_idx, arr_idx)
val indexTypes = Seq(NumericIndexType, StringIndexType, NumericIndexType)
val client = AerospikeConnection.getClient(spark2.conf)

// Drop any existing indices
Try { indices.foreach(client.dropIndex(null, "test", siSet, _)) }

// Create the indices
client.createIndex(null, "test", siSet, indices(0), siBins(0), NumericIndexType)
client.createIndex(null, "test", siSet, indices(1), siBins(1), StringIndexType)
client.createIndex(null, "test", siSet, indices(2), siBins(2), NumericIndexType,
  com.aerospike.client.query.IndexCollectionType.LIST)

// List the indices defined over this set
PythonUtil.sindexList("test")

// An appropriate secondary index is selected automatically
val siIdDF = spark2.read.format("aerospike").schema(siSchema)
  .option("aerospike.set", siSet)
  .option("aerospike.partition.factor", 1)
  .option("aerospike.log.level", "info")
  .load()
siIdDF.where(col(siBins(0)) >= 40).show()  // should get 11 records (int values 40 through 50)
// Search for `using secondary index: scala_id_idx` in the INFO logs.

// User-specified index via "aerospike.sindex"
spark2.read.format("aerospike").schema(siSchema)
  .option("aerospike.set", siSet)
  .option("aerospike.log.level", "info")
  .option("aerospike.sindex", indices(1))  // index name specified explicitly
  .load()
  .where(col(siBins(1)) === "name1")
  .show()  // should get 1 record

// User-specified filter in JSON format
// "name" is the bin name; colType = 1 indicates a sindex over an array (list) datatype.
val arrayQuery = "{ \"name\": \"arr\", \"type\": \"NUMERIC\", \"colType\": 1, \"value\": 10 }"
val siArrayDF = spark2.read.format("aerospike").schema(siSchema)
  .option("aerospike.set", siSet)
  .option("aerospike.sindex.filter", arrayQuery)
  .option("aerospike.sindex", indices(2))
  .load()
siArrayDF.show()  // should print 3 records
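// A minimal sketch (an addition; assumes the session, set, and indices created above, and that
// automatic index selection also applies to string bins): the connector should pick
// scala_name_idx automatically for a filter on the string bin, after which the test indices
// can be dropped to clean up.
spark2.read.format("aerospike").schema(siSchema)
  .option("aerospike.set", siSet)
  .option("aerospike.log.level", "info")
  .load()
  .where(col(siBins(1)) === "name2")
  .show()  // expect 1 record; look for scala_name_idx in the INFO logs
Try { indices.foreach(client.dropIndex(null, "test", siSet, _)) }  // drop the test indices
PythonUtil.sindexList("test")  // verify they are gone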