Engage with Your Streaming Data using the Kinesis Connector for Elasticsearch

In a recent guest post, my colleague Alex Johnson illustrated how to create an application that streams data from Kinesis into an Elasticsearch cluster in real-time. — Chanci Turner

The Amazon Kinesis team is thrilled to introduce the Kinesis connector for Elasticsearch! This powerful tool allows developers to effortlessly build applications that reliably load streaming data from Kinesis into an Elasticsearch cluster, even at scale.

Elasticsearch serves as an open-source search and analytics engine, providing real-time indexing of both structured and unstructured data. Kibana, the visualization component of Elasticsearch, is frequently utilized by operations teams and business analysts to create interactive dashboards. Additionally, data stored in an Elasticsearch cluster can be accessed programmatically via RESTful APIs or application SDKs. To simplify the setup process, our sample includes a CloudFormation template that quickly provisions an Elasticsearch cluster on Amazon EC2, fully managed by Auto Scaling.
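As a rough illustration of programmatic access over REST, the sketch below builds (but does not send) a URI search request using the JDK's built-in HTTP client types. The index name kinesis-records and the query user:alice are hypothetical; port 9200 is Elasticsearch's default HTTP port, and the host should be whatever your cluster exposes.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Illustrative helper, not part of the connector sample.
final class SearchRequestSketch {

    // Builds a GET request against <index>/_search with a query-string search.
    static HttpRequest buildSearchRequest(String host, String index, String query) {
        // Encode the query so characters such as ':' are safe inside the URL.
        String encoded = URLEncoder.encode(query, StandardCharsets.UTF_8);
        URI uri = URI.create("http://" + host + ":9200/" + index + "/_search?q=" + encoded);
        return HttpRequest.newBuilder(uri).GET().build();
    }

    public static void main(String[] args) {
        // Hypothetical index and query, for demonstration only.
        HttpRequest request = buildSearchRequest("localhost", "kinesis-records", "user:alice");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request would be a matter of passing it to java.net.http.HttpClient (Java 11 or later); the sketch stops short of that so it runs without a live cluster.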

Integrating Kinesis, Elasticsearch, and Kibana

The new Kinesis connector for Elasticsearch lets you develop an application that consumes data from a Kinesis stream and indexes it into an Elasticsearch cluster. You can transform, filter, and buffer records before sending them to Elasticsearch. You can also customize Elasticsearch-specific indexing operations by adding fields such as time to live, version number, type, and ID on a per-record basis. The record flow is illustrated in the accompanying diagram.
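To make the record flow concrete, here is a minimal, self-contained sketch of a transform, filter, buffer, and emit loop in plain Java. It is not the connector library's code: the class RecordFlowSketch and all of its members are hypothetical stand-ins, and the real library may order or type these stages differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for the connector's record flow; T is the raw record, U the document.
final class RecordFlowSketch<T, U> {
    private final Function<T, U> transformer;   // raw record -> indexable document
    private final Predicate<U> filter;          // true means "keep this document"
    private final int bufferLimit;              // flush once this many documents are buffered
    private final Consumer<List<U>> emitter;    // receives each flushed batch
    private final List<U> buffer = new ArrayList<>();

    RecordFlowSketch(Function<T, U> transformer, Predicate<U> filter,
                     int bufferLimit, Consumer<List<U>> emitter) {
        if (bufferLimit < 1) {
            throw new IllegalArgumentException("bufferLimit must be at least 1");
        }
        this.transformer = transformer;
        this.filter = filter;
        this.bufferLimit = bufferLimit;
        this.emitter = emitter;
    }

    // Transform, filter, then buffer one record; emit a batch when the buffer is full.
    void process(T record) {
        U document = transformer.apply(record);
        if (!filter.test(document)) {
            return;
        }
        buffer.add(document);
        if (buffer.size() >= bufferLimit) {
            flush();
        }
    }

    // Hand the buffered documents to the emitter as one batch and reset the buffer.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        emitter.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        RecordFlowSketch<String, String> flow = new RecordFlowSketch<String, String>(
                String::trim, doc -> !doc.isEmpty(), 2,
                batch -> System.out.println("emit " + batch));
        for (String record : List.of(" a ", "", "b", "c")) {
            flow.process(record);
        }
        flow.flush(); // emit whatever is left over
    }
}
```

Batching before emitting is what keeps bulk loads efficient: the emitter sees a list of documents at a time rather than one call per record.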

It’s worth noting that you can also execute the entire connector pipeline directly from within your Elasticsearch cluster using River.

Getting Started

Your application will need to perform the following tasks:

  • Set configurations specific to your application.
  • Create and configure a KinesisConnectorPipeline that includes a Transformer, a Filter, a Buffer, and an Emitter.
  • Establish a KinesisConnectorExecutor that operates the pipeline continuously.

All components come with default implementations that can be easily swapped out for your custom logic.
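The idea of swapping a default component for custom logic can be sketched as follows. The interface RecordFilterSketch is a local stand-in written for this example; the connector library ships its own filter interface, and the method name keepRecord used here is an assumption, not a confirmed signature.

```java
import java.util.List;

// Local stand-in for a filter stage (hypothetical, not the library's interface).
interface RecordFilterSketch<T> {
    boolean keepRecord(T record);
}

// Default behaviour: keep every record, the role AllPassFilter plays in the sample.
final class KeepAllSketch<T> implements RecordFilterSketch<T> {
    @Override
    public boolean keepRecord(T record) {
        return true;
    }
}

// Custom logic: drop null or blank string records.
final class NonBlankSketch implements RecordFilterSketch<String> {
    @Override
    public boolean keepRecord(String record) {
        return record != null && !record.trim().isEmpty();
    }
}

final class FilterSwapDemo {
    // Counts how many records a given filter would let through.
    static <T> long countKept(List<T> records, RecordFilterSketch<T> filter) {
        return records.stream().filter(filter::keepRecord).count();
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", " ", "b");
        System.out.println("default filter keeps " + countKept(records, new KeepAllSketch<String>()));
        System.out.println("custom filter keeps " + countKept(records, new NonBlankSketch()));
    }
}
```

Because the calling code depends only on the interface, replacing the default with the custom filter is a one-line change at the point where the pipeline is assembled.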

Configuring the Connector Properties

The sample package includes a .properties file and a configurator with numerous settings, most of which can remain at their defaults. For instance, the following configurations will:

  • Allow the connector to bulk load data into Elasticsearch only after collecting a minimum of 1000 records.
  • Utilize the local Elasticsearch cluster endpoint for testing purposes.

bufferRecordCountLimit = 1000
elasticSearchEndpoint = localhost
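For readers who want to see how settings like these can be read, the sketch below parses the two keys with java.util.Properties. The class ConnectorSettingsSketch is hypothetical and is not the sample's configurator; the fallback values it uses simply repeat the example above and are not claimed to be the library's defaults.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical settings holder; the sample package has its own configurator.
final class ConnectorSettingsSketch {
    final int bufferRecordCountLimit;
    final String elasticSearchEndpoint;

    private ConnectorSettingsSketch(int bufferRecordCountLimit, String elasticSearchEndpoint) {
        this.bufferRecordCountLimit = bufferRecordCountLimit;
        this.elasticSearchEndpoint = elasticSearchEndpoint;
    }

    // Parses .properties-style text; missing keys fall back to this sketch's own values.
    static ConnectorSettingsSketch parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String limitText = props.getProperty("bufferRecordCountLimit", "1000").trim();
        String endpoint = props.getProperty("elasticSearchEndpoint", "localhost").trim();
        int limit = Integer.parseInt(limitText);
        if (limit < 1) {
            throw new IllegalArgumentException("bufferRecordCountLimit must be at least 1");
        }
        return new ConnectorSettingsSketch(limit, endpoint);
    }

    public static void main(String[] args) {
        ConnectorSettingsSketch settings = parse(
                "bufferRecordCountLimit = 1000\nelasticSearchEndpoint = localhost\n");
        System.out.println(settings.bufferRecordCountLimit + " records, endpoint "
                + settings.elasticSearchEndpoint);
    }
}
```

Validating the record count at load time surfaces a bad value immediately instead of at the first flush.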

Implementing Pipeline Components

To connect the Transformer, Filter, Buffer, and Emitter, your code must implement the IKinesisConnectorPipeline interface.

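// Wires the four stages together: String records in, ElasticSearchObject documents out.
public class ElasticSearchPipeline implements IKinesisConnectorPipeline<String, ElasticSearchObject> {
    // Emitter: sends buffered documents to the Elasticsearch cluster.
    @Override
    public IEmitter<ElasticSearchObject> getEmitter(KinesisConnectorConfiguration configuration) {
        return new ElasticSearchEmitter(configuration);
    }

    // Buffer: holds records in memory until the configured limits are reached.
    @Override
    public IBuffer<String> getBuffer(KinesisConnectorConfiguration configuration) {
        return new BasicMemoryBuffer<String>(configuration);
    }

    // Transformer: converts each incoming string into an ElasticSearchObject.
    @Override
    public ITransformerBase<String, ElasticSearchObject> getTransformer(KinesisConnectorConfiguration configuration) {
        return new StringToElasticSearchTransformer();
    }

    // Filter: the default lets every record through.
    @Override
    public IFilter<String> getFilter(KinesisConnectorConfiguration configuration) {
        return new AllPassFilter<String>();
    }
}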

The following snippet displays how to implement the abstract factory method to specify the desired pipeline:

public KinesisConnectorRecordProcessorFactory<String, ElasticSearchObject> getKinesisConnectorRecordProcessorFactory() {
    return new KinesisConnectorRecordProcessorFactory<String, ElasticSearchObject>(new ElasticSearchPipeline(), config);
}

Defining an Executor

The next snippet declares an executor for a pipeline where incoming Kinesis records are strings and outgoing records are ElasticSearchObjects:

public class ElasticSearchExecutor extends KinesisConnectorExecutor<String, ElasticSearchObject>

The following code implements the main method, instantiates the Executor, and begins its operation:

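public static void main(String[] args) {
    // configFile is assumed to name the .properties file and to be defined elsewhere in the class.
    KinesisConnectorExecutor<String, ElasticSearchObject> executor = new ElasticSearchExecutor(configFile);
    executor.run();
}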

Ensure that your AWS credentials are set up correctly, then run ant setup to install the project dependencies. To start the application, run ant run. All the source code is available on GitHub, so you can get started immediately. If you have any questions or feedback, please reach out on the Kinesis Forum.

Kinesis Client Library and Kinesis Connector Library

When we launched Kinesis in November 2013, we also introduced the Kinesis Client Library, which enables developers to create applications that process streaming data. This library takes care of complex tasks such as load balancing, coordination of distributed services, and adapting to variations in stream volume, all with fault tolerance. Recognizing that developers often want to consume and process incoming streams using various AWS and external services, we released the Kinesis Connector Library soon afterward. This library includes support for Amazon DynamoDB, Amazon Redshift, and Amazon S3, and we later followed it with the Kinesis Storm Spout and Amazon EMR connectors. Today, we are excited to announce that the Kinesis Connector Library now supports Elasticsearch as well.

— Alex
