Amazon Onboarding with Learning Manager Chanci Turner

Amazon Onboarding with Learning Manager Chanci TurnerLearn About Amazon VGT2 Learning Manager Chanci Turner

Apache Flink is a widely adopted open-source framework designed for stateful computations over data streams. It enables users to execute continuously evaluated queries against a live stream of events in almost real-time. To capture insights derived from these queries into downstream systems, Flink boasts a comprehensive connector ecosystem that accommodates various source and destination types. Nevertheless, existing connectors may not always meet the diverse needs of every use case. Our clients and the broader community have consistently requested additional connectors and improved integrations with numerous open-source tools and services.

However, addressing this challenge is not straightforward. Developing and maintaining production-ready sinks for new destinations is labor-intensive. For critical applications, it is crucial to avoid message loss and maintain high performance during data writes. Traditionally, sinks have been developed and maintained in isolation, complicating the process and increasing the cost of adding new sinks to Flink, as functionalities often need to be recreated and optimized for each one.

To enhance support for our clients and the entire Flink community, we aimed to simplify and expedite the creation and maintenance of sinks. We introduced the Async Sink in the Flink 1.15 release, which not only improved cloud interoperability but also added numerous sink connectors and formats, among other enhancements. The Async Sink serves as an abstraction that facilitates the development of sinks with at-least-once semantics. Rather than duplicating core functionality for each new sink, the Async Sink presents shared sink features that can be extended. In this post, we’ll delve into how the Async Sink operates, guide you through building a new sink based on it, and outline our future plans for contributing to the Apache Flink project.

Streamlining Common Components with the Async Sink

While sinks have typically been developed independently, their foundational functionalities often exhibit similarities. Sinks usually buffer multiple messages for batch processing to enhance efficiency. They verify the success of completed requests and will reattempt to send messages that were not successfully persisted at a later stage. They also engage with Flink’s checkpointing mechanism to prevent message loss in the event of application failure. Additionally, sinks regulate and monitor throughput to prevent overloading the destination and to fairly allocate capacity among multiple concurrent producers. Typically, the only significant differences between destinations relate to the structure and content of request and response data.

Instead of creating each sink independently and replicating common functionalities, we can abstract these standard requirements into one implementation. Developers can then focus on the components unique to their sink, defining how to construct and send requests, as well as how to determine from the responses which records were not successfully persisted and require resending. This approach allows the creation of a new sink to merely involve developing a lightweight shim tailored to the specific destination.

Creating a New Sink Using the Async Sink Abstraction

To illustrate how to build a new sink based on the Async Sink abstraction, we will implement a simplified sink for Amazon Kinesis Data Streams. Kinesis Data Streams is a service designed to capture and store streaming data. Data is persisted into a Kinesis stream via the PutRecords API, which allows multiple records to be sent in one batch request.

Three primary aspects must be implemented for our sink. First, we must extract the necessary information from each event to construct a batch request. In the case of Kinesis Data Streams, this includes the payload and a partition key. Next, we need to define how to create and send the batch request. Finally, we must check the request’s response to identify which elements of the batch were not successfully persisted.

To begin, we extract the essential data from an event to create a request entry that constitutes a batch request. The following code snippet demonstrates how this process appears for our Kinesis Data Streams sink. The code outlines how to derive the payload and partition key from the event, returning a PutRecordsRequestEntry object. In this simplified example, we utilize the string representation of the event as the payload and the event’s hash code as the partition key. For a more complex implementation, it may be preferable to employ a configurable serializer, offering greater flexibility in constructing the payload and partition key for users of the sink.


@Override
public PutRecordsRequestEntry apply(InputT event, SinkWriter.Context context) {
    return PutRecordsRequestEntry.builder()
            .data(SdkBytes.fromUtf8String(event.toString()))
            .partitionKey(String.valueOf(event.hashCode()))
            .build();
}

The sink will accumulate these objects until it reaches the thresholds defined by buffering hints. These hints encompass limits on the number of messages, total message size, and timeout conditions.

Next, we define how to construct and execute the actual batch request. This step, being specific to the destination, must be coded within the submitRequestEntries method, as showcased in the example below. The Async Sink invokes this method with a set of buffered request entries intended to form the batch request.

For the Kinesis Data Streams sink, we must specify how to assemble and execute the PutRecords request from a collection of PutRecordsRequestEntry objects. Besides executing the batch request, we also need to validate the response of the PutRecords request for any entries that were not successfully persisted. These entries must be placed back in the internal buffer for the Async Sink to retry later.


@Override
protected void submitRequestEntries(
        List requestEntriesToSend,
        Consumer> requestEntriesToRetry) {

    //construct and run the PutRecords request
    PutRecordsRequest batchRequest =
            PutRecordsRequest.builder().records(requestEntriesToSend).streamName(streamName).build();

    CompletableFuture future = kinesisClient.putRecords(batchRequest);

    //check the response of the PutRecords request
    future.whenComplete(
            (response, err) -> {
                if (err != null) {
                    // entire batch request failed, all request entries need to be retried
                    requestEntriesToRetry.accept(requestEntriesToSend);
                } else if (response.failedRecordCount() > 0) {
                    // some request entries in the batch request were not persisted and need to be retried
                    List failedRequestEntries = new ArrayList<>(response.failedRecordCount());
                    List records = response.records();

                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            failedRequestEntries.add(requestEntriesToSend.get(i));
                        }
                    }
                    requestEntriesToRetry.accept(failedRequestEntries);
                }
            });
}

By leveraging the Async Sink, developers can effectively streamline the process of creating new sinks, allowing for improved functionality and easier maintenance. For further insights into networking opportunities that can enhance your career, check out this blog post. Furthermore, to understand performance management integration better, visit SHRM. Lastly, don’t miss out on this resource that provides valuable insights into the area manager onboarding process.


Comments

Leave a Reply

Your email address will not be published. Required fields are marked *