%%writefile requirements.txt
dbt-core==1.2.0
dbt-postgres==1.1.1
psycopg2-binary==2.9.3
ipython-sql==0.4.1
boto3==1.24.31
!pip install -r requirements.txt
import boto3
import json
import pandas as pd
from sqlalchemy import create_engine
import psycopg2
In this step, we will download the sample taxi trip data and the taxi zone lookup table.
# Build a local parquet sample from the raw CSV.
# NOTE(review): the output filename says 2022-05, but the rows displayed
# below are from 2019-01 — confirm the sample file really is what its
# name claims.
pd.read_csv("taxi_rides.csv").to_parquet("yellow_tripdata_2022-05_sample_1000.parquet")
# Download the official NYC taxi zone lookup table (LocationID -> borough/zone).
!wget -q --show-progress https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
taxi+_zone_lookup.c 100%[===================>] 12.03K --.-KB/s in 0s
# Load the parquet sample into a DataFrame and peek at the first rows.
yellow_tripdata_df = pd.read_parquet('yellow_tripdata_2022-05_sample_1000.parquet')
yellow_tripdata_df.head()
vendor_id | pickup_datetime | dropoff_datetime | passenger_count | trip_distance | rate_code_id | store_and_fwd_flag | pickup_location_id | dropoff_location_id | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2 | 2019-01-05 06:36:51 | 2019-01-05 06:50:42 | 1 | 3.72 | 1 | N | 68 | 236 | 1 | 14.0 | 0.0 | 0.5 | 1.00 | 0.0 | 0.3 | 15.80 | NaN |
1 | 1 | 2019-01-23 15:22:13 | 2019-01-23 15:32:50 | 1 | 3.30 | 1 | N | 12 | 232 | 2 | 12.5 | 0.0 | 0.5 | 0.00 | 0.0 | 0.3 | 13.30 | 0.0 |
2 | 2 | 2019-01-04 10:54:47 | 2019-01-04 11:18:31 | 2 | 3.09 | 1 | N | 234 | 236 | 1 | 17.0 | 0.0 | 0.5 | 3.56 | 0.0 | 0.3 | 21.36 | NaN |
3 | 1 | 2019-01-05 12:07:08 | 2019-01-05 12:14:06 | 1 | 1.00 | 1 | N | 25 | 49 | 1 | 6.0 | 0.0 | 0.5 | 1.35 | 0.0 | 0.3 | 8.15 | NaN |
4 | 2 | 2019-01-04 18:23:00 | 2019-01-04 18:25:22 | 5 | 0.41 | 1 | N | 151 | 151 | 1 | 3.5 | 1.0 | 0.5 | 0.80 | 0.0 | 0.3 | 6.10 | NaN |
# Inspect dtypes and null counts; note that congestion_surcharge is
# mostly null (3673 of 10000 non-null in the output below).
yellow_tripdata_df.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 10000 entries, 0 to 9999 Data columns (total 18 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 vendor_id 10000 non-null int64 1 pickup_datetime 10000 non-null object 2 dropoff_datetime 10000 non-null object 3 passenger_count 10000 non-null int64 4 trip_distance 10000 non-null float64 5 rate_code_id 10000 non-null int64 6 store_and_fwd_flag 10000 non-null object 7 pickup_location_id 10000 non-null int64 8 dropoff_location_id 10000 non-null int64 9 payment_type 10000 non-null int64 10 fare_amount 10000 non-null float64 11 extra 10000 non-null float64 12 mta_tax 10000 non-null float64 13 tip_amount 10000 non-null float64 14 tolls_amount 10000 non-null float64 15 improvement_surcharge 10000 non-null float64 16 total_amount 10000 non-null float64 17 congestion_surcharge 3673 non-null float64 dtypes: float64(9), int64(6), object(3) memory usage: 1.4+ MB
# Read the zone lookup CSV downloaded above
# (columns: LocationID, Borough, Zone, service_zone).
lookup_zone = pd.read_csv('taxi+_zone_lookup.csv')
lookup_zone.head()
LocationID | Borough | Zone | service_zone | |
---|---|---|---|---|
0 | 1 | EWR | Newark Airport | EWR |
1 | 2 | Queens | Jamaica Bay | Boro Zone |
2 | 3 | Bronx | Allerton/Pelham Gardens | Boro Zone |
3 | 4 | Manhattan | Alphabet City | Yellow Zone |
4 | 5 | Staten Island | Arden Heights | Boro Zone |
# Inspect the lookup table; Zone and service_zone each contain a few
# nulls (264 and 263 non-null of 265 rows in the output below).
lookup_zone.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 265 entries, 0 to 264 Data columns (total 4 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 LocationID 265 non-null int64 1 Borough 265 non-null object 2 Zone 264 non-null object 3 service_zone 263 non-null object dtypes: int64(1), object(3) memory usage: 8.4+ KB
# List every available column before selecting the subset we need.
yellow_tripdata_df.columns
Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_distance', 'rate_code_id', 'store_and_fwd_flag', 'pickup_location_id', 'dropoff_location_id', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge'], dtype='object')
# Keep only the columns the downstream dbt staging models need.
selected_columns = [
    'vendor_id',
    'pickup_datetime',
    'dropoff_datetime',
    'passenger_count',
    'pickup_location_id',
    'dropoff_location_id',
    'fare_amount',
]
yellow_tripdata_df = yellow_tripdata_df[selected_columns]
yellow_tripdata_df
vendor_id | pickup_datetime | dropoff_datetime | passenger_count | pickup_location_id | dropoff_location_id | fare_amount | |
---|---|---|---|---|---|---|---|
0 | 2 | 2019-01-05 06:36:51 | 2019-01-05 06:50:42 | 1 | 68 | 236 | 14.0 |
1 | 1 | 2019-01-23 15:22:13 | 2019-01-23 15:32:50 | 1 | 12 | 232 | 12.5 |
2 | 2 | 2019-01-04 10:54:47 | 2019-01-04 11:18:31 | 2 | 234 | 236 | 17.0 |
3 | 1 | 2019-01-05 12:07:08 | 2019-01-05 12:14:06 | 1 | 25 | 49 | 6.0 |
4 | 2 | 2019-01-04 18:23:00 | 2019-01-04 18:25:22 | 5 | 151 | 151 | 3.5 |
... | ... | ... | ... | ... | ... | ... | ... |
9995 | 2 | 2019-01-01 05:39:35 | 2019-01-01 06:04:19 | 1 | 162 | 132 | 52.0 |
9996 | 2 | 2019-01-31 14:23:02 | 2019-01-31 14:35:24 | 1 | 170 | 230 | 9.0 |
9997 | 1 | 2019-01-24 13:08:20 | 2019-01-24 13:30:16 | 1 | 170 | 249 | 14.0 |
9998 | 2 | 2019-01-24 07:29:50 | 2019-01-25 06:39:33 | 1 | 249 | 186 | 7.5 |
9999 | 2 | 2019-01-12 01:22:17 | 2019-01-12 01:33:08 | 5 | 113 | 246 | 9.5 |
10000 rows × 7 columns
# Normalize the lookup table's column names to lower-case snake_case so
# they are friendly to Postgres and the dbt models.
lookup_zone = lookup_zone.set_axis(
    ['locationid', 'borough', 'zone', 'service_zone'], axis=1
)
lookup_zone
locationid | borough | zone | service_zone | |
---|---|---|---|---|
0 | 1 | EWR | Newark Airport | EWR |
1 | 2 | Queens | Jamaica Bay | Boro Zone |
2 | 3 | Bronx | Allerton/Pelham Gardens | Boro Zone |
3 | 4 | Manhattan | Alphabet City | Yellow Zone |
4 | 5 | Staten Island | Arden Heights | Boro Zone |
... | ... | ... | ... | ... |
260 | 261 | Manhattan | World Trade Center | Yellow Zone |
261 | 262 | Manhattan | Yorkville East | Yellow Zone |
262 | 263 | Manhattan | Yorkville West | Yellow Zone |
263 | 264 | Unknown | NV | NaN |
264 | 265 | Unknown | NaN | NaN |
265 rows × 4 columns
In this step, we will load the data into Postgres.
# Setup the credentials
def get_secret(secret_name, region_name="us-east-1"):
    """Fetch a secret from AWS Secrets Manager and return it as a dict.

    Args:
        secret_name: Name (or ARN) of the secret to fetch.
        region_name: AWS region that hosts the secret. Defaults to
            ``us-east-1``, matching the previously hard-coded value, so
            existing callers are unaffected.

    Returns:
        dict parsed from the secret's JSON ``SecretString``.
    """
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name,
    )
    response = client.get_secret_value(SecretId=secret_name)
    # SecretString holds a JSON document mapping credential keys to values.
    return json.loads(response['SecretString'])
