#!/usr/bin/env python # coding: utf-8 # # Aerospike Connect for Spark Tutorial for Python # ## Tested with Spark connector 3.0.1, Java 8, Apache Spark 3.0.0, Python 3.7 and Scala 2.12.11 and Spylon ( https://pypi.org/project/spylon-kernel/) # ### Setup # # Below, a seed address for your Aerospike database cluster is required # # Check the given namespace is available, and your feature key is located as per AS_FEATURE_KEY_PATH # # Finally, review https://www.aerospike.com/enterprise/download/connectors/ to ensure AEROSPIKE_SPARK_JAR_VERSION is correct # In[1]: # IP Address or DNS name for one host in your Aerospike cluster AS_HOST ="172.16.39.141" # Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure AS_NAMESPACE = "test" AS_FEATURE_KEY_PATH = "/etc/aerospike/features.conf" AEROSPIKE_SPARK_JAR_VERSION="3.0.1" AS_PORT = 3000 # Usually 3000, but change here if not AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT) # In[2]: # Next we locate the Spark installation - this will be found using the SPARK_HOME environment variable that you will have set # if you followed the repository README import findspark findspark.init() # In[3]: # Here we download the Aerospike Spark jar import urllib import os def aerospike_spark_jar_download_url(version=AEROSPIKE_SPARK_JAR_VERSION): DOWNLOAD_PREFIX="https://www.aerospike.com/enterprise/download/connectors/aerospike-spark/" DOWNLOAD_SUFFIX="/artifact/jar" AEROSPIKE_SPARK_JAR_DOWNLOAD_URL = DOWNLOAD_PREFIX+AEROSPIKE_SPARK_JAR_VERSION+DOWNLOAD_SUFFIX return AEROSPIKE_SPARK_JAR_DOWNLOAD_URL def download_aerospike_spark_jar(version=AEROSPIKE_SPARK_JAR_VERSION): JAR_NAME="aerospike-spark-assembly-"+AEROSPIKE_SPARK_JAR_VERSION+".jar" if(not(os.path.exists(JAR_NAME))) : urllib.request.urlretrieve(aerospike_spark_jar_download_url(),JAR_NAME) else : print(JAR_NAME+" already downloaded") return os.path.join(os.getcwd(),JAR_NAME) AEROSPIKE_JAR_PATH=download_aerospike_spark_jar() os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell' # In[4]: import pyspark from pyspark.context import SparkContext from pyspark.sql.context import SQLContext from pyspark.sql.session import SparkSession from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType # Get a spark session object and set required Aerospike configuration properties # Set up spark and point aerospike db to AS_HOST # In[5]: sc = SparkContext.getOrCreate() conf=sc._conf.setAll([("aerospike.namespace",AS_NAMESPACE),("aerospike.seedhost",AS_CONNECTION_STRING),("aerospike.keyPath",AS_FEATURE_KEY_PATH)]) sc.stop() sc = pyspark.SparkContext(conf=conf) spark = SparkSession(sc) sqlContext = SQLContext(sc) # ## Schema in the Spark Connector # # - Aerospike is schemaless, however spark adher to schema. After the schema is decided upon (either through inference or given), data within the bins must honor the types. # # - To infer schema, the connector samples a set of records (configurable through `aerospike.schema.scan`) to decide the name of bins/columns and their types. This implies that the derived schema depends entirely upon sampled records. # # - **Note that `__key` was not part of provided schema. So how can one query using `__key`? We can just add `__key` in provided schema with appropriate type. Similarly we can add `__gen` or `__ttl` etc.** # # schemaWithPK = StructType([ # StructField("__key",IntegerType(), False), # StructField("id", IntegerType(), False), # StructField("name", StringType(), False), # StructField("age", IntegerType(), False), # StructField("salary",IntegerType(), False)]) # # - **We recommend that you provide schema for queries that involve complex data types such as lists, maps, and mixed types. Using schema inference for CDT may cause unexpected issues.** # ### Flexible schema inference # # Spark assumes that the underlying data store (Aerospike in this case) follows a strict schema for all the records within a table. However, Aerospike is a No-SQL DB and is schemaless. Hence a single bin (mapped to a column ) within a set ( mapped to a table ) could technically hold values of multiple Aerospike supported types. The Spark connector reconciles this incompatibility with help of certain rules. Please choose the configuration that suits your use case. The strict configuration (aerospike.schema.flexible = false ) could be used when you have modeled your data in Aerospike to adhere to a strict schema i.e. each record within the set has the same schema. # # In[6]: import random num_records=100 schema = StructType( [ StructField("_id", IntegerType(), True), StructField("name", StringType(), True) ] ) inputBuf = [] for i in range(1, num_records) : name = "name" + str(i) id_ = i inputBuf.append((id_, name)) inputRDD = spark.sparkContext.parallelize(inputBuf) inputDF=spark.createDataFrame(inputRDD,schema) #Write the Sample Data to Aerospike inputDF \ .write \ .mode('overwrite') \ .format("aerospike") \ .option("aerospike.writeset", "py_input_data")\ .option("aerospike.updateByKey", "_id") \ .save() # #### aerospike.schema.flexible = true (default) # # If none of the column types in the user-specified schema match the bin types of a record in Aerospike, a record with NULLs is returned in the result set. # # Please use the filter() in Spark to filter out NULL records. For e.g. df.filter("gender == NULL").show(false), where df is a dataframe and gender is a field that was not specified in the user-specified schema. # # If the above mismatch is limited to fewer columns in the user-specified schema then NULL would be returned for those columns in the result set. **Note: there is no way to tell apart a NULL due to missing value in the original data set and the NULL due to mismatch, at this point. Hence, the user would have to treat all NULLs as missing values.** The columns that are not a part of the schema will be automatically filtered out in the result set by the connector. # # Please note that if any field is set to NOT nullable i.e. nullable = false, your query will error out if there’s a type mismatch between an Aerospike bin and the column type specified in the user-specified schema. # # # In[7]: schemaIncorrect = StructType( [ StructField("_id", IntegerType(), True), StructField("name", IntegerType(), True) ##Note incorrect type of name bin ] ) flexSchemaInference=spark \ .read \ .format("aerospike") \ .schema(schemaIncorrect) \ .option("aerospike.set", "py_input_data").load() flexSchemaInference.show(5) ##notice all the contents of name column is null due to schema mismatch and aerospike.schema.flexible = true (by default) # #### aerospike.schema.flexible = false # # If a mismatch between the user-specified schema and the schema of a record in Aerospike is detected at the bin/column level, your query will error out. # # In[8]: #When strict matching is set, we will get an exception due to type mismatch with schema provided. try: errorDFStrictSchemaInference=spark \ .read \ .format("aerospike") \ .schema(schemaIncorrect) \ .option("aerospike.schema.flexible" ,"false") \ .option("aerospike.set", "py_input_data").load() errorDFStrictSchemaInference.show(5) except Exception as e: pass #This will throw error due to type mismatch # ## Create realistic sample data # In[9]: # We create age vs salary data, using three different Gaussian distributions import numpy as np import matplotlib.pyplot as plt import pandas as pd import math # Make sure we get the same results every time this workbook is run # Otherwise we are occasionally exposed to results not working out as expected np.random.seed(12345) # Create covariance matrix from std devs + correlation def covariance_matrix(std_dev_1,std_dev_2,correlation): return [[std_dev_1 ** 2, correlation * std_dev_1 * std_dev_2], [correlation * std_dev_1 * std_dev_2, std_dev_2 ** 2]] # Return a bivariate sample given means/std dev/correlation def age_salary_sample(distribution_params,sample_size): mean = [distribution_params["age_mean"], distribution_params["salary_mean"]] cov = covariance_matrix(distribution_params["age_std_dev"],distribution_params["salary_std_dev"], distribution_params["age_salary_correlation"]) return np.random.multivariate_normal(mean, cov, sample_size).T # Define the characteristics of our age/salary distribution age_salary_distribution_1 = {"age_mean":25,"salary_mean":50000, "age_std_dev":1,"salary_std_dev":5000,"age_salary_correlation":0.3} age_salary_distribution_2 = {"age_mean":45,"salary_mean":80000, "age_std_dev":4,"salary_std_dev":8000,"age_salary_correlation":0.7} age_salary_distribution_3 = {"age_mean":35,"salary_mean":70000, "age_std_dev":2,"salary_std_dev":9000,"age_salary_correlation":0.1} distribution_data = [age_salary_distribution_1,age_salary_distribution_2,age_salary_distribution_3] # Sample age/salary data for each distributions sample_size_1 = 100; sample_size_2 = 120; sample_size_3 = 80; sample_sizes = [sample_size_1,sample_size_2,sample_size_3] group_1_ages,group_1_salaries = age_salary_sample(age_salary_distribution_1,sample_size=sample_size_1) group_2_ages,group_2_salaries = age_salary_sample(age_salary_distribution_2,sample_size=sample_size_2) group_3_ages,group_3_salaries = age_salary_sample(age_salary_distribution_3,sample_size=sample_size_3) ages=np.concatenate([group_1_ages,group_2_ages,group_3_ages]) salaries=np.concatenate([group_1_salaries,group_2_salaries,group_3_salaries]) print("Data created") # ### Display simulated age/salary data # In[10]: # Plot the sample data group_1_colour, group_2_colour, group_3_colour ='red','blue', 'pink' plt.xlabel('Age',fontsize=10) plt.ylabel("Salary",fontsize=10) plt.scatter(group_1_ages,group_1_salaries,c=group_1_colour,label="Group 1") plt.scatter(group_2_ages,group_2_salaries,c=group_2_colour,label="Group 2") plt.scatter(group_3_ages,group_3_salaries,c=group_3_colour,label="Group 3") plt.legend(loc='upper left') plt.show() # ### Save data to Aerospike # In[11]: # Turn the above records into a Data Frame # First of all, create an array of arrays inputBuf = [] for i in range(0, len(ages)) : id = i + 1 # Avoid counting from zero name = "Individual: {:03d}".format(id) # Note we need to make sure values are typed correctly # salary will have type numpy.float64 - if it is not cast as below, an error will be thrown age = float(ages[i]) salary = int(salaries[i]) inputBuf.append((id, name,age,salary)) # Convert to an RDD inputRDD = spark.sparkContext.parallelize(inputBuf) # Convert to a data frame using a schema schema = StructType([ StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("age", DoubleType(), True), StructField("salary",IntegerType(), True) ]) inputDF=spark.createDataFrame(inputRDD,schema) #Write the data frame to Aerospike, the id field is used as the primary key inputDF \ .write \ .mode('overwrite') \ .format("aerospike") \ .option("aerospike.set", "salary_data")\ .option("aerospike.updateByKey", "id") \ .save() # ### Insert data using sql insert staements # In[12]: #Aerospike DB needs a Primary key for record insertion. Hence, you must identify the primary key column #using for example .option(“aerospike.updateByKey”, “id”), where “id” is the name of the column that you’d #like to be the Primary key, while loading data from the DB. insertDFWithSchema=spark \ .read \ .format("aerospike") \ .schema(schema) \ .option("aerospike.set", "salary_data") \ .option("aerospike.updateByKey", "id") \ .load() sqlView="inserttable" # # V2 datasource doesn't allow insert into a view. # insertDFWithSchema.createTempView(sqlView) # preparedStatement = "insert into {view} values ({id}, 'Individual: {id:03d}', {age}, {salary})" # insertStatement1 = preparedStatement.format(id=sum(sample_sizes)+1,view=sqlView, # age=age_salary_distribution_1["age_mean"],salary=age_salary_distribution_1["salary_mean"]) # insertStatement2 = preparedStatement.format(id=sum(sample_sizes)+2,view=sqlView, # age=age_salary_distribution_2["age_mean"],salary=age_salary_distribution_2["salary_mean"]) # spark.sql(insertStatement1) # spark.sql(insertStatement2) # spark \ # .read \ # .format("aerospike") \ # .schema(schema) \ # .option("aerospike.set", "salary_data") \ # .option("aerospike.updateByKey", "id") \ # .load().where("id >{bound}".format(bound=sum(sample_sizes))).show() spark.sql("select * from inserttable").show() # ## Load data into a DataFrame without specifying any Schema (uses schema inference) # In[13]: # Create a Spark DataFrame by using the Connector Schema inference mechanism # The fields preceded with __ are metadata fields - key/digest/expiry/generation/ttl # By default you just get everything, with no column ordering, which is why it looks untidy # Note we don't get anything in the 'key' field as we have not chosen to save as a bin. # Use .option("aerospike.sendKey", True) to do this loadedDFWithoutSchema = ( spark.read.format("aerospike") \ .option("aerospike.set", "salary_data") \ .load() ) loadedDFWithoutSchema.show(10) # ## Load data into a DataFrame using user specified schema # In[14]: # If we explicitly set the schema, using the previously created schema object # we effectively type the rows in the Data Frame loadedDFWithSchema=spark \ .read \ .format("aerospike") \ .schema(schema) \ .option("aerospike.set", "salary_data").load() loadedDFWithSchema.show(5) # ## Working with complex Data Types (CDT) in Aerospike # # ### Save json into Aerospike using a schema # In[15]: # Schema specification aliases_type = StructType([ StructField("first_name",StringType(),False), StructField("last_name",StringType(),False) ]) id_type = StructType([ StructField("first_name",StringType(),False), StructField("last_name",StringType(),False), StructField("aliases",ArrayType(aliases_type),False) ]) street_adress_type = StructType([ StructField("street_name",StringType(),False), StructField("apt_number",IntegerType(),False) ]) address_type = StructType([ StructField("zip",LongType(),False), StructField("street",street_adress_type,False), StructField("city",StringType(),False) ]) workHistory_type = StructType([ StructField ("company_name",StringType(),False), StructField( "company_address",address_type,False), StructField("worked_from",StringType(),False) ]) person_type = StructType([ StructField("name",id_type,False), StructField("SSN",StringType(),False), StructField("home_address",ArrayType(address_type),False), StructField("work_history",ArrayType(workHistory_type),False) ]) # JSON data location complex_data_json="resources/nested_data.json" # Read data in using prepared schema cmplx_data_with_schema=spark.read.schema(person_type).json(complex_data_json) # Save data to Aerospike cmplx_data_with_schema \ .write \ .mode('overwrite') \ .format("aerospike") \ .option("aerospike.writeset", "complex_input_data") \ .option("aerospike.updateByKey", "name.first_name") \ .save() # ### Retrieve CDT from Aerospike into a DataFrame using schema # In[16]: loadedComplexDFWithSchema=spark \ .read \ .format("aerospike") \ .option("aerospike.set", "complex_input_data") \ .schema(person_type) \ .load() loadedComplexDFWithSchema.show(5) # ## Data Exploration with Aerospike # In[17]: import pandas import matplotlib import matplotlib.pyplot as plt #convert spark df to pandas df pdf = loadedDFWithSchema.toPandas() # Describe the data pdf.describe() # In[18]: #Histogram - Age age_min, age_max = int(np.amin(pdf['age'])), math.ceil(np.amax(pdf['age'])) age_bucket_size = 5 print(age_min,age_max) pdf[['age']].plot(kind='hist',bins=range(age_min,age_max,age_bucket_size),rwidth=0.8) plt.xlabel('Age',fontsize=10) plt.legend(loc=None) plt.show() #Histogram - Salary salary_min, salary_max = int(np.amin(pdf['salary'])), math.ceil(np.amax(pdf['salary'])) salary_bucket_size = 5000 pdf[['salary']].plot(kind='hist',bins=range(salary_min,salary_max,salary_bucket_size),rwidth=0.8) plt.xlabel('Salary',fontsize=10) plt.legend(loc=None) plt.show() # Heatmap age_bucket_count = math.ceil((age_max - age_min)/age_bucket_size) salary_bucket_count = math.ceil((salary_max - salary_min)/salary_bucket_size) x = [[0 for i in range(salary_bucket_count)] for j in range(age_bucket_count)] for i in range(len(pdf['age'])): age_bucket = math.floor((pdf['age'][i] - age_min)/age_bucket_size) salary_bucket = math.floor((pdf['salary'][i] - salary_min)/salary_bucket_size) x[age_bucket][salary_bucket] += 1 plt.title("Salary/Age distribution heatmap") plt.xlabel("Salary in '000s") plt.ylabel("Age") plt.imshow(x, cmap='YlOrRd', interpolation='nearest',extent=[salary_min/1000,salary_max/1000,age_min,age_max], origin="lower") plt.colorbar(orientation="horizontal") plt.show() # # Querying Aerospike Data using SparkSQL # ## Notes # 1. Queries using the primary key will use batch gets - https://www.aerospike.com/docs/client/c/usage/kvs/batch.html] and run fast. # 2. All other queries may entail a full scan of the Aerospike DB if they can’t be converted to Aerospike batch get. # ## Queries that include Primary Key in the Predicate # # With batch get queries we can apply filters on metadata columns such as `__gen` or `__ttl`. To do this, these columns should be exposed through the schema. # In[19]: # Basic PKey query batchGet1= spark \ .read \ .format("aerospike") \ .option("aerospike.set", "salary_data") \ .option("aerospike.keyType", "int") \ .load().where("__key = 100") \ batchGet1.show() #Note ASDB only supports equality test with PKs in primary key query. #So, a where clause with "__key >10", would result in scan query! # In[20]: # Batch get, primary key based query from pyspark.sql.functions import col somePrimaryKeys= list(range(1,10)) someMoreKeys= list(range(12,14)) batchGet2= spark \ .read \ .format("aerospike") \ .option("aerospike.set", "salary_data") \ .option("aerospike.keyType", "int") \ .load().where((col("__key").isin(somePrimaryKeys)) | ( col("__key").isin(someMoreKeys))) batchGet2.show(5) # ## Queries including non-primary key conditions # In[21]: # This query will run as a scan, which will be slower somePrimaryKeys= list(range(1,10)) scanQuery1= spark \ .read \ .format("aerospike") \ .option("aerospike.set", "salary_data") \ .option("aerospike.keyType", "int") \ .load().where((col("__key").isin(somePrimaryKeys)) | ( col("age") >50 )) scanQuery1.show() # ## Query with CDT # In[22]: #Find people who have had at least 5 jobs in the past from pyspark.sql.functions import col, size loadedComplexDFWithSchema \ .withColumn("past_jobs", col("work_history.company_name")) \ .withColumn("num_jobs", size(col("past_jobs"))) \ .where(col("num_jobs") > 4) \ .show(5) # ## Parameters for tuning Aerospike / Spark performance # # - aerospike.partition.factor: number of logical aerospike partitions [0-15] # - aerospike.maxthreadcount : maximum number of threads to use for writing data into Aerospike # - aerospike.compression : compression of java client-server communication # - aerospike.batchMax : maximum number of records per read request (default 5000) # - aerospike.recordspersecond : same as java client # # ## Other useful parameters # - aerospike.keyType : Primary key type hint for schema inference. Always set it properly if primary key type is not string # # See https://www.aerospike.com/docs/connect/processing/spark/reference.html for detailed description of the above properties # # # Machine Learning using Aerospike / Spark # # In this section we use the data we took from Aerospike and apply a clustering algorithm to it. # # We assume the data is composed of multiple data sets having a Gaussian multi-variate distribution # # We don't know how many clusters there are, so we try clustering based on the assumption there are 1 through 20. # # We compare the quality of the results using the Bayesian Information Criterion - https://en.wikipedia.org/wiki/Bayesian_information_criterion and pick the best. # # ## Find Optimal Cluster Count # In[23]: from sklearn.mixture import GaussianMixture # We take the data we previously ages=pdf['age'] salaries=pdf['salary'] age_salary_matrix=np.matrix([ages,salaries]).T # Find the optimal number of clusters optimal_cluster_count = 1 best_bic_score = GaussianMixture(1).fit(age_salary_matrix).bic(age_salary_matrix) for count in range(1,20): gm=GaussianMixture(count) gm.fit(age_salary_matrix) if gm.bic(age_salary_matrix) < best_bic_score: best_bic_score = gm.bic(age_salary_matrix) optimal_cluster_count = count print("Optimal cluster count found to be "+str(optimal_cluster_count)) # ## Estimate cluster distribution parameters # Next we fit our cluster using the optimal cluster count, and print out the discovered means and covariance matrix # In[24]: gm = GaussianMixture(optimal_cluster_count) gm.fit(age_salary_matrix) estimates = [] # Index for index in range(0,optimal_cluster_count): estimated_mean_age = round(gm.means_[index][0],2) estimated_mean_salary = round(gm.means_[index][1],0) estimated_age_std_dev = round(math.sqrt(gm.covariances_[index][0][0]),2) estimated_salary_std_dev = round(math.sqrt(gm.covariances_[index][1][1]),0) estimated_correlation = round(gm.covariances_[index][0][1] / ( estimated_age_std_dev * estimated_salary_std_dev ),3) row = [estimated_mean_age,estimated_mean_salary,estimated_age_std_dev,estimated_salary_std_dev,estimated_correlation] estimates.append(row) pd.DataFrame(estimates,columns = ["Est Mean Age","Est Mean Salary","Est Age Std Dev","Est Salary Std Dev","Est Correlation"]) # ## Original Distribution Parameters # In[25]: distribution_data_as_rows = [] for distribution in distribution_data: row = [distribution['age_mean'],distribution['salary_mean'],distribution['age_std_dev'], distribution['salary_std_dev'],distribution['age_salary_correlation']] distribution_data_as_rows.append(row) pd.DataFrame(distribution_data_as_rows,columns = ["Mean Age","Mean Salary","Age Std Dev","Salary Std Dev","Correlation"]) # You can see that the algorithm provides good estimates of the original parameters # ## Prediction # # We generate new age/salary pairs for each of the distributions and look at how accurate the prediction is # In[26]: def prediction_accuracy(model,age_salary_distribution,sample_size): # Generate new values new_ages,new_salaries = age_salary_sample(age_salary_distribution,sample_size) new_age_salary_matrix=np.matrix([new_ages,new_salaries]).T # Find which cluster the mean would be classified into mean = np.matrix([age_salary_distribution['age_mean'],age_salary_distribution['salary_mean']]) mean_cluster_index = model.predict(mean)[0] # How would new samples be classified classification = model.predict(new_age_salary_matrix) # How many were classified correctly correctly_classified = len([ 1 for x in classification if x == mean_cluster_index]) return correctly_classified / sample_size prediction_accuracy_results = [None for x in range(3)] for index, age_salary_distribution in enumerate(distribution_data): prediction_accuracy_results[index] = prediction_accuracy(gm,age_salary_distribution,1000) overall_accuracy = sum(prediction_accuracy_results)/ len(prediction_accuracy_results) print("Accuracies for each distribution : "," ,".join(map('{:.2%}'.format,prediction_accuracy_results))) print("Overall accuracy : ",'{:.2%}'.format(overall_accuracy)) # In[ ]: