Upsolver joins Qlik to deliver real-time data, Iceberg optimizations and cost savings under a single platform

CDC to Iceberg: 4 Major Challenges, and How We Solved Them

Change Data Capture or CDC is a feature of modern databases that replicates row-level changes to an external system. In databases like PostgreSQL, MySQL and MS SQL Server, CDC is implemented using a Write Ahead Log that keeps track of all rows that have been inserted, updated or deleted. Clients, like Upsolver and Debezium, read events from this log and replicate them to reproduce the original table in the target system.

Apache Iceberg is an open table format for storing, updating and optimizing columnar data in object stores like Amazon S3, and as such, a key innovation driving adoption of the Lakehouse architecture. Lakehouse enables users to access data and collaborate using their engine of choice, like Snowflake, Databricks, Trino, Spark and DuckDB. Furthermore, it enables you to minimize vendor lock-in and greatly reduce data processing and storage costs, which are on everyone’s mind.

While there are many tools that offer managed CDC into various data warehouses such as Snowflake and Redshift, we couldn’t find anything that really works for Iceberg – so we decided to build it. Let’s take a look at the most difficult challenges with delivering CDC to Iceberg, and how we solved them in Upsolver. Think we missed something? Tell our product team.

The hard part about updating data in object stores

Object stores like Amazon S3 are designed to store immutable objects. These objects range from text files to videos, images to binary representation of tabular data (e.g, Apache Parquet). Object stores expect that if an update is to be made, the old file must first be deleted and only then be replaced with a new, modified, file. This however is an IO intensive and slow process especially when performed frequently as required by CDC.

Traditionally, this was solved by creating append only tables where each row is marked with  a row type of either Insert, Update or Delete. Later during a long ETL process, rows in the append table are merged based on the row type and a primary key (at a minimum). The drawback is that due to the long ETL process, target tables were always a few hours behind the originals in the source database, resulting in stale data for analysts, ML engineers and business users.

Iceberg was designed to address this issue, among other things. With Iceberg, engines can easily insert, update and delete rows in a target table and the underlying implementation handles the rest. For slow changing data, say every 30 minutes or an hour or two, this works well. But, CDC requires inserting, updating and deleting lots of rows, across many tables, every single minute. 

This is why traditional ETL or ELT tools that perform CDC to the warehouse or data lake don’t work well (or at all) for Iceberg. Specifically, there are 4 key challenges these tools struggle to solve.

4 key challenges that make CDC to Iceberg difficult, and what we did about them

1. Lack of Primary Key enforcement

Apache Iceberg doesn’t inherently define or enforce primary key (PK) constraints. This makes implementing CDC more difficult because it requires you to ensure PKs are available in the source and define the merge logic needed to properly insert, update or delete rows. 

Spark, Flink, Trino and commercial CDC tools require you to designate a PK column from the source that you will use to MERGE with current data in the target table. This is an artificial mapping that you must remember and maintain. There is no built-in validation that this column exists or is adequately populated. NULL or missing values for a PK column will at best result in a failed CDC merge and at worst corrupt your tables, forcing you to rebuild them.

Upsolver solves this problem by enforcing primary key constraints in two ways:

  1. Allowing you to define one or more PK columns when creating a new target Iceberg table which maps to the source PK columns. Upsolver uses these columns when upserting rows in the target table.

For direct ingestion and 0-ETL use cases, you don’t need to write MERGE code; Upsolver automatically uses the PK column to perform upserts on the target table – keep it simple. If a PK column is not used, rows are simply inserted. If you have unique merge requirements, you can easily implement the MERGE operation using Upsolver’s Iceberg Live Tables.

Here is how to create an Iceberg table with a PK column (read more):

CREATE ICEBERG TABLE my_iceberg_table
   col_id Bigint,
   col_1 String,
   col_2 String
PRIMARY KEY col_id;
  1. Allowing you to define data quality expectations on ingestion. Expectations validate the PK’s value before attempting to merge rows into the target table, thereby minimizing the possibility of bad data breaking your downstream tables. Expectations can be configured to alert you of an issue or block a row from being inserted altogether. Here is an expectation to validate if a PK column is not Null (read more):
WITH EXPECTATION exp_orderid_not_null 
     EXPECT orderid IS NOT NULL 
     ON VIOLATION DROP

2. Poor Merge On Read (MoR) performance

Merge-on-read is a method for updating Iceberg tables by writing changed and deleted rows into separate data files, alongside the base table data files. At query time, engines merge together these change, delete and base data files to produce the most current representation of the table. Periodically, these files are compacted together, merging changes in the process to produce larger, more read optimized files. Unlike the Copy-on-Write method, MoR tables allow for fast, concurrent writes, which are a must for CDC.

CDC events come in three flavors: row inserts, updates and deletes.  Inserts and updates are implemented in a similar fashion in Iceberg. Both create new data files with the row values to be inserted. Query engines resolve updates by filtering rows using the PK column and consulting the Iceberg manifests to only return the most current representation of the row. Resolving deletes is a bit more complicated.

Iceberg supports two types of deletes, positional and equality.  Positional deletes require the writing engine to scan every current data file and identify the position at which a deleted row is found. This operation is very IO intensive and slow, and therefore unusable for CDC use cases. Equality deletes are far simpler to produce, since only a filter predicate is created, for example  order_id = 5, to mark which rows should be deleted. This information is fast to write and doesn’t require reading through lots of data files, making equality deletes ideal for CDC.