# Pull the database username/password from AWS Secrets Manager.
db_credentials = get_secret(secret_name='wysde')
USERNAME = db_credentials["RDS_POSTGRES_USERNAME"]
PASSWORD = db_credentials["RDS_POSTGRES_PASSWORD"]
# NOTE(review): host, database, and schema are hard-coded to one RDS
# instance — consider storing these in the secret as well.
HOST = "database-1.cy8ltogyfgas.us-east-1.rds.amazonaws.com"
PORT = 5432
DBNAME = "sparsh"
# Schema the raw tables are loaded into below; the dbt source schema
# should point at this same schema.
DBT_SCHEMA = "dbt_taxi_duplicate"
# SQLAlchemy URL, reused by pandas.to_sql and by ipython-sql (%sql).
CONN = f"postgresql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBNAME}"
# Load the two DataFrames into our Postgres database. The connection is
# opened via a context manager so it is always closed, even if one of the
# loads raises partway through (the original leaked the connection on
# failure). Trailing semicolons removed — they are noise in Python.
engine = create_engine(CONN, pool_recycle=3600)
with engine.connect() as connection:
    # if_exists='replace' drops and recreates the tables on each run.
    lookup_zone.to_sql('taxi_zone_lookup', connection,
                       if_exists='replace', schema=DBT_SCHEMA, index=False)
    yellow_tripdata_df.to_sql('yellow_taxi_trips', connection,
                              if_exists='replace', schema=DBT_SCHEMA, index=False)
