title: "Hive PySpark Example"
date: 2021-02-24
type: technical_note
draft: false
In this notebook we'll cover how to read from and write to Hive using Spark SQL. The notebook assumes that you have enabled the "Hive" service in your project.
sparkmagic automatically creates a Spark session in the cluster for us, with Hive enabled:
spark
Using the Spark session, you can interact with Hive through the sql method on the SparkSession, or through auxiliary methods such as .select() and .where().
Each project that has enabled Hive automatically gets its own Hive database. This is the only Hive database you can access unless someone has shared their database with you.
from hops import hdfs as hopsfs
PROJECT_NAME = hopsfs.project_name()
PROJECT_NAME
'test'
spark.sql("use " + PROJECT_NAME)
DataFrame[]
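The `use` statement above concatenates the project name straight into the SQL text. A minimal sketch of a more defensive variant follows; the helper name `use_database` and the set of accepted characters are assumptions made for illustration, not part of the hops or Spark APIs.

```python
import re


def use_database(spark, name):
    """Switch the session's current database, quoting the identifier."""
    # Hypothetical guard: accept only letters, digits, underscores and hyphens.
    if not re.fullmatch(r"[A-Za-z0-9_-]+", name):
        raise ValueError("unexpected database name: {!r}".format(name))
    # Backticks keep names that contain hyphens valid as SQL identifiers.
    spark.sql("USE `{}`".format(name))
    return name
```

In the notebook this would be called as `use_database(spark, PROJECT_NAME)`. `spark.catalog.setCurrentDatabase(PROJECT_NAME)` achieves the same switch without building SQL text.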
Tables can be created either by issuing a CREATE TABLE statement or by using the saveAsTable() method on an existing DataFrame. When using saveAsTable, Spark infers the schema from the DataFrame and issues the CREATE TABLE for you.
spark.sql("show tables").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
spark.sql("CREATE TABLE MAGIC_MATRIX (position int, value float) STORED AS ORC")
DataFrame[]
spark.sql("show tables").show()
+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
|    test|magic_matrix|      false|
+--------+------------+-----------+
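Re-running the CREATE TABLE cell fails once the table exists. As a sketch, the statement could be generated from a list of column definitions with an `IF NOT EXISTS` clause so the cell can be run repeatedly. The helper `create_table_sql` is hypothetical and only builds the SQL text.

```python
def create_table_sql(table, columns, storage="ORC"):
    """Build a CREATE TABLE statement from (column name, SQL type) pairs."""
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ", ".join(
        "{} {}".format(name, sql_type) for name, sql_type in columns
    )
    # IF NOT EXISTS turns the statement into a no-op when the table is present.
    return "CREATE TABLE IF NOT EXISTS {} ({}) STORED AS {}".format(
        table, column_list, storage
    )
```

The result can be passed to `spark.sql`, for example `spark.sql(create_table_sql("magic_matrix", [("position", "int"), ("value", "float")]))`.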
from pyspark.sql.types import StructType, StructField, IntegerType
schema = StructType([StructField('SquaredValue', IntegerType(), True)])
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark.sparkContext)
rddValues = spark.sparkContext.parallelize(list(range(0,100))).map(lambda x: [x*x])
dfValues = sqlContext.createDataFrame(rddValues,schema)
dfValues.show(5)
+------------+
|SquaredValue|
+------------+
|           0|
|           1|
|           4|
|           9|
|          16|
+------------+
only showing top 5 rows
dfValues.write.format("ORC").mode("overwrite").saveAsTable("SquaredValues")
spark.sql("show tables").show()
+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|    test| magic_matrix|      false|
|    test|squaredvalues|      false|
+--------+-------------+-----------+
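The DataFrame above is built through an RDD and a separate SQLContext. Since Spark 2.0 the session object has its own `createDataFrame` method, so the same data can be produced from a plain Python list. The sketch below assumes the same single `SquaredValue` integer column; the helper name `squares_dataframe` is made up for this example.

```python
def squares_dataframe(spark, start, stop):
    """Return a one-column DataFrame holding x*x for x in [start, stop)."""
    # Each row is a 1-tuple because the table has a single column.
    rows = [(x * x,) for x in range(start, stop)]
    # The schema is given as a datatype string instead of a StructType.
    return spark.createDataFrame(rows, "SquaredValue: int")
```

`squares_dataframe(spark, 0, 100)` would then stand in for `dfValues`.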
Values can be inserted with plain SQL or by using saveAsTable / insertInto:
spark.sql("INSERT INTO TABLE magic_matrix VALUES (1, 99), (2, 100)")
DataFrame[]
spark.sql("SELECT * FROM magic_matrix").show()
+--------+-----+
|position|value|
+--------+-----+
|       1| 99.0|
|       2|100.0|
+--------+-----+
rddValues2 = spark.sparkContext.parallelize(list(range(100,200))).map(lambda x: [x*x])
dfValues2 = sqlContext.createDataFrame(rddValues2,schema)
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     100|
+--------+
dfValues2.write.format("ORC").mode("append").saveAsTable("squaredvalues")
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     200|
+--------+
dfValues2.write.mode("append").insertInto("squaredvalues")
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     300|
+--------+
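The two append calls above behave the same for a single-column table, but they resolve columns differently: saveAsTable in append mode matches columns by name, while insertInto ignores names and matches by position. For a multi-column table it can therefore help to put the DataFrame columns in table order first. The following is a sketch; `append_by_position` and its `table_columns` argument are hypothetical names.

```python
def append_by_position(df, table, table_columns):
    """Append df to an existing table after aligning its column order."""
    # Compare names case-insensitively, since Hive stores them in lower case.
    available = {c.lower() for c in df.columns}
    missing = [c for c in table_columns if c.lower() not in available]
    if missing:
        raise ValueError("DataFrame lacks columns: {}".format(missing))
    # insertInto matches columns by position, so select them in table order.
    df.select(*table_columns).write.mode("append").insertInto(table)
```

For example, `append_by_position(df, "magic_matrix", ["position", "value"])` would work even if `df` had its columns in the order `value, position`.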
You can also use overwrite mode:
dfValues2.write.format("ORC").mode("overwrite").saveAsTable("squaredvalues")
spark.sql("REFRESH TABLE squaredvalues")
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     100|
+--------+
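The overwrite, refresh, and count steps can be bundled into one function. This is only a sketch: `overwrite_table` is a hypothetical helper, and it uses `spark.catalog.refreshTable`, the catalog counterpart of the `REFRESH TABLE` statement, to invalidate cached data and metadata before reading the table back.

```python
def overwrite_table(spark, df, table, fmt="ORC"):
    """Replace the table contents with df and return the new row count."""
    df.write.format(fmt).mode("overwrite").saveAsTable(table)
    # Invalidate cached data and metadata for the table before reading it.
    spark.catalog.refreshTable(table)
    return spark.table(table).count()
```

In the notebook this would be called as `overwrite_table(spark, dfValues2, "squaredvalues")`.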
rddValues3 = spark.sparkContext.parallelize(list(range(200,300))).map(lambda x: [x*x])
dfValues3 = sqlContext.createDataFrame(rddValues3,schema)
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     200|
+--------+
dfValues3.createOrReplaceTempView("temptable")
spark.sql("insert into table squaredvalues select * from temptable")
DataFrame[]
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     300|
+--------+
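The temporary view used for the insert stays registered in the session until it is dropped explicitly. A sketch of a helper that registers the view, runs the INSERT, and always cleans up afterwards is shown below; the name `insert_via_temp_view` and the default view name are assumptions for illustration.

```python
def insert_via_temp_view(spark, df, table, view_name="tmp_insert_view"):
    """Append df to table through a temporary view, then drop the view."""
    df.createOrReplaceTempView(view_name)
    try:
        spark.sql(
            "INSERT INTO TABLE {} SELECT * FROM {}".format(table, view_name)
        )
    finally:
        # Remove the view even if the INSERT fails.
        spark.catalog.dropTempView(view_name)
```

With this helper, `insert_via_temp_view(spark, dfValues3, "squaredvalues")` leaves no temporary entry behind in `SHOW TABLES`.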
spark.sql("SELECT * FROM squaredvalues WHERE squaredvalue > 380 ").show()
+------------+
|SquaredValue|
+------------+
|       40000|
|       40401|
|       40804|
|       41209|
|       41616|
|       42025|
|       42436|
|       42849|
|       43264|
|       43681|
|       44100|
|       44521|
|       44944|
|       45369|
|       45796|
|       46225|
|       46656|
|       47089|
|       47524|
|       47961|
+------------+
only showing top 20 rows
spark.sql("SELECT * FROM magic_matrix WHERE position = 2 ").show()
+--------+-----+
|position|value|
+--------+-----+
|       2|100.0|
+--------+-----+
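The queries above use the sql method. The same filter can also be written with the DataFrame methods mentioned at the start of the notebook. The sketch below puts both styles side by side for a table with `position` and `value` columns; the two function names are hypothetical.

```python
def rows_via_sql(spark, table, position):
    """Filter with a SQL string passed to spark.sql()."""
    query = "SELECT position, value FROM {} WHERE position = {}".format(
        table, int(position)
    )
    return spark.sql(query)


def rows_via_dataframe(spark, table, position):
    """Same filter expressed with DataFrame methods instead of SQL text."""
    return (
        spark.table(table)
        .where("position = {}".format(int(position)))
        .select("position", "value")
    )
```

Both `rows_via_sql(spark, "magic_matrix", 2).show()` and `rows_via_dataframe(spark, "magic_matrix", 2).show()` should print the same single row.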
spark.sql("SHOW TABLES").show()
+--------------+-------------+-----------+
|      database|    tableName|isTemporary|
+--------------+-------------+-----------+
|sparksqlonhive| magic_matrix|      false|
|sparksqlonhive|squaredvalues|      false|
|              |    temptable|       true|
+--------------+-------------+-----------+
spark.sql("DROP TABLE magic_matrix")
DataFrame[]
spark.sql("SHOW TABLES").show()
+--------------+-------------+-----------+
|      database|    tableName|isTemporary|
+--------------+-------------+-----------+
|sparksqlonhive|squaredvalues|      false|
|              |    temptable|       true|
+--------------+-------------+-----------+
spark.sql("DROP TABLE squaredvalues")
DataFrame[]
spark.sql("SHOW TABLES").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |temptable|       true|
+--------+---------+-----------+
spark.catalog.dropTempView("temptable")
spark.sql("SHOW TABLES").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
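A plain `DROP TABLE` raises an error when the table is already gone, which makes the cleanup cells awkward to re-run. As a sketch, the drops could be issued with `IF EXISTS`; the helper `drop_tables` is a hypothetical name.

```python
def drop_tables(spark, tables):
    """Drop each table if it exists and return the statements that were run."""
    statements = ["DROP TABLE IF EXISTS {}".format(t) for t in tables]
    for statement in statements:
        spark.sql(statement)
    return statements
```

For this notebook the call would be `drop_tables(spark, ["magic_matrix", "squaredvalues"])`.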