Revitalize Your Legacy Databases with AWS Data Lakes: Part 2 – Creating a Data Lake Using AWS DMS Data on Apache Iceberg


This is the second installment in a three-part series that guides you through building a data lake on AWS using a contemporary data architecture. In this post, we detail how to transfer data from a legacy SQL Server database into a transactional data lake utilizing Apache Iceberg and AWS Glue. We will explain how to establish data pipelines with AWS Glue jobs, optimize them for cost-efficiency and performance, and implement schema evolution to automate manual tasks. For a recap of the first part of this series, which covers migrating SQL Server data into Amazon Simple Storage Service (Amazon S3) using AWS Database Migration Service (AWS DMS), check out the post here.

Overview of the Solution

In this article, we will walk through the process of constructing a data lake, discussing the rationale behind various decisions and sharing best practices for building such a solution.

The diagram below illustrates the different layers within the data lake.

AWS DMS generates files and deposits them into the bronze bucket (as outlined in Part 1). To load that data into the data lake, AWS Step Functions orchestrates the workflow, Amazon Simple Queue Service (Amazon SQS) manages the sequence of incoming files, and AWS Glue jobs, together with the Data Catalog, build the silver layer of the data lake.

We can enable Amazon S3 notifications to send the names of new files to an SQS first-in-first-out (FIFO) queue. A Step Functions state machine can then process messages from this queue, ensuring files are handled in the order they arrive.
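As an illustration, the following boto3 sketch shows how a step in that workflow might drain the FIFO queue and recover new object keys in arrival order. The queue URL is a placeholder and the message handling is simplified.

import json
import boto3

QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/dms-file-events.fifo"  # placeholder

sqs = boto3.client("sqs")

def poll_new_files():
    """Yield (bucket, key) pairs from S3 event notifications, in arrival order."""
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,  # long polling
    )
    for message in response.get("Messages", []):
        body = json.loads(message["Body"])
        for record in body.get("Records", []):
            yield record["s3"]["bucket"]["name"], record["s3"]["object"]["key"]
        # Remove the message only after its files have been handed off for processing
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message["ReceiptHandle"])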

For processing these files, we need to configure two types of AWS Glue jobs:

  1. Full Load – This job imports the complete data dump of the table into an Iceberg table. The data types from the source are mapped to the corresponding Iceberg data types. Once the data is loaded, the job updates the Data Catalog with the table schemas.
  2. Change Data Capture (CDC) – This job loads the change data capture files into the appropriate Iceberg tables. The AWS Glue job utilizes Iceberg’s schema evolution feature to accommodate changes such as adding or removing columns.
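To make the CDC step concrete, here is a minimal sketch of how a batch of change records could be applied with Iceberg's MERGE INTO. The table name, key column, and use of the DMS Op flag are illustrative, and a Spark session configured for Iceberg (shown later in this post) is assumed.

# Illustrative only: apply a CDC micro-batch (including the DMS Op column) to an Iceberg table.
cdc_df.createOrReplaceTempView("cdc_batch")

# Schema evolution example: absorb a newly added source column before merging, for instance:
# spark.sql("ALTER TABLE glue_catalog.adventureworks.department ADD COLUMNS (group_name string)")

spark.sql("""
MERGE INTO glue_catalog.adventureworks.department AS t
USING cdc_batch AS s
ON t.departmentid = s.departmentid
WHEN MATCHED AND s.Op = 'D' THEN DELETE
WHEN MATCHED AND s.Op = 'U' THEN UPDATE SET *
WHEN NOT MATCHED AND s.Op = 'I' THEN INSERT *
""")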

Just like in Part 1, the AWS DMS jobs will deposit both full load and CDC data from the source database (SQL Server) into the raw S3 bucket. We will then process this data using AWS Glue and save it to the silver bucket in Iceberg format. AWS Glue offers a plugin for Iceberg; for additional details, see Using the Iceberg framework in AWS Glue.
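In addition to the datalake-formats job parameter, the Iceberg integration relies on a few Spark settings. The following is a minimal sketch of one way to supply them in the job script; the warehouse path is a placeholder, and the same settings can also be passed through the --conf job parameter.

from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.sql.extensions",
         "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.glue_catalog.warehouse", "s3://your-silver-bucket/")
conf.set("spark.sql.catalog.glue_catalog.catalog-impl",
         "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")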

As we transfer data from the bronze to the silver bucket, we will also create and update the Data Catalog for further processing toward the gold bucket.

The diagram below illustrates how the full load and CDC jobs are defined within the Step Functions workflow.

In this post, we focus on the AWS Glue jobs that form the workflow. We recommend using AWS Step Functions Workflow Studio, along with Amazon S3 event notifications and an SQS FIFO queue, to receive filenames as messages.

Prerequisites

Before following the solution, ensure you have the following prerequisites set up along with specific access rights and AWS Identity and Access Management (IAM) privileges:

  • An IAM role to run the AWS Glue jobs
  • IAM permissions to create AWS DMS resources (this role was established in Part 1 of this series; you can reuse the same role here)
  • The AWS DMS task from Part 1 running and delivering files from the source database to Amazon S3

Establish an AWS Glue Connection for the Source Database

We need to create a connection between AWS Glue and the source SQL Server database so that the AWS Glue job can query the source for the latest schema while loading the data files. To create the connection, follow these steps:

  1. On the AWS Glue console, choose Connections in the navigation pane.
  2. Choose Create connection.
  3. Assign a name to the connection and select JDBC as the connection type.
  4. In the JDBC URL section, enter the following string, replacing the placeholders with your source database endpoint and database name: jdbc:sqlserver://{Your RDS End Point Name}:1433;databaseName={Your Database Name}.
  5. Select Require SSL connection, then choose Create connection.
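If you prefer to script this step, a rough boto3 equivalent is shown below; the connection name, endpoint, secret, subnet, security group, and Availability Zone are placeholders for your own values.

import boto3

glue = boto3.client("glue")

glue.create_connection(
    ConnectionInput={
        "Name": "sqlserver-source-connection",  # placeholder
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:sqlserver://your-rds-endpoint:1433;databaseName=AdventureWorks",
            "JDBC_ENFORCE_SSL": "true",
            "SECRET_ID": "your-sqlserver-secret",  # Secrets Manager secret with the DB credentials
        },
        "PhysicalConnectionRequirements": {
            "SubnetId": "subnet-0123456789abcdef0",
            "SecurityGroupIdList": ["sg-0123456789abcdef0"],
            "AvailabilityZone": "us-east-1a",
        },
    }
)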

Create and Configure the Full Load AWS Glue Job

Complete the following steps to set up the full load job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select Script editor and choose Spark.
  3. Click Start fresh and select Create script.
  4. Provide a name for the full load job and select the IAM role (as mentioned in the prerequisites) for running the job.
  5. Finalize the job creation.
  6. On the Job details tab, expand Advanced properties.
  7. In the Connections section, add the connection you created.
  8. Under Job parameters, pass the following arguments to the job:
    • target_s3_bucket – The name of the silver S3 bucket.
    • source_s3_bucket – The name of the raw S3 bucket.
    • secret_id – The ID of the AWS Secrets Manager secret for the source database credentials.
    • dbname – The name of the source database.
    • datalake-formats – Set this to iceberg so that AWS Glue loads its Iceberg integration.
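For reference, these parameters map directly to AWS Glue job arguments. A small boto3 sketch of starting the job with them follows; the job, bucket, and secret names are placeholders.

import boto3

glue = boto3.client("glue")

glue.start_job_run(
    JobName="full-load-iceberg-job",  # placeholder job name
    Arguments={
        "--target_s3_bucket": "my-silver-bucket",
        "--source_s3_bucket": "my-raw-bucket",
        "--secret_id": "my-sqlserver-secret",
        "--dbname": "AdventureWorks",
        "--datalake-formats": "iceberg",  # enables the Iceberg integration
    },
)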

The full load AWS Glue job initiates once the AWS DMS task reaches 100%. The job iterates over the files located in the raw S3 bucket and processes each one sequentially. For every file, the job infers the table name from the filename and retrieves the source table schema, including column names and primary keys.

If the table has one or more primary keys, the job creates a corresponding Iceberg table. If the table has no primary key, the file is not processed. In our case, all tables have primary keys, so this check applies to every table. Depending on your data, you might need to handle this situation differently.
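To illustrate the schema lookup, the following sketch shows one way to query SQL Server metadata over JDBC for a table's primary key columns, reusing the get_db_credentials helper from the script below. The helper name and wiring are illustrative rather than the exact code of the job.

def get_primary_keys(table_name):
    # Look up the primary key columns for a table directly from the source database
    host, port, username, password = get_db_credentials(secret_id)
    jdbc_url = f"jdbc:sqlserver://{host}:{port};databaseName={dbname}"

    pk_query = f"""
        SELECT kcu.COLUMN_NAME
        FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
        JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
          ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
        WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
          AND kcu.TABLE_SCHEMA = '{schema}'
          AND kcu.TABLE_NAME = '{table_name}'
    """

    pk_df = (spark.read.format("jdbc")
             .option("url", jdbc_url)
             .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
             .option("query", pk_query)
             .option("user", username)
             .option("password", password)
             .load())
    return [row.COLUMN_NAME for row in pk_df.collect()]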

To process the full load files, use the following code snippet. To start the job, select Run.

import sys, boto3, json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

# Get the arguments passed to the script
args = getResolvedOptions(sys.argv, ['JOB_NAME',
                           'target_s3_bucket',
                           'secret_id',
                           'source_s3_bucket',
                           'dbname'])
dbname = args['dbname']        # source database name, for example AdventureWorks
schema = "HumanResources"      # source schema processed in this example

# Initialize parameters
target_s3_bucket = args['target_s3_bucket']
source_s3_bucket = args['source_s3_bucket']
secret_id = args['secret_id']
unprocessed_tables = []
drop_column_list = ['db', 'table_name', 'schema_name', 'Op', 'last_update_time']  # DMS added columns

# Helper Function: Get Credentials from Secrets Manager
def get_db_credentials(secret_id):
    secretsmanager = boto3.client('secretsmanager')
    response = secretsmanager.get_secret_value(SecretId=secret_id)
    secrets = json.loads(response['SecretString'])
    return secrets['host'], int(secrets['port']), secrets['username'], secrets['password']

# Helper Function: Load Iceberg table with Primary key(s)
def load_table(full_load_data_df, dbname, table_name):

    try:
        # Drop the metadata columns added by AWS DMS before writing to Iceberg
        full_load_data_df = full_load_data_df.drop(*drop_column_list)
        full_load_data_df.createOrReplaceTempView('full_data')

        query = """
        CREATE TABLE IF NOT EXISTS glue_catalog.{0}.{1}
        USING iceberg
        LOCATION 's3://{2}/{0}/{1}'
        AS SELECT * FROM full_data
        """.format(dbname, table_name, target_s3_bucket)

        spark.sql(query)
    except Exception as e:
        # Record tables that could not be loaded so they can be reviewed or retried
        print(f"Unable to load {dbname}.{table_name}: {e}")
        unprocessed_tables.append(table_name)

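The script above is an excerpt and stops after the load_table helper. The remainder initializes the Glue job, walks the full load files in the raw bucket, and calls load_table for each table. The following is an illustrative sketch of that remainder, assuming the DMS task writes files under <schema>/<table>/ prefixes and that the Iceberg catalog settings from the earlier configuration sketch are applied.

# Initialize the Glue job (Iceberg settings are supplied via SparkConf or the --conf job parameter)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Walk the full load files in the raw bucket and load each table
s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')

for page in paginator.paginate(Bucket=source_s3_bucket, Prefix=f"{schema}/"):
    for obj in page.get('Contents', []):
        key = obj['Key']                    # for example HumanResources/Employee/LOAD00000001.csv
        table_name = key.split('/')[1]      # infer the table name from the key
        full_load_data_df = spark.read.csv(f"s3://{source_s3_bucket}/{key}", header=False)
        # Column names and the primary key check come from the source schema lookup
        # described earlier; tables without a primary key are skipped.
        load_table(full_load_data_df, dbname, table_name)

if unprocessed_tables:
    print(f"Tables not processed: {unprocessed_tables}")

job.commit()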