!pip install --quiet duckdb
!pip install --quiet pyiceberg
!pip install sqlalchemy
Requirement already satisfied: sqlalchemy in /usr/local/lib/python3.10/dist-packages (2.0.31)
Requirement already satisfied: typing-extensions>=4.6.0 in /usr/local/lib/python3.10/dist-packages (from sqlalchemy) (4.12.2)
Requirement already satisfied: greenlet!=0.4.17 in /usr/local/lib/python3.10/dist-packages (from sqlalchemy) (3.0.3)
import logging
logging.basicConfig(level=logging.DEBUG)
import duckdb
duckdb.sql("SELECT 1").df
<bound method PyCapsule.df of ┌───────┐ │ 1 │ │ int32 │ ├───────┤ │ 1 │ └───────┘ >
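For reference, the relation can be materialized in other ways too; a minimal sketch:
rel = duckdb.sql("SELECT 1 AS answer")
rel.fetchall()  # plain Python list of tuples: [(1,)]
rel.arrow()     # pyarrow Table
rel.df()        # pandas DataFrame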
Use SQLAlchemy to create a SQLite-backed catalog for Iceberg, create an Iceberg table, and load the taxi dataset.
!mkdir "/tmp/warehouse" # make directory for catalog SqlLite file
from pyiceberg.catalog.sql import SqlCatalog
warehouse_path = "/tmp/warehouse"
catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)
Load the taxi data into an Iceberg table.
arrow_df = duckdb.sql(
    "select * from read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet')"
).arrow()
schema = arrow_df.schema
schema
VendorID: int64
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
RatecodeID: double
store_and_fwd_flag: string
PULocationID: int64
DOLocationID: int64
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
# create the iceberg table
from pyiceberg.exceptions import NamespaceAlreadyExistsError, TableAlreadyExistsError
# Define the table name and namespace
namespace = "taxi_namespace"
table_name = "taxi_trips"
# Create the namespace if it doesn't exist
try:
    catalog.create_namespace(namespace)
except NamespaceAlreadyExistsError:
    print("Namespace already exists.")
# Create or load the Iceberg table
try:
    table = catalog.create_table(f"{namespace}.{table_name}", schema)
except TableAlreadyExistsError:
    table = catalog.load_table(f"{namespace}.{table_name}")
    print("Iceberg table already exists.")
table.overwrite(arrow_df) # write the data frame to the Iceberg table, replacing any existing rows
len(table.scan().to_arrow())
3066766
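Note that overwrite replaces the table contents on every run. To accumulate rows across loads instead, recent PyIceberg versions also offer append; a minimal sketch (running it here would duplicate the 3,066,766 rows):
table.append(arrow_df)  # writes a new snapshot on top of the existing data instead of replacing it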
df = table.scan().to_pandas()
duckdb.register("duck_test", df)  # register the data frame as a DuckDB view
duckdb.sql("select * from duck_test")
┌──────────┬──────────────────────┬──────────────────────┬───┬──────────────┬──────────────────────┬─────────────┐
│ VendorID │ tpep_pickup_datetime │ tpep_dropoff_datet…  │ … │ total_amount │ congestion_surcharge │ airport_fee │
│  int64   │      timestamp       │      timestamp       │   │    double    │        double        │   double    │
├──────────┼──────────────────────┼──────────────────────┼───┼──────────────┼──────────────────────┼─────────────┤
│        2 │ 2023-01-01 00:32:10  │ 2023-01-01 00:40:36  │ … │         14.3 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 00:55:08  │ 2023-01-01 01:01:27  │ … │         16.9 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 00:25:04  │ 2023-01-01 00:37:49  │ … │         34.9 │                  2.5 │         0.0 │
│        1 │ 2023-01-01 00:03:48  │ 2023-01-01 00:13:25  │ … │        20.85 │                  0.0 │        1.25 │
│        2 │ 2023-01-01 00:10:29  │ 2023-01-01 00:21:19  │ … │        19.68 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 00:50:34  │ 2023-01-01 01:02:52  │ … │         27.8 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 00:09:22  │ 2023-01-01 00:19:49  │ … │        20.52 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 00:27:12  │ 2023-01-01 00:49:56  │ … │        64.44 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 00:21:44  │ 2023-01-01 00:36:40  │ … │        28.38 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 00:39:42  │ 2023-01-01 00:50:36  │ … │         19.9 │                  2.5 │         0.0 │
│        · │          ·           │          ·           │ · │            · │                    · │           · │
│        · │          ·           │          ·           │ · │            · │                    · │           · │
│        · │          ·           │          ·           │ · │            · │                    · │           · │
│        2 │ 2023-01-01 01:28:24  │ 2023-01-01 01:56:38  │ … │         42.3 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 01:25:30  │ 2023-01-01 01:46:33  │ … │         35.6 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 01:49:47  │ 2023-01-01 02:01:27  │ … │        26.63 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 01:13:11  │ 2023-01-01 01:16:31  │ … │        14.04 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 01:22:22  │ 2023-01-01 01:33:19  │ … │        21.36 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 01:37:42  │ 2023-01-01 01:49:29  │ … │         17.8 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 01:51:17  │ 2023-01-01 02:02:07  │ … │         22.9 │                  2.5 │         0.0 │
│        2 │ 2023-01-01 01:49:24  │ 2023-01-01 02:17:26  │ … │         33.2 │                  2.5 │         0.0 │
│        1 │ 2023-01-01 01:16:50  │ 2023-01-01 01:23:29  │ … │         10.4 │                  0.0 │         0.0 │
│        1 │ 2023-01-01 01:26:48  │ 2023-01-01 01:35:56  │ … │         13.9 │                  0.0 │         0.0 │
├──────────┴──────────────────────┴──────────────────────┴───┴──────────────┴──────────────────────┴─────────────┤
│ ? rows (>9999 rows, 20 shown)                                                             19 columns (6 shown) │
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
duckdb.sql("SELECT * FROM information_schema.tables ;")
┌───────────────┬──────────────┬────────────┬───┬────────────────────┬──────────┬───────────────┬───────────────┐
│ table_catalog │ table_schema │ table_name │ … │ is_insertable_into │ is_typed │ commit_action │ TABLE_COMMENT │
│    varchar    │   varchar    │  varchar   │   │      varchar       │ varchar  │    varchar    │    varchar    │
├───────────────┼──────────────┼────────────┼───┼────────────────────┼──────────┼───────────────┼───────────────┤
│ temp          │ main         │ duck_test  │ … │ NO                 │ NO       │ NULL          │ NULL          │
├───────────────┴──────────────┴────────────┴───┴────────────────────┴──────────┴───────────────┴───────────────┤
│ 1 rows                                                                                    13 columns (7 shown) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
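As an aside, duckdb.register also accepts a pyarrow Table directly, so the pandas round-trip above can be skipped; a sketch:
duckdb.register("duck_arrow", table.scan().to_arrow())  # register the Arrow table as a DuckDB view
duckdb.sql("select count(*) from duck_arrow")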
Use the pyiceberg CLI to describe the newly created table.
!pyiceberg --help
Usage: pyiceberg [OPTIONS] COMMAND [ARGS]...

Options:
  --catalog TEXT
  --verbose BOOLEAN
  --output [text|json]
  --uri TEXT
  --credential TEXT
  --help                Show this message and exit.

Commands:
  create      Operation to create a namespace.
  describe    Describe a namespace or a table.
  drop        Operations to drop a namespace or table.
  files       List all the files of the table.
  list        List tables or namespaces.
  list-refs   List all the refs in the provided table.
  location    Return the location of the table.
  properties  Properties on tables/namespaces.
  rename      Rename a table.
  schema      Get the schema of the table.
  spec        Return the partition spec of the table.
  uuid        Return the UUID of the table.
  version     Print pyiceberg version.
!pyiceberg --catalog default --uri sqlite:////tmp/warehouse/pyiceberg_catalog.db list
taxi_namespace
!pyiceberg --catalog default --uri sqlite:////tmp/warehouse/pyiceberg_catalog.db describe taxi_namespace.taxi_trips
Table format version  2
Metadata location     file:///tmp/warehouse/taxi_namespace.db/taxi_trips/metadata/00001-86911ea9-b1…
Table UUID            14d1f1e3-c7d5-4099-ba01-db72e6d43773
Last Updated          1721786211755
Partition spec        []
Sort order            []
Current schema        Schema, id=0
                      ├── 1: VendorID: optional long
                      ├── 2: tpep_pickup_datetime: optional timestamp
                      ├── 3: tpep_dropoff_datetime: optional timestamp
                      ├── 4: passenger_count: optional double
                      ├── 5: trip_distance: optional double
                      ├── 6: RatecodeID: optional double
                      ├── 7: store_and_fwd_flag: optional string
                      ├── 8: PULocationID: optional long
                      ├── 9: DOLocationID: optional long
                      ├── 10: payment_type: optional long
                      ├── 11: fare_amount: optional double
                      ├── 12: extra: optional double
                      ├── 13: mta_tax: optional double
                      ├── 14: tip_amount: optional double
                      ├── 15: tolls_amount: optional double
                      ├── 16: improvement_surcharge: optional double
                      ├── 17: total_amount: optional double
                      ├── 18: congestion_surcharge: optional double
                      └── 19: airport_fee: optional double
Current snapshot      Operation.APPEND: id=5984614148918088538, schema_id=0
Snapshots             Snapshots
                      └── Snapshot 5984614148918088538, schema 0:
                          file:///tmp/warehouse/taxi_namespace.db/taxi_trips/metadata/snap-59846141…
Properties
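The other subcommands listed in the help output follow the same pattern, for example:
!pyiceberg --catalog default --uri sqlite:////tmp/warehouse/pyiceberg_catalog.db schema taxi_namespace.taxi_trips
!pyiceberg --catalog default --uri sqlite:////tmp/warehouse/pyiceberg_catalog.db location taxi_namespace.taxi_trips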
from pyiceberg.catalog import load_catalog
read_catalog = load_catalog(
    "default",
    **{
        "uri": "sqlite:////tmp/warehouse/pyiceberg_catalog.db",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
    },
)
read_catalog.list_namespaces() # list all namespaces
[('taxi_namespace',)]
read_catalog.list_tables("taxi_namespace") # list all tables in the namespace
[('taxi_namespace', 'taxi_trips')]
read_table = read_catalog.load_table("taxi_namespace.taxi_trips")
read_table
taxi_trips(
  1: VendorID: optional long,
  2: tpep_pickup_datetime: optional timestamp,
  3: tpep_dropoff_datetime: optional timestamp,
  4: passenger_count: optional double,
  5: trip_distance: optional double,
  6: RatecodeID: optional double,
  7: store_and_fwd_flag: optional string,
  8: PULocationID: optional long,
  9: DOLocationID: optional long,
  10: payment_type: optional long,
  11: fare_amount: optional double,
  12: extra: optional double,
  13: mta_tax: optional double,
  14: tip_amount: optional double,
  15: tolls_amount: optional double,
  16: improvement_surcharge: optional double,
  17: total_amount: optional double,
  18: congestion_surcharge: optional double,
  19: airport_fee: optional double
),
partition by: [],
sort order: [],
snapshot: Operation.APPEND: id=5984614148918088538, schema_id=0
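The snapshot shown at the end of the repr can also be fetched programmatically; a minimal sketch:
read_table.current_snapshot()  # returns the snapshot the table currently points at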
Read data from the Parquet files into a pandas DataFrame.
from pyiceberg.expressions import GreaterThanOrEqual
scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
    limit=100,
)
[task.file.file_path for task in scan.plan_files()] # show the data files that contain the matching rows
['file:///tmp/warehouse/taxi_namespace.db/taxi_trips/data/00000-0-4d92790a-a45b-4e41-807e-0299bbe7d87d.parquet']
scan_df = scan.to_pandas() # read the data to a pandas data frame
scan_df
|    | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime |
|----|----------|----------------------|-----------------------|
| 0  | 2        | 2023-01-01 00:27:12  | 2023-01-01 00:49:56   |
| 1  | 2        | 2023-01-01 00:09:29  | 2023-01-01 00:29:23   |
| 2  | 1        | 2023-01-01 00:13:30  | 2023-01-01 00:44:00   |
| 3  | 2        | 2023-01-01 00:41:41  | 2023-01-01 01:19:32   |
| 4  | 2        | 2023-01-01 00:22:39  | 2023-01-01 01:30:45   |
| ...| ...      | ...                  | ...                   |
| 95 | 2        | 2023-01-01 00:56:24  | 2023-01-01 01:26:29   |
| 96 | 2        | 2023-01-01 00:55:38  | 2023-01-01 01:25:34   |
| 97 | 2        | 2023-01-01 00:13:36  | 2023-01-01 00:48:23   |
| 98 | 2        | 2023-01-01 00:51:18  | 2023-01-01 01:11:18   |
| 99 | 2        | 2023-01-01 00:27:34  | 2023-01-01 01:05:05   |
100 rows × 3 columns
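Row filters compose, so the scan can be narrowed further by combining predicates with And; a sketch with an illustrative second predicate on fare_amount:
from pyiceberg.expressions import And, LessThan
combined_scan = table.scan(
    row_filter=And(GreaterThanOrEqual("trip_distance", 10.0), LessThan("fare_amount", 30.0)),
    selected_fields=("trip_distance", "fare_amount"),
    limit=10,
)
combined_scan.to_pandas()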
Query the scanned rows using DuckDB.
scan_duck = scan.to_duckdb(table_name="distant_taxi_trips") # scan into duckdb
scan_duck.sql("select * from distant_taxi_trips")
┌──────────┬──────────────────────┬───────────────────────┐
│ VendorID │ tpep_pickup_datetime │ tpep_dropoff_datetime │
│  int64   │      timestamp       │       timestamp       │
├──────────┼──────────────────────┼───────────────────────┤
│        2 │ 2023-01-01 00:27:12  │ 2023-01-01 00:49:56   │
│        2 │ 2023-01-01 00:09:29  │ 2023-01-01 00:29:23   │
│        1 │ 2023-01-01 00:13:30  │ 2023-01-01 00:44:00   │
│        2 │ 2023-01-01 00:41:41  │ 2023-01-01 01:19:32   │
│        2 │ 2023-01-01 00:22:39  │ 2023-01-01 01:30:45   │
│        1 │ 2023-01-01 00:18:29  │ 2023-01-01 00:55:20   │
│        2 │ 2023-01-01 00:25:14  │ 2023-01-01 01:00:29   │
│        2 │ 2023-01-01 00:37:44  │ 2023-01-01 01:16:14   │
│        2 │ 2023-01-01 00:49:24  │ 2023-01-01 01:30:59   │
│        1 │ 2023-01-01 00:21:15  │ 2023-01-01 00:52:25   │
│        · │          ·           │           ·           │
│        · │          ·           │           ·           │
│        · │          ·           │           ·           │
│        2 │ 2023-01-01 00:12:07  │ 2023-01-01 00:44:26   │
│        2 │ 2023-01-01 00:47:21  │ 2023-01-01 01:30:32   │
│        2 │ 2023-01-01 00:47:39  │ 2023-01-01 01:23:27   │
│        1 │ 2023-01-01 00:56:13  │ 2023-01-01 01:20:11   │
│        2 │ 2023-01-01 00:05:24  │ 2023-01-01 00:33:50   │
│        2 │ 2023-01-01 00:56:24  │ 2023-01-01 01:26:29   │
│        2 │ 2023-01-01 00:55:38  │ 2023-01-01 01:25:34   │
│        2 │ 2023-01-01 00:13:36  │ 2023-01-01 00:48:23   │
│        2 │ 2023-01-01 00:51:18  │ 2023-01-01 01:11:18   │
│        2 │ 2023-01-01 00:27:34  │ 2023-01-01 01:05:05   │
├──────────┴──────────────────────┴───────────────────────┤
│ 100 rows (20 shown)                           3 columns │
└──────────────────────────────────────────────────────────┘
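Since to_duckdb returns a DuckDB connection with the scan registered as a view, ordinary SQL such as aggregations works against it:
scan_duck.sql("select VendorID, count(*) as trips from distant_taxi_trips group by VendorID")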
Use DuckDB and the Iceberg extension to query the Iceberg table directly.
duckdb.sql("INSTALL iceberg;")
duckdb.sql("LOAD ICEBERG;")
query=f"""
SELECT *
FROM iceberg_scan('{warehouse_path}/{namespace}.db/{table_name}')
"""
query
"\n SELECT *\n FROM iceberg_scan('/tmp/warehouse/taxi_namespace.db/taxi_trips')\n"
# Query the Iceberg table
result = duckdb.sql(query)
---------------------------------------------------------------------------
IOException                               Traceback (most recent call last)
<ipython-input-53-4ef425e70b2c> in <cell line: 2>()
      1 # Query the Iceberg table
----> 2 result = duckdb.sql(query)

/usr/local/lib/python3.10/dist-packages/duckdb/__init__.py in sql(query, **kwargs)
    455     else:
    456         conn = duckdb.connect(":default:")
--> 457     return conn.sql(query, **kwargs)
    458 _exported_symbols.append('sql')
    459

IOException: IO Error: Cannot open file "/tmp/warehouse/taxi_namespace.db/taxi_trips/metadata/version-hint.text": No such file or directory
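The failure is expected: PyIceberg's SqlCatalog does not write the version-hint.text file that iceberg_scan's directory lookup relies on. One workaround, sketched here, is to point iceberg_scan at the table's current metadata file, which PyIceberg exposes as metadata_location:
metadata_file = table.metadata_location.removeprefix("file://")  # strip the scheme to get a local path
duckdb.sql(f"SELECT count(*) FROM iceberg_scan('{metadata_file}')")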
# Detach the existing "sqlite_db" if it exists
duckdb.execute("DETACH DATABASE IF EXISTS sqlite_db;")
# Attach the SQLite database to DuckDB
duckdb.execute("INSTALL sqlite_scanner;")
duckdb.execute("LOAD sqlite_scanner;")
duckdb.execute("ATTACH DATABASE '/tmp/warehouse/pyiceberg_catalog.db' AS sqlite_db;")
duckdb.sql("select * from main.sqlite_master")
!ls /tmp/                                                 # the warehouse lives under /tmp
!ls /tmp/warehouse/                                       # catalog SQLite file plus one directory per namespace
!file /tmp/warehouse/pyiceberg_catalog.db                 # the catalog itself is a binary SQLite file
!ls /tmp/warehouse/taxi_namespace.db                      # one directory per table
!ls /tmp/warehouse/taxi_namespace.db/taxi_trips           # data and metadata directories
!ls /tmp/warehouse/taxi_namespace.db/taxi_trips/metadata  # metadata JSON, manifest lists, and manifests
!pip freeze