!pip install --quiet duckdb
!pip install --quiet pyiceberg
!pip install sqlalchemy

import logging
logging.basicConfig(level=logging.DEBUG)

# Quick check that DuckDB works
import duckdb
duckdb.sql("SELECT 1").df()

!mkdir -p "/tmp/warehouse"  # make a directory for the catalog's SQLite file

# Set up a SQLite-backed PyIceberg catalog with a local file warehouse
from pyiceberg.catalog.sql import SqlCatalog

warehouse_path = "/tmp/warehouse"
catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)

# Load one month of NYC yellow taxi data into an Arrow table via DuckDB
arrow_df = duckdb.sql(
    "select * from read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet')"
).arrow()

schema = arrow_df.schema
schema

# Create the Iceberg table
from pyiceberg.exceptions import NoSuchNamespaceError, NamespaceAlreadyExistsError, TableAlreadyExistsError

# Define the table name and namespace
namespace = "taxi_namespace"
table_name = "taxi_trips"

# Create the namespace if it doesn't exist
try:
    catalog.create_namespace(namespace)
except NamespaceAlreadyExistsError:
    print("Namespace already exists.")

# Create or load the Iceberg table
try:
    table = catalog.create_table(f"{namespace}.{table_name}", schema)
except TableAlreadyExistsError:
    table = catalog.load_table(f"{namespace}.{table_name}")
    print("Iceberg table already exists.")

table.overwrite(arrow_df)  # write the Arrow table to the Iceberg table
len(table.scan().to_arrow())  # row count of the Iceberg table

# Register the scanned data as a DuckDB view and query it
df = table.scan().to_pandas()
duckdb.register("duck_test", df)
duckdb.sql("select * from duck_test")
duckdb.sql("SELECT * FROM information_schema.tables;")

# Inspect the catalog with the pyiceberg CLI
!pyiceberg --help
!pyiceberg --catalog default --uri sqlite:////tmp/warehouse/pyiceberg_catalog.db list
!pyiceberg --catalog default --uri sqlite:////tmp/warehouse/pyiceberg_catalog.db describe taxi_namespace.taxi_trips

# Re-open the catalog for reading with load_catalog
from pyiceberg.catalog import load_catalog

read_catalog = load_catalog(
    "default",
    **{
        "uri": "sqlite:////tmp/warehouse/pyiceberg_catalog.db",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
    },
)

read_catalog.list_namespaces()  # list all namespaces
read_catalog.list_tables("taxi_namespace")  # list all tables in the namespace

read_table = read_catalog.load_table("taxi_namespace.taxi_trips")
read_table

# Filtered scan: row filter, column projection, and a row limit
from pyiceberg.expressions import GreaterThanOrEqual

scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
    limit=100,
)

[task.file.file_path for task in scan.plan_files()]  # show the data files that contain the matching rows

scan_df = scan.to_pandas()  # read the scan into a pandas data frame
scan_df

scan_duck = scan.to_duckdb(table_name="distant_taxi_trips")  # load the scan into DuckDB
scan_duck.sql("select * from distant_taxi_trips")

# Query the Iceberg table directly with DuckDB's iceberg extension
duckdb.sql("INSTALL iceberg;")
duckdb.sql("LOAD iceberg;")

query = f"""
SELECT *
FROM iceberg_scan('{warehouse_path}/{namespace}.db/{table_name}')
"""
query

# Query the Iceberg table
result = duckdb.sql(query)

# Detach the existing "sqlite_db" if it exists
duckdb.execute("DETACH DATABASE IF EXISTS sqlite_db;")

# Attach the catalog's SQLite database to DuckDB
duckdb.execute("INSTALL sqlite_scanner;")
duckdb.execute("LOAD sqlite_scanner;")
duckdb.execute("ATTACH DATABASE '/tmp/warehouse/pyiceberg_catalog.db' AS sqlite_db;")
duckdb.sql("select * from main.sqlite_master")  # inspect the catalog tables PyIceberg created inside the SQLite file

# Look at what the catalog and table wrote to disk
!ls /tmp/
!ls /tmp/warehouse/
!cat /tmp/warehouse/pyiceberg_catalog.db
!ls /tmp/warehouse/taxi_namespace.db
!ls /tmp/warehouse/taxi_namespace.db/taxi_trips
!ls /tmp/warehouse/taxi_namespace.db/taxi_trips/metadata

!pip freeze
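
# Optional extra, not part of the original walkthrough: a minimal sketch of appending
# more rows with PyIceberg's Table.append(), which commits a new snapshot on top of the
# existing data instead of rewriting it the way overwrite() did above. The trip_distance
# filter is only an arbitrary way to build a second Arrow table from data already in memory.
import pyarrow.compute as pc

long_trips = arrow_df.filter(pc.greater_equal(arrow_df["trip_distance"], 10.0))
table.append(long_trips)        # commits a new snapshot with the appended rows
len(table.scan().to_arrow())    # row count now includes the appended rows
table.current_snapshot()        # inspect the snapshot created by the append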