Direct PUT is a method for sending data from clients directly to Kinesis Data Firehose. In this part, you'll create a Firehose delivery stream and use a script to send data to it with Direct PUT, using the AWS SDK for Python (boto3). Firehose receives the records, delivers them to the configured S3 bucket and folder, and partitions them by their arrival date and time.
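The partitioning works by substituting each record's arrival time into the !{timestamp:...} expressions of the bucket prefix shown in the settings that follow. The sketch below is a rough local imitation, not Firehose code. It assumes the timestamp is evaluated in UTC, as the Firehose custom-prefix documentation describes, and it only handles the yyyy, MM and dd patterns used here; render_prefix is a hypothetical helper name.

```python
from datetime import datetime, timezone

# Assumption: a hand-written approximation of Firehose's !{timestamp:...}
# namespace, covering only the patterns used in this walkthrough.
_PATTERNS = {"yyyy": "%Y", "MM": "%m", "dd": "%d"}


def render_prefix(prefix: str, arrival: datetime) -> str:
    """Expand !{timestamp:<pattern>} placeholders for a timezone-aware arrival time."""
    arrival_utc = arrival.astimezone(timezone.utc)  # partitions are cut in UTC
    out = prefix
    for pattern, fmt in _PATTERNS.items():
        out = out.replace("!{timestamp:" + pattern + "}", arrival_utc.strftime(fmt))
    return out
```

For example, a record arriving at 2024-03-07 23:30 UTC would land under data/webaccess/year=2024/month=03/day=07/, and Firehose then appends its own object name after that prefix.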
Bucket prefix:
data/webaccess/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
Error output prefix:
error/webaccess/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
Buffer hint interval:
60 seconds
Compression:
GZIP
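With Compression set to GZIP, every object Firehose writes under the prefix is a gzip file holding the raw record bytes back to back; Firehose does not insert a delimiter between records. A minimal sketch for reading a delivered partition back, assuming a boto3 S3 client is passed in and that the producer ends each record with a newline (the Direct PUT script in this section does). decode_delivered_object and read_partition are hypothetical helper names.

```python
import gzip
from typing import List


def decode_delivered_object(body: bytes) -> List[str]:
    """Decompress one Firehose-written object and split it into records.

    Relies on each record ending with a newline, because Firehose itself
    adds no separator between records.
    """
    text = gzip.decompress(body).decode("utf-8")
    return [line for line in text.split("\n") if line]


def read_partition(s3, bucket: str, prefix: str) -> List[str]:
    """Read all records under one partition prefix with a boto3 S3 client."""
    records: List[str] = []
    # The paginator follows continuation tokens past the 1000-key page limit.
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
            records.extend(decode_delivered_object(body))
    return records
```

Usage would look like read_partition(boto3.client("s3"), "<your-bucket>", "data/webaccess/year=2024/month=03/day=07/"), with your own bucket name and date.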
import gzip
import time

import boto3

session = boto3.Session(profile_name='default', region_name='us-east-1')
firehose = session.client('firehose')

# Download the sample log file. "!" is the IPython/Jupyter shell escape;
# in a terminal, run the same command without it.
!aws s3 cp s3://wysde-datasets/http.log.gz http.log.gz

input_file = "http.log.gz"       # Gzipped input log file
num_messages = 100               # Number of messages to send, 0 to send the whole file
output_stream = "PUT-S3-wysde2"  # Firehose delivery stream name

print(f"Sending {num_messages} messages to {output_stream}...")
sent = 0
with gzip.open(input_file, "rt") as f:
    for line in f:
        # Firehose adds no delimiter between records, so end each one with a newline
        msg = line.strip() + "\n"
        firehose.put_record(
            DeliveryStreamName=output_stream,
            Record={'Data': msg.encode('utf-8')},
        )
        sent += 1
        if sent % 100 == 0:
            print(f"{sent} sent")
        if num_messages > 0 and sent >= num_messages:
            break
        time.sleep(0.01)  # Brief pause between records
Sending 100 messages to PUT-S3-wysde2...
100 sent
The script sends simulated web access logs to Firehose, one put_record call per log line, and stops after num_messages records (100 here). You can run it again to send more messages.
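One put_record call per line is simple but slow. Firehose also offers PutRecordBatch, which accepts up to 500 records (and at most 4 MiB) per call and reports per-record failures instead of failing the whole request. A sketch under assumptions: the client is a boto3 Firehose client passed in by the caller, each log line is small enough that 500 of them stay under 4 MiB, failed records are retried immediately without backoff, and send_in_batches is a hypothetical name.

```python
from typing import Iterable, List

MAX_BATCH = 500  # PutRecordBatch accepts at most 500 records per call


def send_in_batches(firehose, stream_name: str, lines: Iterable[str], max_retries: int = 3) -> int:
    """Send newline-terminated lines with PutRecordBatch, retrying records that fail."""

    def flush(records: List[bytes]) -> int:
        pending = records
        for _ in range(max_retries + 1):
            resp = firehose.put_record_batch(
                DeliveryStreamName=stream_name,
                Records=[{"Data": r} for r in pending],
            )
            if resp["FailedPutCount"] == 0:
                return len(records)
            # RequestResponses lines up with Records; entries carrying an
            # ErrorCode were not accepted and are sent again.
            pending = [r for r, res in zip(pending, resp["RequestResponses"]) if "ErrorCode" in res]
        raise RuntimeError(f"{len(pending)} records still failing after {max_retries} retries")

    sent = 0
    batch: List[bytes] = []
    for line in lines:
        batch.append((line.strip() + "\n").encode("utf-8"))
        if len(batch) == MAX_BATCH:
            sent += flush(batch)
            batch = []
    if batch:
        sent += flush(batch)
    return sent
```

With the objects from the script above, this could be called as send_in_batches(firehose, output_stream, gzip.open(input_file, "rt")). In production you would likely add a short sleep before each retry.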
Create a second delivery stream for the transaction data with these settings:
Bucket prefix:
data/transactions/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
Error output prefix:
error/transactions/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
Buffer hint interval:
60 seconds
Compression:
GZIP
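The two streams differ only in the dataset name inside their prefixes, so the console settings can also be expressed once as the ExtendedS3DestinationConfiguration argument of boto3's create_delivery_stream. This is a sketch under assumptions: the IAM role and bucket ARNs are placeholders you must supply, SizeInMBs=5 is an assumed buffer size (the settings above only fix the 60-second interval), and s3_destination_settings and create_stream are hypothetical helper names.

```python
def s3_destination_settings(dataset: str, role_arn: str, bucket_arn: str) -> dict:
    """ExtendedS3DestinationConfiguration for one dataset, e.g. "webaccess" or "transactions"."""
    date_path = "year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"
    return {
        "RoleARN": role_arn,
        "BucketARN": bucket_arn,
        "Prefix": f"data/{dataset}/{date_path}",
        # Doubled braces produce a literal !{firehose:error-output-type}
        "ErrorOutputPrefix": f"error/{dataset}/!{{firehose:error-output-type}}/{date_path}",
        "BufferingHints": {"IntervalInSeconds": 60, "SizeInMBs": 5},  # SizeInMBs is an assumption
        "CompressionFormat": "GZIP",
    }


def create_stream(firehose, name: str, dataset: str, role_arn: str, bucket_arn: str) -> str:
    """Create a Direct PUT delivery stream with a boto3 Firehose client and return its ARN."""
    resp = firehose.create_delivery_stream(
        DeliveryStreamName=name,
        DeliveryStreamType="DirectPut",
        ExtendedS3DestinationConfiguration=s3_destination_settings(dataset, role_arn, bucket_arn),
    )
    return resp["DeliveryStreamARN"]
```

For example, create_stream(session.client("firehose"), "PUT-S3-wysde2", "webaccess", ROLE_ARN, BUCKET_ARN), where ROLE_ARN and BUCKET_ARN are your own values.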
Go to https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html and follow the instructions to create a Kinesis Data Generator (KDG) via CloudFormation. The Outputs tab of the CloudFormation stack shows a URL; open it and log in with the username and password you provided to CloudFormation.
Alternatively, launch the CloudFormation template directly from https://aws-kdg-tools.s3.us-west-2.amazonaws.com/cognito-setup.json.
In the KDG, use this record template:
{
    "customerId": "{{random.number(50)}}",
    "transactionAmount": "{{random.number({"min":10, "max":150})}}",
    "sourceIp": "{{internet.ip}}",
    "status": "{{random.weightedArrayElement({"weights":[0.8,0.1,0.1], "data":["OK","FAIL","PENDING"]})}}",
    "transactionTime": "{{date.now}}"
}
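To exercise the transactions stream without the KDG, records of the same shape can be generated locally. This sketch only approximates the template with Python's random module. It assumes random.number(50) yields integers from 0 to 50 inclusive, uses ISO 8601 UTC for transactionTime (the KDG's {{date.now}} format may differ), builds IPv4 addresses from four random octets instead of faker's internet.ip, and appends a newline to each record. fake_transaction and send_transactions are hypothetical names, and the client is a boto3 Firehose client you pass in.

```python
import json
import random
from datetime import datetime, timezone


def fake_transaction(rng: random.Random) -> dict:
    """One record shaped like the KDG template; every value is a string, as in the template."""
    return {
        "customerId": str(rng.randint(0, 50)),
        "transactionAmount": str(rng.randint(10, 150)),
        "sourceIp": ".".join(str(rng.randint(0, 255)) for _ in range(4)),
        "status": rng.choices(["OK", "FAIL", "PENDING"], weights=[0.8, 0.1, 0.1])[0],
        # Assumption: ISO 8601 in UTC; KDG's {{date.now}} output may look different.
        "transactionTime": datetime.now(timezone.utc).isoformat(),
    }


def send_transactions(firehose, stream_name: str, count: int, seed: int = 0) -> None:
    """Send `count` newline-delimited JSON records with put_record."""
    rng = random.Random(seed)
    for _ in range(count):
        data = (json.dumps(fake_transaction(rng)) + "\n").encode("utf-8")
        firehose.put_record(DeliveryStreamName=stream_name, Record={"Data": data})
```

A call would look like send_transactions(firehose, "<your-transactions-stream>", 100), with the name of the second delivery stream you created.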