Data Lake Change Data Capture (CDC) using Amazon Database Migration Service — Part 1 — Capture




Easily capture data changes over time from your database to a Data Lake using Amazon Database Migration Service (DMS)

Image by Gino Crescoli from Pixabay

Over my past 10 years in the Big Data and Analytics world, I have come to realize that capturing and processing change data sets is a challenging area. Throughout these years I have seen how CDC has evolved. Let me take you through the journey:

Year 2011–2013 — For many, Hadoop was the major data analytics platform. Typically, Sqoop was used to transfer data from a given database to HDFS. This worked pretty well for full table loads, and Sqoop incremental imports could capture inserts as well.

But CDC is not only about inserts. Where are my updates and deletes?

Year 2016 — We created a strategy to capture updates and deletes using triggers on database tables that wrote changes to a shadow table. Once the changed data was captured, we would use Sqoop to transfer it over to HDFS. The method involved database modifications, so a lot of our clients objected to it.

Year 2015–2016 — A new open-source project called Debezium was gaining traction. For several years thereafter, we used this CDC tool very effectively. Initially, Debezium supported only a limited number of databases, but that was enough to cover most of our use cases.

Debezium reads the database binary log and extracts changes, publishing each change as a JSON document to Kafka.

Image by Author — Record before and After Image
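To make the before and after image concrete, here is a simplified, hypothetical Debezium-style change event for an update. The field names and values are illustrative only; a real event also carries schema information and a source metadata block:

{
  "op": "u",
  "ts_ms": 1594663689000,
  "before": { "entity": "India", "year": 2013, "production": 2500.00 },
  "after":  { "entity": "India", "year": 2013, "production": 2845.66 }
}

The "before" and "after" fields are what make downstream processing of updates and deletes possible, since the consumer sees both the old and the new state of the row.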

Year 2016–Now — For AWS cloud deployments we typically use Amazon Database Migration Service (DMS). DMS can read change data sets from on-premises servers or RDS and publish them to many destinations, including S3, Redshift, Kafka, and Elasticsearch.

Let me show you how to create a sample CDC pipeline. We will start by creating an RDS instance on AWS, create a sample database on it, and finally set up Amazon DMS to perform change data capture to S3.

Let's start by downloading the sample data file.

$ git clone https://github.com/mkukreja1/blogs.git

Create an RDS Security Group and open ingress

$ aws ec2 delete-security-group --group-name "RDS Security Group"
$ RDS_GROUP=`aws ec2 create-security-group --description sg-rds --group-name "RDS Security Group" | grep GroupId | sed 's/"GroupId"://' | sed 's/"//g' | sed 's/,//g'`;echo $RDS_GROUP
$ aws ec2 authorize-security-group-ingress --group-id $RDS_GROUP --protocol tcp --port 3306 --cidr 0.0.0.0/0
# For security reasons you may want to replace 0.0.0.0/0 with your web-facing IP. This will limit access to traffic originating from your IP address only.

We will create a MySQL database that will be used as the source for CDC. Start by creating an RDS parameter group.

$ aws rds delete-db-parameter-group --db-parameter-group-name rds-mysql
$ PG_ARN=`aws rds create-db-parameter-group --db-parameter-group-name rds-mysql --db-parameter-group-family MySQL5.7 --description "RDS Group" | grep DBParameterGroupArn | sed -e 's/"//g' -e 's/,//g' -e 's/DBParameterGroupArn//g' -e 's/: //g'`;echo $PG_ARN
$ aws rds modify-db-parameter-group --db-parameter-group-name rds-mysql --parameters "ParameterName=binlog_format,ParameterValue=ROW,ApplyMethod=immediate" "ParameterName=binlog_checksum,ParameterValue=None,ApplyMethod=immediate"

Create the RDS instance

$ aws rds delete-db-instance --db-instance-identifier fossil --skip-final-snapshot
$ aws rds create-db-instance --db-instance-identifier fossil --db-instance-class db.t2.micro --engine mysql --region us-east-1 --output text --master-username admin --master-user-password admin123 --allocated-storage 20 --vpc-security-group-ids $RDS_GROUP --db-parameter-group-name rds-mysql --option-group-name default:mysql-5-7 --engine-version 5.7.30
# Wait for 5-10 minutes after this step

Install a MySQL client and test the connection to MySQL. Then import the data file into the newly created database.

$ sudo yum -y install mysql
$ RDS_ENDPOINT=`aws rds describe-db-instances --db-instance-identifier fossil | grep "Address" | sed 's/.*://' | sed 's/"//g' | sed 's/,//g'`;echo $RDS_ENDPOINT
$ mysql -uadmin -padmin123 -h $RDS_ENDPOINT -e "DROP DATABASE IF EXISTS fossil;CREATE DATABASE fossil;grant REPLICATION CLIENT on *.* to admin;grant REPLICATION SLAVE on *.* to admin;"
$ mysql -uadmin -padmin123 -h $RDS_ENDPOINT fossil < blogs/dms/energy.sql
$ mysql -uadmin -padmin123 -h $RDS_ENDPOINT -e "use fossil;select count(*) from coal_prod"

Configure DMS. Start by creating the DMS replication instance.

$ SG_RDS=`aws ec2 describe-security-groups --group-names "RDS Security Group" | grep GroupId | sed -e 's/"//g' -e 's/,//g' -e 's/GroupId//g' -e 's/: //g'`;echo $SG_RDS
$ aws dms create-replication-instance --replication-instance-identifier rds-s3-dms --replication-instance-class dms.t2.micro --no-publicly-accessible --vpc-security-group-ids $SG_RDS
$ REP_ARN=`aws dms describe-replication-instances | grep ReplicationInstanceArn | sed -e 's/"//g' -e 's/,//g' -e 's/ReplicationInstanceArn//g' -e 's/: //g'`;echo $REP_ARN
# Wait 5 minutes for the above to finish

Create a DMS Source Endpoint. In this case the RDS instance created above will act as the source.

$ DMS_END_SOURCE=`aws dms create-endpoint --endpoint-identifier rds-end --endpoint-type source --server-name $RDS_ENDPOINT --engine-name mysql --username admin --password admin123 --port 3306 --database-name fossil | grep EndpointArn | sed -e 's/"//g' -e 's/,//g'  -e 's/EndpointArn//g' -e 's/: //g' `;echo $DMS_END_SOURCE

Test DMS Source Endpoint Connection. Proceed further only if successful.

$ aws dms test-connection --replication-instance-arn $REP_ARN --endpoint-arn $DMS_END_SOURCE
Image by Author — DMS Source Endpoint

Create a DMS role that has access to S3. We will use this role to define the DMS Destination Endpoint.

$ aws iam detach-role-policy --role-name dms-role --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
$ aws iam delete-role --role-name dms-role
$ DMS_ROLE=`aws iam create-role --role-name dms-role --assume-role-policy-document file://blogs/dms/policy.json | grep Arn | sed -e 's/"//g' -e 's/,//g' -e 's/Arn//g' -e 's/ //g' -e 's/://'`;echo $DMS_ROLE
$ aws iam attach-role-policy --role-name dms-role --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
# You may want to tighten up the above policy to limit access to specific buckets only
$ aws iam create-role --role-name dms-vpc-role --assume-role-policy-document file://blogs/dms/policy.json
$ aws iam attach-role-policy --role-name dms-vpc-role --policy-arn arn:aws:iam::aws:policy/service-role/AmazonDMSVPCManagementRole
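The file blogs/dms/policy.json referenced above is the trust (assume-role) policy that allows the DMS service to assume these roles. A minimal version would look roughly like the following sketch; check the repository for the exact contents:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "dms.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}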

Create the DMS Destination Endpoint

$ S3_JSON="{\"ServiceAccessRoleArn\": \"$DMS_ROLE\",\"BucketFolder\": \"raw/dms\",\"BucketName\": \"aws-analytics-course\",\"DataFormat\": \"csv\", \"IncludeOpForFullLoad\": true }";echo $S3_JSON >s3.json;cat s3.json
$ DMS_END_DEST=`aws dms create-endpoint --endpoint-identifier s3-end --engine-name s3 --endpoint-type target --s3-settings file://s3.json | grep EndpointArn | sed -e 's/"//g' -e 's/,//g' -e 's/EndpointArn//g' -e 's/: //g'`;echo $DMS_END_DEST
$ aws dms test-connection --replication-instance-arn $REP_ARN --endpoint-arn $DMS_END_DEST
Image by Author — DMS Destination Endpoint
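For readability, the s3.json generated above expands to the endpoint settings below; the role ARN is filled in from $DMS_ROLE at runtime. Note that IncludeOpForFullLoad adds the operation flag as the first column of the full-load CSV files as well, which is why even the initial load rows will carry an 'I' marker:

{
  "ServiceAccessRoleArn": "<ARN of dms-role>",
  "BucketFolder": "raw/dms",
  "BucketName": "aws-analytics-course",
  "DataFormat": "csv",
  "IncludeOpForFullLoad": true
}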

Create & run the DMS task. On its first run, this task fetches the full data from all tables in the source endpoint and replicates it to the destination endpoint. After that, the replication instance tracks changes on the source endpoint and promptly delivers them to the destination. Throughout this process, the replication instance maintains a log for each table.

$ aws dms create-replication-task --replication-task-identifier rdstos3 --source-endpoint-arn $DMS_END_SOURCE --target-endpoint-arn $DMS_END_DEST --replication-instance-arn $REP_ARN --migration-type full-load-and-cdc --table-mappings file://blogs/dms/table-mappings.json
$ TASK_ARN=`aws dms describe-replication-tasks | grep ReplicationTaskArn | sed -e 's/"//g' -e 's/,//g' -e 's/ReplicationTaskArn//g' -e 's/ //g' -e 's/://'`;echo $TASK_ARN
$ aws dms start-replication-task --replication-task-arn $TASK_ARN --start-replication-task-type reload-target
Image by Author — Status of Replication Task
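The table mappings file passed to create-replication-task tells DMS which schemas and tables to replicate. A minimal blogs/dms/table-mappings.json that includes every table in the fossil schema would look roughly like this sketch; the file in the repository may differ:

{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "include-fossil",
      "object-locator": {
        "schema-name": "fossil",
        "table-name": "%"
      },
      "rule-action": "include"
    }
  ]
}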

Once the DMS job has run, we can check the full-load data on S3.

$ aws s3 ls aws-analytics-course/raw/dms/
PRE fossil/
$ aws s3 ls aws-analytics-course/raw/dms/fossil/
PRE coal_prod/
PRE fossil_capita/
PRE gas_prod/
PRE oil_prod/
$ aws s3 ls aws-analytics-course/raw/dms/fossil/coal_prod/
2020-07-13 18:08:09 326026 LOAD00000001.csv

Notice that every record has been tagged with a DML operation. All rows are tagged as Insert (I) in this case because this is the initial full load from source to destination.

Image by Author

Let's perform some more DML operations on the source database: INSERTS, UPDATES, and DELETES. Start with a couple of INSERTS.

$ mysql -uadmin -padmin123 -h $RDS_ENDPOINT fossil -e "INSERT INTO fossil.coal_prod VALUES('India', 'IND', 2015, 4056.33, 0.00);INSERT INTO fossil.coal_prod VALUES('India', 'IND', 2016, 4890.45, 0.00)"
Image by Author
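For these two inserts, the CDC file that DMS drops under raw/dms/fossil/coal_prod/ should contain rows along the following lines, with the operation flag in the first column followed by the table columns (illustrative only; the actual file names are generated by DMS):

I,India,IND,2015,4056.33,0.00
I,India,IND,2016,4890.45,0.00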

Note that the file on S3 has tagged the newly inserted rows with 'I'. Now let's send an UPDATE.

$ mysql -uadmin -padmin123 -h $RDS_ENDPOINT fossil -e "UPDATE fossil.coal_prod SET Production=2845.66, consumption=145.66 WHERE Entity='India' AND Year=2013"
Image by Author

Note that the file on S3 has tagged the updated row with 'U'. Finally, send a DELETE.

$ mysql -uadmin -padmin123 -h $RDS_ENDPOINT fossil -e "DELETE FROM fossil.coal_prod WHERE Entity='India' AND Year=2010"
Image by Author

Note that the file on S3 has tagged the deleted row with 'D'.

What is all the fuss about? After all, it is only INSERTS, UPDATES, and DELETES.

Now that we have continuous replication in place, the source and destination will be kept in sync. In Part 2 of this article, I will show you how to ingest the CDC data into a Data Lake using Apache Hudi.

If you are using this article for testing or education, don't forget to clean up your AWS resources once done. In production, the DMS resources would remain permanently deployed.

$ aws dms stop-replication-task --replication-task-arn $TASK_ARN
$ aws dms delete-replication-task --replication-task-arn $TASK_ARN
$ aws dms delete-endpoint --endpoint-arn $DMS_END_SOURCE
$ aws dms delete-endpoint --endpoint-arn $DMS_END_DEST
$ aws dms delete-replication-instance --replication-instance-arn $REP_ARN
$ aws rds delete-db-instance --db-instance-identifier fossil --skip-final-snapshot

All the code for this article can be found in the repository cloned earlier: https://github.com/mkukreja1/blogs

I hope this article was helpful. CDC using Amazon Database Migration Service is covered as part of the AWS Big Data Analytics course offered by Datafence Cloud Academy. The course is taught online by me on weekends.