Data Quality, Observability, Data Engineering, Spark, Delta Lake, Monitoring

Data Quality & Observability in Data Pipelines: What Most Engineers Miss

2024-05-19 | 14 min read

Most data pipelines fail silently. No exceptions, no alerts, no red dashboards — just wrong numbers quietly flowing into reports that executives make decisions from.

After debugging production pipelines where bad data lived undetected for weeks, I've learned that data quality and observability are not features you add after the pipeline works. They are the pipeline.

Here's what most engineers miss — and how to fix it.

The Silent Failure Problem

A data pipeline can be "running" in every technical sense while being completely broken:

  1. Schema drift - Upstream sends a new field name, your join silently produces nulls
  2. Volume anomalies - A source system goes quiet, your aggregates drop 40% undetected
  3. Duplicates - An at-least-once producer retries, your revenue counts are inflated
  4. Late arrivals - Mobile events arrive hours late, daily metrics are understated all day
  5. Type coercion - A string "null" gets cast to 0, distorting your averages
  6. Partition skew - One partition has 10x data, your SLA breaks for that segment only

None of these throw exceptions. All of them corrupt business decisions.
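
To make two of these concrete, here is a minimal sketch in plain Python showing how a lenient cast and a retried event each skew a metric without raising anything. The sample values and the `lenient_cast` helper are made up for illustration; they are not taken from a real pipeline.

```python
from statistics import mean

def lenient_cast(value: str) -> float:
    """Hypothetical permissive cast: anything unparseable silently becomes 0."""
    try:
        return float(value)
    except ValueError:
        return 0.0

raw_latencies = ["120", "130", "null", "125"]  # "null" is a string, not a missing value

# Silent failure: the bad value becomes 0 and drags the average down
coerced_avg = mean(lenient_cast(v) for v in raw_latencies)

# Safer: treat unparseable values as missing and count how many were rejected
parsed = [float(v) for v in raw_latencies if v.replace(".", "", 1).isdigit()]
clean_avg = mean(parsed)
rejected = len(raw_latencies) - len(parsed)

# Duplicates from an at-least-once producer inflate sums in the same quiet way
orders = [("o1", 50.0), ("o2", 30.0), ("o2", 30.0)]  # o2 was retried
naive_revenue = sum(amount for _, amount in orders)
deduped_revenue = sum(dict(orders).values())  # keeps one amount per order id

print(coerced_avg, clean_avg, rejected, naive_revenue, deduped_revenue)
```

Both the coerced average and the naive revenue are plausible-looking numbers, which is exactly why nothing downstream complains.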

The Four Pillars of Pipeline Observability

Before writing any code, understand what you're actually measuring:

Pillar       | What It Catches             | Tooling
Freshness    | Data stopped flowing        | Lag checks, heartbeat events
Volume       | Data dropped or duplicated  | Row count comparisons
Schema       | Structure changed upstream  | Schema Registry, drift detection
Distribution | Values shifted unexpectedly | Statistical profiling, z-scores

Most teams only instrument freshness. Volume, schema, and distribution checks are what separate mature pipelines from ones held together with hope.

Pillar 1: Freshness Monitoring

Consumer Lag as a First Signal

from kafka import KafkaAdminClient, KafkaConsumer, TopicPartition
import time
 
def get_consumer_lag(group_id: str, topic: str, num_partitions: int) -> dict:
    servers = ['kafka1:9092']
    admin = KafkaAdminClient(bootstrap_servers=servers)
    # Assumes kafka-python, where end offsets are read through a consumer
    consumer = KafkaConsumer(bootstrap_servers=servers)
 
    try:
        # Current consumer position: {TopicPartition: OffsetAndMetadata}
        consumer_offsets = admin.list_consumer_group_offsets(group_id)
 
        # Latest available offsets: {TopicPartition: int}
        partitions = [TopicPartition(topic, p) for p in range(num_partitions)]
        latest_offsets = consumer.end_offsets(partitions)
    finally:
        consumer.close()
        admin.close()
 
    lag_by_partition = {}
    total_lag = 0
 
    for tp, end_offset in latest_offsets.items():
        current = consumer_offsets.get(tp)
        # Partitions with no committed offset are skipped
        if current is not None:
            lag = end_offset - current.offset
            lag_by_partition[tp.partition] = lag
            total_lag += lag
 
    return {
        "total_lag": total_lag,
        "by_partition": lag_by_partition,
        "checked_at": time.time()
    }
 
# Alert thresholds
def check_freshness_slo(lag_result: dict):
    # Check the higher threshold first so only one lag alert fires
    if lag_result["total_lag"] > 500_000:
        trigger_alert("CRITICAL: Consumer lag > 500K messages", severity="critical")
    elif lag_result["total_lag"] > 100_000:
        trigger_alert("WARNING: Consumer lag > 100K messages", severity="warning")
 
    # Check partition skew
    lags = list(lag_result["by_partition"].values())
    if lags:
        max_lag = max(lags)
        avg_lag = sum(lags) / len(lags)
        if max_lag > avg_lag * 5:
            trigger_alert(f"Partition skew detected: max={max_lag}, avg={avg_lag:.0f}")
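
The snippets in this post call `trigger_alert` without defining it. A minimal stand-in is sketched below, assuming structured logging is enough until a real pager or chat integration is wired in. The function body and the `SENT_ALERTS` list are illustrative and not part of any library.

```python
import logging
from typing import Dict, List

logger = logging.getLogger("pipeline.alerts")

# In-memory record of alerts, handy for tests; a real system would call a paging or chat API
SENT_ALERTS: List[Dict[str, str]] = []

_LEVELS = {
    "critical": logging.CRITICAL,
    "warning": logging.WARNING,
    "info": logging.INFO,
}

def trigger_alert(message: str, severity: str = "warning") -> None:
    """Log the alert at a level matching its severity and remember it."""
    if severity not in _LEVELS:
        raise ValueError(f"unknown severity: {severity!r}")
    logger.log(_LEVELS[severity], message)
    SENT_ALERTS.append({"severity": severity, "message": message})
```

Defaulting to "warning" keeps calls that omit a severity, such as the partition skew alert, from paging anyone by accident.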

Table Freshness Checks

from pyspark.sql import SparkSession
from pyspark.sql.functions import max as spark_max, col
from datetime import datetime, timedelta
 
def check_table_freshness(table: str, max_age_minutes: int = 15) -> bool:
    spark = SparkSession.builder.getOrCreate()
 
    result = (
        spark.read.format("delta").load(table)
        .select(spark_max("ingested_at").alias("last_update"))
        .collect()[0]
    )
 
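    last_update = result["last_update"]
    if last_update is None:
        # MAX over an empty table is NULL; treat that as stale
        trigger_alert(f"Stale table: {table} has no rows", severity="critical")
        return False
    # total_seconds() includes whole days; .seconds would wrap around every 24 hours.
    # Assumes ingested_at is stored in UTC.
    age_minutes = (datetime.utcnow() - last_update).total_seconds() / 60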
 
    if age_minutes > max_age_minutes:
        trigger_alert(
            f"Stale table: {table} last updated {age_minutes:.1f} mins ago "
            f"(SLO: {max_age_minutes} mins)",
            severity="critical"
        )
        return False
 
    return True
 
# Run checks per layer
freshness_checks = [
    ("s3://datalake/bronze/events",  2),   # Bronze: 2 min SLO
    ("s3://datalake/silver/events",  15),  # Silver: 15 min SLO
    ("s3://datalake/gold/dau",       60),  # Gold:   1 hour SLO
]
 
for table, max_age in freshness_checks:
    check_table_freshness(table, max_age)

Pillar 2: Volume Monitoring

Row Count Reconciliation

The most underused check in data engineering: compare row counts between layers. If Bronze has 1M rows and Silver has 800K, you have a 20% silent drop you need to explain.

from dataclasses import dataclass
from typing import Optional
 
@dataclass
class VolumeCheckResult:
    source_table: str
    target_table: str
    source_count: int
    target_count: int
    drop_rate_pct: float
    passed: bool
    reason: Optional[str] = None
 
def check_volume_reconciliation(
    source_table: str,
    target_table: str,
    partition_filter: str,
    max_drop_rate_pct: float = 1.0
) -> VolumeCheckResult:
 
    spark = SparkSession.builder.getOrCreate()
 
    source_count = (
        spark.read.format("delta").load(source_table)
        .filter(partition_filter)
        .count()
    )
 
    target_count = (
        spark.read.format("delta").load(target_table)
        .filter(partition_filter)
        .count()
    )
 
    # Guard against an empty source partition to avoid ZeroDivisionError
    drop_rate = ((source_count - target_count) / source_count) * 100 if source_count else 0.0
 
    passed = drop_rate <= max_drop_rate_pct
 
    if not passed:
        trigger_alert(
            f"Volume drop: {source_table} → {target_table} | "
            f"source={source_count:,} target={target_count:,} "
            f"drop={drop_rate:.2f}% (max allowed: {max_drop_rate_pct}%)",
            severity="critical"
        )
 
    return VolumeCheckResult(
        source_table=source_table,
        target_table=target_table,
        source_count=source_count,
        target_count=target_count,
        drop_rate_pct=drop_rate,
        passed=passed
    )

Anomaly Detection on Volume

Static thresholds break on weekends, holidays, and growth curves. Use statistical anomaly detection instead:

import numpy as np
from sklearn.ensemble import IsolationForest
 
def detect_volume_anomaly(
    daily_counts: list[int],
    today_count: int,
    contamination: float = 0.05
) -> dict:
    """
    Uses Isolation Forest to detect if today's volume
    is anomalous relative to historical pattern.
    """
    X = np.array(daily_counts).reshape(-1, 1)
 
    model = IsolationForest(
        contamination=contamination,  # Expect 5% anomaly rate
        random_state=42
    )
    model.fit(X)
 
    prediction = model.predict([[today_count]])[0]
    score = model.score_samples([[today_count]])[0]
 
    is_anomaly = prediction == -1
 
    if is_anomaly:
        p25 = np.percentile(daily_counts, 25)
        p75 = np.percentile(daily_counts, 75)
        trigger_alert(
            f"Volume anomaly detected: today={today_count:,} "
            f"historical_range=[{p25:,.0f}, {p75:,.0f}] "
            f"anomaly_score={score:.3f}",
            severity="warning"
        )
 
    return {
        "is_anomaly": is_anomaly,
        "today_count": today_count,
        "anomaly_score": score,
        "historical_p50": np.median(daily_counts)
    }
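
One caveat: a model fitted on raw daily counts still treats a quiet Sunday like a quiet Tuesday. A lighter-weight alternative, sketched here with only the standard library, compares today against the same weekday in history using a median/MAD robust z-score. The function name, the 3.5 cutoff, and the sample counts are assumptions for illustration.

```python
from datetime import date
from statistics import median
from typing import Dict

def weekday_robust_z(history: Dict[date, int], today: date, today_count: int) -> float:
    """Robust z-score of today's count against past counts from the same weekday."""
    same_weekday = [c for d, c in history.items() if d.weekday() == today.weekday()]
    if len(same_weekday) < 3:
        raise ValueError("need at least 3 comparable days")
    med = median(same_weekday)
    mad = median(abs(c - med) for c in same_weekday)
    if mad == 0:
        return 0.0 if today_count == med else float("inf")
    # 0.6745 scales MAD so the score is comparable to a standard z-score
    return 0.6745 * (today_count - med) / mad

# Made-up history: four Sundays plus two much busier weekdays
history = {
    date(2024, 4, 21): 400_000,
    date(2024, 4, 28): 410_000,
    date(2024, 5, 5): 390_000,
    date(2024, 5, 12): 405_000,
    date(2024, 5, 13): 1_000_000,
    date(2024, 5, 14): 1_020_000,
}
sunday = date(2024, 5, 19)

normal_sunday = abs(weekday_robust_z(history, sunday, 395_000)) > 3.5
broken_sunday = abs(weekday_robust_z(history, sunday, 150_000)) > 3.5
print(normal_sunday, broken_sunday)
```

A static floor tuned for weekdays would flag every Sunday in this history; comparing like with like leaves only the genuinely low day.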

Pillar 3: Schema Monitoring

Detecting Schema Drift

Schema drift is the silent killer of long-running pipelines. A producer renames a field, your downstream join on that field starts returning nulls — and you find out three weeks later when a stakeholder notices a trend reversal.

from pyspark.sql.types import StructType
import json
 
def detect_schema_drift(
    expected_schema: StructType,
    actual_df,
    table_name: str
) -> dict:
 
    expected_fields = {f.name: str(f.dataType) for f in expected_schema.fields}
    actual_fields   = {f.name: str(f.dataType) for f in actual_df.schema.fields}
 
    # Fields removed upstream
    missing_fields = set(expected_fields.keys()) - set(actual_fields.keys())
 
    # New fields added upstream
    new_fields = set(actual_fields.keys()) - set(expected_fields.keys())
 
    # Type changed for existing fields
    type_changes = {
        field: {"expected": expected_fields[field], "actual": actual_fields[field]}
        for field in expected_fields.keys() & actual_fields.keys()
        if expected_fields[field] != actual_fields[field]
    }
 
    drift_detected = bool(missing_fields or type_changes)
 
    if missing_fields:
        trigger_alert(
            f"Schema drift [{table_name}]: missing fields {missing_fields}",
            severity="critical"
        )
 
    if type_changes:
        trigger_alert(
            f"Schema drift [{table_name}]: type changes {type_changes}",
            severity="critical"
        )
 
    if new_fields:
        trigger_alert(
            f"Schema drift [{table_name}]: new fields detected {new_fields}",
            severity="warning"  # New fields are often safe but worth knowing
        )
 
    return {
        "drift_detected": drift_detected,
        "missing_fields": list(missing_fields),
        "new_fields": list(new_fields),
        "type_changes": type_changes
    }

Tracking Schema Versions in Delta

def get_schema_history(table_path: str, num_versions: int = 10) -> list:
    """
    Use Delta time travel to compare schemas across versions.
    Useful for post-incident investigation.
    """
    spark = SparkSession.builder.getOrCreate()
    history = spark.sql(f"DESCRIBE HISTORY delta.`{table_path}`")
 
    versions = (
        history
        .select("version", "timestamp", "operation", "operationParameters")
        .orderBy("version", ascending=False)
        .limit(num_versions)
        .collect()
    )
 
    schema_snapshots = []
    for v in versions:
        snapshot_schema = (
            spark.read
            .format("delta")
            .option("versionAsOf", v["version"])
            .load(table_path)
            .schema
        )
        schema_snapshots.append({
            "version":   v["version"],
            "timestamp": v["timestamp"],
            "operation": v["operation"],
            "fields":    [f.name for f in snapshot_schema.fields]
        })
 
    return schema_snapshots

Pillar 4: Distribution Monitoring

Statistical Profiling

Volume and freshness tell you if data arrived. Distribution checks tell you what arrived — and whether it makes sense.

from pyspark.sql.functions import (
    col, count, countDistinct, avg, stddev,
    min as spark_min, max as spark_max,
    percentile_approx, isnan, isnull, when
)
 
def profile_column(df, column: str) -> dict:
    """
    Compute statistical profile for a single column.
    Run on every Silver write and compare to baseline.
    """
    stats = df.select(
        count(col(column)).alias("count"),
        countDistinct(col(column)).alias("cardinality"),
        avg(col(column)).alias("mean"),
        stddev(col(column)).alias("std"),
        spark_min(col(column)).alias("min"),
        spark_max(col(column)).alias("max"),
        percentile_approx(col(column), 0.25).alias("p25"),
        percentile_approx(col(column), 0.50).alias("p50"),
        percentile_approx(col(column), 0.75).alias("p75"),
        (count(when(isnull(col(column)) | isnan(col(column)), 1)) * 100.0
         / count("*")).alias("null_rate_pct")
    ).collect()[0].asDict()
 
    return stats
 
def check_distribution_drift(
    baseline: dict,
    current: dict,
    column: str,
    z_score_threshold: float = 3.0
):
    """
    Flag columns where mean has shifted more than z_score_threshold
    standard deviations from the baseline.
    """
    if baseline["std"] and baseline["std"] > 0:
        z_score = abs(current["mean"] - baseline["mean"]) / baseline["std"]
 
        if z_score > z_score_threshold:
            trigger_alert(
                f"Distribution drift [{column}]: "
                f"baseline_mean={baseline['mean']:.2f} "
                f"current_mean={current['mean']:.2f} "
                f"z_score={z_score:.2f}",
                severity="warning"
            )
 
    # Null rate spike
    null_delta = current["null_rate_pct"] - baseline["null_rate_pct"]
    if null_delta > 5.0:  # > 5% increase in nulls
        trigger_alert(
            f"Null rate spike [{column}]: "
            f"baseline={baseline['null_rate_pct']:.1f}% "
            f"current={current['null_rate_pct']:.1f}%",
            severity="critical"
        )

Building the DQ Check Framework

Tie all four pillars into a single executable framework with consistent severity levels and actions:

from dataclasses import dataclass
from enum import Enum
from typing import Callable, Any
 
class Severity(Enum):
    CRITICAL = "critical"   # Block pipeline, page on-call
    WARNING  = "warning"    # Alert Slack, continue
    INFO     = "info"       # Log only
 
class Action(Enum):
    BLOCK   = "block"       # Stop pipeline
    DLQ     = "dlq"         # Route bad rows to dead letter queue
    FLAG    = "flag"        # Mark rows with dq_passed=False
    ALERT   = "alert"       # Notify and continue
 
@dataclass
class DQCheck:
    name:      str
    check_fn:  Callable[..., bool]
    severity:  Severity
    action:    Action
    message:   str
 
class DQFramework:
    def __init__(self, pipeline_name: str):
        self.pipeline_name = pipeline_name
        self.checks: list[DQCheck] = []
        self.results: list[dict]   = []
 
    def add_check(self, check: DQCheck):
        self.checks.append(check)
        return self  # Chainable
 
    def run(self, **kwargs) -> bool:
        all_passed = True
 
        for check in self.checks:
            try:
                passed = check.check_fn(**kwargs)
            except Exception as e:
                passed = False
                check.message = f"{check.message} [exception: {e}]"
 
            self.results.append({
                "check":    check.name,
                "passed":   passed,
                "severity": check.severity.value,
                "action":   check.action.value,
            })
 
            if not passed:
                self._handle_failure(check)
                if check.action == Action.BLOCK:
                    all_passed = False
 
        return all_passed
 
    def _handle_failure(self, check: DQCheck):
        msg = f"[{self.pipeline_name}] DQ FAILED: {check.name}: {check.message}"
 
        if check.severity == Severity.CRITICAL:
            send_pagerduty_alert(msg)
            send_slack_alert(msg, channel="#data-incidents")
        elif check.severity == Severity.WARNING:
            send_slack_alert(msg, channel="#data-warnings")
        else:
            log_to_datadog(msg)
 
# Usage
silver_dq = (
    DQFramework("silver-events")
    .add_check(DQCheck(
        name="no_null_event_ids",
        check_fn=lambda df: df.filter(col("event_id").isNull()).count() == 0,
        severity=Severity.CRITICAL,
        action=Action.BLOCK,
        message="event_id must never be null"
    ))
    .add_check(DQCheck(
        name="valid_platforms",
        check_fn=lambda df: (
            df.filter(~col("platform").isin("WEB", "IOS", "ANDROID", "BACKEND"))
            .count() / df.count() < 0.001
        ),
        severity=Severity.WARNING,
        action=Action.FLAG,
        message="< 0.1% rows should have invalid platform values"
    ))
    .add_check(DQCheck(
        name="no_future_events",
        check_fn=lambda df: (
            df.filter(col("occurred_at") > col("ingested_at")).count() == 0
        ),
        severity=Severity.WARNING,
        action=Action.FLAG,
        message="occurred_at should never be in the future"
    ))
)

Instrumenting Spark Streaming Pipelines

Query Listener for Real-Time Metrics

from pyspark.sql.streaming import StreamingQueryListener
import time
 
class PipelineMetricsListener(StreamingQueryListener):
 
    def onQueryStarted(self, event):
        log_metric("streaming.query.started", 1, tags={"query": event.name})
 
    def onQueryProgress(self, event):
        progress = event.progress
 
        # Rows processed per batch
        log_metric("streaming.rows_per_batch",
                   progress.numInputRows,
                   tags={"query": progress.name})
 
        # Processing rate
        log_metric("streaming.rows_per_second",
                   progress.processedRowsPerSecond,
                   tags={"query": progress.name})
 
        # Trigger execution time
        log_metric("streaming.trigger_execution_ms",
                   progress.durationMs.get("triggerExecution", 0),
                   tags={"query": progress.name})
 
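        # Watermark lag: how far the watermark trails this batch's trigger time.
        # Both values are ISO-8601 UTC strings such as "2024-05-19T12:00:00.000Z".
        watermark = progress.eventTime.get("watermark")
        if watermark:
            from datetime import datetime
            fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
            lag = datetime.strptime(progress.timestamp, fmt) - datetime.strptime(watermark, fmt)
            log_metric("streaming.watermark_lag_seconds",
                       lag.total_seconds(),
                       tags={"query": progress.name})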
 
        # Alert on stalled batches
        if progress.numInputRows == 0:
            trigger_alert(
                f"Streaming query '{progress.name}' processed 0 rows — "
                f"possible upstream issue",
                severity="warning"
            )
 
    def onQueryTerminated(self, event):
        if event.exception:
            trigger_alert(
                f"Streaming query terminated with exception: {event.exception}",
                severity="critical"
            )
 
# Register listener
spark.streams.addListener(PipelineMetricsListener())

Batch-Level Audit Log

from datetime import datetime
from pyspark.sql.functions import col, count, when, sum as spark_sum
 
def write_audit_log(
    df,
    layer: str,
    table: str,
    batch_id: int,
    audit_table: str
):
    """
    Write a row to the audit log for every batch processed.
    Enables lineage tracking and post-incident investigation.
    """
    stats = df.agg(
        count("*").alias("total_rows"),
        spark_sum(when(col("dq_passed") == False, 1).otherwise(0)).alias("failed_dq_rows"),
        spark_sum(when(col("is_late_arrival") == True, 1).otherwise(0)).alias("late_rows"),
    ).collect()[0]
 
    total = stats["total_rows"]
    # SUM over an empty batch returns NULL, so fall back to 0
    failed = stats["failed_dq_rows"] or 0
    late = stats["late_rows"] or 0
 
    audit_row = spark.createDataFrame([{
        "layer":          layer,
        "table":          table,
        "batch_id":       batch_id,
        "total_rows":     total,
        "failed_dq_rows": failed,
        "late_rows":      late,
        # An empty batch has nothing failing, so report 100% rather than dividing by zero
        "dq_pass_rate":   round((total - failed) / total * 100, 2) if total else 100.0,
        "written_at":     datetime.utcnow().isoformat(),
    }])
 
    (
        audit_row.write
        .format("delta")
        .mode("append")
        .save(audit_table)
    )

The SLO Dashboard

Define and publish SLOs before anything goes to production. If there's no SLO, there's no accountability:

Layer  | Metric          | SLO      | Measurement Window
Bronze | Ingestion lag   | < 30s    | Per batch
Bronze | Row drop rate   | < 0.01%  | Daily
Silver | Processing lag  | < 5 min  | Per batch
Silver | DQ pass rate    | > 99.5%  | Daily
Silver | Duplicate rate  | < 0.001% | Daily
Gold   | Refresh latency | < 1 hour | Per table
Gold   | Query p95       | < 10s    | Per hour

def publish_slo_report(date: str) -> dict:
    spark = SparkSession.builder.getOrCreate()
 
    audit = spark.read.format("delta").load(AUDIT_TABLE).filter(
        f"DATE(written_at) = '{date}'"
    )
 
    silver_stats = audit.filter("layer = 'silver'").agg(
        avg("dq_pass_rate").alias("avg_dq_pass_rate"),
        spark_max("total_rows").alias("max_batch_size"),
    ).collect()[0]
 
    report = {
        "date":             date,
        "silver_dq_rate":   silver_stats["avg_dq_pass_rate"],
        "slo_silver_dq":    silver_stats["avg_dq_pass_rate"] >= 99.5,
    }
 
    # Post to Slack or internal dashboard
    post_slo_report(report)
    return report
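
The SLO targets can also live in code so that every one of them is evaluated the same way. A minimal sketch follows, assuming the measured values are computed elsewhere. The `SLO` dataclass, the metric keys, and the unit conversions to seconds are my own choices, not something the pipeline already defines.

```python
import operator
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass(frozen=True)
class SLO:
    layer: str
    metric: str
    compare: Callable[[float, float], bool]  # operator.lt means "measured < target"
    target: float
    unit: str

# Targets taken from the SLO table above, with durations converted to seconds
SLOS: List[SLO] = [
    SLO("bronze", "ingestion_lag", operator.lt, 30, "s"),
    SLO("bronze", "row_drop_rate", operator.lt, 0.01, "%"),
    SLO("silver", "processing_lag", operator.lt, 300, "s"),
    SLO("silver", "dq_pass_rate", operator.gt, 99.5, "%"),
    SLO("silver", "duplicate_rate", operator.lt, 0.001, "%"),
    SLO("gold", "refresh_latency", operator.lt, 3600, "s"),
    SLO("gold", "query_p95", operator.lt, 10, "s"),
]

def evaluate_slos(measured: Dict[str, float]) -> Dict[str, bool]:
    """Return pass/fail per 'layer.metric'; a missing measurement counts as a failure."""
    results = {}
    for slo in SLOS:
        key = f"{slo.layer}.{slo.metric}"
        value = measured.get(key)
        results[key] = value is not None and slo.compare(value, slo.target)
    return results

# Hypothetical measurements for one day
report = evaluate_slos({"silver.dq_pass_rate": 99.7, "bronze.ingestion_lag": 45})
```

Counting a missing measurement as a failure is deliberate: an SLO nobody measured should not show up as green.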

What Good Observability Looks Like

After implementing this across three production pipelines, the pattern that works:

At pipeline start: Freshness + volume check on upstream source before processing begins. If the source is stale or empty, fail fast rather than propagating nulls.

During processing: Row-level DQ flags (dq_passed) on every Silver row. Never drop bad rows silently — route them to DLQ and flag them. You want a full audit trail.

After each batch: Write to audit log. Compare volume to previous batch. Check for distribution drift on key numeric columns.

Nightly: Run full statistical profile against 30-day baseline. Schema comparison across Delta versions. SLO report to stakeholders.
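
The "during processing" step is the one most often skipped. As a sketch of the idea in plain Python rather than Spark, every row gets a `dq_passed` flag plus the reasons it failed, and failing rows are copied to a dead letter list instead of being dropped. The rule set, field names, and `split_batch` helper are illustrative assumptions.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]

# Hypothetical row-level rules: name -> predicate that returns True when the row is valid
RULES: Dict[str, Callable[[Row], bool]] = {
    "event_id_present": lambda r: r.get("event_id") is not None,
    "known_platform": lambda r: r.get("platform") in {"WEB", "IOS", "ANDROID", "BACKEND"},
}

def split_batch(rows: List[Row]) -> Tuple[List[Row], List[Row]]:
    """Flag every row and return (all flagged rows, rows destined for the DLQ)."""
    flagged, dlq = [], []
    for row in rows:
        failures = [name for name, rule in RULES.items() if not rule(row)]
        out = {**row, "dq_passed": not failures, "dq_failures": failures}
        flagged.append(out)
        if failures:
            dlq.append(out)
    return flagged, dlq

batch = [
    {"event_id": "e1", "platform": "WEB"},
    {"event_id": None, "platform": "IOS"},
    {"event_id": "e3", "platform": "TV"},
]
flagged, dlq = split_batch(batch)
```

Keeping the failure reasons on the row is what turns the DLQ into something you can review, since you can group by reason and see which upstream producer is responsible.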

Production Checklist

Freshness:

  • Consumer lag alert (fires when lag exceeds 2x normal)
  • Table staleness check per layer
  • Heartbeat event every N minutes from producers
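
The heartbeat item has no code elsewhere in this post, so here is a rough sketch. It assumes each producer emits a heartbeat carrying its name and a UTC timestamp, and that the latest one per producer is tracked somewhere. `find_silent_producers` and the producer names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List

def find_silent_producers(
    last_heartbeat: Dict[str, datetime],
    now: datetime,
    interval: timedelta,
    missed_beats: int = 3,
) -> List[str]:
    """Return producers whose last heartbeat is older than missed_beats * interval."""
    cutoff = now - interval * missed_beats
    return sorted(name for name, seen in last_heartbeat.items() if seen < cutoff)

now = datetime(2024, 5, 19, 12, 0, tzinfo=timezone.utc)
last_seen = {
    "orders-service": now - timedelta(minutes=2),
    "mobile-gateway": now - timedelta(minutes=40),
}
silent = find_silent_producers(last_seen, now, interval=timedelta(minutes=5))
```

A heartbeat is what lets you tell "the source had no events" apart from "the source is down", which a row count alone cannot do.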

Volume:

  • Bronze → Silver row count reconciliation
  • Statistical anomaly detection (not just static thresholds)
  • Partition-level skew detection

Schema:

  • Schema Registry for all Kafka topics
  • Drift detection on every batch
  • Delta time-travel for schema version history

Distribution:

  • Null rate monitoring per column
  • Z-score based mean drift detection
  • Cardinality checks on low-cardinality fields (enums)
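
Cardinality checks are the one distribution item not shown earlier. A minimal sketch in plain Python, assuming the allowed values are known up front; the helper name, the 0.1% tolerance, and the sample data are illustrative.

```python
from typing import Dict, Iterable, Set

def check_enum_cardinality(
    values: Iterable[str],
    allowed: Set[str],
    max_unknown_rate: float = 0.001,
) -> Dict[str, object]:
    """Compare observed distinct values against an allowed set."""
    total = 0
    unknown_rows = 0
    seen: Set[str] = set()
    for v in values:
        total += 1
        seen.add(v)
        if v not in allowed:
            unknown_rows += 1
    unknown_rate = unknown_rows / total if total else 0.0
    return {
        "unexpected_values": sorted(seen - allowed),  # new values nobody announced
        "missing_values": sorted(allowed - seen),     # expected values that went quiet
        "unknown_rate": unknown_rate,
        "passed": unknown_rate <= max_unknown_rate,
    }

platforms = ["WEB"] * 6 + ["IOS"] * 3 + ["web"]  # one producer started lowercasing
result = check_enum_cardinality(platforms, {"WEB", "IOS", "ANDROID", "BACKEND"})
```

Reporting missing values alongside unexpected ones matters: an enum value that disappears is often the first sign that a whole client or service has stopped sending data.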

Audit:

  • Batch-level audit log (rows in, rows out, DQ pass rate)
  • DLQ volume tracking with daily review
  • SLO report published to stakeholders

Common Pitfalls

  • Static alert thresholds - Break on weekends, holidays, and growth; use statistical baselines
  • Dropping bad rows silently - Route to DLQ instead; every dropped row is a clue
  • Only monitoring freshness - Volume drops and distribution shifts are more dangerous
  • DQ checks at Gold only - By then the damage is done; check at every layer
  • No audit log - When incidents happen, you'll have no lineage to investigate
  • Alerting everything at CRITICAL - Alert fatigue kills response; calibrate severity honestly

Key Takeaways

  1. Silent failures are worse than crashes - A crash is visible; a wrong number is invisible
  2. DQ at every layer - Bronze, Silver, and Gold each need their own checks
  3. Statistical baselines beat static thresholds - Your pipeline's "normal" changes over time
  4. Audit logs are your black box - Write one on every batch, thank yourself later
  5. Publish SLOs before going live - No SLO means no standard to hold the pipeline to
  6. DLQ is not a trash can - Review it daily; it's your most honest signal about upstream health

The difference between a mature data platform and a fragile one is not the transformation logic — it's whether you know, within minutes, when something silently breaks.

