Nessie Spark SQL Demo with NBA Dataset

This demo showcases how to use the Nessie Python API along with Spark 3 and Delta Lake.

Initialize PySpark

To get started, we first need to run a few setup steps that give us everything required to work with Nessie. If you're interested in the detailed Spark setup steps, check out the docs.

The Binder server has downloaded Spark and some data for us, and has started a Nessie server in the background. All we have to do is start Spark.

The cell below starts a local Spark session with the parameters needed to configure Nessie. Each config option is preceded by a comment explaining its purpose.

In [ ]:
import os
import findspark
from pyspark.sql import *
from pyspark import SparkConf
import pynessie
findspark.init()
pynessie_version = pynessie.__version__

warehouse = 'file://' + os.getcwd() + '/spark_warehouse/delta'
conf = SparkConf()
# add the Maven repository that hosts our custom fork of Delta
conf.set("spark.jars.repositories", "https://storage.googleapis.com/nessie-maven")
# we need delta libraries and the nessie sql extensions
conf.set("spark.jars.packages", f"org.projectnessie:nessie-deltalake:{pynessie_version},org.projectnessie:nessie-spark-extensions:{pynessie_version}")
# ensure python <-> java interactions are w/ pyarrow
conf.set("spark.sql.execution.pyarrow.enabled", "true")
# configure the default spark_catalog as a Delta catalog
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# set the location for Nessie catalog to store data. Spark writes to this directory
conf.set("spark.sql.catalog.spark_catalog.warehouse", warehouse)
# set the location of the Nessie server. In this demo it's running locally. There are many ways to run it (see https://projectnessie.org/try/)
conf.set("spark.sql.catalog.spark_catalog.uri", "http://localhost:19120/api/v1")
# default branch for Nessie catalog to work on
conf.set("spark.sql.catalog.spark_catalog.ref", "main")
# use no authorization. Options are NONE, AWS and BASIC; AWS implies Nessie is running on AWS Lambda
conf.set("spark.sql.catalog.spark_catalog.auth_type", "NONE")
# these two lines tell Delta to use Nessie as the internal storage handler, thereby enabling the Delta/Nessie integration
conf.set("spark.delta.logFileHandler.class", "org.projectnessie.deltalake.NessieLogFileMetaParser")
conf.set("spark.delta.logStore.class", "org.projectnessie.deltalake.NessieLogStore")
# enable the extensions for both Nessie and Delta
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
# finally, start up the Spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

Solving Data Engineering problems with Nessie

In this demo we play the role of a data engineer working at a fictional sports analytics blog. For the authors to write articles, they need access to the relevant data: they have to be able to retrieve it quickly and create charts with it.

We have been asked to collect and expose some information about basketball players. We have located some data sources and are now ready to start ingesting data into our data lakehouse. We will perform the ingestion steps on a Nessie branch to test and validate the data before exposing it to the analysts.

Set up Nessie branches

Once all dependencies are configured, we can get started with ingesting our basketball data into Nessie with the following steps:

  • Create a new branch named dev
  • List all branches

It is worth mentioning that we don't have to explicitly create a main branch, since it's the default branch.

In [ ]:
spark.sql("CREATE BRANCH dev AS main").toPandas()

We have created the branch dev and can see it together with the Nessie hash it's currently pointing to.

Below we list all branches. Note that the auto-created main branch already exists and that both branches point at the same hash.

In [ ]:
spark.sql("LIST REFERENCES").toPandas() 

Create tables under dev branch

Once we have created the dev branch and verified that it exists, we can create some tables and add some data.

We create two tables under the dev branch:

  • salaries
  • totals_stats

These tables list the salaries per player per year and their stats per year.

To create the data we:

  1. switch our branch context to dev
  2. create the table
  3. insert the data from an existing CSV file. This CSV file is already stored locally on the demo machine; a production use case would likely take feeds from official data sources.
In [ ]:
spark.sql("USE REFERENCE dev")

# Creating `salaries` table
spark.sql("""CREATE TABLE IF NOT EXISTS salaries (Season STRING, Team STRING, Salary STRING, Player STRING)
             USING delta LOCATION '{}/salaries'""".format(warehouse))

spark.sql("""CREATE OR REPLACE TEMPORARY VIEW salaries_table USING csv
             OPTIONS (path "../datasets/nba/salaries.csv", header true)""")
spark.sql('INSERT INTO salaries SELECT * FROM salaries_table')

# Creating `totals_stats` table
spark.sql("""CREATE TABLE IF NOT EXISTS totals_stats (
             Season STRING, Age STRING, Team STRING, ORB STRING, DRB STRING, TRB STRING, AST STRING, STL STRING, 
             BLK STRING, TOV STRING, PTS STRING, Player STRING, RSorPO STRING) 
             USING delta LOCATION '{}/total_stats'""".format(warehouse))
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW stats_table USING csv
             OPTIONS (path "../datasets/nba/totals_stats.csv", header true)""")
spark.sql('INSERT INTO totals_stats SELECT * FROM stats_table').toPandas()

Now we count the rows in our tables to ensure they match the row counts of the CSV files.

In [ ]:
table_count = spark.sql("select count(*) from salaries").toPandas().values[0][0]
csv_count = spark.sql("select count(*) from salaries_table").toPandas().values[0][0]
assert table_count == csv_count
print(table_count)

table_count = spark.sql("select count(*) from totals_stats").toPandas().values[0][0]
csv_count = spark.sql("select count(*) from stats_table").toPandas().values[0][0]
assert table_count == csv_count
print(table_count)
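
Since we will repeat this row-count check for every table we ingest, it can be convenient to wrap it in a small helper. The function below is just a hypothetical convenience wrapper around the same spark.sql calls used above; the rest of the notebook keeps the explicit form so each step stays visible.

In [ ]:
def assert_table_matches_csv(table_name, view_name):
    # hypothetical helper: compare row counts between a table and its source CSV view
    table_count = spark.sql(f"select count(*) from {table_name}").toPandas().values[0][0]
    csv_count = spark.sql(f"select count(*) from {view_name}").toPandas().values[0][0]
    assert table_count == csv_count, f"{table_name} has {table_count} rows, {view_name} has {csv_count}"
    return table_count

# example usage against the tables created above
print(assert_table_matches_csv("salaries", "salaries_table"))
print(assert_table_matches_csv("totals_stats", "stats_table"))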

Check generated tables

Since we have been working solely on the dev branch, where we created 2 tables and added some data, let's verify that the main branch was not altered by our changes.

Note: SHOW TABLES is not reference-aware on Delta because the Delta catalog has no concept of Nessie references, so we use the Nessie command line instead. Since this demo switches the reference around regularly, SHOW TABLES isn't always reliable here. If your Spark job only works against a single reference, you can safely call SHOW TABLES.
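
As an aside, for a job that is pinned to a single reference for its whole lifetime, a plain SHOW TABLES call is fine; it is ordinary Spark SQL, e.g.:

In [ ]:
# only reliable when the session sticks to one reference, which is not the case in this demo
spark.sql("SHOW TABLES").toPandas()

Because this demo keeps switching references, we use the Nessie CLI below. The main branch should not show our new tables: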

In [ ]:
!nessie contents --list

And on the dev branch we expect to see two tables.

In [ ]:
!nessie contents --list --ref dev

We can also verify that the dev and main branches point to different commits.

In [ ]:
spark.sql("LIST REFERENCES").toPandas()

Dev promotion into main

Once we are done with our changes on the dev branch, we would like to merge those changes into main. We merge dev into main via the Nessie MERGE BRANCH SQL command. Both branches should point at the same hash after the merge/promotion.

In [ ]:
spark.sql("MERGE BRANCH dev").toPandas()

We can verify the branches are at the same hash and that the main branch now contains the expected tables and row counts.

The tables are now on main and ready for consumption by our blog authors and analysts!

In [ ]:
spark.sql("LIST REFERENCES").toPandas()
In [ ]:
!nessie contents --list
In [ ]:
spark.sql("USE REFERENCE main")
table_count = spark.sql("select count(*) from salaries").toPandas().values[0][0]
csv_count = spark.sql("select count(*) from salaries_table").toPandas().values[0][0]
assert table_count == csv_count
print(table_count)

table_count = spark.sql("select count(*) from totals_stats").toPandas().values[0][0]
csv_count = spark.sql("select count(*) from stats_table").toPandas().values[0][0]
assert table_count == csv_count
print(table_count)
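
As a quick illustration of what is now possible for the analysts, the cell below runs a simple aggregation against the promoted data on main. The query itself is only an example; the column names come from the CREATE TABLE statements above.

In [ ]:
# example analyst query against the promoted data on the main branch
spark.sql("""SELECT Team, count(*) AS salary_records
             FROM salaries
             GROUP BY Team
             ORDER BY salary_records DESC""").toPandas().head(10)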

Perform regular ETL on the new tables

Our analysts are happy with the data, and we now want to ingest data regularly to keep things up to date. Our first ETL job consists of the following:

  1. Update the salaries table with new data
  2. Add a Years column to the totals_stats table to show how many years each player has been in the league
  3. Create a new table to hold information about the players' appearances in All-Star games

As always we will do this work on a branch and verify the results. This ETL job can then be set up to run nightly with new stats and salary information.

In [ ]:
spark.sql("CREATE BRANCH etl AS main").toPandas()
In [ ]:
# add some salaries for Kevin Durant
spark.sql("USE REFERENCE etl")
spark.sql('''INSERT INTO salaries VALUES 
    ("2017-18", "Golden State Warriors", "$25000000", "Kevin Durant"),
    ("2018-19", "Golden State Warriors", "$30000000", "Kevin Durant"),
    ("2019-20", "Brooklyn Nets", "$37199000", "Kevin Durant"),
    ("2020-21", "Brooklyn Nets", "$39058950", "Kevin Durant")
    ''').toPandas()
In [ ]:
# Adding a column in the `totals_stats` table
spark.sql("ALTER TABLE totals_stats ADD COLUMNS (Years STRING)").toPandas()
In [ ]:
# Creating `allstar_games_stats` table and viewing the contents
spark.sql("""CREATE TABLE IF NOT EXISTS allstar_games_stats (
          Season STRING, Age STRING, Team STRING, ORB STRING, TRB STRING, AST STRING, STL STRING, BLK STRING, 
          TOV STRING, PF STRING, PTS STRING, Player STRING) 
          USING delta LOCATION '{}/allstar_stats'""".format(warehouse))
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW allstar_table USING csv
             OPTIONS (path "../datasets/nba/allstar_games_stats.csv", header true)""")
spark.sql('INSERT INTO allstar_games_stats SELECT * FROM allstar_table').toPandas()

# since the current reference is etl, this query reads the data on the etl branch
spark.sql("select count(*) from allstar_games_stats").toPandas()

We can verify that the new table isn't on the main branch but is present on the etl branch.

In [ ]:
!nessie contents --list
In [ ]:
!nessie contents --list --ref etl

Now that we are happy with the data, we can again merge it into main.

In [ ]:
spark.sql("MERGE BRANCH etl").toPandas()

Now let's verify that the changes exist on the main branch and that the main and etl branches point at the same hash.

In [ ]:
!nessie contents --list
In [ ]:
spark.sql("LIST REFERENCES").toPandas()
In [ ]:
spark.sql("USE REFERENCE main").toPandas()
table_count = spark.sql("select count(*) from allstar_games_stats").toPandas().values[0][0]
csv_count = spark.sql("select count(*) from allstar_table").toPandas().values[0][0]
assert table_count == csv_count
print(table_count)
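
As mentioned at the start of this section, this ETL job can then be scheduled to run nightly. Below is a minimal sketch of what such a job could look like; the dated branch name is just one possible convention, and the load and validation steps are placeholders for the kind of INSERT statements and row-count checks we ran above.

In [ ]:
from datetime import date

def run_nightly_etl():
    """Hypothetical nightly job: isolate the load on its own branch, validate, then promote."""
    branch = "etl_" + date.today().strftime("%Y_%m_%d")
    spark.sql(f"CREATE BRANCH {branch} AS main")
    spark.sql(f"USE REFERENCE {branch}")

    # 1. load the latest stats and salaries with INSERT INTO statements (placeholder)
    # 2. run validation checks such as the row-count asserts used earlier (placeholder)

    # promote the branch and clean up, mirroring the manual steps in this notebook
    spark.sql(f"MERGE BRANCH {branch}")
    spark.sql("USE REFERENCE main")
    spark.sql(f"DROP BRANCH {branch}")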

Create experiment branch

As a data analyst, we might want to carry out some experiments with the data without affecting main in any way. As in the previous examples, we can get started by creating an experiment branch off of main and carrying out our experiment, which could consist of the following steps:

  • drop the totals_stats table
  • add some data to the salaries table
  • compare the salaries table on the experiment and main branches
In [ ]:
spark.sql("CREATE BRANCH experiment AS main").toPandas()
spark.sql("USE REFERENCE experiment").toPandas()
In [ ]:
# Drop the `totals_stats` table on the `experiment` branch
spark.sql("DROP TABLE IF EXISTS totals_stats").toPandas()
In [ ]:
# add some salaries for Dirk Nowitzki
spark.sql('''INSERT INTO salaries VALUES 
    ("2015-16", "Dallas Mavericks", "$8333333", "Dirk Nowitzki"),
    ("2016-17", "Dallas Mavericks", "$25000000", "Dirk Nowitzki"),
    ("2017-28", "Dallas Mavericks", "$5000000", "Dirk Nowitzki"),
    ("2018-19", "Dallas Mavericks", "$5000000", "Dirk Nowitzki")
    ''').toPandas()
In [ ]:
!nessie contents --list --ref experiment
In [ ]:
!nessie contents --list

Let's take a look at the contents of the salaries table on the experiment branch. Since we switched the current reference to experiment with USE REFERENCE, the query below reads the data on that branch.

In [ ]:
spark.sql("select count(*) from salaries").toPandas()

Now let's compare it to the contents of the salaries table on the main branch. Notice that we first switch the current reference back to main with USE REFERENCE.

In [ ]:
spark.sql("USE REFERENCE main").toPandas()
spark.sql("select count(*) from salaries").toPandas()

And finally, let's clean up after ourselves.

In [ ]:
spark.sql("DROP BRANCH dev")
spark.sql("DROP BRANCH etl")
spark.sql("DROP BRANCH experiment")