Amazon is excited to announce new AWS Lambda features designed for asynchronous and stream processing invocations. These enhancements enable users to tailor responses to errors within Lambda functions, ultimately fostering more resilient event-driven and stream-processing applications.
Stream Processing Function Invocations
When working with data from sources like Amazon Kinesis Data Streams and Amazon DynamoDB Streams, Lambda retrieves records in batches through shards. A shard represents a uniquely identified sequence of data records. Your function processes these records from the batch “in order.” If an error occurs, Lambda will attempt to reprocess the batch until it succeeds or the data expires. While this retry mechanism is beneficial in many scenarios, it may not always be ideal:
- If an issue persists, no data from the shard can be processed. A single erroneous record can halt processing for the entire shard due to the “in order” guarantee, which ensures that failed batches are retried until expiration. This situation is often referred to as a “poison pill” event, as it obstructs further data processing within the system.
- In certain instances, retrying may not be advantageous if subsequent attempts are also expected to fail.
- If the function is not idempotent, retries could lead to unexpected outcomes, resulting in duplicate records (for example, multiple entries in a database or repeated transactions).
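The idempotency concern in the last point can be illustrated with a minimal sketch. It assumes the usual shape of a Kinesis event delivered to Lambda (each record carrying an `eventID` and a base64-encoded `kinesis.data` field). The `processBatch` helper and the in-memory `Set` are hypothetical stand-ins; a real function would need a durable store so that the deduplication survives across execution environments.

```javascript
// Stand-in for a durable store of already-processed record IDs (hypothetical;
// an in-memory Set is lost whenever the execution environment is recycled).
const processedIds = new Set();

// Applies `sideEffect` at most once per record, skipping records that were
// already handled on an earlier attempt of the same batch.
function processBatch(records, seen, sideEffect) {
  let applied = 0;
  for (const record of records) {
    if (seen.has(record.eventID)) continue; // handled on a previous attempt
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    sideEffect(payload); // may throw; the record is then not marked as seen
    seen.add(record.eventID);
    applied += 1;
  }
  return applied;
}

async function handler(event) {
  const applied = processBatch(event.Records, processedIds, (payload) => {
    console.log('processing', payload);
  });
  return { applied };
}
// In Lambda, expose it with: exports.handler = handler;
```

With this pattern, a retried batch re-applies only the records that had not completed before the failure.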
New Lambda Controls for Stream Processing Invocations
With the introduction of new customizable controls, users can now manage how function errors and retries affect stream processing invocations. The new event source-mapping configuration subresource allows developers to specify which controls will apply to these invocations.
DestinationConfig: {
    OnFailure: {
        Destination: "SNS/SQS arn (String)"
    }
}
{
    "MaximumRetryAttempts": integer,
    "BisectBatchOnFunctionError": boolean,
    "MaximumRecordAgeInSeconds": integer
}
Key Features:
- MaximumRetryAttempts: Set a cap on the number of retry attempts for batches before skipping them to ensure processing continues and prevent duplicate outputs. Minimum: 0 | Maximum: 10,000 | Default: 10,000
- MaximumRecordAgeInSeconds: Specify the maximum age of a record in seconds. Records that don’t get processed within this time frame will be skipped, allowing processing to continue. Minimum: 60 | Default/Maximum: 604,800
- BisectBatchOnFunctionError: This option enables you to recursively split the failed batch, retrying on smaller subsets of records to isolate the records causing the error. Default: false
- On-failure Destination: When either MaximumRetryAttempts or MaximumRecordAgeInSeconds is reached, a record will be skipped. If a ‘Destination’ is specified, metadata about the skipped records can be sent to a target ARN (e.g., Amazon SQS or Amazon SNS). If no target is set, the record will simply be discarded.
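To make the bisecting behavior concrete, here is a simplified simulation. It is not Lambda's actual implementation: `processWithBisect` is a hypothetical helper, retry counting and record age are ignored, and the "function" is modeled as a callback that throws whenever its batch contains a bad record.

```javascript
// Simplified model of BisectBatchOnFunctionError: on failure, split the batch
// in half and process each half in order until the failing record is isolated.
function processWithBisect(batch, fn, skipped = []) {
  if (batch.length === 0) return skipped;
  try {
    fn(batch);
  } catch (err) {
    if (batch.length === 1) {
      skipped.push(batch[0]); // isolated failing record; would go to the on-failure destination
    } else {
      const mid = Math.ceil(batch.length / 2);
      processWithBisect(batch.slice(0, mid), fn, skipped);
      processWithBisect(batch.slice(mid), fn, skipped);
    }
  }
  return skipped;
}

const processed = [];
const failing = (batch) => {
  if (batch.includes('bad')) throw new Error('poison pill');
  processed.push(...batch);
};

const skipped = processWithBisect(['a', 'b', 'bad', 'c'], failing);
console.log(processed, skipped);
```

In this run the good records are still processed in their original order, and only the single bad record ends up in `skipped`.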
Implementing Custom Error Handling for Stream Processing Invocations
To create a tailored error handling configuration for stream processing invocations, you can set the maximum retry attempts to 1, the maximum record age to 60 seconds, and direct the metadata of skipped records to an Amazon Simple Queue Service (SQS) queue. First, establish a Kinesis Stream to input records and create an SQS queue to store metadata for records that have exhausted retries or expired.
- Access the Lambda console and select Create function.
- Choose Author from scratch and name your function “customStreamErrorExample.”
- Select Node.js 12.x as the Runtime.
- Click Create function. To enable Kinesis as an event trigger and send metadata of failed invocations to the SQS queue, ensure the Lambda execution role has the necessary permissions.
- Scroll to the Execution Role section and select the view link beneath the Existing role drop-down.
- Choose Add inline policy > JSON, then insert the following, replacing {yourSQSarn} and {yourKinesisarn} with the ARNs of the SQS queue and Kinesis stream you created:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "sqs:SendMessage",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": [
                "{yourSQSarn}",
                "{yourKinesisarn}"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "kinesis:ListStreams",
            "Resource": "*"
        }
    ]
}
- Assign a name to the policy, like CustomSQSKinesis, and click Create policy.
- Return to your Lambda function and select Add trigger. Choose Kinesis from the drop-down and select your stream.
- To view the new control options, click the drop-down arrow in the Additional settings section.
- Enter the ARN of the SQS queue you previously created and set Maximum retry attempts to 1 and Maximum record age to 60. Click Add. Your Kinesis trigger is now set up, and the Lambda function has the permissions required to write to SQS and CloudWatch Logs.
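The same trigger settings can also be expressed as a request object. The sketch below only builds and validates that object; it makes no AWS call. The field names follow Lambda's CreateEventSourceMapping request, the limits are the ones listed under Key Features, and the `buildStreamMapping` helper, the stream and queue names, and the account ID in the ARNs are hypothetical placeholders.

```javascript
// Builds a parameter object in the shape of a CreateEventSourceMapping request,
// checking the limits described in this post before returning it.
function buildStreamMapping({ functionName, streamArn, queueArn, maxRetries, maxAgeSeconds, bisect = false }) {
  if (!Number.isInteger(maxRetries) || maxRetries < 0 || maxRetries > 10000) {
    throw new RangeError('MaximumRetryAttempts must be an integer between 0 and 10000');
  }
  if (!Number.isInteger(maxAgeSeconds) || maxAgeSeconds < 60 || maxAgeSeconds > 604800) {
    throw new RangeError('MaximumRecordAgeInSeconds must be an integer between 60 and 604800');
  }
  return {
    FunctionName: functionName,
    EventSourceArn: streamArn,
    StartingPosition: 'LATEST', // assumption: read only records added after the mapping is created
    MaximumRetryAttempts: maxRetries,
    MaximumRecordAgeInSeconds: maxAgeSeconds,
    BisectBatchOnFunctionError: bisect,
    DestinationConfig: { OnFailure: { Destination: queueArn } },
  };
}

const mapping = buildStreamMapping({
  functionName: 'customStreamErrorExample',
  streamArn: 'arn:aws:kinesis:us-east-1:123456789012:stream/example-stream', // placeholder
  queueArn: 'arn:aws:sqs:us-east-1:123456789012:example-queue', // placeholder
  maxRetries: 1,
  maxAgeSeconds: 60,
});
console.log(JSON.stringify(mapping, null, 2));
```

An object like this could then be handed to an SDK or CLI call that creates the mapping; check the current API reference before relying on the exact limits.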
In the Function code section, paste the following code, which contains a deliberate error (a reference to the undefined identifier dfh) so that every invocation fails:
exports.handler = async (event) => {
    console.log(event)
    const response = {
        statusCode: 200,
        // Deliberate error: `dfh` is not defined, so every invocation fails
        body: event,dfh
    };
    return response;
};
Next, trigger the Lambda function by adding a record to the Kinesis stream with the AWS CLI. If you haven’t installed the AWS CLI yet, install it first by following the AWS CLI documentation. Run the following command in your terminal, replacing {YourStreamName} with the name of your stream:
aws kinesis put-record --stream-name {YourStreamName} --partition-key 123 --data testdata
The command returns the shard ID and sequence number of the new record (your values will differ). In the AWS Lambda console, navigate to Monitoring > View logs in CloudWatch and select the most recent log stream. You’ll see that the function failed as expected, but with only a single retry, because Maximum retry attempts is set to 1 and the record was still younger than the 60-second maximum record age.
Then, navigate to your SQS queue in the AWS Management Console under Services > SQS. Select your queue, then choose Queue Actions > View/Delete Messages > Start Polling for Messages. Here, you will find the metadata of your failed invocation, indicating it has been successfully sent to the SQS destination for further investigation or processing.
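Once a message arrives, its body can be parsed to find out which batch was skipped and why. The sketch below assumes the message body is JSON with `requestContext` and `KinesisBatchInfo` objects, which is the shape AWS documents for Kinesis on-failure records as best I recall; verify the field names against a real message from your queue. The `summarizeFailure` helper and all sample values are hypothetical, not actual output.

```javascript
// Extracts the fields most useful for locating the skipped batch in the stream.
// Field names are an assumption; check them against a real message.
function summarizeFailure(messageBody) {
  const record = JSON.parse(messageBody);
  const ctx = record.requestContext || {};
  const batch = record.KinesisBatchInfo || {};
  return {
    reason: ctx.condition,
    attempts: ctx.approximateInvokeCount,
    shardId: batch.shardId,
    firstSequenceNumber: batch.startSequenceNumber,
    lastSequenceNumber: batch.endSequenceNumber,
    batchSize: batch.batchSize,
  };
}

// Illustrative message body with placeholder values.
const sampleBody = JSON.stringify({
  requestContext: { condition: 'RetryAttemptsExhausted', approximateInvokeCount: 2 },
  KinesisBatchInfo: {
    shardId: 'shardId-000000000000',
    startSequenceNumber: '1111',
    endSequenceNumber: '1111',
    batchSize: 1,
  },
});

const summary = summarizeFailure(sampleBody);
console.log(summary);
```

Note that the message carries metadata about the batch rather than the record payloads themselves, so the sequence numbers are what you would use to fetch the records from the stream again.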
Asynchronous Function Invocations
When a function is invoked asynchronously, Lambda sends the event to a queue before processing it. If the function code throws an exception, Lambda retries the invocation twice, with a one-minute delay before the first retry and a two-minute delay before the second. Some invocations may never run at all because of throttling; Lambda returns those events to the queue and keeps retrying them, by default for up to six hours.
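The retry timing for function errors can be sketched as a small simulation. It does not sleep or call Lambda; it only records when each attempt would start, ignores the function's own execution time, and uses the one-minute and two-minute delays described above. `simulateAsyncInvocation` is a hypothetical helper.

```javascript
// Delays before the first and second retry, in seconds, as described above.
const RETRY_DELAYS_SECONDS = [60, 120];

// Returns a timeline of attempts for a function that may throw.
function simulateAsyncInvocation(fn, event) {
  const attempts = [];
  let elapsed = 0;
  for (let attempt = 0; attempt <= RETRY_DELAYS_SECONDS.length; attempt += 1) {
    if (attempt > 0) elapsed += RETRY_DELAYS_SECONDS[attempt - 1];
    try {
      fn(event);
      attempts.push({ attempt: attempt + 1, atSeconds: elapsed, ok: true });
      return attempts; // success: no further retries
    } catch (err) {
      attempts.push({ attempt: attempt + 1, atSeconds: elapsed, ok: false });
    }
  }
  return attempts; // all attempts failed
}

const timeline = simulateAsyncInvocation(() => {
  throw new Error('always fails');
}, {});
console.log(timeline);
```

For a function that always throws, the timeline contains three attempts: the initial invocation and two retries.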