# Point the ipython-sql extension at the same Postgres database.
%reload_ext sql
%sql {CONN}
# Sanity-check: sample a few rows from the freshly loaded trips table.
%sql select * from {DBT_SCHEMA}.yellow_taxi_trips limit 10;
* postgresql://postgres:***@database-1.cy8ltogyfgas.us-east-1.rds.amazonaws.com:5432/sparsh 10 rows affected.
vendor_id | pickup_datetime | dropoff_datetime | passenger_count | pickup_location_id | dropoff_location_id | fare_amount |
---|---|---|---|---|---|---|
2 | 2019-01-05 06:36:51 | 2019-01-05 06:50:42 | 1 | 68 | 236 | 14.0 |
1 | 2019-01-23 15:22:13 | 2019-01-23 15:32:50 | 1 | 12 | 232 | 12.5 |
2 | 2019-01-04 10:54:47 | 2019-01-04 11:18:31 | 2 | 234 | 236 | 17.0 |
1 | 2019-01-05 12:07:08 | 2019-01-05 12:14:06 | 1 | 25 | 49 | 6.0 |
2 | 2019-01-04 18:23:00 | 2019-01-04 18:25:22 | 5 | 151 | 151 | 3.5 |
1 | 2019-01-06 10:52:46 | 2019-01-06 11:08:39 | 2 | 144 | 161 | 15.0 |
2 | 2019-01-22 18:26:28 | 2019-01-22 18:35:38 | 1 | 142 | 238 | 8.0 |
2 | 2019-01-20 23:03:56 | 2019-01-20 23:21:49 | 1 | 114 | 239 | 18.0 |
2 | 2019-01-23 00:17:10 | 2019-01-23 00:32:58 | 1 | 161 | 79 | 12.5 |
1 | 2019-01-28 12:58:13 | 2019-01-28 13:47:02 | 1 | 236 | 138 | 43.0 |
# Verify the full 10,000-row trip sample made it into Postgres.
%sql select count(*) from {DBT_SCHEMA}.yellow_taxi_trips limit 10;
* postgresql://postgres:***@database-1.cy8ltogyfgas.us-east-1.rds.amazonaws.com:5432/sparsh 1 rows affected.
count |
---|
10000 |
# Verify all 265 zone-lookup rows were loaded.
%sql select count(*) from {DBT_SCHEMA}.taxi_zone_lookup limit 10;
* postgresql://postgres:***@database-1.cy8ltogyfgas.us-east-1.rds.amazonaws.com:5432/sparsh 1 rows affected.
count |
---|
265 |
Initialize the dbt project.
# Scaffold a new dbt project (the interactive prompt names it "nyctaxi").
!dbt init
%cd nyctaxi
# Show the generated project layout before we add our own models.
!tree --du -h -C .
. ├── [ 571] README.md ├── [ 96] analyses ├── [1.3K] dbt_project.yml ├── [ 96] macros ├── [1.3K] models │ └── [1.2K] example │ ├── [ 475] my_first_dbt_model.sql │ ├── [ 115] my_second_dbt_model.sql │ └── [ 437] schema.yml ├── [ 96] seeds ├── [ 96] snapshots └── [ 96] tests 3.9K used in 7 directories, 5 files
!tree --du -h -C ~/.dbt
/Users/sparshagarwal/.dbt
└── [2.1K] profiles.yml
2.2K used in 0 directories, 1 file
# !cat ~/.dbt/profiles.yml
Set up the database credentials.
# %%writefile ~/.dbt/profiles.yml
# nyctaxi:
# outputs:
# dev:
# type: postgres
# threads: 1
# host: database-1.cy8ltogyfgas.us-east-1.rds.amazonaws.com
# port: 5432
# user: postgres
# pass:
# dbname: sparsh
# schema: dbt_taxi
# target: dev
# Validate the profiles.yml connection, build the scaffolded example
# models, and run their tests before adding our own models.
!dbt debug
!dbt run
!dbt test
!tree --du -h -C .
# Staging models get their own subdirectory, per dbt convention.
!mkdir -p ./models/staging
%%writefile ./models/staging/yellow_taxi_trips_models.sql
-- Staging model: pass-through of the reduced yellow-taxi trip columns
-- from the raw source table declared in ./models/staging/schema.yml.
select
vendor_id,
pickup_datetime,
dropoff_datetime,
passenger_count,
pickup_location_id,
dropoff_location_id,
fare_amount
from {{ source('source', 'yellow_taxi_trips') }}
Writing ./models/staging/yellow_taxi_trips_models.sql
%%writefile ./models/staging/taxi_zone_lookup_model.sql
-- Staging model: pass-through of the taxi zone lookup table
-- (locationid -> borough/zone names) from the raw source.
select
locationid,
borough,
zone,
service_zone
from {{ source('source', 'taxi_zone_lookup') }}
Writing ./models/staging/taxi_zone_lookup_model.sql
%%writefile ./models/staging/schema.yml
version: 2

