#!/usr/bin/env python
# coding: utf-8
# # Store JSON documents into Aerospike and query using Spark SQL
# #### Tested with Spark connector 3.2.0, Java 8, Apache Spark 3.0.3, Python 3.7
# #### The purpose of this notebook is to walk you through how to store data from a JSON source into Aerospike and subsequently query it using Spark SQL. JSON documents are stored as [CDTs](https://docs.aerospike.com/docs/guide/cdt.html) in the Aerospike database by the Spark connector.
# ### Setup
# #### Ensure Database Is Running
# This notebook requires that the Aerospike database is running.
# In[1]:
get_ipython().system('asd >& /dev/null')
get_ipython().system('pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"')
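# #### Optional: check connectivity from Python
# A minimal sketch of an equivalent check using the `aerospike` Python client. This assumes the client package is installed (e.g. `pip install aerospike`) and a local node on the default port; the shell check above is sufficient on its own.
# In[ ]:
import aerospike

# Connect to a local node (assumed host/port) and report whether the cluster is reachable
client = aerospike.client({'hosts': [('localhost', 3000)]}).connect()
print("Aerospike database is running!" if client.is_connected() else "**Aerospike database is not running!**")
client.close()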
# #### Set Aerospike, Spark, and Spark Connector Paths and Parameters
# In[2]:
# Directory where Spark-related components are installed
SPARK_NB_DIR = '/opt/spark-nb'
SPARK_HOME = SPARK_NB_DIR + '/spark-3.0.3-bin-hadoop3.2'
# In[3]:
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST ="localhost"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
AEROSPIKE_SPARK_JAR_VERSION="3.2.0"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
# In[4]:
# Aerospike Spark Connector settings
import os
AEROSPIKE_JAR_PATH = SPARK_NB_DIR + '/' + "aerospike-spark-assembly-" + AEROSPIKE_SPARK_JAR_VERSION + ".jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
# #### Alternative Setup for Running Notebook in Different Environment
# Please follow the instructions below **instead of the setup above** if you are running this notebook in a different environment from the one provided by the Aerospike Intro-Notebooks container.
# ```
# # IP Address or DNS name for one host in your Aerospike cluster
# AS_HOST = ""
# # Name of one of your namespaces. Type 'show namespaces' at the aql prompt
# # if you are not sure
# AS_NAMESPACE = ""
# AEROSPIKE_SPARK_JAR_VERSION=""
# AS_PORT = 3000 # Usually 3000, but change here if not
# AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
#
# # Set SPARK_HOME path.
# SPARK_HOME = ''
#
# # Please download the appropriate Aerospike Connect for Spark binary from the
# # [download page](https://enterprise.aerospike.com/enterprise/download/connectors/aerospike-spark/notes.html)
# # and set `AEROSPIKE_JAR_PATH` to the path of the downloaded binary.
# import os
# AEROSPIKE_JAR_PATH= "/aerospike-spark-assembly-"+AEROSPIKE_SPARK_JAR_VERSION+".jar"
# os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
# ```
# ### Spark Initialization
# In[5]:
# Next we locate the Spark installation; findspark is initialized with the SPARK_HOME path set above
import findspark
findspark.init(SPARK_HOME)
# In[6]:
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
# #### Configure Aerospike properties in the Spark Session object. Please visit [Configuring Aerospike Connect for Spark](https://docs.aerospike.com/docs/connect/processing/spark/configuration.html) for more information about the properties used on this page.
# In[7]:
sc = SparkContext.getOrCreate()
conf=sc._conf.setAll([("aerospike.namespace",AS_NAMESPACE),("aerospike.seedhost",AS_CONNECTION_STRING)])
sc.stop()
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
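# #### Optional: verify that the connector properties were applied
# A quick sanity check (not in the original flow): read the two Aerospike properties back from the live SparkConf to catch seed-host or namespace typos early.
# In[ ]:
# Read the Aerospike settings back from the active Spark configuration
print(spark.sparkContext.getConf().get("aerospike.seedhost"))
print(spark.sparkContext.getConf().get("aerospike.namespace"))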
# ### View the JSON Documents that you plan to use for this test
# In[8]:
# JSON data location. For this test, the file can be placed in the same directory as the Spark connector JAR.
complex_data_json="nested_data.json"
# In[9]:
import json

# Pretty-print the first JSON document (the file stores one document per line)
with open(complex_data_json) as f:
    print(json.dumps(json.loads(f.readline()), indent=2))
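# #### Optional: compare with Spark's inferred schema
# Spark can also infer a schema directly from the JSON source; printing it is a useful cross-check against the hand-written StructType in the next cell. Note that inference typically widens integers to LongType and marks every field nullable.
# In[ ]:
# Let Spark infer the schema from the JSON file for comparison
spark.read.json(complex_data_json).printSchema()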
# ### Use the StructType class to create a custom schema. Add columns by providing the column name, data type, and nullable option.
# In[10]:
# Schema specification
aliases_type = StructType([
StructField("first_name",StringType(),False),
StructField("last_name",StringType(),False)
])
id_type = StructType([
StructField("first_name",StringType(),False),
StructField("last_name",StringType(),False),
StructField("aliases",ArrayType(aliases_type),False)
])
street_address_type = StructType([
    StructField("street_name",StringType(),False),
    StructField("apt_number",IntegerType(),False)
])
address_type = StructType([
    StructField("zip",LongType(),False),
    StructField("street",street_address_type,False),
    StructField("city",StringType(),False)
])
workHistory_type = StructType([
StructField ("company_name",StringType(),False),
StructField( "company_address",address_type,False),
StructField("worked_from",StringType(),False)
])
person_type = StructType([
StructField("name",id_type,False),
StructField("SSN",StringType(),False),
StructField("home_address",ArrayType(address_type),False),
StructField("work_history",ArrayType(workHistory_type),False)
])
# ### Write JSON documents into Aerospike by specifying the aforementioned schema.
# In[11]:
# Load the JSON file into a DF with the schema
cmplx_data_with_schema=spark.read.schema(person_type).json(complex_data_json)
# Save data to Aerospike
cmplx_data_with_schema \
.write \
.mode('overwrite') \
.format("aerospike") \
.option("aerospike.writeset", "complex_input_data") \
.option("aerospike.updateByKey", "name.first_name") \
.save()
# In[12]:
cmplx_data_with_schema.printSchema()
# #### Notice that JSON data is stored as CDT in Aerospike
# aql> show bins \
# +-------+----------------+-------+-----------------+\
# | quota | bin | count | namespace |\
# +-------+----------------+-------+-----------------+\
# | 65535 | "home_address" | 4 | "testNameSpace" |\
# | 65535 | "name" | 4 | "testNameSpace" |\
# | 65535 | "SSN" | 4 | "testNameSpace" |\
# | 65535 | "work_history" | 4 | "testNameSpace" |\
# +-------+----------------+-------+-----------------+
#
# aql> select * from testNameSpace.complex_input_data
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# | home_address | name | SSN | work_history |
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#
# | LIST('[{"zip":33927, "street":{"apt_number":97293, "street_name":"Davenport Way"}, "city":"Freemanbury"}, {"zip":30072, "street":{"apt_number":82109, "street_name":"Fisher Bridge"}, "city":"New Jon"}, {"zip":34764, "street":{"apt_number":5944, "street_na | KEY_ORDERED_MAP('{"last_name":"Deleon", "first_name":"Kendra", "aliases":[{"last_name":"Reed", "first_name":"Tammy"}, {"last_name":"George", "first_name":"Amanda"}, {"last_name":"King", "first_name":"Michael"}, {"last_name":"Peterson", "first_name":"Mark | "472-01-0475" | LIST('[{"company_name":"Chapman and Sons", "company_address":{"zip":43184, "street":{"apt_number":14913, "street_name":"Sanchez Forks"}, "city":"Samanthaburgh"}, "worked_from":"26.04.2020"}, {"company_name":"Sparks LLC", "company_address":{"zip":35836, " |
# | LIST('[{"zip":26201, "street":{"apt_number":7445, "street_name":"Bradley Islands"}, "city":"West Jessicaview"}, {"zip":64674, "street":{"apt_number":905, "street_name":"Stephanie Islands"}, "city":"Thomasburgh"}, {"zip":87688, "street":{"apt_number":6942 | KEY_ORDERED_MAP('{"last_name":"Anderson", "first_name":"Jeff", "aliases":[{"last_name":"Bell", "first_name":"Nicholas"}, {"last_name":"Garcia", "first_name":"Danielle"}, {"last_name":"Gutierrez", "first_name":"Jonathan"}, {"last_name":"Rosales", "first_n | "191-86-2935" | LIST('[{"company_name":"Mercer Inc", "company_address":{"zip":51771, "street":{"apt_number":76392, "street_name":"Johnson Ways"}, "city":"East Christopher"}, "worked_from":"05.05.2020"}, {"company_name":"Garza Inc", "company_address":{"zip":17587, "stree |
# ### Load data from Aerospike CDT into a Spark DataFrame
# In[13]:
loadedComplexDFWithSchema=spark \
.read \
.format("aerospike") \
.option("aerospike.set", "complex_input_data") \
.schema(person_type) \
.load()
loadedComplexDFWithSchema.show(5)
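# #### The connector can also infer a schema when none is supplied
# A sketch of loading the same set without an explicit schema; the connector samples records to infer one. For deeply nested CDT data the inferred types may differ from the hand-written StructType, so the explicit schema above is the safer option.
# In[ ]:
# Load without a schema and let the connector infer one by sampling records
inferredDF = spark \
    .read \
    .format("aerospike") \
    .option("aerospike.set", "complex_input_data") \
    .load()
inferredDF.printSchema()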
# ### Create a temporary view so that you can query the Aerospike CDT data
# In[14]:
loadedComplexDFWithSchema.createOrReplaceTempView("mytable")
# #### Extract distinct company names from the work_history array
# In[15]:
sqlContext.sql("select distinct work_history.company_name as Company from mytable").show()
# #### Look up a record using SSN
# In[16]:
sdf = spark.sql("select name.first_name as first, SSN, home_address.street.street_name as Street from mytable \
where SSN=\"396-98-0954\"")
sdf.show()
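# The same lookup can be written with the DataFrame API instead of SQL; a sketch using pyspark.sql.functions.col:
# In[ ]:
from pyspark.sql.functions import col

# Filter on SSN and project the same nested fields with column expressions
loadedComplexDFWithSchema \
    .filter(col("SSN") == "396-98-0954") \
    .select(col("name.first_name").alias("first"),
            "SSN",
            col("home_address.street.street_name").alias("Street")) \
    .show()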
# #### Access nested fields
# In[17]:
sdf1 = spark.sql("select home_address as address from mytable")
explode_df = sdf1.selectExpr("explode(address) AS structCol").selectExpr("structCol.*")
explode_df.show()
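# The same explode pattern flattens any of the nested arrays; for example, work_history with its nested company_address struct:
# In[ ]:
# Explode work_history and promote selected nested fields to top-level columns
spark.sql("select work_history from mytable") \
    .selectExpr("explode(work_history) AS job") \
    .selectExpr("job.company_name", "job.company_address.city AS city", "job.worked_from") \
    .show()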
# ### Resources:
# 1. [Query JSON in Python using Spark SQL](https://medium.com/@clementselvaraj/https-medium-com-querying-json-in-python-using-spark-sql-a08761946dd2)
# 2. [An introduction to JSON support in Spark SQL](https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html)
# 3. [Working with JSON in Apache Spark](https://medium.com/expedia-group-tech/working-with-json-in-apache-spark-1ecf553c2a8c)
#
# In[ ]: