Build an ETL pipeline for CDC events (model, redact, enrich)
Change data capture (CDC) enables you to replicate changes from transactional databases to your data lake and data warehouse. Once the data has been replicated, it needs to be modeled, cleaned and transformed to meet business needs. In this tutorial you will learn how to clean, redact and model a CDC table managed in Upsolver SQLake, and then write the transformed table to the data lake for analysis.
(Need a primer on the basics? Check out our Guide to Change Data Capture.)
Run it in two easy steps!
1. Configure PostgreSQL for CDC replication
2. Transform the replicated data and load results to the data lake
Configure PostgreSQL for CDC replication
The first thing you need to do is enable CDC on your source database. Then you can configure SQLake to listen to change events and replicate them to your data lake. Follow the steps in the tutorial How to replicate a PostgreSQL table to your data lake to get started. After you complete the tutorial, continue to the next step below.
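The linked tutorial covers the full setup. As a rough sketch only, enabling logical replication on PostgreSQL typically looks something like the following; the user, password and publication names are placeholders, and your environment may require additional steps beyond these:

-- Illustrative sketch; follow the linked tutorial for the authoritative steps.
-- Enable logical decoding (requires a PostgreSQL restart to take effect).
ALTER SYSTEM SET wal_level = logical;

-- Create a user the CDC client can connect with (placeholder name and password).
CREATE USER cdc_user WITH REPLICATION PASSWORD 'change_me';
GRANT SELECT ON ALL TABLES IN SCHEMA sales TO cdc_user;

-- Publish the tables you want to replicate, such as the creditcard table used later in this tutorial.
CREATE PUBLICATION upsolver_publication FOR TABLE sales.creditcard;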
Note: After configuring your source database, an easy way to follow along with this tutorial is to launch the Replicating PostgreSQL Tables to the Data Lake template in SQLake. After you create the ingestion job, you can copy, paste and execute the code below.
Transform the replicated data and load results to the data lake
After connecting SQLake to your database, the ingestion job continuously updates a staging table with the latest change events. The staging table is a wide, denormalized table that holds data from all of the source tables you chose to replicate. Oftentimes it’s beneficial to clean and prepare this source data to fit the needs of analysts. For example, an e-commerce application stores customer payment information in a table. This detail is needed by the application, but certain columns, like credit card numbers, contain personal data that is not required for analytics. In this step, you prepare the data for analytics by renaming columns, converting epoch timestamps to a readable date/time format, redacting sensitive information and adding computed columns to enrich the data.
To demonstrate this, you will read and transform the creditcard table from the staging table. The data will be modeled and masked and then written to the data lake for analysis. SQLake will automatically merge inserts, updates and deletes to ensure that the source and targets are in sync.
Start by creating a target table in the data lake to hold the transformed data. The syntax below includes a PRIMARY KEY clause which holds the value of the primary key from the source table. This key will be mapped to credit_card_id and used to merge inserts, updates and deletes on the output table to keep it consistent with the source.
CREATE TABLE default_glue_catalog.database_11f174.sales_cc (
  credit_card_id bigint
)
PRIMARY KEY credit_card_id;
Next, create a job that runs every minute and inserts, updates or deletes rows in the output table for each CDC event that has been ingested. The primary key column (credit_card_id) is used to match rows between the staging table and the output table. The merge logic is as follows:
- WHEN MATCHED AND is_delete THEN DELETE removes the record from the output table when it was deleted in the source.
- WHEN MATCHED THEN REPLACE updates the record in the output table when the primary key matches an existing row.
- WHEN NOT MATCHED THEN INSERT inserts a new record into the output table when the primary key does not match any existing row.
Now you’re ready to build the job that transforms the data. You’ll perform the following:
- Rename columns to be more descriptive
- Convert Epoch time into a valid date/time format
- Mask sensitive information such as credit card numbers
CREATE SYNC JOB sync_pg_creditcard_demo
  START_FROM = BEGINNING
  ADD_MISSING_COLUMNS = TRUE
  RUN_INTERVAL = 1 MINUTE
AS MERGE INTO default_glue_catalog.database_11f174.sales_cc AS target
USING (
  SELECT
    masked_cc AS credit_card_number,
    cardtype AS credit_card_type,
    expmonth AS expiration_month,
    expyear AS expiration_year,
    FROM_UNIXTIME(modifieddate/1000000) AS modified_timestamp,
    $primary_key AS credit_card_id,
    $is_delete AS is_delete
  FROM default_glue_catalog.database_11f174.postgres_raw_data
  LET
    last_4 = SUBSTRING(cardnumber,-4,4),
    first_4 = SUBSTRING(cardnumber,1,4),
    masked_cc = first_4 || '-xxxx-' || last_4
  WHERE $full_table_name = 'AdventureWorks.sales.creditcard'
    AND $event_time BETWEEN run_start_time() AND run_end_time()
) AS source
ON (target.credit_card_id = source.credit_card_id)
WHEN MATCHED AND is_delete THEN DELETE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME EXCEPT is_delete;
Execute the job and wait a few minutes for data to begin populating the output table. Run the query below, substituting your database and table names, to verify that data is available in the output table.
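For example, a minimal verification query using the table created earlier in this tutorial might look like this:

-- Preview a few rows of the transformed output table.
SELECT *
FROM default_glue_catalog.database_11f174.sales_cc
LIMIT 10;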
Summary
SQLake provides a straightforward CDC capability that allows you to easily replicate tables from your operational PostgreSQL database to the data lake and data warehouse. Before data can be presented to your data analysts, data scientists and BI users, it needs to be modeled, cleaned, secured and prepared. It is simple to do this directly in SQLake by creating a job with your SQL-based transformation logic that automatically updates the data as it is merged into the target table. Although data preparation can be performed after the data has been loaded into the data warehouse, it is more cost-effective and secure to perform it before loading it into the target.
That’s it! Try this out for yourself by launching the Replicating PostgreSQL Tables to the Data Lake template in SQLake!