sources:
  - name: source
    # The notebook loads the raw tables into DBT_SCHEMA
    # ("dbt_taxi_duplicate"); the source schema must match, otherwise
    # {{ source(...) }} resolves to tables that do not exist there.
    schema: dbt_taxi_duplicate
    tables:
      - name: yellow_taxi_trips
      - name: taxi_zone_lookup

models:
  - name: taxi_zone_lookup_model
    description: "A list of all taxi zones with codes in NYC"
    columns:
      - name: locationid
        tests:
          - not_null
      - name: borough
        tests:
          - not_null
      # zone and service_zone are nullable in the source data (the
      # "Unknown" rows for locationid 264/265 have NaN values), so a
      # not_null test on them would always fail against this dataset.
      - name: zone
      - name: service_zone
  - name: yellow_taxi_trips_models
    description: "A reduced version of yellow taxi trip data in NYC"
    columns:
      - name: vendor_id
        tests:
          - not_null
          - accepted_values:
              values: ['1', '2', '4']
      - name: pickup_datetime
        tests:
          - not_null
      - name: dropoff_datetime
        tests:
          - not_null
      - name: passenger_count
        tests:
          - not_null
      - name: pickup_location_id
        tests:
          - not_null
      - name: dropoff_location_id
        tests:
          - not_null
      - name: fare_amount
        tests:
          - not_null
