Today, AWS Lambda is unveiling new controls designed for asynchronous and stream processing invocations. These enhancements empower users to tailor their responses to Lambda function errors, facilitating the creation of more robust event-driven and stream-processing applications.
Stream Processing Function Invocations
When handling data from sources like Amazon Kinesis Data Streams and Amazon DynamoDB Streams, Lambda processes records in batches through shards, with each shard representing a unique sequence of data records. Your function is invoked to process records from the batch “in order.” If an error occurs, Lambda will retry the batch until it processes successfully or the data expires. While this retry behavior is beneficial in many scenarios, it’s not universally applicable:
- Until the error is addressed, no data within the shard is processed. A single corrupt record can halt the processing of an entire shard due to the “in order” guarantee, which mandates that failed batches be retried until the data record expires. This phenomenon is often dubbed a “poison pill” event, as it blocks the rest of the system from processing data.
- In certain situations, retrying may not be constructive if subsequent attempts are also likely to fail.
- If the function is not idempotent, retries can lead to unintended side effects, resulting in duplicate outputs (such as multiple database entries or business transactions).
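One way to soften the last point is to make the handler tolerate repeated deliveries. The following is a minimal sketch, not a production pattern: it tracks processed records in an in-memory Set, which is an assumption made for brevity (a real function would need a durable store, because execution environments are recycled). The names processOnce and applySideEffect are hypothetical.

```javascript
// Event IDs already handled. In-memory only: an assumption for this sketch.
const seen = new Set();

// Runs applySideEffect once per distinct Kinesis record (keyed by eventID).
// Returns the number of records that were actually processed.
function processOnce(event, applySideEffect) {
  let processed = 0;
  for (const record of event.Records) {
    if (seen.has(record.eventID)) continue; // duplicate delivery from a retry
    applySideEffect(record);
    seen.add(record.eventID);
    processed += 1;
  }
  return processed;
}

// Hypothetical batch, trimmed to the field used here.
const sampleEvent = {
  Records: [
    { eventID: "shardId-000000000000:1" },
    { eventID: "shardId-000000000000:2" },
  ],
};

// Delivering the same batch twice produces one side effect per record.
const outputs = [];
const firstRun = processOnce(sampleEvent, (r) => outputs.push(r.eventID));
const secondRun = processOnce(sampleEvent, (r) => outputs.push(r.eventID));
```

The second delivery is a no-op, so a retried batch does not duplicate outputs.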
The New Lambda Controls for Stream Processing Invocations
With these new customizable controls, users can determine how function errors and retries affect stream processing function invocations. A new event source mapping configuration subresource lets developers specify which controls apply to the invocations.
DestinationConfig: {
    OnFailure: {
        Destination: "SNS/SQS arn (String)"
    }
}
{
    "MaximumRetryAttempts": integer,
    "BisectBatchOnFunctionError": boolean,
    "MaximumRecordAgeInSeconds": integer
}
Maximum Retry Attempts
Set a limit on the number of retry attempts for batches before they can be bypassed to allow processing to continue and avoid duplicating outputs.
Minimum: 0 | Maximum: 10,000 | Default: 10,000
Maximum Record Age in Seconds
Set the maximum age of a record in seconds. Records that are not successfully processed within this age are skipped so that processing can continue.
Minimum: 60 | Default/Maximum: 604,800
Bisect Batch on Function Error
This feature recursively splits a failed batch and retries on smaller subsets of records, isolating the records that cause the error.
Default: false
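To make the idea concrete, here is a small simulation of bisecting. It is a sketch of the general technique only, not Lambda's internal implementation, and it ignores retry limits and record age. The function bisectAndRetry and the handler callback are hypothetical names.

```javascript
// Simulates retrying a failed batch by recursively splitting it in half.
// `handler` is a hypothetical function that throws when a batch contains a bad record.
// Returns the records that still fail once they are isolated in a batch of one.
function bisectAndRetry(batch, handler) {
  try {
    handler(batch);
    return []; // the whole batch succeeded
  } catch (err) {
    if (batch.length <= 1) return batch; // the failing record is isolated
    const mid = Math.ceil(batch.length / 2);
    return [
      ...bisectAndRetry(batch.slice(0, mid), handler),
      ...bisectAndRetry(batch.slice(mid), handler),
    ];
  }
}

// Record the size of every attempted batch to show how the split narrows down.
const calls = [];
const failing = bisectAndRetry([1, 2, 3, "corrupt", 5, 6], (records) => {
  calls.push(records.length);
  if (records.includes("corrupt")) throw new Error("cannot parse record");
});
```

Note that healthy records are handed to the handler more than once along the way, which is one reason idempotent function code matters when bisecting is enabled.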
On-Failure Destination
When either MaximumRetryAttempts or MaximumRecordAgeInSeconds is reached, a record will be skipped. If ‘Destination’ is configured, metadata about the skipped records can be sent to a target ARN (e.g., Amazon SQS or Amazon SNS). If no target is set, the record will simply be dropped.
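The decision described above can be sketched as a small function. This is an illustration under stated assumptions, not Lambda's actual logic: the state fields retriesMade and recordAgeSeconds, the returned action labels, and the ARN are hypothetical, and "reached" is interpreted as greater than or equal to the configured limit.

```javascript
// Decides what happens to a failed batch under the controls described above.
// `state` describes the batch so far; `config` uses the documented setting names.
function resolveFailedBatch(state, config) {
  const retriesExhausted = state.retriesMade >= config.MaximumRetryAttempts;
  const expired = state.recordAgeSeconds >= config.MaximumRecordAgeInSeconds;
  if (!retriesExhausted && !expired) return { action: "retry" };

  const reason = retriesExhausted ? "retries exhausted" : "record expired";
  const onFailure = (config.DestinationConfig || {}).OnFailure || {};
  return onFailure.Destination
    ? { action: "send-metadata", reason, destination: onFailure.Destination }
    : { action: "drop", reason };
}

// Placeholder ARN, for illustration only.
const queueArn = "arn:aws:sqs:us-east-1:123456789012:example-queue";
const withDestination = {
  MaximumRetryAttempts: 1,
  MaximumRecordAgeInSeconds: 60,
  DestinationConfig: { OnFailure: { Destination: queueArn } },
};
const withoutDestination = { MaximumRetryAttempts: 1, MaximumRecordAgeInSeconds: 60 };

const stillRetrying = resolveFailedBatch({ retriesMade: 0, recordAgeSeconds: 5 }, withDestination);
const skippedToQueue = resolveFailedBatch({ retriesMade: 1, recordAgeSeconds: 5 }, withDestination);
const dropped = resolveFailedBatch({ retriesMade: 0, recordAgeSeconds: 75 }, withoutDestination);
```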
Getting Started with Error Handling for Stream Processing Invocations
Here’s how to utilize the new controls to create a customized error handling configuration for stream processing invocations. You will set the maximum retry attempts to 1, the maximum record age to 60 seconds, and send the metadata of skipped records to an Amazon Simple Queue Service (SQS) queue. Start by creating a Kinesis Stream to send records into, and an SQS queue to capture the metadata of retry-exhausted or expired data records.
- Navigate to the Lambda console and select Create function.
- Ensure “Author from scratch” is selected, and enter a function name, such as “customStreamErrorExample.”
- Set Runtime to Node.js 12.x.
- Click Create function.
To enable Kinesis as an event trigger and send metadata of a failed invocation to the SQS standard queue, you’ll need to grant the Lambda execution role the necessary permissions.
- Scroll to the Execution Role section and click the view link under the Existing role dropdown.
- Choose Add inline policy > JSON, and paste the following (replacing {yourSQSarn} with the ARN of your SQS queue and {yourKinesisarn} with the ARN of the stream you created). Select Review policy.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "sqs:SendMessage",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": [
                "{yourSQSarn}",
                "{yourKinesisarn}"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "kinesis:ListStreams",
            "Resource": "*"
        }
    ]
}
- Name the policy “CustomSQSKinesis” and create the policy.
- Return to your Lambda function and select Add trigger.
- Choose Kinesis from the dropdown and select your stream from the Kinesis stream dropdown.
- To view the new control options, expand the Additional settings section.
- Under On-failure destination, paste the ARN of your previously created SQS queue.
- Set Maximum retry attempts to 1 and Maximum record age to 60. Select Add. The Kinesis trigger is now configured and the Lambda function has the necessary permissions to write to SQS and CloudWatch Logs.
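For readers who prefer to script this step, the same settings can be expressed as a parameter object. The sketch below only builds and validates that object; it does not call AWS. The setting names come from the configuration shown earlier and the ranges from the limits listed above, while the function names, the option names, and the ARNs are hypothetical placeholders.

```javascript
// Throws if `value` is not an integer within [min, max].
function assertIntegerInRange(name, value, min, max) {
  if (!Number.isInteger(value) || value < min || value > max) {
    throw new RangeError(`${name} must be an integer between ${min} and ${max}`);
  }
}

// Builds an event source mapping parameter object with the error-handling controls.
function buildEventSourceMappingParams(options) {
  const {
    functionName,
    eventSourceArn,
    maximumRetryAttempts = 10000,
    maximumRecordAgeInSeconds = 604800,
    bisectBatchOnFunctionError = false,
    onFailureDestinationArn,
  } = options;

  assertIntegerInRange("MaximumRetryAttempts", maximumRetryAttempts, 0, 10000);
  assertIntegerInRange("MaximumRecordAgeInSeconds", maximumRecordAgeInSeconds, 60, 604800);

  const params = {
    FunctionName: functionName,
    EventSourceArn: eventSourceArn,
    MaximumRetryAttempts: maximumRetryAttempts,
    MaximumRecordAgeInSeconds: maximumRecordAgeInSeconds,
    BisectBatchOnFunctionError: bisectBatchOnFunctionError,
  };
  if (onFailureDestinationArn) {
    params.DestinationConfig = { OnFailure: { Destination: onFailureDestinationArn } };
  }
  return params;
}

// The walkthrough's settings: one retry, 60-second record age, SQS on-failure destination.
const params = buildEventSourceMappingParams({
  functionName: "customStreamErrorExample",
  eventSourceArn: "arn:aws:kinesis:us-east-1:123456789012:stream/example-stream", // placeholder
  maximumRetryAttempts: 1,
  maximumRecordAgeInSeconds: 60,
  onFailureDestinationArn: "arn:aws:sqs:us-east-1:123456789012:example-queue", // placeholder
});
```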
Next, in the Function code section, paste the following code to simulate an error:
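exports.handler = async (event) => {
    // Log the incoming Kinesis event so it appears in CloudWatch Logs.
    console.log(event);
    const response = {
        statusCode: 200,
        // Intentional bug: "dfh" is not defined, so building this object throws.
        body: event,dfh
    };
    return response;
};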
This code deliberately references an undefined identifier (dfh), so every invocation throws a ReferenceError and fails. Trigger the Lambda function by adding a record to the Kinesis Stream using the AWS CLI. If you haven’t already, install the AWS CLI first.
In your terminal, execute the following command, replacing {YourStreamName} with your stream’s name:
aws kinesis put-record --stream-name {YourStreamName} --partition-key 123 --data testdata
The command returns a JSON response containing the ShardId and SequenceNumber of the new record (your sequence number will vary).
In the AWS Lambda console, navigate to Monitoring > View logs in CloudWatch and select the latest log stream. You’ll observe that the Lambda function failed as expected, but this time with just a single retry attempt, since the maximum retry attempts were set to 1 and the record’s age was under the maximum record age of 60 seconds.
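In the logged event, each record's payload appears base64-encoded in its kinesis.data field. The sketch below shows one way to decode it; the sample event is hypothetical and trimmed to the fields used here, and decodeRecords is an illustrative name.

```javascript
// Decodes the base64 payload of every record in a Kinesis event.
function decodeRecords(event) {
  return event.Records.map((record) =>
    Buffer.from(record.kinesis.data, "base64").toString("utf8")
  );
}

// Hypothetical logged event: "dGVzdGRhdGE=" is the base64 form of "testdata".
const loggedEvent = {
  Records: [{ kinesis: { partitionKey: "123", data: "dGVzdGRhdGE=" } }],
};
const decoded = decodeRecords(loggedEvent);
```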
In the AWS Management Console, go to your SQS queue under Services > SQS. Select your queue and choose Queue Actions > View/Delete Messages > Start Polling for Messages. You will find the metadata of your failed invocation successfully sent to the SQS destination for further review or processing.
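A consumer of that queue might reduce each message to the fields needed to locate the skipped batch. The sketch below assumes the message body is a JSON document with requestContext and KinesisBatchInfo objects; those field names are recalled from the AWS documentation and should be checked against a real message. The sample values and the summarizeFailure name are hypothetical.

```javascript
// Extracts the fields needed to locate a skipped batch from an on-failure message body.
// Missing sections yield undefined fields instead of throwing.
function summarizeFailure(messageBody) {
  const record = JSON.parse(messageBody);
  const context = record.requestContext || {};
  const batch = record.KinesisBatchInfo || {};
  return {
    condition: context.condition,
    shardId: batch.shardId,
    startSequenceNumber: batch.startSequenceNumber,
    endSequenceNumber: batch.endSequenceNumber,
    batchSize: batch.batchSize,
  };
}

// Hypothetical message body, for illustration only.
const sampleBody = JSON.stringify({
  requestContext: { condition: "RetryAttemptsExhausted", approximateInvokeCount: 2 },
  KinesisBatchInfo: {
    shardId: "shardId-000000000000",
    startSequenceNumber: "100",
    endSequenceNumber: "100",
    batchSize: 1,
  },
});
const summary = summarizeFailure(sampleBody);
```

The message carries metadata about the batch rather than the record payloads, so recovering the data itself means reading the stream at those sequence numbers before the records expire.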
Asynchronous Function Invocations
When a function is invoked asynchronously, Lambda places the event in a queue prior to processing it with your function. Invocations that lead to exceptions in the function code are retried twice, with a minute’s delay before the first retry and two minutes before the second. Some invocations may not execute at all due to throttling.
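As a quick worked example of that schedule, the sketch below computes when each attempt starts relative to the first one. It ignores function run time and queueing delays, and attemptOffsets is a hypothetical helper name.

```javascript
// Delays before each retry, in seconds, as described above: one minute, then two.
const RETRY_DELAYS_SECONDS = [60, 120];

// Returns the offsets (seconds after the first attempt) at which each attempt starts.
function attemptOffsets(retryDelays = RETRY_DELAYS_SECONDS) {
  const offsets = [0];
  for (const delay of retryDelays) {
    offsets.push(offsets[offsets.length - 1] + delay);
  }
  return offsets;
}

const offsets = attemptOffsets(); // [0, 60, 180]
```

Under these assumptions, an event that keeps failing is attempted three times over roughly three minutes.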