%%capture !sudo apt-get update !sudo apt-get remove docker docker-engine docker.io !sudo apt install docker.io !sudo systemctl start docker !sudo systemctl enable docker !curl -L https://raw.githubusercontent.com/docker/compose-cli/main/scripts/install/install_linux.sh | sh !docker --version %%capture !mkdir -p ~/.aws && cp /content/drive/MyDrive/AWS/d01_admin/* ~/.aws !chmod 600 ~/.aws/credentials !pip install awscli !aws --version !docker context create ecs myecscontext !docker context use myecscontext %%writefile mycomposefile.yml version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:5.2.2 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: "2181" kafka0: image: confluentinc/cp-kafka:5.2.2 ports: - "9092:9092" environment: KAFKA_BROKER_ID: 0 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka0:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092" KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT" KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1" depends_on: - "zookeeper" postgres: image: postgres:11 ports: - "5432:5432" restart: always environment: POSTGRES_USER: "cta_admin" POSTGRES_PASSWORD: "chicago" POSTGRES_DB: "cta" volumes: - postgres_volume:/var/lib/postgresql/data/ volumes: postgres_volume: !docker compose --file mycomposefile.yml up !docker compose --file mycomposefile.yml logs %%capture !pip install psycopg2-binary !pip install ipython-sql %reload_ext sql import os import glob import psycopg2 import pandas as pd HOST = "52.87.211.208" conn = psycopg2.connect(f"host={HOST} dbname=cta user=cta_admin password=chicago") cur = conn.cursor() user_table_create = (""" CREATE TABLE IF NOT EXISTS users ( user_id INT PRIMARY KEY, first_name VARCHAR, last_name VARCHAR, gender CHAR(1), level VARCHAR ) """) cur.execute(user_table_create) conn.commit() user_data = ['88', 'Mohammad', 'Rodriguez', 'M', 'paid'] user_table_insert = (""" INSERT INTO users (user_id, first_name, last_name, gender, level) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (user_id) DO UPDATE SET level = EXCLUDED.level """) cur.execute(user_table_insert, user_data) conn.commit() %sql postgresql://cta_admin:chicago@{HOST}/cta %sql SELECT * FROM users LIMIT 5; !curl -sSOL https://downloads.apache.org/kafka/3.2.0/kafka_2.12-3.2.0.tgz !tar -xzf kafka_2.12-3.2.0.tgz !./kafka_2.12-3.2.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.12-3.2.0/config/zookeeper.properties !./kafka_2.12-3.2.0/bin/kafka-server-start.sh -daemon ./kafka_2.12-3.2.0/config/server.properties !echo "Waiting for 10 secs until kafka and zookeeper services are up and running" !sleep 10 !sh ./kafka_2.12-3.2.0/bin/kafka-topics.sh --version !sh ./kafka_2.12-3.2.0/bin/kafka-topics.sh --bootstrap-server 54.144.191.135:9092 --list !sh ./kafka_2.12-3.2.0/bin/kafka-topics.sh --bootstrap-server 54.144.191.135:9092 --create --topic first-topic !sh ./kafka_2.12-3.2.0/bin/kafka-topics.sh --bootstrap-server 54.144.191.135:9092 --list !pip install kafka-python from kafka import KafkaProducer from kafka import KafkaConsumer import json bootstrap_servers="54.144.191.135:9092" topic_name="kafka-ecs-python" producer = KafkaProducer( bootstrap_servers=bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode('ascii'), key_serializer=lambda v: json.dumps(v).encode('ascii') ) producer.send( topic_name, key={"id":1}, value={"name":"👨 Francesco", "pizza":"Margherita 🍕"} ) producer.flush() group_id = "my_pizza_group" consumer = KafkaConsumer( bootstrap_servers = bootstrap_servers, group_id = group_id, auto_offset_reset='earliest', value_deserializer = lambda v: json.loads(v.decode('ascii')), key_deserializer = lambda v: json.loads(v.decode('ascii')), max_poll_records = 10 ) consumer.topics() consumer.subscribe(topics=[topic_name]) consumer.subscription() for message in consumer: print ("%d:%d: k=%s v=%s" % (message.partition, message.offset, message.key, message.value)) break;