Nested JSON to Redshift | Spark Practical Scenarios

E2E Pipeline: Processing Nested JSON & Deduplicating Arrays

A real-world scenario covering ingestion from S3, complex parsing, and multi-destination writing (Redshift & Logs).

In this workflow, we process raw Parquet data from an S3 landing zone. The data contains user activity where the core details are trapped inside a nested JSON string. This JSON includes arrays that have duplicate entries. We must clean this, extract the useful attributes, calculate data quality metrics, and sink the results into Amazon Redshift.

The raw Parquet file contains several root columns (id, region, device_type, etc.) plus a raw_meta column that holds the nested JSON string.

Source: raw_events.parquet
{ "id": 101, "event_time": "2024-05-10T14:30:00Z", "raw_meta": '{"user": "John Doe", "emails": ["j@test.com", "j@test.com"], "addr": "123 Spark St"}' }
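Before wiring up from_json, it helps to see what the embedded payload actually contains. This quick local sketch (not part of the pipeline) parses the raw_meta string with Python's json module and surfaces the duplicate that we will later remove:

```python
import json

# The value of raw_meta exactly as it arrives from the landing zone
raw_meta = '{"user": "John Doe", "emails": ["j@test.com", "j@test.com"], "addr": "123 Spark St"}'

meta = json.loads(raw_meta)
print(meta["user"])    # John Doe
print(meta["emails"])  # ['j@test.com', 'j@test.com'] -- note the duplicate entry
```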

First, we define a StructType schema for the JSON column to avoid expensive schema inference. We then read the Parquet file and select the relevant root columns.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Define nested schema for the JSON column
json_schema = StructType([
    StructField("user", StringType()),
    StructField("emails", ArrayType(StringType())),
    StructField("addr", StringType())
])

df_raw = spark.read.parquet("s3a://landing-zone/events/") \
    .select("id", "event_time", "region", "device_type", "os", 
            "ip_address", "session_id", "user_agent", "raw_meta")
    

We use from_json to parse the metadata. Crucially, we use array_distinct to remove duplicates from the email array before using explode to create one row per unique email.

df_parsed = df_raw.withColumn("meta", from_json(col("raw_meta"), json_schema))

# Clean arrays and Flatten
df_cleaned = df_parsed.withColumn("unique_emails", array_distinct(col("meta.emails"))) \
    .withColumn("email", explode(col("unique_emails"))) \
    .withColumn("user_name", col("meta.user")) \
    .withColumn("address", col("meta.addr")) \
    .withColumn("processing_ts", current_timestamp()) \
    .withColumn("event_date", to_date(col("event_time")))

# Final transformation: Email Domain Extraction
df_final = df_cleaned.withColumn("email_domain", regexp_extract(col("email"), "@(.+)$", 1)) \
    .drop("raw_meta", "meta", "unique_emails")
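The "@(.+)$" pattern handed to regexp_extract can be verified in plain Python with the re module; the hypothetical email_domain helper below mirrors regexp_extract's behavior, including returning an empty string when the pattern does not match:

```python
import re

DOMAIN_RE = re.compile(r"@(.+)$")  # same pattern passed to regexp_extract

def email_domain(email: str) -> str:
    m = DOMAIN_RE.search(email)
    # regexp_extract returns "" when there is no match, so we do the same
    return m.group(1) if m else ""

print(email_domain("j@test.com"))  # test.com
print(email_domain("no-at-sign"))  # "" (empty, like regexp_extract)
```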
    

Notice that ID 101 now produces a single row: its duplicate emails were collapsed before the explode, and event_date has been derived from the ISO timestamp.

Target: Redshift Table (dim_user_emails)
| id  | email      | email_domain | event_date | user_name |
|-----|------------|--------------|------------|-----------|
| 101 | j@test.com | test.com     | 2024-05-10 | John Doe  |
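The dedupe-then-explode behavior can be mimicked in plain Python. In this sketch, dict.fromkeys stands in for array_distinct (it drops duplicates while preserving first-seen order) and the comprehension plays the role of explode; the extra jd@work.com address is made up for illustration:

```python
# Hypothetical record with a duplicate and a second distinct address
record = {"id": 101, "emails": ["j@test.com", "j@test.com", "jd@work.com"]}

# array_distinct: remove duplicates, keep first-seen order
unique_emails = list(dict.fromkeys(record["emails"]))

# explode: emit one output row per remaining array element
rows = [{"id": record["id"], "email": e} for e in unique_emails]
print(rows)  # [{'id': 101, 'email': 'j@test.com'}, {'id': 101, 'email': 'jd@work.com'}]
```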

We calculate data quality metrics (e.g., total records, unique users per region) and write them to S3 as logs, while the main data goes to Redshift.

# 1. Write to Redshift (Warehouse)
# Note: the "tempdir" option belongs to the spark-redshift connector, not the
# plain JDBC source. The connector stages data in S3 and loads it via COPY,
# which is far faster than row-by-row JDBC inserts. Credentials shown are
# placeholders; supply your own auth (e.g., aws_iam_role) in production.
df_final.write \
  .format("io.github.spark_redshift_community.spark.redshift") \
  .option("url", "jdbc:redshift://cluster:5439/dev?user=etl_user&password=***") \
  .option("dbtable", "warehouse.dim_user_emails") \
  .option("tempdir", "s3a://temp-bucket/redshift") \
  .option("forward_spark_s3_credentials", "true") \
  .mode("append") \
  .save()

# 2. Generate and Log Metrics to S3
metrics_df = df_final.groupBy("region").agg(
    count("*").alias("total_events"),
    countDistinct("id").alias("unique_users")
)

metrics_df.write.mode("overwrite").json("s3a://logs-bucket/metrics/date=2024-05-10/")
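The aggregation above can be sanity-checked in plain Python. This sketch uses made-up sample rows standing in for df_final and produces the same per-region shape that lands in the S3 JSON logs:

```python
from collections import defaultdict

# Hypothetical flattened rows standing in for df_final
rows = [
    {"id": 101, "region": "us-east"},
    {"id": 101, "region": "us-east"},  # same user, second distinct email row
    {"id": 202, "region": "eu-west"},
]

events = defaultdict(int)
users = defaultdict(set)
for r in rows:
    events[r["region"]] += 1           # count("*")
    users[r["region"]].add(r["id"])    # countDistinct("id")

metrics = [
    {"region": k, "total_events": events[k], "unique_users": len(users[k])}
    for k in events
]
print(metrics)
```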
    
Q: Why use array_distinct before explode?
A: Exploding an array that still contains duplicates creates redundant rows. Deduplicating first preserves data integrity and prevents fan-out (Cartesian-product-like) issues downstream in your warehouse.

Q: How does StructType help with performance in this scenario?
A: Passing an explicit schema to from_json means Spark never has to sample the data to infer types, which significantly reduces execution time on large Parquet files.