#!/usr/bin/env python
# coding: utf-8

# # Store JSON documents into Aerospike and query using Spark SQL
# #### Tested with Spark connector 3.2.0, Java 8, Apache Spark 3.0.2, Python 3.7

# #### The purpose of this notebook is to walk you through how to store data from a JSON
# source into Aerospike and subsequently query it using Spark SQL. JSON documents are
# stored as [CDT](https://docs.aerospike.com/docs/guide/cdt.html) in the Aerospike
# Database by the Spark connector.

# ### Setup

# #### Ensure Database Is Running
# This notebook requires that the Aerospike database is running.

# In[1]:


# Launch the server (output discarded), then report whether an `asd` process exists.
get_ipython().system('asd >& /dev/null')
get_ipython().system('pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"')


# #### Set Aerospike, Spark, and Spark Connector Paths and Parameters

# In[2]:


# Directories where Spark-related components are installed
SPARK_NB_DIR = '/opt/spark-nb'
# NOTE(review): the header says Spark 3.0.2 was tested, but this path points at a
# 3.0.3 distribution -- confirm which version is intended.
SPARK_HOME = SPARK_NB_DIR + '/spark-3.0.3-bin-hadoop3.2'


# In[3]:


# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST = "localhost"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
AEROSPIKE_SPARK_JAR_VERSION = "3.2.0"
AS_PORT = 3000  # Usually 3000, but change here if not
# "host:port" seed address handed to the connector as `aerospike.seedhost` below.
AS_CONNECTION_STRING = f"{AS_HOST}:{AS_PORT}"


# In[4]:


# Aerospike Spark Connector settings
import os

AEROSPIKE_JAR_PATH = os.path.join(
    SPARK_NB_DIR, f"aerospike-spark-assembly-{AEROSPIKE_SPARK_JAR_VERSION}.jar"
)
# Arguments PySpark passes to spark-submit; `--jars` puts the connector JAR on the classpath.
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--jars {AEROSPIKE_JAR_PATH} pyspark-shell"


# #### Alternative Setup for Running Notebook in Different Environment
# Please follow the instructions below **instead of the setup above** if you are running
# this notebook in a different environment from the one provided by the Aerospike
# Intro-Notebooks container.
# ```
# # IP Address or DNS name for one host in your Aerospike cluster
# AS_HOST = ""
# # Name of one of your namespaces. Type 'show namespaces' at the aql prompt
# # if you are not sure
# AS_NAMESPACE = ""
# AEROSPIKE_SPARK_JAR_VERSION=""
# AS_PORT = 3000 # Usually 3000, but change here if not
# AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
#
# # Set SPARK_HOME path.
# SPARK_HOME = ''
#
# # Please download the appropriate Aerospike Connect for Spark from the
# # [download page](https://enterprise.aerospike.com/enterprise/download/connectors/aerospike-spark/notes.html)
# # Set `AEROSPIKE_JAR_PATH` with path to the downloaded binary
# import os
# AEROSPIKE_JAR_PATH= "/aerospike-spark-assembly-"+AEROSPIKE_SPARK_JAR_VERSION+".jar"
# os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
# ```

# ### Spark Initialization

# In[5]:


# Next we locate the Spark installation - this will be found using the SPARK_HOME
# environment variable that you will have set
import findspark
findspark.init(SPARK_HOME)


# In[6]:


import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType


# #### Configure Aerospike properties in the Spark Session object. Please visit
# [Configuring Aerospike Connect for Spark](https://docs.aerospike.com/docs/connect/processing/spark/configuration.html)
# for more information about the properties used on this page.

# In[7]:


sc = SparkContext.getOrCreate()
# Copy the current configuration (public API, instead of the private `sc._conf`) and add
# the connector settings. Spark does not apply configuration changes to a running
# context, so the context is stopped and recreated with the extended configuration.
conf = sc.getConf().setAll([
    ("aerospike.namespace", AS_NAMESPACE),
    ("aerospike.seedhost", AS_CONNECTION_STRING),
])
sc.stop()
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
# Kept for backward compatibility; the queries below go through `spark` directly.
sqlContext = SQLContext(sc)


# ### View the JSON Documents that you plan to use for this test

# In[8]:


# JSON data location. For this test you can locate this in the same directory as the
# Spark connector JAR
complex_data_json = "nested_data.json"


# In[9]:


import json

# Print the first line of the input file (the same file Spark loads below).
with open(complex_data_json, encoding="utf-8") as f:
    print(f.readline(), end='')


# ### Use the StructType class to create a custom schema. Add columns by providing the
# column name, data type, and nullable option.

# In[10]:


# Schema specification
aliases_type = StructType([
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
])

id_type = StructType([
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
    StructField("aliases", ArrayType(aliases_type), False),
])

street_adress_type = StructType([
    StructField("street_name", StringType(), False),
    StructField("apt_number", IntegerType(), False),
])

address_type = StructType([
    StructField("zip", LongType(), False),
    StructField("street", street_adress_type, False),
    StructField("city", StringType(), False),
])

workHistory_type = StructType([
    StructField("company_name", StringType(), False),
    StructField("company_address", address_type, False),
    StructField("worked_from", StringType(), False),
])

