Complex Transformations | Spark Practical Scenarios
← All Scenarios

Transforming Complex & Semi-Structured Data.

Advanced techniques for parsing JSON, flattening arrays, and reshaping datasets with Pivot.

In Spark SQL, you can use the colon (:) syntax to extract fields from JSON strings without full parsing. For robust pipelines, use from_json combined with schema_of_json to convert strings into typed structs.

-- Simple extraction using Colon syntax
SELECT customer_id, profile:first_name, profile:address:country 
FROM customers;

-- Dynamic parsing into a Struct
CREATE OR REPLACE TEMP VIEW parsed_customers AS
SELECT customer_id, from_json(profile, schema_of_json('{"first_name":"Thomas","address":{"country":"France"}}')) AS profile_struct
FROM customers;

-- Flattening the struct into individual columns
SELECT customer_id, profile_struct.* FROM parsed_customers;
    

Moving beyond explode, we can use collect_set to aggregate values into arrays and flatten to merge nested arrays (arrays of arrays) into a single list.

-- Flattening a nested array of book IDs and keeping unique values
SELECT customer_id,
  array_distinct(flatten(collect_set(books.book_id))) AS unique_books_bought
FROM orders
GROUP BY customer_id;
    

Set operations allow you to compare two DataFrames based on entire rows. This is essential for delta-detection or checking data consistency between updates.

Operation Result
UNION Combines rows from both tables (removes duplicates).
INTERSECT Returns only rows that exist in both tables.
MINUS / EXCEPT Returns rows present in the first table but not the second.

The PIVOT clause is used to rotate data from a state of rows to columns, typically used in reporting to create feature matrices or summaries.

-- Aggregating quantities by book_id per customer
CREATE OR REPLACE TABLE transactions AS
SELECT * FROM (
  SELECT customer_id, book.book_id, book.quantity 
  FROM orders_enriched
) PIVOT (
  sum(quantity) FOR book_id in ('B01', 'B02', 'B03', 'B04', 'B05')
);
    
Q: When should you use schema_of_json() instead of manually defining a schema? Use schema_of_json when dealing with complex or deeply nested JSON where manual DDL is error-prone. However, for production, it is safer to use a static schema to prevent downstream failures if a single record has a malformed structure.
Q: What is the difference between collect_list and collect_set? collect_list aggregates all items into an array including duplicates. collect_set only keeps unique items, effectively acting like a 'DISTINCT' operation within the array.
Q: Why is PIVOT considered an expensive operation in Spark? PIVOT usually triggers a Wide Transformation (Shuffle) because Spark needs to aggregate data from all partitions to calculate the sum/count for each specific column being created.