Multi-Source Financial Reconciliation Pipeline
Validating internal ledger accuracy against external bank feeds with timezone and fuzzy-math handling.
A fintech application records thousands of transactions daily in an internal Parquet ledger. Every morning, the partner bank sends a CSV statement. We must verify that every internal transaction appears in the bank statement and that the amounts match (within a 1-cent rounding tolerance).
1. Input Data & Timezone Normalization
Internal logs are in UTC, but the bank CSV uses US Eastern time. We must normalize both to UTC before matching. The conversion should use the DST-aware zone "America/New_York"; the fixed-offset "EST" is five hours behind UTC year-round and would be wrong during daylight saving time.
Internal ledger (Parquet):
| txn_id | amount | timestamp (UTC) |
| TXN_01 | 100.05 | 2024-01-26 14:00:00 |
Bank statement (CSV, US Eastern):
| bank_ref | amt | date_est |
| TXN_01 | 100.04 | 2024-01-26 09:00:00 |
# Explicit imports; abs/round/sum intentionally shadow the Python builtins
from pyspark.sql.functions import (
    abs, broadcast, col, count, round, sum, to_utc_timestamp, when
)
# Load internal ledger (timestamps already in UTC)
internal_df = spark.read.parquet("s3a://ledger/daily/") \
    .select("txn_id", "amount", col("timestamp").alias("internal_ts"))
# Load bank data (smaller, so we will broadcast it in step 3).
# "America/New_York" is DST-aware; the fixed offset "EST" would drift in summer.
bank_df = spark.read.option("header", "true").csv("s3a://bank-feeds/statement.csv") \
    .withColumn("bank_ts_utc", to_utc_timestamp(col("date_est"), "America/New_York")) \
    .withColumn("bank_amt", col("amt").cast("double")) \
    .select("bank_ref", "bank_amt", "bank_ts_utc")
2. Identifying Missing Transactions (Anti-Join)
The first audit step is finding transactions that exist in our system but were never acknowledged by the bank. A left anti join returns only the left-side rows with no match, and pulls none of the bank columns into the result, making it an efficient exclusion filter.
# Find transactions in internal_df NOT in bank_df
missing_from_bank = internal_df.join(
    bank_df,
    internal_df.txn_id == bank_df.bank_ref,
    "left_anti"
)
# Output: Rows that failed the first check
# | TXN_99 | 45.00 | 2024-01-26 10:00:00 |
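A complementary check (sketched here under the same schema assumptions) swaps the join sides to surface the opposite failure mode: bank rows we never recorded, such as unexpected fees or duplicate postings.
# Reverse check: bank rows with no matching internal transaction
missing_from_ledger = bank_df.join(
    internal_df,
    bank_df.bank_ref == internal_df.txn_id,
    "left_anti"
)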
3. Fuzzy Matching & Variance Analysis
Bank fees or rounding can cause 1-cent differences. We use abs() to tolerate a small variance, and a broadcast() hint so Spark ships the small bank table to every executor instead of shuffling the large ledger.
# Join with broadcast hint for optimization
reconciled_df = internal_df.join(
    broadcast(bank_df),
    internal_df.txn_id == bank_df.bank_ref,
    "inner"
)
# Apply fuzzy matching logic (tolerance = 0.01)
comparison_df = reconciled_df.withColumn(
    "is_match",
    abs(col("amount") - col("bank_amt")) <= 0.01
).withColumn(
    "variance",
    round(col("amount") - col("bank_amt"), 2)
)
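The tolerance is not optional once amounts are cast to double: binary floating point cannot represent most decimal cent values exactly, so strict equality would raise false alarms. A plain-Python illustration of the failure mode:
# Classic binary-float drift: 0.1 + 0.2 is not stored as exactly 0.3
a = 0.1 + 0.2
print(a)                     # 0.30000000000000004
print(a == 0.3)              # False -- strict equality misfires
print(abs(a - 0.3) <= 0.01)  # True  -- the tolerance check passes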
4. Output: Discrepancy Log & Summary
We split the output into a Discrepancy Log (all mismatches) and a Summary Report for stakeholders.
| txn_id | amount | bank_amt | variance | status |
| TXN_01 | 100.05 | 100.04 | 0.01 | Pass (rounding) |
| TXN_05 | 50.00 | 40.00 | 10.00 | Fail (flag for review) |
# Filter for actual errors (|variance| > 0.01)
discrepancies = comparison_df.filter(~col("is_match"))
# Save Detailed Log
discrepancies.write.mode("overwrite").parquet("s3a://audit/discrepancies/date=2024-01-26/")
# Generate high-level summary (single-row global aggregation)
summary = comparison_df.agg(
    count("*").alias("total_checked"),
    sum(when(col("is_match"), 1).otherwise(0)).alias("matched_count"),
    sum(when(~col("is_match"), 1).otherwise(0)).alias("error_count"),
    sum("variance").alias("total_variance_amt")  # net variance; signs can offset
)
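Because the summary collapses to a single row, it is cheap to pull onto the driver, e.g., to gate an alert. A minimal sketch (the alerting hook and its wording are assumptions, not part of the pipeline above):
# Single-row summary: safe to collect on the driver
stats = summary.collect()[0].asDict()
if stats["error_count"] > 0:
    # Hypothetical alerting hook; wire up to your notifier of choice
    print(f"Reconciliation FAILED: {stats['error_count']} discrepancies, "
          f"net variance {stats['total_variance_amt']}")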
Interview Q&A