person_type = StructType([
    StructField("name", id_type, False),
    StructField("SSN", StringType(), False),
    StructField("home_address", ArrayType(address_type), False),
    StructField("work_history", ArrayType(workHistory_type), False),
])


# ### Write JSON documents into Aerospike by specifying the aforementioned schema.

# In[11]:


# Load the JSON file into a DataFrame with the schema
cmplx_data_with_schema = spark.read.schema(person_type).json(complex_data_json)

# Save data to Aerospike
(
    cmplx_data_with_schema.write
    .mode('overwrite')
    .format("aerospike")
    .option("aerospike.writeset", "complex_input_data")
    # NOTE(review): the record key is the person's first name, so rows that share a
    # first name map to the same key (presumably overwriting each other); SSN may be
    # the intended unique key -- confirm.
    .option("aerospike.updateByKey", "name.first_name")
    .save()
)


# In[12]:


cmplx_data_with_schema.printSchema()


# #### Notice that JSON data is stored as CDT in Aerospike
#
#     aql> show bins
#     +-------+----------------+-------+-----------------+
#     | quota | bin            | count | namespace       |
#     +-------+----------------+-------+-----------------+
#     | 65535 | "home_address" | 4     | "testNameSpace" |
#     | 65535 | "name"         | 4     | "testNameSpace" |
#     | 65535 | "SSN"          | 4     | "testNameSpace" |
#     | 65535 | "work_history" | 4     | "testNameSpace" |
#     +-------+----------------+-------+-----------------+
#
#     aql> select * from testNameSpace.complex_input_data
#     | home_address | name | SSN | work_history |
#
#     (cells are cut off exactly as in the captured output; one record per group)
#
#     | LIST('[{"zip":33927, "street":{"apt_number":97293, "street_name":"Davenport Way"}, "city":"Freemanbury"}, {"zip":30072, "street":{"apt_number":82109, "street_name":"Fisher Bridge"}, "city":"New Jon"}, {"zip":34764, "street":{"apt_number":5944, "street_na
#     | KEY_ORDERED_MAP('{"last_name":"Deleon", "first_name":"Kendra", "aliases":[{"last_name":"Reed", "first_name":"Tammy"}, {"last_name":"George", "first_name":"Amanda"}, {"last_name":"King", "first_name":"Michael"}, {"last_name":"Peterson", "first_name":"Mark
#     | "472-01-0475"
#     | LIST('[{"company_name":"Chapman and Sons", "company_address":{"zip":43184, "street":{"apt_number":14913, "street_name":"Sanchez Forks"}, "city":"Samanthaburgh"}, "worked_from":"26.04.2020"}, {"company_name":"Sparks LLC", "company_address":{"zip":35836, "
#
#     | LIST('[{"zip":26201, "street":{"apt_number":7445, "street_name":"Bradley Islands"}, "city":"West Jessicaview"}, {"zip":64674, "street":{"apt_number":905, "street_name":"Stephanie Islands"}, "city":"Thomasburgh"}, {"zip":87688, "street":{"apt_number":6942
#     | KEY_ORDERED_MAP('{"last_name":"Anderson", "first_name":"Jeff", "aliases":[{"last_name":"Bell", "first_name":"Nicholas"}, {"last_name":"Garcia", "first_name":"Danielle"}, {"last_name":"Gutierrez", "first_name":"Jonathan"}, {"last_name":"Rosales", "first_n
#     | "191-86-2935"
#     | LIST('[{"company_name":"Mercer Inc", "company_address":{"zip":51771, "street":{"apt_number":76392, "street_name":"Johnson Ways"}, "city":"East Christopher"}, "worked_from":"05.05.2020"}, {"company_name":"Garza Inc", "company_address":{"zip":17587, "stree

# ### Load data from Aerospike CDT into a Spark DataFrame

# In[13]:


loadedComplexDFWithSchema = (
    spark.read
    .format("aerospike")
    .option("aerospike.set", "complex_input_data")
    .schema(person_type)
    .load()
)
loadedComplexDFWithSchema.show(5)


# ### Create a view so that you can query Aerospike CDT

# In[14]:


# `registerTempTable` is deprecated since Spark 2.0; this is its replacement.
loadedComplexDFWithSchema.createOrReplaceTempView("mytable")


# #### Extract distinct company names from the work-history element

# In[15]:


spark.sql("select distinct work_history.company_name as Company from mytable").show()


# #### Look up a record using SSN

# In[16]:


sdf = spark.sql(
    "select name.first_name as first, SSN, "
    "home_address.street.street_name as Street "
    "from mytable where SSN = '396-98-0954'"
)
sdf.show()


# #### Access nested fields

# In[17]:


sdf1 = spark.sql("select home_address as address from mytable")
# One output row per element of the `home_address` array, with its struct fields
# promoted to top-level columns.
explode_df = sdf1.selectExpr("explode(address) AS structCol").selectExpr("structCol.*")
explode_df.show()


# ### Resources:
# 1. [Query JSON in Python using Spark SQL](https://medium.com/@clementselvaraj/https-medium-com-querying-json-in-python-using-spark-sql-a08761946dd2)
# 2. [An introduction to JSON support in Spark SQL](https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html)
# 3. [Working with JSON in Apache Spark](https://medium.com/expedia-group-tech/working-with-json-in-apache-spark-1ecf553c2a8c)
#

# In[ ]: