Apache Kafka – Introduction & Basic Commands

In this blog post I will cover some of the basics of Apache Kafka and a few useful commands to keep handy.

Introduction

Apache Kafka is an open-source distributed event streaming platform. It allows you to decouple your source systems and target systems, and it is optimized for ingesting and processing streaming data in real time. Due to its distributed nature, it provides high throughput, scalability, a resilient architecture, and fault tolerance. Producers can publish, and consumers can subscribe to, streams of records.

A few use cases of Apache Kafka are:

  1. Events/Messaging system
  2. Application logs gathering
  3. Stream processing
  4. Metrics gathering

At a high level, below are some of the important terms to know in Apache Kafka –

Topic – A topic is a stream of data.

  • You can consider it to be similar to a table in a database.
  • A topic should have a replication factor > 1 so that data survives the loss of a broker.
  • When creating a topic, the number of partitions and the replication factor must be specified.

Partitions – Topics are split into partitions, and each partition contains messages.

  • Messages within a partition have an incremental ID, called an offset.
  • Offsets in each partition start at zero and increment by default.
  • An offset only has meaning for a specific partition.
  • The order of messages is only guaranteed within a partition.
  • Partitions store streams of records in the order in which the records were generated.
  • Messages are appended to a topic-partition in the order they are sent.
  • Data is retained for a certain period of time.
  • Data in a partition is immutable.
  • Data is written to partitions in a round-robin fashion unless a key is provided.

Broker – A Kafka cluster is composed of multiple brokers.

  • You can think of a broker as an individual server/node.
  • After connecting to any broker, you are connected to the entire cluster.
  • A topic’s partitions are distributed across brokers; each broker holds only a subset of a topic’s partitions.
  • At any given time, only one broker can be the leader for a given partition.
  • Only that leader can receive and serve data for the partition.
  • Leader election and tracking of in-sync replicas (ISR) are coordinated through ZooKeeper.

Zookeeper – It is used to store Kafka cluster metadata. It keeps track of the status of the Kafka cluster nodes, Kafka topics, and partitions (see the inspection example after this list).

  • Manages brokers
  • Helps perform leader election for partitions
  • ZooKeeper itself has a leader, and the rest of its servers are followers
  • Starting with Apache Kafka 2.8, you have the option to use KIP-500, which removes the Apache ZooKeeper dependency.
  • With KIP-500, Kafka relies on an internal Raft quorum that can be activated through Kafka Raft (KRaft) metadata mode.
  • When KRaft metadata mode is enabled, Kafka stores its metadata and configurations in an internal metadata topic (__cluster_metadata) instead of in ZooKeeper.

Producers – A producer writes data to topics.

  • Producer can choose to receive acknowledgment of data writes:
    • acks=0: Producer won’t wait for acknowledgment (possible data loss)
    • acks=1: Default. Producer waits for the leader’s acknowledgment (limited data loss)
    • acks=all: Producer waits for leader + replica acknowledgments (no data loss)
  • Producer can choose to send a key with the message in order to write to a specific partition. If the key is null, data is written round-robin.
  • If a key is sent, all messages for that key will be written to the same partition, provided the number of partitions remains constant (see the example after this list).
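
As a sketch of both behaviors, the console producer below (using the same illustrative MSK endpoint as the rest of this post) enables acks=all and keyed messages; each line you type is parsed as key:value, and all messages sharing a key land in the same partition:

kafka-console-producer.sh \
--broker-list "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event \
--producer-property acks=all \
--property parse.key=true \
--property key.separator=: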

Consumers – Consumers read data from topics.

  • Data is read in order within each partition.
  • Kafka stores the offsets at which a consumer group has been reading. The committed offsets live in an internal Kafka topic named __consumer_offsets.
  • If a consumer dies, it will be able to read back from where it left off, based on the committed consumer offsets.
  • Consumers choose when to commit offsets:
    • At most once – offsets are committed as soon as the message is received. If processing goes wrong, the message is lost.
    • At least once – offsets are committed after the message is processed. If processing goes wrong, the message will be read again.
  • By default, a consumer reads from the point at which you launch it and will only see new messages (see the consumer-group example after this list).
  • Consumers read messages in the order stored in a topic’s partition.
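
To see committed offsets in action, you can start a console consumer in a named group (the group name my-group below is just an example) and then describe that group; the describe output shows the committed offset, log-end offset, and lag per partition:

kafka-console-consumer.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event \
--group my-group

kafka-consumer-groups.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--describe --group my-group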

I have set up Amazon MSK, which makes it easy to ingest and process streaming data in real time with fully managed Apache Kafka. The MSK cluster is deployed to a VPC in a private subnet, has 3 brokers, and provides TLS and plaintext client integration details by default. Using Amazon MSK eliminates the operational overhead of provisioning, configuring, and maintaining Apache Kafka and Kafka Connect clusters.

Document – https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html
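
If you are running on MSK, the broker and ZooKeeper connection strings used in the commands below can be retrieved with the AWS CLI (the ARN is a placeholder for your own cluster’s ARN):

aws kafka get-bootstrap-brokers --cluster-arn <your-cluster-arn>

aws kafka describe-cluster --cluster-arn <your-cluster-arn> \
--query 'ClusterInfo.ZookeeperConnectString'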

Basic Commands

Note: I have “/home/ec2-user/kafka/bin/” set in PATH

Create Kafka topics with Partitions:

kafka-topics.sh --create \
--zookeeper "z-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181,z-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181,z-2.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181" \
--replication-factor 2 \
--partitions 10 \
--topic usage_event \
--config retention.ms=345600000

Deleting Kafka topics:

kafka-topics.sh --delete --zookeeper "z-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181" --topic usage_event

Listing Kafka topics:

kafka-topics.sh --bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" --list

Describe Kafka topics:

kafka-topics.sh --bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" --topic usage_event --describe
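
The describe output is useful for seeing which broker leads each partition and which replicas are in sync. Illustrative output (broker IDs and assignments will differ in your cluster) looks roughly like –

Topic: usage_event  PartitionCount: 10  ReplicationFactor: 2  Configs: retention.ms=345600000
  Topic: usage_event  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
  Topic: usage_event  Partition: 1  Leader: 2  Replicas: 2,3  Isr: 2,3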

Data within each Kafka partition:

Total message count (all partitions combined) –

kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event | awk -F ":" '{sum+= $3} END {print sum}'

Total message count per partition –

kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event | awk -F ":" '{print $3}'

Kafka Topics Settings:

kafka-topics.sh \
--zookeeper "z-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181" \
--describe --topics-with-overrides

Modify Kafka Topics Settings:

kafka-configs.sh \
--zookeeper "z-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181" \
--entity-type topics \
--entity-name usage_event \
--alter --add-config retention.ms=5259600000
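
An override added this way can later be removed with --delete-config, after which the topic falls back to the broker-level default:

kafka-configs.sh \
--zookeeper "z-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:2181" \
--entity-type topics \
--entity-name usage_event \
--alter --delete-config retention.ms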

Get Kafka Topic Partition Offset:

kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event
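
By default, GetOffsetShell prints the latest offset per partition (equivalent to --time -1). To print the earliest available offset per partition instead, pass --time -2:

kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event \
--time -2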

Kafka Consumer:

Read all messages from the beginning –

kafka-console-consumer.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event \
--from-beginning

Read a maximum of 10 messages from the beginning –

kafka-console-consumer.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic usage_event \
--from-beginning \
--max-messages 10

Read messages from a specific partition, starting at a specific offset –

kafka-console-consumer.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--topic mytopic \
--offset 10 \
--partition 0

Delete Records from Kafka Topic:

It is not possible to delete messages in the middle of a topic. You can, however, delete messages in a partition from the beginning up to a given offset. To do so, create a JSON file specifying the partition and offset, as shown below –

cat delete-records.json
{
    "partitions": [
        {
            "topic": "usage_event",
            "partition": 1,
            "offset": 6
        }
    ],
    "version": 1
}
kafka-delete-records.sh \
--bootstrap-server "b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092" \
--offset-json-file delete-records.json

Command output –

Executing records delete operation
Records delete operation completed:
partition: usage_event-1 low_watermark: 6

Performance Test:

Producer Performance test –

kafka-producer-perf-test.sh \
--topic usage_event \
--throughput -1 \
--num-records 100000 \
--record-size 1024 \
--producer-props acks=all bootstrap.servers=b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092,b-2.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092,b-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092 \
--producer.config /tmp/kafka/producer.properties_msk

Command output –


100000 records sent, 14714.537964 records/sec (14.37 MB/sec), 1467.41 ms avg latency, 2535.00 ms max latency, 1300 ms 50th, 2431 ms 95th, 2519 ms 99th, 2533 ms 99.9th.

Consumer Performance test –

kafka-consumer-perf-test.sh \
--topic usage_event \
--broker-list b-3.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092,b-2.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092,b-1.mskcluster.ab1cde.c10.kafka.us-west-2.amazonaws.com:9092 \
--messages 10000 \
--consumer.config /tmp/kafka/consumer.properties | \
jq -R .|jq -sr 'map(./",")|transpose|map(join(": "))[]'

Command output –

[2021-10-24 06:42:47,333] WARN The configuration 'SSL_KEYSTORE_PASSWORD_CONFIG' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-10-24 06:42:47,333] WARN The configuration 'BOOTSTRAP_SERVERS_CONFIG' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-10-24 06:42:47,333] WARN The configuration 'SSL_KEY_PASSWORD_CONFIG' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-10-24 06:42:47,333] WARN The configuration 'SCHEMA_REGISTRY_URL_CONFIG' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
start.time: 2021-10-24 06:42:47:337
end.time: 2021-10-24 06:42:50:888
data.consumed.in.MB: 9.8096
MB.sec: 2.7625
data.consumed.in.nMsg: 10045
nMsg.sec: 2828.7806
rebalance.time.ms: 1635057770697
fetch.time.ms: -1635057767146
fetch.MB.sec: -0.0000
fetch.nMsg.sec: -0.0000

Hope you find this helpful.
