In the second installment of our three-part series, we explore the creation of a data lake on AWS utilizing contemporary data architecture. This article focuses on transferring data from a legacy SQL Server database into a transactional data lake powered by Apache Iceberg through AWS Glue. We’ll outline how to develop data pipelines with AWS Glue jobs, enhance them for cost-effectiveness and performance, and implement schema evolution to automate previously manual tasks. If you missed the first part, where we transferred SQL Server data into Amazon Simple Storage Service (Amazon S3) using AWS Database Migration Service (AWS DMS), check out Revitalize Your Legacy Databases with AWS Data Lakes, Part 1: Migrate SQL Server using AWS DMS.
Solution Overview
This article details the approach to constructing a data lake, providing insights into the reasoning behind various decisions and sharing best practices for building such a solution.
The accompanying diagram illustrates the different tiers of the data lake.
To populate the data lake, AWS Step Functions can orchestrate workflows, Amazon Simple Queue Service (Amazon SQS) can monitor the sequence of incoming files, and AWS Glue jobs along with the Data Catalog can be utilized to develop the silver layer of the data lake. AWS DMS generates files and stores them in the bronze bucket, as previously discussed in Part 1.
We can enable Amazon S3 notifications to send the names of newly arrived files to an SQS FIFO queue. A Step Functions state machine can consume these messages from the queue, processing the files in the order they were received.
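To make this flow concrete, the following is a minimal sketch of how a consumer invoked by the state machine might drain the FIFO queue and start the appropriate AWS Glue job for each file in arrival order. The queue URL, job names, and the extra source_file_key argument are hypothetical placeholders, not the exact implementation.

import json, boto3

sqs = boto3.client('sqs')
glue = boto3.client('glue')

QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/datalake-files.fifo'  # hypothetical queue URL

def process_new_files():
    # FIFO queues preserve arrival order per message group, so files are handled in sequence
    response = sqs.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=10, WaitTimeSeconds=10)
    for message in response.get('Messages', []):
        body = json.loads(message['Body'])
        for record in body.get('Records', []):  # S3 event notification payload
            key = record['s3']['object']['key']
            # AWS DMS names full load files LOAD*; everything else is treated as CDC
            job_name = 'full-load-job' if key.split('/')[-1].startswith('LOAD') else 'cdc-job'  # hypothetical job names
            glue.start_job_run(JobName=job_name, Arguments={'--source_file_key': key})  # hypothetical argument
        # Delete the message only after the job run has been started
        sqs.delete_message(QueueUrL=QUEUE_URL, ReceiptHandle=message['ReceiptHandle']) if False else sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message['ReceiptHandle'])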
AWS Glue Jobs
For file processing, we need to create two types of AWS Glue jobs:
- Full Load – This job loads the complete table data into an Iceberg table, mapping data types from the source to corresponding Iceberg types. Once loaded, the job updates the Data Catalog with the table schemas.
- CDC – This job processes change data capture (CDC) files into their respective Iceberg tables. The AWS Glue job employs Iceberg’s schema evolution feature to manage changes such as adding or deleting columns.
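To make the CDC behavior concrete, the following is a minimal Spark SQL sketch of how a batch of DMS CDC records could be merged into an existing Iceberg table. The table name, column names, and file path are hypothetical, the raw files are assumed to be in Parquet format, and the Spark session is assumed to be configured for the Iceberg Glue catalog as shown in the full load job later in this post; the actual CDC job builds the merge statement dynamically from each table's primary keys.

# Read one batch of CDC records produced by AWS DMS (hypothetical path and format)
cdc_df = spark.read.parquet('s3://raw-bucket-name/AdventureWorks/HumanResources/Employee/')
cdc_df.createOrReplaceTempView('cdc_data')

# Apply inserts, updates, and deletes based on the DMS Op column (I, U, D)
spark.sql("""
    MERGE INTO glue_catalog.adventureworks.employee AS target
    USING cdc_data AS source
    ON target.id = source.id
    WHEN MATCHED AND source.Op = 'D' THEN DELETE
    WHEN MATCHED AND source.Op = 'U' THEN UPDATE SET target.name = source.name, target.title = source.title
    WHEN NOT MATCHED AND source.Op = 'I' THEN INSERT (id, name, title) VALUES (source.id, source.name, source.title)
""")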
As highlighted in Part 1, the AWS DMS tasks place full load and CDC data from the SQL Server source database into the raw (bronze) S3 bucket. We then use AWS Glue to process this data and save it in the silver bucket in Iceberg format. AWS Glue provides native support for Apache Iceberg; for more details, refer to the AWS Glue documentation on using the Iceberg framework.
Alongside transferring data from the bronze to the silver bucket, we will also create and update the Data Catalog to facilitate further processing for the gold bucket.
The subsequent diagram demonstrates how the full load and CDC jobs are incorporated within the Step Functions workflow.
In this article, we focus on the AWS Glue jobs necessary for defining the workflow. We recommend utilizing AWS Step Functions Workflow Studio and configuring Amazon S3 event notifications along with an SQS FIFO queue to receive file names as messages.
Prerequisites
To implement the solution, ensure you have the following prerequisites in place, along with the necessary access rights and AWS Identity and Access Management (IAM) permissions:
- An IAM role for executing Glue jobs
- IAM privileges for creating AWS DMS resources (this role was established in Part 1 of this series, and you may reuse it here)
- The AWS DMS job from Part 1 operational and generating files for the source database in Amazon S3.
Create an AWS Glue Connection for the Source Database
A connection must be established between AWS Glue and the source SQL Server database to allow the AWS Glue job to access the latest schema while loading data files. To create the connection, follow these steps:
- In the AWS Glue console, navigate to Connections in the sidebar.
- Select Create custom connector.
- Assign a name to the connection and choose JDBC as the connection type.
- In the JDBC URL section, input the following string, replacing it with your source database endpoint and database name set up in Part 1:
jdbc:sqlserver://{Your RDS End Point Name}:1433;databaseName={Your Database Name}
- Check the box for Require SSL connection, and then select Create connector.
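If you prefer to script this step instead of using the console, the same connection can be created with the AWS SDK. The following boto3 sketch uses hypothetical connection and secret names, and the VPC, subnet, and security group values are placeholders you would replace with your own:

import boto3

glue = boto3.client('glue')

glue.create_connection(
    ConnectionInput={
        'Name': 'sqlserver-source-connection',  # hypothetical connection name
        'ConnectionType': 'JDBC',
        'ConnectionProperties': {
            'JDBC_CONNECTION_URL': 'jdbc:sqlserver://{Your RDS End Point Name}:1433;databaseName={Your Database Name}',
            'JDBC_ENFORCE_SSL': 'true',
            'SECRET_ID': 'your-source-db-secret'  # hypothetical Secrets Manager secret; USERNAME/PASSWORD can be used instead
        },
        'PhysicalConnectionRequirements': {
            'SubnetId': 'subnet-xxxxxxxx',
            'SecurityGroupIdList': ['sg-xxxxxxxx'],
            'AvailabilityZone': 'us-east-1a'
        }
    }
)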
Create and Configure the Full Load AWS Glue Job
Follow these steps to establish the full load job:
- In the AWS Glue console, navigate to ETL jobs in the sidebar.
- Choose Script editor and select Spark.
- Select Start fresh and click Create script.
- Name the full load job and select the IAM role mentioned in the prerequisites for job execution.
- Complete the job creation process.
- In the Job details tab, expand Advanced properties.
- In the Connections section, add the connection you created.
- Under Job parameters, input the following arguments for the job:
- target_s3_bucket – The name of the silver S3 bucket.
- source_s3_bucket – The name of the raw S3 bucket.
- secret_id – The ID of the AWS Secrets Manager secret for the source database credentials.
- dbname – The name of the source database.
- datalake-formats – This specifies the data format as Iceberg.
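For reference, the same job and its parameters can also be defined programmatically. The following boto3 sketch uses hypothetical names for the job, buckets, role, script location, and secret; note that AWS Glue job parameters are passed with a leading -- in DefaultArguments:

import boto3

glue = boto3.client('glue')

glue.create_job(
    Name='full-load-job',  # hypothetical job name
    Role='GlueJobExecutionRole',  # the IAM role from the prerequisites
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://scripts-bucket-name/full_load_job.py',  # hypothetical script location
        'PythonVersion': '3'
    },
    Connections={'Connections': ['sqlserver-source-connection']},  # the connection created earlier
    GlueVersion='4.0',
    DefaultArguments={
        '--target_s3_bucket': 'silver-bucket-name',
        '--source_s3_bucket': 'raw-bucket-name',
        '--secret_id': 'your-source-db-secret',
        '--dbname': 'AdventureWorks',
        '--datalake-formats': 'iceberg'
    }
)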
The full load AWS Glue job will initiate once the AWS DMS task reaches 100%. The job processes files from the raw S3 bucket sequentially. For each file, it derives the table name from the file name and retrieves the source table schema, including column names and primary keys.
If the table contains one or more primary keys, the job creates a corresponding Iceberg table. If there are no primary keys, the file will not be processed. In our scenario, all tables are required to have primary keys, thus enforcing this condition. Depending on your data, you may need to address this situation differently.
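As an illustration of these two checks, the sketch below shows one way to derive the schema and table name from a DMS file key and to look up a table's primary key columns over the JDBC connection. The key layout and function names are assumptions for illustration, not the exact implementation:

# Hypothetical DMS key layout: {database}/{schema}/{table}/LOAD00000001.parquet
def parse_table_from_key(key):
    parts = key.split('/')
    schema_name, table_name = parts[-3], parts[-2]
    return schema_name, table_name

# Look up the primary key columns of a SQL Server table over JDBC
def get_primary_keys(spark, jdbc_url, user, password, schema_name, table_name):
    pk_query = """
        (SELECT kcu.COLUMN_NAME
         FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
         JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
           ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
         WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
           AND tc.TABLE_SCHEMA = '{0}' AND tc.TABLE_NAME = '{1}') AS pk
    """.format(schema_name, table_name)
    pk_df = (spark.read.format('jdbc')
             .option('url', jdbc_url)
             .option('dbtable', pk_query)
             .option('user', user)
             .option('password', password)
             .load())
    return [row.COLUMN_NAME for row in pk_df.collect()]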
You can utilize the following code to handle the full load files. To initiate the job, click Run.
import sys, boto3, json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

# Get the arguments passed to the script
args = getResolvedOptions(sys.argv, ['JOB_NAME',
                                     'target_s3_bucket',
                                     'secret_id',
                                     'source_s3_bucket'])
dbname = "AdventureWorks"
schema = "HumanResources"

# Initialize parameters
target_s3_bucket = args['target_s3_bucket']
source_s3_bucket = args['source_s3_bucket']
secret_id = args['secret_id']
unprocessed_tables = []
drop_column_list = ['db', 'table_name', 'schema_name', 'Op', 'last_update_time']  # DMS added columns

# Configure Spark so that glue_catalog resolves to an Iceberg catalog backed by the
# AWS Glue Data Catalog, with the silver bucket as the warehouse location
# (adjust these settings if you pass them as --conf job parameters instead)
spark = SparkSession.builder \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://{}/".format(target_s3_bucket)) \
    .getOrCreate()
glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Helper function: get credentials from Secrets Manager
def get_db_credentials(secret_id):
    secretsmanager = boto3.client('secretsmanager')
    response = secretsmanager.get_secret_value(SecretId=secret_id)
    secrets = json.loads(response['SecretString'])
    return secrets['host'], int(secrets['port']), secrets['username'], secrets['password']

# Helper function: load an Iceberg table for a source table that has primary key(s)
def load_table(full_load_data_df, dbname, table_name):
    try:
        # Drop the metadata columns added by AWS DMS before writing the data
        full_load_data_df = full_load_data_df.drop(*drop_column_list)
        full_load_data_df.createOrReplaceTempView('full_data')
        query = """
        CREATE TABLE IF NOT EXISTS glue_catalog.{0}.{1}
        USING iceberg
        LOCATION "s3://{2}/{0}/{1}"
        AS SELECT * FROM full_data
        """.format(dbname, table_name, target_s3_bucket)
        # Create the Iceberg table (and its Data Catalog entry) from the full load data
        spark.sql(query)
    except Exception as e:
        # Record tables that could not be loaded so they can be reviewed after the run
        print("Unable to load table {}: {}".format(table_name, e))
        unprocessed_tables.append(table_name)
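Because the CREATE TABLE ... AS SELECT statement runs against the glue_catalog Iceberg catalog, it writes the Iceberg data and metadata files to the silver bucket and registers the table in the AWS Glue Data Catalog in a single step, which is what makes the new tables immediately available for building the gold layer. Tables that fail to load are collected in unprocessed_tables so they can be reviewed after the job completes.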