Set launcher.jars with the path to the downloaded connector binary.
%%init_spark
launcher.jars = ["aerospike-spark-assembly-3.2.0.jar"]
launcher.master = "local[*]"
//Specify the Seed Host of the Aerospike Server
val AS_HOST = "127.0.0.1:3000"
Intitializing Scala interpreter ...
Spark Web UI available at http://192.168.1.2:4040
SparkContext available as 'sc' (version = 3.0.2, master = local[*], app id = local-1634672659617)
SparkSession available as 'spark'
AS_HOST: String = 127.0.0.1:3000
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import com.aerospike.spark.sql.AerospikeConnection
import org.apache.spark.sql.SparkSession
Aerospike is schemaless; Spark, however, adheres to a schema. Once the schema is decided (either inferred or provided), the data within the bins must honor those types.
To infer the schema, the connector samples a set of records (configurable through aerospike.schema.scan) to decide the names of the bins/columns and their types. This implies that the derived schema depends entirely on the sampled records.
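As a minimal sketch (assuming the spark2 session and the scala_input_data set created later in this notebook), the number of records sampled for inference can be tuned through the aerospike.schema.scan read option:
//Hedged sketch: tune how many records the connector samples for schema inference.
//Assumes the spark2 session and the scala_input_data set created later in this notebook.
val inferenceSampleDF = spark2.read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.schema.scan", "200") //sample 200 records to derive the schema
  .load()
inferenceSampleDF.printSchema()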
Note that __key was not part of the provided schema, so how can one query using __key? Simply add __key to the provided schema with the appropriate type. Similarly, we can add __gen, __ttl, and other metadata columns.
val schemaWithPK: StructType = new StructType(Array(
StructField("__key",IntegerType, nullable = false),
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false),
StructField("salary",IntegerType, nullable = false)))
We recommend that you provide a schema for queries that involve collection data types (CDTs) such as lists, maps, and mixed types. Using schema inference for CDTs may cause unexpected issues.
//Create test data
val conf = sc.getConf.clone();
conf.set("aerospike.seedhost" , AS_HOST)
conf.set("aerospike.namespace", "test")
spark.close()
val spark2= SparkSession.builder().config(conf).master("local[2]").getOrCreate()
val num_records=1000
val rand = scala.util.Random
val schema: StructType = new StructType(
Array(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false),
StructField("salary",IntegerType, nullable = false)
))
val inputDF = {
val inputBuf= new ArrayBuffer[Row]()
for ( i <- 1 to num_records){
val name = "name" + i
val age = i%100
val salary = 50000 + rand.nextInt(50000)
val id = i
val r = Row(id, name, age,salary)
inputBuf.append(r)
}
val inputRDD = spark2.sparkContext.parallelize(inputBuf.toSeq)
spark2.createDataFrame(inputRDD,schema)
}
inputDF.show(10)
//Write the Sample Data to Aerospike
inputDF.write.mode(SaveMode.Overwrite)
.format("aerospike") //aerospike specific format
.option("aerospike.writeset", "scala_input_data") //write to this set
.option("aerospike.updateByKey", "id") //indicates which columns should be used for construction of primary key
.option("aerospike.sendKey", "true")
.save()
+---+------+---+------+
| id|  name|age|salary|
+---+------+---+------+
|  1| name1|  1| 85780|
|  2| name2|  2| 71636|
|  3| name3|  3| 73747|
|  4| name4|  4| 82932|
|  5| name5|  5| 81313|
|  6| name6|  6| 64316|
|  7| name7|  7| 77750|
|  8| name8|  8| 81108|
|  9| name9|  9| 85952|
| 10|name10| 10| 64510|
+---+------+---+------+
only showing top 10 rows

conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@7d8d81c8
spark2: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@66e370cb
num_records: Int = 1000
rand: util.Random.type = scala.util.Random$@3f8e0951
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(name,StringType,false), StructField(age,IntegerType,false), StructField(salary,IntegerType,false))
inputDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
/*
Aerospike DB needs a primary key for record insertion. Hence, you must identify the primary key column
using, for example, .option("aerospike.updateByKey", "id"), where "id" is the name of the column that
you'd like to use as the primary key when writing data to the database.
*/
val insertDFWithSchema=spark2
.sqlContext
.read
.format("aerospike")
.schema(schema)
.option("aerospike.set", "scala_input_data")
.load()
val sqlView="inserttable"
insertDFWithSchema.createOrReplaceTempView(sqlView)
//
//V2 datasource doesn't allow insert into a view.
//
spark2.sql(s"select * from $sqlView").show()
+---+-------+---+------+
| id|   name|age|salary|
+---+-------+---+------+
|132|name132| 32| 94993|
|647|name647| 47| 65819|
| 45| name45| 45| 69453|
|558|name558| 58| 90792|
|608|name608|  8| 70999|
|687|name687| 87| 88312|
|335|name335| 35| 62312|
|372|name372| 72| 88944|
| 94| name94| 94| 71473|
|890|name890| 90| 73927|
|334|name334| 34| 59027|
|911|name911| 11| 64513|
|352|name352| 52| 90479|
|907|name907|  7| 54111|
|148|name148| 48| 58722|
|315|name315| 15| 51807|
|163|name163| 63| 89747|
|882|name882| 82| 66781|
|426|name426| 26| 83889|
|602|name602|  2| 52264|
+---+-------+---+------+
only showing top 20 rows

insertDFWithSchema: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
sqlView: String = inserttable
// Create a Spark DataFrame by using the Connector Schema inference mechanism
val loadedDFWithoutSchema=spark2
.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data") //read the data from this set
.load
loadedDFWithoutSchema.printSchema()
//Notice that the schema of the loaded data has some additional fields.
//When the connector infers the schema, it also adds internal metadata columns.
root
 |-- __key: string (nullable = true)
 |-- __digest: binary (nullable = true)
 |-- __expiry: integer (nullable = false)
 |-- __generation: integer (nullable = false)
 |-- __ttl: integer (nullable = false)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- id: long (nullable = true)
loadedDFWithoutSchema: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]
//Data can be loaded with known schema as well.
val loadedDFWithSchema=spark2
.sqlContext
.read
.format("aerospike")
.schema(schema)
.option("aerospike.set", "scala_input_data").load
loadedDFWithSchema.show(5)
+---+-------+---+------+
| id|   name|age|salary|
+---+-------+---+------+
|132|name132| 32| 94993|
|647|name647| 47| 65819|
|608|name608|  8| 70999|
|687|name687| 87| 88312|
|372|name372| 72| 88944|
+---+-------+---+------+
only showing top 5 rows
loadedDFWithSchema: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
val complex_data_json="resources/nested_data.json"
val alias= StructType(List(
StructField("first_name",StringType, false),
StructField("last_name",StringType, false)))
val name= StructType(List(
StructField("first_name",StringType, false),
StructField("aliases",ArrayType(alias), false )
))
val street_adress= StructType(List(
StructField("street_name", StringType, false),
StructField("apt_number" , IntegerType, false)))
val address = StructType( List(
StructField ("zip" , LongType, false),
StructField("street", street_adress, false),
StructField("city", StringType, false)))
val workHistory = StructType(List(
StructField ("company_name" , StringType, false),
StructField( "company_address" , address, false),
StructField("worked_from", StringType, false)))
val person= StructType ( List(
StructField("name" , name, false, Metadata.empty),
StructField("SSN", StringType, false,Metadata.empty),
StructField("home_address", ArrayType(address), false),
StructField("work_history", ArrayType(workHistory), false)))
val cmplx_data_with_schema=spark2.read.schema(person).json(complex_data_json)
cmplx_data_with_schema.printSchema()
cmplx_data_with_schema.write.mode(SaveMode.Overwrite)
.format("aerospike") //aerospike specific format
.option("aerospike.seedhost", AS_HOST) //db hostname, can be added multiple hosts, delimited with ":"
.option("aerospike.namespace", "test") //use this namespace
.option("aerospike.writeset", "scala_complex_input_data") //write to this set
.option("aerospike.updateByKey", "SSN") //indicates which columns should be used for construction of primary key
.save()
root
 |-- name: struct (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- aliases: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- first_name: string (nullable = true)
 |    |    |    |-- last_name: string (nullable = true)
 |-- SSN: string (nullable = true)
 |-- home_address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- zip: long (nullable = true)
 |    |    |-- street: struct (nullable = true)
 |    |    |    |-- street_name: string (nullable = true)
 |    |    |    |-- apt_number: integer (nullable = true)
 |    |    |-- city: string (nullable = true)
 |-- work_history: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- company_name: string (nullable = true)
 |    |    |-- company_address: struct (nullable = true)
 |    |    |    |-- zip: long (nullable = true)
 |    |    |    |-- street: struct (nullable = true)
 |    |    |    |    |-- street_name: string (nullable = true)
 |    |    |    |    |-- apt_number: integer (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |-- worked_from: string (nullable = true)

complex_data_json: String = resources/nested_data.json
alias: org.apache.spark.sql.types.StructType = StructType(StructField(first_name,StringType,false), StructField(last_name,StringType,false))
name: org.apache.spark.sql.types.StructType = StructType(StructField(first_name,StringType,false), StructField(aliases,ArrayType(StructType(StructField(first_name,StringType,false), StructField(last_name,StringType,false)),true),false))
street_adress: org.apache.spark.sql.types.StructType = StructType(StructField(street_name,StringType,false), StructField(apt_number,IntegerType,false))
address: org.apache.spark.sql.types.StructType = StructType(StructField(zip,LongType,false), StructField(street,StructType(StructField(street_name,StringType,false), StructField(apt_number,IntegerType,false)),fal...
val loadedComplexDFWithSchema=spark2
.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_complex_input_data") //read the data from this set
.schema(person)
.load
loadedComplexDFWithSchema.show(2)
loadedComplexDFWithSchema.printSchema()
loadedComplexDFWithSchema.cache()
//Please note the difference in types of the loaded data in both cases. With a provided schema, we get exactly the complex types we specified.
+--------------------+-----------+--------------------+--------------------+
|                name|        SSN|        home_address|        work_history|
+--------------------+-----------+--------------------+--------------------+
|[Adrian, [[Angel,...|116-62-5644|[[93231, [Anderso...|[[Giles-Thomas, [...|
|[Raymond, [[Lisa,...|731-02-0039|[[95337, [Michael...|[[Taylor-Swanson,...|
+--------------------+-----------+--------------------+--------------------+
only showing top 2 rows

root
 |-- name: struct (nullable = false)
 |    |-- first_name: string (nullable = false)
 |    |-- aliases: array (nullable = false)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- first_name: string (nullable = false)
 |    |    |    |-- last_name: string (nullable = false)
 |-- SSN: string (nullable = false)
 |-- home_address: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- zip: long (nullable = false)
 |    |    |-- street: struct (nullable = false)
 |    |    |    |-- street_name: string (nullable = false)
 |    |    |    |-- apt_number: integer (nullable = false)
 |    |    |-- city: string (nullable = false)
 |-- work_history: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- company_name: string (nullable = false)
 |    |    |-- company_address: struct (nullable = false)
 |    |    |    |-- zip: long (nullable = false)
 |    |    |    |-- street: struct (nullable = false)
 |    |    |    |    |-- street_name: string (nullable = false)
 |    |    |    |    |-- apt_number: integer (nullable = false)
 |    |    |    |-- city: string (nullable = false)
 |    |    |-- worked_from: string (nullable = false)

loadedComplexDFWithSchema: org.apache.spark.sql.DataFrame = [name: struct<first_name: string, aliases: array<struct<first_name:string,last_name:string>>>, SSN: string ... 2 more fields]
res5: loadedComplexDFWithSchema.type = [name: struct<first_name: string, aliases: array<struct<first_name:string,last_name:string>>>, SSN: string ... 2 more fields]
Queries that equate __key or __digest to a value, with no OR between two bins, are served as batchget queries. For batchget queries we can also apply filters on metadata columns such as __gen or __ttl. To do so, these columns should be exposed through the schema (if a schema is provided).
val batchGet1= spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.option("aerospike.keyType", "int") //used to hint primary key(PK) type when schema is not provided.
.load.where("__key = 829")
batchGet1.show()
//Please be aware that the Aerospike database supports only equality tests on PKs in a primary key query.
//So, a where clause such as "__key > 10" would result in a scan query!
+-----+--------------------+---------+------------+------+---+-------+------+---+
|__key|            __digest| __expiry|__generation| __ttl|age|   name|salary| id|
+-----+--------------------+---------+------------+------+---+-------+------+---+
|  829|[C0 B6 C4 DE 68 D...|373232668|           7|863996| 29|name829| 52040|829|
+-----+--------------------+---------+------------+------+---+-------+------+---+
batchGet1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: int, __digest: binary ... 7 more fields]
//In this query we are doing *OR* between PK subqueries
val somePrimaryKeys= 1.to(10).toSeq
val someMoreKeys= 12.to(14).toSeq
val batchGet2= spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.option("aerospike.keyType", "int") //used to hint primary key(PK) type when inferred without schema.
.load.where((col("__key") isin (somePrimaryKeys:_*)) || ( col("__key") isin (someMoreKeys:_*) ))
batchGet2.show(15)
//We should get 13 records in total.
+-----+--------------------+---------+------------+------+---+------+------+---+
|__key|            __digest| __expiry|__generation| __ttl|age|  name|salary| id|
+-----+--------------------+---------+------------+------+---+------+------+---+
|    1|[89 31 AB FE 54 D...|373232667|           7|863995|  1| name1| 85780|  1|
|    4|[93 F1 65 F0 E8 9...|373232667|           7|863995|  4| name4| 82932|  4|
|    3|[D4 A1 0B A5 12 0...|373232667|           7|863995|  3| name3| 73747|  3|
|    7|[30 94 D4 E7 9E 8...|373232667|           7|863995|  7| name7| 77750|  7|
|    5|[3E F5 94 A9 3A A...|373232667|           7|863995|  5| name5| 81313|  5|
|   14|[06 66 ED 38 08 F...|373232667|           7|863995| 14|name14| 62020| 14|
|   13|[EA 78 AB 39 FC C...|373232667|           7|863994| 13|name13| 68518| 13|
|    2|[41 DB A8 23 03 4...|373232667|           7|863994|  2| name2| 71636|  2|
|    8|[60 AB E7 17 C8 5...|373232667|           7|863994|  8| name8| 81108|  8|
|    9|[1B 6D CD D8 D2 5...|373232667|           7|863994|  9| name9| 85952|  9|
|    6|[C2 4D 37 CC 2B 2...|373232667|           7|863994|  6| name6| 64316|  6|
|   12|[F8 4E EC 27 8F 1...|373232667|           7|863994| 12|name12| 50610| 12|
|   10|[8D 0F 84 CD B0 7...|373232667|           7|863994| 10|name10| 64510| 10|
+-----+--------------------+---------+------------+------+---+------+------+---+

somePrimaryKeys: scala.collection.immutable.Range = Range 1 to 10
someMoreKeys: scala.collection.immutable.Range = Range 12 to 14
batchGet2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: int, __digest: binary ... 7 more fields]
val somePrimaryKeys= 1.to(10).toSeq
val scanQuery1= spark2.sqlContext
.read
.format("aerospike")
.option("aerospike.set", "scala_input_data")
.option("aerospike.keyType", "int") //used to hint primary key(PK) type when inferred without schema.
.load.where((col("__key") isin (somePrimaryKeys:_*)) || ( col("age") >50 ))
scanQuery1.show()
//Since there is an OR between the PKs and a bin, this will be treated as a scan query.
//Primary keys are not stored in bins (by default), hence only filters corresponding to bins are honored.
+-----+--------------------+---------+------------+------+---+-------+------+---+
|__key|            __digest| __expiry|__generation| __ttl|age|   name|salary| id|
+-----+--------------------+---------+------------+------+---+-------+------+---+
|  558|[14 80 A2 9D D2 E...|373232667|           7|863994| 58|name558| 90792|558|
|  687|[1A 30 21 88 39 A...|373232667|           7|863994| 87|name687| 88312|687|
|  372|[1B 40 51 DD 64 F...|373232668|           7|863995| 72|name372| 88944|372|
|  352|[23 A0 99 06 1F 7...|373232668|           7|863995| 52|name352| 90479|352|
|   94|[26 E0 C4 85 CE 9...|373232667|           7|863994| 94| name94| 71473| 94|
|  890|[26 30 F7 1A D3 A...|373232668|           7|863995| 90|name890| 73927|890|
|  163|[3E D0 72 42 15 9...|373232667|           7|863994| 63|name163| 89747|163|
|  882|[3E C0 28 CE F2 5...|373232668|           7|863995| 82|name882| 66781|882|
|  673|[45 10 C1 D6 80 3...|373232667|           7|863994| 73|name673| 62097|673|
|  991|[47 A0 D4 EC 12 1...|373232668|           7|863995| 91|name991| 72096|991|
|  293|[48 40 20 B0 E6 D...|373232668|           7|863995| 93|name293| 56381|293|
|  679|[57 80 24 4F 1D 3...|373232667|           7|863994| 79|name679| 78991|679|
|  153|[5D E0 05 75 BF 3...|373232667|           7|863994| 53|name153| 79984|153|
|  485|[6B 80 7E E1 A4 5...|373232668|           7|863995| 85|name485| 93192|485|
|  997|[72 10 81 9D E2 E...|373232668|           7|863995| 97|name997| 88342|997|
|  482|[85 B0 B1 3F 49 A...|373232668|           7|863995| 82|name482| 69004|482|
|  166|[8A 00 3E 64 19 D...|373232667|           7|863994| 66|name166| 74354|166|
|  590|[8C 20 A4 28 BE 7...|373232667|           7|863994| 90|name590| 96189|590|
|  689|[9B 00 70 22 F0 8...|373232667|           7|863994| 89|name689| 81242|689|
|  895|[9D A0 9D 91 AE 8...|373232668|           7|863995| 95|name895| 59968|895|
+-----+--------------------+---------+------------+------+---+-------+------+---+
only showing top 20 rows

somePrimaryKeys: scala.collection.immutable.Range = Range 1 to 10
scanQuery1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: int, __digest: binary ... 7 more fields]
Sample a specified number of records from Aerospike to considerably reduce data movement between Aerospike and the Spark cluster. Depending on the aerospike.partition.factor setting, you may get more records than desired; use this property in conjunction with the Spark limit() function to get the exact number of records. The sample read is not randomized, so sample more than you need and use the Spark sample() function to randomize if you see fit. You can use it in conjunction with aerospike.recordspersecond to control the load on the Aerospike server while sampling.
For more information, please see the documentation page.
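A minimal sketch of combining sampling with aerospike.recordspersecond (the throttle value of 5000 is only an illustrative assumption):
//Hedged sketch: bound the sample size and throttle the scan that produces it.
val throttledSampleDF = spark2.read
  .format("aerospike")
  .option("aerospike.set", "scala_input_data")
  .option("aerospike.sample.size", "101")
  .option("aerospike.recordspersecond", "5000") //illustrative throttle value
  .load()
  .limit(101)
throttledSampleDF.count()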
//number_of_spark_partitions (num_sp)=2^{aerospike.partition.factor}
//total number of records = Math.ceil((float)aerospike.sample.size/num_sp) * (num_sp)
//use lower partition factor for more accurate sampling
val setname="scala_input_data"
val sample_size=101
val df3=spark2.read.format("aerospike")
.option("aerospike.partition.factor","2")
.option("aerospike.set",setname)
.option("aerospike.sample.size","101") //allows to sample approximately spacific number of record.
.load()
val df4=spark2.read.format("aerospike")
.option("aerospike.partition.factor","6")
.option("aerospike.set",setname)
.option("aerospike.sample.size","101") //allows to sample approximately spacific number of record.
.load()
//Notice that more records were read than requested due to the underlying partitioning logic related to the partition factor, as described earlier; hence we additionally use the Spark limit() function to return the desired number of records.
val count3=df3.count()
val count4=df4.count()
//Note how limit returns only 101 records from df4, which has 128 records.
val dfWithLimit=df4.limit(101)
val limitCount=dfWithLimit.count()
setname: String = scala_input_data
sample_size: Int = 101
df3: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]
df4: org.apache.spark.sql.DataFrame = [__key: string, __digest: binary ... 7 more fields]
count3: Long = 104
count4: Long = 128
dfWithLimit: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [__key: string, __digest: binary ... 7 more fields]
limitCount: Long = 101
val pushdownset="scala_input_data" // we are using this set created above
import com.aerospike.spark.utility.AerospikePushdownExpressions
//We can construct dynamic expressions only when the library is unshaded.
// id % 5 == 0
// Equivalent Exp: Exp.eq(Exp.mod(Exp.intBin("id"), Exp.`val`(5)), Exp.`val`(0))
// This can only be done with the unshaded connector
// val expIntBin=AerospikePushdownExpressions.intBin("id") // id is the name of column
// val expMODIntBinEqualToZero=AerospikePushdownExpressions.eq(
// AerospikePushdownExpressions.mod(expIntBin, AerospikePushdownExpressions.`val`(5)),
// AerospikePushdownExpressions.`val`(0))
// val expMODIntBinToBase64= AerospikePushdownExpressions.build(expMODIntBinEqualToZero).getBase64
// convert to base64 Expression object
val expMODIntBinToBase64= "kwGTGpNRAqJpZAUA"
val pushDownDF =spark2.sqlContext
.read
.format("aerospike")
.schema(schema)
.option("aerospike.set", pushdownset)
.option("aerospike.pushdown.expressions", expMODIntBinToBase64)
.load()
pushDownDF.count() //note this should return 200, because there are 200 records whose id bin is divisible by 5
pushdownset: String = scala_input_data
import com.aerospike.spark.utility.AerospikePushdownExpressions
expMODIntBinToBase64: String = kwGTGpNRAqJpZAUA
pushDownDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
res9: Long = 200
aerolookup allows you to look up records corresponding to a set of keys stored in a Spark DF, streaming or otherwise. Among other capabilities, it supports a flexible schema; to enable it, set aerospike.schema.flexible to true in the SparkConf object.
val outputSchema= StructType(
  List(StructField("name", name, false),
    StructField("SSN", StringType, false),
    StructField("home_address", ArrayType(address), false))
)
import spark2.implicits._
//Create a set of PKs whose records you'd like to look up in the Aerospike database
val ssns = Seq("825-55-3247", "289-18-1554", "756-46-4088", "525-31-0299", "456-45-2200", "200-71-7765")
val ssnDF = ssns.toDF("SSN")
import com.aerospike.spark._ // to import aerojoin functionality
//scala_complex_input_data is the set in Aerospike database that you are using to look up the keys stored in ssnDF
val outputDF=aerolookup(ssnDF,"SSN", "scala_complex_input_data",outputSchema, "test")
outputDF.show(100)
+--------------------+-----------+--------------------+
|                name|        SSN|        home_address|
+--------------------+-----------+--------------------+
|[Gary, [[Cameron,...|825-55-3247|[[66428, [Kim Mil...|
|[Megan, [[Robert,...|289-18-1554|[[81551, [Archer ...|
|[Melanie, [[Justi...|756-46-4088|[[61327, [Jeanett...|
|[Lisa, [[William,...|525-31-0299|[[98337, [Brittne...|
|[Ryan, [[Jonathon...|456-45-2200|[[97077, [Davis D...|
|[Lauren, [[Shaun,...|200-71-7765|[[6813, [Johnson ...|
+--------------------+-----------+--------------------+

outputSchema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StructType(StructField(first_name,StringType,false), StructField(aliases,ArrayType(StructType(StructField(first_name,StringType,false), StructField(last_name,StringType,false)),true),false)),false), StructField(SSN,StringType,false), StructField(home_address,ArrayType(StructType(StructField(zip,LongType,false), StructField(street,StructType(StructField(street_name,StringType,false), StructField(apt_number,IntegerType,false)),false), StructField(city,StringType,false)),true),false))
import spark2.implicits._
ssns: Seq[String] = List(825-55-3247, 289-18-1554, 756-46-4088, 525-31-0299, 456-45-2200, 200-71-7765)
ssnDF: org.apache.spark.sql.DataFrame = [SSN: string]
import com.aerospike.spark._
outputDF: org.ap...
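A minimal sketch of enabling the flexible-schema behavior mentioned above, mirroring the conf/session pattern used at the top of this notebook (the session rebuild is left commented out):
//Hedged sketch: set aerospike.schema.flexible on the SparkConf used to build the session.
val flexConf = spark2.sparkContext.getConf.clone()
flexConf.set("aerospike.schema.flexible", "true")
//spark2.close()
//val spark3 = SparkSession.builder().config(flexConf).master("local[2]").getOrCreate()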