#!/usr/bin/env python
# coding: utf-8

# ## Direct PUT Pipeline

# Direct PUT is a method for sending data directly from clients to Kinesis Data Firehose. In this part, you'll create a Firehose delivery stream and use a script to send data to it with Direct PUT via the AWS SDK for Python (boto3). Firehose receives the records and delivers them to a configured S3 bucket/folder, partitioning the incoming records by their arrival date and time.

# ### Create Stream

# Bucket prefix:
#
# ```
# data/webaccess/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
# ```
#
# Error output prefix:
#
# ```
# error/webaccess/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
# ```
#
# Buffer hint interval:
#
# ```
# 60 seconds
# ```
#
# Compression:
#
# ```
# GZIP
# ```

# ![firehose_config](https://user-images.githubusercontent.com/62965911/214810198-c7de4084-11fd-4754-a395-d26ac19d8635.png)

# ## Send Data

# In[1]:


import boto3
import gzip
import time


# In[2]:


session = boto3.Session(profile_name='default', region_name='us-east-1')
firehose = session.client('firehose')


# In[ ]:


get_ipython().system('aws s3 cp s3://wysde-datasets/http.log.gz http.log.gz')


# In[4]:


input_file = "http.log.gz"       # Input log file, default is http.log.gz
num_messages = 100               # Number of messages to send, 0 for infinite
output_stream = "PUT-S3-wysde2"  # Firehose delivery stream name


# In[5]:


print(f"Sending {num_messages} messages to {output_stream}...")

sent = 0
with gzip.open(input_file, "rt") as f:
    line = f.readline()
    while line:
        # Strip the original newline and add one back so the objects
        # Firehose writes to S3 stay one record per line.
        msg = line.strip() + "\n"
        firehose.put_record(
            DeliveryStreamName=output_stream,
            Record={'Data': msg}
        )
        line = f.readline()
        sent += 1
        if sent % 100 == 0:
            print(f"{sent} sent")
        if sent >= num_messages and num_messages > 0:
            break
        time.sleep(0.01)


# The script sends simulated web access logs to Firehose and stops after `num_messages` records (100 by default). You can run it again to send more messages.
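# The loop above makes one `PutRecord` API call per log line. Firehose also
# supports `PutRecordBatch`, which accepts up to 500 records per call and cuts
# per-record request overhead. The cell below is a minimal batched sketch, not
# part of the original walkthrough: it reuses `firehose`, `input_file`,
# `num_messages`, and `output_stream` from the cells above, and only reports
# failed records instead of retrying them.

# In[ ]:


def flush_batch(client, stream, records):
    """Send one PutRecordBatch call and return the count of accepted records."""
    resp = client.put_record_batch(DeliveryStreamName=stream, Records=records)
    failed = resp['FailedPutCount']
    if failed:
        # A production sender would inspect resp['RequestResponses'] and
        # retry the failed entries; here we only report the count.
        print(f"{failed} records failed")
    return len(records) - failed


batch, sent = [], 0
with gzip.open(input_file, "rt") as f:
    for line in f:
        batch.append({'Data': (line.strip() + "\n").encode()})
        if len(batch) == 500:  # PutRecordBatch allows at most 500 records per call
            sent += flush_batch(firehose, output_stream, batch)
            batch = []
        if 0 < num_messages <= sent + len(batch):
            break

if batch:  # flush the final partial batch
    sent += flush_batch(firehose, output_stream, batch)
print(f"{sent} records accepted by {output_stream}")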
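# For reproducibility, the delivery stream configured in the console at the
# start of this section can also be created with the API. The cell below is a
# sketch under assumptions, not the exact console output: the role and bucket
# ARNs are hypothetical placeholders, and the 5 MiB buffer size is the console
# default that accompanies the 60-second interval shown above.

# In[ ]:


# Hypothetical ARNs; replace with your own IAM role and bucket.
firehose.create_delivery_stream(
    DeliveryStreamName='PUT-S3-wysde2',
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',
        'BucketARN': 'arn:aws:s3:::my-destination-bucket',
        'Prefix': 'data/webaccess/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
        'ErrorOutputPrefix': 'error/webaccess/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/',
        'BufferingHints': {'IntervalInSeconds': 60, 'SizeInMBs': 5},
        'CompressionFormat': 'GZIP',
    }
)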
# ### Check Ingested Data in S3

# ![s3put1](https://user-images.githubusercontent.com/62965911/214810368-e586bd3f-7360-4121-b204-16f04edfb71b.png)

# ## Send via Kinesis Data Streams

# ### Create Data Stream

# ![kdstream_config](https://user-images.githubusercontent.com/62965911/214810270-09dd8186-08bd-40fc-9c4c-87eac8b149f3.png)

# ### Create Firehose Delivery Stream

# Bucket prefix:
#
# ```
# data/transactions/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
# ```
#
# Error output prefix:
#
# ```
# error/transactions/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
# ```
#
# Buffer hint interval:
#
# ```
# 60 seconds
# ```
#
# Compression:
#
# ```
# GZIP
# ```

# ![dsfh](https://user-images.githubusercontent.com/62965911/214810154-c92222cd-e930-45d4-b4ce-e15e05c69e36.png)

# ### Set up Amazon Kinesis Data Generator

# Go to https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html and follow the instructions to create a new Kinesis Data Generator via CloudFormation. On the CloudFormation Outputs tab you will find a URL; open it and log in with the user ID and password that you provided in CloudFormation.
#
# Alternatively, use the `https://aws-kdg-tools.s3.us-west-2.amazonaws.com/cognito-setup.json` template directly.

# Use this record template in the Kinesis Data Generator:
#
# ```json
# {
#     "customerId": "{{random.number(50)}}",
#     "transactionAmount": "{{random.number(
#         {
#             "min": 10,
#             "max": 150
#         }
#     )}}",
#     "sourceIp": "{{internet.ip}}",
#     "status": "{{random.weightedArrayElement({
#         "weights": [0.8, 0.1, 0.1],
#         "data": ["OK", "FAIL", "PENDING"]
#     })}}",
#     "transactionTime": "{{date.now}}"
# }
# ```

# ![cdgcf](https://user-images.githubusercontent.com/62965911/214810148-a51b9811-ddee-4899-8aed-7bc5dda41a59.png)

# ![kdg](https://user-images.githubusercontent.com/62965911/214810260-a99ab853-6d6b-41b1-befd-8b818cd3cecd.png)

# ### Check Data

# ![dsout](https://user-images.githubusercontent.com/62965911/214810179-59b277f8-4333-4aca-8eb6-1da705765035.png)
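# Besides browsing the console, you can verify delivery from code. A minimal
# boto3 sketch follows; the bucket name is a hypothetical placeholder for the
# destination bucket you configured.

# In[ ]:


import boto3

s3 = boto3.Session(profile_name='default', region_name='us-east-1').client('s3')

# 'my-destination-bucket' is a placeholder; GZIP-compressed objects should
# appear under the partitioned prefix once the 60-second buffer flushes.
resp = s3.list_objects_v2(Bucket='my-destination-bucket',
                          Prefix='data/transactions/')
for obj in resp.get('Contents', []):
    print(obj['Key'], obj['Size'])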
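# Finally, if you prefer a scripted producer over the Kinesis Data Generator,
# the sketch below writes records shaped like the KDG template straight to the
# data stream. The stream name is an assumption; substitute the name of the
# data stream you created above.

# In[ ]:


import json
import random
from datetime import datetime, timezone

import boto3

session = boto3.Session(profile_name='default', region_name='us-east-1')
kinesis = session.client('kinesis')

# 'wysde2-stream' is a hypothetical stream name.
for _ in range(100):
    record = {
        "customerId": str(random.randrange(50)),
        "transactionAmount": random.randint(10, 150),
        "sourceIp": ".".join(str(random.randint(1, 254)) for _ in range(4)),
        "status": random.choices(["OK", "FAIL", "PENDING"],
                                 weights=[0.8, 0.1, 0.1])[0],
        "transactionTime": datetime.now(timezone.utc).isoformat(),
    }
    kinesis.put_record(
        StreamName='wysde2-stream',
        Data=(json.dumps(record) + "\n").encode(),
        PartitionKey=record["customerId"],
    )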