# IP Address or DNS name for one host in your Aerospike cluster
AS_HOST = "127.0.0.1"
# Name of one of your namespaces. Type 'show namespaces' at the aql prompt if you are not sure
AS_NAMESPACE = "testNameSpace"
AEROSPIKE_SPARK_JAR_VERSION="3.1.0"
AS_PORT = 3000 # Usually 3000, but change here if not
AS_CONNECTION_STRING = AS_HOST + ":" + str(AS_PORT)
# Next we locate the Spark installation - this will be found using the SPARK_HOME
# environment variable that you will have set
import findspark
findspark.init()
Set AEROSPIKE_JAR_PATH to the path of the downloaded connector jar.
import os
AEROSPIKE_JAR_PATH = "aerospike-spark-assembly-" + AEROSPIKE_SPARK_JAR_VERSION + ".jar"
os.environ["PYSPARK_SUBMIT_ARGS"] = '--jars ' + AEROSPIKE_JAR_PATH + ' pyspark-shell'
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, IntegerType, MapType, LongType, DoubleType
# Configure the Spark context with the Aerospike namespace and seed host,
# then restart it so the new settings take effect
sc = SparkContext.getOrCreate()
conf = sc._conf.setAll([("aerospike.namespace", AS_NAMESPACE), ("aerospike.seedhost", AS_CONNECTION_STRING)])
sc.stop()
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
sqlContext = SQLContext(sc)
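As an optional sanity check (a sketch using the property names set above), you can confirm that the restarted context picked up the Aerospike settings:
# Optional sanity check: the restarted context should now carry the Aerospike settings
print(sc.getConf().get("aerospike.seedhost"))
print(sc.getConf().get("aerospike.namespace"))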
#We will not specify the schema here, but rather use the schema inference capability of the Spark connector.
as_df=spark \
.read \
.format("aerospike") \
.option("aerospike.set", "covid_stats") \
.option("aerospike.sendKey", "true") \
.load()
as_df.show(5)
+--------------+--------------------+---------+------------+-------+------------+--------------+-----------+-----------+--------------+------------+-----------+-----------+-------------+------------+--------------+-----------+----------+
|         __key|            __digest| __expiry|__generation|  __ttl|drate_100Kl7|conf_rate_100K|probable_dd|d_rate_100K|     state_ter|total_deaths|total_cases|d_in_last_7|confirm_cases|crate_100Kl7|case_last_week|pbble_cases|confirm_dd|
+--------------+--------------------+---------+------------+-------+------------+--------------+-----------+-----------+--------------+------------+-----------+-----------+-------------+------------+--------------+-----------+----------+
|Virgin Islands|[2D 40 5A 16 9B 9...|377621369|           2|2591982|         0.3|        1342.0|          0|       21.0|Virgin Islands|          23|       1405|          2|            0|         3.7|            27|          0|         0|
|North Carolina|[83 70 D3 0C A3 2...|377621369|           2|2591982|         0.3|        2825.0|         94|       44.0|North Carolina|        4607|     293339|        224|       280213|        22.9|         16647|      13126|      4513|
|       Indiana|[91 60 2C F4 F4 4...|377621369|           2|2591982|         0.6|        3144.0|        246|       69.0|       Indiana|        4629|     210374|        265|            0|        60.3|         28266|          0|      4383|
|      Oklahoma|[EF 70 A8 4C 85 0...|377621369|           2|2591982|         0.4|        3720.0|         43|       36.0|      Oklahoma|        1450|     146692|         98|       124671|        58.5|         16151|      22021|      1407|
|      Missouri|[0A 91 83 C6 45 D...|377621369|           2|2591982|         0.3|        3415.0|          0|       51.0|      Missouri|        3153|     209197|        127|            0|        55.2|         23662|          0|         0|
+--------------+--------------------+---------+------------+-------+------------+--------------+-----------+-----------+--------------+------------+-----------+-----------+-------------+------------+--------------+-----------+----------+
only showing top 5 rows
as_df.printSchema()
root
 |-- __key: string (nullable = true)
 |-- __digest: binary (nullable = true)
 |-- __expiry: integer (nullable = false)
 |-- __generation: integer (nullable = false)
 |-- __ttl: integer (nullable = false)
 |-- drate_100Kl7: double (nullable = true)
 |-- conf_rate_100K: double (nullable = true)
 |-- probable_dd: long (nullable = true)
 |-- d_rate_100K: double (nullable = true)
 |-- state_ter: string (nullable = true)
 |-- total_deaths: long (nullable = true)
 |-- total_cases: long (nullable = true)
 |-- d_in_last_7: long (nullable = true)
 |-- confirm_cases: long (nullable = true)
 |-- crate_100Kl7: double (nullable = true)
 |-- case_last_week: long (nullable = true)
 |-- pbble_cases: long (nullable = true)
 |-- confirm_dd: long (nullable = true)
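If you would rather not rely on inference, Spark's DataFrameReader also accepts an explicit schema via .schema(). The sketch below is illustrative only: it covers just a few of the bins shown above, using the type classes already imported earlier, and assumes the connector honors a user-supplied schema.
# Illustrative sketch: supply an explicit schema instead of relying on inference.
# Only a subset of the bins from the covid_stats set is listed here.
covid_schema = StructType([
    StructField("state_ter", StringType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_cases", LongType(), True),
    StructField("case_last_week", LongType(), True),
    StructField("conf_rate_100K", DoubleType(), True)
])
as_df_explicit = spark \
    .read \
    .format("aerospike") \
    .schema(covid_schema) \
    .option("aerospike.set", "covid_stats") \
    .option("aerospike.sendKey", "true") \
    .load()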
as_df.write.parquet("proto.parquet")
#Read in the parquet file created above
#Parquet files are self-describing so the schema is preserved
#The result of loading a Parquet file is also a DataFrame
parquetFileDF = spark.read.parquet("proto.parquet")
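To verify that the schema really survived the round trip, you can print it again; it should match the schema inferred from Aerospike earlier.
# The schema read back from Parquet should match the one inferred above
parquetFileDF.printSchema()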
#Parquet files can also be used to create a temporary view that can then be queried with SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
namesDF = spark.sql("SELECT state_ter as states, total_deaths as covid_deaths FROM parquetFile")
namesDF.show()
+--------------------+------------+
|              states|covid_deaths|
+--------------------+------------+
|Federated States ...|           0|
|Republic of Marsh...|           0|
|Northern Mariana ...|           2|
|District of Columbia|         654|
|      North Carolina|        4607|
|      Virgin Islands|          23|
|      American Samoa|           0|
|      South Carolina|        4036|
|       New Hampshire|         489|
|       West Virginia|         502|
|       Massachusetts|       10131|
|       New York City|       24086|
|          New Jersey|       16429|
|                Guam|          88|
|        Pennsylvania|        9020|
|        Rhode Island|        1224|
|        North Dakota|         639|
|             Arizona|        6164|
|          California|       17963|
|               Idaho|         686|
+--------------------+------------+
only showing top 20 rows
hotZonesDF = spark.sql("SELECT state_ter as hot_zones FROM parquetFile where case_last_week > 10000")
hotZonesDF.show()
+--------------+
|     hot_zones|
+--------------+
|North Carolina|
| Massachusetts|
|    New Jersey|
|  Pennsylvania|
|       Arizona|
|    California|
|       Georgia|
|     Tennessee|
|     Wisconsin|
|     Minnesota|
|      Colorado|
|      Kentucky|
|      Illinois|
|      Virginia|
|      Missouri|
|      New York|
|      Nebraska|
|      Oklahoma|
|      Michigan|
|       Florida|
+--------------+
only showing top 20 rows
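The same hot-zone query can also be expressed with the DataFrame API directly on the DataFrame read from Aerospike, without the Parquet detour; the snippet below is a sketch using the column names shown earlier.
# Equivalent filter using the DataFrame API instead of SQL
from pyspark.sql.functions import col
as_df.filter(col("case_last_week") > 10000) \
    .select(col("state_ter").alias("hot_zones")) \
    .show()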