As the adoption of data lakes and modern data architectures continues to rise, customer expectations for features such as ACID transactions, UPSERT capabilities, time travel, schema evolution, and auto compaction have risen as well. Amazon Simple Storage Service (Amazon S3) objects are immutable, so updating records in your data lake is not straightforward: an S3-based data lake effectively supports only append operations. However, there are scenarios where you receive incremental updates via change data capture (CDC) from your source systems and need to update existing data in Amazon S3 to maintain a golden copy. In the past, this required overwriting entire S3 objects or folders, but with frameworks like Apache Hudi, Apache Iceberg, Delta Lake, and governed tables in AWS Lake Formation, you can achieve database-like UPSERT functionality in Amazon S3.
Apache Hudi is already integrated with several AWS analytics services, and more recently AWS Glue, Amazon EMR, and Amazon Athena have added support for Apache Iceberg. Originally developed at Netflix, Apache Iceberg is an open table format that was donated to the Apache Software Foundation in 2018 and graduated from the incubator in mid-2020. It is designed to support ACID transactions and UPSERT operations on petabyte-scale data lakes, and it has gained popularity for its flexible SQL syntax for CDC-based MERGE, full schema evolution, and hidden partitioning.
In this article, we will guide you through a solution that implements CDC-based UPSERT or MERGE within an S3 data lake using Apache Iceberg and AWS Glue.
Configuring Apache Iceberg with AWS Glue
You can add the Apache Iceberg JARs to AWS Glue through its AWS Marketplace connector. The connector supports AWS Glue versions 1.0, 2.0, and 3.0, and is free to use. Configuring it takes only a few clicks in the user interface.
Follow these steps for configuration:
- Go to the AWS Marketplace connector page.
- Click on “Continue to Subscribe” and accept the terms.
- Proceed to configuration by selecting the AWS Glue version and software version.
- Click “Continue to Launch.”
- Access the usage instructions page to find the link for activating the connector.
- Create a connection by naming it and selecting “Create connection” to activate the connector.
You can verify your new connection on the AWS Glue Studio Connectors page.
When creating an AWS Glue job, ensure that you include this connector in your job. In the subsequent implementation steps, we will illustrate how to use the connector you have just set up.
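As a reference, the following sketch shows one way a Glue PySpark job can register an Iceberg catalog once the connector (which supplies the Iceberg JARs) is attached to the job. The catalog name, database, and bucket path are placeholders, and the exact properties can vary with the connector and Iceberg versions you subscribe to; in practice, the same settings are often supplied through the job's --conf parameter rather than in code.

```python
# Sketch: Spark settings an Iceberg-enabled AWS Glue job typically needs.
# The Marketplace connector supplies the Iceberg JARs; these properties
# register an Iceberg catalog (here named "glue_catalog") backed by the
# AWS Glue Data Catalog. Bucket and catalog names are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.warehouse",
            "s3://<your-upsert-bucket>/warehouse/")  # placeholder path
    .getOrCreate()
)
```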
Solution Overview
Imagine you have a relational database containing product inventory data that you want to continuously replicate to an S3 data lake for analytics. After the initial migration to Amazon S3, you receive incremental updates from the source database as CSV files, delivered by AWS DMS or similar tools. Each record includes an additional column indicating whether the operation is an insert, update, or delete. A key requirement when processing these incremental CDC updates is the ability to merge the data in the data lake while retaining the ability to query previous versions of the data.
To address this use case, we propose a simple architecture that integrates Amazon S3 for the data lake, AWS Glue with the Apache Iceberg connector for ETL, and Athena for querying the data using standard SQL. Athena allows you to query the most recent product inventory data from the latest snapshot of the Iceberg table, and Iceberg’s time travel feature enables you to check a product’s price on any previous date.
The diagram below illustrates the solution architecture.
The workflow consists of these steps:
Data Ingestion:
- Steps 1.1 and 1.2 detail the use of AWS Database Migration Service (AWS DMS) to connect to the source database and transfer incremental data (CDC) to Amazon S3 in CSV format.
- Steps 1.3 and 1.4 involve an AWS Glue PySpark job, which reads the incremental data from the S3 input bucket, deduplicates the records, and runs Apache Iceberg MERGE statements to merge the data into the target Iceberg table in the UPSERT S3 bucket.
Data Access:
- Steps 2.1 and 2.2 illustrate how Athena queries data from the Iceberg table using standard SQL and validates Iceberg’s time travel feature (see the sketch after this list).
Data Catalog:
- The AWS Glue Data Catalog serves as a centralized catalog utilized by AWS Glue and Athena. An AWS Glue crawler is configured to automatically detect the schema in the S3 buckets.
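To illustrate step 2.2, here is a minimal time travel sketch using Iceberg's Spark read options; Athena exposes equivalent time travel clauses in standard SQL. It assumes the Iceberg-enabled SparkSession from the configuration sketch above, and the catalog, database, table names (glue_catalog.iceberg_db.product), and timestamp are placeholders.

```python
# Sketch: querying the latest snapshot and a historical state of the
# Iceberg table. Assumes the Iceberg-enabled SparkSession configured
# earlier; names and the timestamp below are placeholders.
from datetime import datetime, timezone

# Latest snapshot of the product inventory table.
latest_df = spark.table("glue_catalog.iceberg_db.product")

# State of the table as of a past instant (Iceberg expects milliseconds).
as_of_ms = int(datetime(2022, 7, 1, tzinfo=timezone.utc).timestamp() * 1000)
historical_df = (
    spark.read
    .option("as-of-timestamp", str(as_of_ms))
    .format("iceberg")
    .load("glue_catalog.iceberg_db.product")
)

historical_df.select("product_id", "product_name", "quantity_available").show()
```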
While we reference AWS DMS as part of the architecture, the solution steps will assume that the AWS DMS output is already available in Amazon S3, focusing instead on data processing with AWS Glue and Apache Iceberg.
To demonstrate the implementation, we will use sample product inventory data with the following attributes:
- op – This indicates the operation on the source record, with values such as I for inserts, U for updates, and D for deletes. Ensure this attribute is present in your CDC incremental data prior to writing to Amazon S3. AWS DMS allows you to include this attribute, and if you’re using other methods to transfer data, you must capture it so your ETL logic can appropriately handle it during merging.
- product_id – The primary key column from the source database’s product table.
- category – The product’s category, such as Electronics or Cosmetics.
- product_name – The name of the product.
- quantity_available – The inventory quantity for the product. In the incremental updates, we reduce the available quantity to demonstrate the UPSERT or MERGE functionality.
- last_update_time – The timestamp of when the product record was last updated in the source database.
If you are utilizing AWS DMS to transfer data from your relational database to Amazon S3, the op attribute is included by default for incremental CDC data but is not automatically included for the initial load. If using CSV as your target format, you can set IncludeOpForFullLoad to true in your S3 target endpoint settings to include this attribute in your initial full load file. For more details on the Amazon S3 settings in AWS DMS, refer to the documentation.
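If you create the DMS endpoint programmatically rather than in the console, the hedged sketch below shows where IncludeOpForFullLoad fits in an S3 target endpoint definition using boto3; the endpoint identifier, role ARN, bucket, and folder are placeholders.

```python
# Sketch: an AWS DMS S3 target endpoint that writes CSV files and includes
# the op (I/U/D) column in the initial full load as well as in CDC files.
# The identifier, role ARN, and bucket values are placeholders.
import boto3

dms = boto3.client("dms", region_name="us-east-1")

response = dms.create_endpoint(
    EndpointIdentifier="product-s3-target",  # placeholder name
    EndpointType="target",
    EngineName="s3",
    S3Settings={
        "ServiceAccessRoleArn": "arn:aws:iam::<account-id>:role/<dms-s3-access-role>",
        "BucketName": "<your-input-bucket>",
        "BucketFolder": "raw/product/",
        "IncludeOpForFullLoad": True,  # add the op column to the full load file
    },
)
print(response["Endpoint"]["EndpointArn"])
```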
To implement this solution, we create AWS resources such as an S3 bucket and an AWS Glue job, and add the Apache Iceberg processing code to the job. Before running the AWS Glue job, we upload the sample CSV files to the input bucket; the job then processes them with AWS Glue PySpark code to produce the output.
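As a preview of that processing logic, the sketch below shows the kind of PySpark code the AWS Glue job runs: it reads the incremental CSV files, keeps only the latest change per product_id, and issues an Iceberg MERGE INTO against the target table. It assumes the Iceberg-enabled SparkSession shown earlier; the bucket path, database, and table names are placeholders.

```python
# Sketch of the Glue PySpark logic: deduplicate the CDC input and MERGE it
# into the Iceberg table. Assumes the Iceberg-enabled SparkSession from the
# configuration sketch; paths and table names are placeholders.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. Read the incremental CDC files written by AWS DMS.
cdc_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("s3://<your-input-bucket>/raw/product/")
)

# 2. Keep only the most recent change per product_id.
latest = Window.partitionBy("product_id").orderBy(F.col("last_update_time").desc())
deduped_df = (
    cdc_df.withColumn("rn", F.row_number().over(latest))
    .filter(F.col("rn") == 1)
    .drop("rn")
)
deduped_df.createOrReplaceTempView("product_cdc")

# 3. MERGE the deduplicated changes: delete on D, update on U, insert on I.
spark.sql("""
    MERGE INTO glue_catalog.iceberg_db.product AS target
    USING product_cdc AS source
    ON target.product_id = source.product_id
    WHEN MATCHED AND source.op = 'D' THEN DELETE
    WHEN MATCHED THEN UPDATE SET
        target.category = source.category,
        target.product_name = source.product_name,
        target.quantity_available = source.quantity_available,
        target.last_update_time = source.last_update_time
    WHEN NOT MATCHED AND source.op <> 'D' THEN INSERT (
        product_id, category, product_name, quantity_available, last_update_time
    ) VALUES (
        source.product_id, source.category, source.product_name,
        source.quantity_available, source.last_update_time
    )
""")
```

Deduplicating on product_id before the MERGE matters because a MERGE requires each target row to match at most one source row; keeping only the latest change per key avoids that conflict.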
Prerequisites
Before starting the implementation, ensure you have the necessary permissions in your AWS account to:
- Create AWS Identity and Access Management (IAM) roles as required
- Read or write to an S3 bucket
- Create and run AWS Glue crawlers and jobs
- Manage a database, table, and workgroups, and run queries in Athena
This article is centered on the us-east-1 Region, but it can be adapted to other Regions as needed.