Nessie Iceberg/Flink SQL Demo with NBA Dataset

This demo showcases how to use the Nessie Python API together with Flink via Iceberg

To get started, we first have to run a few setup steps that give us everything we need to work with Nessie. If you're interested in the detailed Flink setup steps, you can check out the docs.

The Binder server has downloaded Flink and some data for us, and has started a Nessie server in the background. All we have to do is start Flink.

The cell below starts a local Flink session with the parameters needed to configure Nessie. The comments in the cell explain the purpose of each config option.

In [ ]:
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table.expressions import lit
from pynessie import init

# where we will store our data
warehouse = os.path.join(os.getcwd(), "flink-warehouse")
# this jar was downloaded when Binder started; it's available on Maven Central
iceberg_flink_runtime_jar = os.path.join(os.getcwd(), "../iceberg-flink-runtime-1.13-0.13.1.jar")
assert os.path.exists(iceberg_flink_runtime_jar)

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file://{}".format(iceberg_flink_runtime_jar))
table_env = StreamTableEnvironment.create(env)

nessie_client = init()


def create_ref_catalog(ref):
    """
    Create a flink catalog that is tied to a specific ref.

    In order to create the catalog we first have to create the branch.
    """
    default_branch = nessie_client.get_default_branch()
    if ref != default_branch:
        default_branch_hash = nessie_client.get_reference(default_branch).hash_
        nessie_client.create_branch(ref, ref=default_branch, hash_on_ref=default_branch_hash)
    # The important args below are:
    # type - tell Flink to use Iceberg as the catalog
    # catalog-impl - which Iceberg catalog to use, in this case we want Nessie
    # uri - the location of the nessie server.
    # ref - the Nessie ref/branch we want to use (defaults to main)
    # warehouse - the location this catalog should store its data
    table_env.execute_sql(
        f"""CREATE CATALOG {ref}_catalog WITH (
            'type'='iceberg',
            'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',
            'uri'='http://localhost:19120/api/v1',
            'ref'='{ref}',
            'warehouse' = '{warehouse}')"""
    )


create_ref_catalog(nessie_client.get_default_branch())
print("\n\n\nFlink running\n\n\n")


Flink running



Solving Data Engineering problems with Nessie

In this demo we are a data engineer working at a fictional sports analytics blog. For the authors to write articles, they need access to the relevant data: they must be able to retrieve it quickly and create charts with it.

We have been asked to collect and expose some information about basketball players. We have located some data sources and are now ready to start ingesting data into our data lakehouse. We will perform the ingestion steps on a Nessie branch to test and validate the data before exposing it to the analysts.

Set up Nessie branches (via Nessie CLI)

Once all dependencies are configured, we can get started with ingesting our basketball data into Nessie with the following steps:

  • Create a new branch named dev
  • List all branches

It is worth mentioning that we don't have to explicitly create a main branch, since it's the default branch.
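We can confirm this with a tiny programmatic sketch using the pynessie client initialized above (not part of the original notebook):

print(nessie_client.get_default_branch())  # -> 'main', created automatically by Nessie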

In [ ]:
create_ref_catalog("dev")

We have created the branch dev and we can see the branch with the Nessie hash it's currently pointing to.

Below we list all branches. Note that the auto-created main branch already exists and that both branches initially point at the same empty hash.

In [ ]:
!nessie --verbose branch
  dev   2e1cfa82b035c26cbbbdae632cea070514eb8b773f616aaeaf668e2f0be8f10d
* main  2e1cfa82b035c26cbbbdae632cea070514eb8b773f616aaeaf668e2f0be8f10d
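The same check can be done programmatically; a minimal sketch that reuses only the pynessie calls already shown above (not part of the original notebook):

main_hash = nessie_client.get_reference(nessie_client.get_default_branch()).hash_
dev_hash = nessie_client.get_reference("dev").hash_
assert main_hash == dev_hash  # both branches still point at the same (empty) commit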

Create tables under dev branch

Once we have created the dev branch and verified that it exists, we can create some tables and add some data.

We create two tables under the dev branch:

  • salaries
  • totals_stats

These tables list the salaries per player per year and their stats per year.

To create the data we:

  1. switch our branch context to dev
  2. create the table
  3. insert the data from an existing CSV file. This CSV file is already stored locally on the demo machine. A production use case would likely take feeds from official data sources.
In [ ]:
# Load the dataset
from pyflink.table import DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

# Creating `salaries` table
(
    table_env.connect(FileSystem().path("../datasets/nba/salaries.csv"))
    .with_format(
        OldCsv()
        .field("Season", DataTypes.STRING())
        .field("Team", DataTypes.STRING())
        .field("Salary", DataTypes.STRING())
        .field("Player", DataTypes.STRING())
    )
    .with_schema(
        Schema()
        .field("Season", DataTypes.STRING())
        .field("Team", DataTypes.STRING())
        .field("Salary", DataTypes.STRING())
        .field("Player", DataTypes.STRING())
    )
    .create_temporary_table("dev_catalog.nba.salaries_temp")
)

table_env.execute_sql(
    """CREATE TABLE IF NOT EXISTS dev_catalog.nba.salaries
            (Season STRING, Team STRING, Salary STRING, Player STRING)"""
).wait()

tab = table_env.from_path("dev_catalog.nba.salaries_temp")
tab.execute_insert("dev_catalog.nba.salaries").wait()

# Creating `totals_stats` table
(
    table_env.connect(FileSystem().path("../datasets/nba/totals_stats.csv"))
    .with_format(
        OldCsv()
        .field("Season", DataTypes.STRING())
        .field("Age", DataTypes.STRING())
        .field("Team", DataTypes.STRING())
        .field("ORB", DataTypes.STRING())
        .field("DRB", DataTypes.STRING())
        .field("TRB", DataTypes.STRING())
        .field("AST", DataTypes.STRING())
        .field("STL", DataTypes.STRING())
        .field("BLK", DataTypes.STRING())
        .field("TOV", DataTypes.STRING())
        .field("PTS", DataTypes.STRING())
        .field("Player", DataTypes.STRING())
        .field("RSorPO", DataTypes.STRING())
    )
    .with_schema(
        Schema()
        .field("Season", DataTypes.STRING())
        .field("Age", DataTypes.STRING())
        .field("Team", DataTypes.STRING())
        .field("ORB", DataTypes.STRING())
        .field("DRB", DataTypes.STRING())
        .field("TRB", DataTypes.STRING())
        .field("AST", DataTypes.STRING())
        .field("STL", DataTypes.STRING())
        .field("BLK", DataTypes.STRING())
        .field("TOV", DataTypes.STRING())
        .field("PTS", DataTypes.STRING())
        .field("Player", DataTypes.STRING())
        .field("RSorPO", DataTypes.STRING())
    )
    .create_temporary_table("dev_catalog.nba.totals_stats_temp")
)

table_env.execute_sql(
    """CREATE TABLE IF NOT EXISTS dev_catalog.nba.totals_stats (Season STRING, Age STRING, Team STRING,
        ORB STRING, DRB STRING, TRB STRING, AST STRING, STL STRING, BLK STRING, TOV STRING, PTS STRING,
        Player STRING, RSorPO STRING)"""
).wait()

tab = table_env.from_path("dev_catalog.nba.totals_stats_temp")
tab.execute_insert("dev_catalog.nba.totals_stats").wait()

salaries = table_env.from_path("main_catalog.nba.`salaries@dev`").select(lit(1).count).to_pandas().values[0][0]
totals_stats = table_env.from_path("main_catalog.nba.`totals_stats@dev`").select(lit(1).count).to_pandas().values[0][0]
print(f"\n\n\nAdded {salaries} rows to the salaries table and {totals_stats} rows to the totals_stats table.\n\n\n")



Added 51 rows to the salaries table and 93 rows to the totals_stats table.



Now we count the rows in our tables to ensure they match the CSV files. Note that we use the table@branch notation, which overrides the context set by the catalog.

In [ ]:
table_count = table_env.from_path("dev_catalog.nba.`salaries@dev`").select("Season.count").to_pandas().values[0][0]
csv_count = table_env.from_path("dev_catalog.nba.salaries_temp").select("Season.count").to_pandas().values[0][0]
assert table_count == csv_count
print(table_count)

table_count = table_env.from_path("dev_catalog.nba.`totals_stats@dev`").select("Season.count").to_pandas().values[0][0]
csv_count = table_env.from_path("dev_catalog.nba.totals_stats_temp").select("Season.count").to_pandas().values[0][0]
assert table_count == csv_count
print(table_count)
51
93

Check generated tables

Since we have been working solely on the dev branch, where we created 2 tables and added some data, let's verify that the main branch was not altered by our changes.

In [ ]:
!nessie content list

And on the dev branch we expect to see two tables.

In [ ]:
!nessie content list --ref dev
ICEBERG_TABLE:
	nba.totals_stats
	nba.salaries

We can also verify that the dev and main branches now point to different commits.

In [ ]:
!nessie --verbose branch
  dev   f48b93594ddead3a7616a271d657f0fff97cd0c4c04d4a579fa165aa96a69908
* main  2e1cfa82b035c26cbbbdae632cea070514eb8b773f616aaeaf668e2f0be8f10d
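Programmatically, the same divergence shows up as differing hashes (a small sketch reusing the calls above; not part of the original notebook):

assert nessie_client.get_reference("dev").hash_ != nessie_client.get_reference("main").hash_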

Dev promotion into main

Once we are done with our changes on the dev branch, we would like to merge those changes into main. We merge dev into main via the command-line merge command. After the merge, main contains the same tables and data as dev, although the two branches will point at different commit hashes (the merge produces a new commit on main).

In [ ]:
!nessie merge dev -b main --force

We can verify that the main branch now contains the expected tables and row counts.

The tables are now on main and ready for consumption by our blog authors and analysts!

In [ ]:
!nessie --verbose branch
* main  facfd43be1d062734ca0cda5ae900dde398180bf3f370a19627da8a2419589b0
  dev   f48b93594ddead3a7616a271d657f0fff97cd0c4c04d4a579fa165aa96a69908

In [ ]:
!nessie content list
ICEBERG_TABLE:
	nba.salaries
	nba.totals_stats

In [ ]:
table_count = table_env.from_path("main_catalog.nba.salaries").select("Season.count").to_pandas().values[0][0]
csv_count = table_env.from_path("dev_catalog.nba.salaries_temp").select("Season.count").to_pandas().values[0][0]
assert table_count == csv_count

table_count = table_env.from_path("main_catalog.nba.totals_stats").select("Season.count").to_pandas().values[0][0]
csv_count = table_env.from_path("dev_catalog.nba.totals_stats_temp").select("Season.count").to_pandas().values[0][0]
assert table_count == csv_count

Perform regular ETL on the new tables

Our analysts are happy with the data and we now want to ingest data regularly to keep things up to date. Our first ETL job consists of the following:

  1. Update the salaries table with new data
  2. Rename the totals_stats table to new_totals_stats
  3. Create a new table to hold information about the players' appearances in all-star games

As always, we will do this work on a branch and verify the results. This ETL job can then be set up to run nightly with new stats and salary information.
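Purely as an illustration (none of this is part of the demo; the function and branch name are hypothetical), such a nightly job could wrap the branch-then-merge workflow from the cells below in a single function:

def run_nightly_etl(branch: str = "etl_nightly"):
    """Hypothetical sketch of a scheduled job: branch, load, validate, promote."""
    create_ref_catalog(branch)  # create the Nessie branch plus a Flink catalog tied to it
    # 1. INSERT the new salary rows into <branch>_catalog.nba.salaries
    # 2. apply table changes (e.g. a RENAME) on <branch>_catalog.nba
    # 3. validate row counts against the source feeds
    # 4. promote the branch, e.g. `nessie merge <branch> -b main --force`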

In [ ]:
create_ref_catalog("etl")
In [ ]:
# add some salaries for Kevin Durant
table_env.execute_sql(
    """INSERT INTO etl_catalog.nba.salaries
                        VALUES ('2017-18', 'Golden State Warriors', '$25000000', 'Kevin Durant'),
                        ('2018-19', 'Golden State Warriors', '$30000000', 'Kevin Durant'),
                        ('2019-20', 'Brooklyn Nets', '$37199000', 'Kevin Durant'),
                        ('2020-21', 'Brooklyn Nets', '$39058950', 'Kevin Durant')"""
).wait()
In [ ]:
# Rename the table `totals_stats` to `new_totals_stats`
table_env.execute_sql("ALTER TABLE etl_catalog.nba.totals_stats RENAME TO etl_catalog.nba.new_totals_stats").wait()
In [ ]:
# Creating `allstar_games_stats` table
(
    table_env.connect(FileSystem().path("../datasets/nba/allstar_games_stats.csv"))
    .with_format(
        OldCsv()
        .field("Season", DataTypes.STRING())
        .field("Age", DataTypes.STRING())
        .field("Team", DataTypes.STRING())
        .field("ORB", DataTypes.STRING())
        .field("TRB", DataTypes.STRING())
        .field("AST", DataTypes.STRING())
        .field("STL", DataTypes.STRING())
        .field("BLK", DataTypes.STRING())
        .field("TOV", DataTypes.STRING())
        .field("PF", DataTypes.STRING())
        .field("PTS", DataTypes.STRING())
        .field("Player", DataTypes.STRING())
    )
    .with_schema(
        Schema()
        .field("Season", DataTypes.STRING())
        .field("Age", DataTypes.STRING())
        .field("Team", DataTypes.STRING())
        .field("ORB", DataTypes.STRING())
        .field("TRB", DataTypes.STRING())
        .field("AST", DataTypes.STRING())
        .field("STL", DataTypes.STRING())
        .field("BLK", DataTypes.STRING())
        .field("TOV", DataTypes.STRING())
        .field("PF", DataTypes.STRING())
        .field("PTS", DataTypes.STRING())
        .field("Player", DataTypes.STRING())
    )
    .create_temporary_table("etl_catalog.nba.allstar_games_stats_temp")
)

table_env.execute_sql(
    """CREATE TABLE IF NOT EXISTS etl_catalog.nba.allstar_games_stats (Season STRING, Age STRING,
        Team STRING, ORB STRING, TRB STRING, AST STRING, STL STRING, BLK STRING, TOV STRING,
        PF STRING, PTS STRING, Player STRING)"""
).wait()

tab = table_env.from_path("etl_catalog.nba.allstar_games_stats_temp")
tab.execute_insert("etl_catalog.nba.allstar_games_stats").wait()

# Notice how we view the data on the etl branch via @etl
table_env.from_path("etl_catalog.nba.`allstar_games_stats@etl`").to_pandas()
Out[ ]:
Season Age Team ORB TRB AST STL BLK TOV PF PTS Player
0 2004-05 26 LAL 3 6 7 3 1 4 5 16 Kobe Bryant
1 2005-06 27 LAL 0 7 8 3 0 3 5 8 Kobe Bryant
2 2006-07 28 LAL 1 5 6 6 0 4 1 31 Kobe Bryant
3 2007-08 29 LAL 0 1 0 0 0 0 0 0 Kobe Bryant
4 2008-09 30 LAL 1 4 4 4 0 1 0 27 Kobe Bryant
5 2009-10 31 LAL Kobe Bryant
6 2009-10 25 CLE 1 5 6 4 0 2 1 25 Lebron James
7 2010-11 26 MIA 2 12 10 0 0 4 3 29 Lebron James
8 2011-12 27 MIA 0 6 7 0 0 4 2 36 Lebron James
9 2012-13 28 MIA 0 3 5 1 0 4 0 19 Lebron James
10 2013-14 29 MIA 1 7 7 3 0 1 0 22 Lebron James
11 2014-15 30 CLE 1 5 7 2 0 4 1 30 Lebron James
12 2010-11 32 LAL 10 14 3 3 0 4 2 37 Kobe Bryant
13 2011-12 33 LAL 0 1 1 2 0 1 2 27 Kobe Bryant
14 2012-13 34 LAL 2 4 8 2 2 1 2 9 Kobe Bryant
15 2013-14 35 LAL Kobe Bryant
16 2014-15 36 LAL Kobe Bryant
17 2015-16 37 LAL 1 6 7 1 0 1 1 10 Kobe Bryant
18 1997-98 19 LAL 2 6 1 2 0 1 1 18 Kobe Bryant
19 1999-00 21 LAL 1 1 3 2 0 1 3 15 Kobe Bryant
20 2000-01 22 LAL 2 4 7 1 0 3 3 19 Kobe Bryant
21 2001-02 23 LAL 2 5 5 1 0 0 2 31 Kobe Bryant
22 2002-03 24 LAL 2 7 6 3 2 5 5 22 Kobe Bryant
23 2003-04 25 LAL 1 4 4 5 1 6 3 20 Kobe Bryant
24 Season Age Team ORB TRB AST STL BLK TOV PF PTS Player
25 2004-05 20 CLE 1 8 6 2 0 3 0 13 Lebron James
26 2005-06 21 CLE 2 6 2 2 0 1 2 29 Lebron James
27 2006-07 22 CLE 0 6 6 1 0 4 0 28 Lebron James
28 2007-08 23 CLE 1 8 9 2 2 4 3 27 Lebron James
29 2008-09 24 CLE 0 5 3 0 0 3 0 20 Lebron James
30 1992-93 29 CHI 3 4 5 4 0 6 5 30 Michael Jordan
31 1995-96 32 CHI 1 4 1 1 0 0 1 20 Michael Jordan
32 1996-97 33 CHI 3 11 11 2 0 3 4 14 Michael Jordan
33 1997-98 34 CHI 1 6 8 3 0 2 0 23 Michael Jordan
34 2001-02 38 WAS 0 4 3 2 0 1 1 8 Michael Jordan
35 2002-03 39 WAS 2 5 2 2 0 2 3 20 Michael Jordan
36 2015-16 31 CLE 0 4 7 0 0 4 0 13 Lebron James
37 2016-17 32 CLE 0 3 1 0 0 4 2 23 Lebron James
38 2017-18 33 CLE 0 10 8 1 0 5 2 29 Lebron James
39 2018-19 34 LAL 2 8 4 0 2 1 1 19 Lebron James
40 1984-85 21 CHI 3 6 2 3 1 1 4 7 Michael Jordan
41 1985-86 22 CHI Michael Jordan
42 1986-87 23 CHI 0 0 4 2 0 5 2 11 Michael Jordan
43 1987-88 24 CHI 3 8 3 4 4 2 5 40 Michael Jordan
44 1988-89 25 CHI 1 2 3 5 0 4 1 28 Michael Jordan
45 1989-90 26 CHI 1 5 2 5 1 5 1 17 Michael Jordan
46 1990-91 27 CHI 3 5 5 2 0 10 2 26 Michael Jordan
47 1991-92 28 CHI 1 1 5 2 0 1 2 18 Michael Jordan

We can verify that the new table isn't on the main branch but is present on the etl branch.

In [ ]:
# Since we have been working on the `etl` branch, the `allstar_games_stats` table is not on the `main` branch
!nessie content list
ICEBERG_TABLE:
	nba.salaries
	nba.totals_stats

In [ ]:
# We should see `allstar_games_stats` and the `new_totals_stats` on the `etl` branch
!nessie content list --ref etl
ICEBERG_TABLE:
	nba.allstar_games_stats
	nba.new_totals_stats
	nba.salaries

Now that we are happy with the data, we can again merge it into main.

In [ ]:
!nessie merge etl -b main --force

Now let's verify that the changes exist on the main branch.

In [ ]:
!nessie content list
ICEBERG_TABLE:
	nba.salaries
	nba.new_totals_stats
	nba.allstar_games_stats

In [ ]:
!nessie --verbose branch
* main  720543fa3a9579d0bfee11e07f383d86468eb4d73dc207e5bd6ef7f76b000930
  etl   c962d80b04ee619a6a0670cb5f664d948c86f6ebf66435027c5abe761e920c9e
  dev   f48b93594ddead3a7616a271d657f0fff97cd0c4c04d4a579fa165aa96a69908

In [ ]:
table_count = (
    table_env.from_path("main_catalog.nba.allstar_games_stats").select("Season.count").to_pandas().values[0][0]
)
csv_count = (
    table_env.from_path("etl_catalog.nba.allstar_games_stats_temp").select("Season.count").to_pandas().values[0][0]
)
assert table_count == csv_count

Create experiment branch

As a data analyst we might want to carry out some experiments with the data, without affecting main in any way. As in the previous examples, we can get started by creating an experiment branch off of main and carrying out our experiment, which could consist of the following steps:

  • drop the new_totals_stats table
  • add data to the salaries table
  • compare the salaries table on the experiment and main branches
In [ ]:
create_ref_catalog("experiment")
In [ ]:
# Drop the `new_totals_stats` table on the `experiment` branch
table_env.execute_sql("DROP TABLE experiment_catalog.nba.new_totals_stats")
Out[ ]:
<pyflink.table.table_result.TableResult at 0x7f0413977b90>
In [ ]:
# add some salaries for Dirk Nowitzki
table_env.execute_sql(
    """INSERT INTO experiment_catalog.nba.salaries VALUES
    ('2015-16', 'Dallas Mavericks', '$8333333', 'Dirk Nowitzki'),
    ('2016-17', 'Dallas Mavericks', '$25000000', 'Dirk Nowitzki'),
    ('2017-18', 'Dallas Mavericks', '$5000000', 'Dirk Nowitzki'),
    ('2018-19', 'Dallas Mavericks', '$5000000', 'Dirk Nowitzki')"""
).wait()
In [ ]:
# We should see the `salaries` and `allstar_games_stats` tables only (since we just dropped `new_totals_stats`)
!nessie content list --ref experiment
ICEBERG_TABLE:
	nba.salaries
	nba.allstar_games_stats

In [ ]:
# `main` hasn't been changed and still has the `new_totals_stats` table
!nessie content list
ICEBERG_TABLE:
	nba.salaries
	nba.new_totals_stats
	nba.allstar_games_stats

Let's take a look at the contents of the salaries table on the experiment branch. Notice that we go through the main_catalog but use @experiment to view the data on the experiment branch.

In [ ]:
table_env.from_path("main_catalog.nba.`salaries@experiment`").select(lit(1).count).to_pandas()
Out[ ]:
EXPR$0
0 59

and compare it to the contents of the salaries table on the main branch. Here we spell out @main, although we could have omitted it since the catalog defaults to the main branch.

In [ ]:
table_env.from_path("main_catalog.nba.`salaries@main`").select(lit(1).count).to_pandas()
Out[ ]:
EXPR$0
0 55
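The difference of four rows corresponds to the four Dirk Nowitzki salaries inserted on the experiment branch. A short sketch that checks this explicitly, reusing the queries above (not part of the original notebook):

exp_count = table_env.from_path("main_catalog.nba.`salaries@experiment`").select(lit(1).count).to_pandas().values[0][0]
main_count = table_env.from_path("main_catalog.nba.`salaries@main`").select(lit(1).count).to_pandas().values[0][0]
assert exp_count - main_count == 4  # the four Dirk Nowitzki rows added on `experiment`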