Data Quality & Observability in Data Pipelines: What Most Engineers Miss
Most data pipelines fail silently. No exceptions, no alerts, no red dashboards — just wrong numbers quietly flowing into reports that executives make decisions from.
After debugging production pipelines where bad data lived undetected for weeks, I've learned that data quality and observability are not features you add after the pipeline works. They are the pipeline.
Here's what most engineers miss — and how to fix it.
The Silent Failure Problem
A data pipeline can be "running" in every technical sense while being completely broken:
- Schema drift - Upstream sends a new field name, your join silently produces nulls
- Volume anomalies - A source system goes quiet, your aggregates drop 40% undetected
- Duplicates - An at-least-once producer retries, your revenue counts are inflated
- Late arrivals - Mobile events arrive hours late, daily metrics are understated all day
- Type coercion - A string "null" gets cast to 0, distorting your averages
- Partition skew - One partition has 10x data, your SLA breaks for that segment only
None of these throw exceptions. All of them corrupt business decisions.
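The first failure mode is easy to reproduce without any infrastructure. Here is a minimal pure-Python sketch of a lookup join that keeps running after an upstream rename. The `user_id`/`userId` fields, the `left_join` helper, and the sample data are all hypothetical.

```python
def left_join(events: list[dict], users: dict[str, dict], key: str) -> list[dict]:
    """Enrich each event with the user's country; a missing key yields None, not an error."""
    joined = []
    for event in events:
        user = users.get(event.get(key))  # .get() on a missing field quietly returns None
        joined.append({**event, "country": user["country"] if user else None})
    return joined

users = {"u1": {"country": "DE"}, "u2": {"country": "US"}}

# Before the rename: upstream sends user_id
before = [{"user_id": "u1"}, {"user_id": "u2"}]
# After the rename: upstream sends userId, but the pipeline still joins on user_id
after = [{"userId": "u1"}, {"userId": "u2"}]

print([row["country"] for row in left_join(before, users, "user_id")])  # ['DE', 'US']
print([row["country"] for row in left_join(after, users, "user_id")])   # [None, None]
```

Neither call raises. The only symptom is a column of `None`, which is why schema and null-rate checks matter more than exception handling.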
The Four Pillars of Pipeline Observability
Before writing any code, understand what you're actually measuring:
| Pillar | What It Catches | Tooling |
|---|---|---|
| Freshness | Data stopped flowing | Lag checks, heartbeat events |
| Volume | Data dropped or duplicated | Row count comparisons |
| Schema | Structure changed upstream | Schema Registry, drift detection |
| Distribution | Values shifted unexpectedly | Statistical profiling, z-scores |
Most teams only instrument freshness. Volume, schema, and distribution checks are what separate mature pipelines from ones held together with hope.
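The heartbeat events listed under Freshness need very little code to check. This is a minimal sketch, assuming each producer emits a heartbeat on a fixed interval and you can look up the latest heartbeat timestamp per producer. The producer names, the 5-minute interval, and the `stale_producers` helper are illustrative.

```python
from datetime import datetime, timedelta, timezone

def stale_producers(
    last_heartbeat: dict[str, datetime],
    now: datetime,
    interval: timedelta,
    missed_beats: int = 2,
) -> list[str]:
    """Return producers whose latest heartbeat is older than `missed_beats` intervals."""
    cutoff = now - interval * missed_beats
    return sorted(name for name, ts in last_heartbeat.items() if ts < cutoff)

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
beats = {
    "orders-service": now - timedelta(minutes=3),
    "mobile-gateway": now - timedelta(minutes=45),
}
print(stale_producers(beats, now, interval=timedelta(minutes=5)))  # ['mobile-gateway']
```

Tolerating two missed beats instead of one avoids alerting on a single delayed heartbeat while still catching a producer that has gone quiet.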
Pillar 1: Freshness Monitoring
Consumer Lag as a First Signal
```python
from kafka import KafkaAdminClient, KafkaConsumer, TopicPartition
import time

BOOTSTRAP_SERVERS = ['kafka1:9092']

def get_consumer_lag(group_id: str, topic: str, num_partitions: int) -> dict:
    admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
    consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS)
    try:
        # Committed position of the consumer group: {TopicPartition: OffsetAndMetadata}
        consumer_offsets = admin.list_consumer_group_offsets(group_id)
        # Latest available (log-end) offsets: {TopicPartition: int}
        partitions = [TopicPartition(topic, p) for p in range(num_partitions)]
        latest_offsets = consumer.end_offsets(partitions)
    finally:
        admin.close()
        consumer.close()

    lag_by_partition = {}
    total_lag = 0
    for tp, end_offset in latest_offsets.items():
        committed = consumer_offsets.get(tp)
        # Skip partitions the group has never committed to (no entry, or offset -1)
        if committed is not None and committed.offset >= 0:
            lag = end_offset - committed.offset
            lag_by_partition[tp.partition] = lag
            total_lag += lag

    return {
        "total_lag": total_lag,
        "by_partition": lag_by_partition,
        "checked_at": time.time()
    }

# Alert thresholds (trigger_alert is an alerting helper you provide)
def check_freshness_slo(lag_result: dict):
    # elif: a lag above 500K should page once, not fire both alerts
    if lag_result["total_lag"] > 500_000:
        trigger_alert("CRITICAL: Consumer lag > 500K messages", severity="critical")
    elif lag_result["total_lag"] > 100_000:
        trigger_alert("WARNING: Consumer lag > 100K messages", severity="warning")

    # Check partition skew
    lags = list(lag_result["by_partition"].values())
    if lags:
        max_lag = max(lags)
        avg_lag = sum(lags) / len(lags)
        if max_lag > avg_lag * 5:
            trigger_alert(f"Partition skew detected: max={max_lag}, avg={avg_lag:.0f}")
```
Table Freshness Checks
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import max as spark_max
from datetime import datetime

def check_table_freshness(table: str, max_age_minutes: int = 15) -> bool:
    spark = SparkSession.builder.getOrCreate()
    result = (
        spark.read.format("delta").load(table)
        .select(spark_max("ingested_at").alias("last_update"))
        .collect()[0]
    )
    last_update = result["last_update"]
    if last_update is None:
        # Empty table: report it as stale instead of crashing on the subtraction below
        trigger_alert(f"Stale table: {table} has no rows", severity="critical")
        return False

    # total_seconds(), not .seconds: .seconds drops whole days, so a 25-hour lag would read as 1 hour.
    # Assumes the Spark driver runs in UTC, since collected timestamps are naive local times.
    age_minutes = (datetime.utcnow() - last_update).total_seconds() / 60

    if age_minutes > max_age_minutes:
        trigger_alert(
            f"Stale table: {table} last updated {age_minutes:.1f} mins ago "
            f"(SLO: {max_age_minutes} mins)",
            severity="critical"
        )
        return False
    return True

# Run checks per layer
freshness_checks = [
    ("s3://datalake/bronze/events", 2),   # Bronze: 2 min SLO
    ("s3://datalake/silver/events", 15),  # Silver: 15 min SLO
    ("s3://datalake/gold/dau", 60),       # Gold: 1 hour SLO
]

for table, max_age in freshness_checks:
    check_table_freshness(table, max_age)
```
Pillar 2: Volume Monitoring
Row Count Reconciliation
The most underused check in data engineering: compare row counts between layers. If Bronze has 1M rows and Silver has 800K, you have a 20% silent drop you need to explain.
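A raw count gap is not automatically a bug: Silver may legitimately remove duplicates or route invalid rows to a DLQ. This minimal sketch separates explained from unexplained drops, assuming you can count deduplicated and DLQ-routed rows for the same partition. The `unexplained_drop_pct` helper and its parameters are hypothetical.

```python
def unexplained_drop_pct(
    source_rows: int,
    target_rows: int,
    deduplicated_rows: int = 0,
    dlq_rows: int = 0,
) -> float:
    """Percentage of source rows that vanished without an explanation.

    A negative result means the target gained rows (e.g. duplicates introduced).
    """
    if source_rows == 0:
        return 0.0  # nothing arrived, so nothing was dropped; freshness checks cover this case
    unexplained = source_rows - target_rows - deduplicated_rows - dlq_rows
    return unexplained * 100 / source_rows

# The example from the text: 1M rows in Bronze, 800K in Silver
print(unexplained_drop_pct(1_000_000, 800_000))                   # 20.0
# Same gap, but 150K rows were duplicates and 50K went to the DLQ
print(unexplained_drop_pct(1_000_000, 800_000, 150_000, 50_000))  # 0.0
```

Alerting on the unexplained share instead of the raw gap keeps a legitimate dedup step from tripping the reconciliation threshold.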
```python
from dataclasses import dataclass
from typing import Optional
from pyspark.sql import SparkSession

@dataclass
class VolumeCheckResult:
    source_table: str
    target_table: str
    source_count: int
    target_count: int
    drop_rate_pct: float
    passed: bool
    reason: Optional[str] = None

def check_volume_reconciliation(
    source_table: str,
    target_table: str,
    partition_filter: str,
    max_drop_rate_pct: float = 1.0
) -> VolumeCheckResult:
    spark = SparkSession.builder.getOrCreate()

    source_count = (
        spark.read.format("delta").load(source_table)
        .filter(partition_filter)
        .count()
    )
    target_count = (
        spark.read.format("delta").load(target_table)
        .filter(partition_filter)
        .count()
    )

    if source_count == 0:
        # Avoid ZeroDivisionError; an empty source is a freshness problem, not a drop
        return VolumeCheckResult(
            source_table, target_table, 0, target_count, 0.0,
            passed=True, reason="source partition is empty",
        )

    drop_rate = ((source_count - target_count) / source_count) * 100
    passed = drop_rate <= max_drop_rate_pct
    reason = None

    if not passed:
        reason = (
            f"Volume drop: {source_table} → {target_table} | "
            f"source={source_count:,} target={target_count:,} "
            f"drop={drop_rate:.2f}% (max allowed: {max_drop_rate_pct}%)"
        )
        trigger_alert(reason, severity="critical")

    return VolumeCheckResult(
        source_table=source_table,
        target_table=target_table,
        source_count=source_count,
        target_count=target_count,
        drop_rate_pct=drop_rate,
        passed=passed,
        reason=reason,
    )
```
Anomaly Detection on Volume
Static thresholds break on weekends, holidays, and growth curves. Use statistical anomaly detection instead:
```python
import numpy as np
from sklearn.ensemble import IsolationForest

def detect_volume_anomaly(
    daily_counts: list[int],
    today_count: int,
    contamination: float = 0.05
) -> dict:
    """
    Uses Isolation Forest to detect if today's volume
    is anomalous relative to historical pattern.
    """
    X = np.array(daily_counts).reshape(-1, 1)

    model = IsolationForest(
        contamination=contamination,  # Expected share of anomalies in the history (5%)
        random_state=42
    )
    model.fit(X)

    prediction = model.predict([[today_count]])[0]       # -1 = anomaly, 1 = normal
    score = model.score_samples([[today_count]])[0]      # Lower = more anomalous
    is_anomaly = bool(prediction == -1)

    if is_anomaly:
        p25 = np.percentile(daily_counts, 25)
        p75 = np.percentile(daily_counts, 75)
        trigger_alert(
            f"Volume anomaly detected: today={today_count:,} "
            f"historical_iqr=[{p25:,.0f}, {p75:,.0f}] "
            f"anomaly_score={score:.3f}",
            severity="warning"
        )

    return {
        "is_anomaly": is_anomaly,
        "today_count": today_count,
        "anomaly_score": score,
        "historical_p50": np.median(daily_counts)
    }
```
Pillar 3: Schema Monitoring
Detecting Schema Drift
Schema drift is the silent killer of long-running pipelines. A producer renames a field, your downstream join on that field starts returning nulls — and you find out three weeks later when a stakeholder notices a trend reversal.
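In a field-level diff, a rename appears as one missing field plus one new field of the same type. This minimal sketch pairs those up as rename candidates, using the same `{name: type}` dictionaries the drift check builds. The `probable_renames` helper is hypothetical and purely a heuristic: if several same-type fields change at once, it returns every pairing, so treat the output as a lead for investigation rather than a verdict.

```python
def probable_renames(
    expected: dict[str, str],
    actual: dict[str, str],
) -> list[tuple[str, str]]:
    """Pair each missing field with new fields of the same type as rename candidates.

    `expected` and `actual` map field name -> type name.
    """
    missing = {name: t for name, t in expected.items() if name not in actual}
    added = {name: t for name, t in actual.items() if name not in expected}
    candidates = []
    for old_name, old_type in sorted(missing.items()):
        for new_name, new_type in sorted(added.items()):
            if old_type == new_type:
                candidates.append((old_name, new_name))
    return candidates

expected = {"event_id": "string", "user_id": "string", "amount": "double"}
actual = {"event_id": "string", "userId": "string", "amount": "double"}
print(probable_renames(expected, actual))  # [('user_id', 'userId')]
```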
```python
from pyspark.sql.types import StructType

def detect_schema_drift(
    expected_schema: StructType,
    actual_df,
    table_name: str
) -> dict:
    expected_fields = {f.name: str(f.dataType) for f in expected_schema.fields}
    actual_fields = {f.name: str(f.dataType) for f in actual_df.schema.fields}

    # Fields removed upstream
    missing_fields = set(expected_fields.keys()) - set(actual_fields.keys())
    # New fields added upstream
    new_fields = set(actual_fields.keys()) - set(expected_fields.keys())
    # Type changed for existing fields
    type_changes = {
        field: {"expected": expected_fields[field], "actual": actual_fields[field]}
        for field in expected_fields.keys() & actual_fields.keys()
        if expected_fields[field] != actual_fields[field]
    }

    drift_detected = bool(missing_fields or type_changes)

    if missing_fields:
        trigger_alert(
            f"Schema drift [{table_name}]: missing fields {missing_fields}",
            severity="critical"
        )
    if type_changes:
        trigger_alert(
            f"Schema drift [{table_name}]: type changes {type_changes}",
            severity="critical"
        )
    if new_fields:
        trigger_alert(
            f"Schema drift [{table_name}]: new fields detected {new_fields}",
            severity="warning"  # New fields are often safe but worth knowing
        )

    return {
        "drift_detected": drift_detected,
        "missing_fields": list(missing_fields),
        "new_fields": list(new_fields),
        "type_changes": type_changes
    }
```
Tracking Schema Versions in Delta
```python
from pyspark.sql import SparkSession

def get_schema_history(table_path: str, num_versions: int = 10) -> list:
    """
    Use Delta time travel to compare schemas across versions.
    Useful for post-incident investigation.
    """
    spark = SparkSession.builder.getOrCreate()
    history = spark.sql(f"DESCRIBE HISTORY delta.`{table_path}`")

    versions = (
        history
        .select("version", "timestamp", "operation", "operationParameters")
        .orderBy("version", ascending=False)
        .limit(num_versions)
        .collect()
    )

    schema_snapshots = []
    for v in versions:
        # Read the table as of each version to recover its schema at that point
        snapshot_schema = (
            spark.read
            .format("delta")
            .option("versionAsOf", v["version"])
            .load(table_path)
            .schema
        )
        schema_snapshots.append({
            "version": v["version"],
            "timestamp": v["timestamp"],
            "operation": v["operation"],
            "fields": [f.name for f in snapshot_schema.fields]
        })

    return schema_snapshots
```
Pillar 4: Distribution Monitoring
Statistical Profiling
Volume and freshness tell you if data arrived. Distribution checks tell you what arrived — and whether it makes sense.
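For low-cardinality fields such as enums, comparing observed values against an allow-list catches problems that means and standard deviations cannot, such as a casing change or a value that silently disappears. This is a minimal pure-Python sketch, assuming the column's values for the batch have already been collected. The allowed platforms mirror the `valid_platforms` check later in this article; `enum_drift` and the sample batch are hypothetical.

```python
from collections import Counter

def enum_drift(values: list, allowed: set[str]) -> dict:
    """Report values outside the allow-list and allowed values that vanished."""
    counts = Counter(values)
    observed = set(counts)
    return {
        "unexpected": {v: counts[v] for v in observed - allowed},  # includes None if present
        "missing": sorted(allowed - observed),  # a vanished value can mean a silent source
        "cardinality": len(observed),
    }

allowed = {"WEB", "IOS", "ANDROID", "BACKEND"}
batch = ["WEB", "IOS", "ios", "WEB", None, "BACKEND"]
report = enum_drift(batch, allowed)
print(report["missing"])      # ['ANDROID']
print(report["cardinality"])  # 5
```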
```python
from pyspark.sql.functions import (
    col, count, countDistinct, avg, stddev,
    min as spark_min, max as spark_max,
    percentile_approx, isnan, isnull, when
)

def profile_column(df, column: str) -> dict:
    """
    Compute statistical profile for a single numeric column.
    Run on every Silver write and compare to baseline.
    """
    stats = df.select(
        count(col(column)).alias("count"),  # counts non-null values only
        countDistinct(col(column)).alias("cardinality"),
        avg(col(column)).alias("mean"),
        stddev(col(column)).alias("std"),
        spark_min(col(column)).alias("min"),
        spark_max(col(column)).alias("max"),
        percentile_approx(col(column), 0.25).alias("p25"),
        percentile_approx(col(column), 0.50).alias("p50"),
        percentile_approx(col(column), 0.75).alias("p75"),
        (count(when(isnull(col(column)) | isnan(col(column)), 1)) * 100.0
         / count("*")).alias("null_rate_pct")
    ).collect()[0].asDict()
    return stats

def check_distribution_drift(
    baseline: dict,
    current: dict,
    column: str,
    z_score_threshold: float = 3.0
):
    """
    Flag columns where mean has shifted more than z_score_threshold
    standard deviations from the baseline.
    """
    # Skip when std is 0/None or the current batch had no non-null values
    if baseline["std"] and current["mean"] is not None:
        z_score = abs(current["mean"] - baseline["mean"]) / baseline["std"]
        if z_score > z_score_threshold:
            trigger_alert(
                f"Distribution drift [{column}]: "
                f"baseline_mean={baseline['mean']:.2f} "
                f"current_mean={current['mean']:.2f} "
                f"z_score={z_score:.2f}",
                severity="warning"
            )

    # Null rate spike (null_rate_pct is None for an empty batch)
    if current["null_rate_pct"] is not None and baseline["null_rate_pct"] is not None:
        null_delta = current["null_rate_pct"] - baseline["null_rate_pct"]
        if null_delta > 5.0:  # more than 5 percentage points above baseline
            trigger_alert(
                f"Null rate spike [{column}]: "
                f"baseline={baseline['null_rate_pct']:.1f}% "
                f"current={current['null_rate_pct']:.1f}%",
                severity="critical"
            )
```
Building the DQ Check Framework
Tie all four pillars into a single executable framework with consistent severity levels and actions:
```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable
from pyspark.sql.functions import col

class Severity(Enum):
    CRITICAL = "critical"  # Block pipeline, page on-call
    WARNING = "warning"    # Alert Slack, continue
    INFO = "info"          # Log only

class Action(Enum):
    BLOCK = "block"  # Stop pipeline
    DLQ = "dlq"      # Route bad rows to dead letter queue
    FLAG = "flag"    # Mark rows with dq_passed=False
    ALERT = "alert"  # Notify and continue

@dataclass
class DQCheck:
    name: str
    check_fn: Callable[..., bool]
    severity: Severity
    action: Action
    message: str

class DQFramework:
    def __init__(self, pipeline_name: str):
        self.pipeline_name = pipeline_name
        self.checks: list[DQCheck] = []
        self.results: list[dict] = []

    def add_check(self, check: DQCheck):
        self.checks.append(check)
        return self  # Chainable

    def run(self, **kwargs) -> bool:
        """Run every check; return False if any BLOCK-action check failed."""
        all_passed = True
        for check in self.checks:
            message = check.message
            try:
                passed = bool(check.check_fn(**kwargs))
            except Exception as e:
                # A crashing check counts as failed; keep the DQCheck itself unmodified
                passed = False
                message = f"{check.message} [exception: {e}]"

            self.results.append({
                "check": check.name,
                "passed": passed,
                "severity": check.severity.value,
                "action": check.action.value,
                "message": message,
            })

            if not passed:
                self._handle_failure(check, message)
                if check.action == Action.BLOCK:
                    all_passed = False

        return all_passed

    def _handle_failure(self, check: DQCheck, message: str):
        # send_pagerduty_alert, send_slack_alert and log_to_datadog are your own helpers
        msg = f"[{self.pipeline_name}] DQ FAILED: {check.name}: {message}"
        if check.severity == Severity.CRITICAL:
            send_pagerduty_alert(msg)
            send_slack_alert(msg, channel="#data-incidents")
        elif check.severity == Severity.WARNING:
            send_slack_alert(msg, channel="#data-warnings")
        else:
            log_to_datadog(msg)

# Usage
silver_dq = (
    DQFramework("silver-events")
    .add_check(DQCheck(
        name="no_null_event_ids",
        check_fn=lambda df: df.filter(col("event_id").isNull()).count() == 0,
        severity=Severity.CRITICAL,
        action=Action.BLOCK,
        message="event_id must never be null"
    ))
    .add_check(DQCheck(
        name="valid_platforms",
        check_fn=lambda df: (
            df.filter(~col("platform").isin("WEB", "IOS", "ANDROID", "BACKEND"))
            .count() / df.count() < 0.001
        ),
        severity=Severity.WARNING,
        action=Action.FLAG,
        message="at most 0.1% of rows may have invalid platform values"
    ))
    .add_check(DQCheck(
        name="no_future_events",
        check_fn=lambda df: (
            df.filter(col("occurred_at") > col("ingested_at")).count() == 0
        ),
        severity=Severity.WARNING,
        action=Action.FLAG,
        message="occurred_at should never be later than ingested_at"
    ))
)

# ok = silver_dq.run(df=silver_df)  # False means a BLOCK check failed: stop the write
```
Instrumenting Spark Streaming Pipelines
Query Listener for Real-Time Metrics
```python
from datetime import datetime, timezone
from pyspark.sql import SparkSession
from pyspark.sql.streaming import StreamingQueryListener

class PipelineMetricsListener(StreamingQueryListener):
    # log_metric and trigger_alert are your own metrics/alerting helpers
    def onQueryStarted(self, event):
        log_metric("streaming.query.started", 1, tags={"query": event.name})

    def onQueryProgress(self, event):
        progress = event.progress
        tags = {"query": progress.name}

        # Rows processed per batch
        log_metric("streaming.rows_per_batch", progress.numInputRows, tags=tags)
        # Processing rate
        log_metric("streaming.rows_per_second", progress.processedRowsPerSecond, tags=tags)
        # Trigger execution time
        log_metric("streaming.trigger_execution_ms",
                   progress.durationMs.get("triggerExecution", 0), tags=tags)

        # Watermark lag: how far the event-time watermark trails wall-clock time.
        # eventTime["watermark"] is an ISO-8601 UTC string; it stays at epoch 0
        # until the first watermark is set, so skip that value.
        watermark = progress.eventTime.get("watermark")
        if watermark and not watermark.startswith("1970-01-01"):
            wm = datetime.fromisoformat(watermark.replace("Z", "+00:00"))
            lag_seconds = (datetime.now(timezone.utc) - wm).total_seconds()
            log_metric("streaming.watermark_lag_seconds", lag_seconds, tags=tags)

        # Alert on stalled batches
        if progress.numInputRows == 0:
            trigger_alert(
                f"Streaming query '{progress.name}' processed 0 rows; "
                f"possible upstream issue",
                severity="warning"
            )

    def onQueryTerminated(self, event):
        if event.exception:
            trigger_alert(
                f"Streaming query terminated with exception: {event.exception}",
                severity="critical"
            )

# Register listener
spark = SparkSession.builder.getOrCreate()
spark.streams.addListener(PipelineMetricsListener())
```
Batch-Level Audit Log
```python
from datetime import datetime, timezone
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, sum as spark_sum

def write_audit_log(
    df,
    layer: str,
    table: str,
    batch_id: int,
    audit_table: str
):
    """
    Write a row to the audit log for every batch processed.
    Enables lineage tracking and post-incident investigation.
    """
    spark = SparkSession.builder.getOrCreate()

    stats = df.agg(
        count("*").alias("total_rows"),
        spark_sum(when(~col("dq_passed"), 1).otherwise(0)).alias("failed_dq_rows"),
        spark_sum(when(col("is_late_arrival"), 1).otherwise(0)).alias("late_rows"),
    ).collect()[0]

    total = stats["total_rows"]
    failed = stats["failed_dq_rows"] or 0  # sum() over zero rows is null
    late = stats["late_rows"] or 0

    audit_row = spark.createDataFrame([{
        "layer": layer,
        "table": table,
        "batch_id": batch_id,
        "total_rows": total,
        "failed_dq_rows": failed,
        "late_rows": late,
        # An empty batch has nothing failing; avoid dividing by zero
        "dq_pass_rate": round((total - failed) / total * 100, 2) if total else 100.0,
        "written_at": datetime.now(timezone.utc).isoformat(),
    }])

    (
        audit_row.write
        .format("delta")
        .mode("append")
        .save(audit_table)
    )
```
The SLO Dashboard
Define and publish SLOs before anything goes to production. If there's no SLO, there's no accountability:
| Layer | Metric | SLO | Measurement Window |
|---|---|---|---|
| Bronze | Ingestion lag | < 30s | Per batch |
| Bronze | Row drop rate | < 0.01% | Daily |
| Silver | Processing lag | < 5 min | Per batch |
| Silver | DQ pass rate | > 99.5% | Daily |
| Silver | Duplicate rate | < 0.001% | Daily |
| Gold | Refresh latency | < 1 hour | Per table |
| Gold | Query p95 | < 10s | Per hour |
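The Silver duplicate-rate SLO needs a concrete definition before it can be measured. One reasonable definition, assumed here rather than taken from the table, is the share of rows whose business key (for example `event_id`) was already seen in the same window. The helper names are hypothetical; in Spark the same ratio would come from `count()` versus `countDistinct` on the key column.

```python
def duplicate_rate_pct(keys: list[str]) -> float:
    """Share of rows (in %) that repeat an already-seen key: (rows - distinct) / rows."""
    if not keys:
        return 0.0
    return (len(keys) - len(set(keys))) * 100 / len(keys)

def meets_duplicate_slo(keys: list[str], max_pct: float = 0.001) -> bool:
    """Compare against the < 0.001% Silver duplicate-rate SLO."""
    return duplicate_rate_pct(keys) < max_pct

# An at-least-once producer retried one of four events
print(duplicate_rate_pct(["e1", "e2", "e2", "e3"]))  # 25.0
```

At 0.001%, the budget is one duplicate per 100,000 rows, so a single retried batch from an at-least-once producer can exhaust it.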
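```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import max as spark_max, sum as spark_sum

# AUDIT_TABLE and post_slo_report are defined elsewhere in your project
def publish_slo_report(date: str) -> dict:
    spark = SparkSession.builder.getOrCreate()
    audit = spark.read.format("delta").load(AUDIT_TABLE).filter(
        f"DATE(written_at) = '{date}'"
    )

    silver_stats = audit.filter("layer = 'silver'").agg(
        # Row-weighted pass rate: averaging per-batch rates would let a
        # 10-row batch count as much as a 10M-row batch
        ((spark_sum("total_rows") - spark_sum("failed_dq_rows"))
         / spark_sum("total_rows") * 100).alias("dq_pass_rate"),
        spark_max("total_rows").alias("max_batch_size"),
    ).collect()[0]

    dq_rate = silver_stats["dq_pass_rate"]  # None if no Silver batches that day
    report = {
        "date": date,
        "silver_dq_rate": dq_rate,
        "slo_silver_dq": dq_rate is not None and dq_rate >= 99.5,
    }

    # Post to Slack or internal dashboard
    post_slo_report(report)
    return report
```
What Good Observability Looks Like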
After implementing this across three production pipelines, this is the pattern that works:
At pipeline start: Freshness + volume check on upstream source before processing begins. If the source is stale or empty, fail fast rather than propagating nulls.
During processing: Row-level DQ flags (dq_passed) on every Silver row. Never drop bad rows silently — route them to DLQ and flag them. You want a full audit trail.
After each batch: Write to audit log. Compare volume to previous batch. Check for distribution drift on key numeric columns.
Nightly: Run full statistical profile against 30-day baseline. Schema comparison across Delta versions. SLO report to stakeholders.
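The "During processing" step can be sketched in plain Python. Every row gets a `dq_passed` flag, late rows get `is_late_arrival` (the column the audit log aggregates), and failing rows are copied to a DLQ list rather than discarded. The two rules, the 2-hour lateness threshold, and the `flag_rows` helper are illustrative assumptions, not the author's production rules.

```python
from datetime import datetime, timedelta

LATE_THRESHOLD = timedelta(hours=2)  # hypothetical lateness cutoff

def flag_rows(rows: list[dict]) -> tuple[list[dict], list[dict]]:
    """Annotate every row with DQ flags; return (all_rows, dlq_rows). Nothing is dropped."""
    flagged, dlq = [], []
    for row in rows:
        failures = []
        if row.get("event_id") is None:
            failures.append("null_event_id")
        if row["occurred_at"] > row["ingested_at"]:
            failures.append("future_event")
        out = {
            **row,
            "dq_passed": not failures,
            "dq_failures": failures,
            "is_late_arrival": row["ingested_at"] - row["occurred_at"] > LATE_THRESHOLD,
        }
        flagged.append(out)
        if failures:
            dlq.append(out)  # copied, not removed: the row stays in `flagged` too
    return flagged, dlq

t = datetime(2024, 1, 1, 12, 0)
rows = [
    {"event_id": "e1", "occurred_at": t, "ingested_at": t + timedelta(minutes=1)},
    {"event_id": None, "occurred_at": t, "ingested_at": t},
    {"event_id": "e3", "occurred_at": t - timedelta(hours=5), "ingested_at": t},
]
flagged, dlq = flag_rows(rows)
print([r["dq_passed"] for r in flagged])        # [True, False, True]
print([r["is_late_arrival"] for r in flagged])  # [False, False, True]
print(len(dlq))                                 # 1
```

Keeping the failure reasons on each row makes the daily DLQ review faster than re-running checks against a bare list of rejected rows.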
Production Checklist
✅ Freshness:
- Consumer lag alert (< 2x normal)
- Table staleness check per layer
- Heartbeat event every N minutes from producers
✅ Volume:
- Bronze → Silver row count reconciliation
- Statistical anomaly detection (not just static thresholds)
- Partition-level skew detection
✅ Schema:
- Schema Registry for all Kafka topics
- Drift detection on every batch
- Delta time-travel for schema version history
✅ Distribution:
- Null rate monitoring per column
- Z-score based mean drift detection
- Cardinality checks on low-cardinality fields (enums)
✅ Audit:
- Batch-level audit log (rows in, rows out, DQ pass rate)
- DLQ volume tracking with daily review
- SLO report published to stakeholders
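The "Consumer lag alert (< 2x normal)" item implies a baseline-relative threshold rather than the fixed 100K/500K values used in the freshness section. This minimal sketch assumes you keep recent lag samples (for example from `get_consumer_lag`) and treat their median as "normal". The `floor` parameter is an added assumption: it prevents a near-zero baseline from triggering alerts on tiny absolute lags. The `lag_exceeds_normal` helper is hypothetical.

```python
from statistics import median

def lag_exceeds_normal(
    recent_lags: list[int],
    current_lag: int,
    multiplier: float = 2.0,
    floor: int = 1_000,
) -> bool:
    """True when current lag is above `multiplier` x the median of recent samples."""
    if not recent_lags:
        return False  # no baseline yet; rely on static thresholds until history accumulates
    threshold = max(median(recent_lags) * multiplier, floor)
    return current_lag > threshold

history = [12_000, 9_500, 11_000, 10_500, 13_000]
print(lag_exceeds_normal(history, 18_000))  # False (threshold is 22,000)
print(lag_exceeds_normal(history, 30_000))  # True
```

The median is used instead of the mean so that one past incident in the history does not inflate what counts as "normal".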
Common Pitfalls
❌ Static alert thresholds - Break on weekends, holidays, and growth; use statistical baselines
❌ Dropping bad rows silently - Route to DLQ instead; every dropped row is a clue
❌ Only monitoring freshness - Volume drops and distribution shifts are more dangerous
❌ DQ checks at Gold only - By then the damage is done; check at every layer
❌ No audit log - When incidents happen, you'll have no lineage to investigate
❌ Alerting everything at CRITICAL - Alert fatigue kills response; calibrate severity honestly
Key Takeaways
- Silent failures are worse than crashes - A crash is visible; a wrong number is invisible
- DQ at every layer - Bronze, Silver, and Gold each need their own checks
- Statistical baselines beat static thresholds - Your pipeline's "normal" changes over time
- Audit logs are your black box - Write one on every batch, thank yourself later
- Publish SLOs before going live - No SLO means no standard to hold the pipeline to
- DLQ is not a trash can - Review it daily; it's your most honest signal about upstream health
The difference between a mature data platform and a fragile one is not the transformation logic — it's whether you know, within minutes, when something silently breaks.