The problem with MoR performance is two-fold. First, CDC writers like Spark, Trino, Debezium and popular commercial CDC tools don’t currently support writing equality deletes. Flink, however, does. This makes it impossible for them to deliver fresh, up to the minute replication to Iceberg. Second, other than recent versions of Trino (starting from v.449) and Presto (starting from v.0.286), query engines don’t fully support reading equality deletes making MoR tables less optimal to query.

Upsolver solves these problems in two ways:

  1. Implementing equality deletes as a first-class citizen for encoding CDC deletes. This lets Upsolver write CDC streams to Iceberg with 1000’s of changes per minute. For query engines that support equality deletes, it allows them to query CDC tables with up to the minute freshness with similar performance they would expect from popular warehouses.
  2. Aggressively compacting equality deletes to merge changes into base data files. This minimizes the need for query engines with partial or suboptimal support for equality deletes to process these files. In addition, Upsolver is working on new interoperability features that will make the transition to equality deletes simpler…stay tuned.

Our recent benchmark demonstrates the query latency reduction when using an engine (Trino v.449) that properly implements equality deletes to query a MoR table. When querying the most active partition, the one being continuously updated, the MoR enhancements result in 15X boost in query performance or a 175% reduction in query runtime (199 sec vs. 13 sec).

3. Concurrent write conflicts

In CDC, the ingest engine must juggle the writing of new row values, delete files, compacted files and delete expired and orphaned files. Each of these may operate on different or same files and attempt to commit changes at the same time. With CDC in particular, conflicts due to concurrent writes are exacerbated because of the many and frequent writes, compactions and snapshot expirations required to maintain a CDC table. If not managed in concert, these lead to long write delays, failed write and optimization jobs and unpredictable query performance – and lots of annoyed users.

Spark, Flink, Trino and commercial engines allow you to manually execute or schedule table maintenance tasks like compaction and snapshot expiration. However it’s an ad-hoc process that is detached from the jobs writing CDC rows to the target table. This leads to constant write conflicts, long retry periods and eventual failure of ingestion and compaction jobs.

Write tasks implemented in Upsolver’s 0-ETL ingestion and Iceberg Live Tables are in sync with its table maintenance tasks like compaction, retention, expiration and cleanup. Careful coordination of these tasks, handled automatically by Upsolver guarantees freshness, performance and consistency of CDC tables in Iceberg.

The result is evident in the plot below, which compares the wall time for the same query, performed using Amazon Athena (v3), on Iceberg tables optimized by Tabular, the old gold standard, to those optimized by Upsolver, the new gold standard. In the previously mentioned benchmark, queries on the active partition are 25% faster for Iceberg tables that are continuously updated and optimized by Upsolver.

4. Snapshot amplification

Any change to an Iceberg table requires creating a new snapshot. Snapshots include several metadata files that represent the changes to the table since the previous snapshot. Query engines read the latest snapshot to get the most current view of the table. Previous snapshots help retain table history and make time travel possible in Iceberg.

In CDC, the rate at which new rows are added and existing ones are updated can be in the order of 1000’s per minute. To keep the target Iceberg tables in sync with the source production tables, the writer needs to create a snapshot every time a batch of rows is added or modified. 1000’s of changes per minute lead to 100’s of new snapshots being created every minute. This write-amplification leads to ingestion delay, suboptimal queries and exponentially larger tables with each subsequent snapshot, until snapshots are expired and associated data is deleted.

Apache Spark, Flink and commercial engines leave this problem for you to solve. They require you to configure the number of allowed concurrent commits, compaction strategy, and snapshot minimum age and size to expire. These properties are designed for slow changing tables with very few commits and snapshots and are very difficult to manually tune for fast changing CDC streams, not to mention across hundreds or thousands of tables as your lake grows.

Upsolver’s intelligent Iceberg writer is designed to work in these hostile environments. It eliminates the snapshot amplification challenge by:

  1. Aggressively expiring intermittent snapshots (those written during ingestion)
  2. Writing equality instead of positional delete files to reduce write latency
  3. Pruning manifest files to reduce I/O overhead during query planning
  4. Compressing metadata to reduce overall table metadata size

There are many ways to implement CDC to Iceberg…

Implementing CDC to Iceberg is a complex problem with lots of variables you need to consider, tune and tweak to produce tables your users will enjoy querying. Vendors will often claim that if they solved CDC to the warehouse, Iceberg is just as simple.  But it’s not. CDC to the warehouse is often implemented by these vendors by pushing down logic to the warehouse leveraging its ability to merge and transform tables to produce the desired outcome. In the Lakehouse, that functionality must be provided by your CDC tool, or you must implement it on your own using Spark or Trino or another solution.

Upsolver is one of its kind, a fully managed Lakehouse solution that delivers end to end CDC for both warehouses and Iceberg lakes. Tables produced are continuously optimized using the Adaptive Optimizer to make sure you save the most money on your storage and that your users get a consistent, fast and enjoyable experience with every lake query engine. Additionally, CDC replication is deployed on Amazon EC2 Spot instances running inside of your own VPC so you can both reduce compute costs and keep your data secure.

If you want to try out Upsolver you can do it for free by signing up. Or if you want to dive deeper into your own architecture to understand if there is a fit, schedule a call with us.

Published in: Blog , Data Lakes
Roy Hasson
Roy Hasson

Roy Hasson is the head of product @ Upsolver. Previously, Roy was a product manager for AWS Glue and AWS Lake Formation.

Keep up with the latest cloud best practices and industry trends

Get weekly insights from the technical experts at Upsolver.

Subscribe

Templates

All Templates

Explore our expert-made templates & start with the right one for you.