Financial Reconciliation Pipeline | Spark Practical Scenarios

Multi-Source Financial Reconciliation Pipeline.

Validating internal ledger accuracy against external bank feeds with timezone and fuzzy-math handling.

A fintech application records thousands of transactions daily in an internal Parquet ledger. Every morning the partner bank sends a CSV statement, and we must verify that every internal transaction appears in the statement with a matching amount (within a 1-cent rounding tolerance).

Internal logs are in UTC, but the bank CSV is in EST (UTC-5). Both sides must be normalized to UTC before matching.
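Outside Spark, the same conversion can be sanity-checked with Python's standard-library `zoneinfo`. A minimal sketch, using the hypothetical sample row from the tables below:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

# Bank feed timestamp, interpreted as US Eastern time (hypothetical sample row)
est_ts = datetime(2024, 1, 26, 9, 0, 0, tzinfo=ZoneInfo("America/New_York"))

# Normalize to UTC -- should line up with the internal ledger's 14:00:00
utc_ts = est_ts.astimezone(ZoneInfo("UTC"))
print(utc_ts)  # 2024-01-26 14:00:00+00:00
```

In January, Eastern time is UTC-5, so the bank's 09:00 row and the ledger's 14:00 row describe the same instant.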

Internal Ledger (Parquet - UTC)

| txn_id | amount | timestamp (UTC) |
|--------|--------|---------------------|
| TXN_01 | 100.05 | 2024-01-26 14:00:00 |
Bank Statement (CSV - EST)

| bank_ref | amt | date_est |
|----------|--------|---------------------|
| TXN_01 | 100.04 | 2024-01-26 09:00:00 |
from pyspark.sql.functions import (
    abs, broadcast, col, count, round, sum, to_utc_timestamp, when
)

# Load Internal Data
internal_df = spark.read.parquet("s3a://ledger/daily/") \
    .select("txn_id", "amount", col("timestamp").alias("internal_ts"))

# Load Bank Data (Smaller, so we will broadcast it)
# Note: "EST" is a fixed UTC-5 offset. If the feed follows US daylight
# saving, "America/New_York" is the safer zone ID.
bank_df = spark.read.option("header", "true").csv("s3a://bank-feeds/statement.csv") \
    .withColumn("bank_ts_utc", to_utc_timestamp(col("date_est"), "EST")) \
    .withColumn("bank_amt", col("amt").cast("double")) \
    .select("bank_ref", "bank_amt", "bank_ts_utc")

The first audit step is finding transactions that exist in our system but were never acknowledged by the bank. We use a Left Anti Join for high-performance exclusion.

# Find transactions in internal_df NOT in bank_df
missing_from_bank = internal_df.join(
    bank_df, 
    internal_df.txn_id == bank_df.bank_ref, 
    "left_anti"
)

# Output: Rows that failed the first check
# | TXN_99 | 45.00 | 2024-01-26 10:00:00 |
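A left anti join is essentially a set difference on the join keys. A minimal pure-Python sketch of the same semantics, using hypothetical sample rows:

```python
# Hypothetical sample rows mirroring the two sources
internal_rows = [
    ("TXN_01", 100.05),
    ("TXN_99", 45.00),
]
bank_refs = {"TXN_01"}  # keys present in the bank feed

# A left anti join keeps internal rows whose key has NO match on the right
missing_from_bank = [row for row in internal_rows if row[0] not in bank_refs]
print(missing_from_bank)  # [('TXN_99', 45.0)]
```

No bank columns survive the anti join; only the unmatched left-side rows come back, which is exactly what the audit needs.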
    

Bank fees or rounding can cause 1-cent differences. We use abs() to allow a small variance and broadcast() to speed up the join.
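Because both amounts are cast to double, the raw subtraction carries binary floating-point noise, so it is safer to round the difference to whole cents before applying the tolerance. A small pure-Python sketch (the `cents_match` helper is illustrative, not part of the pipeline):

```python
# Classic float artifact: binary doubles cannot represent most decimals exactly
print(0.1 + 0.2 == 0.3)  # False

# Rounding the difference to cents first keeps the tolerance check stable
def cents_match(amount: float, bank_amt: float, tol: float = 0.01) -> bool:
    return abs(round(amount - bank_amt, 2)) <= tol

print(cents_match(100.05, 100.04))  # True: 1-cent rounding difference
print(cents_match(50.00, 40.00))    # False: real discrepancy
```

The same round-then-compare pattern can be expressed in Spark column expressions, as below.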

# Join with Broadcast hint for optimization
reconciled_df = internal_df.join(
    broadcast(bank_df), 
    internal_df.txn_id == bank_df.bank_ref, 
    "inner"
)

# Apply fuzzy matching logic (tolerance = 0.01)
# Round the raw double difference to cents first so binary float noise
# cannot push a legitimate 1-cent difference past the tolerance.
comparison_df = reconciled_df.withColumn(
    "variance", 
    round(col("amount") - col("bank_amt"), 2)
).withColumn(
    "is_match", 
    abs(col("variance")) <= 0.01
)
    

We split the output into a Discrepancy Log (all mismatches) and a Summary Report for stakeholders.

Output: Discrepancy Log (S3)

| txn_id | amount | bank_amt | variance | result |
|--------|--------|----------|----------|-----------------------|
| TXN_01 | 100.05 | 100.04   | 0.01     | Pass: Rounding        |
| TXN_05 | 50.00  | 40.00    | 10.00    | Fail: Flag for Review |
# Filter for actual errors (|variance| > 0.01)
discrepancies = comparison_df.filter(~col("is_match"))

# Save Detailed Log
discrepancies.write.mode("overwrite").parquet("s3a://audit/discrepancies/date=2024-01-26/")

# Generate High-Level Summary
summary = comparison_df.select(
    count("*").alias("total_checked"),
    sum(when(col("is_match"), 1).otherwise(0)).alias("matched_count"),
    sum(when(col("is_match") == False, 1).otherwise(0)).alias("error_count"),
    sum("variance").alias("total_variance_amt")
)
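The conditional-count pattern in the summary is easy to check by hand. A pure-Python sketch over hypothetical comparison rows matching the table above:

```python
# Hypothetical comparison rows: (txn_id, amount, bank_amt, variance, is_match)
rows = [
    ("TXN_01", 100.05, 100.04, 0.01, True),
    ("TXN_05", 50.00, 40.00, 10.00, False),
]

total_checked = len(rows)
matched_count = sum(1 for r in rows if r[4])       # when(is_match, 1).otherwise(0)
error_count = sum(1 for r in rows if not r[4])
total_variance_amt = round(sum(r[3] for r in rows), 2)
print(total_checked, matched_count, error_count, total_variance_amt)  # 2 1 1 10.01
```

Note that `total_variance_amt` is a net figure: positive and negative variances can cancel, so a separate sum of absolute variances may be worth reporting as well.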
    
Q: Why use a Broadcast Join for the bank statement?
A: Bank statements are typically much smaller (thousands of rows) compared to internal transaction logs (millions/billions). Broadcasting the small table avoids a shuffle, drastically reducing network overhead.
Q: How do you handle duplicate transaction IDs in the bank feed?
A: Before joining, you should run a groupBy("bank_ref").count() check. If duplicates exist, the join will explode your record count. You must deduplicate or aggregate the bank feed first.
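The duplicate check and a keep-first dedup can be sketched in pure Python (hypothetical sample rows; in the pipeline itself this would be `groupBy("bank_ref").count()` plus `dropDuplicates`):

```python
from collections import Counter

# Hypothetical bank rows with a duplicated reference
bank_rows = [
    ("TXN_01", 100.04),
    ("TXN_02", 75.00),
    ("TXN_02", 75.00),  # duplicate -- would multiply rows after the join
]

# Equivalent of groupBy("bank_ref").count() filtered to count > 1
dupes = [ref for ref, n in Counter(r[0] for r in bank_rows).items() if n > 1]
print(dupes)  # ['TXN_02']

# Simple dedup: keep the first occurrence per reference
seen, deduped = set(), []
for row in bank_rows:
    if row[0] not in seen:
        seen.add(row[0])
        deduped.append(row)
print(len(deduped))  # 2
```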
Q: What is the benefit of to_utc_timestamp()?
A: It prevents "off-by-one-day" errors. Financial reporting is strictly date-bound; if you don't normalize timezones, a transaction at 11:00 PM EST might show up on the wrong day in UTC.
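The date-rollover hazard is easy to demonstrate with the standard library: a late-evening Eastern timestamp lands on the next calendar day once normalized to UTC.

```python
from datetime import datetime
from zoneinfo import ZoneInfo

# 11:00 PM Eastern on Jan 26 crosses midnight once normalized to UTC
est_late = datetime(2024, 1, 26, 23, 0, tzinfo=ZoneInfo("America/New_York"))
utc_late = est_late.astimezone(ZoneInfo("UTC"))
print(utc_late.date())  # 2024-01-27 -- the transaction lands on the NEXT day in UTC
```

If daily partitions (like the `date=2024-01-26` path above) are built before normalizing, this transaction would be filed under the wrong day.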