# list
kafka-topics.sh --bootstrap-server localhost:9092 --list
# create
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first-topic
# create with partitions and replication factor
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic second-topic --partitions 3 --replication-factor 1
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic third-topic --partitions 3 --replication-factor 1
# list
kafka-topics.sh --bootstrap-server localhost:9092 --list
# describe
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first-topic
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic second-topic
# delete
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic third-topic
# produce messages
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
# produce messages with keys
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic --property "parse.key=true" --property "key.separator=:"
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic wysde3 --property "parse.key=true" --property "key.separator=:"
# produce messages with ngrok
ngrok tcp 9092
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
kafka-console-consumer.sh --bootstrap-server tcp://<>:10147 --topic sparsh-study
# consume messages
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic
# consume messages with ngrok
kafka-console-consumer.sh --bootstrap-server tcp://6.tcp.ngrok.io:15178 --topic first-topic
# consume messages with beginning offset
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --from-beginning
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-localhost-python --from-beginning
# consume messages with timestamp
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --from-beginning
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-localhost-python --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --from-beginning
# consume messages with consumer group
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group first-consumer-group
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --from-beginning --group first-consumer-group
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wysde3 --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --from-beginning --group wysde
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wysde3 --from-beginning --group wysde
Consumer Group - Multiple Consumers - Each consumer connect to a parition.
# list consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# describe consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group first-consumer-group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group wysde
# reset offsets of consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group first-consumer-group --reset-offsets --to-earliest --execute --all-topics
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group first-consumer-group --reset-offsets --shift-by 2 --execute --topic first-topic
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group first-consumer-group --reset-offsets --shift-by -2 --execute --topic first-topic