gdown is a Python utility for downloading files stored in Google Drive. The bash script in the following cell iterates over a collection of Google Drive identifiers that match the files taxi_2015.zip through taxi_2019.zip stored in a shared Google Drive. The script uses these files instead of the originals from https://opendata.dc.gov/search?categories=transportation&q=taxi&type=document%20link since the originals cannot be easily downloaded using a bash script.
%%bash
pip install gdown
for ID in '1yF2hYrVjAZ3VPFo1dDkN80wUV2eEq65O' \
          '1Z7ZVi79wKEbnc0FH3o0XKHGUS8MQOU6R' \
          '1I_uraLKNbGPe3IeE7FUa9gPfwBHjthu4' \
          '1MoY3THytktrxam6_hFSC8kW1DeHZ8_g1' \
          '1balxhT6Qh_WDp4wq4OsG40iwqFa86QgW'
do
  gdown --id $ID
done
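If you prefer to stay in Python instead of bash, gdown also exposes a download function. The following is a minimal sketch, assuming a gdown version recent enough to support the id keyword argument:
import gdown

# the same Google Drive identifiers used by the bash loop above
ids = ['1yF2hYrVjAZ3VPFo1dDkN80wUV2eEq65O',
       '1Z7ZVi79wKEbnc0FH3o0XKHGUS8MQOU6R',
       '1I_uraLKNbGPe3IeE7FUa9gPfwBHjthu4',
       '1MoY3THytktrxam6_hFSC8kW1DeHZ8_g1',
       '1balxhT6Qh_WDp4wq4OsG40iwqFa86QgW']

for file_id in ids:
    # with output omitted, gdown saves each file under its original name
    gdown.download(id=file_id, quiet=False)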
The script in the following cell unzips the downloaded dataset files to the dctaxi subdirectory in the current directory of the notebook. The -o flag used by the unzip command overwrites existing files in case you execute the next cell more than once.
%%bash
mkdir -p dctaxi
for YEAR in '2015' \
'2016' \
'2017' \
'2018' \
'2019'
do
unzip -o taxi_$YEAR.zip -d dctaxi
done
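If you would rather avoid shelling out to unzip, the same extraction can be sketched with Python's standard library zipfile module; extractall overwrites existing files by default, matching the behavior of the -o flag:
import zipfile
from pathlib import Path

# create the target directory, mirroring mkdir -p
Path('dctaxi').mkdir(exist_ok=True)

for year in range(2015, 2020):
    # extract each downloaded archive into the dctaxi subdirectory
    with zipfile.ZipFile(f'taxi_{year}.zip') as zf:
        zf.extractall('dctaxi')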
The next cell reports the disk usage (du) of the files from the DC taxi dataset. All of the files in the dataset have the taxi_ prefix. Since the entire output of the du command lists the disk usage of every file, the tail command is used to limit the output to just the last line. You can remove the tail command (in other words, leave just du -cha --block-size=1MB dctaxi/taxi_*.txt in the next cell) if you wish to report the disk usage of the individual files in the dataset.
For reference, the entire report on disk usage is also available as a GitHub Gist here: https://gist.github.com/osipov/032505a9c7e7388a2384f893be9e0681
!du -cha --block-size=1MB dctaxi/taxi_*.txt | tail -n 1
The dataset includes a README_DC_Taxicab_trip.txt file with brief documentation about the dataset contents. Run the next cell and take a moment to review the documentation, focusing on the schema used by the dataset.
%%bash
cat dctaxi/README_DC_Taxicab_trip.txt
Run the next cell to confirm that the dataset consists of pipe (|) separated values organized according to the schema described by the documentation. The taxi_2015_09.txt file used in the next cell was picked arbitrarily, just to illustrate the dataset.
!head dctaxi/taxi_2015_09.txt
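For a more structured look at the same sample, you can also parse a few rows with pandas. This is just an illustrative sketch, assuming pandas is available in the notebook environment:
import pandas as pd

# parse the first few pipe-delimited rows to spot-check the schema
sample_df = pd.read_csv('dctaxi/taxi_2015_09.txt', sep='|', nrows=5)
sample_df.head()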
The next cell downloads and installs the AWS CLI, which is used throughout the remainder of this notebook.
%%bash
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip -o awscliv2.zip
sudo ./aws/install
Modify the contents of the next cell to specify your AWS credentials as strings.
If you see the following exception:
TypeError: str expected, not NoneType
it means that you did not specify the credentials correctly.
import os
# *** REPLACE None in the next 2 lines with your AWS key values ***
os.environ['AWS_ACCESS_KEY_ID'] = None
os.environ['AWS_SECRET_ACCESS_KEY'] = None
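If you prefer not to store secrets in the notebook source, a minimal alternative sketch uses Python's getpass module to prompt for the key values instead:
import os
from getpass import getpass

# prompt for the key values so they are not saved in the notebook
os.environ['AWS_ACCESS_KEY_ID'] = getpass('AWS_ACCESS_KEY_ID: ')
os.environ['AWS_SECRET_ACCESS_KEY'] = getpass('AWS_SECRET_ACCESS_KEY: ')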
Run the next cell to validate your credentials.
%%bash
aws sts get-caller-identity
If you have specified the correct credentials as values for the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables, then the aws sts get-caller-identity command used by the previous cell should have returned the UserId, Account, and Arn for the credentials, resembling the following:
{
"UserId": "█████████████████████",
"Account": "████████████",
"Arn": "arn:aws:iam::████████████:user/█████████"
}
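The same check can be performed from Python using boto3, which reads the credentials from the environment variables you just set. A minimal sketch, assuming boto3 is installed in the notebook environment:
import boto3

# boto3 picks up AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY automatically
sts = boto3.client('sts')
print(sts.get_caller_identity())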
Replace the None in the next cell with your AWS region name, for example us-west-2.
# *** REPLACE None in the next line with your AWS region ***
os.environ['AWS_DEFAULT_REGION'] = None
If you have specified the region correctly, the following cell should echo back the region that you specified.
%%bash
echo $AWS_DEFAULT_REGION
Use the bash $RANDOM pseudo-random number generator and the first 32 characters of the md5sum output to produce a unique bucket ID.
BUCKET_ID = !echo $(echo $RANDOM | md5sum | cut -c -32)
os.environ['BUCKET_ID'] = next(iter(BUCKET_ID))
os.environ['BUCKET_ID']
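Note that $RANDOM produces only 32,768 distinct values, so if you want more entropy in the bucket ID you can generate an equivalent 32-character hexadecimal string with Python's uuid module instead; a sketch:
import os
import uuid

# uuid4().hex is a 32-character hexadecimal string, analogous to the md5sum output
os.environ['BUCKET_ID'] = uuid.uuid4().hex
os.environ['BUCKET_ID']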
BUCKET_ID

The next cell saves the contents of the BUCKET_ID environment variable to a BUCKET_ID file as a backup.
val = os.environ['BUCKET_ID']
%store val > BUCKET_ID
!cat BUCKET_ID
BUCKET_ID file

Ensure that you have a backup copy of the BUCKET_ID file created by the previous cell before proceeding. The contents of the BUCKET_ID file will be reused later in this notebook and in the other notebooks.
The next cell creates an S3 bucket named using the BUCKET_ID and your AWS region. Note that if your region is us-east-1, the --create-bucket-configuration parameter should be omitted.
%%bash
aws s3api create-bucket --bucket dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION --create-bucket-configuration LocationConstraint=$AWS_DEFAULT_REGION
If the previous cell executed successfully, then it should have produced an output resembling the following:
{
"Location": "http:/dc-taxi-████████████████████████████████-█████████.s3.amazonaws.com/"
}
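The bucket can also be created from Python using boto3 instead of the AWS CLI. A minimal sketch, again assuming a region other than us-east-1:
import os
import boto3

s3 = boto3.client('s3')
bucket = f"dc-taxi-{os.environ['BUCKET_ID']}-{os.environ['AWS_DEFAULT_REGION']}"

# outside of us-east-1, the region must be supplied as a LocationConstraint
s3.create_bucket(Bucket=bucket,
                 CreateBucketConfiguration={
                   'LocationConstraint': os.environ['AWS_DEFAULT_REGION']})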
You can print the name of the bucket by running the following cell:
!echo s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION
You can also use the AWS CLI list-buckets command to print out all of the buckets that exist in your AWS account; note, however, that the printed names will not show the s3:// prefix:
!aws s3api list-buckets
Synchronize the contents of the dctaxi directory (where you unzipped the dataset) to the csv folder in the S3 bucket you just created.
%%bash
aws s3 sync \
--exclude 'README*' \
dctaxi/ s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/csv/
You can check whether the aws s3 sync command completed successfully by listing the contents of the newly created bucket. Run the following cell:
!aws s3 ls --recursive --summarize --human-readable s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/csv/ | tail -n 2
which should have returned
Total Objects: 54
Total Size: 11.2 GiB
if the dataset was copied to S3 successfully.
As before, you can remove the tail -n 2 part of the previous cell to report the entire contents of the csv folder on S3.
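You can compute the same totals from Python by paginating over the objects under the csv/ prefix with boto3; a sketch:
import os
import boto3

s3 = boto3.client('s3')
bucket = f"dc-taxi-{os.environ['BUCKET_ID']}-{os.environ['AWS_DEFAULT_REGION']}"

# add up the number and size of the objects under the csv/ prefix
total_objects, total_bytes = 0, 0
for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket, Prefix='csv/'):
    for obj in page.get('Contents', []):
        total_objects += 1
        total_bytes += obj['Size']

print(f"Total Objects: {total_objects}")
print(f"Total Size: {total_bytes / 1024 ** 3:.1f} GiB")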
The next cell deletes any pre-existing version of the AWSGlueServiceRole-dc-taxi IAM role and then re-creates it, granting AWS Glue permission to assume the role and to access the contents of the S3 bucket.
%%bash
aws iam detach-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole && \
aws iam delete-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-name GlueBucketPolicy && \
aws iam delete-role --role-name AWSGlueServiceRole-dc-taxi
aws iam create-role --path "/service-role/" --role-name AWSGlueServiceRole-dc-taxi --assume-role-policy-document '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "glue.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}'
aws iam attach-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
aws iam put-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-name GlueBucketPolicy --policy-document '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:*"
],
"Resource": [
"arn:aws:s3:::dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/*"
]
}
]
}'
The next cell creates an AWS Glue database named dc_taxi_db to store the metadata tables produced by crawling the dataset.
%%bash
aws glue delete-database --name dc_taxi_db 2> /dev/null
aws glue create-database --database-input '{
"Name": "dc_taxi_db"
}'
aws glue get-database --name 'dc_taxi_db'
Save the results of crawling the S3 bucket with the DC taxi dataset to the AWS Glue database created in the previous cell.
%%bash
aws glue delete-crawler --name dc-taxi-csv-crawler 2> /dev/null
aws glue create-crawler \
--name dc-taxi-csv-crawler \
--database-name dc_taxi_db \
--table-prefix dc_taxi_ \
--role $( aws iam get-role \
--role-name AWSGlueServiceRole-dc-taxi \
--query 'Role.Arn' \
--output text ) \
--targets '{
"S3Targets": [
{
"Path": "s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/csv"
}]
}'
aws glue start-crawler --name dc-taxi-csv-crawler
When running this notebook, you need to re-run the next cell to get updates on the crawler status. The status should cycle through STARTING, RUNNING, STOPPING, and READY. It will take the crawler about a minute to finish crawling the DC taxi dataset.
%%bash
aws glue get-crawler --name dc-taxi-csv-crawler --query 'Crawler.State' --output text
Poll the crawler state every minute to wait for it to finish.
%%bash
printf "Waiting for crawler to finish..."
until echo "$(aws glue get-crawler --name dc-taxi-csv-crawler --query 'Crawler.State' --output text)" | grep -q "READY"; do
sleep 60
printf "..."
done
printf "done\n"
!aws glue get-crawler --name dc-taxi-csv-crawler --query 'Crawler.LastCrawl'
!aws glue get-table --database-name dc_taxi_db --name dc_taxi_csv
The next cell uses the Jupyter %%writefile magic to save the source code for the PySpark job to the dctaxi_csv_to_parquet.py file.
%%writefile dctaxi_csv_to_parquet.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME',
'BUCKET_SRC_PATH',
'BUCKET_DST_PATH',
'DST_VIEW_NAME'])
BUCKET_SRC_PATH = args['BUCKET_SRC_PATH']
BUCKET_DST_PATH = args['BUCKET_DST_PATH']
DST_VIEW_NAME = args['DST_VIEW_NAME']
sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# read the pipe-delimited CSV files from the source S3 path,
# inferring the schema from the data
df = ( spark.read.format("csv")
         .option("header", True)
         .option("inferSchema", True)
         .option("multiLine", True)
         .option("delimiter", "|")
         .load(f"{BUCKET_SRC_PATH}") )

# register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView(f"{DST_VIEW_NAME}")
# cast each column to both DOUBLE and STRING so the parsed and raw
# values can be compared later
query_df = spark.sql(f"""
SELECT
CAST(fareamount AS DOUBLE) AS fareamount_double,
CAST(fareamount AS STRING) AS fareamount_string,
CAST(origindatetime_tr AS STRING) AS origindatetime_tr,
CAST(origin_block_latitude AS DOUBLE) AS origin_block_latitude_double,
CAST(origin_block_latitude AS STRING) AS origin_block_latitude_string,
CAST(origin_block_longitude AS DOUBLE) AS origin_block_longitude_double,
CAST(origin_block_longitude AS STRING) AS origin_block_longitude_string,
CAST(destination_block_latitude AS DOUBLE) AS destination_block_latitude_double,
CAST(destination_block_latitude AS STRING) AS destination_block_latitude_string,
CAST(destination_block_longitude AS DOUBLE) AS destination_block_longitude_double,
CAST(destination_block_longitude AS STRING) AS destination_block_longitude_string,
CAST(mileage AS DOUBLE) AS mileage_double,
CAST(mileage AS STRING) AS mileage_string
FROM {DST_VIEW_NAME}""".replace('\n', ''))
# save the query result to the destination S3 path in Parquet format
query_df.write.parquet(f"{BUCKET_DST_PATH}", mode="overwrite")
job.commit()
The next cell uploads the PySpark script to the glue folder of the S3 bucket and confirms the upload by listing the file.
%%bash
aws s3 cp dctaxi_csv_to_parquet.py s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/glue/
aws s3 ls s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/glue/dctaxi_csv_to_parquet.py
The next cell re-creates a Glue job named dc-taxi-csv-to-parquet-job based on the uploaded PySpark script and starts a run of the job, passing the source and destination S3 paths as job arguments.
%%bash
aws glue delete-job --job-name dc-taxi-csv-to-parquet-job 2> /dev/null
aws glue create-job \
--name dc-taxi-csv-to-parquet-job \
--role $(aws iam get-role --role-name AWSGlueServiceRole-dc-taxi --query 'Role.Arn' --output text) \
--default-arguments '{"--TempDir":"s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/glue/"}' \
--command '{
"ScriptLocation": "s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/glue/dctaxi_csv_to_parquet.py",
"Name": "glueetl",
"PythonVersion": "3"
}' \
--glue-version "2.0"
aws glue start-job-run \
--job-name dc-taxi-csv-to-parquet-job \
--arguments='--BUCKET_SRC_PATH="'$(
echo s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/csv/*.txt
)'",
--BUCKET_DST_PATH="'$(
echo s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/parquet/
)'",
--DST_VIEW_NAME="dc_taxi_csv"'
On successful completion, the previous cell should have produced an output similar to the following:
{
"Name": "dc-taxi-csv-to-parquet-job"
}
{
"JobRunId": "jr_925ab8ea6e5bdd64d4491c6f641bcc58f5c7b0140edcdba9896052c70d3675fe"
}
Once the PySpark job completes successfully, the job execution status should change from RUNNING to SUCCEEDED. You can re-run the next cell to get the updated job status.
!aws glue get-job-runs --job-name dc-taxi-csv-to-parquet-job --output text --query 'JobRuns[0].JobRunState'
Poll the job status every minute to wait for it to finish.
%%bash
printf "Waiting for the job to finish..."
while echo "$(aws glue get-job-runs --job-name dc-taxi-csv-to-parquet-job --query 'JobRuns[0].JobRunState')" | grep -q -E "STARTING|RUNNING|STOPPING"; do
sleep 60
printf "..."
done
aws glue get-job-runs --job-name dc-taxi-csv-to-parquet-job --output text --query 'JobRuns[0].JobRunState'
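As with the crawler, the job status can be polled from Python using boto3. A sketch that waits for the most recent run of the job to leave the in-progress states:
import time
import boto3

glue = boto3.client('glue')

def latest_run_state(job_name):
    # JobRuns are sorted most recent first
    return glue.get_job_runs(JobName=job_name)['JobRuns'][0]['JobRunState']

while latest_run_state('dc-taxi-csv-to-parquet-job') in ('STARTING', 'RUNNING', 'STOPPING'):
    time.sleep(60)

print(latest_run_state('dc-taxi-csv-to-parquet-job'))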
!aws s3 ls --recursive --summarize --human-readable s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/parquet/ | tail -n 2
Assuming the Parquet files have been correctly created in the S3 bucket, the previous cell should output the following:
Total Objects: 53
Total Size: 941.3 MiB
The next cell creates and starts a crawler for the parquet folder of the bucket in order to add a dc_taxi_parquet table to the dc_taxi_db database.
%%bash
aws glue delete-crawler --name dc-taxi-parquet-crawler 2> /dev/null
aws glue create-crawler \
  --name dc-taxi-parquet-crawler \
  --database-name dc_taxi_db \
  --table-prefix dc_taxi_ \
  --role $( aws iam get-role \
              --role-name AWSGlueServiceRole-dc-taxi \
              --query 'Role.Arn' \
              --output text ) \
  --targets '{
"S3Targets": [
{
"Path": "s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/parquet/"
}]
}'
aws glue start-crawler --name dc-taxi-parquet-crawler
Once done, the crawler should return to a READY state.
!aws glue get-crawler --name dc-taxi-parquet-crawler --query 'Crawler.State' --output text
Poll the crawler status every 10 seconds to wait for it to finish.
%%bash
printf "Waiting for crawler to finish..."
until echo "$(aws glue get-crawler --name dc-taxi-parquet-crawler --query 'Crawler.State' --output text)" | grep -q "READY"; do
sleep 10
printf "..."
done
aws glue get-crawler --name dc-taxi-parquet-crawler --query 'Crawler.State' --output text
dc_taxi_parquet table

If the crawler completed successfully, the number of records in the dc_taxi_parquet table, as reported by the following command, should be equal to 53173692.
!aws glue get-table --database-name dc_taxi_db --name dc_taxi_parquet --query "Table.Parameters.recordCount" --output text
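The same record count is available from Python through the boto3 Glue API; a minimal sketch:
import boto3

glue = boto3.client('glue')

# the crawler stores the record count in the table parameters
table = glue.get_table(DatabaseName='dc_taxi_db', Name='dc_taxi_parquet')
print(table['Table']['Parameters'].get('recordCount'))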