Data Engineering, Apache Kafka, Spark, Delta Lake, Databricks, Medallion Architecture

End-to-End Data Pipeline Case Study: From Raw Events to Business Insights

2024-05-19 · 18 min read

Every data team eventually faces the same challenge: raw events are piling up, stakeholders want dashboards yesterday, and the pipeline you stitched together six months ago is held together with duct tape and prayers.

This is the story of how I designed and built a production-grade end-to-end data pipeline — from raw clickstream events to business-ready insights — processing 10M+ events per day with sub-hour latency, exactly-once semantics, and full observability.

The Problem Statement

Our platform generates events across three surfaces: web, mobile, and backend services. Product, marketing, and finance teams all need answers from this data — but they need different things:

  1. Product - Funnel conversion, feature adoption, session analysis
  2. Marketing - Campaign attribution, acquisition cost, cohort retention
  3. Finance - Revenue reconciliation, refund rates, MRR trends

The existing pipeline was a brittle ETL script running on a cron job. It broke silently, had no data quality checks, and couldn't handle late-arriving events. My job was to replace it entirely.

Architecture Overview

Before writing a single line of code, I mapped out the full data flow:

Mobile / Web / Backend
        │
        ▼
   Apache Kafka
  (Event Ingestion)
        │
        ▼
  Spark Streaming
 (Bronze → Silver)
        │
        ▼
   Delta Lake
 (Medallion Layers)
        │
        ▼
  Databricks SQL
  (Gold / Serving)
        │
        ▼
  BI Tools / APIs
 (Business Insights)

The core principle: separate concerns at every layer. Ingestion doesn't care about transformation. Transformation doesn't care about serving. Each layer is independently scalable and testable.

Layer 1: Event Ingestion with Kafka

Topic Design

Most teams make the mistake of creating one Kafka topic per microservice. That creates tight coupling between producers and consumers. Instead, I modelled topics around event domains:

| Topic | Retention | Partitions | Description |
|---|---|---|---|
| events.user.interaction | 7 days | 24 | Clicks, scrolls, hovers |
| events.user.lifecycle | 30 days | 12 | Signup, login, churn |
| events.commerce.transaction | 90 days | 24 | Orders, refunds, disputes |
| events.system.error | 14 days | 6 | Exceptions, timeouts |
| events.system.metric | 3 days | 6 | Latency, throughput |

Partitioning by user_id hash ensures all events for a given user land in the same partition — critical for session reconstruction downstream.
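
To make that guarantee concrete, here is a minimal sketch of key-based partition assignment. Kafka's default partitioner hashes the key bytes with murmur2; this sketch substitutes `zlib.crc32` purely for illustration, and `partition_for` is a hypothetical helper rather than a Kafka API.

```python
import zlib

def partition_for(user_id: str, num_partitions: int) -> int:
    """Map a key to a partition: same key, same partition.

    Stand-in hash (CRC32) for illustration; Kafka's default
    partitioner uses murmur2 on the serialized key.
    """
    return zlib.crc32(user_id.encode("utf-8")) % num_partitions

# Every event keyed by the same user_id maps to the same partition,
# which keeps a user's events in order for session reconstruction.
events = [("user-42", "click"), ("user-7", "scroll"), ("user-42", "hover")]
assignments = [(uid, partition_for(uid, 24)) for uid, _ in events]
```

One consequence worth keeping in mind: the mapping depends on the partition count, so adding partitions to a topic later changes where existing keys land.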

Producer Configuration

Reliability over raw throughput. Here's the producer config I settled on after load testing:

from kafka import KafkaProducer
import json
 
producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
 
    # Durability
    acks='all',                              # Wait for all ISR acknowledgements
    retries=10,
    max_in_flight_requests_per_connection=1, # Maintain order
 
    # Idempotence
    enable_idempotence=True,                 # Broker discards duplicates caused by retries
 
    # Serialization
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: str(k).encode('utf-8'),
 
    # Batching for throughput
    batch_size=65536,                        # 64KB
    linger_ms=10,
    compression_type='snappy'                # ~60% size reduction
)

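enable_idempotence=True combined with acks=all means a retried send is never written twice, without the overhead of Kafka transactions. That guarantee is scoped to a single partition and a single producer session, so it covers the producer-to-broker hop only; end-to-end exactly-once still depends on how the consumer tracks offsets and writes results (here, Spark checkpoints plus Delta's transactional commits). Also check that your kafka-python version accepts enable_idempotence, since older releases do not support the option.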
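
The mechanism behind idempotence can be pictured with a toy model. This is a simplified sketch, not broker code: `PartitionLog` and its fields are hypothetical names, and the real broker tracks sequence numbers per producer ID and partition with stricter ordering checks than shown here.

```python
from dataclasses import dataclass, field

@dataclass
class PartitionLog:
    """Toy model of one partition that de-duplicates producer retries."""
    records: list = field(default_factory=list)
    last_seq: dict = field(default_factory=dict)  # producer_id -> last sequence appended

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        """Append unless this producer's sequence number was already written."""
        if seq <= self.last_seq.get(producer_id, -1):
            return False  # a retry of something the log already holds
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return True

log = PartitionLog()
log.append(producer_id=1, seq=0, value="order_completed")
log.append(producer_id=1, seq=0, value="order_completed")  # retried after a lost ack
log.append(producer_id=1, seq=1, value="refund_completed")
```

The retried send is acknowledged but not appended, so the log holds each record once even though the producer sent one of them twice.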

Schema Registry

Every event is validated against an Avro schema before it hits Kafka. No schema, no entry. This single decision eliminated an entire class of downstream failures.

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
 
schema_str = """
{
  "type": "record",
  "name": "UserInteractionEvent",
  "namespace": "com.platform.events",
  "fields": [
    {"name": "event_id",       "type": "string"},
    {"name": "user_id",        "type": "string"},
    {"name": "session_id",     "type": "string"},
    {"name": "event_type",     "type": "string"},
    {"name": "properties",     "type": {"type": "map", "values": "string"}},
    {"name": "occurred_at",    "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "received_at",    "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "platform",       "type": {"type": "enum", "name": "Platform",
                                "symbols": ["WEB", "IOS", "ANDROID", "BACKEND"]}},
    {"name": "schema_version", "type": "string", "default": "1.0"}
  ]
}
"""

Two timestamps matter here: occurred_at (client-side event time) and received_at (server ingestion time). The delta between them is your event latency metric and helps you handle late arrivals correctly.
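
As a worked example of that delta, the sketch below computes the lag for one event from its two epoch-millisecond timestamps. The 300-second cut-off mirrors the is_late_arrival flag used in the Silver transform later in this post; the function name and sample times are made up for illustration.

```python
from datetime import datetime, timezone

LATE_THRESHOLD_SECONDS = 300  # 5 minutes, matching the Silver is_late_arrival flag

def ingestion_lag_seconds(occurred_at_ms: int, received_at_ms: int) -> float:
    """Seconds between the client-side event time and server ingestion time."""
    return (received_at_ms - occurred_at_ms) / 1000

# Hypothetical event: happened at 12:00:00 UTC, reached the server at 12:07:30 UTC.
occurred = int(datetime(2024, 5, 19, 12, 0, 0, tzinfo=timezone.utc).timestamp() * 1000)
received = int(datetime(2024, 5, 19, 12, 7, 30, tzinfo=timezone.utc).timestamp() * 1000)

lag = ingestion_lag_seconds(occurred, received)
is_late = lag > LATE_THRESHOLD_SECONDS
```

A negative lag is worth watching for as well, since it usually points to a client clock that is running ahead of the server.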

Layer 2: Bronze — Raw Landing Zone

Bronze is append-only. No transformations, no filtering, no business logic. Just land the data exactly as it arrived, with ingestion metadata attached.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
 
spark = (
    SparkSession.builder
    .appName("bronze-ingestion")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.databricks.delta.optimizeWrite.enabled", "true")
    .config("spark.databricks.delta.autoCompact.enabled", "true")
    .getOrCreate()
)
 
KAFKA_BROKERS = "kafka1:9092,kafka2:9092,kafka3:9092"  # same brokers as the producer config

def ingest_to_bronze(topic: str, bronze_path: str):
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKERS)
        .option("subscribe", topic)
        .option("startingOffsets", "latest")
        .option("maxOffsetsPerTrigger", 500_000)
        .option("kafka.group.id", f"bronze-consumer-{topic}")
        .load()
        .select(
            col("key").cast("string").alias("event_key"),
            col("value").cast("string").alias("raw_payload"),
            col("topic"),
            col("partition"),
            col("offset"),
            col("timestamp").alias("kafka_timestamp"),
            current_timestamp().alias("ingested_at"),
            lit("1.0").alias("pipeline_version"),
        )
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{bronze_path}/_checkpoints/{topic}")
        .option("mergeSchema", "true")
        .partitionBy("topic")
        .trigger(processingTime="30 seconds")
        .start(bronze_path)
    )

Why Bronze Matters

I've seen teams skip Bronze and write directly to Silver. This always comes back to bite them when:

  • A transformation bug corrupts data and you can't replay from source
  • A new consumer needs historical raw data you never stored
  • An audit requires the original event payload unchanged

Bronze is your source of truth. It's cheap storage. Never skip it.

Layer 3: Silver — Cleaned and Conformed

Silver is where the real engineering happens. Raw JSON payloads become typed columns, nulls get handled, PII gets masked, and late-arriving events get correctly placed using event time rather than processing time.

from pyspark.sql.functions import (
    from_json, col, sha2, concat_ws
)
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, MapType
)
 
EVENT_SCHEMA = StructType([
    StructField("event_id",       StringType(), False),
    StructField("user_id",        StringType(), False),
    StructField("session_id",     StringType(), True),
    StructField("event_type",     StringType(), False),
    StructField("properties",     MapType(StringType(), StringType()), True),
    StructField("occurred_at",    LongType(),   False),
    StructField("received_at",    LongType(),   False),
    StructField("platform",       StringType(), False),
    StructField("schema_version", StringType(), True),
])
 
def transform_to_silver(bronze_df):
    return (
        bronze_df
        # Parse raw JSON
        .withColumn("event", from_json(col("raw_payload"), EVENT_SCHEMA))
        .filter(col("event.event_id").isNotNull())  # Unparseable rows have null fields; route those to the DLQ
 
        # Flatten struct
        .select("ingested_at", "kafka_timestamp", "event.*")
 
        # Type casting
        .withColumn("occurred_at", (col("occurred_at") / 1000).cast("timestamp"))
        .withColumn("received_at", (col("received_at") / 1000).cast("timestamp"))
        .withColumn("event_date",  col("occurred_at").cast("date"))
 
        # Derived fields
        .withColumn("ingestion_lag_seconds",
            col("received_at").cast("long") - col("occurred_at").cast("long")
        )
        .withColumn("is_late_arrival", col("ingestion_lag_seconds") > 300)
 
        # PII masking — hash user_id, never store raw
        .withColumn("user_id_hashed", sha2(col("user_id"), 256))
        .drop("user_id")
 
        # Dedup key
        .withColumn("dedup_key", concat_ws("|", col("event_id"), col("event_type")))
 
        # Data quality flag
        .withColumn("dq_passed",
            col("event_id").isNotNull() &
            col("event_type").isNotNull() &
            col("occurred_at").isNotNull() &
            col("platform").isin("WEB", "IOS", "ANDROID", "BACKEND")
        )
    )
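
For reference, the value stored in user_id_hashed can be reproduced outside Spark, which is handy in unit tests. The sketch below shows the plain SHA-256 hex digest, which is what Spark's sha2(col, 256) returns for a UTF-8 string, alongside a keyed HMAC variant. The keyed variant is not part of the pipeline above; it is shown only as an option to consider, because an unkeyed hash of a guessable ID can be re-derived by anyone who can enumerate the IDs. The secret shown is a placeholder.

```python
import hashlib
import hmac

def sha256_hex(user_id: str) -> str:
    """Plain SHA-256 hex digest of the UTF-8 encoded ID."""
    return hashlib.sha256(user_id.encode("utf-8")).hexdigest()

def keyed_hash(user_id: str, secret: bytes) -> str:
    """HMAC-SHA256 variant: the digest cannot be re-derived without the secret."""
    return hmac.new(secret, user_id.encode("utf-8"), hashlib.sha256).hexdigest()

digest = sha256_hex("abc")
# Hypothetical secret; in practice it would come from a secrets manager.
masked = keyed_hash("user-42", secret=b"placeholder-secret")
```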

Handling Late Arrivals

Events arriving late (mobile apps reconnecting after offline periods) are the bane of streaming pipelines. Watermarking handles this gracefully:

silver_stream = (
    transform_to_silver(bronze_stream)
    .withWatermark("occurred_at", "2 hours")   # Accept events up to 2hrs late
    .dropDuplicates(["event_id", "event_type", "occurred_at"])  # event-time column lets the watermark expire dedup state
    .writeStream
    .format("delta")
    .outputMode("append")
    .partitionBy("event_date", "platform")
    .option("checkpointLocation", f"{SILVER_PATH}/_checkpoints")
    .trigger(processingTime="1 minute")
    .start(SILVER_PATH)
)

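The 2-hour watermark bounds how much state Spark keeps: events up to 2 hours behind the latest event time seen so far are still accepted and de-duplicated, while anything later than that may be dropped. Because event_date is derived from occurred_at, an accepted late event lands in the partition for when it happened, not when it was processed.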
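
The interaction between the watermark and de-duplication is easier to see in a toy model. The class below is a simplified, pure-Python sketch of the idea and not Spark's implementation: `WatermarkedDeduplicator` is a hypothetical name, and it assumes the watermark is computed once per micro-batch from the maximum event time seen in earlier batches.

```python
from datetime import datetime, timedelta

class WatermarkedDeduplicator:
    """Toy model of event-time de-duplication bounded by a watermark."""

    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = None  # latest event time seen in earlier batches
        self.seen = {}              # event_id -> event time, kept until expired

    def process_batch(self, batch):
        """Return the event IDs emitted for a batch of (event_id, event_time)."""
        watermark = None
        if self.max_event_time is not None:
            watermark = self.max_event_time - self.delay
        emitted = []
        for event_id, event_time in batch:
            if watermark is not None and event_time < watermark:
                continue  # older than the watermark: dropped
            if event_id in self.seen:
                continue  # duplicate of an event still held in state
            self.seen[event_id] = event_time
            emitted.append(event_id)
        # Advance the maximum event time, then expire state behind the new watermark.
        for _, event_time in batch:
            if self.max_event_time is None or event_time > self.max_event_time:
                self.max_event_time = event_time
        if self.max_event_time is not None:
            cutoff = self.max_event_time - self.delay
            self.seen = {k: t for k, t in self.seen.items() if t >= cutoff}
        return emitted

t0 = datetime(2024, 5, 19, 12, 0)
dedup = WatermarkedDeduplicator(delay=timedelta(hours=2))
first = dedup.process_batch([("a", t0), ("b", t0 + timedelta(hours=3))])
second = dedup.process_batch([
    ("a", t0),                                    # 3h behind the max: dropped
    ("b", t0 + timedelta(hours=3)),               # duplicate: dropped
    ("c", t0 + timedelta(hours=1, minutes=30)),   # 90 min behind: accepted
])
```

In this run the first batch emits "a" and "b", and the second emits only "c". The state for "a" is discarded once the watermark passes it, which is what keeps memory bounded.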

Layer 4: Gold — Business-Ready Aggregates

Gold tables are purpose-built for specific business domains. Each one answers a concrete business question.

Daily Active Users (Product)

CREATE OR REPLACE TABLE gold.daily_active_users
USING DELTA PARTITIONED BY (event_date) AS
SELECT
    event_date,
    platform,
    COUNT(DISTINCT user_id_hashed)                   AS dau,
    COUNT(DISTINCT session_id)                       AS sessions,
    COUNT(*)                                         AS total_events,
    ROUND(COUNT(*) / COUNT(DISTINCT session_id), 2)  AS events_per_session,
    SUM(CASE WHEN is_late_arrival THEN 1 ELSE 0 END) AS late_arrival_count
FROM silver.events
WHERE dq_passed = true
  AND event_date >= CURRENT_DATE - INTERVAL 90 DAYS
GROUP BY event_date, platform

Revenue Reconciliation (Finance)

CREATE OR REPLACE TABLE gold.revenue_daily
USING DELTA PARTITIONED BY (transaction_date) AS
SELECT
    occurred_at::DATE                                              AS transaction_date,
    platform,
    properties['currency']                                         AS currency,
    COUNT(*)                                                       AS transaction_count,
    SUM(CASE WHEN event_type = 'order_completed'
             THEN CAST(properties['amount'] AS DOUBLE) ELSE 0 END) AS gross_revenue,  -- orders only, refunds excluded
    SUM(CASE WHEN event_type = 'refund_completed'
             THEN CAST(properties['amount'] AS DOUBLE) ELSE 0 END) AS refunds,
    SUM(CASE WHEN event_type = 'order_completed'
             THEN CAST(properties['amount'] AS DOUBLE) ELSE 0 END)
    - SUM(CASE WHEN event_type = 'refund_completed'
               THEN CAST(properties['amount'] AS DOUBLE) ELSE 0 END) AS net_revenue
FROM silver.events
WHERE event_type IN ('order_completed', 'refund_completed')
  AND dq_passed = true
GROUP BY 1, 2, 3
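
A quick worked example of the net_revenue arithmetic, using three made-up events. It uses Decimal rather than floats, which is a choice made for this sketch: summing currency amounts as binary floating point can accumulate small rounding errors, which matters for reconciliation.

```python
from decimal import Decimal

# Hypothetical events for one day, platform and currency.
events = [
    {"event_type": "order_completed",  "amount": "49.99"},
    {"event_type": "order_completed",  "amount": "20.00"},
    {"event_type": "refund_completed", "amount": "20.00"},
]

def total(event_type: str) -> Decimal:
    """Sum the amounts of all events of one type."""
    return sum(
        (Decimal(e["amount"]) for e in events if e["event_type"] == event_type),
        Decimal("0"),
    )

orders = total("order_completed")
refunds = total("refund_completed")
net_revenue = orders - refunds
```

Here completed orders total 69.99, refunds total 20.00, and net revenue is 49.99.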

Campaign Attribution (Marketing)

CREATE OR REPLACE TABLE gold.campaign_attribution USING DELTA AS
WITH first_touch AS (
    SELECT
        user_id_hashed,
        MIN(occurred_at)                                AS first_seen_at,
        -- MIN_BY returns the value from the row with the earliest occurred_at
        MIN_BY(properties['utm_source'],   occurred_at) AS utm_source,
        MIN_BY(properties['utm_campaign'], occurred_at) AS utm_campaign
    FROM silver.events
    WHERE properties['utm_source'] IS NOT NULL
    GROUP BY user_id_hashed
),
conversions AS (
    SELECT user_id_hashed, MIN(occurred_at) AS converted_at
    FROM silver.events
    WHERE event_type = 'order_completed'
    GROUP BY user_id_hashed
)
SELECT
    f.utm_source,
    f.utm_campaign,
    COUNT(DISTINCT f.user_id_hashed) AS attributed_users,
    COUNT(DISTINCT c.user_id_hashed) AS converted_users,
    ROUND(
        COUNT(DISTINCT c.user_id_hashed) * 100.0
        / NULLIF(COUNT(DISTINCT f.user_id_hashed), 0), 2
    )                                AS conversion_rate_pct
FROM first_touch f
LEFT JOIN conversions c
    ON f.user_id_hashed = c.user_id_hashed
   AND c.converted_at > f.first_seen_at
GROUP BY 1, 2
ORDER BY converted_users DESC
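
The first-touch logic is easy to get subtly wrong, so it helps to check it against a tiny in-memory dataset. The sketch below reproduces the same rules in plain Python with made-up users and sources: the earliest UTM-tagged event per user decides the source, and a conversion counts only if it happens after that first touch.

```python
from datetime import datetime

# Hypothetical sample data: (user, occurred_at, utm_source)
touches = [
    ("u1", datetime(2024, 5, 1, 9), "newsletter"),
    ("u1", datetime(2024, 5, 3, 9), "paid_search"),
    ("u2", datetime(2024, 5, 2, 9), "paid_search"),
]
orders = [("u1", datetime(2024, 5, 4, 9))]  # (user, occurred_at)

# First touch per user: the earliest event that carries a utm_source.
first_touch = {}
for user, ts, source in sorted(touches, key=lambda t: t[1]):
    first_touch.setdefault(user, (ts, source))

# First conversion per user.
converted_at = {}
for user, ts in sorted(orders, key=lambda o: o[1]):
    converted_at.setdefault(user, ts)

stats = {}  # utm_source -> [attributed_users, converted_users]
for user, (first_seen, source) in first_touch.items():
    row = stats.setdefault(source, [0, 0])
    row[0] += 1
    if user in converted_at and converted_at[user] > first_seen:
        row[1] += 1

conversion_rate_pct = {s: round(c * 100.0 / a, 2) for s, (a, c) in stats.items()}
```

User u1 is attributed to the newsletter even though a paid search click came later, which is the defining property of first-touch attribution.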

Data Quality Framework

Data quality is not a nice-to-have. A silent bad number is worse than a visible pipeline failure. I implemented a four-tier DQ framework:

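| Check Type | Example | Action on Failure |
|---|---|---|
| Completeness | event_id is never null | Route to DLQ, alert |
| Validity | platform in known enum | Flag row, continue |
| Freshness | Silver updated < 15 mins ago | Page on-call |
| Consistency | Row counts Bronze ≥ Silver | Alert, investigate |

from dataclasses import dataclass
from typing import Callable
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
 
class DataQualityException(Exception):
    """Raised when a critical data quality check fails."""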
 
@dataclass
class DQCheck:
    name: str
    check_fn: Callable[[DataFrame], bool]
    severity: str  # "critical" | "warning"
    action: str    # "block" | "flag" | "alert"
 
def run_dq_checks(df: DataFrame, checks: list[DQCheck]) -> dict:
    results = {}
    for check in checks:
        passed = check.check_fn(df)
        results[check.name] = {
            "passed":   passed,
            "severity": check.severity,
            "action":   check.action,
        }
        if not passed and check.severity == "critical":
            raise DataQualityException(f"Critical DQ check failed: {check.name}")
    return results
 
silver_checks = [
    DQCheck(
        name="no_null_event_ids",
        check_fn=lambda df: df.filter(col("event_id").isNull()).count() == 0,
        severity="critical",
        action="block",
    ),
    DQCheck(
        name="valid_platforms",
        check_fn=lambda df: df.filter(
            ~col("platform").isin("WEB", "IOS", "ANDROID", "BACKEND")
        ).count() / df.count() < 0.001,        # < 0.1% invalid
        severity="warning",
        action="flag",
    ),
    DQCheck(
        name="ingestion_lag_acceptable",
        check_fn=lambda df: df.filter(
            col("ingestion_lag_seconds") > 3600
        ).count() / df.count() < 0.05,         # < 5% over 1hr late
        severity="warning",
        action="alert",
    ),
]
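
One edge case in the ratio checks above: they divide by df.count(), so an empty micro-batch raises ZeroDivisionError instead of returning a result. A small guard avoids that. The helper below is a hypothetical sketch that works on plain counts; whether an empty batch should pass or fail is a policy decision, and passing is only the assumption made here.

```python
def ratio_below(bad_rows: int, total_rows: int, threshold: float) -> bool:
    """True when the share of bad rows is strictly under the threshold.

    Assumption: an empty batch passes, since there is nothing to reject.
    """
    if total_rows == 0:
        return True
    return bad_rows / total_rows < threshold

# 7 invalid platforms in 10,000 rows is 0.07%, under the 0.1% limit.
under_limit = ratio_below(7, 10_000, 0.001)
# 12 in 10,000 is 0.12%, over the limit.
over_limit = ratio_below(12, 10_000, 0.001)
# An empty batch no longer raises.
empty_batch = ratio_below(0, 0, 0.001)
```

Inside a check_fn this would be called with the two counts computed from the DataFrame, ideally after caching it so the data is not scanned twice.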

Observability and Monitoring

A pipeline without observability is a pipeline you're afraid to touch.

Pipeline Metrics

from prometheus_client import Counter, Histogram, Gauge
 
events_processed = Counter(
    "pipeline_events_processed_total",
    "Total events processed",
    ["topic", "layer", "status"]
)
 
processing_latency = Histogram(
    "pipeline_processing_latency_seconds",
    "End-to-end processing latency",
    ["layer"],
    buckets=[1, 5, 10, 30, 60, 120, 300, 600]
)
 
lag_gauge = Gauge(
    "kafka_consumer_lag",
    "Kafka consumer group lag",
    ["topic", "partition"]
)

Key SLOs I Track

| Metric | Target | Alert Threshold |
|---|---|---|
| Bronze ingestion lag | < 30s | > 2 min |
| Silver processing lag | < 5 min | > 15 min |
| Gold refresh latency | < 1 hour | > 2 hours |
| DQ pass rate | > 99.5% | < 98% |
| Dead letter queue depth | < 100 msgs | > 1000 msgs |
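
These targets and thresholds translate naturally into a small evaluation routine. The sketch below is one possible encoding, not the alerting setup used in this pipeline: the `Slo` class, the metric names and the "degraded" state for values between target and alert threshold are all assumptions made for illustration.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Slo:
    name: str
    target: float
    alert: float
    higher_is_better: bool = False

    def status(self, value: float) -> str:
        """'ok' within target, 'alert' past the alert threshold, else 'degraded'."""
        if self.higher_is_better:
            meets_target, breaches = value > self.target, value < self.alert
        else:
            meets_target, breaches = value < self.target, value > self.alert
        if meets_target:
            return "ok"
        return "alert" if breaches else "degraded"

# Durations are expressed in seconds.
SLOS = [
    Slo("bronze_ingestion_lag_s", target=30, alert=120),
    Slo("silver_processing_lag_s", target=300, alert=900),
    Slo("gold_refresh_latency_s", target=3600, alert=7200),
    Slo("dq_pass_rate_pct", target=99.5, alert=98.0, higher_is_better=True),
    Slo("dlq_depth_msgs", target=100, alert=1000),
]

# Hypothetical readings for one evaluation cycle.
observed = {
    "bronze_ingestion_lag_s": 12,
    "silver_processing_lag_s": 1000,
    "gold_refresh_latency_s": 4000,
    "dq_pass_rate_pct": 99.6,
    "dlq_depth_msgs": 50,
}
statuses = {slo.name: slo.status(observed[slo.name]) for slo in SLOS}
```

With these readings, Silver lag pages on-call, Gold refresh is degraded but not yet alerting, and the rest are healthy.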

Dead Letter Queue

Every malformed event goes to a DLQ topic instead of being silently dropped:

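from pyspark.sql.functions import current_timestamp, lit, struct, to_json

def write_to_dlq(bad_records_df, reason: str):
    """Batch-write rejected rows to the DLQ topic as JSON."""
    (
        bad_records_df
        .withColumn("dlq_reason",     lit(reason))
        .withColumn("dlq_timestamp",  current_timestamp())
        # The Kafka sink requires a string or binary `value` column
        .select(to_json(struct("*")).alias("value"))
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKERS)
        .option("topic", "events.dlq")
        .save()
    )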

DLQ messages get reviewed daily. Most are schema mismatches from mobile clients on old app versions. Some are legitimate bugs. The DLQ is your canary.
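
Daily review goes faster when every DLQ message carries the untouched payload together with the reason it was rejected. Here is a minimal sketch of such an envelope in plain Python. The field names follow the dlq_reason and dlq_timestamp columns above, while `source_topic`, `to_dlq_message` and `parse_or_dlq` are hypothetical additions for illustration.

```python
import json
from datetime import datetime, timezone

def to_dlq_message(raw_payload: str, reason: str, source_topic: str) -> bytes:
    """Wrap a rejected payload, unchanged, with the context needed for triage."""
    envelope = {
        "raw_payload": raw_payload,  # kept exactly as received
        "dlq_reason": reason,
        "dlq_timestamp": datetime.now(timezone.utc).isoformat(),
        "source_topic": source_topic,
    }
    return json.dumps(envelope).encode("utf-8")

def parse_or_dlq(raw_payload: str, source_topic: str):
    """Return (event, None) on success or (None, dlq_message) on failure."""
    try:
        return json.loads(raw_payload), None
    except json.JSONDecodeError as exc:
        reason = f"malformed_json: {exc.msg}"
        return None, to_dlq_message(raw_payload, reason, source_topic)

good_event, good_dlq = parse_or_dlq('{"event_id": "e1"}', "events.user.interaction")
bad_event, bad_dlq = parse_or_dlq('{"event_id": ', "events.user.interaction")
```

Keeping the raw payload intact means a fixed parser can later replay DLQ messages without going back to the original producer.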

Performance Tuning

After the initial build, Silver lag was creeping up to 20 minutes at peak. Here's what fixed it.

Spark Configuration

spark.conf.set("spark.sql.shuffle.partitions",                     "200")
spark.conf.set("spark.sql.adaptive.enabled",                       "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled",    "true")
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled",     "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled",       "true")
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

Switching to the RocksDB state store was the single biggest win: it reduced memory pressure by 40% and eliminated the GC pauses that were causing micro-batch delays.

Delta Lake Optimisation

-- Run nightly via Databricks Job
-- event_date and platform are partition columns, and Delta only Z-orders data columns
OPTIMIZE silver.events
ZORDER BY (event_type);
 
-- Vacuum old versions (keep 7 days for time travel)
VACUUM silver.events RETAIN 168 HOURS;

Z-ordering on event_type within the (event_date, platform) partitions reduced Gold query scan time from 45 seconds to 4 seconds on a 90-day window.

Results

After 3 months in production:

| Metric | Before | After |
|---|---|---|
| Pipeline reliability | ~82% (silent failures) | 99.7% |
| End-to-end latency | 4–6 hours (batch) | < 45 minutes |
| Data quality pass rate | Unknown | 99.6% |
| Gold query p95 latency | 90s | 6s |
| On-call incidents/month | 8–12 | 1–2 |

The finance team caught a double-billing bug within 48 hours of the Gold revenue table going live — something that had been silently happening for months in the old pipeline. That alone justified the entire rebuild.

Production Checklist

Ingestion:

  • Schema Registry for every topic
  • acks=all + enable_idempotence=True
  • DLQ for malformed events
  • Partition by domain key (e.g. user_id)

Bronze:

  • Append-only, never mutate
  • Store Kafka metadata (offset, partition, timestamp)
  • Checkpoint location per topic

Silver:

  • Watermark for late arrivals
  • Deduplication on event_id
  • PII masking before persistence
  • Row-level DQ flags

Gold:

  • Purpose-built per business domain
  • Z-ordered for query patterns
  • Nightly OPTIMIZE + VACUUM

Monitoring:

  • Consumer lag alerts
  • DQ pass rate tracking
  • End-to-end latency SLOs
  • DLQ volume thresholds

Common Pitfalls

  • Skipping Bronze - No replay capability when bugs hit production
  • No Schema Registry - Malformed events corrupt Silver silently
  • Auto-committing offsets - Data loss on consumer crashes
  • Processing-time windows - Late mobile events go missing
  • No DQ checks - Silent bad numbers reach dashboards
  • Single shuffle partition setting - Profile first, tune second

Key Takeaways

  1. Bronze is insurance - Cheap storage, invaluable when you need replay
  2. Schema contracts upfront - Every hour on Schema Registry saves ten on debugging
  3. Watermarks are tradeoffs - A 2hr watermark means 2hrs of retained state, and append-mode windowed aggregates wait that long before emitting; know your SLA
  4. DQ checks need teeth - Warnings get ignored; route bad data to DLQ
  5. RocksDB state store - First thing to try if Silver lag keeps climbing
  6. Measure before tuning - The bottleneck is never where you think it is

Building end-to-end pipelines that survive production requires designing for failure at every layer — not as an afterthought, but as the primary constraint.

