BUCKET_ID file
Before proceeding, ensure that you have a backup copy of the BUCKET_ID file created in the Chapter 2 notebook. The contents of the BUCKET_ID file are reused later in this notebook and in the other notebooks.
import os
from pathlib import Path
assert Path('BUCKET_ID').exists(), "Place the BUCKET_ID file in the current directory before proceeding"
BUCKET_ID = Path('BUCKET_ID').read_text().strip()
os.environ['BUCKET_ID'] = BUCKET_ID
os.environ['BUCKET_ID']
Download and install the AWS CLI. This step is unnecessary if you have already installed the AWS CLI in a preceding notebook.
%%bash
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip -o awscliv2.zip
sudo ./aws/install
Modify the contents of the next cell to specify your AWS credentials as strings.
If you see the following exception:
TypeError: str expected, not NoneType
it means that you left None in place instead of specifying the credentials: the values assigned to os.environ must be strings.
import os
# *** REPLACE None in the next 2 lines with your AWS key values ***
os.environ['AWS_ACCESS_KEY_ID'] = None
os.environ['AWS_SECRET_ACCESS_KEY'] = None
Run the next cell to validate your credentials.
%%bash
aws sts get-caller-identity
If you have specified the correct credentials as values for the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables, then the aws sts get-caller-identity command used by the previous cell should have returned the UserId, Account, and Arn for the credentials, resembling the following:
{
"UserId": "█████████████████████",
"Account": "████████████",
"Arn": "arn:aws:iam::████████████:user/█████████"
}
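If you prefer to perform the same check from Python rather than the AWS CLI, a minimal sketch using boto3 (assuming the boto3 package is installed) looks like this:
import boto3
# returns the same UserId, Account, and Arn fields as the CLI command above
boto3.client('sts').get_caller_identity()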
Replace the None in the next cell with your AWS region name, for example us-west-2.
# *** REPLACE None in the next line with your AWS region ***
os.environ['AWS_DEFAULT_REGION'] = None
If you have specified the region correctly, the following cell should return the region that you have specified.
%%bash
echo $AWS_DEFAULT_REGION
Download a tiny sample of the dataset from https://gist.github.com/osipov/1fc0265f8f829d9d9eee8393657423a9 to a trips_sample.csv file, which you are going to use to learn about the Athena interface.
%%bash
wget -q https://gist.githubusercontent.com/osipov/1fc0265f8f829d9d9eee8393657423a9/raw/9957c1f09cdfa64f8b8d89cfec532a0e150d5178/trips_sample.csv
ls -ltr trips_sample.csv
cat trips_sample.csv
Assuming the previous cell executed successfully, it should have returned the following result:
-rw-r--r-- 1 root root 378 Nov 23 19:50 trips_sample.csv
fareamount_double,origin_block_latitude,origin_block_longitude,destination_block_latitude,destination_block_longitude
8.11,38.900769,-77.033644,38.912239,-77.036514
5.95,38.912609,-77.030788,38.906445,-77.023978
7.57,38.900773,-77.03655,38.896131,-77.024975
11.61,38.892101000000004,-77.044208,38.905969,-77.06564399999999
4.87,38.899615000000004,-76.980387,38.900638,-76.97023
Upload the trips_sample.csv file to your object storage bucket
%%bash
aws s3 cp trips_sample.csv s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/samples/trips_sample.csv
aws s3 ls s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/samples/trips_sample.csv
The output of the aws s3 ls command above should include the following, confirming that the file was uploaded successfully:
2020-11-23 20:07:31 378 trips_sample.csv
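You can also confirm the upload from Python; here is a minimal sketch using boto3 (assuming boto3 is installed and the environment variables set earlier in this notebook):
import os
import boto3
s3 = boto3.client('s3')
bucket = f"dc-taxi-{os.environ['BUCKET_ID']}-{os.environ['AWS_DEFAULT_REGION']}"
# head_object raises an exception if the key does not exist in the bucket
response = s3.head_object(Bucket=bucket, Key='samples/trips_sample.csv')
response['ContentLength'] # should report 378, the size of the file in bytes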
Create a dc_taxi_athena_workgroup for your Athena project, assuming one does not exist yet.
%%bash
aws athena delete-work-group --work-group dc_taxi_athena_workgroup --recursive-delete-option 2> /dev/null
aws athena create-work-group --name dc_taxi_athena_workgroup \
--configuration "ResultConfiguration={OutputLocation=s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/athena},EnforceWorkGroupConfiguration=false,PublishCloudWatchMetricsEnabled=false"
Run the next cell to create an Athena table for the sample, treating every column as a STRING, and to poll Athena once a second until the query finishes.
%%bash
SQL="
CREATE EXTERNAL TABLE IF NOT EXISTS dc_taxi_db.dc_taxi_csv_sample_strings(
fareamount STRING,
origin_block_latitude STRING,
origin_block_longitude STRING,
destination_block_latitude STRING,
destination_block_longitude STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/samples/'
TBLPROPERTIES ('skip.header.line.count'='1');"
ATHENA_QUERY_ID=$(aws athena start-query-execution \
--work-group dc_taxi_athena_workgroup \
--query 'QueryExecutionId' \
--output text \
--query-string "$SQL")
echo $SQL
echo $ATHENA_QUERY_ID
until aws athena get-query-execution \
--query 'QueryExecution.Status.State' \
--output text \
--query-execution-id $ATHENA_QUERY_ID | grep -v "RUNNING";
do
printf '.'
sleep 1;
done
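A Python sketch of the same polling pattern, assuming boto3 and an Athena query execution id, would look like the following:
import time
import boto3
athena = boto3.client('athena')
def wait_for_query(query_execution_id):
    # mirror the bash loop above: poll once a second until the query
    # execution leaves the RUNNING state
    while True:
        state = athena.get_query_execution(
            QueryExecutionId=query_execution_id
        )['QueryExecution']['Status']['State']
        if state != 'RUNNING':
            return state # for example SUCCEEDED, FAILED, or CANCELLED
        time.sleep(1)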
The utils.sh script implements helper functions (including athena_query_to_table and athena_query_to_pandas) used to run Athena queries in the rest of this notebook. The script is downloaded as utils.sh and is loaded in the upcoming cells using the source utils.sh command.
%%bash
wget -q https://raw.githubusercontent.com/osipov/smlbook/master/utils.sh
ls -l utils.sh
%%bash
source utils.sh
SQL="
SELECT
origin_block_latitude || ' , ' || origin_block_longitude
AS origin,
destination_block_latitude || ' , ' || destination_block_longitude
AS destination
FROM
dc_taxi_db.dc_taxi_csv_sample_strings
"
athena_query_to_table "$SQL" "ResultSet.Rows[*].[Data[0].VarCharValue,Data[1].VarCharValue]"
%%bash
source utils.sh ; athena_query_to_pandas """
SELECT
origin_block_latitude || ' , ' || origin_block_longitude
AS origin,
destination_block_latitude || ' , ' || destination_block_longitude
AS destination
FROM
dc_taxi_db.dc_taxi_csv_sample_strings
"""
Note that the utils.sh script saves the output from Athena to /tmp/awscli.json.
import pandas as pd
def awscli_to_df():
    # utils.sh saves the Athena result to /tmp/awscli.json as a JSON array
    # where each entry wraps the values of one result row in a list
    json_df = pd.read_json('/tmp/awscli.json')
    # the first row of the result holds the column names, so use it as the
    # header and drop it from the data rows
    rows = json_df[0].tolist()
    df = pd.DataFrame(rows, index = json_df.index, columns = rows[0]).drop(0, axis = 0)
    return df
awscli_to_df()
Create an Athena table for the sample using columns typed as DOUBLE
%%bash
source utils.sh ; athena_query "
CREATE EXTERNAL TABLE IF NOT EXISTS dc_taxi_db.dc_taxi_csv_sample_double(
fareamount DOUBLE,
origin_block_latitude DOUBLE,
origin_block_longitude DOUBLE,
destination_block_latitude DOUBLE,
destination_block_longitude DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/samples/'
TBLPROPERTIES ('skip.header.line.count'='1');
"
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT ROUND(MAX(fareamount) - MIN(fareamount), 2)
FROM dc_taxi_db.dc_taxi_csv_sample_double
"
awscli_to_df()
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT fareamount_double,
origin_block_latitude_double,
origin_block_longitude_double,
destination_block_latitude_double,
destination_block_longitude_double,
origindatetime_tr
FROM dc_taxi_db.dc_taxi_parquet
LIMIT 10
"
awscli_to_df()
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT
(SELECT COUNT(*) FROM dc_taxi_db.dc_taxi_parquet) AS total,
COUNT(*) AS null_origindate_time_total
FROM
dc_taxi_db.dc_taxi_parquet
WHERE
origindatetime_tr IS NULL
"
awscli_to_df()
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT
(SELECT COUNT(*) FROM dc_taxi_db.dc_taxi_parquet)
- COUNT(DATE_PARSE(origindatetime_tr, '%m/%d/%Y %H:%i'))
AS origindatetime_not_parsed
FROM
dc_taxi_db.dc_taxi_parquet
WHERE
origindatetime_tr IS NOT NULL;
"
awscli_to_df()
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT
ROUND(100.0 * COUNT(*) / (SELECT COUNT(*)
FROM dc_taxi_db.dc_taxi_parquet), 2)
AS percentage_null,
(SELECT COUNT(*)
FROM dc_taxi_db.dc_taxi_parquet
WHERE origin_block_longitude_double IS NULL
OR origin_block_latitude_double IS NULL)
AS either_null,
(SELECT COUNT(*)
FROM dc_taxi_db.dc_taxi_parquet
WHERE origin_block_longitude_double IS NULL
AND origin_block_latitude_double IS NULL)
AS both_null
FROM
dc_taxi_db.dc_taxi_parquet
WHERE
origin_block_longitude_double IS NULL
OR origin_block_latitude_double IS NULL
"
awscli_to_df()
Repeat the previous analysis for the destination latitude and longitude columns.
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT
ROUND(100.0 * COUNT(*) / (SELECT COUNT(*)
FROM dc_taxi_db.dc_taxi_parquet), 2)
AS percentage_null,
(SELECT COUNT(*)
FROM dc_taxi_db.dc_taxi_parquet
WHERE destination_block_longitude_double IS NULL
OR destination_block_latitude_double IS NULL)
AS either_null,
(SELECT COUNT(*)
FROM dc_taxi_db.dc_taxi_parquet
WHERE destination_block_longitude_double IS NULL
AND destination_block_latitude_double IS NULL)
AS both_null
FROM
dc_taxi_db.dc_taxi_parquet
WHERE
destination_block_longitude_double IS NULL
OR destination_block_latitude_double IS NULL
"
awscli_to_df()
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT
COUNT(*)
AS total,
ROUND(100.0 * COUNT(*) / (SELECT COUNT(*)
FROM dc_taxi_db.dc_taxi_parquet), 2)
AS percent
FROM
dc_taxi_db.dc_taxi_parquet
WHERE
origin_block_latitude_double IS NULL
OR origin_block_longitude_double IS NULL
OR destination_block_latitude_double IS NULL
OR destination_block_longitude_double IS NULL
"
awscli_to_df()
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT
fareamount_string,
COUNT(fareamount_string) AS rows,
ROUND(100.0 * COUNT(fareamount_string) /
( SELECT COUNT(*)
FROM dc_taxi_db.dc_taxi_parquet), 2)
AS percent
FROM
dc_taxi_db.dc_taxi_parquet
WHERE
fareamount_double IS NULL
AND fareamount_string IS NOT NULL
GROUP BY
fareamount_string;
"
awscli_to_df()
Explore summary statistics of the fareamount_double column
%%bash
source utils.sh ; athena_query_to_pandas "
WITH
src AS (SELECT
fareamount_double AS val
FROM
dc_taxi_db.dc_taxi_parquet),
stats AS
(SELECT
MIN(val) AS min,
APPROX_PERCENTILE(val, 0.25) AS q1,
APPROX_PERCENTILE(val ,0.5) AS q2,
APPROX_PERCENTILE(val, 0.75) AS q3,
AVG(val) AS mean,
STDDEV(val) AS std,
MAX(val) AS max
FROM
src)
SELECT
DISTINCT min, q1, q2, q3, max
FROM
dc_taxi_db.dc_taxi_parquet, stats
"
awscli_to_df()
%%bash
source utils.sh ; athena_query_to_pandas "
WITH
src AS (SELECT
COUNT(*) AS total
FROM
dc_taxi_db.dc_taxi_parquet
WHERE
fareamount_double IS NOT NULL)
SELECT
ROUND(100.0 * COUNT(fareamount_double) / MIN(total), 2) AS percent
FROM
dc_taxi_db.dc_taxi_parquet, src
WHERE
fareamount_double < 3.25
AND fareamount_double IS NOT NULL
"
awscli_to_df()
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT
fareamount_string,
ROUND( MIN(mileage_double), 2) AS min,
ROUND( APPROX_PERCENTILE(mileage_double, 0.25), 2) AS q1,
ROUND( APPROX_PERCENTILE(mileage_double ,0.5), 2) AS q2,
ROUND( APPROX_PERCENTILE(mileage_double, 0.75), 2) AS q3,
ROUND( MAX(mileage_double), 2) AS max
FROM
dc_taxi_db.dc_taxi_parquet
WHERE
fareamount_string LIKE 'NULL'
GROUP BY
fareamount_string
"
awscli_to_df()
Plugging the latitude and longitude coordinates reported by the query below into OpenStreetMap ( https://www.openstreetmap.org/directions?engine=fossgis_osrm_car&route=38.8110%2C-77.1130%3B38.9950%2C-76.9100#map=11/38.9025/-77.0094 ) yields a route of 21.13 miles, which corresponds to an estimated fare of $ \$48.89 (21.13 * \$2.16/mile + \$3.25) $.
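As a quick sanity check, the estimate can be reproduced with the mileage-based fare formula used later in this notebook ($2.16 per mile plus a $3.25 base fare):
# estimated fare for the 21.13 mile route across the DC bounding box
miles = 21.13
round(2.16 * miles + 3.25, 2) # 48.89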
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT
MIN(lat) AS lower_left_latitude,
MIN(lon) AS lower_left_longitude,
MAX(lat) AS upper_right_latitude,
MAX(lon) AS upper_right_longitude
FROM (
SELECT
MIN(origin_block_latitude_double) AS lat,
MIN(origin_block_longitude_double) AS lon
FROM "dc_taxi_db"."dc_taxi_parquet"
UNION
SELECT
MIN(destination_block_latitude_double) AS lat,
MIN(destination_block_longitude_double) AS lon
FROM "dc_taxi_db"."dc_taxi_parquet"
UNION
SELECT
MAX(origin_block_latitude_double) AS lat,
MAX(origin_block_longitude_double) AS lon
FROM "dc_taxi_db"."dc_taxi_parquet"
UNION
SELECT
MAX(destination_block_latitude_double) AS lat,
MAX(destination_block_longitude_double) AS lon
FROM "dc_taxi_db"."dc_taxi_parquet"
)
"
awscli_to_df()
%%bash
source utils.sh ; athena_query_to_pandas "
WITH dc_taxi AS
(SELECT *,
origindatetime_tr
|| fareamount_string
|| origin_block_latitude_string
|| origin_block_longitude_string
|| destination_block_latitude_string
|| destination_block_longitude_string
|| mileage_string AS objectid
FROM "dc_taxi_db"."dc_taxi_parquet"
WHERE fareamount_double >= 3.25
AND fareamount_double IS NOT NULL
AND mileage_double > 0 )
SELECT AVG(mileage_double) AS average_mileage
FROM dc_taxi
WHERE objectid IS NOT NULL
GROUP BY MOD( ABS( from_big_endian_64( xxhash64( to_utf8( objectid ) ) ) ), 1000)
" ResultSet.Rows[*].[Data[].VarCharValue] 1000
awscli_to_df()
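The GROUP BY clause in the previous query hashes a synthetic objectid to split the trips into 1,000 pseudo-random buckets, so that each bucket's average mileage behaves like the mean of an independently drawn sample. A minimal Python sketch of the same bucketing idea (using hashlib in place of Presto's xxhash64, so the actual bucket assignments will differ):
import hashlib
def bucket(objectid, buckets = 1000):
    # hash the composite id to an integer and take the modulus, mirroring
    # MOD(ABS(from_big_endian_64(xxhash64(to_utf8(objectid)))), 1000)
    digest = hashlib.sha256(objectid.encode('utf-8')).digest()
    return int.from_bytes(digest[:8], 'big') % buckets
bucket('8.11' + '38.900769' + '-77.033644' + '38.912239' + '-77.036514')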
%matplotlib inline
import matplotlib.pyplot as plt
plt.figure(figsize = (12, 9))
df = awscli_to_df()
# convert the strings returned by Athena to floats and standardize the
# sample averages to z-scores before plotting
df.average_mileage = df.average_mileage.astype(float)
df.average_mileage -= df.average_mileage.mean()
df.average_mileage /= df.average_mileage.std()
df.average_mileage.plot.hist(bins = 30);
%%bash
source utils.sh ; athena_query_to_pandas "
WITH dc_taxi AS
(SELECT *,
origindatetime_tr
|| fareamount_string
|| origin_block_latitude_string
|| origin_block_longitude_string
|| destination_block_latitude_string
|| destination_block_longitude_string
|| mileage_string AS objectid
FROM "dc_taxi_db"."dc_taxi_parquet"
WHERE fareamount_double >= 3.25
AND fareamount_double IS NOT NULL
AND mileage_double > 0 ),
dc_taxi_samples AS (
SELECT AVG(mileage_double) AS average_mileage
FROM dc_taxi
WHERE objectid IS NOT NULL
GROUP BY MOD( ABS( from_big_endian_64( xxhash64( to_utf8( objectid ) ) ) ) , 1000)
)
SELECT AVG(average_mileage) + 4 * STDDEV(average_mileage)
FROM dc_taxi_samples
"
awscli_to_df()
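The mean plus four standard deviations is a deliberately conservative upper bound: since the 1,000 bucketed averages are approximately normally distributed (as the histogram above suggests), values above $ \mu + 4\sigma $ occur with a probability of only about $0.003\%$.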
# convert the upper-bound mileage estimate into a fare estimate using the
# $2.16/mile rate plus the $3.25 base fare; astype(float) converts the
# string values returned by Athena
upper_mileage = 2.16 * awscli_to_df().astype(float).mean().item() + 3.25
upper_mileage
%%bash
source utils.sh ; athena_query_to_pandas "
WITH dc_taxi AS
(SELECT *,
origindatetime_tr
|| fareamount_string
|| origin_block_latitude_string
|| origin_block_longitude_string
|| destination_block_latitude_string
|| destination_block_longitude_string
|| mileage_string AS objectid
FROM "dc_taxi_db"."dc_taxi_parquet"
WHERE fareamount_double >= 3.25
AND fareamount_double IS NOT NULL
AND mileage_double > 0 ),
dc_taxi_samples AS (
SELECT AVG(fareamount_double) AS average_fareamount
FROM dc_taxi
WHERE objectid IS NOT NULL
GROUP BY MOD( ABS( from_big_endian_64( xxhash64( to_utf8( objectid ) ) ) ) , 1000)
)
SELECT AVG(average_fareamount) + 4 * STDDEV(average_fareamount)
FROM dc_taxi_samples
"
awscli_to_df()
# the mean + 4 * standard deviation upper-bound estimate of the fare amount
upper_fareamount = awscli_to_df().astype(float).mean().item()
upper_fareamount
# average of the candidate upper-bound estimates for the fare amount; the
# result (about 179.75) is used as the fareamount cutoff in the queries below
means = [15.96, 29.19, 48.89, 560, 2,422.45]
sum(means) / len(means)
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT
100.0 * COUNT(fareamount_double) /
(SELECT COUNT(*)
FROM dc_taxi_db.dc_taxi_parquet
WHERE fareamount_double IS NOT NULL) AS percent
FROM
dc_taxi_db.dc_taxi_parquet
WHERE (fareamount_double < 3.25 OR fareamount_double > 179.75)
AND fareamount_double IS NOT NULL;
"
awscli_to_df()
%%bash
source utils.sh ; athena_query_to_pandas "
WITH src AS (SELECT fareamount_double AS val
FROM dc_taxi_db.dc_taxi_parquet
WHERE fareamount_double IS NOT NULL
AND fareamount_double >= 3.25
AND fareamount_double <= 180.0),
stats AS
(SELECT
ROUND(MIN(val), 2) AS min,
ROUND(APPROX_PERCENTILE(val, 0.25), 2) AS q1,
ROUND(APPROX_PERCENTILE(val, 0.5), 2) AS q2,
ROUND(APPROX_PERCENTILE(val, 0.75), 2) AS q3,
ROUND(AVG(val), 2) AS mean,
ROUND(STDDEV(val), 2) AS std,
ROUND(MAX(val), 2) AS max
FROM src)
SELECT min, q1, q2, q3, max, mean, std
FROM stats;
"
awscli_to_df()
Using the following SQL query and OpenStreetMap, check that the minimum and maximum coordinates for the origin and destination latitude and longitude columns, namely the pairs (38.81138, -77.113633) and (38.994217, -76.910012), are within DC boundaries ( https://www.openstreetmap.org/directions?engine=fossgis_osrm_car&route=38.8106%2C-77.1134%3B38.9940%2C-76.9100#map=11/38.9025/-77.0210 and https://www.openstreetmap.org/directions?engine=fossgis_osrm_car&route=38.994217%2C-76.910012%3B38.81138%2C-77.113633#map=11/38.9025/-77.0210 ).
%%bash
source utils.sh ; athena_query_to_pandas "
SELECT
MIN(origin_block_latitude_double) AS olat_min,
MIN(origin_block_longitude_double) AS olon_min,
MAX(origin_block_latitude_double) AS olat_max,
MAX(origin_block_longitude_double) AS olon_max,
MIN(destination_block_latitude_double) AS dlat_min,
MIN(destination_block_longitude_double) AS dlon_min,
MAX(destination_block_latitude_double) AS dlat_max,
MAX(destination_block_longitude_double) AS dlon_max
FROM
dc_taxi_db.dc_taxi_parquet
"
awscli_to_df()
The next cell uses the Jupyter %%writefile magic to save the source code for the PySpark job to the dctaxi_parquet_vacuum.py file.
%%writefile dctaxi_parquet_vacuum.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME',
'BUCKET_SRC_PATH',
'BUCKET_DST_PATH',
])
BUCKET_SRC_PATH = args['BUCKET_SRC_PATH']
BUCKET_DST_PATH = args['BUCKET_DST_PATH']
sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = ( spark
.read
.parquet(f"{BUCKET_SRC_PATH}") )
Apply the SQL query developed through the VACUUM-based analysis of the data to prepare a version of the dataset without any NULL values and with the fareamount_double column restricted to the interval from 3.25 to 180.0.
%%writefile -a dctaxi_parquet_vacuum.py
df.createOrReplaceTempView("dc_taxi_parquet")
query_df = spark.sql("""
SELECT
fareamount_double,
origindatetime_tr,
origin_block_latitude_double,
origin_block_longitude_double,
destination_block_latitude_double,
destination_block_longitude_double
FROM
dc_taxi_parquet
WHERE
origindatetime_tr IS NOT NULL
AND fareamount_double IS NOT NULL
AND fareamount_double >= 3.25
AND fareamount_double <= 180.0
AND origin_block_latitude_double IS NOT NULL
AND origin_block_longitude_double IS NOT NULL
AND destination_block_latitude_double IS NOT NULL
AND destination_block_longitude_double IS NOT NULL
""".replace('\n', ''))
Convert the original, STRING-formatted origindatetime_tr column into a SQL TIMESTAMP column named origindatetime_ts. The conversion is needed to extract the year, month, day of the week (dow), and hour of the taxi trip as separate numeric INTEGER columns for machine learning. Lastly, drop any records that have missing values (for example, due to a failed conversion) or are duplicated in the dataset.
%%writefile -a dctaxi_parquet_vacuum.py
#parse to check for valid value of the original timestamp
from pyspark.sql.functions import col, to_timestamp, dayofweek, year, month, hour
from pyspark.sql.types import IntegerType
#convert the source timestamp into numeric data needed for machine learning
query_df = (query_df
.withColumn("origindatetime_ts", to_timestamp("origindatetime_tr", "dd/MM/yyyy HH:mm"))
.where(col("origindatetime_ts").isNotNull())
.drop("origindatetime_tr")
.withColumn( 'year_integer', year('origindatetime_ts').cast(IntegerType()) )
.withColumn( 'month_integer', month('origindatetime_ts').cast(IntegerType()) )
.withColumn( 'dow_integer', dayofweek('origindatetime_ts').cast(IntegerType()) )
.withColumn( 'hour_integer', hour('origindatetime_ts').cast(IntegerType()) )
.drop('origindatetime_ts') )
#drop missing data and duplicates
query_df = ( query_df
.dropna()
.drop_duplicates() )
Persist the cleaned-up dataset in the Parquet format to the AWS S3 location specified by the BUCKET_DST_PATH parameter. The save_stats_metadata function computes summary statistics of the cleaned-up dataset and saves the statistics as a single CSV file located in an AWS S3 subfolder named .meta/stats under the S3 location from the BUCKET_DST_PATH parameter.
%%writefile -a dctaxi_parquet_vacuum.py
(query_df
.write
.parquet(f"{BUCKET_DST_PATH}", mode="overwrite"))
def save_stats_metadata(df, dest, header = 'true', mode = 'overwrite'):
return (df.describe()
.coalesce(1)
.write
.option("header", header)
.csv(dest, mode = mode))
save_stats_metadata(query_df, f"{BUCKET_DST_PATH}/.meta/stats")
job.commit()
Run the next cell to submit the dctaxi_parquet_vacuum.py file as a PySpark job to AWS Glue using the run_job function from utils.sh.
%%bash
source utils.sh
PYSPARK_SRC_NAME=dctaxi_parquet_vacuum.py \
PYSPARK_JOB_NAME=dc-taxi-parquet-vacuum-job \
BUCKET_SRC_PATH=s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/parquet \
BUCKET_DST_PATH=s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/parquet/vacuum \
run_job
If the job completed successfully, the last cell should have produced an output similar to the following:
2021-06-01 23:34:56 1840 dctaxi_parquet_vacuum.py
{
"JobName": "dc-taxi-parquet-vacuum-job"
}
{
"Name": "dc-taxi-parquet-vacuum-job"
}
{
"JobRunId": "jr_59eee7f229f448b39286f1bd19428c9082aaf6bed232342cc05e68f9246d131e"
}
Waiting for the job to finish...............SUCCEEDED
Once the PySpark job completes successfully, the job execution status should change from RUNNING to SUCCEEDED. You can run the next cell to get the updated job status.
!aws glue get-job-runs --job-name dc-taxi-parquet-vacuum-job --output text --query 'JobRuns[0].JobRunState'