Below, a seed address for your Aerospike database cluster is required. Check that the given namespace is available, and that your feature key is located at the path given by AS_FEATURE_KEY_PATH.
Finally, review https://www.aerospike.com/enterprise/download/connectors/ to ensure AEROSPIKE_SPARK_JAR_VERSION is correct.
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST = "127.0.0.1"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
AS_FEATURE_KEY_PATH = "/etc/aerospike/features.conf"
AEROSPIKE_SPARK_JAR_VERSION="2.5.0"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":" + str(AS_PORT)
# Next we locate the Spark installation - this will be found using the SPARK_HOME environment variable that you will have set
# if you followed the repository README
import findspark
findspark.init()
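If SPARK_HOME is not set, findspark can also be pointed at the installation directly. A minimal sketch, assuming a hypothetical install location of /opt/spark - substitute your own path:
# Hypothetical path - replace with the root of your Spark installation
findspark.init("/opt/spark")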
# Here we download the Aerospike Spark jar
import urllib.request
import os
def aerospike_spark_jar_download_url(version=AEROSPIKE_SPARK_JAR_VERSION):
    DOWNLOAD_PREFIX = "https://www.aerospike.com/enterprise/download/connectors/aerospike-spark/"
    DOWNLOAD_SUFFIX = "/artifact/jar"
    # Use the version argument rather than the module-level constant so callers can override it
    AEROSPIKE_SPARK_JAR_DOWNLOAD_URL = DOWNLOAD_PREFIX + version + DOWNLOAD_SUFFIX
    return AEROSPIKE_SPARK_JAR_DOWNLOAD_URL
def download_aerospike_spark_jar(version=AEROSPIKE_SPARK_JAR_VERSION):
    JAR_NAME = "aerospike-spark-assembly-" + version + ".jar"
    # Skip the download if the jar is already present in the working directory
    if not os.path.exists(JAR_NAME):
        urllib.request.urlretrieve(aerospike_spark_jar_download_url(version), JAR_NAME)
    else:
        print(JAR_NAME + " already downloaded")
    return os.path.join(os.getcwd(), JAR_NAME)
AEROSPIKE_JAR_PATH=download_aerospike_spark_jar()
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
aerospike-spark-assembly-2.5.0.jar already downloaded
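As a quick sanity check, you can confirm that the jar exists on disk and that pyspark will be launched with it on the classpath:
# Verify the jar was downloaded and is referenced in the submit arguments
print(os.path.exists(AEROSPIKE_JAR_PATH))
print(os.environ["PYSPARK_SUBMIT_ARGS"])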
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
Get a Spark session object and set the required Aerospike configuration properties, pointing the connector at AS_HOST.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
sqlContext = SQLContext(sc)
spark.conf.set("aerospike.namespace",AS_NAMESPACE)
spark.conf.set("aerospike.seedhost",AS_CONNECTION_STRING)
spark.conf.set("aerospike.keyPath",AS_FEATURE_KEY_PATH )
# We create age vs salary data, using three different Gaussian distributions
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import math
# Create covariance matrix from std devs + correlation
def covariance_matrix(std_dev_1, std_dev_2, correlation):
    return [[std_dev_1 ** 2, correlation * std_dev_1 * std_dev_2],
            [correlation * std_dev_1 * std_dev_2, std_dev_2 ** 2]]
# Return a bivariate sample given means/std dev/correlation
def age_salary_sample(distribution_params, sample_size):
    mean = [distribution_params["age_mean"], distribution_params["salary_mean"]]
    cov = covariance_matrix(distribution_params["age_std_dev"], distribution_params["salary_std_dev"],
                            distribution_params["age_salary_correlation"])
    return np.random.multivariate_normal(mean, cov, sample_size).T
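As a quick illustration of the helper above, here is a small sample drawn from a made-up distribution (the parameter values below are purely illustrative and not part of the dataset built next):
# Illustrative parameters - not used in the dataset below
demo_params = {"age_mean": 30, "salary_mean": 60000,
               "age_std_dev": 3, "salary_std_dev": 8000,
               "age_salary_correlation": 0.5}
demo_ages, demo_salaries = age_salary_sample(demo_params, sample_size=5)
print(demo_ages.shape, demo_salaries.shape)  # (5,) (5,)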
# Define the characteristics of our age/salary distributions
age_salary_distribution_1 = {"age_mean":25,"salary_mean":50000,
"age_std_dev":1,"salary_std_dev":5000,"age_salary_correlation":0.3}
age_salary_distribution_2 = {"age_mean":45,"salary_mean":80000,
"age_std_dev":4,"salary_std_dev":10000,"age_salary_correlation":0.7}
age_salary_distribution_3 = {"age_mean":35,"salary_mean":70000,
"age_std_dev":2,"salary_std_dev":9000,"age_salary_correlation":0.1}
distribution_data = [age_salary_distribution_1,age_salary_distribution_2,age_salary_distribution_3]
# Sample age/salary data for each distribution
group_1_ages,group_1_salaries = age_salary_sample(age_salary_distribution_1,sample_size=100)
group_2_ages,group_2_salaries = age_salary_sample(age_salary_distribution_2,sample_size=120)
group_3_ages,group_3_salaries = age_salary_sample(age_salary_distribution_3,sample_size=80)
ages=np.concatenate([group_1_ages,group_2_ages,group_3_ages])
salaries=np.concatenate([group_1_salaries,group_2_salaries,group_3_salaries])
print("Data created")
Data created
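matplotlib is imported above; a minimal sketch to visualise the three clusters before loading them into Spark:
# Scatter plot of the combined sample - three clusters should be visible
plt.scatter(ages, salaries, s=5)
plt.xlabel("Age")
plt.ylabel("Salary")
plt.title("Age vs salary, three Gaussian groups")
plt.show()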
# Turn the above records into a Data Frame
# First of all, create a list of tuples, one per record
inputBuf = []
for i in range(0, len(ages)):
    id = i + 1  # Avoid counting from zero
    name = "Individual: {:03d}".format(id)
    # Note we need to make sure values are typed correctly
    # salary will have type numpy.float64 - if it is not cast as below, an error will be thrown
    age = float(ages[i])
    salary = int(salaries[i])
    inputBuf.append((id, name, age, salary))
# Convert to an RDD
inputRDD = spark.sparkContext.parallelize(inputBuf)
# Convert to a data frame using a schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", DoubleType(), True),
    StructField("salary", IntegerType(), True)
])
inputDF = spark.createDataFrame(inputRDD, schema)
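Before writing to Aerospike, it is worth checking that the frame matches the schema we declared:
# Inspect the typed Data Frame
inputDF.printSchema()
inputDF.show(3)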
# Write the data frame to Aerospike; the id field is used as the primary key
inputDF \
.write \
.mode('overwrite') \
.format("com.aerospike.spark.sql") \
.option("aerospike.set", "salary_data")\
.option("aerospike.updateByKey", "id") \
.save()
# If we explicitly set the schema, using the previously created schema object
# we effectively type the rows in the Data Frame
loadedDFWithSchema=spark \
.read \
.format("com.aerospike.spark.sql") \
.schema(schema) \
.option("aerospike.set", "salary_data").load()
loadedDFWithSchema.show(5)
loadedDFWithSchema.count()
+---+---------------+------------------+------+
| id|           name|               age|salary|
+---+---------------+------------------+------+
|239|Individual: 239|35.045714151242784| 64851|
|101|Individual: 101| 48.94863100225242| 92233|
|194|Individual: 194| 43.87904465057981| 76336|
| 31|Individual: 031|25.419955216543517| 51542|
|139|Individual: 139|39.658710069583876| 80585|
+---+---------------+------------------+------+
only showing top 5 rows
300
loadedDFWithSchema.createOrReplaceTempView("myview")
spark.sql("select * from myview").take(20)
[Row(id=239, name='Individual: 239', age=36.31763988049552, salary=73121),
 Row(id=101, name='Individual: 101', age=42.131372446959624, salary=88392),
 Row(id=194, name='Individual: 194', age=45.67209291776493, salary=68430),
 Row(id=31, name='Individual: 031', age=25.369666877630568, salary=48846),
 Row(id=139, name='Individual: 139', age=43.51114009073862, salary=82116),
 Row(id=14, name='Individual: 014', age=26.58855120481238, salary=61593),
 Row(id=142, name='Individual: 142', age=43.170929881406686, salary=86203),
 Row(id=272, name='Individual: 272', age=38.43340146883269, salary=72691),
 Row(id=76, name='Individual: 076', age=24.93997559158264, salary=64180),
 Row(id=147, name='Individual: 147', age=52.175425376631246, salary=88246),
 Row(id=79, name='Individual: 079', age=24.65820831985479, salary=54088),
 Row(id=96, name='Individual: 096', age=25.518457474526, salary=49251),
 Row(id=132, name='Individual: 132', age=41.798677512668064, salary=84438),
 Row(id=10, name='Individual: 010', age=25.509944072858175, salary=45908),
 Row(id=141, name='Individual: 141', age=49.80648644002289, salary=87623),
 Row(id=140, name='Individual: 140', age=41.11269768838019, salary=78535),
 Row(id=160, name='Individual: 160', age=36.35698689416882, salary=61116),
 Row(id=112, name='Individual: 112', age=47.632639902424046, salary=78404),
 Row(id=120, name='Individual: 120', age=49.876620096920284, salary=94501),
 Row(id=34, name='Individual: 034', age=26.77243285030579, salary=46245)]
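With the view registered, ordinary Spark SQL aggregations work against the Aerospike-backed data too - a sketch that averages salary by (truncated) age, using the column names from the schema above:
# Average salary per whole year of age
spark.sql("""
    SELECT CAST(age AS INT) AS age_years, AVG(salary) AS avg_salary
    FROM myview
    GROUP BY CAST(age AS INT)
    ORDER BY age_years
""").show(5)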