Upserting and deleting rows in Snowflake and Redshift
Upsolver SQLake makes it easy to ingest data from a variety of sources, such as Amazon S3, Apache Kafka, and Amazon Kinesis Data Streams, into an Amazon S3-based data lake and deliver prepared data to a data warehouse such as Snowflake or Amazon Redshift. In the simplest cases, data can be appended to a target table whether or not the referenced row already exists (i.e., no updates or deletes). This is a great option when a business needs to maintain the entire history of the data instead of storing only the latest values. To learn more about append-only use cases, please refer to Transforming and Loading Data into a Data Lake, Redshift, or Snowflake.
Of course, you may be interested in only the most recent value for a given row, which is usually the case for ad hoc analytics and BI reporting. For example, the sales department may want to maintain the daily revenue derived from an order data stream produced by an e-commerce application. This means that as order events arrive throughout the day, the aggregated revenue must be updated in the target table.
Furthermore, as customer information such as the address changes, the customer table must also be updated. This is simple to do with transactional databases, but is quite difficult to achieve in a data lake or a data warehouse. SQLake makes it easy to build data pipelines that automatically insert, update and delete records (whether they include frequently or slowly changing dimensions) without the user needing to learn complex syntax or build external processes to maintain the data.
In this tutorial, you will learn how to use SQLake to extract raw data from an S3 bucket, transform and model it, and then insert, update, and delete records in a target table in the data warehouse.
Note that MySQL and PostgreSQL databases are common sources for data used in analytics. In SQLake, instead of using an S3 bucket as the source, you can configure a change data capture (CDC) connection that will stream changes from a source database table to a target table in your data lake or data warehouse. In either case (S3 bucket or database CDC) SQLake will insert, update or delete records per the changes in the source table so that the target table reflects the current state of the source database.
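For example, a CDC connection to a MySQL source might look roughly like the sketch below. The connection name, host, and credentials are placeholders, and the exact options available may vary with your SQLake environment.
-- Hypothetical CDC source connection; all values are placeholders.
CREATE MYSQL CONNECTION mysql_cdc_conn
  CONNECTION_STRING = 'jdbc:mysql://<hostname>:3306/<dbname>'
  USER_NAME = '<username>'
  PASSWORD = '<password>';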
Run it!
- Create connections to your S3 source and Snowflake target
- Ingest raw data into a staging table in the data lake
- Prepare your Snowflake tables
- Model, merge and load results into Snowflake
- As UPSERT
- As UPSERT with DELETE
- As UPSERT for nested array data
Create connections to your S3 source and Snowflake target
Start by creating a connection to the source dataset in Amazon S3. You’ll be using sample data provided by Upsolver to get started quickly. This example uses S3 as the source, but you can connect to other sources as well.
CREATE S3 CONNECTION s3_conn
  AWS_ROLE = 'arn:aws:iam::949275490180:role/upsolver_samples_role'
  EXTERNAL_ID = 'SAMPLES'
  READ_ONLY = TRUE;
Next, you need to create a connection to your Snowflake data warehouse. The output of your pipeline will write results using this connection to the data warehouse.
CREATE SNOWFLAKE CONNECTION sf_conn
  CONNECTION_STRING = 'jdbc:snowflake://<account>.<region>.aws.snowflakecomputing.com/?db=<dbname>'
  USER_NAME = '<username>'
  PASSWORD = '<password>';
Similarly, you can create a connection to your Amazon Redshift cluster, as follows:
CREATE REDSHIFT CONNECTION rs_conn
  CONNECTION_STRING = 'jdbc:redshift://<hostname>.<region>.redshift.amazonaws.com:5439/<dbname>'
  USER_NAME = '<username>'
  PASSWORD = '<password>';
Once the connections have been created, the remaining steps are the same for Snowflake and Redshift.
Create a job to ingest raw data into a staging table
In this step, you will create a staging table in the data lake and an ingestion job that will copy data from the source, optimize it, and store it in the staging table.
The first step is to create the staging table.
CREATE TABLE default_glue_catalog.database_2777eb.orders_raw_staging PARTITIONED BY $event_date;
The second step is to create the ingestion job. This job will read files from S3 under upsolver-samples/orders/ and process all events from the BEGINNING, i.e., starting with the first event ever stored in that directory.
CREATE SYNC JOB load_orders_raw
  START_FROM = BEGINNING
  DATE_PATTERN = 'yyyy/MM/dd/HH/mm'
  CONTENT_TYPE = JSON
AS COPY FROM S3 s3_conn
  BUCKET = 'upsolver-samples'
  PREFIX = 'orders/'
INTO default_glue_catalog.database_2777eb.orders_raw_staging;
Once the job has been created, give it a minute or two to process and load the data. You should then be able to query it directly from SQLake or from your favorite data lake query engine, such as Amazon Athena.
SELECT * FROM default_glue_catalog.database_2777eb.orders_raw_staging LIMIT 10;
Here is an example of the results when querying it from SQLake:
Prepare your Snowflake tables
The raw data above is in JSON format and contains customer, order, and order item information. Next, we want to load the customer information into the Snowflake Customers_Merge table, the order information into the Orders_Data table, and the order items into the Order_Items table. The following three tables should be created in Snowflake before proceeding to the next step.
Here is sample SQL code to create these tables in Snowflake:
CREATE TABLE DEMO.CUSTOMERS_MERGE (
  CUSTOMER_ID VARCHAR,
  FIRST_NAME VARCHAR,
  LAST_NAME VARCHAR,
  ADDRESS_LINE1 VARCHAR,
  ADDRESS_LINE2 VARCHAR,
  CITY VARCHAR,
  STATE VARCHAR,
  ZIPCODE VARCHAR
);
CREATE TABLE DEMO.ORDERS_DATA (
  ORDER_ID VARCHAR,
  ORDER_DATE DATE,
  ORDER_TYPE VARCHAR,
  NET_TOTAL FLOAT,
  TAX_RATE FLOAT,
  CUSTOMER_ID VARCHAR
);
CREATE TABLE DEMO.ORDER_ITEMS (
  ITEM_ID INT,
  ORDER_ID VARCHAR,
  ITEM_NAME VARCHAR,
  ITEM_CATEGORY VARCHAR,
  ITEM_PRICE FLOAT,
  ITEM_QTY INT
);
Once the tables are created in Snowflake, navigate back to the SQLake console and proceed with the next steps.
Model, merge, and load results into Snowflake
With SQLake you can model your data in the data lake and then load the results into target tables residing in Snowflake, Redshift or a data lake. Once loaded, the target tables are ready for users to query or further transform.
Note that while Snowflake and Redshift allow you to add PRIMARY KEY constraints on a table, those constraints are informational only and are not enforced. They are essentially metadata and documentation that tools (and users) can use to understand the *intended* uniqueness of the data in the table. From SQLake, transformation (INSERT INTO) jobs only append records to the Snowflake or Redshift tables.
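To illustrate, you could declare the intended key on one of the tables created above. This is a sketch only; Snowflake records the constraint as metadata and does not enforce uniqueness.
-- Recorded as metadata only; Snowflake does not reject duplicate ORDER_ID values.
ALTER TABLE DEMO.ORDERS_DATA ADD PRIMARY KEY (ORDER_ID);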
For example, order events include customer information within each of the order objects. This customer data can be inserted into the customer table using the INSERT INTO clause, which allows SQLake to maintain an append-only table that contains the entire history of changes. You can learn more about this use case in the Transforming and Loading Data into a Data Lake, Redshift, or Snowflake guide.
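For reference, an append-only version of the customer load might look roughly like the sketch below. It is not part of this tutorial: the job name and the DEMO.CUSTOMERS_HISTORY target table are hypothetical, and it assumes the staging table and Snowflake connection created earlier.
-- Hypothetical append-only job: every event is added as a new row, preserving full history.
CREATE SYNC JOB load_customers_append
  RUN_INTERVAL = 1 MINUTE
  START_FROM = BEGINNING
AS INSERT INTO SNOWFLAKE "sf_conn"."DEMO"."CUSTOMERS_HISTORY" MAP_COLUMNS_BY_NAME
  SELECT customer.email AS CUSTOMER_ID,
    customer.firstname AS FIRST_NAME,
    customer.lastname AS LAST_NAME,
    customer.address.postcode AS ZIPCODE
  FROM default_glue_catalog.database_2777eb.orders_raw_staging
  WHERE $event_time BETWEEN run_start_time() AND run_end_time();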
To maintain only the most recent information in the target tables, you create a transformation (MERGE INTO) job. Instead of appending all new events, the MERGE command inserts, updates, and deletes rows based on a primary key.
To demonstrate how to use the MERGE command, let's create jobs to transform and model the customer, order, and order item datasets. You will flatten the nested customer data structure, unnest the order items array, and rename some of the columns to fit your target data model. While modeling, you want to include reference keys, such as the customer ID in the orders table and the order ID in the order items table, so that analysts can easily join these tables. You will create these jobs as transformation jobs using the MERGE INTO statement.
- The first example uses the MERGE INTO command to upsert rows. If there is a matching customer record already present in the target table, it will be updated. Otherwise it will be inserted.
CREATE SYNC JOB load_customers_merge
  RUN_INTERVAL = 1 MINUTE
  START_FROM = BEGINNING
AS MERGE INTO SNOWFLAKE "sf_conn"."DEMO"."CUSTOMERS_MERGE" AS target
USING (
  SELECT customer.email AS CUSTOMER_ID,
    customer.firstname AS FIRST_NAME,
    customer.lastname AS LAST_NAME,
    customer.address.address1 AS ADDRESS_LINE1,
    customer.address.address2 AS ADDRESS_LINE2,
    customer.address.city AS CITY,
    customer.address.state AS STATE,
    customer.address.postcode AS ZIPCODE
  FROM default_glue_catalog.database_2777eb.orders_raw_staging
  WHERE $event_time BETWEEN run_start_time() AND run_end_time()
) source
ON target.CUSTOMER_ID = source.CUSTOMER_ID
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
Running the job above loads the target Snowflake table, CUSTOMERS_MERGE, with historical data. The following is a sample of the output.
Double-check that there are no duplicates in the table. Looks good, so let's continue.
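If you want to verify this yourself, a quick duplicate check in Snowflake could look like the following sketch, using the table and key column from the example above:
-- Any rows returned would indicate duplicate customers surviving the merge.
SELECT CUSTOMER_ID, COUNT(*) AS ROW_COUNT
FROM DEMO.CUSTOMERS_MERGE
GROUP BY CUSTOMER_ID
HAVING COUNT(*) > 1;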
To show an example of slowly changing dimensions, we modified the ZIPCODE values for some of the customers in the source dataset. The MERGE job recognizes the changes and automatically updates the target table in Snowflake. Query the table again and you will see those changes.
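For example, you could spot-check the affected customers with a query like this sketch; the filter values are placeholders for the customer IDs you modified:
-- Placeholder filter: substitute the customer IDs whose ZIP codes changed in your data.
SELECT CUSTOMER_ID, ZIPCODE
FROM DEMO.CUSTOMERS_MERGE
WHERE CUSTOMER_ID IN ('<customer_email_1>', '<customer_email_2>');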
- Next, let's use the MERGE command together with the DELETE clause. If a matching order record is already present in the target table, it will be updated or deleted; otherwise it will be inserted. We determine when to delete a row by evaluating a condition and storing the Boolean result in the to_delete field. This field may already exist in the data if the events are CDC changes or were processed upstream. The target table, as you can see below, includes NET_TOTAL values of 0, and let's assume it also has an incorrect TAX_RATE of 0.12 that should be 12. We will update this table to demonstrate how the MERGE job updates and deletes rows based on the conditions in the SQL.
CREATE SYNC JOB load_orders_data_merge
  RUN_INTERVAL = 1 MINUTE
  START_FROM = BEGINNING
AS MERGE INTO SNOWFLAKE "sf_conn"."DEMO"."ORDERS_DATA" AS target
USING (
  SELECT orderid AS ORDER_ID,
    extract_timestamp(orderdate::STRING) AS ORDER_DATE,
    ordertype AS ORDER_TYPE,
    nettotal AS NET_TOTAL,
    taxrate_percent AS TAX_RATE,
    customer.email AS CUSTOMER_ID,
    to_delete AS to_delete
  FROM default_glue_catalog.database_2777eb.orders_raw_staging
  LET to_delete = IF_ELSE(nettotal <= 0, true, false),
      taxrate_percent = taxrate * 100
  WHERE $event_time BETWEEN run_start_time() AND run_end_time()
) source
ON target.ORDER_ID = source.ORDER_ID
WHEN MATCHED AND to_delete THEN DELETE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME EXCEPT to_delete;
This is the table before the MERGE job.
This is the output after the MERGE job applied the changes. All orders have a positive NET_TOTAL and the tax rate is corrected.
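To confirm both fixes in Snowflake, a verification query along these lines could be used (a sketch based on the conditions in the job above):
-- Expect zero non-positive totals and zero rows still carrying the old 0.12 tax rate.
SELECT
  COUNT_IF(NET_TOTAL <= 0) AS NON_POSITIVE_ORDERS,
  COUNT_IF(TAX_RATE = 0.12) AS OLD_TAX_RATE_ROWS
FROM DEMO.ORDERS_DATA;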
- The third example uses MERGE with nested array data that must be flattened before upserting. The job uses the UNNEST operation to flatten the array and then upserts the results using the MERGE INTO command. As in the first example, if a matching order item record is already present in the target table, it will be updated; otherwise it will be inserted.
CREATE SYNC JOB load_items_unnest_merge
  RUN_INTERVAL = 1 MINUTE
  START_FROM = BEGINNING
AS MERGE INTO SNOWFLAKE "sf_conn"."DEMO"."ORDER_ITEMS" AS target
USING (
  UNNEST(
    SELECT data.items[].itemid AS ITEM_ID,
      orderid AS ORDER_ID,
      data.items[].name AS ITEM_NAME,
      data.items[].category AS ITEM_CATEGORY,
      data.items[].unitprice AS ITEM_PRICE,
      data.items[].quantity AS ITEM_QTY
    FROM default_glue_catalog.database_2777eb.orders_raw_staging
    WHERE $event_time BETWEEN run_start_time() AND run_end_time()
  )
) source
ON target.ITEM_ID = source.ITEM_ID
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
The original dataset looks like this.
After unnesting the order items array, the array fields appear as top-level columns.
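Because ORDER_ID was carried into the order items table as a reference key, analysts can join the items back to their orders directly in Snowflake. Here is a minimal sketch using the tables created earlier:
-- Join order items to their orders to report revenue per item category and day.
SELECT o.ORDER_DATE, i.ITEM_CATEGORY, SUM(i.ITEM_PRICE * i.ITEM_QTY) AS CATEGORY_REVENUE
FROM DEMO.ORDER_ITEMS i
JOIN DEMO.ORDERS_DATA o ON o.ORDER_ID = i.ORDER_ID
GROUP BY o.ORDER_DATE, i.ITEM_CATEGORY
ORDER BY o.ORDER_DATE, CATEGORY_REVENUE DESC;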
Summary
SQLake allows you to create a pipeline to copy data from source streams, databases, and object stores into data lake tables for staging. Using SQL, you model and transform the data, storing the results in the data lake. To perform BI queries and ad hoc analysis, you load the transformed results into your Snowflake or Redshift data warehouse. SQLake enables you to write results as append-only or as fully merged tables. This allows you to keep transformation and modeling logic in a central place and reduce the cost of continuously transforming and deduplicating data in your data warehouse. SQLake automatically scales compute resources up and down to match the velocity and volume of your data, so you don't have to manage that yourself.
Get started today for free with sample data or bring your own.