In this article, we explore how to use the Amazon Kinesis Client Library (KCL) for Node.js. Node.js is a JavaScript runtime whose event-driven, non-blocking I/O model makes it lightweight and efficient for real-time, data-intensive applications that run across distributed devices. With full-stack JavaScript development on the rise, the Amazon KCL for Node.js lets JavaScript developers build end-to-end applications on Amazon Kinesis.
Amazon Kinesis is a cloud-based service for real-time processing of large, distributed data streams. It can collect data from sources such as website clickstreams, IT logs, social media feeds, billing transactions, and sensor readings from IoT devices. Once data is stored in Amazon Kinesis, you can consume and process it with the KCL for data analysis, archival storage, real-time dashboards, and more. While it’s possible to process stream data directly with the Amazon Kinesis API, the KCL takes care of many of the complex tasks associated with distributed processing so that you can concentrate on the record-processing logic. For instance, the KCL automatically balances record processing across multiple instances, lets you checkpoint records that have already been processed, and handles instance failures.
The Node.js implementation of the KCL operates using the MultiLangDaemon. The primary logic of the KCL, including communication with Amazon Kinesis, load balancing, and failure handling, is executed in Java, while the Node.js version communicates with the Java daemon through a multi-language daemon protocol.
For a comprehensive tutorial on getting started with the Amazon KCL for Node.js, see the project on GitHub. The KCL is also available for other programming languages such as Java, Python, and Ruby.
Overview of Amazon Kinesis and KCL
The diagram below illustrates the main components of a standard Amazon Kinesis application:
Producers, which may be EC2 instances, traditional servers, or even end-user devices, generate data and put it into an Amazon Kinesis stream. A stream consists of one or more shards, each of which supports a fixed rate of put transactions and a fixed data throughput. Consumers read data from the stream in real time using either the Amazon Kinesis API or the KCL. The processed data can then be passed to other services for archival, querying, or batch processing, depending on the use case.
Sending Data to an Amazon Kinesis Stream using Node.js
Before creating a consumer application, you’ll need to set up an Amazon Kinesis producer to insert data into your stream. The AWS SDK for Node.js offers a user-friendly API for Amazon Kinesis. To kick things off, execute the following command to install the AWS SDK module using npm:
npm install aws-sdk
There are multiple ways to provide credentials to the AWS SDK, all of which are covered in the Getting Started guide for the AWS SDK in Node.js. Once your setup is complete, you can begin developing an Amazon Kinesis producer application that creates a stream and adds records to it. The following example illustrates how to create a new stream:
var AWS = require('aws-sdk');
var kinesis = new AWS.Kinesis({region : 'us-west-2'});

// Create the stream, then wait until it is ready to accept records.
function createStream(streamName, numberOfShards, callback) {
  var params = {
    ShardCount: numberOfShards,
    StreamName: streamName
  };
  kinesis.createStream(params, function(err, data) {
    // ResourceInUseException means the stream already exists, which is fine.
    if (err && err.code !== 'ResourceInUseException') {
      callback(err);
      return;
    }
    waitForStreamToBecomeActive(streamName, callback);
  });
}

// Poll describeStream every 5 seconds until the stream status is ACTIVE.
function waitForStreamToBecomeActive(streamName, callback) {
  kinesis.describeStream({StreamName : streamName},
    function(err, data) {
      if (err) {
        callback(err);
        return;
      }
      if (data.StreamDescription.StreamStatus === 'ACTIVE') {
        callback();
      } else {
        setTimeout(function() {
          waitForStreamToBecomeActive(streamName, callback);
        }, 5000);
      }
    }
  );
}
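The polling loop above retries indefinitely if the stream never reaches the ACTIVE state. As a minimal sketch, the variant below gives up after a bounded number of attempts. The name `waitForActive` and the `maxAttempts` and `delayMs` options are hypothetical, and the `describeStream` argument is an injected function with the same callback shape as `kinesis.describeStream`, so the logic can be exercised without calling AWS.

```javascript
// Hypothetical helper: poll until the stream is ACTIVE or attempts run out.
// describeStream is any function (params, callback(err, data)) that behaves
// like kinesis.describeStream; passing it in keeps the sketch testable.
function waitForActive(describeStream, streamName, options, callback) {
  var attemptsLeft = options.maxAttempts;

  function poll() {
    describeStream({StreamName: streamName}, function(err, data) {
      if (err) {
        callback(err);
        return;
      }
      if (data.StreamDescription.StreamStatus === 'ACTIVE') {
        callback(null);
        return;
      }
      attemptsLeft -= 1;
      if (attemptsLeft <= 0) {
        callback(new Error('Stream ' + streamName + ' did not become ACTIVE in time'));
        return;
      }
      setTimeout(poll, options.delayMs);
    });
  }

  poll();
}
```

With the real client, this could be called as `waitForActive(kinesis.describeStream.bind(kinesis), 'TestStream', {maxAttempts: 12, delayMs: 5000}, callback)`; the attempt count and delay are illustrative values, not recommendations.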
After establishing the stream, you can begin inserting data into it. The example below demonstrates how to add random data to the stream:
function writeToKinesis(streamName) {
  // Generate a random payload and a matching partition key.
  var randomNumber = Math.floor(Math.random() * 100000);
  var data = 'data-' + randomNumber;
  var partitionKey = 'pk-' + randomNumber;
  var recordParams = {
    Data: data,
    PartitionKey: partitionKey,
    StreamName: streamName
  };
  kinesis.putRecord(recordParams, function(err, result) {
    if (err) {
      console.error(err);
    }
  });
}
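The partition key set in `recordParams` determines which shard receives the record: Kinesis hashes the key with MD5 into a 128-bit integer and selects the shard whose hash key range contains that value. The sketch below reproduces the mapping locally. It assumes the shards split the hash space into equal contiguous ranges, which is typical for a newly created stream but may not hold after resharding. `hashKeyFor` and `shardIndexFor` are hypothetical helper names, not SDK functions.

```javascript
var crypto = require('crypto');

// MD5 produces 128-bit values, so the hash key space has 2^128 entries.
var HASH_SPACE = 2n ** 128n;

// Map a partition key to its 128-bit hash key, returned as a BigInt.
function hashKeyFor(partitionKey) {
  var hex = crypto.createHash('md5').update(partitionKey, 'utf8').digest('hex');
  return BigInt('0x' + hex);
}

// Assumption: shardCount shards divide the hash space into equal ranges.
// Returns the zero-based position of the range that contains the key.
function shardIndexFor(partitionKey, shardCount) {
  var rangeSize = HASH_SPACE / BigInt(shardCount);
  var index = hashKeyFor(partitionKey) / rangeSize;
  // The last range absorbs the remainder left by integer division.
  var last = BigInt(shardCount - 1);
  return Number(index > last ? last : index);
}
```

Because the random suffix in `writeToKinesis` changes the partition key on every call, records should spread roughly evenly over the shards; a constant key would send every record to the same shard.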
Now, to bring everything together, the following example calls functions to create a new stream with two shards and insert data into the stream once it becomes active:
createStream('TestStream', 2, function(err) {
  if (err) {
    console.error('Error starting Kinesis producer: ' + err);
    return;
  }
  for (var i = 0; i < 10; ++i) {
    writeToKinesis('TestStream');
  }
});
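The loop above issues one `putRecord` request per record. The Kinesis API also offers `putRecords`, which accepts up to 500 records per request. As a sketch, the hypothetical helper `buildPutRecordsBatches` below groups payloads into request parameter objects of that shape; the input format (objects with `data` and `partitionKey` fields) is an assumption made for illustration.

```javascript
// Per-request record limit of the PutRecords API.
var MAX_RECORDS_PER_CALL = 500;

// Hypothetical helper: split payloads into PutRecords parameter objects.
// Each payload is assumed to look like {data: '...', partitionKey: '...'}.
function buildPutRecordsBatches(streamName, payloads) {
  var batches = [];
  for (var i = 0; i < payloads.length; i += MAX_RECORDS_PER_CALL) {
    var records = payloads.slice(i, i + MAX_RECORDS_PER_CALL).map(function(payload) {
      return {Data: payload.data, PartitionKey: payload.partitionKey};
    });
    batches.push({StreamName: streamName, Records: records});
  }
  return batches;
}
```

Each returned object could then be passed to `kinesis.putRecords(batch, callback)`. A successful response can still report individual failures through `FailedRecordCount`, so a production producer would typically inspect the response and retry the failed entries.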
Processing Data from Amazon Kinesis using the Amazon KCL for Node.js
Assuming at least one producer is actively sending data to the stream, you can use the Amazon KCL for Node.js to consume that data. First, run this command to install the KCL module using npm:
npm install aws-kcl
After installing the module, you can create your KCL application by supplying a record processor implementation. Each shard is processed by exactly one record processor instance, and vice versa. The KCL handles instantiation of one record processor instance for each shard in the stream. A record processor must implement three API functions that the KCL will invoke:
- initialize – Called once at the beginning of record processing for the specific shard. Any initialization logic for your application can be executed here. For instance, if you’re saving processed data to Amazon S3, you would create an S3 client within this function.
- processRecords – Invoked zero or more times with new records fetched from the stream. The KCL supports a checkpoint functionality, allowing your record processor to store progress and maintain state, so if an instance fails, a new processor can pick up from the latest checkpoint for that shard.
- shutdown – Called once when there are no further records to process or when the processor stops working for the shard. Cleanup logic can be implemented here, such as closing any resources created during initialization. If the shard has no more records (shutdown reason is TERMINATE), you should checkpoint the final state to inform the KCL that all records for the shard have been successfully processed.
Here’s an example implementation of a record processor:
var recordProcessor = {
  initialize: function(initializeInput, completeCallback) {
    // Your application specific initialization logic.
    completeCallback();
  },
  processRecords: function(processRecordsInput, completeCallback) {
    // processRecordsInput.records holds the new records;
    // processRecordsInput.checkpointer is used to checkpoint progress.
    // Implementation for processing records.
    completeCallback();
  },
  shutdown: function(shutdownInput, completeCallback) {
    // Cleanup logic goes here.
    completeCallback();
  }
};
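A fuller processor might decode each record, hand it to application code, and checkpoint after every batch. The sketch below assumes the callback shape used in the aws-kcl sample processor: `processRecords` receives one input object carrying `records` and `checkpointer` plus a completion callback, record data arrives base64-encoded, and the checkpointer takes an optional sequence number followed by a callback. `createRecordProcessor` and `handleData` are hypothetical names introduced for illustration.

```javascript
// Hypothetical factory: builds a record processor around a handler function.
// handleData(shardId, partitionKey, data) is supplied by the application.
function createRecordProcessor(handleData) {
  var shardId = null;
  return {
    initialize: function(initializeInput, completeCallback) {
      shardId = initializeInput.shardId;
      completeCallback();
    },
    processRecords: function(processRecordsInput, completeCallback) {
      var records = processRecordsInput && processRecordsInput.records;
      if (!records || records.length === 0) {
        completeCallback();
        return;
      }
      var lastSequenceNumber = null;
      records.forEach(function(record) {
        // Assumption: the payload is delivered as a base64 string.
        var data = Buffer.from(record.data, 'base64').toString('utf8');
        handleData(shardId, record.partitionKey, data);
        lastSequenceNumber = record.sequenceNumber;
      });
      // Checkpoint the last record of the batch, then signal completion.
      // A real processor would also inspect the checkpoint error.
      processRecordsInput.checkpointer.checkpoint(lastSequenceNumber, function(err) {
        completeCallback();
      });
    },
    shutdown: function(shutdownInput, completeCallback) {
      // Only checkpoint when the shard has no more records.
      if (shutdownInput.reason !== 'TERMINATE') {
        completeCallback();
        return;
      }
      shutdownInput.checkpointer.checkpoint(function(err) {
        completeCallback();
      });
    }
  };
}
```

In the library's sample application, a processor object like this is handed to the module with `kcl(recordProcessor).run()`, where `kcl` is `require('aws-kcl')`, and the process is launched by the MultiLangDaemon rather than run directly.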