Creating a Java Application for Apache Kafka Data Processing with AWS CDK

Creating a Java Application for Apache Kafka Data Processing with AWS CDKMore Info

In Amazon DynamoDB, Amazon Elastic Container Service, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Architecture, AWS CLI, AWS Cloud Development Kit, AWS Fargate, AWS Java Development, AWS Lambda, AWS SDK for Java, Developer Tools, Expert (400), How-To, Java, Node.js, Open Source, Technical How-to

Processing data with a Java application that utilizes Apache Kafka is a prevalent scenario across various sectors. For instance, event-driven and microservices architectures frequently depend on Apache Kafka for efficient data streaming and decoupling of components. It serves as both a message queue and an event bus, enhancing the resilience and reproducibility of events within the application.

In this article, I will guide you through the steps of building a straightforward end-to-end data processing solution using AWS tools and services, alongside other standard industry practices. We will begin with a brief overview of the architecture and infrastructure definitions. You’ll see how, with minimal code, you can establish an Apache Kafka cluster using Amazon Managed Streaming for Apache Kafka (Amazon MSK) and the AWS Cloud Development Kit (AWS CDK). Following that, I will demonstrate how to structure your project and package your application for deployment. We will also examine the implementation details, including how to create Kafka topics in the Amazon MSK cluster and how to send and receive messages using AWS Lambda and AWS Fargate.

The AWS CDK allows for automation in infrastructure creation and application deployment. This open-source framework lets you define cloud application resources using familiar programming languages. For further details, refer to the Developer Guide, AWS CDK Intro Workshop, and the AWS CDK Examples GitHub repository. You can find all the code mentioned in this article open-sourced on GitHub.

Solution Overview

The architecture of our solution is depicted in the following diagram. The TransactionHandler Lambda function triggers the publishing of messages to an Apache Kafka topic. The application is packaged in a container and deployed to ECS Fargate, where it consumes messages from the Kafka topic, processes them, and stores the results in an Amazon DynamoDB table. The KafkaTopicHandler Lambda function runs once during deployment to create the Kafka topic. Both the Lambda function and the consumer application log their output to Amazon CloudWatch.

To proceed with this guide, you will need the following prerequisites:

  • An active AWS account
  • Java SE Development Kit (JDK) 11
  • Apache Maven
  • The AWS CDK
  • AWS Command Line Interface (AWS CLI) version 2
  • Docker

Project Structure and Infrastructure Definition

The project comprises three primary components: the infrastructure (which includes the Kafka cluster and Amazon DynamoDB), a Spring Boot Java consumer application, and the Lambda producer code. Let’s delve into the infrastructure and deployment definition, implemented through a series of AWS CDK stacks and constructs. I have opted for TypeScript for this example due to personal preference, but you can use other CDK-supported languages, including Python, Java, .NET, and Go. For additional information, check out Working with the AWS CDK.

The project directory structure is organized as follows: all AWS CDK stacks are located in the amazon-msk-java-app-cdk/lib directory. The main AWS CDK application that instantiates all stacks can be found in amazon-msk-java-app-cdk/bin. The lambda directory contains the code for TransactionHandler, which publishes messages to a Kafka topic, and KafkaTopicHandler, which creates the Kafka topic. The Kafka consumer’s business logic, written in Java as a Maven project, is housed in the consumer directory. The Dockerfile required for Fargate container creation is in consumer/docker/Dockerfile. Lastly, architecture diagrams are stored in the doc directory, while the scripts directory contains the deployment script.

Setting Up Your Kafka Cluster

The Kafka cluster, a pivotal component of the architecture, is set up using Amazon MSK, which can be easily defined and deployed with the AWS CDK. In the following code snippet, I utilize the CfnCluster construct to create my cluster:

new msk.CfnCluster(this, "kafkaCluster", {
    brokerNodeGroupInfo: {
        securityGroups: [vpcStack.kafkaSecurityGroup.securityGroupId],
        clientSubnets: [...vpcStack.vpc.selectSubnets({
            subnetType: ec2.SubnetType.PRIVATE
        }).subnetIds],
        instanceType: "kafka.t3.small",
        storageInfo: {
            ebsStorageInfo: {
                volumeSize: 5
            }
        }
    },
    clusterName: "TransactionsKafkaCluster",
    kafkaVersion: "2.7.0",
    numberOfBrokerNodes: 2
});

In the code above, vpcStack refers to the AWS CDK stack that defines the VPC. For this demonstration, I have restricted the storage to 5 GB, selected the instance type as kafka.t3.small, and set the minimum number of broker nodes to two. To ensure the cluster is not accessible from outside the VPC, it is placed within a private subnet. For more details on the permissible settings, see the interface CfnClusterProps. To learn more about Amazon MSK, refer to this excellent resource.

Topic Creation

As of this writing, Amazon MSK does not permit the creation of Kafka topics via the AWS service API, so you must connect directly to the Kafka cluster using appropriate tools or libraries within your application code. In this project, I am leveraging the AWS CDK’s custom resource provider, which enables the use of a custom Lambda function to manage AWS CloudFormation’s lifecycle events. You can find the definitions of CustomResource, Provider, and Lambda function resources in the kafka-topic-stack.ts file, while the implementation of the handler Lambda function is in the kafka-topic-handler.ts file.

Here’s a glimpse of the function’s code:

export const handler = async (event: any, context: any = {}): Promise => {
    try {
        if (event.RequestType === 'Create' || event.RequestType === 'Update') {
            let result = await createTopic(event.ResourceProperties.topicConfig);
            response.send(event, context, response.SUCCESS, {alreadyExists: !result});
        } else if (event.RequestType === 'Delete') {
            await deleteTopic(event.ResourceProperties.topicConfig.topic);
            response.send(event, context, response.SUCCESS, {deleted: true});
        }
    } catch (e) {
        response.send(event, context, response.FAILED, {reason: e});
    }
}

The handler is invoked during the deployment of the KafkaTopicStack and again during its destruction. The admin client from the KafkaJS open-source library is utilized to create Kafka topics on the ‘Create’ AWS CloudFormation event and to remove them on the ‘Delete’ event. The createTopics method from KafkaJS resolves to true if the topic is successfully created or false if it already exists.

Consumer Implementation Details

The primary function of the Kafka consumer in this project is to process and validate incoming transaction messages and store the results in the DynamoDB table. The consumer application is developed in Java using the Spring Boot framework. The core functionality is encapsulated in the KafkaConsumer class, where the @KafkaListener annotation defines the entry point for processing messages.

For a deeper dive into similar topics, be sure to check out this blog post for additional insights.


Comments

Leave a Reply

Your email address will not be published. Required fields are marked *