This notebook requires that the Aerospike database is running.
!asd >& /dev/null
!pgrep -x asd >/dev/null && echo "Aerospike database is running!" || echo "**Aerospike database is not running!**"
Aerospike database is running!
# Directory where Spark-related components are installed
SPARK_NB_DIR = '/opt/spark-nb'
SPARK_HOME = SPARK_NB_DIR + '/spark-3.0.3-bin-hadoop3.2'
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST ="localhost"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "test"
AEROSPIKE_SPARK_JAR_VERSION="3.2.0"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
# Aerospike Spark Connector settings
import os
AEROSPIKE_JAR_PATH = SPARK_NB_DIR + '/' + "aerospike-spark-assembly-" + AEROSPIKE_SPARK_JAR_VERSION + ".jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
If you are running this notebook in an environment other than the one provided by the Aerospike Intro-Notebooks container, follow the instructions below instead of the setup above.
# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST = "<seed-host-ip>"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt
# if you are not sure
AS_NAMESPACE = "<namespace>"
AEROSPIKE_SPARK_JAR_VERSION="<spark-connector-version>"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":"+ str(AS_PORT)
# Set SPARK_HOME path.
SPARK_HOME = '<spark-home-dir>'
# Please download the appropriate Aerospike Connect for Spark from the download page
# (https://enterprise.aerospike.com/enterprise/download/connectors/aerospike-spark/notes.html)
# and set `AEROSPIKE_JAR_PATH` to the path of the downloaded binary
import os
AEROSPIKE_JAR_PATH= "<aerospike-jar-dir>/aerospike-spark-assembly-"+AEROSPIKE_SPARK_JAR_VERSION+".jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
# Next we locate the Spark installation - findspark uses the SPARK_HOME path set above
import findspark
findspark.init(SPARK_HOME)
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
sc = SparkContext.getOrCreate()
conf=sc._conf.setAll([("aerospike.namespace",AS_NAMESPACE),("aerospike.seedhost",AS_CONNECTION_STRING)])
sc.stop()
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
sqlContext = SQLContext(sc)
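Before reading or writing, it can be useful to confirm that the connector options are set on the active context. A minimal sanity check, assuming the sc and spark objects created above:
# Sketch: confirm the Aerospike settings on the active SparkContext
print(sc.getConf().get("aerospike.seedhost"))   # e.g. localhost:3000
print(sc.getConf().get("aerospike.namespace"))  # e.g. test
print("Spark version:", spark.version)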
# JSON data location. For this test you can locate this in the same directory as the Spark connector JAR
complex_data_json="nested_data.json"
import json
with open('nested_data.json') as f:
    for line in range(1):
        print(f.readline(), end='')
{"name": {"first_name": "Megan", "last_name": "Chang", "aliases": [{"first_name": "Robert", "last_name": "Green"}, {"first_name": "William", "last_name": "Sullivan"}, {"first_name": "Kristen", "last_name": "Turner"}, {"first_name": "Thomas", "last_name": "Silva"}, {"first_name": "Rebecca", "last_name": "Wagner"}]}, "SSN": "289-18-1554", "home_address": [{"zip": 81551, "street": {"street_name": "Archer Mountain", "apt_number": 924}, "city": "North Melissaborough"}, {"zip": 73876, "street": {"street_name": "Ryan Plain", "apt_number": 877}, "city": "Greenfort"}, {"zip": 72420, "street": {"street_name": "Davis Streets", "apt_number": 97}, "city": "Cookchester"}, {"zip": 92728, "street": {"street_name": "Lee Parks", "apt_number": 28711}, "city": "Goldenshire"}, {"zip": 64632, "street": {"street_name": "Andrea River", "apt_number": 8398}, "city": "Seanstad"}], "work_history": [{"company_name": "Johnston-Roberts", "company_address": {"zip": 25324, "street": {"street_name": "Johnson Wall", "apt_number": 11220}, "city": "Villanuevaside"}, "worked_from": "14.04.2020"}, {"company_name": "Massey, Warren and Boyd", "company_address": {"zip": 31368, "street": {"street_name": "Jacobson Path", "apt_number": 947}, "city": "New Isabella"}, "worked_from": "10.03.2020"}, {"company_name": "Salazar LLC", "company_address": {"zip": 83095, "street": {"street_name": "James Bridge", "apt_number": 35256}, "city": "Garcialand"}, "worked_from": "15.05.2020"}, {"company_name": "Montoya Group", "company_address": {"zip": 70519, "street": {"street_name": "Jones Coves", "apt_number": 91615}, "city": "Wilsonstad"}, "worked_from": "07.02.2020"}, {"company_name": "Lopez, Martinez and Clark", "company_address": {"zip": 8507, "street": {"street_name": "Johnson Landing", "apt_number": 41314}, "city": "East William"}, "worked_from": "08.01.2020"}]}
# Schema specification
aliases_type = StructType([
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False)
])
id_type = StructType([
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
    StructField("aliases", ArrayType(aliases_type), False)
])
street_address_type = StructType([
    StructField("street_name", StringType(), False),
    StructField("apt_number", IntegerType(), False)
])
address_type = StructType([
    StructField("zip", LongType(), False),
    StructField("street", street_address_type, False),
    StructField("city", StringType(), False)
])
workHistory_type = StructType([
    StructField("company_name", StringType(), False),
    StructField("company_address", address_type, False),
    StructField("worked_from", StringType(), False)
])
person_type = StructType([
    StructField("name", id_type, False),
    StructField("SSN", StringType(), False),
    StructField("home_address", ArrayType(address_type), False),
    StructField("work_history", ArrayType(workHistory_type), False)
])
# Load the JSON file into a DF with the schema
cmplx_data_with_schema=spark.read.schema(person_type).json(complex_data_json)
# Save data to Aerospike
cmplx_data_with_schema \
.write \
.mode('overwrite') \
.format("aerospike") \
.option("aerospike.writeset", "complex_input_data") \
.option("aerospike.updateByKey", "name.first_name") \
.save()
cmplx_data_with_schema.printSchema()
root
 |-- name: struct (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |    |-- aliases: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- first_name: string (nullable = true)
 |    |    |    |-- last_name: string (nullable = true)
 |-- SSN: string (nullable = true)
 |-- home_address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- zip: long (nullable = true)
 |    |    |-- street: struct (nullable = true)
 |    |    |    |-- street_name: string (nullable = true)
 |    |    |    |-- apt_number: integer (nullable = true)
 |    |    |-- city: string (nullable = true)
 |-- work_history: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- company_name: string (nullable = true)
 |    |    |-- company_address: struct (nullable = true)
 |    |    |    |-- zip: long (nullable = true)
 |    |    |    |-- street: struct (nullable = true)
 |    |    |    |    |-- street_name: string (nullable = true)
 |    |    |    |    |-- apt_number: integer (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |-- worked_from: string (nullable = true)
aql> show bins
+-------+----------------+-------+-----------------+
| quota | bin | count | namespace |
+-------+----------------+-------+-----------------+
| 65535 | "home_address" | 4 | "testNameSpace" |
| 65535 | "name" | 4 | "testNameSpace" |
| 65535 | "SSN" | 4 | "testNameSpace" |
| 65535 | "work_history" | 4 | "testNameSpace" |
+-------+----------------+-------+-----------------+
aql> select * from testNameSpace.complex_input_data
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| home_address | name | SSN | work_history |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| LIST('[{"zip":33927, "street":{"apt_number":97293, "street_name":"Davenport Way"}, "city":"Freemanbury"}, {"zip":30072, "street":{"apt_number":82109, "street_name":"Fisher Bridge"}, "city":"New Jon"}, {"zip":34764, "street":{"apt_number":5944, "street_na | KEY_ORDERED_MAP('{"last_name":"Deleon", "first_name":"Kendra", "aliases":[{"last_name":"Reed", "first_name":"Tammy"}, {"last_name":"George", "first_name":"Amanda"}, {"last_name":"King", "first_name":"Michael"}, {"last_name":"Peterson", "first_name":"Mark | "472-01-0475" | LIST('[{"company_name":"Chapman and Sons", "company_address":{"zip":43184, "street":{"apt_number":14913, "street_name":"Sanchez Forks"}, "city":"Samanthaburgh"}, "worked_from":"26.04.2020"}, {"company_name":"Sparks LLC", "company_address":{"zip":35836, " | | LIST('[{"zip":26201, "street":{"apt_number":7445, "street_name":"Bradley Islands"}, "city":"West Jessicaview"}, {"zip":64674, "street":{"apt_number":905, "street_name":"Stephanie Islands"}, "city":"Thomasburgh"}, {"zip":87688, "street":{"apt_number":6942 | KEY_ORDERED_MAP('{"last_name":"Anderson", "first_name":"Jeff", "aliases":[{"last_name":"Bell", "first_name":"Nicholas"}, {"last_name":"Garcia", "first_name":"Danielle"}, {"last_name":"Gutierrez", "first_name":"Jonathan"}, {"last_name":"Rosales", "first_n | "191-86-2935" | LIST('[{"company_name":"Mercer Inc", "company_address":{"zip":51771, "street":{"apt_number":76392, "street_name":"Johnson Ways"}, "city":"East Christopher"}, "worked_from":"05.05.2020"}, {"company_name":"Garza Inc", "company_address":{"zip":17587, "stree |
loadedComplexDFWithSchema=spark \
.read \
.format("aerospike") \
.option("aerospike.set", "complex_input_data") \
.schema(person_type) \
.load()
loadedComplexDFWithSchema.show(5)
+--------------------+-----------+--------------------+--------------------+
|                name|        SSN|        home_address|        work_history|
+--------------------+-----------+--------------------+--------------------+
|[Maria, Bates, [[...|165-16-6030|[[2399, [Ebony Un...|[[Adams-Guzman, [...|
|[Brenda, Gonzales...|396-98-0954|[[63320, [Diane O...|[[Powell Group, [...|
|[Bryan, Davis, [[...|682-39-2482|[[47508, [Cooper ...|[[Rivera-Ruiz, [1...|
|[Tami, Jordan, [[...|001-49-0685|[[23288, [Clark V...|[[Roberts PLC, [4...|
|[Connie, Joyce, [...|369-38-9885|[[27216, [Goodman...|[[Pugh, Walsh and...|
+--------------------+-----------+--------------------+--------------------+
only showing top 5 rows
loadedComplexDFWithSchema.registerTempTable("mytable")
sqlContext.sql("select distinct work_history.company_name as Company from mytable").show()
+--------------------+
|             Company|
+--------------------+
|[Chapman and Sons...|
|[Johnson and Sons...|
|[Mclean Ltd, Kerr...|
|[Edwards, Rogers ...|
|[Marshall, Cox an...|
|[Wolf, Kennedy an...|
|[Williams Ltd, Jo...|
|[Smith-Cook, Patt...|
|[Martin Group, Sp...|
|[Sutton-Long, Ada...|
|[Washington Inc, ...|
|[Valenzuela PLC, ...|
|[Porter and Sons,...|
|[Hudson Group, Br...|
|[Guzman Group, Cu...|
|[Bowers LLC, Wats...|
|[Robbins, Harris ...|
|[Wilson Inc, Pete...|
|[Elliott-Fuller, ...|
|[Campbell-Lee, An...|
+--------------------+
only showing top 20 rows
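Because work_history.company_name is an array per person, the distinct values above are whole arrays. To get one company per row, the array can be exploded first; a short sketch against the same temp table:
# Sketch: flatten the per-person company_name arrays into individual rows, then de-duplicate
sqlContext.sql("select explode(work_history.company_name) as Company from mytable") \
    .distinct() \
    .show(5)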
sdf = spark.sql("select name.first_name as first, SSN, home_address.street.street_name as Street from mytable \
where SSN=\"396-98-0954\"")
sdf.show()
+------+-----------+--------------------+
| first|        SSN|              Street|
+------+-----------+--------------------+
|Brenda|396-98-0954|[Diane Overpass, ...|
+------+-----------+--------------------+
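The same lookup can be written with the DataFrame API, where dot notation also reaches into the nested and array-valued fields. A sketch equivalent to the SQL above:
# Sketch: DataFrame API equivalent of the SQL query above
from pyspark.sql.functions import col
loadedComplexDFWithSchema \
    .filter(col("SSN") == "396-98-0954") \
    .select(col("name.first_name").alias("first"),
            col("SSN"),
            col("home_address.street.street_name").alias("Street")) \
    .show()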
sdf1 = spark.sql("select home_address as address from mytable")
explode_df = sdf1.selectExpr("explode(address) AS structCol").selectExpr("structCol.*")
explode_df.show()
+-----+--------------------+--------------------+
|  zip|              street|                city|
+-----+--------------------+--------------------+
| 2399|   [Ebony Union, 22]|        Robertohaven|
|70689| [Scott Skyway, 755]|       Mclaughlinton|
|58472|[Tiffany Course, ...|        Lake Shannon|
|89243| [Tapia Rapids, 854]|            Karenton|
|63320|[Diane Overpass, 12]|        New Nicholas|
|60950| [Julie Lock, 52396]|      Contrerasville|
|47508|[Cooper Vista, 59...|          Port Tanya|
|10918| [Jones Plaza, 5430]|          Jonesmouth|
|23288|[Clark Village, 9...|           Frankport|
|79837|   [Megan Rest, 561]|        Williamsside|
|36853|[Kayla Orchard, 491]|North Michaelborough|
|68729|    [Hunt Port, 595]|         West Jeremy|
|27216|  [Goodman Isle, 73]|          Lake Wendy|
|69643|[Brown Spring, 7872]|       North Kristin|
|93147|[Ryan Freeway, 4316]|   South Krystalport|
|49305| [Ward Bypass, 9262]|       South Joyland|
|99893|[Knight Courts, 1...|    Lake Williamfort|
|89962|[Meghan Highway, ...|        Port Garyton|
|38066|[Richard Stream, ...|           New Holly|
|36042|  [Eric Haven, 1741]|        Patriciatown|
+-----+--------------------+--------------------+
only showing top 20 rows