Transform Nested JSON to Parquet with SQLake
Learn different approaches for dealing with complex nested JSON, and how Upsolver SQLake can be used to write nested JSON to Parquet, simplifying data lake ingestion and table management.
In my twenty years of working with data, I’ve realized that as our data ecosystem grows, the types of data we collect and analyze evolve. Working with unpredictable data structures and formats, such as nested data, has made data engineering challenging. In this post, I compare common methods for working with nested data and show you how Upsolver SQLake makes the process easy and painless.
The challenge: the rise in semi-structured data adds complexity to analytics pipelines
Traditionally, data warehouses enabled you to work with well-structured data. This data was commonly collected from ERP systems, OLTP databases, or commercial data providers. The data then went through a complicated and rigid ETL process to cleanse, model, and transform it before finally being loaded into the data warehouse.
Over the past several years, the amount of semi-structured and nested data that companies and machines generate and collect has increased exponentially. Examples include location information, sensor readings, product usage, and tracking information. Meanwhile, data structures follow myriad industry standards – or in many cases, no standard at all.
How Data Warehouses Deal With Nested JSON
How do data warehouses handle this complexity? Let’s take a look at one of the most popular modern cloud data warehouses — Snowflake.
The following is a sample (and fake) nested JSON record:
{"customer":{"email":"Christina.Collinsl@yahoo.com","firstName":"Christina","lastName":"Collins","address":{"address1":"851 6th Ave.","address2":"Apt. 16","city":"Saint Louis","country":"USA","postCode":"63109","state":"MO"}}
Before you can flatten and model this kind of data you need to understand the source events and how they are structured. First, use the COPY command to load the raw data from S3 into Snowflake so you can inspect it. The following SQL statements load nested data from S3 to Snowflake using a VARIANT column (not shown: how to create a named stage, a step that is required before you execute the COPY command in Snowflake):
CREATE OR REPLACE "TEST"."PUBLIC"."raw_source"( SRC VARIANT ); COPY INTO "TEST"."PUBLIC"."raw_source" FROM @"TEST"."PUBLIC"."MY_EXT_STAGE" FILE_FORMAT = ( TYPE = "JSON" )
After you’ve loaded the raw data into Snowflake, you create a staging table where you store the flattened and modeled data:
CREATE OR REPLACE TABLE "TEST"."PUBLIC"."STG_ORDERS" (
    CUSTOMER_ADDRESS_ADDRESS1 VARCHAR(16777216),
    CUSTOMER_ADDRESS_ADDRESS2 VARCHAR(16777216),
    CUSTOMER_ADDRESS_CITY     VARCHAR(16777216),
    CUSTOMER_ADDRESS_COUNTRY  VARCHAR(16777216),
    CUSTOMER_ADDRESS_POSTCODE VARCHAR(16777216),
    CUSTOMER_ADDRESS_STATE    VARCHAR(16777216),
    CUSTOMER_EMAIL            VARCHAR(16777216),
    CUSTOMER_FIRSTNAME        VARCHAR(16777216),
    CUSTOMER_LASTNAME         VARCHAR(16777216)
);
Next, transform the data from the nested JSON structure into a tabular format using INSERT/SELECT:
INSERT INTO "TEST"."PUBLIC"."STG_ORDERS" SELECT src:customer.address.address1::string as CUSTOMER_ADDRESS_ADDRESS1 src:customer.address.address2::string as CUSTOMER_ADDRESS_ADDRESS2 src:customer.address.city::string as CUSTOMER_ADDRESS_CITY, src:customer.address.country::string as CUSTOMER_ADDRESS_COUNTRY, src:customer.address.postCode::string as CUSTOMER_ADDRESS_POSTCODE, src:customer.address.state::string as CUSTOMER_ADDRESS_STATE, src:customer.email::string as CUSTOMER_EMAIL, src:customer.firstName::string as CUSTOMER_FIRSTNAME, src:customer.lastName::string as CUSTOMER_LASTNAME FROM "TEST"."PUBLIC"."raw_source" raw_source, LATERAL FLATTEN( input => raw_source.src:data, outer => true, path =>'items') as itemdata;
What are the benefits of this approach? This explicit mapping is similar to the way most data warehouses handle nested data, and it ensures that the data coming into the table is unchanged from the source.
What are the drawbacks? If the raw data has additional values or a different data type from the pre-defined DDL, the insert job fails. Such rigidity can be problematic if the source data is unpredictable (changing schema) or poorly structured. It is also labor-intensive to manually scan objects to ensure each one is mapped properly; if objects are mapped incorrectly, incorrect data ends up in the target columns.
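One way to reduce that manual scanning effort is to let Snowflake enumerate the key paths it actually finds in the raw data before you write the mapping. The following sketch (reusing the raw_source table from above) relies on a recursive LATERAL FLATTEN to list every distinct path in the VARIANT column, so new or unexpected fields stand out; treat it as an illustrative check rather than part of the load itself.
-- List every distinct key path present in the raw VARIANT data so that
-- fields missing from the DDL mapping can be spotted before they are lost.
SELECT DISTINCT f.path AS path
FROM "TEST"."PUBLIC"."raw_source" raw_source,
     LATERAL FLATTEN( input => raw_source.src, recursive => true ) f
ORDER BY path;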
Working with Nested Data in Spark/Hadoop
The Apache Hadoop and Apache Spark ecosystems take a very different approach. This approach is flexible; you can write the data to the storage layer quickly and efficiently, without many restrictions. You do not need to format the data right away; instead, you format it only when you’re ready to query it. This is commonly referred to as “schema on read,” as opposed to “schema on write” for a data warehouse.
To follow this approach, first load the JSON data into a Spark DataFrame. Next, explode the array datasets, which flattens all of the array elements, and then use a Spark function such as getItem to retrieve individual attributes from the exploded data. Below is an example of how to transform nested data into a queryable format in Apache Spark.
import org.apache.spark.sql.functions._

// Step 1: Load nested JSON data into a Spark DataFrame
val ordersDf = spark.read.format("json")
  .option("inferSchema", "true")
  .option("multiLine", "true")
  .load("/FileStore/tables/orders_sample_datasets.json")

// Step 2: Explode the datasets array so each order becomes a row
var parseOrdersDf = ordersDf.withColumn("orders", explode($"datasets"))

// Step 3: Fetch each order attribute using getItem on the exploded column
parseOrdersDf = parseOrdersDf
  .withColumn("customerId", $"orders".getItem("customerId"))
  .withColumn("orderId", $"orders".getItem("orderId"))
  .withColumn("orderDate", $"orders".getItem("orderDate"))
  .withColumn("orderDetails", $"orders".getItem("orderDetails"))
  .withColumn("shipmentDetails", $"orders".getItem("shipmentDetails"))

// Step 4: Explode the orderDetails column to flatten all of its rows
parseOrdersDf = parseOrdersDf.withColumn("orderDetails", explode($"orderDetails"))

// Step 5: Fetch attributes from each object and expose them as columns
parseOrdersDf = parseOrdersDf
  .withColumn("productId", $"orderDetails".getItem("productId"))
  .withColumn("quantity", $"orderDetails".getItem("quantity"))
  .withColumn("sequence", $"orderDetails".getItem("sequence"))
  .withColumn("totalPrice", $"orderDetails".getItem("totalPrice"))
  .withColumn("city", $"shipmentDetails".getItem("city"))
  .withColumn("country", $"shipmentDetails".getItem("country"))
  .withColumn("postalcode", $"shipmentDetails".getItem("postalCode"))
  .withColumn("street", $"shipmentDetails".getItem("street"))
  .withColumn("state", $"shipmentDetails".getItem("state"))

// Step 6: Select the required columns from the DataFrame
val jsonParseOrdersDf = parseOrdersDf.select($"orderId", $"customerId", $"orderDate",
  $"productId", $"quantity", $"sequence", $"totalPrice",
  $"street", $"city", $"state", $"postalcode", $"country")

display(jsonParseOrdersDf)
This approach offers flexibility, but it requires intimate knowledge of the source data to flatten and model it appropriately. In addition, you must continuously maintain this code as the data and its schema evolve, such as when new fields are added or removed.
Working with Nested Data in SQLake
When we built Upsolver SQLake, we wanted to offer schema-on-read flexibility without the management overhead of a Spark-based solution. We solved this by creating an enhanced COPY command that:
- reads from any source (streaming or batch);
- automatically infers schema and detects schema evolution;
- partitions the data based on a single column (usually but not always date); and
- writes it in Apache Parquet format to a staging table in your data lake.
At this stage, the data is still raw and in nested form. To flatten and model the data, you run an INSERT job. Let’s review how to accomplish this in SQLake running on AWS:
1. Create a COPY job that reads from the source system – in this case an S3 prefix – and writes to a staging table in S3 backed by an AWS Glue Data Catalog table.
CREATE JOB load_orders_raw_data_from_s3
    START_FROM = NOW
AS COPY FROM S3 upsolver_s3_samples
    BUCKET = 'upsolver-samples'
    PREFIX = 'orders/'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
The data begins to populate the staging table automatically; there is no need for Airflow or other orchestration tools or techniques. You can query the staging table to perform quality and reliability checks.
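For example, a quick sanity check might be as simple as a row count against the staging table, run from the SQLake worksheet or any engine that reads the Glue Data Catalog (such as Amazon Athena); this is an illustrative check, not part of the pipeline itself.
-- Basic quality check against the staging table created by the COPY job above.
SELECT count(*) AS row_count
FROM default_glue_catalog.upsolver_samples.orders_raw_data;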
2. Create an INSERT job that reads from the staging table, transforms and models the data, and writes it to your choice of destinations. In this case we write the transformed data to the data lake.
CREATE JOB transform_orders_and_insert_into_athena
    START_FROM = NOW
    ADD_MISSING_COLUMNS = true
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data
    MAP_COLUMNS_BY_NAME
    -- Use the SELECT statement to choose columns from the source
    -- and implement your business logic transformations.
    SELECT *
    FROM default_glue_catalog.upsolver_samples.orders_raw_data;
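Once the job is running, you could verify the output with a simple query against the transformed table – for example, from Amazon Athena, since the table lives in the Glue Data Catalog. This is just an illustrative spot check:
-- Preview a few transformed rows to confirm the job is writing as expected.
SELECT *
FROM default_glue_catalog.upsolver_samples.orders_transformed_data
LIMIT 10;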
Again, there is no need for external orchestration. SQLake automatically runs your job and ensures all dependencies are running correctly. This approach lets you manage the complicated data lake environment in a declarative way, without having to manually update jobs whenever the source data changes. The COPY and SELECT * statements in SQLake capture raw nested data changes automatically.
If you discover issues in the data, you can use SQLake’s ability to time travel – that is, reprocess data from a specified point in time – following these steps:
- Modify the SQL for your job to account for the required fixes.
- Set START_FROM to an earlier timestamp.
- Replay the data.
This avoids disrupting consumers of the pipeline output (data analysts, for example) or other jobs.
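As a rough sketch of what a replay might look like, you could recreate the transformation job with the corrected logic and an earlier START_FROM. The job name below is hypothetical, and the exact timestamp literal accepted by START_FROM is an assumption that may differ in your SQLake version:
-- Hypothetical replay job: corrected SELECT logic plus an earlier START_FROM
-- so SQLake reprocesses staged data from that point in time.
CREATE JOB transform_orders_and_insert_into_athena_fixed
    START_FROM = TIMESTAMP '2023-01-01 00:00:00'   -- assumed timestamp syntax
    ADD_MISSING_COLUMNS = true
AS INSERT INTO default_glue_catalog.upsolver_samples.orders_transformed_data
    MAP_COLUMNS_BY_NAME
    -- corrected business logic goes here
    SELECT *
    FROM default_glue_catalog.upsolver_samples.orders_raw_data;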
In summary, Upsolver SQLake makes working with nested data easy by automating schema detection and evolution. It provides a simple three-step process for building data pipelines – stage, transform/model, and load. It enforces data ownership, letting you track who created and who modified each job. It also encourages data reuse across the organization: a staging table can be shared by many downstream consumers and use cases, avoiding redundant work. And it standardizes best practices, ensuring your data lake remains well-structured, high-performing, and future-proof.
Try SQLake for Free for 30 Days
SQLake is Upsolver’s newest offering. It lets you build and run reliable data pipelines on streaming and batch data via an all-SQL experience. Try it for free for 30 days. No credit card required.
Resources for Further Learning:
- Watch our benchmarking webinar on streaming data in Snowflake
- Read our take on Snowflake as a data lake
- Read the comparison of Databricks vs Snowflake
- Find out about alternatives for Apache Spark