Use Amazon MSK Connect with Lenses plugin to sink data from Amazon MSK to Amazon S3

Apache Kafka is an open-source distributed event streaming platform consisting of servers and clients that communicate over a high-performance TCP network protocol. It lets you decouple your source and target systems and is optimized for ingesting and processing streaming data in real time. Thanks to its distributed nature, it provides high throughput, scalability, resilience, and fault tolerance. Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data.

Kafka Connect is a framework for continuously ingesting data from external systems into Apache Kafka, and vice versa. With Amazon MSK Connect you can run a fully managed Kafka Connect workload on AWS, which makes it easier to stream data to and from Amazon MSK.

In this blog post I will demonstrate how to deploy MSK Connect with the Lenses plugin to move data from an Amazon MSK cluster with IAM role-based authentication enabled to Amazon S3. The data is flushed to Amazon S3 every 5 minutes and stored in JSON format. The Amazon S3 location is then crawled with an AWS Glue crawler to create a Glue catalog table. Finally, using Amazon Athena, the serverless interactive query service, we will query the data with standard SQL.

In my setup, the Amazon MSK cluster runs Apache Kafka version 2.7.1 with 3 brokers.

The brokers run in private subnets with public access turned off. Amazon MSK distributes the brokers evenly across the subnets.

Amazon MSK cluster Network settings

The private subnets are associated with a route table that routes through a NAT Gateway in the public subnet. The subnets must all be in different Availability Zones.

Private subnets associated with a route table attached to the NAT Gateway

NAT Gateway attached to the public subnet

For the Amazon MSK setup, IAM role-based authentication is enabled. Note that, by default, IAM users and roles have no permission to execute Amazon MSK API actions.

For this use case, I wrote a Python script that generates fake transaction data and writes it to the transaction_events topic in the Amazon MSK cluster every 0.2 seconds. The script runs on an Amazon EC2 instance that has access to the Amazon MSK cluster.

import os
import json
import time
import random
import uuid
import logging

from faker import Faker
from kafka import KafkaProducer

logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"),
                    format='[%(filename)s:%(lineno)s] %(asctime)-10s %(message)s',
                    datefmt='%Y-%m-%d:%H:%M:%S')

logger = logging.getLogger(__name__)

BOOTSTRAP = 'b-3.mskbuildlabcluster.xxxxxx.xxx.kafka.us-west-2.amazonaws.com:9092,b-2.mskbuildlabcluster.xxxxxx.xxx.kafka.us-west-2.amazonaws.com:9092,b-1.mskbuildlabcluster.xxxxxx.xxx.kafka.us-west-2.amazonaws.com:9092'

fake = Faker()
Faker.seed(0)

# Serialize each record as JSON; json.dumps escapes non-ASCII, so ascii-encoding is safe
producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP,
    value_serializer=lambda m: json.dumps(m).encode('ascii')
    )

start_time = time.time()
logging.info('Start time for loop {}'.format(start_time))

card_type = ["CREDIT","DEBIT"]

total_count = 0
for i in range(100000):
    time.sleep(.2)  # one event every 0.2 seconds
    producer.send('transaction_events', value={
        "additionalData":{
            "cardHolderName":fake.name(),
            "shopperCountry":fake.country(),
            "shopperIP":fake.ipv4()
        },
        "card_detail":{
            "cardNumber":fake.credit_card_number(),
            "provider":fake.credit_card_provider(),
            "expiration":fake.credit_card_expire(),
            "cardType":random.choice(card_type)
        },
        "transaction":{
            "currency":fake.currency(),
            "amount": fake.randomize_nb_elements(number=7500),
            "transactionId": str(uuid.uuid4()),
            "created_at": int(time.time() * 1000)
        },
        "eventCode":"AUTHORISATION",
        "success":fake.boolean(chance_of_getting_true=75)
    })

    total_count += 1

end_time = time.time()

logging.info(f'Total transaction count: {total_count}')
logging.info(f'End time for loop {end_time}')
logging.info(f'Total time - {end_time - start_time} sec')

producer.flush()

Below is a sample transaction record. A record contains:

  • Card details: the card number, card provider, expiration month, and card type (credit or debit).
  • Transaction details: the amount, transaction ID, currency, and transaction timestamp in epoch time. The currency is a list containing the currency code and currency name.
  • AdditionalData: the cardholder’s name, country, and the IP address from which the transaction was made.
  • The eventCode is AUTHORISATION, and the success key indicates whether the authorization succeeded (true) or failed (false).
{
   "additionalData":{
      "cardHolderName":"Cathy Martinez",
      "shopperCountry":"Cote d'Ivoire",
      "shopperIP":"177.244.104.76"
   },
   "card_detail":{
      "cardNumber":"4869141314562087",
      "provider":"Diners Club / Carte Blanche",
      "expiration":"10/22",
      "cardType":"CREDIT"
   },
   "transaction":{
      "currency":[
         "VND",
         "Vietnamese \u0111\u1ed3ng"
      ],
      "amount":5400,
      "transactionId":"cca3f71a-2805-418a-b24f-cf70d29ea10a",
      "created_at":1644887414792
   },
   "eventCode":"AUTHORISATION",
   "success":false
}

Moving on, I downloaded the latest version (3.0.1-2.5.0 as of this writing) of the Lenses Amazon S3 Connector, a Kafka Connect connector for AWS S3, and uploaded the JAR file to Amazon S3. This connector supports Amazon S3 as both a sink and a source:

  • Sink (Data flow from Kafka Connect -> S3)
  • Source (Data flow from S3 -> Kafka Connect)

The next step is to create a custom plugin by providing the S3 URI of the plugin object uploaded to S3.

Amazon MSK > MSK Connect > Custom plugins > Create custom plugin

Once the custom plugin creation completes, its status shows Active and an ARN becomes available.
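If you prefer to script this step rather than use the console, the plugin can also be created with boto3. This is a minimal sketch; the JAR object key and plugin name below are placeholders, not values from my setup:

```python
def build_plugin_request(bucket_arn, jar_key, plugin_name):
    """Assemble the create_custom_plugin payload for MSK Connect."""
    return {
        "contentType": "JAR",
        "name": plugin_name,
        "description": "Lenses S3 sink connector",
        "location": {"s3Location": {"bucketArn": bucket_arn, "fileKey": jar_key}},
    }


def create_plugin():
    import boto3  # deferred import so the sketch is readable without boto3 installed

    client = boto3.client("kafkaconnect", region_name="us-west-2")
    request = build_plugin_request(
        "arn:aws:s3:::learn-share-repeat-us-west-2",
        "plugins/lenses-s3-connector.jar",  # hypothetical object key
        "lensesio-s3-sink-plugin",          # hypothetical plugin name
    )
    response = client.create_custom_plugin(**request)
    return response["customPluginArn"]
```

The returned ARN is what you select when creating the connector in the next step.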

Now it’s time to create the MSK connector.

Amazon MSK > MSK connect > Connectors

Select the custom plugin (in Active status) you created in the previous step.

Move to the next step and provide a name and description for the connector. Here I provided the name “lensesio-msk-transaction-events-src-s3-sink-iam-auth”.

Next, select the MSK cluster and choose the authentication method. As my MSK cluster has IAM role-based authentication enabled, I chose IAM.

In the connector configuration section of Step 2, provide the configuration settings:

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=2
topics=transaction_events
connect.s3.vhost.bucket=true
schema.enable=false
connect.s3.kcql=INSERT INTO learn-share-repeat-us-west-2:MSKBuildLabCluster SELECT * FROM transaction_events STOREAS `JSON` WITH_FLUSH_INTERVAL = 300
aws.region=us-west-2
aws.custom.endpoint=https://s3.us-west-2.amazonaws.com/
connect.s3.aws.region=us-west-2
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
errors.log.enable=true

A few things to note; make sure to change these per your requirements:

  • topics – the name of the Kafka topic.
  • connect.s3.kcql – the S3 bucket name and prefix go in the INSERT INTO part of the KCQL statement, and the topic name goes in the SELECT part.
  • WITH_FLUSH_INTERVAL – the interval, in seconds, at which data is flushed to S3. In my example, data is flushed to Amazon S3 every 5 minutes (300 seconds).
  • aws.region – change to your AWS Region.
  • aws.custom.endpoint – change only the Region portion of the URL to your AWS Region.
  • connect.s3.aws.region – specify your AWS Region.

You can read further in the Lenses connector documentation and change the configuration per your needs.

Moving ahead, if you have a custom worker configuration, use it; otherwise choose the default. For access permissions, choose the IAM role that has access to the MSK Kafka topic. I have provided the IAM role details in the next section.
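For reference, the whole connector definition can also be submitted programmatically. The boto3 sketch below reuses the key settings from the configuration above; the plugin ARN, role ARN, broker, subnet, and security group arguments are placeholders you would supply from your own setup:

```python
# Key settings from the console configuration above, as a boto3-friendly dict.
CONNECTOR_CONFIG = {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "tasks.max": "2",
    "topics": "transaction_events",
    "connect.s3.kcql": (
        "INSERT INTO learn-share-repeat-us-west-2:MSKBuildLabCluster "
        "SELECT * FROM transaction_events STOREAS `JSON` WITH_FLUSH_INTERVAL = 300"
    ),
    "connect.s3.aws.region": "us-west-2",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "errors.log.enable": "true",
}


def create_connector(plugin_arn, role_arn, bootstrap_servers, subnets, security_groups):
    import boto3  # deferred import; this is a sketch, not production code

    client = boto3.client("kafkaconnect", region_name="us-west-2")
    return client.create_connector(
        connectorName="lensesio-msk-transaction-events-src-s3-sink-iam-auth",
        connectorConfiguration=CONNECTOR_CONFIG,
        kafkaCluster={"apacheKafkaCluster": {
            "bootstrapServers": bootstrap_servers,
            "vpc": {"subnets": subnets, "securityGroups": security_groups},
        }},
        kafkaClusterClientAuthentication={"authenticationType": "IAM"},
        kafkaClusterEncryptionInTransit={"encryptionType": "TLS"},
        kafkaConnectVersion="2.7.1",
        capacity={"provisionedCapacity": {"mcuCount": 1, "workerCount": 1}},
        plugins=[{"customPlugin": {"customPluginArn": plugin_arn, "revision": 1}}],
        serviceExecutionRoleArn=role_arn,
    )
```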

Let’s look at the IAM role in more detail. I have three policies attached to it.

KMS-Inline-Policy: grants access to the SSE-KMS key used to encrypt objects stored in the Amazon S3 bucket.

S3-Read-Write-Inline-Policy: grants read and write permissions on the Amazon S3 bucket. You can write a more restrictive policy if needed.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:Get*",
                "s3:List*",
                "s3:Put*"
            ],
            "Resource": [
                "arn:aws:s3:::learn-share-repeat-us-west-2/*",
                "arn:aws:s3:::learn-share-repeat-us-west-2"
            ]
        }
    ]
}

IAM-Auth-MSK-Transaction-Events-Inline-Policy: grants connect, describe-cluster, describe-topic, and read-data permissions on the transaction_events topic in the MSKBuildLabCluster MSK cluster. Read more in the Amazon MSK IAM access control documentation.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-2:xxxxxxxxxxxx:cluster/MSKBuildLabCluster/182c4d23-2a59-458e-9251-8d158b115df1-10",
                "arn:aws:kafka:us-west-2:xxxxxxxxxxxx:topic/MSKBuildLabCluster/182c4d23-2a59-458e-9251-8d158b115df1-10/transaction_events"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:CreateTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-2:xxxxxxxxxxxx:topic/MSKBuildLabCluster/182c4d23-2a59-458e-9251-8d158b115df1-10/__amazon_msk_connect_*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-west-2:xxxxxxxxxxxx:group/MSKBuildLabCluster/182c4d23-2a59-458e-9251-8d158b115df1-10/__amazon_msk_connect_*",
                "arn:aws:kafka:us-west-2:xxxxxxxxxxxx:group/MSKBuildLabCluster/182c4d23-2a59-458e-9251-8d158b115df1-10/connect-*"
            ]
        }
    ]
}

The role must have the following trust policy so that MSK Connect can assume it.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kafkaconnect.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
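Creating the role with this trust policy can be sketched with boto3 as well. The role name is a placeholder, and the inline policies shown above would still need to be attached separately (for example with put_role_policy):

```python
import json

# Trust policy allowing MSK Connect to assume the role (same document as above).
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "kafkaconnect.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}


def create_connect_role(role_name):
    import boto3  # deferred import; sketch only

    iam = boto3.client("iam")
    role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(TRUST_POLICY),
    )
    return role["Role"]["Arn"]
```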

In Step 3, the access control and encryption methods are set by default, so click Next.

In Step 4, you can choose the destination for log delivery.

In the next step, review and click Create. Once creation starts, open the CloudWatch log group to view and monitor connector creation. When the connector is created successfully, its status shows “Running”.

Creating a connector creates three new internal __amazon_msk_connect_* topics: status, offsets, and configs.

__amazon_msk_connect_status_lensesio-msk-transacton-events-src-s3-sink-iam_165f318b-28c2-46ac-a41c-bdc3e2dc3f87-3
__amazon_msk_connect_offsets_lensesio-msk-transacton-events-src-s3-sink-iam_165f318b-28c2-46ac-a41c-bdc3e2dc3f87-3
__amazon_msk_connect_configs_lensesio-msk-transacton-events-src-s3-sink-iam_165f318b-28c2-46ac-a41c-bdc3e2dc3f87-3

After the connector is running, you should see an S3 prefix with the topic name, as shown below. Since the KCQL statement specified “learn-share-repeat-us-west-2:MSKBuildLabCluster” as the S3 location, the prefix is created under s3://learn-share-repeat-us-west-2/MSKBuildLabCluster/.

The transaction_events topic was created with 5 partitions and a replication factor of 2.

[ec2-user@ip-10-0-1-43 ~]$ kafka-topics.sh --create --zookeeper $ZOOKEEPER --replication-factor 2 --partitions 5 --topic transaction_events

Before I created the MSK connector, I had sent 10 messages to the transaction_events topic. Below is the message count in each partition.

[ec2-user@ip-10-0-1-43 ~]$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $BOOTSTRAP --topic transaction_events | awk -F ":" '{print $3}'
1
2
2
2
3
[ec2-user@ip-10-0-1-43 ~]$ 
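The per-partition counts above can also be computed with kafka-python instead of the shell tools: the message count per partition is the end offset minus the beginning offset. A minimal sketch (the broker list you pass in would come from your own cluster):

```python
def counts_from_offsets(begin, end):
    """Message count per partition = end offset - beginning offset."""
    return {tp: end[tp] - begin[tp] for tp in end}


def partition_message_counts(bootstrap_servers, topic):
    from kafka import KafkaConsumer, TopicPartition  # needs kafka-python

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    parts = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    return counts_from_offsets(
        consumer.beginning_offsets(parts), consumer.end_offsets(parts)
    )
```

Summing the returned values gives the total message count for the topic.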

On S3, you will observe that the connector has created a folder (prefix) for each partition.

And within each partition, the messages are stored in JSON format. For example, in the screenshot below, a .json file is stored in s3://learn-share-repeat-us-west-2/MSKBuildLabCluster/transaction_events/0/

To view the data within an S3 JSON file, you can use “Query with S3 Select”: select the .json file > Object actions > Query with S3 Select.
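The same kind of lookup can be issued from code with boto3’s select_object_content. This is a sketch; the bucket, key, and expression you pass in are up to you, and the response is an event stream whose Records payloads carry the matching rows:

```python
def build_select_request(bucket, key, expression):
    """Build a select_object_content request for one JSON-document object."""
    return {
        "Bucket": bucket,
        "Key": key,
        "ExpressionType": "SQL",
        "Expression": expression,
        "InputSerialization": {"JSON": {"Type": "DOCUMENT"}},
        "OutputSerialization": {"JSON": {}},
    }


def s3_select_json(bucket, key, expression):
    import boto3  # deferred import; sketch only

    s3 = boto3.client("s3", region_name="us-west-2")
    resp = s3.select_object_content(**build_select_request(bucket, key, expression))
    # Concatenate the Records events from the response event stream.
    chunks = [e["Records"]["Payload"] for e in resp["Payload"] if "Records" in e]
    return b"".join(chunks).decode("utf-8")
```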

Now that we have a few files on Amazon S3, I crawled the S3 location with an AWS Glue crawler to create a table in the Glue catalog. The crawler created the msk_transaction_events table.
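The crawl can be scripted too; the boto3 sketch below does the same create-then-run sequence as the console wizard (the crawler name, role, and database names are placeholders):

```python
def topic_s3_path(bucket, prefix, topic):
    """S3 path the connector writes to: bucket/prefix/topic/<partition>/..."""
    return f"s3://{bucket}/{prefix}/{topic}/"


def crawl_topic_data(crawler_name, role, database, s3_path):
    import boto3  # deferred import; sketch only

    glue = boto3.client("glue", region_name="us-west-2")
    glue.create_crawler(
        Name=crawler_name,
        Role=role,                # IAM role with Glue and S3 read access
        DatabaseName=database,
        Targets={"S3Targets": [{"Path": s3_path}]},
    )
    glue.start_crawler(Name=crawler_name)
```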

Let’s query the table in Athena –

As mentioned earlier, I had loaded 10 messages into the MSK topic before creating the connector, and the count query in Athena confirms 10 records.

At this point, I started 5 parallel SSH sessions and executed the Python script in each to send messages to the transaction_events topic.

[ec2-user@ip-10-0-1-43 ~]$ nohup python3 cc-data-gen.py > session-1-cc-data-gen.log 2>&1 &
[1] 10649

[ec2-user@ip-10-0-1-43 ~]$ ps -ef | grep cc-data-gen
ec2-user 10649  1794  1 03:17 pts/0    00:00:02 python3 cc-data-gen.py
ec2-user 10657  9736  1 03:17 pts/1    00:00:02 python3 cc-data-gen.py
ec2-user 10665  9800  1 03:17 pts/2    00:00:02 python3 cc-data-gen.py
ec2-user 10667  9858  1 03:17 pts/3    00:00:02 python3 cc-data-gen.py
ec2-user 10675  9922  1 03:17 pts/4    00:00:02 python3 cc-data-gen.py

As you can see in the screenshot below, a new JSON file is written to S3 by MSK Connect every 5 minutes.

Since the data is now available on Amazon S3 and cataloged in the AWS Glue catalog, you can run standard SQL queries for analysis. With the SQL below, I am looking for successful transactions with an amount over 7000. Since the transaction time is captured in epoch time, I use from_unixtime to convert it to a human-readable format.
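A query along those lines might look like the following sketch. The column paths follow the record structure shown earlier and the table name comes from the crawler; you can paste the SQL into the Athena console, or submit it with boto3 as shown (the database and output location arguments are placeholders):

```python
# Successful transactions over 7000, with epoch millis converted to a timestamp.
HIGH_VALUE_QUERY = """
SELECT "transaction".transactionid,
       "transaction".amount,
       from_unixtime("transaction".created_at / 1000) AS transaction_time
FROM msk_transaction_events
WHERE "transaction".amount > 7000
  AND success = true
"""


def run_query(database, output_location):
    import boto3  # deferred import; sketch only

    athena = boto3.client("athena", region_name="us-west-2")
    return athena.start_query_execution(
        QueryString=HIGH_VALUE_QUERY,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]
```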

After running the script for a little over an hour, I killed the running sessions on the EC2 instance. By then, the transaction_events topic had a total of 111,547 messages.

[ec2-user@ip-10-0-1-43 ~]$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $BOOTSTRAP --topic transaction_events | awk -F ":" '{print $3}'
22369
22060
22547
22231
22340

The final count of records in Athena shows the same number.

To conclude, in this blog post we created a Python script to load fake transaction data into the transaction_events topic in an MSK cluster. We then downloaded the latest version of the Lenses Amazon S3 Connector (a Kafka Connect connector for AWS S3) and created an Amazon MSK Connect custom plugin. Next we created an MSK connector using the Lenses custom plugin, which sinks the data to Amazon S3 every 5 minutes, as set in the connector configuration. The data on Amazon S3 was crawled by an AWS Glue crawler, which created a table in the Glue catalog. With the table available in the Glue catalog, we queried the data using Amazon Athena.

References

Apache Kafka — https://kafka.apache.org/documentation/

MSK Connect — https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect.html
