%%capture
# Refresh apt metadata and replace any pre-installed Docker packages with
# the distro's docker.io build, then start the daemon at boot.
!sudo apt-get update
!sudo apt-get remove docker docker-engine docker.io
!sudo apt install docker.io
!sudo systemctl start docker
!sudo systemctl enable docker
# Install the Docker Compose CLI plugin (provides `docker compose` and the
# ECS context integration used later).
!curl -L https://raw.githubusercontent.com/docker/compose-cli/main/scripts/install/install_linux.sh | sh
# Sanity check: confirm the daemon/CLI are installed.
!docker --version
Docker version 20.10.7, build 20.10.7-0ubuntu5~18.04.3
%%capture
# Copy AWS credentials from the mounted Google Drive into the default CLI
# location (~/.aws) and restrict permissions as the AWS CLI expects.
!mkdir -p ~/.aws && cp /content/drive/MyDrive/AWS/d01_admin/* ~/.aws
!chmod 600 ~/.aws/credentials
# Install the AWS CLI so the Docker ECS integration can read the credentials.
!pip install awscli
!aws --version
aws-cli/1.25.39 Python/3.7.13 Linux/5.4.188+ botocore/1.27.39
# Create and activate an ECS-backed Docker context so subsequent
# `docker compose` commands deploy to AWS ECS rather than the local engine.
!docker context create ecs myecscontext
!docker context use myecscontext
myecscontext
%%writefile mycomposefile.yml
# Compose stack: single-node Kafka (with Zookeeper) plus a Postgres database.
# Nesting was reconstructed — the flattened version was invalid YAML
# (repeated top-level `ports:` / `environment:` keys).
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.2.2
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: "2181"
  kafka0:
    image: confluentinc/cp-kafka:5.2.2
    ports:
      - "9092:9092"
    environment:
      # Quoted: Compose env values should be strings, and an unquoted 0
      # would be parsed as an integer.
      KAFKA_BROKER_ID: "0"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # INTERNAL listener for in-network clients; EXTERNAL advertises the
      # host IP (defaults to 127.0.0.1) for outside clients.
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka0:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      # Single-broker cluster, so the offsets topic cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
    depends_on:
      - "zookeeper"
  postgres:
    image: postgres:11
    ports:
      - "5432:5432"
    restart: always
    environment:
      POSTGRES_USER: "cta_admin"
      POSTGRES_PASSWORD: "chicago"
      POSTGRES_DB: "cta"
    volumes:
      - postgres_volume:/var/lib/postgresql/data/
volumes:
  postgres_volume:
Overwriting mycomposefile.yml
# Deploy the stack to the active ECS context, then tail the service logs.
!docker compose --file mycomposefile.yml up
!docker compose --file mycomposefile.yml logs
%%capture
!pip install psycopg2-binary
!pip install ipython-sql
%reload_ext sql
# Connect to the Postgres service deployed by the compose stack.
import os      # NOTE(review): not used in this visible chunk — possibly for later cells
import glob    # NOTE(review): not used in this visible chunk
import psycopg2
import pandas as pd  # NOTE(review): not used in this visible chunk
# Public IP of the ECS task running the `postgres` service; it changes on
# every redeploy — update before re-running.
HOST = "52.87.211.208"
# Credentials match POSTGRES_USER / POSTGRES_PASSWORD / POSTGRES_DB in the
# compose file above.
conn = psycopg2.connect(f"host={HOST} dbname=cta user=cta_admin password=chicago")
cur = conn.cursor()
# Idempotent DDL: create the `users` table if it does not already exist.
# user_id is the natural primary key used by the upsert in the next cell.
user_table_create = ("""
CREATE TABLE IF NOT EXISTS users (
user_id INT PRIMARY KEY,
first_name VARCHAR,
last_name VARCHAR,
gender CHAR(1),
level VARCHAR
)
""")
cur.execute(user_table_create)
conn.commit()
# Upsert a single user row. ON CONFLICT keeps the row's subscription
# `level` current when the same user_id is inserted again.
# user_id is passed as an int to match the INT PRIMARY KEY column
# (the original passed the string '88', relying on an implicit cast).
user_data = [88, 'Mohammad', 'Rodriguez', 'M', 'paid']
user_table_insert = ("""
INSERT INTO users (user_id, first_name, last_name, gender, level)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (user_id) DO UPDATE SET level = EXCLUDED.level
""")
# Parameters are bound by psycopg2 (%s placeholders), not string-formatted,
# so the values are safely escaped.
cur.execute(user_table_insert, user_data)
conn.commit()
%sql postgresql://cta_admin:chicago@{HOST}/cta
'Connected: cta_admin@cta'
%sql SELECT * FROM users LIMIT 5;
* postgresql://cta_admin:***@52.87.211.208/cta 1 rows affected.
user_id | first_name | last_name | gender | level |
---|---|---|---|---|
88 | Mohammad | Rodriguez | M | paid |
# Download and unpack a Kafka distribution for its CLI tools.
!curl -sSOL https://downloads.apache.org/kafka/3.2.0/kafka_2.12-3.2.0.tgz
!tar -xzf kafka_2.12-3.2.0.tgz
# Start a local Zookeeper and broker in the background. NOTE(review): the
# topic commands below target the remote ECS broker by IP, not this local one.
!./kafka_2.12-3.2.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.12-3.2.0/config/zookeeper.properties
!./kafka_2.12-3.2.0/bin/kafka-server-start.sh -daemon ./kafka_2.12-3.2.0/config/server.properties
# Fixed sleep as a crude readiness wait for both daemons.
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10
Waiting for 10 secs until kafka and zookeeper services are up and running
# Verify the CLI version, then list and create topics on the remote broker
# (public IP of the kafka0 ECS task — changes on redeploy).
!sh ./kafka_2.12-3.2.0/bin/kafka-topics.sh --version
3.2.0 (Commit:38103ffaa962ef50)
!sh ./kafka_2.12-3.2.0/bin/kafka-topics.sh --bootstrap-server 54.144.191.135:9092 --list
!sh ./kafka_2.12-3.2.0/bin/kafka-topics.sh --bootstrap-server 54.144.191.135:9092 --create --topic first-topic
Created topic first-topic.
# Re-list to confirm the topic now exists.
!sh ./kafka_2.12-3.2.0/bin/kafka-topics.sh --bootstrap-server 54.144.191.135:9092 --list
first-topic
!pip install kafka-python
from kafka import KafkaProducer
from kafka import KafkaConsumer
import json
# Remote broker (public IP of the kafka0 ECS task) and the target topic;
# the topic is auto-created on first send if broker auto-creation is on.
bootstrap_servers="54.144.191.135:9092"
topic_name="kafka-ecs-python"
# Both key and value are JSON-serialized. The 'ascii' codec is safe here
# because json.dumps escapes non-ASCII by default (ensure_ascii=True), so
# the emoji in the payload survive as \uXXXX escapes.
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('ascii'),
key_serializer=lambda v: json.dumps(v).encode('ascii')
)
# send() is asynchronous; flush() blocks until the record is acknowledged.
producer.send(
topic_name,
key={"id":1},
value={"name":"👨 Francesco", "pizza":"Margherita 🍕"}
)
producer.flush()
# Consumer in its own group; 'earliest' makes a brand-new group id replay
# the topic from the beginning rather than only new messages.
group_id = "my_pizza_group"
consumer = KafkaConsumer(
bootstrap_servers = bootstrap_servers,
group_id = group_id,
auto_offset_reset='earliest',
# Mirror of the producer's serializers: JSON-decode key and value.
value_deserializer = lambda v: json.loads(v.decode('ascii')),
key_deserializer = lambda v: json.loads(v.decode('ascii')),
max_poll_records = 10
)
# List every topic visible on the broker (captured notebook output follows).
consumer.topics()
{'first-topic', 'kafka-ecs-python'}
# Subscribe to the producer's topic and confirm the subscription.
consumer.subscribe(topics=[topic_name])
consumer.subscription()
{'kafka-ecs-python'}
# Read a single record from the subscribed topic and print its partition,
# offset, key, and value.
# NOTE(review): iterating a KafkaConsumer blocks until a record arrives —
# no consumer_timeout_ms was set above, so this hangs if the topic is empty.
for message in consumer:
    print("%d:%d: k=%s v=%s" % (message.partition,
                                message.offset,
                                message.key,
                                message.value))
    break  # only the first record is needed (stray `;` removed)
0:0: k={'id': 1} v={'name': '👨 Francesco', 'pizza': 'Margherita 🍕'}