In this blog post I will cover some of the basics of Apache Kafka and a few useful commands to keep handy.
Introduction
Apache Kafka is an open-source distributed event streaming platform. It allows you to decouple your source and target systems. It is optimized for ingesting and processing streaming data in real time. Due to its distributed nature, it provides high throughput, scalability, a resilient architecture, and fault tolerance. Producers can publish, and consumers can subscribe to, streams of records.
A few use cases of Apache Kafka are:
- Events/Messaging system
- Application logs gathering
- Stream processing
- Metrics gathering
At a high level, below are some of the important terms to know in Apache Kafka –
Topic – A topic is a stream of data.
- You can consider it to be similar to a table in a database.
- A topic should have a replication factor > 1 so that data survives a broker failure.
- When creating a topic, the number of partitions and the replication factor are mandatory.
Partitions – Topics are split into partitions, and each partition contains messages.
- Messages within a partition have an incremental ID, called an offset.
- Each partition's offsets start at zero and increment by default.
- An offset only has meaning for a specific partition.
- The order of messages is only guaranteed within a partition.
- Partitions store streams of records in the order in which the records were generated.
- Messages are appended to a topic-partition in the order they are sent.
- Data is retained for a certain period of time (one week by default).
- Data is immutable within a partition.
- Data is written to partitions in a round-robin fashion unless a key is provided (see the console-producer sketch after this list).
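As a quick illustration of keyed writes, here is a minimal console-producer sketch. The broker endpoint and topic reuse the placeholder values from the commands later in this post; parse.key and key.separator are standard console-producer properties:
# Publish key:value pairs; records sharing a key always land in the same partition
kafka-console-producer.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event \
--property parse.key=true \
--property key.separator=:
At the > prompt, typing user1:eventA and later user1:eventB sends both records to the same partition.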
Broker – A Kafka cluster is composed of multiple brokers.
- You can think of a broker as an individual server/node.
- After connecting to any broker (a bootstrap broker), you are connected to the entire cluster.
- A topic’s partitions are distributed across brokers; each broker will contain only certain topic partitions.
- At any given time, only one broker can be the leader for a given partition.
- Only that leader can receive and serve data for that partition.
- The leader and the in-sync replicas (ISR) are determined with the help of ZooKeeper.
ZooKeeper – It stores Kafka cluster metadata. It keeps track of the status of the Kafka cluster nodes, Kafka topics, and partitions.
- Manages brokers
- Helps in performing leader election for partitions
- It has a leader, and the rest of the ZooKeeper servers are followers.
- With Apache Kafka version 2.8, you have the option to use KIP-500, which removes the Apache ZooKeeper dependency.
- In KIP-500, Kafka relies on an internal Raft quorum that can be activated through Kafka Raft metadata mode (KRaft).
- When Kafka Raft metadata mode is enabled, Kafka stores its metadata and configuration in a topic called @metadata (a minimal local sketch follows this list).
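As a rough sketch of what this looks like on a self-managed Kafka 2.8 installation (MSK manages all of this for you; the config path below assumes the stock distribution layout):
# Generate a cluster ID and format the storage directory for KRaft mode
KAFKA_CLUSTER_ID="$(kafka-storage.sh random-uuid)"
kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties
# Start the broker without ZooKeeper; the sample config sets process.roles
# (broker,controller) and controller.quorum.voters for the Raft quorum
kafka-server-start.sh config/kraft/server.properties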
Producers – A producer writes data to topics.
- Producers can choose to receive acknowledgment of data writes (see the example after this list):
- acks=0: Producer won’t wait for acknowledgment
- acks=1: Default. Producer waits for leader acknowledgment
- acks=all: Producer waits for leader + replica acknowledgments
- Producers can choose to send a key with the message, to write to a specific partition. If the key is null, data is written round robin.
- If a key is sent, all messages for that key will be written to the same partition, provided the number of partitions remains constant.
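For example, the console producer accepts arbitrary producer configs via --producer-property, so you can request full acknowledgments like this (same placeholder endpoint and topic as the other commands in this post):
# Wait for the leader and all in-sync replicas to acknowledge every write
kafka-console-producer.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event \
--producer-property acks=all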
Consumers – Consumers read data from a topic.
- Data is read in order within each partition.
- Kafka stores the offsets at which a consumer group has been reading. The committed offsets live in a Kafka topic named __consumer_offsets (the consumer-group commands after this list show how to inspect them).
- If a consumer dies, it will be able to read back from where it left off based on the committed consumer offsets.
- Consumers choose when to commit offsets.
- At most once – offsets are committed as soon as the message is received. If processing goes wrong, the message is lost.
- At least once – offsets are committed after the message is processed. If processing goes wrong, the message will be read again.
- By default, a consumer reads from the point at which you launch it and will only see new messages.
- Consumers read messages in the order stored in topic’s partition.
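To see this in practice, you can consume as part of a named group and then inspect its committed offsets and lag. The group name mygroup below is a placeholder; the endpoint reuses the placeholder broker from the commands below:
# Consume as part of a group so offsets are committed under that group ID
kafka-console-consumer.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event \
--group mygroup
# Show current offset, log-end offset, and lag per partition for the group
kafka-consumer-groups.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--describe --group mygroup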
I have set up Amazon MSK, which makes it easy to ingest and process streaming data in real time with fully managed Apache Kafka. The MSK cluster is deployed to a VPC in a private subnet, has 3 brokers, and provides TLS and plaintext client integration details by default. Using Amazon MSK eliminates the operational overhead of provisioning, configuring, and maintaining Apache Kafka and Kafka Connect clusters.
Document – https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html


Basic Commands
Note: I have “/home/ec2-user/kafka/bin/” set in PATH
Create a Kafka topic with partitions:
kafka-topics.sh --create \
--zookeeper "z-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181,z-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181,z-2.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181" \
--replication-factor 2 \
--partitions 10 \
--topic usage_event \
--config retention.ms=345600000
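Here retention.ms=345600000 keeps messages for 4 days (345,600,000 ms = 96 hours), and replication-factor 2 stores two copies of each partition across the 3 brokers.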
Deleting Kafka topics:
kafka-topics.sh --delete --zookeeper "z-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181" --topic usage_event
Listing Kafka topics:
kafka-topics.sh --bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" --list
Describe Kafka topics:
kafka-topics.sh --bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event \
--describe
Data within each Kafka partition:
Total message count (all partitions combined) –
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event | awk -F ":" '{sum+= $3} END {print sum}'
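GetOffsetShell prints one topic:partition:offset line per partition, so summing the third field with awk gives the total message count (strictly, the sum of the log-end offsets, which equals the message count as long as retention has not deleted older records).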
Total message count per partition –
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event | awk -F ":" '{print $3}'
Kafka Topics Settings:
kafka-topics.sh \
--zookeeper "z-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181" \
--describe --topics-with-overrides
Modify Kafka Topics Settings:
kafka-configs.sh \
--zookeeper "z-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181" \
--entity-type topics \
--entity-name usage_event \
--alter --add-config retention.ms=5259600000
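This raises the topic's retention from 4 days to roughly 61 days (5,259,600,000 ms).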
Get Kafka Topic Partition Offset:
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event
Kafka Consumer:
Read all messages from beginning –
kafka-console-consumer.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event \
--from-beginning
Read maximum of 10 messages from beginning –
kafka-console-consumer.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event \
--from-beginning \
--max-messages 10
Read messages from a specific partition, based on a specific offset –
kafka-console-consumer.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic mytopic \
--offset 10 \
--partition 0
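Note that --offset only works in combination with --partition; the console consumer cannot seek to a single offset across all partitions at once.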
Delete Records from Kafka Topic:
It is not possible to delete messages from the middle of a topic. You can delete the messages in a partition from the beginning up to a given offset. To do so, create a JSON file specifying the partition and offset, as shown below –
cat delete-records.json
{
  "partitions": [
    {
      "topic": "usage_event",
      "partition": 1,
      "offset": 6
    }
  ],
  "version": 1
}
kafka-delete-records.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--offset-json-file delete-records.json
Command output –
Executing records delete operation
Records delete operation completed:
partition: usage_event-0 low_watermark: 6
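The reported low_watermark is the new earliest available offset in that partition; records below it have been deleted and can no longer be consumed.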
Performance Test:
Producer Performance test –
kafka-producer-perf-test.sh \
--topic usage_event \
--throughput -1 \
--num-records 100000 \
--record-size 1024 \
--producer-props acks=all bootstrap.servers=b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092,b-2.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092,b-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092 \
--producer.config /tmp/kafka/producer.properties_msk
Command output –
100000 records sent, 14714.537964 records/sec (14.37 MB/sec), 1467.41 ms avg latency, 2535.00 ms max latency, 1300 ms 50th, 2431 ms 95th, 2519 ms 99th, 2533 ms 99.9th.
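Here --throughput -1 disables client-side throttling so the producer sends as fast as it can, and the 50th/95th/99th percentile figures make it easy to spot tail latency under load.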
Consumer Performance test –
kafka-consumer-perf-test.sh \
--topic usage_event \
--broker-list b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092,b-2.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092,b-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092 \
--messages 10000 \
--consumer.config /tmp/kafka/consumer.properties | \
jq -R . | jq -sr 'map(./",")|transpose|map(join(": "))[]'
Command output –
[2021-10-24 06:42:47,333] WARN The configuration 'SSL_KEYSTORE_PASSWORD_CONFIG' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-10-24 06:42:47,333] WARN The configuration 'BOOTSTRAP_SERVERS_CONFIG' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-10-24 06:42:47,333] WARN The configuration 'SSL_KEY_PASSWORD_CONFIG' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-10-24 06:42:47,333] WARN The configuration 'SCHEMA_REGISTRY_URL_CONFIG' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
start.time: 2021-10-24 06:42:47:337
end.time: 2021-10-24 06:42:50:888
data.consumed.in.MB: 9.8096
MB.sec: 2.7625
data.consumed.in.nMsg: 10045
nMsg.sec: 2828.7806
rebalance.time.ms: 1635057770697
fetch.time.ms: -1635057767146
fetch.MB.sec: -0.0000
fetch.nMsg.sec: -0.0000
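(The jq filter simply pairs the CSV header row with the values row to print key: value lines. The enormous rebalance.time.ms and negative fetch.time.ms values appear to be a reporting quirk of the perf tool in this Kafka version, where an absolute timestamp leaks into the rebalance-time column, rather than a real measurement.)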
Hope you find this helpful.