!mkdir -p ~/.aws && cp /content/drive/MyDrive/AWS/684947_admin ~/.aws/credentials
!chmod 600 ~/.aws/credentials
!pip install -qq awscli boto3
!aws sts get-caller-identity
BUCKET_NAME = "wys-glueworkshop"
%env BUCKET_NAME=$BUCKET_NAME
env: BUCKET_NAME=wys-glueworkshop
!aws s3 mb s3://$BUCKET_NAME
!aws s3api put-public-access-block --bucket $BUCKET_NAME \
--public-access-block-configuration "BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true"
Note: We will use data from a public COVID-19 dataset curated by AWS. If you are interested in learning more about the dataset, read this blog post.
!curl 'https://static.us-east-1.prod.workshops.aws/public/058af3a5-469d-4a2a-9619-1190c6a970ec/static/download/glue-workshop.zip' --output glue-workshop.zip
!unzip glue-workshop.zip
!mkdir glue-workshop/library
!mkdir glue-workshop/output
!git clone https://github.com/jefftune/pycountry-convert.git
%cd pycountry-convert
!zip -r pycountry_convert.zip pycountry_convert/
%cd ..
!mv pycountry-convert/pycountry_convert.zip glue-workshop/library/
!aws s3 sync glue-workshop/code/ s3://$BUCKET_NAME/script/
!aws s3 sync glue-workshop/data/ s3://$BUCKET_NAME/input/
!aws s3 sync glue-workshop/library/ s3://$BUCKET_NAME/library/
!aws s3 sync s3://covid19-lake/rearc-covid-19-testing-data/json/states_daily/ s3://$BUCKET_NAME/input/lab5/json/
!aws s3 ls $BUCKET_NAME/
PRE input/
PRE library/
PRE script/
%%writefile NoVPC.yaml
AWSTemplateFormatVersion: 2010-09-09
Parameters:
UniquePostfix:
Type: String
Default: glueworkshop
Description: 'Enter a unique postfix value, must be all lower cases!'
S3Bucket:
Type: String
Default: s3://
Description: 'enter the S3 bucket path for workshop'
Resources:
AWSGlueServiceRole:
Type: 'AWS::IAM::Role'
Properties:
RoleName: !Join
- ''
- - AWSGlueServiceRole-
- !Ref UniquePostfix
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service: glue.amazonaws.com
Action: 'sts:AssumeRole'
ManagedPolicyArns:
- 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
- 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'
- 'arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess'
- 'arn:aws:iam::aws:policy/AmazonKinesisFullAccess'
Policies:
- PolicyName: "iam-passrole"
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action: 'iam:PassRole'
Resource: !Sub 'arn:aws:iam::${AWS::AccountId}:role/AWSGlueServiceRole-${UniquePostfix}'
AWSGlueServiceSageMakerNotebookRole:
Type: 'AWS::IAM::Role'
Properties:
RoleName: !Join
- ''
- - AWSGlueServiceSageMakerNotebookRole-
- !Ref UniquePostfix
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service: sagemaker.amazonaws.com
Action: 'sts:AssumeRole'
ManagedPolicyArns:
- 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
- 'arn:aws:iam::aws:policy/service-role/AWSGlueServiceNotebookRole'
- 'arn:aws:iam::aws:policy/AmazonSageMakerFullAccess'
- 'arn:aws:iam::aws:policy/CloudWatchLogsFullAccess'
AWSGlueDataBrewServiceRole:
Type: 'AWS::IAM::Role'
Properties:
RoleName: !Join
- ''
- - AWSGlueDataBrewServiceRole-
- !Ref UniquePostfix
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service: databrew.amazonaws.com
Action: 'sts:AssumeRole'
ManagedPolicyArns:
- 'arn:aws:iam::aws:policy/AmazonS3FullAccess'
- 'arn:aws:iam::aws:policy/service-role/AWSGlueDataBrewServiceRole'
KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
Name: !Ref UniquePostfix
RetentionPeriodHours: 24
ShardCount: 2
GlueCatalogDatabase:
Type: AWS::Glue::Database
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseInput:
Name: !Join
- ''
- - !Ref UniquePostfix
- -cloudformation
Description: Database to tables for workshop
JsonStreamingTable:
DependsOn: GlueCatalogDatabase
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseName: !Ref GlueCatalogDatabase
TableInput:
Name: json-streaming-table
Description: Define schema for streaming json
TableType: EXTERNAL_TABLE
Parameters: { "classification": "json" }
StorageDescriptor:
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
InputFormat: org.apache.hadoop.mapred.TextInputFormat
Columns:
- Name: "uuid"
Type: bigint
- Name: "country"
Type: string
- Name: "item type"
Type: string
- Name: "sales channel"
Type: string
- Name: "order priority"
Type: string
- Name: "order date"
Type: string
- Name: "region"
Type: string
- Name: "ship date"
Type: string
- Name: "units sold"
Type: int
- Name: "unit price"
Type: decimal
- Name: "unit cost"
Type: decimal
- Name: "total revenue"
Type: decimal
- Name: "total cost"
Type: decimal
- Name: "total profit"
Type: decimal
Parameters: {"endpointUrl": "https://kinesis.us-east-2.amazonaws.com", "streamName": !Ref UniquePostfix,"typeOfData": "kinesis"}
SerdeInfo:
Parameters: {"paths": "Country,Item Type,Order Date,Order Priority,Region,Sales Channel,Ship Date,Total Cost,Total Profit,Total Revenue,Unit Cost,Unit Price,Units Sold,uuid"}
SerializationLibrary: org.openx.data.jsonserde.JsonSerDe
JsonStaticTable:
DependsOn: GlueCatalogDatabase
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseName: !Ref GlueCatalogDatabase
TableInput:
Name: json-static-table
Description: Define schema for static json
TableType: EXTERNAL_TABLE
Parameters: { "classification": "json" }
StorageDescriptor:
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
InputFormat: org.apache.hadoop.mapred.TextInputFormat
Columns:
- Name: "uuid"
Type: bigint
- Name: "country"
Type: string
- Name: "item type"
Type: string
- Name: "sales channel"
Type: string
- Name: "order priority"
Type: string
- Name: "order date"
Type: string
- Name: "region"
Type: string
- Name: "ship date"
Type: string
- Name: "units sold"
Type: int
- Name: "unit price"
Type: decimal
- Name: "unit cost"
Type: decimal
- Name: "total revenue"
Type: decimal
- Name: "total cost"
Type: decimal
- Name: "total profit"
Type: decimal
Location: !Join
- ''
- - !Ref S3Bucket
- input/lab4/json/
SerdeInfo:
Parameters: {"paths": "Country,Item Type,Order Date,Order Priority,Region,Sales Channel,Ship Date,Total Cost,Total Profit,Total Revenue,Unit Cost,Unit Price,Units Sold,uuid"}
SerializationLibrary: org.openx.data.jsonserde.JsonSerDe
GlueDevEndpoint:
Type: 'AWS::Glue::DevEndpoint'
Properties:
EndpointName: !Join
- ''
- - GlueSageMakerNotebook-
- !Ref UniquePostfix
Arguments:
GLUE_PYTHON_VERSION: 3
GlueVersion: 1.0
NumberOfWorkers: 4
WorkerType: Standard
RoleArn: !GetAtt AWSGlueServiceRole.Arn
ExtraPythonLibsS3Path: !Join
- ''
- - !Ref S3Bucket
- 'library/pycountry_convert.zip'
DependsOn: AWSGlueServiceRole
Outputs:
EndpointName:
Value: !Ref GlueDevEndpoint
Description: Endpoint created for Glue Workshop Lab.
!aws cloudformation create-stack --stack-name glueworkshop \
--template-body file://NoVPC.yaml \
--capabilities CAPABILITY_NAMED_IAM \
--parameters \
ParameterKey=UniquePostfix,ParameterValue=glueworkshop \
ParameterKey=S3Bucket,ParameterValue=s3://$BUCKET_NAME/
We will configure an AWS Glue crawler to scan and create metadata definitions in the Glue Data Catalog.
!head glue-workshop/data/lab1/csv/sample.csv
uuid,Country,Item Type,Sales Channel,Order Priority,Order Date,Region,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
535113847,Azerbaijan,Snacks,Online,C,10/8/14,Middle East and North Africa,10/23/14,934,152.58,97.44,142509.72,91008.96,51500.76
874708545,Panama,Cosmetics,Offline,L,2/22/15,Central America and the Caribbean,2/27/15,4551,437.2,263.33,1989697.2,1198414.83,791282.37
854349935,Sao Tome and Principe,Fruits,Offline,M,12/9/15,Sub-Saharan Africa,1/18/16,9986,9.33,6.92,93169.38,69103.12,24066.26
892836844,Sao Tome and Principe,Personal Care,Online,M,9/17/14,Sub-Saharan Africa,10/12/14,9118,81.73,56.67,745214.14,516717.06,228497.08
129280602,Belize,Household,Offline,H,2/4/10,Central America and the Caribbean,3/5/10,5858,668.27,502.54,3914725.66,2943879.32,970846.34
473105037,Denmark,Clothes,Online,C,2/20/13,Europe,2/28/13,1149,109.28,35.84,125562.72,41180.16,84382.56
754046475,Germany,Cosmetics,Offline,M,3/31/13,Europe,5/3/13,7964,437.2,263.33,3481860.8,2097160.12,1384700.68
772153747,Turkey,Fruits,Online,C,3/26/12,Middle East and North Africa,4/7/12,6307,9.33,6.92,58844.31,43644.44,15199.87
847788178,United Kingdom,Snacks,Online,H,12/29/12,Europe,1/15/13,8217,152.58,97.44,1253749.86,800664.48,453085.38
Go to the AWS Glue console and click Databases on the left. You should see a database named glueworkshop-cloudformation. It was created by the CloudFormation template we launched during workshop setup and contains two pre-defined tables that we will use later in the Glue streaming lab.
Create another database named glueworkshop by clicking Add Database and then clicking Create.
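If you prefer to script this step, the equivalent boto3 call from a notebook cell is a one-liner (a sketch, assuming your credentials and default region are configured as in the setup above):
import boto3

glue = boto3.client("glue")

# Create the Data Catalog database that the crawlers and jobs below will use.
glue.create_database(DatabaseInput={"Name": "glueworkshop"})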
We will create two crawlers to crawl the CSV and JSON folders; a scripted alternative is sketched after the steps below.
Add Crawler for CSV folder
Add Crawler for JSON folder
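If you would rather script the crawlers than click through the console, a boto3 sketch for the CSV crawler is shown below. The crawler name lab1-csv matches the cleanup commands at the end of the workshop, and the role and database names come from the CloudFormation stack and the database we just created; a second, analogous call pointed at the JSON folder (name and path of your choosing) covers the JSON crawler.
import boto3

glue = boto3.client("glue")

# Crawler for the CSV folder; it writes its table into the glueworkshop database.
glue.create_crawler(
    Name="lab1-csv",
    Role="AWSGlueServiceRole-glueworkshop",
    DatabaseName="glueworkshop",
    Targets={"S3Targets": [{"Path": f"s3://{BUCKET_NAME}/input/lab1/csv/"}]},
)

# Run it once; the resulting table is named after the folder (here, "csv").
glue.start_crawler(Name="lab1-csv")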
AWS Glue provides multiple options to develop and test Spark code. Data engineers and data scientists can use the tools of their choice to author Glue ETL scripts before deploying them to production. Data scientists can continue to work with SageMaker notebooks connected to a Glue Dev Endpoint, while others can use Glue Job Notebooks to quickly launch Jupyter-based, fully managed notebooks directly in the browser. If you prefer to work locally, you can use Glue interactive sessions.
AWS Glue Studio Job Notebooks let you interactively author extract, transform, and load (ETL) jobs in a notebook interface based on Jupyter Notebooks. They require minimal setup, so developers can get started quickly, and offer one-click conversion of notebooks into AWS Glue data integration jobs. Notebooks also support live data integration, fast startup times, and built-in cost management. In this module you will learn how to create, configure, and use AWS Glue Studio Job Notebooks to develop code that will later run as an independent Glue job.
Create Glue Job Notebook
%idle_timeout 30
Ensures your session stops automatically after 30 minutes of inactivity, so you don't pay for idle resources.
%number_of_workers 2
Reduces the number of active workers.
%extra_py_files "s3://${BUCKET_NAME}/library/pycountry_convert.zip"
Loads the third-party Python library that we will use in the next modules.
We will see how to use this development environment to author and test ETL code. We will then package and deploy the code to AWS Glue and execute it as a Glue job, and use Glue triggers and a workflow to manage job executions.
We will now write some PySpark code. For each step you will need to copy the code block into a notebook cell and then click Run.
The first code we will write is the boilerplate imports generally included at the start of every Spark/Glue job, followed by an import statement for the third-party library.
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, StringType
from pyspark import SparkContext
from pyspark.sql import SQLContext
from datetime import datetime
from pycountry_convert import (
convert_country_alpha2_to_country_name,
convert_country_alpha2_to_continent,
convert_country_name_to_country_alpha2,
convert_country_alpha3_to_country_alpha2,
)
We will define a UDF (user-defined function) to use when processing a Spark DataFrame. UDFs allow a developer to extend standard Spark functionality with Python code; to do that, your code is wrapped as a UDF (here via a lambda). The code below creates a Spark UDF, udf_get_country_code2, that converts a country name into a two-letter country code.
def get_country_code2(country_name):
country_code2 = 'US'
try:
country_code2 = convert_country_name_to_country_alpha2(country_name)
except KeyError:
country_code2 = ''
return country_code2
udf_get_country_code2 = udf(lambda z: get_country_code2(z), StringType())
Load the data from S3 into a Spark dataframe (named sparkDF) then check the schema of the Spark DataFrame.
sparkDF = spark.read.load("s3://${BUCKET_NAME}/input/lab2/sample.csv",
format="csv",
sep=",",
inferSchema="true",
header="true")
Now let's look at the data loaded by Spark. Compare this with the data you examined earlier and see whether the schema inferred by Spark matches what you saw.
sparkDF.printSchema()
Next we will create a new dataframe that includes a column created using the UDF we created previously.
new_df = sparkDF.withColumn('country_code_2', udf_get_country_code2(col("Country")))
new_df.printSchema()
Let's take a look at the data in this new dataframe - notice the new column country_code_2. This contains two-letter country codes that were determined based on the Country column.
new_df.show(10)
So far, we have been running standard Spark code. Now, we will try some Glue-flavored code. Remember the Glue Data Catalog tables we created earlier? We will now load them into a Glue dynamic frame. After the data is loaded into a Glue dynamic frame, compare the schema it presented with the schema stored in the Glue Data Catalog table.
Glue dynamic frames differ from Spark dataframes because they have a flexible schema definition - hence the name dynamic. This enables each record to have a different schema which is computed on-the-fly. This is especially useful in handling messy data.
Notice here we don't need to specify an S3 location - this is because the Glue Data Catalog knows where the data lives thanks to the crawler we configured and ran earlier.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
glueContext = GlueContext(SparkContext.getOrCreate())
dynaFrame = glueContext.create_dynamic_frame.from_catalog(database="glueworkshop", table_name="csv")
dynaFrame.printSchema()
Just as with the Spark dataframe, we can view the data in the Glue dynamic frame by calling the toDF function on it and then using the standard Spark show function.
dynaFrame.toDF().show(10)
Now we will package together the code snippets we have been testing and exploring on their own and create a Spark script for Glue. The code below is the combined and cleaned-up code from the previous sections that we ran in the notebook. It is mostly standard Spark code; the Glue-specific parts are the imports and the job initialization and commit boilerplate.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from datetime import datetime
from pycountry_convert import (
convert_country_alpha2_to_country_name,
convert_country_alpha2_to_continent,
convert_country_name_to_country_alpha2,
convert_country_alpha3_to_country_alpha2,
)
def get_country_code2(country_name):
country_code2 = 'US'
try:
country_code2 = convert_country_name_to_country_alpha2(country_name)
except KeyError:
country_code2 = ''
return country_code2
udf_get_country_code2 = udf(lambda z: get_country_code2(z), StringType())
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3_bucket'])
s3_bucket = args['s3_bucket']
job_time_string = datetime.now().strftime("%Y%m%d%H%M%S")
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = spark.read.load(s3_bucket + "input/lab2/sample.csv",
format="csv",
sep=",",
inferSchema="true",
header="true")
new_df = df.withColumn('country_code_2', udf_get_country_code2(col("Country")))
new_df.write.csv(s3_bucket + "/output/lab3/notebook/" + job_time_string + "/")
job.commit()
Run the following command to create a Glue ETL job glueworkshop-lab3-etl-job with the same Spark code we created earlier which is stored in s3://${BUCKET_NAME}/script/lab3/spark.py.
!aws glue create-job \
--name glueworkshop-lab3-etl-job \
--role AWSGlueServiceRole-glueworkshop \
--command "Name=glueetl,ScriptLocation=s3://${BUCKET_NAME}/script/lab3/spark.py,PythonVersion=3" \
--glue-version '2.0' \
--default-arguments "{\"--extra-py-files\": \"s3://${BUCKET_NAME}/library/pycountry_convert.zip\", \
\"--s3_bucket\": \"s3://${BUCKET_NAME}/\" }"
{ "Name": "glueworkshop-lab3-etl-job" }
!aws s3 sync s3://wys-glueworkshop/output/ glue-workshop/output
download: s3://wys-glueworkshop/output/lab3/20221124062626/part-00000-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv to glue-workshop/output/lab3/20221124062626/part-00000-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv
download: s3://wys-glueworkshop/output/lab3/20221124062626/part-00002-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv to glue-workshop/output/lab3/20221124062626/part-00002-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv
download: s3://wys-glueworkshop/output/lab3/20221124062626/part-00001-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv to glue-workshop/output/lab3/20221124062626/part-00001-60212af7-cbbc-4187-9b7c-d034d2241dd2-c000.csv
Follow the steps below to create a scheduled trigger to run the ETL job every hour.
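If you would rather script the trigger than use the console, a boto3 sketch along these lines should work; the trigger name matches the one removed in the cleanup section, and the cron expression fires at the top of every hour:
import boto3

glue = boto3.client("glue")

# Scheduled trigger that starts the lab 3 ETL job every hour, on the hour.
glue.create_trigger(
    Name="glueworkshop-lab3-etl-job-trigger",
    Type="SCHEDULED",
    Schedule="cron(0 * * * ? *)",
    Actions=[{"JobName": "glueworkshop-lab3-etl-job"}],
    StartOnCreation=True,
)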
Before creating a streaming ETL job, you must manually create a Data Catalog table that specifies the source data stream properties, including the data schema. This table is used as the data source for the streaming ETL job. We will use the Data Catalog table json-streaming-table created earlier by CloudFormation. This table's data source is AWS Kinesis and it has the schema definition of the JSON data we will send through the stream.
Go to the AWS Kinesis console and click Data streams on the left to open the UI for Kinesis Data Streams. You should see a data stream with name glueworkshop which was created by CloudFormation.
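You can also confirm the stream from the notebook; a minimal boto3 check (assuming your default region is the one the stack was deployed to) looks like this:
import boto3

kinesis = boto3.client("kinesis")

# Confirm the glueworkshop stream created by CloudFormation is ACTIVE with its 2 shards.
summary = kinesis.describe_stream_summary(StreamName="glueworkshop")["StreamDescriptionSummary"]
print(summary["StreamStatus"], summary["OpenShardCount"])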
During the streaming processing, we will use a lookup table to convert a country name from the full name to a 2-letter country code. The lookup table data is stored in S3 at s3://${BUCKET_NAME}/input/lab4/country_lookup/.
Next, we will develop the code for the streaming job. Glue streaming is micro-batch based: the streaming job processes incoming data using a tumbling window, and all data inside a given window is processed by a batch function. The invocation of the tumbling window inside the Glue streaming job is shown below. The batch function (here processBatch, passed as batch_function) receives each micro-batch DataFrame and processes it at the window interval (60 seconds in this case).
glueContext.forEachBatch(frame=sourceData,
batch_function=processBatch,
options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_location})
Our goal is to use this Jupyter notebook to develop and test a batch function named processBatch. This function will process a DataFrame with the same schema as the streaming data inside our development environment.
Copy the following code to notebook cells.
Set up the environment and variables for the test.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, StringType
from pyspark import SparkContext
from pyspark.sql import SQLContext
from datetime import datetime
glueContext = GlueContext(SparkContext.getOrCreate())
s3_bucket = "s3://${BUCKET_NAME}"
output_path = s3_bucket + "/output/lab4/notebook/"
job_time_string = datetime.now().strftime("%Y%m%d%H%M%S")
s3_target = output_path + job_time_string
Load the lookup dataframe from the S3 folder.
country_lookup_frame = glueContext.create_dynamic_frame.from_options(
format_options = {"withHeader":True, "separator":',', "quoteChar":"\""},
connection_type = "s3",
format = "csv",
connection_options = {"paths": [s3_bucket + "/input/lab4/country_lookup/"], "recurse":True},
transformation_ctx = "country_lookup_frame")
Here is the batch function body where we do type conversion and a look-up transformation on the incoming data.
def processBatch(data_frame, batchId):
if (data_frame.count() > 0):
dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
apply_mapping = ApplyMapping.apply(frame=dynamic_frame, mappings=[
("uuid", "string", "uuid", "bigint"),
("country", "string", "country", "string"),
("item type", "string", "item type", "string"),
("sales channel", "string", "sales channel", "string"),
("order priority", "string", "order priority", "string"),
("order date", "string", "order date", "string"),
("region", "string", "region", "string"),
("ship date", "string", "ship date", "string"),
("units sold", "int", "units sold", "int"),
("unit price", "string", "unit price", "decimal"),
("unit cost", "string", "unit cost", "decimal"),
("total revenue", "string", "total revenue", "decimal"),
("total cost", "string", "total cost", "decimal"),
("total profit", "string", "total profit", "decimal")],
transformation_ctx="apply_mapping")
final_frame = Join.apply(apply_mapping, country_lookup_frame, 'country', 'CountryName').drop_fields(
['CountryName', 'country', 'unit price', 'unit cost', 'total revenue', 'total cost', 'total profit'])
s3sink = glueContext.write_dynamic_frame.from_options(frame=final_frame,
connection_type="s3",
connection_options={"path": s3_target},
format="csv",
transformation_ctx="s3sink")
Now we will load some test data to test the batch function.
dynaFrame = glueContext.create_dynamic_frame.from_catalog(database="glueworkshop-cloudformation",
table_name="json-static-table")
processBatch(dynaFrame.toDF(), "12")
Check the output path s3://${BUCKET_NAME}/output/lab4/notebook/ and you should see new folders generated by the test script. Copy the following to your terminal; you should see a new folder with a recent timestamp containing the files created by the test script.
aws s3 cp s3://${BUCKET_NAME}/output/ ~/environment/glue-workshop/output --recursive
import sys
from datetime import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3_bucket'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# S3 sink locations
output_path = args['s3_bucket'] + "/output/lab4/"
job_time_string = datetime.now().strftime("%Y%m%d%H%M%S")
s3_target = output_path + job_time_string
checkpoint_location = output_path + "checkpoint/"
temp_path = output_path + "temp/"
country_lookup_path = args['s3_bucket'] + "/input/lab4/country_lookup/"
country_lookup_frame = glueContext.create_dynamic_frame.from_options( \
    format_options = {"withHeader": True, "separator": ",", "quoteChar": "\""}, \
connection_type = "s3", \
format = "csv", \
    connection_options = {"paths": [country_lookup_path], "recurse": True}, \
transformation_ctx = "country_lookup_frame")
def processBatch(data_frame, batchId):
if (data_frame.count() > 0):
dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
("uuid", "string", "uuid", "bigint"), \
("country", "string", "country", "string"), \
("item type", "string", "item type", "string"), \
("sales channel", "string", "sales channel", "string"), \
("order priority", "string", "order priority", "string"), \
("order date", "string", "order date", "string"), \
("region", "string", "region", "string"), \
("ship date", "string", "ship date", "string"), \
("units sold", "int", "units sold", "int"), \
("unit price", "string", "unit price", "decimal"), \
("unit cost", "string", "unit cost", "decimal"), \
("total revenue", "string", "total revenue", "decimal"), \
("total cost", "string", "total cost", "decimal"), \
("total profit", "string", "total profit", "decimal")],\
transformation_ctx = "apply_mapping")
final_frame = Join.apply(apply_mapping, country_lookup_frame, 'country', 'CountryName').drop_fields( \
['CountryName', 'country', 'unit price', 'unit cost', 'total revenue', 'total cost', 'total profit'])
s3sink = glueContext.write_dynamic_frame.from_options( frame = final_frame, \
connection_type = "s3", \
connection_options = {"path": s3_target}, \
format = "csv", \
transformation_ctx = "s3sink")
# Read from Kinesis Data Stream from catalog table
sourceData = glueContext.create_data_frame.from_catalog( \
database = "glueworkshop-cloudformation", \
table_name = "json-streaming-table", \
transformation_ctx = "datasource0", \
additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})
glueContext.forEachBatch(frame = sourceData, \
batch_function = processBatch, \
options = {"windowSize": "60 seconds", "checkpointLocation": checkpoint_location})
job.commit()
Run the following command to create a Glue streaming job named glueworkshop_lab4_glue_streaming using the Spark code we created earlier, which is stored in s3://${BUCKET_NAME}/script/lab4/streaming.py.
!aws glue create-job \
--name glueworkshop_lab4_glue_streaming \
--role AWSGlueServiceRole-glueworkshop \
--command "Name=gluestreaming,ScriptLocation=s3://${BUCKET_NAME}/script/lab4/streaming.py,PythonVersion=3" \
--glue-version "2.0" \
--default-arguments "{\"--s3_bucket\": \"s3://${BUCKET_NAME}/\" }"
{ "Name": "glueworkshop_lab4_glue_streaming" }
cd glue-workshop
python code/lab4/PutRecord_Kinesis.py
aws s3 sync s3://${BUCKET_NAME}/output/ output
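The PutRecord_Kinesis.py helper is not reproduced here. As a rough illustration of what it does, a minimal boto3 sketch that pushes one JSON record with the json-streaming-table field names into the glueworkshop stream might look like this (the field values are taken from the sample CSV row shown earlier and are for illustration only):
import json
import boto3

kinesis = boto3.client("kinesis")

# One sample record using the same field names as the json-streaming-table schema.
record = {
    "uuid": 535113847,
    "country": "Azerbaijan",
    "item type": "Snacks",
    "sales channel": "Online",
    "order priority": "C",
    "order date": "10/8/14",
    "region": "Middle East and North Africa",
    "ship date": "10/23/14",
    "units sold": 934,
    "unit price": 152.58,
    "unit cost": 97.44,
    "total revenue": 142509.72,
    "total cost": 91008.96,
    "total profit": 51500.76,
}

# Send the record to the glueworkshop stream; the uuid doubles as the partition key.
kinesis.put_record(
    StreamName="glueworkshop",
    Data=json.dumps(record).encode("utf-8"),
    PartitionKey=str(record["uuid"]),
)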
We will use a dataset from a public data lake for COVID-19 research and development hosted by AWS.
We will first use a Glue crawler to catalog the COVID-19 data and then create a DataBrew dataset from the resulting Data Catalog table. In DataBrew, a dataset simply means a set of data: rows or records that are divided into columns or fields.
When you create a DataBrew project, you connect to or upload data that you want to transform or prepare. DataBrew can work with data from any source, imported from formatted files, and it connects directly to a growing list of data stores. In DataBrew, a Dataset is a read-only connection to your data.
DataBrew collects a set of descriptive metadata to refer to the data. No actual data can be altered or stored by DataBrew. For simplicity, we use the word dataset to refer to both the actual dataset and the metadata that DataBrew uses.
Follow the steps below to create a new DataBrew dataset.
Once the new dataset is created, we will run a profiling job on it. When you profile your data, DataBrew creates a report called a data profile. This summary tells you about the existing shape of your data, including the context of the content, the structure of the data, and its relationships. You can make a data profile for any dataset by running a data profile job.
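The profiling job can also be created and started programmatically. The sketch below is a boto3 equivalent under a few assumptions: the covid-testing dataset already exists, the job name matches the cleanup commands later in the workshop, the role is the DataBrew role created by the CloudFormation stack, and the output prefix mirrors the profile output synced later in this lab.
import boto3

databrew = boto3.client("databrew")
account_id = boto3.client("sts").get_caller_identity()["Account"]

# Create a profile job against the covid-testing dataset, then run it once.
databrew.create_profile_job(
    Name="covid-testing-profile-job",
    DatasetName="covid-testing",
    RoleArn=f"arn:aws:iam::{account_id}:role/AWSGlueDataBrewServiceRole-glueworkshop",
    OutputLocation={"Bucket": BUCKET_NAME, "Key": "output/lab5/profile/"},
)
databrew.start_job_run(Name="covid-testing-profile-job")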
We will create a DataBrew project and a recipe to transform the dataset we created earlier.
Once the project is created, you will have a work area with the sample data displayed in the data grid. You can also check the schema and profile of the data by clicking SCHEMA and PROFILE in the upper right-corner of the grid. On the right of the screen is the recipe work area.
Next, we are going to create a simple recipe using built-in transformations.
dateChecked
death
deathIncrease
fips
hash
hospitalized
hospitalizedIncrease
negative
negativeIncrease
pending
positive
total
totalTestResults
positiveIncrease
totalTestResultsIncrease
positiveRate
As you proceed with developing your recipe, you can save your work by publishing the recipe. DataBrew maintains a list of published versions for your recipe. You can use any published version in a recipe job to transform your dataset. You can also download a copy of the recipe steps so you can reuse the recipe in other projects or other transformations.
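Publishing can also be done from code. A small boto3 sketch, using the recipe name referenced in the cleanup commands:
import boto3

databrew = boto3.client("databrew")

# Publish the current working version of the recipe so jobs can reference it.
databrew.publish_recipe(
    Name="covid-testing-recipe",
    Description="Convert COVID testing data to time series of positive percentage in NY and CA",
)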
Convert COVID testing data to time series of positive percentage in NY and CA
- Action:
Operation: DELETE
Parameters:
sourceColumns: >-
["dateChecked","death","deathIncrease","fips","hash","hospitalized","negative","negativeIncrease","pending","positive","total","totalTestResults"]
- Action:
Operation: REMOVE_VALUES
Parameters:
sourceColumn: state
ConditionExpressions:
- Condition: IS_NOT
Value: '["NY","CA"]'
TargetColumn: state
- Action:
Operation: DATE_FORMAT
Parameters:
dateTimeFormat: yyyy-mm-dd
functionStepType: DATE_FORMAT
sourceColumn: date
targetColumn: date_formated
- Action:
Operation: CHANGE_DATA_TYPE
Parameters:
columnDataType: date
replaceType: REPLACE_WITH_NULL
sourceColumn: date_formated
- Action:
Operation: DELETE
Parameters:
sourceColumns: '["date"]'
- Action:
Operation: DIVIDE
Parameters:
functionStepType: DIVIDE
sourceColumn1: positiveIncrease
sourceColumn2: totalTestResultsIncrease
targetColumn: positiveRate
- Action:
Operation: MULTIPLY
Parameters:
functionStepType: MULTIPLY
sourceColumn1: positiveRate
targetColumn: positivePercentage
value2: '100'
- Action:
Operation: DELETE
Parameters:
sourceColumns: '["totalTestResultsIncrease","positiveRate","positiveIncrease"]'
- Action:
Operation: PIVOT
Parameters:
aggregateFunction: MEAN
sourceColumn: state
valueColumn: positivePercentage
Create a new DataBrew job
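As with the profile job, the recipe job can be created and started from code. Below is a hedged boto3 sketch, using the project and job names from the cleanup commands and an illustrative output prefix under output/lab5/.
import boto3

databrew = boto3.client("databrew")
account_id = boto3.client("sts").get_caller_identity()["Account"]

# Recipe job that applies the project's recipe and writes CSV output to S3.
databrew.create_recipe_job(
    Name="covid-testing-recipe-job",
    ProjectName="covid-testing",
    RoleArn=f"arn:aws:iam::{account_id}:role/AWSGlueDataBrewServiceRole-glueworkshop",
    Outputs=[{
        "Location": {"Bucket": BUCKET_NAME, "Key": "output/lab5/recipe/"},
        "Format": "CSV",
    }],
)
databrew.start_job_run(Name="covid-testing-recipe-job")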
!aws s3 sync s3://$BUCKET_NAME/output/ glue-workshop/output
download: s3://wys-glueworkshop/output/lab5/profile/covid-testing_9c3921a56e8b9e67b5dd03c847f957d1551d3fa280d9969dd2d05d476f3b6e19.json to glue-workshop/output/lab5/profile/covid-testing_9c3921a56e8b9e67b5dd03c847f957d1551d3fa280d9969dd2d05d476f3b6e19.json
We will be creating a new Glue Studio job. Start by opening the menu on the left and clicking Jobs.
Under Create job, select Blank graph.
Click Create.
Rename the job to glueworkshop-lab6-etl-job. Now you have a blank Glue Studio visual job editor. On the top of the editor are the tabs for different configurations.
Click the Script tab; you should see an empty shell of a Glue ETL script. As we add new steps in the visual editor, the script is updated automatically. You will not be able to edit the script inside Glue Studio.
Click the Job Details tab to see all job configurations.
Under IAM role select AWSGlueServiceRole-glueworkshop.
Under Job bookmark select Disable. Note: In a production environment, you probably want to enable the bookmark. We will disable it for this lab so we can reuse the test dataset.
You don't need to change any other settings here, but you should take some time to explore what settings are available in this tab. When you are done exploring, click Save on the upper right to save the changed settings.
Click the Visual tab again to go back to the visual editor. You should see 3 dropdown buttons: Source, Transform, and Target. Click Transform. You will notice a more limited set of transformations than Glue DataBrew offers, as we saw in a previous lab. This is because Glue Studio is designed for developers who write Apache Spark code but want to leverage Glue Studio for job orchestration and monitoring. In a later section of this lab, we will demonstrate how to develop custom code in Glue Studio.
Click the Source dropdown icon and select S3.
Click the Transform dropdown icon and select Custom transform, then copy the code below into the code block on the right.
def DeleteFields (glueContext, dfc) -> DynamicFrameCollection:
sparkDF = dfc.select(list(dfc.keys())[0]).toDF()
sparkDF.createOrReplaceTempView("covidtesting")
df = spark.sql("select date, \
state , \
positiveIncrease , \
totalTestResultsIncrease \
from covidtesting")
dyf = DynamicFrame.fromDF(df, glueContext, "results")
return DynamicFrameCollection({"CustomTransform0": dyf}, glueContext)
Click the Output Schema tab on the right, then click Edit. Leave only the following columns in the output schema, then click Apply.
date
state
positiveincrease
totaltestresultsincrease
Click Transform dropdown and select SelectFromCollection. Make sure under Transform tab that the Frame index value is 0.
Click the Target dropdown and select S3. Under the Data target properties - S3 tab set the S3 Target Location value to s3://${BUCKET_NAME}/output/lab6/json/temp1/ then replace ${BUCKET_NAME} with your own S3 bucket name.
Click the Save button in the upper right corner then click the Run button next to it.
You can use the command below to copy the output files to your environment and explore the content. You will see the output JSON file only contains 4 fields - date, state, positiveIncrease and totalTestResultsIncrease.
!aws s3 sync s3://$BUCKET_NAME/output/ glue-workshop/output
download: s3://wys-glueworkshop/output/lab6/json/temp1/run-1669278265659-part-r-00000 to glue-workshop/output/lab6/json/temp1/run-1669278265659-part-r-00000
We used a Custom Transformation Node in the last section of the lab. Here we will take a deeper look at the Custom Transformation Node and provide guidelines on how to develop scripts for it.
A Custom Transformation Node can have any number of parent nodes, each providing a DynamicFrame as an input. A Custom Transformation Node returns a collection of DynamicFrames. Each DynamicFrame that is used as input has an associated schema. You must add a schema that describes each DynamicFrame returned by the custom code node.
A Custom Transformation Node's script looks like the following. The inputs are a GlueContext and a DynamicFrameCollection. The DynamicFrameCollection contains 1 to n DynamicFrames; at the start of the script we get an Apache Spark DataFrame from each DynamicFrame. Transformations are then performed, and the resulting DataFrames are converted back to DynamicFrames and returned in a DynamicFrameCollection.
def CustomTransform (glueContext, dfc) -> DynamicFrameCollection:
df0 = dfc.select(list(dfc.keys())[0]).toDF()
df1 = dfc.select(list(dfc.keys())[1]).toDF()
...
# do transformation on the Spark DataFrame df0, df1, ...
...
    # The resulting DataFrames have names like resultDF0, resultDF1, ...
    # Convert them back to DynamicFrames and return them in a DynamicFrameCollection
    dyf0 = DynamicFrame.fromDF(resultDF0, glueContext, "result0")
    dyf1 = DynamicFrame.fromDF(resultDF1, glueContext, "result1")
...
return(DynamicFrameCollection( {
"CustomTransform0": dyf0,
"CustomTransform1": dyf1,
...
},
glueContext))
Click on the data target node then click Remove at the top of the visual editor to remove it from the graph.
Click the Transform dropdown icon and select Custom transform. If the new node is not connected to the existing SelectFromCollection node, click Node properties and select it in the Node parents dropdown. Then copy the code below to the Code block field under Transform tab.
def ConvertDateStringToDate (glueContext, dfc) -> DynamicFrameCollection:
sparkDF = dfc.select(list(dfc.keys())[0]).toDF()
sparkDF.createOrReplaceTempView("inputTable")
df = spark.sql("select TO_DATE(CAST(UNIX_TIMESTAMP(date, 'yyyyMMdd') AS TIMESTAMP)) as date, \
state , \
positiveIncrease , \
totalTestResultsIncrease \
from inputTable")
dyf = DynamicFrame.fromDF(df, glueContext, "results")
return DynamicFrameCollection({"CustomTransform0": dyf}, glueContext)
Click the Output schema tab and click Edit. Change the data type of date from string to date, then click Apply.
Click the Transform dropdown icon and select SelectFromCollection. Make sure under the Transform tab, the Frame index value is 0.
Note: At this point we have shown the full process of developing scripts for a Custom Transformation Node. You can repeat this process as you develop more custom scripts using Spark SQL. In the rest of this section, we will add more Custom Transformation Nodes and finish the ETL job.
Click the Transform dropdown icon and select Custom transform. Copy the code below to the Code block field under the Transform tab.
def FilterAndCalculatePercentage (glueContext, dfc) -> DynamicFrameCollection:
sparkDF = dfc.select(list(dfc.keys())[0]).toDF()
sparkDF.createOrReplaceTempView("inputTable")
df = spark.sql("select date , \
state , \
(positiveIncrease * 100 / totalTestResultsIncrease) as positivePercentage \
from inputTable \
where state in ('NY', 'CA')")
dyf = DynamicFrame.fromDF(df, glueContext, "results")
return DynamicFrameCollection({"CustomTransform0": dyf}, glueContext)
Click the Output schema tab then click Edit. Remove columns positiveIncrease and totalTestResultsIncrease. Add a new column named positivePercentage with type double by clicking ... and then Add root key. Finally, click Apply.
Click the Transform dropdown icon and choose SelectFromCollection. Make sure under the Transform tab the Frame index value is 0.
Click the Transform dropdown icon and select Custom transform. Copy the code below to the Code block field under Transform tab.
def PivotValue (glueContext, dfc) -> DynamicFrameCollection:
sparkDF = dfc.select(list(dfc.keys())[0]).toDF()
sparkDF.createOrReplaceTempView("inputTable")
df = spark.sql("select * from inputTable \
pivot (avg(positivePercentage) as positivePercentage \
for state in ('NY' as positivePercentageNY, 'CA' as positivePercentageCA))")
dyf = DynamicFrame.fromDF(df, glueContext, "results")
return DynamicFrameCollection({"CustomTransform0": dyf}, glueContext)
Click the Output schema tab then click Edit. Remove columns state and positivePercentage and add 2 new columns positivePercentageNY and positivePercentageCA with type double, then click Apply.
Click the Transform dropdown icon and select SelectFromCollection. Make sure under Transform tab the Frame index value is 0.
Click the Target dropdown icon and select S3. Under the Data target properties - S3 tab, set the S3 Target Location value to s3://${BUCKET_NAME}/output/lab6/json/finalResult/.
Click the Save button on the upper right corner and then click Run.
You can use the command below to copy the output files to your environment and explore the content. You will see that the output JSON file contains only 3 fields: date, positivePercentageNY, and positivePercentageCA.
!aws s3 sync s3://$BUCKET_NAME/output/ glue-workshop/output
We will see how to create a Glue streaming ETL job with Glue Studio.
order priority
order date
region
ship date
units sold
unit price
unit cost
CountryName
cd glue-workshop
python code/lab4/PutRecord_Kinesis.py
!aws s3 sync s3://$BUCKET_NAME/output/ glue-workshop/output
Execute the commands below to delete the resources. If you chose names different from the defaults provided in the labs, modify the commands below to match your own resource names.
Delete the DataBrew project, jobs, and dataset.
!aws databrew delete-job --name covid-testing-recipe-job
!aws databrew delete-job --name covid-testing-profile-job
!aws databrew delete-project --name covid-testing
!aws databrew delete-dataset --name covid-testing
Find the recipe versions with the CLI command below, then delete them.
!aws databrew list-recipe-versions --name covid-testing-recipe
Replace the recipe version number below with the version(s) returned by the command above.
!aws databrew batch-delete-recipe-version --name covid-testing-recipe --recipe-version "1.0"
Delete the Glue Data Catalog database, crawlers, jobs, and triggers created manually.
!aws glue delete-job --job-name glueworkshop-lab3-etl-job
!aws glue delete-job --job-name glueworkshop_lab4_glue_streaming
!aws glue delete-job --job-name glueworkshop-lab6-streaming-job
!aws glue delete-job --job-name glueworkshop-lab6-etl-job
!aws glue delete-trigger --name glueworkshop-lab3-etl-job-trigger
!aws glue delete-crawler --name lab1-csv
!aws glue delete-crawler --name covid-testing
!aws glue delete-database --name glueworkshop
Delete the CloudFormation stack.
!aws cloudformation delete-stack --stack-name glueworkshop
Delete the S3 bucket.
!aws s3 rb s3://$BUCKET_NAME --force