This demo showcases how to use the Nessie Python API together with Flink via Iceberg.
To get started, we first have to run a few setup steps that give us everything we need to work with Nessie. If you're interested in the detailed setup steps for Flink, you can check out the docs.
The Binder server has downloaded Flink and some data for us, and has started a Nessie server in the background. All we have to do is start Flink.
The cell below starts a local Flink session with the parameters needed to configure Nessie. Each config option is followed by a comment explaining its purpose.
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table.expressions import lit
from pynessie import init
# where we will store our data
warehouse = os.path.join(os.getcwd(), "flink-warehouse")
# this jar was downloaded when Binder started; it's available on Maven Central
iceberg_flink_runtime_jar = os.path.join(os.getcwd(), "../iceberg-flink-runtime-1.17-1.4.2.jar")
assert os.path.exists(iceberg_flink_runtime_jar)
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file://{}".format(iceberg_flink_runtime_jar))
table_env = StreamTableEnvironment.create(env)
nessie_client = init()
def create_ref_catalog(ref):
    """
    Create a Flink catalog that is tied to a specific ref.

    In order to create the catalog we first have to create the branch,
    unless it is the default branch, which already exists.
    """
    default_branch = nessie_client.get_default_branch()
    if ref != default_branch:
        default_branch_hash = nessie_client.get_reference(default_branch).hash_
        nessie_client.create_branch(ref, ref=default_branch, hash_on_ref=default_branch_hash)
    # The important args below are:
    # type - tell Flink to use Iceberg as the catalog
    # catalog-impl - which Iceberg catalog to use, in this case we want Nessie
    # uri - the location of the Nessie server
    # ref - the Nessie ref/branch we want to use (defaults to main)
    # warehouse - the location where this catalog should store its data
    table_env.execute_sql(
        f"""CREATE CATALOG {ref}_catalog WITH (
        'type'='iceberg',
        'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',
        'uri'='http://localhost:19120/api/v1',
        'ref'='{ref}',
        'warehouse' = '{warehouse}')"""
    )
create_ref_catalog(nessie_client.get_default_branch())
print("\n\n\nFlink running\n\n\n")
# Create the 'nba' namespace.
table_env.execute_sql("CREATE DATABASE main_catalog.nba").wait()
In this demo, we are a data engineer working at a fictional sports analytics blog. In order for the authors to write articles, they need access to the relevant data. They need to be able to retrieve data quickly and create charts with it.
We have been asked to collect and expose some information about basketball players. We have located some data sources and are now ready to start ingesting data into our data lakehouse. We will perform the ingestion steps on a Nessie branch to test and validate the data before exposing it to the analysts.
Once all dependencies are configured, we can get started with ingesting our basketball data into Nessie with the following steps:

- Create a new branch named `dev`
- List all branches

It is worth mentioning that we don't have to explicitly create a `main` branch, since it's the default branch.
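We can confirm this from Python: `get_default_branch()`, which we already used above, returns the name of the default branch (normally main):
# The default branch exists out of the box, so we never create it explicitly.
print(nessie_client.get_default_branch())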
create_ref_catalog("dev")
We have created the branch `dev` and we can see the branch with the Nessie hash it's currently pointing to.

Below we list all branches. Note that the auto-created `main` branch already exists, and both branches initially point at the same empty hash.
!nessie --verbose branch
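The same information is available programmatically. A minimal sketch using only pynessie calls already shown above (the branch names are hard-coded for this demo):
# Mirror `nessie --verbose branch`: print each branch's head hash.
for name in ["main", "dev"]:
    print(name, nessie_client.get_reference(name).hash_)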
Once we have created the `dev` branch and verified that it exists, we can create some tables and add some data.

We create two tables under the `dev` branch:

- `salaries`
- `totals_stats`

These tables list the salaries per player per year and their stats per year.

To create the data, we load each CSV dataset into a temporary filesystem table and insert it into an Iceberg table on the `dev` branch:
# Load the dataset
from pyflink.table import DataTypes, Schema, TableDescriptor
from pyflink.table.expressions import col
# Creating `salaries` table
(
table_env.create_temporary_table(
"dev_catalog.nba.salaries_temp",
TableDescriptor.for_connector("filesystem")
.schema(
Schema.new_builder()
.column("Season", DataTypes.STRING())
.column("Team", DataTypes.STRING())
.column("Salary", DataTypes.STRING())
.column("Player", DataTypes.STRING())
.build()
)
.option("path", "../datasets/nba/salaries.csv")
.format("csv")
.build(),
)
)
table_env.execute_sql(
"""CREATE TABLE IF NOT EXISTS dev_catalog.nba.`salaries@dev`
(Season STRING, Team STRING, Salary STRING, Player STRING)"""
).wait()
tab = table_env.from_path("dev_catalog.nba.salaries_temp")
tab.execute_insert("dev_catalog.nba.`salaries@dev`").wait()
# Creating `totals_stats` table
(
table_env.create_temporary_table(
"dev_catalog.nba.totals_stats_temp",
TableDescriptor.for_connector("filesystem")
.schema(
Schema.new_builder()
.column("Season", DataTypes.STRING())
.column("Age", DataTypes.STRING())
.column("Team", DataTypes.STRING())
.column("ORB", DataTypes.STRING())
.column("DRB", DataTypes.STRING())
.column("TRB", DataTypes.STRING())
.column("AST", DataTypes.STRING())
.column("STL", DataTypes.STRING())
.column("BLK", DataTypes.STRING())
.column("TOV", DataTypes.STRING())
.column("PTS", DataTypes.STRING())
.column("Player", DataTypes.STRING())
.column("RSorPO", DataTypes.STRING())
.build()
)
.option("path", "../datasets/nba/totals_stats.csv")
.format("csv")
.build(),
)
)
table_env.execute_sql(
"""CREATE TABLE IF NOT EXISTS dev_catalog.nba.`totals_stats@dev` (Season STRING, Age STRING, Team STRING,
ORB STRING, DRB STRING, TRB STRING, AST STRING, STL STRING, BLK STRING, TOV STRING, PTS STRING,
Player STRING, RSorPO STRING)"""
).wait()
tab = table_env.from_path("dev_catalog.nba.totals_stats_temp")
tab.execute_insert("dev_catalog.nba.`totals_stats@dev`").wait()
salaries = table_env.from_path("dev_catalog.nba.`salaries@dev`").select(col("Season").count).to_pandas().values[0][0]
totals_stats = (
table_env.from_path("dev_catalog.nba.`totals_stats@dev`").select(col("Season").count).to_pandas().values[0][0]
)
print(f"\n\n\nAdded {salaries} rows to the salaries table and {totals_stats} rows to the totals_stats table.\n\n\n")
Now we count the rows in our tables to ensure they match the row counts of the CSV files. Note we use the `table@branch` notation, which overrides the context set by the catalog.
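As an aside, since `dev_catalog` was created with `'ref'='dev'`, the suffix is optional here; a quick sketch (both forms should return the same count):
# The catalog context already points at `dev`, so the implicit and the
# explicit `@dev` path resolve to the same branch state.
implicit = table_env.from_path("dev_catalog.nba.salaries").select(col("Season").count).to_pandas().values[0][0]
explicit = table_env.from_path("dev_catalog.nba.`salaries@dev`").select(col("Season").count).to_pandas().values[0][0]
assert implicit == explicit
Back to the check against the CSV files: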
table_count = table_env.from_path("dev_catalog.nba.`salaries@dev`").select(col("Season").count).to_pandas().values[0][0]
csv_count = table_env.from_path("dev_catalog.nba.salaries_temp").select(col("Season").count).to_pandas().values[0][0]
assert table_count == csv_count
print(table_count)
table_count = (
table_env.from_path("dev_catalog.nba.`totals_stats@dev`").select(col("Season").count).to_pandas().values[0][0]
)
csv_count = (
table_env.from_path("dev_catalog.nba.totals_stats_temp").select(col("Season").count).to_pandas().values[0][0]
)
assert table_count == csv_count
print(table_count)
Since we have been working solely on the `dev` branch, where we created 2 tables and added some data, let's verify that the `main` branch was not altered by our changes.
!nessie content list
And on the `dev` branch we expect to see two tables:
!nessie content list --ref dev
We can also verify that the `dev` and `main` branches point to different commits.
!nessie --verbose branch
Once we are done with our changes on the `dev` branch, we would like to merge those changes into `main`.

We merge `dev` into `main` via the command-line `merge` command. Both branches should be at the same revision after the merge/promotion.
!nessie merge dev -b main --force
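We can also check this from Python by comparing the branch heads, a small sketch built from pynessie calls used earlier (per the note above, the two hashes should now match):
# After the merge/promotion both refs should point at the same commit.
print("main:", nessie_client.get_reference("main").hash_)
print("dev: ", nessie_client.get_reference("dev").hash_)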
We can verify that the `main` branch now contains the expected tables and row counts.

The tables are now on `main` and ready for consumption by our blog authors and analysts!
!nessie --verbose branch
!nessie content list
table_count = table_env.from_path("main_catalog.nba.salaries").select(col("Season").count).to_pandas().values[0][0]
csv_count = table_env.from_path("dev_catalog.nba.salaries_temp").select(col("Season").count).to_pandas().values[0][0]
assert table_count == csv_count
table_count = table_env.from_path("main_catalog.nba.totals_stats").select(col("Season").count).to_pandas().values[0][0]
csv_count = (
table_env.from_path("dev_catalog.nba.totals_stats_temp").select(col("Season").count).to_pandas().values[0][0]
)
assert table_count == csv_count
Our analysts are happy with the data, and we now want to ingest data regularly to keep things up to date. Our first ETL job, shown in the cells below, consists of the following:

1. Update the `salaries` table to add new data
2. We have decided the `Age` column isn't required in the `totals_stats` table, so we will drop the column
3. Create a new `allstar_games_stats` table to hold information about the players' appearances in all-star games

As always, we will do this work on a branch and verify the results. This ETL job can then be set up to run nightly with new stats and salary information.
create_ref_catalog("etl")
# add some salaries for Kevin Durant
table_env.execute_sql(
"""INSERT INTO etl_catalog.nba.salaries
VALUES ('2017-18', 'Golden State Warriors', '$25000000', 'Kevin Durant'),
('2018-19', 'Golden State Warriors', '$30000000', 'Kevin Durant'),
('2019-20', 'Brooklyn Nets', '$37199000', 'Kevin Durant'),
('2020-21', 'Brooklyn Nets', '$39058950', 'Kevin Durant')"""
).wait()
# Rename the table `totals_stats` to `new_totals_stats`
table_env.execute_sql(
"ALTER TABLE etl_catalog.nba.`totals_stats@etl` RENAME TO etl_catalog.nba.new_totals_stats"
).wait()
# Creating `allstar_games_stats` table
(
table_env.create_temporary_table(
"etl_catalog.nba.allstar_games_stats_temp",
TableDescriptor.for_connector("filesystem")
.schema(
Schema.new_builder()
.column("Season", DataTypes.STRING())
.column("Age", DataTypes.STRING())
.column("Team", DataTypes.STRING())
.column("ORB", DataTypes.STRING())
.column("TRB", DataTypes.STRING())
.column("AST", DataTypes.STRING())
.column("STL", DataTypes.STRING())
.column("BLK", DataTypes.STRING())
.column("TOV", DataTypes.STRING())
.column("PF", DataTypes.STRING())
.column("PTS", DataTypes.STRING())
.column("Player", DataTypes.STRING())
.build()
)
.option("path", "../datasets/nba/allstar_games_stats.csv")
.format("csv")
.build(),
)
)
table_env.execute_sql(
"""CREATE TABLE IF NOT EXISTS etl_catalog.nba.`allstar_games_stats@etl` (Season STRING, Age STRING,
Team STRING, ORB STRING, TRB STRING, AST STRING, STL STRING, BLK STRING, TOV STRING,
PF STRING, PTS STRING, Player STRING)"""
).wait()
tab = table_env.from_path("etl_catalog.nba.allstar_games_stats_temp")
tab.execute_insert("etl_catalog.nba.`allstar_games_stats@etl`").wait()
# Notice how we view the data on the etl branch via @etl
table_env.from_path("etl_catalog.nba.`allstar_games_stats@etl`").to_pandas()
We can verify that the new table isn't on the `main` branch but is present on the `etl` branch.
# Since we have been working on the `etl` branch, the `allstar_games_stats` table is not on the `main` branch
!nessie content list
# We should see `allstar_games_stats` and the `new_totals_stats` on the `etl` branch
!nessie content list --ref etl
Now that we are happy with the data, we can again merge it into `main`.
!nessie merge etl -b main --force
Now let's verify that the changes exist on the `main` branch.
!nessie content list
!nessie --verbose branch
table_count = (
table_env.from_path("main_catalog.nba.allstar_games_stats").select(col("Season").count).to_pandas().values[0][0]
)
csv_count = (
table_env.from_path("etl_catalog.nba.allstar_games_stats_temp").select(col("Season").count).to_pandas().values[0][0]
)
assert table_count == csv_count
Create `experiment` branch

As a data analyst, we might want to carry out some experiments with some data without affecting `main` in any way.
As in the previous examples, we can just get started by creating an `experiment` branch off of `main` and carry out our experiment, which could consist of the following steps:

- Drop the `new_totals_stats` table
- Add data to the `salaries` table
- Compare the contents of the `experiment` and `main` branches

create_ref_catalog("experiment")
# Drop the `new_totals_stats` table on the `experiment` branch
table_env.execute_sql("DROP TABLE experiment_catalog.nba.`new_totals_stats@etl`")
# add some salaries for Dirk Nowitzki
table_env.execute_sql(
"""INSERT INTO experiment_catalog.nba.salaries VALUES
('2015-16', 'Dallas Mavericks', '$8333333', 'Dirk Nowitzki'),
('2016-17', 'Dallas Mavericks', '$25000000', 'Dirk Nowitzki'),
('2017-18', 'Dallas Mavericks', '$5000000', 'Dirk Nowitzki'),
('2018-19', 'Dallas Mavericks', '$5000000', 'Dirk Nowitzki')"""
).wait()
# We should see the `salaries` and `allstar_games_stats` tables only (since we just dropped `new_totals_stats`)
!nessie content list --ref experiment
# `main` hasn't been changed and still has the `new_totals_stats` table
!nessie content list
Let's take a look at the contents of the `salaries` table on the `experiment` branch.
Notice the use of the `main_catalog` together with the `@experiment` suffix to view data on the `experiment` branch:
table_env.from_path("main_catalog.nba.`salaries@experiment`").select(lit(1).count).to_pandas()
and compare it to the contents of the `salaries` table on the `main` branch. Here we spell out `@main` explicitly, although we could omit the `@branchName` suffix, since the catalog's `ref` defaults to `main`:
table_env.from_path("main_catalog.nba.`salaries@main`").select(lit(1).count).to_pandas()