Writing ./models/staging/schema.yml
We will now create another dbt model, which combines data from the two staging models. Let's assume we want to write a query to join the staging tables on the location ID fields and add the actual location names to the pickup and dropoff locations of the taxi ride data.
!mkdir -p ./models/marketing
%%writefile ./models/marketing/trips_with_borough_name_model.sql
-- Mart model: enrich each trip with the borough names of its pickup and
-- dropoff locations by joining the zone-lookup staging model twice.
-- Left joins keep trips whose location id has no match in the lookup.
select
t.vendor_id,
t.pickup_datetime,
t.dropoff_datetime,
z1.borough as pickup_borough,
z2.borough as dropoff_borough,
t.passenger_count,
t.fare_amount
from {{ ref('yellow_taxi_trips_models') }} t
left join {{ ref('taxi_zone_lookup_model') }} z1
on t.pickup_location_id = z1.locationid
left join {{ ref('taxi_zone_lookup_model') }} z2
on t.dropoff_location_id = z2.locationid
Writing ./models/marketing/trips_with_borough_name_model.sql
%%writefile ./models/marketing/trips_vendor_model.sql
-- Mart model: just the vendor id and pickup time of every trip.
-- Fix: removed the trailing comma after the last select-list item,
-- which is a syntax error in Postgres and would make `dbt run` fail.
select
t.vendor_id,
t.pickup_datetime
from {{ ref('yellow_taxi_trips_models') }} t
Writing ./models/marketing/trips_vendor_model.sql
%%writefile ./models/marketing/schema.yml
version: 2

# Documentation for the two marketing (mart) models. No column tests are
# declared here; column-level tests live with the staging models.
models:
  - name: trips_with_borough_name_model
    description: "Combines taxi rides with the borough names for pickup and dropoff locations."
    columns:
      - name: vendor_id
      - name: pickup_datetime
      - name: dropoff_datetime
      - name: pickup_borough
      - name: dropoff_borough
      - name: passenger_count
      - name: fare_amount
  - name: trips_vendor_model
    description: "Shows the pickup datetime and vendor id."
    columns:
      - name: vendor_id
      - name: pickup_datetime
# Rebuild all models, then generate and serve the documentation site.
!dbt run
!dbt docs generate
!dbt docs serve
11:20:47 Running with dbt=1.3.1 11:20:47 Serving docs at 0.0.0.0:8080 11:20:47 To access from your browser, navigate to: http://localhost:8080 11:20:47 11:20:47 11:20:47 Press Ctrl+C to exit. 127.0.0.1 - - [28/Feb/2023 16:50:49] "GET / HTTP/1.1" 200 - 127.0.0.1 - - [28/Feb/2023 16:50:52] "GET /manifest.json?cb=1677583252105 HTTP/1.1" 200 - 127.0.0.1 - - [28/Feb/2023 16:50:52] "GET /catalog.json?cb=1677583252105 HTTP/1.1" 200 - 127.0.0.1 - - [28/Feb/2023 16:50:53] "GET / HTTP/1.1" 304 - 127.0.0.1 - - [28/Feb/2023 16:50:55] "GET /manifest.json?cb=1677583255187 HTTP/1.1" 200 - 127.0.0.1 - - [28/Feb/2023 16:50:55] "GET /catalog.json?cb=1677583255187 HTTP/1.1" 200 - ^C