Implementing a CDC-Based UPSERT in a Data Lake Using Apache Iceberg and AWS Glue

Author: Alex Thompson

Date: 15 JUN 2022

Category: Analytics, AWS Big Data, AWS Glue, Serverless

As the adoption of data lakes and modern data architectures grows, so do customer expectations for features such as ACID transactions, UPSERT, time travel, schema evolution, and automated compaction. Amazon Simple Storage Service (Amazon S3) objects are immutable: an object can be replaced but not edited in place, so individual records in a data lake cannot be updated directly and writes are effectively append-only. However, many use cases require handling incremental updates captured through change data capture (CDC) from source systems and applying them to the data in Amazon S3 to maintain a golden copy. Traditionally, this meant overwriting entire S3 objects or folders. Frameworks such as Apache Hudi, Apache Iceberg, and Delta Lake, along with governed tables in AWS Lake Formation, now provide database-like UPSERT functionality on Amazon S3.

Apache Hudi is already integrated with AWS analytics services, and AWS Glue, Amazon EMR, and Amazon Athena have recently announced support for Apache Iceberg. Originally developed at Netflix, Iceberg was open-sourced in 2018 as an Apache Incubator project and graduated from the incubator in mid-2020. It is designed to support ACID transactions and UPSERT operations on petabyte-scale data lakes, and it has gained popularity for its flexible SQL syntax for CDC-based MERGE operations, full schema evolution, and hidden partitioning.

In this article, we guide you through a solution to implement CDC-based UPSERT or MERGE in an Amazon S3 data lake using Apache Iceberg and AWS Glue.

Configuring Apache Iceberg with AWS Glue

You can add the Apache Iceberg JARs to AWS Glue through its AWS Marketplace connector. The connector supports AWS Glue versions 1.0, 2.0, and 3.0 and is available free of charge. Setup takes only a few steps in the console.

Follow these steps to set up the connector:

  1. Navigate to the AWS Marketplace connector page.
  2. Click on “Continue to Subscribe” and accept the terms.
  3. Proceed to configuration by selecting the AWS Glue and software versions.
  4. Click “Continue to Launch.”
  5. Access Usage Instructions to find a link to activate the connector.
  6. Create a connection by providing a name and selecting “Create connection” to activate the connector.

You can verify your new connection on the AWS Glue Studio Connectors page. When creating an AWS Glue job, ensure you add this newly configured connector.
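With the connector attached, the job's Spark session still has to be pointed at an Iceberg catalog. The following is a minimal sketch of the Spark properties commonly set for an Iceberg catalog backed by the AWS Glue Data Catalog. The catalog name `glue_catalog` and the warehouse path are placeholders chosen for illustration, not values from this article, and the exact set of properties may differ with the connector version you subscribe to.

```python
def iceberg_spark_conf(catalog: str, warehouse_path: str) -> dict:
    """Return Spark properties for an Iceberg catalog backed by the Glue Data Catalog."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        # Enables Iceberg's SQL extensions, which MERGE INTO relies on.
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        # Registers a Spark catalog under the given name.
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        # Stores table metadata pointers in the AWS Glue Data Catalog.
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        # Reads and writes data files through Iceberg's S3 file IO.
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        # Root S3 location for table data and metadata (placeholder path).
        f"{prefix}.warehouse": warehouse_path,
    }


conf = iceberg_spark_conf("glue_catalog", "s3://example-bucket/warehouse/")
for key, value in conf.items():
    print(f"{key}={value}")
```

In a PySpark job, each pair can be passed to `SparkSession.builder.config(key, value)` before calling `getOrCreate()`.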

Solution Overview

Consider a scenario where you have a relational database containing product inventory data, and you wish to transfer it continuously into an S3 data lake for analytics purposes. After the initial data transfer to Amazon S3, you will receive incremental updates from the source database in CSV format using AWS DMS or similar tools, with each record including an additional column representing the type of operation: insert, update, or delete. A critical requirement during the processing of incremental CDC data is to merge it into the data lake and enable querying of historical versions.

To address this use case, we present a simple architecture that integrates Amazon S3 for the data lake, AWS Glue with the Apache Iceberg connector for ETL (extract, transform, load), and Athena for querying data via standard SQL. Athena allows for querying the latest product inventory data from the most recent snapshot of the Iceberg table, while Iceberg’s time travel feature enables users to identify product pricing at any previous date.
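To make the time travel idea concrete, here is a small sketch that builds an Athena query against an Iceberg table as of a given point in time. The table name `iceberg_demo.products` is hypothetical, and the naive datetime is assumed to be in UTC. The `FOR TIMESTAMP AS OF` clause follows the current Athena documentation for Iceberg tables; older Athena engine versions used `FOR SYSTEM_TIME AS OF` instead, so check which form your workgroup's engine version accepts.

```python
from datetime import datetime


def time_travel_query(table: str, as_of: datetime) -> str:
    """Build an Athena SELECT that reads an Iceberg table as of a past timestamp."""
    # Assumption: `as_of` is a naive datetime that already represents UTC.
    ts = as_of.strftime("%Y-%m-%d %H:%M:%S")
    return f"SELECT * FROM {table} FOR TIMESTAMP AS OF TIMESTAMP '{ts} UTC'"


query = time_travel_query("iceberg_demo.products", datetime(2022, 6, 1, 0, 0, 0))
print(query)
```

Omitting the `FOR TIMESTAMP AS OF` clause returns the latest snapshot, which is the default behavior described above.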

The solution workflow comprises the following key steps:

  • Data Ingestion: AWS Database Migration Service (AWS DMS) connects to the source database and writes incremental CDC data to Amazon S3 in CSV format. An AWS Glue PySpark job then reads this data from the S3 input bucket, deduplicates the records, and runs Apache Iceberg MERGE statements to apply the changes to the target S3 bucket.
  • Data Access: Athena is integrated to query data from the Iceberg table using standard SQL and to validate Iceberg’s time travel capability.
  • Data Catalog: The AWS Glue Data Catalog serves as a centralized schema registry for both AWS Glue and Athena, with an AWS Glue crawler automatically detecting the schema on S3 buckets.

Though we reference AWS DMS in the architecture, we will focus on processing data using AWS Glue and Apache Iceberg, assuming the AWS DMS output is already accessible in Amazon S3.
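The MERGE step in the ingestion workflow can be sketched as a Spark SQL statement built from the column names used in this article. The target table `glue_catalog.iceberg_demo.products` and the source view `incremental_input_data` are hypothetical names for illustration. The sketch assumes the source view has already been deduplicated to one row per product_id, and it skips inserting rows whose only change is a delete.

```python
def build_merge_sql(target: str, source_view: str) -> str:
    """Build an Iceberg MERGE INTO statement that applies I/U/D CDC rows."""
    columns = ["product_id", "category", "product_name",
               "quantity_available", "last_update_time"]
    # Update every non-key column on a match.
    update_set = ", ".join(f"t.{c} = s.{c}" for c in columns[1:])
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {target} t\n"
        f"USING {source_view} s\n"
        "ON t.product_id = s.product_id\n"
        # Deletes are checked first so a 'D' row never falls through to UPDATE.
        "WHEN MATCHED AND s.op = 'D' THEN DELETE\n"
        f"WHEN MATCHED THEN UPDATE SET {update_set}\n"
        # A delete for a key that was never loaded is ignored.
        f"WHEN NOT MATCHED AND s.op != 'D' THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )


merge_sql = build_merge_sql("glue_catalog.iceberg_demo.products", "incremental_input_data")
print(merge_sql)
```

In a Glue job, the deduplicated DataFrame would typically be registered with `createOrReplaceTempView` under the source view name, and the statement executed with `spark.sql(merge_sql)`.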

For demonstration purposes, we will work with sample product inventory data containing attributes such as:

  • op: Indicates the operation type on the source record, with values I for insert, U for update, and D for delete. Ensure this attribute is included in your CDC incremental data before writing to Amazon S3. AWS DMS can include this attribute, but if you use other methods, ensure it is captured so that your ETL logic can handle it correctly.
  • product_id: The primary key in the source database’s products table.
  • category: Represents the product’s category, such as Electronics or Cosmetics.
  • product_name: The name of the product.
  • quantity_available: The quantity of inventory available for each product. For the demonstration, we will reduce this quantity to show the update behavior.
  • last_update_time: The timestamp of the most recent update to the product record in the source database.
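To show how these attributes drive the processing logic, the sketch below uses plain Python on a handful of made-up CSV rows (the products and values are invented for illustration only). It keeps the most recent change per product_id, then applies the op values to an in-memory table. This mirrors the deduplicate-then-merge idea in miniature and is not the Glue job itself.

```python
import csv
import io

# Hypothetical CDC rows in the column layout described above.
SAMPLE_CDC = """op,product_id,category,product_name,quantity_available,last_update_time
I,200,Electronics,Example Phone,50,2022-01-01 10:00:00
U,200,Electronics,Example Phone,48,2022-01-02 09:30:00
U,200,Electronics,Example Phone,45,2022-01-02 14:00:00
I,201,Cosmetics,Example Lipstick,30,2022-01-01 11:00:00
D,201,Cosmetics,Example Lipstick,30,2022-01-03 08:00:00
"""


def latest_per_key(rows):
    """Keep only the most recent change for each product_id."""
    latest = {}
    for row in rows:
        key = row["product_id"]
        # 'YYYY-MM-DD HH:MM:SS' timestamps compare correctly as strings.
        if key not in latest or row["last_update_time"] >= latest[key]["last_update_time"]:
            latest[key] = row
    return list(latest.values())


def apply_changes(table, changes):
    """Apply I/U/D rows to a dict keyed by product_id."""
    for row in changes:
        key = row["product_id"]
        if row["op"] == "D":
            table.pop(key, None)  # delete if present
        else:
            # Insert and update both write the full row, minus the op column.
            table[key] = {k: v for k, v in row.items() if k != "op"}
    return table


rows = list(csv.DictReader(io.StringIO(SAMPLE_CDC)))
table = apply_changes({}, latest_per_key(rows))
print(table)
```

Product 200 ends with a quantity of 45 from its latest update, and product 201 is absent because its latest change is a delete. In Spark, the deduplication step is commonly written with a window function partitioned by the key and ordered by the timestamp.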

If you use AWS DMS, it automatically includes the op attribute for incremental CDC data but not for the initial full load. To include this attribute in full-load CSV output as well, set IncludeOpForFullLoad to true in the S3 target endpoint settings of AWS DMS. For details, see the AWS DMS documentation on using Amazon S3 as a target.
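As a rough illustration, the S3 target settings could be assembled as a dictionary like the one below. The bucket, folder, and role ARN are placeholders, and only a small subset of the available settings is shown.

```python
def dms_s3_target_settings(bucket: str, folder: str, role_arn: str) -> dict:
    """Build S3 target endpoint settings that add the op column to full loads."""
    return {
        "BucketName": bucket,
        "BucketFolder": folder,
        "ServiceAccessRoleArn": role_arn,
        "DataFormat": "csv",
        # Writes the operation column for full-load records too,
        # so full-load and CDC files share the same layout.
        "IncludeOpForFullLoad": True,
    }


settings = dms_s3_target_settings(
    "example-input-bucket",                             # placeholder bucket
    "raw/products",                                     # placeholder folder
    "arn:aws:iam::123456789012:role/example-dms-role",  # placeholder role ARN
)
print(settings)
```

A dictionary of this shape can be supplied as the `S3Settings` argument when creating the target endpoint with the boto3 DMS client's `create_endpoint` call.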

To implement the solution, we will set up AWS resources such as an S3 bucket and an AWS Glue job while integrating the Iceberg code for processing. Before executing the AWS Glue job, we must upload the sample CSV files into the input bucket.

Prerequisites

Before beginning the implementation, ensure you have the necessary permissions in your AWS account:

  • Create AWS Identity and Access Management (IAM) roles as needed.
  • Read or write access to an S3 bucket.
  • Create and execute AWS Glue crawlers and jobs.
  • Manage databases, tables, and workgroups, and run queries in Athena.

For this discussion, we will use the us-east-1 Region.

