End-to-End Data Pipeline Case Study: From Raw Events to Business Insights
Every data team eventually faces the same challenge: raw events are piling up, stakeholders want dashboards yesterday, and the pipeline you stitched together six months ago is held together with duct tape and prayers.
This is the story of how I designed and built a production-grade end-to-end data pipeline — from raw clickstream events to business-ready insights — processing 10M+ events per day with sub-hour latency, exactly-once semantics, and full observability.
The Problem Statement
Our platform generates events across three surfaces: web, mobile, and backend services. Product, marketing, and finance teams all need answers from this data — but they need different things:
- Product - Funnel conversion, feature adoption, session analysis
- Marketing - Campaign attribution, acquisition cost, cohort retention
- Finance - Revenue reconciliation, refund rates, MRR trends
The existing pipeline was a brittle ETL script running on a cron job. It broke silently, had no data quality checks, and couldn't handle late-arriving events. My job was to replace it entirely.
Architecture Overview
Before writing a single line of code, I mapped out the full data flow:
```
 Mobile / Web / Backend
           │
           ▼
      Apache Kafka
   (Event Ingestion)
           │
           ▼
    Spark Streaming
   (Bronze → Silver)
           │
           ▼
       Delta Lake
   (Medallion Layers)
           │
           ▼
     Databricks SQL
    (Gold / Serving)
           │
           ▼
    BI Tools / APIs
  (Business Insights)
```
The core principle: separate concerns at every layer. Ingestion doesn't care about transformation. Transformation doesn't care about serving. Each layer is independently scalable and testable.
Layer 1: Event Ingestion with Kafka
Topic Design
A common mistake is to create one Kafka topic per microservice. That couples producers and consumers tightly, because every consumer has to know which service emits which event. Instead, I modelled topics around event domains:
| Topic | Retention | Partitions | Description |
|---|---|---|---|
| events.user.interaction | 7 days | 24 | Clicks, scrolls, hovers |
| events.user.lifecycle | 30 days | 12 | Signup, login, churn |
| events.commerce.transaction | 90 days | 24 | Orders, refunds, disputes |
| events.system.error | 14 days | 6 | Exceptions, timeouts |
| events.system.metric | 3 days | 6 | Latency, throughput |
Partitioning by user_id hash ensures all events for a given user land in the same partition — critical for session reconstruction downstream.
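Here is a minimal sketch of that property. It assumes the producer keys each message by user_id: any deterministic hash of the key, reduced modulo the partition count, sends all of a user's events to the same partition. Kafka's default partitioner applies murmur2 to the serialized key. This sketch substitutes MD5 from hashlib purely for illustration, so its partition numbers will not match a real producer's.

```python
import hashlib
from collections import defaultdict


def partition_for(key: str, num_partitions: int) -> int:
    """Map a message key to a partition with a stable hash.

    Illustration only: Kafka's default partitioner uses murmur2, not MD5,
    so the numbers produced here differ from a real producer's choice.
    """
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


def route(events: list[dict], num_partitions: int) -> dict[int, list[dict]]:
    """Group events by the partition their user_id key maps to."""
    partitions: dict[int, list[dict]] = defaultdict(list)
    for event in events:
        partitions[partition_for(event["user_id"], num_partitions)].append(event)
    return dict(partitions)


events = [
    {"user_id": "u1", "event_type": "page_view"},
    {"user_id": "u2", "event_type": "click"},
    {"user_id": "u1", "event_type": "click"},
    {"user_id": "u1", "event_type": "scroll"},
]
routed = route(events, num_partitions=24)

# All of u1's events share one partition, in the order they were produced
u1_partition = partition_for("u1", 24)
print([e["event_type"] for e in routed[u1_partition] if e["user_id"] == "u1"])
# → ['page_view', 'click', 'scroll']
```

Because the mapping is hash modulo partition count, adding partitions to an existing topic remaps keys. Per-user ordering therefore holds only while the partition count stays fixed.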
Producer Configuration
Reliability over raw throughput. Here's the producer config I settled on after load testing:
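```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    # Durability
    acks='all',                               # Wait for all in-sync replicas (ISR) to acknowledge
    retries=10,
    max_in_flight_requests_per_connection=1,  # Keep strict ordering even when retrying
    # Idempotence
    enable_idempotence=True,                  # Broker discards duplicates caused by retries
    # Serialization
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: str(k).encode('utf-8'),  # Key = user_id, so a user's events share a partition
    # Batching for throughput
    batch_size=65536,                         # 64 KB
    linger_ms=10,
    compression_type='snappy'                 # ~60% size reduction
)
```

enable_idempotence=True combined with acks='all' prevents producer retries from writing duplicates, because the broker discards a resent batch it has already stored. The guarantee covers one partition within one producer session, so on its own it is not end-to-end exactly-once. It does avoid the overhead of Kafka transactions, and together with downstream deduplication it is sufficient for most workloads.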
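The reason retries don't duplicate data is that, with idempotence on, each producer gets an ID and numbers its batches per partition. The broker then ignores any batch whose sequence number it has already written. The toy model below is deliberately simplified and is not Kafka's implementation: real brokers track the last few batches per producer and partition, and they reject sequence gaps with an error.

```python
from dataclasses import dataclass, field


@dataclass
class ToyPartitionLog:
    """Toy model of one partition's log with idempotent-producer dedup.

    Simplification: a real broker keeps metadata for the last few batches
    per (producer id, partition) and raises errors on sequence gaps.
    """
    records: list = field(default_factory=list)
    last_seq: dict = field(default_factory=dict)  # producer_id -> last written sequence

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        if seq <= self.last_seq.get(producer_id, -1):
            return False  # Duplicate from a retry: acknowledged, not written again
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return True


log = ToyPartitionLog()
log.append(producer_id=7, seq=0, value="order_completed:o-1")
# The ack for seq 0 was lost, so the producer retries the same batch
log.append(producer_id=7, seq=0, value="order_completed:o-1")
log.append(producer_id=7, seq=1, value="refund_completed:o-1")
print(log.records)  # → ['order_completed:o-1', 'refund_completed:o-1']
```

This model also shows why idempotence alone is not end-to-end exactly-once. A non-transactional producer that restarts gets a new producer ID, so a message it re-sends after a crash looks new to the broker. Deduplication on event_id in Silver is what catches those.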
Schema Registry
Every event is validated against an Avro schema before it hits Kafka. No schema, no entry. This single decision eliminated an entire class of downstream failures.
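A gate like this can be unit-tested without a running registry. The sketch below is a stand-in, not an Avro validator. It hand-codes a subset of the UserInteractionEvent schema's rules: required string fields, the Platform enum, epoch-millisecond timestamps, and a string-to-string properties map. For real schema validation, libraries such as fastavro are the usual choice.

```python
REQUIRED_STRINGS = ("event_id", "user_id", "session_id", "event_type")
PLATFORMS = {"WEB", "IOS", "ANDROID", "BACKEND"}


def validate_event(event: dict) -> list[str]:
    """Return a list of problems; an empty list means the event may be produced.

    Hand-written subset of the UserInteractionEvent schema, for illustration only.
    """
    problems = []
    for name in REQUIRED_STRINGS:
        if not isinstance(event.get(name), str):
            problems.append(f"{name}: expected string")
    for name in ("occurred_at", "received_at"):
        value = event.get(name)
        # bool is a subclass of int in Python, so exclude it explicitly
        if not isinstance(value, int) or isinstance(value, bool):
            problems.append(f"{name}: expected epoch millis (int)")
    if event.get("platform") not in PLATFORMS:
        problems.append(f"platform: {event.get('platform')!r} not in enum")
    properties = event.get("properties")
    if not isinstance(properties, dict) or not all(
        isinstance(k, str) and isinstance(v, str) for k, v in properties.items()
    ):
        problems.append("properties: expected map<string, string>")
    return problems


event = {
    "event_id": "e-1", "user_id": "u-1", "session_id": "s-1",
    "event_type": "click", "properties": {"button": "buy"},
    "occurred_at": 1_700_000_000_000, "received_at": 1_700_000_450_000,
    "platform": "IOS",
}
print(validate_event(event))                             # → []
print(validate_event(dict(event, platform="WINDOWS")))   # → ["platform: 'WINDOWS' not in enum"]
```

A producer wrapper would send the event only when the list is empty, and route it to the DLQ otherwise.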
```python
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# logicalType must sit inside the field's type object. Placed next to "type"
# at field level, Avro ignores it and the field is just a plain long.
schema_str = """
{
  "type": "record",
  "name": "UserInteractionEvent",
  "namespace": "com.platform.events",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "session_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "properties", "type": {"type": "map", "values": "string"}},
    {"name": "occurred_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "received_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "platform", "type": {"type": "enum", "name": "Platform",
                                  "symbols": ["WEB", "IOS", "ANDROID", "BACKEND"]}},
    {"name": "schema_version", "type": "string", "default": "1.0"}
  ]
}
"""
```

Two timestamps matter here: occurred_at (client-side event time) and received_at (server ingestion time). The difference between them is your event latency metric, and it is what lets you handle late arrivals correctly.
Layer 2: Bronze — Raw Landing Zone
Bronze is append-only. No transformations, no filtering, no business logic. Just land the data exactly as it arrived, with ingestion metadata attached.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit

# Same brokers as the producer configuration
KAFKA_BROKERS = "kafka1:9092,kafka2:9092,kafka3:9092"

spark = (
    SparkSession.builder
    .appName("bronze-ingestion")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.databricks.delta.optimizeWrite.enabled", "true")
    .config("spark.databricks.delta.autoCompact.enabled", "true")
    .getOrCreate()
)


def ingest_to_bronze(topic: str, bronze_path: str):
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKERS)
        .option("subscribe", topic)
        # Applies only on the very first run; afterwards offsets come from the checkpoint.
        # Use "earliest" to backfill whatever the topic still retains.
        .option("startingOffsets", "latest")
        .option("maxOffsetsPerTrigger", 500_000)
        # Spark tracks offsets in the checkpoint and never commits them to Kafka;
        # a fixed group id mainly matters for group-based Kafka ACLs.
        .option("kafka.group.id", f"bronze-consumer-{topic}")
        .load()
        .select(
            col("key").cast("string").alias("event_key"),
            col("value").cast("string").alias("raw_payload"),
            col("topic"),
            col("partition"),
            col("offset"),
            col("timestamp").alias("kafka_timestamp"),
            current_timestamp().alias("ingested_at"),
            lit("1.0").alias("pipeline_version"),
        )
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{bronze_path}/_checkpoints/{topic}")
        .option("mergeSchema", "true")
        .partitionBy("topic")
        .trigger(processingTime="30 seconds")
        .start(bronze_path)
    )
```

Why Bronze Matters
I've seen teams skip Bronze and write directly to Silver. This always comes back to bite them when:
- A transformation bug corrupts data and you can't replay from source
- A new consumer needs historical raw data you never stored
- An audit requires the original event payload unchanged
Bronze is your source of truth. It's cheap storage. Never skip it.
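The replay argument is easy to see in miniature. Because Bronze keeps the untouched payload plus its Kafka coordinates, you can fix a transformation and simply re-run it over Bronze. The sketch below uses in-memory rows as a stand-in for the Bronze table, and both transforms are hypothetical examples.

```python
import json
from decimal import Decimal

# Append-only Bronze rows: raw payload plus Kafka coordinates, never mutated
bronze = [
    {"topic": "events.commerce.transaction", "partition": 3, "offset": 100,
     "raw_payload": '{"event_id": "e-1", "amount": "19.99"}'},
    {"topic": "events.commerce.transaction", "partition": 3, "offset": 101,
     "raw_payload": '{"event_id": "e-2", "amount": "5.00"}'},
]


def buggy_transform(row: dict) -> dict:
    payload = json.loads(row["raw_payload"])
    # Bug: forgets to convert dollars to cents
    return {"event_id": payload["event_id"], "amount_cents": int(Decimal(payload["amount"]))}


def fixed_transform(row: dict) -> dict:
    payload = json.loads(row["raw_payload"])
    return {"event_id": payload["event_id"],
            "amount_cents": int(Decimal(payload["amount"]) * 100)}


def rebuild_silver(bronze_rows: list[dict], transform) -> list[dict]:
    """Replay Bronze in (topic, partition, offset) order and rebuild Silver from scratch."""
    ordered = sorted(bronze_rows, key=lambda r: (r["topic"], r["partition"], r["offset"]))
    return [transform(row) for row in ordered]


silver = rebuild_silver(bronze, buggy_transform)   # Wrong amounts: [19, 5]
silver = rebuild_silver(bronze, fixed_transform)   # Replay after the fix
print([r["amount_cents"] for r in silver])         # → [1999, 500]
```

Without Bronze, the first (wrong) Silver output would be all you had.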
Layer 3: Silver — Cleaned and Conformed
Silver is where the real engineering happens. Raw JSON payloads become typed columns, nulls get handled, PII gets masked, and late-arriving events get correctly placed using event time rather than processing time.
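The per-row rules are easier to pin down in plain Python before you express them in Spark. The sketch below is a single-event mirror of the Spark transformation, intended for unit tests. It takes the 300-second late threshold from the Spark code. It assumes the Spark session time zone is UTC, since event_date depends on it. hashlib's SHA-256 hex digest matches what Spark's sha2(col, 256) returns for the same UTF-8 string.

```python
import hashlib
from datetime import datetime, timezone

VALID_PLATFORMS = {"WEB", "IOS", "ANDROID", "BACKEND"}
LATE_THRESHOLD_SECONDS = 300


def to_silver_row(event: dict) -> dict:
    """Single-event version of the Silver rules (for unit tests, not production)."""
    occurred = datetime.fromtimestamp(event["occurred_at"] / 1000, tz=timezone.utc)
    received = datetime.fromtimestamp(event["received_at"] / 1000, tz=timezone.utc)
    lag = int(received.timestamp()) - int(occurred.timestamp())
    return {
        "event_id": event["event_id"],
        "event_type": event["event_type"],
        "platform": event["platform"],
        "occurred_at": occurred,
        "event_date": occurred.date(),  # Assumes a UTC session time zone
        "ingestion_lag_seconds": lag,
        "is_late_arrival": lag > LATE_THRESHOLD_SECONDS,
        # Store only the hash, never the raw user_id
        "user_id_hashed": hashlib.sha256(event["user_id"].encode("utf-8")).hexdigest(),
        "dedup_key": f"{event['event_id']}|{event['event_type']}",
        "dq_passed": bool(event.get("event_id") and event.get("event_type")
                          and event.get("platform") in VALID_PLATFORMS),
    }


row = to_silver_row({
    "event_id": "e-1", "user_id": "u-1", "event_type": "click", "platform": "ANDROID",
    "occurred_at": 1_700_000_000_000, "received_at": 1_700_000_600_000,
})
print(row["event_date"], row["ingestion_lag_seconds"], row["is_late_arrival"])
# → 2023-11-14 600 True
```

User IDs are often guessable, so an unsalted hash can be reversed by hashing candidate IDs. A keyed hash such as hmac.new(secret_key, user_id, "sha256") is the usual hardening, at the cost of managing the key.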
```python
from pyspark.sql.functions import (
    from_json, col, sha2, concat_ws
)
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, MapType
)

# from_json treats every field as nullable, so the False flags document intent
# only; required fields are enforced by the filter and dq_passed below.
EVENT_SCHEMA = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), True),
    StructField("event_type", StringType(), False),
    StructField("properties", MapType(StringType(), StringType()), True),
    StructField("occurred_at", LongType(), False),
    StructField("received_at", LongType(), False),
    StructField("platform", StringType(), False),
    StructField("schema_version", StringType(), True),
])


def transform_to_silver(bronze_df):
    return (
        bronze_df
        # Parse raw JSON
        .withColumn("event", from_json(col("raw_payload"), EVENT_SCHEMA))
        # Since Spark 3.0, unparseable JSON can come back as a struct of nulls
        # rather than NULL, so also require event_id. Rows removed here should
        # go to the DLQ (see Dead Letter Queue) instead of vanishing.
        .filter(col("event").isNotNull() & col("event.event_id").isNotNull())
        # Flatten struct
        .select("ingested_at", "kafka_timestamp", "event.*")
        # Epoch millis → timestamp
        .withColumn("occurred_at", (col("occurred_at") / 1000).cast("timestamp"))
        .withColumn("received_at", (col("received_at") / 1000).cast("timestamp"))
        .withColumn("event_date", col("occurred_at").cast("date"))
        # Derived fields
        .withColumn(
            "ingestion_lag_seconds",
            col("received_at").cast("long") - col("occurred_at").cast("long"),
        )
        .withColumn("is_late_arrival", col("ingestion_lag_seconds") > 300)
        # PII masking: hash user_id, never store the raw value
        .withColumn("user_id_hashed", sha2(col("user_id"), 256))
        .drop("user_id")
        # Dedup key
        .withColumn("dedup_key", concat_ws("|", col("event_id"), col("event_type")))
        # Data quality flag
        .withColumn(
            "dq_passed",
            col("event_id").isNotNull()
            & col("event_type").isNotNull()
            & col("occurred_at").isNotNull()
            & col("platform").isin("WEB", "IOS", "ANDROID", "BACKEND"),
        )
    )
```

Handling Late Arrivals
Events arriving late (mobile apps reconnecting after offline periods) are the bane of streaming pipelines. Watermarking handles this gracefully:
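```python
# Read Bronze back as a stream; BRONZE_PATH and SILVER_PATH are your table locations
bronze_stream = spark.readStream.format("delta").load(BRONZE_PATH)

silver_stream = (
    transform_to_silver(bronze_stream)
    .withWatermark("occurred_at", "2 hours")  # Tolerate events up to 2 hrs behind the max event time
    # Spark 3.5+: dedup state is evicted once the watermark passes. Plain
    # dropDuplicates(["event_id", "event_type"]) would keep state forever,
    # because its keys don't include the event-time column.
    .dropDuplicatesWithinWatermark(["event_id", "event_type"])
    .writeStream
    .format("delta")
    .outputMode("append")
    .partitionBy("event_date", "platform")
    .option("checkpointLocation", f"{SILVER_PATH}/_checkpoints")
    .trigger(processingTime="1 minute")
    .start(SILVER_PATH)
)
```

The 2-hour watermark bounds how long Spark waits for stragglers. Deduplication state is held until the watermark (the latest event time seen, minus 2 hours) passes it. Events that arrive further behind than that can be dropped by the deduplication step, although they remain in Bronze for replay. Placement does not depend on the watermark at all: event_date is derived from occurred_at, so a late event still lands in the partition for the day it happened rather than the day it was processed.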
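To make the watermark behaviour concrete, here is a simplified single-process model of event-time deduplication. The watermark trails the maximum event time seen by the configured delay. Rows behind the watermark are dropped, and dedup state the watermark has passed is evicted. Spark advances the watermark between micro-batches rather than per row, so treat this as an approximation of the semantics, not Spark's implementation.

```python
from datetime import datetime, timedelta


class WatermarkDeduper:
    """Approximate model of withWatermark(...) plus dedup on (event_id, event_type)."""

    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = None
        self.seen = {}  # (event_id, event_type) -> event time: the dedup state

    @property
    def watermark(self):
        return None if self.max_event_time is None else self.max_event_time - self.delay

    def process(self, event_id: str, event_type: str, occurred_at: datetime) -> str:
        if self.watermark is not None and occurred_at < self.watermark:
            return "dropped_too_late"  # Still safe in Bronze for replay
        key = (event_id, event_type)
        if key in self.seen:
            return "duplicate"
        self.seen[key] = occurred_at
        if self.max_event_time is None or occurred_at > self.max_event_time:
            self.max_event_time = occurred_at
        # Evict state the watermark has passed; later rows that old are dropped anyway
        self.seen = {k: t for k, t in self.seen.items() if t >= self.watermark}
        return "emitted"


d = WatermarkDeduper(delay=timedelta(hours=2))
t0 = datetime(2024, 1, 1, 12, 0)
print(d.process("e-1", "click", t0))                          # → emitted
print(d.process("e-1", "click", t0))                          # → duplicate
print(d.process("e-2", "click", t0 + timedelta(hours=3)))     # → emitted (watermark now 13:00)
print(d.process("e-3", "click", t0 + timedelta(minutes=30)))  # → dropped_too_late
```

The last line is the offline-mobile case. An event 2.5 hours behind the newest event time never reaches Silver, which is why the watermark delay should come from measured ingestion lag rather than guesswork.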
Layer 4: Gold — Business-Ready Aggregates
Gold tables are purpose-built for specific business domains. Each one answers a concrete business question.
Daily Active Users (Product)
```sql
CREATE OR REPLACE TABLE gold.daily_active_users
USING DELTA PARTITIONED BY (event_date) AS
SELECT
  event_date,
  platform,
  COUNT(DISTINCT user_id_hashed) AS dau,
  COUNT(DISTINCT session_id) AS sessions,
  COUNT(*) AS total_events,
  -- NULLIF yields NULL instead of a division error when a group has no session_id
  ROUND(COUNT(*) / NULLIF(COUNT(DISTINCT session_id), 0), 2) AS events_per_session,
  SUM(CASE WHEN is_late_arrival THEN 1 ELSE 0 END) AS late_arrival_count
FROM silver.events
WHERE dq_passed = true
  AND event_date >= CURRENT_DATE - INTERVAL 90 DAYS
GROUP BY event_date, platform
```

Revenue Reconciliation (Finance)
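The key rule for this table is that gross revenue counts completed orders only, refunds are tracked separately, and net revenue is the difference. A small pure-Python reference implementation using Decimal arithmetic is handy for checking the SQL against hand-computed fixtures. The sketch uses hypothetical field names on plain dicts and assumes refund amounts are recorded as positive values.

```python
from collections import defaultdict
from decimal import Decimal


def revenue_daily(events: list[dict]) -> dict:
    """Aggregate transaction count, gross, refunds and net per (date, platform, currency)."""
    totals = defaultdict(lambda: {"transaction_count": 0,
                                  "gross_revenue": Decimal("0"),
                                  "refunds": Decimal("0")})
    for e in events:
        if e["event_type"] not in ("order_completed", "refund_completed") or not e["dq_passed"]:
            continue
        key = (e["date"], e["platform"], e["properties"]["currency"])
        amount = Decimal(e["properties"]["amount"])  # Exact decimal, never float
        bucket = totals[key]
        bucket["transaction_count"] += 1
        if e["event_type"] == "order_completed":
            bucket["gross_revenue"] += amount
        else:
            bucket["refunds"] += amount
    return {k: {**v, "net_revenue": v["gross_revenue"] - v["refunds"]}
            for k, v in totals.items()}


fixture = [
    {"date": "2024-03-01", "platform": "WEB", "event_type": "order_completed",
     "dq_passed": True, "properties": {"currency": "USD", "amount": "120.00"}},
    {"date": "2024-03-01", "platform": "WEB", "event_type": "order_completed",
     "dq_passed": True, "properties": {"currency": "USD", "amount": "30.50"}},
    {"date": "2024-03-01", "platform": "WEB", "event_type": "refund_completed",
     "dq_passed": True, "properties": {"currency": "USD", "amount": "30.50"}},
]
row = revenue_daily(fixture)[("2024-03-01", "WEB", "USD")]
print(row["gross_revenue"], row["refunds"], row["net_revenue"])  # → 150.50 30.50 120.00
```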
```sql
CREATE OR REPLACE TABLE gold.revenue_daily
USING DELTA PARTITIONED BY (transaction_date) AS
SELECT
  occurred_at::DATE AS transaction_date,
  platform,
  properties['currency'] AS currency,
  COUNT(*) AS transaction_count,  -- orders and refunds combined
  -- Gross counts orders only; DECIMAL keeps money sums exact, unlike DOUBLE
  SUM(CASE WHEN event_type = 'order_completed'
           THEN CAST(properties['amount'] AS DECIMAL(18, 2)) ELSE 0 END) AS gross_revenue,
  -- Assumes refund amounts are recorded as positive values
  SUM(CASE WHEN event_type = 'refund_completed'
           THEN CAST(properties['amount'] AS DECIMAL(18, 2)) ELSE 0 END) AS refunds,
  SUM(CASE WHEN event_type = 'order_completed'
           THEN CAST(properties['amount'] AS DECIMAL(18, 2)) ELSE 0 END)
  - SUM(CASE WHEN event_type = 'refund_completed'
             THEN CAST(properties['amount'] AS DECIMAL(18, 2)) ELSE 0 END) AS net_revenue
FROM silver.events
WHERE event_type IN ('order_completed', 'refund_completed')
  AND dq_passed = true
GROUP BY 1, 2, 3
```

Campaign Attribution (Marketing)
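First-touch attribution boils down to two rules. For each user, take the UTM source and campaign of their earliest tagged event. Then count the user as converted if their first completed order comes after that touch. The pure-Python sketch below applies those rules to in-memory rows. Field names follow silver.events, and the fixture data is illustrative.

```python
from collections import defaultdict
from datetime import datetime


def first_touch_attribution(events: list[dict]) -> dict:
    """Return {(utm_source, utm_campaign): (attributed_users, converted_users, rate_pct)}."""
    first_touch = {}   # user -> (time, source, campaign) of the earliest UTM-tagged event
    converted_at = {}  # user -> time of the first order_completed
    for e in events:
        user, t, props = e["user_id_hashed"], e["occurred_at"], e["properties"]
        if props.get("utm_source") is not None:
            if user not in first_touch or t < first_touch[user][0]:
                first_touch[user] = (t, props["utm_source"], props.get("utm_campaign"))
        if e["event_type"] == "order_completed":
            if user not in converted_at or t < converted_at[user]:
                converted_at[user] = t

    counts = defaultdict(lambda: [0, 0])  # (source, campaign) -> [attributed, converted]
    for user, (touch_time, source, campaign) in first_touch.items():
        counts[(source, campaign)][0] += 1
        if user in converted_at and converted_at[user] > touch_time:
            counts[(source, campaign)][1] += 1
    return {k: (n, c, round(c * 100.0 / n, 2)) for k, (n, c) in counts.items()}


t = datetime(2024, 3, 1)
events = [
    {"user_id_hashed": "a", "occurred_at": t.replace(hour=9), "event_type": "page_view",
     "properties": {"utm_source": "google", "utm_campaign": "spring"}},
    {"user_id_hashed": "a", "occurred_at": t.replace(hour=10), "event_type": "page_view",
     "properties": {"utm_source": "newsletter", "utm_campaign": "promo"}},
    {"user_id_hashed": "a", "occurred_at": t.replace(hour=11), "event_type": "order_completed",
     "properties": {}},
    {"user_id_hashed": "b", "occurred_at": t.replace(hour=9), "event_type": "page_view",
     "properties": {"utm_source": "google", "utm_campaign": "spring"}},
]
print(first_touch_attribution(events))  # → {('google', 'spring'): (2, 1, 50.0)}
```

User a's later newsletter visit earns no credit under first-touch. That is the design choice this table encodes, and it is worth stating explicitly to Marketing.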
```sql
CREATE OR REPLACE TABLE gold.campaign_attribution USING DELTA AS
WITH first_touch AS (
  -- MIN_BY returns the value from the row with the earliest occurred_at;
  -- a window function can't be mixed with this GROUP BY
  SELECT
    user_id_hashed,
    MIN(occurred_at) AS first_seen_at,
    MIN_BY(properties['utm_source'], occurred_at) AS utm_source,
    MIN_BY(properties['utm_campaign'], occurred_at) AS utm_campaign
  FROM silver.events
  WHERE properties['utm_source'] IS NOT NULL
  GROUP BY user_id_hashed
),
conversions AS (
  SELECT user_id_hashed, MIN(occurred_at) AS converted_at
  FROM silver.events
  WHERE event_type = 'order_completed'
  GROUP BY user_id_hashed
)
SELECT
  f.utm_source,
  f.utm_campaign,
  COUNT(DISTINCT f.user_id_hashed) AS attributed_users,
  COUNT(DISTINCT c.user_id_hashed) AS converted_users,
  ROUND(
    COUNT(DISTINCT c.user_id_hashed) * 100.0
    / NULLIF(COUNT(DISTINCT f.user_id_hashed), 0), 2
  ) AS conversion_rate_pct
FROM first_touch f
LEFT JOIN conversions c
  ON f.user_id_hashed = c.user_id_hashed
  AND c.converted_at > f.first_seen_at
GROUP BY 1, 2
ORDER BY converted_users DESC
```

Data Quality Framework
Data quality is not a nice-to-have. A silent bad number is worse than a visible pipeline failure. I implemented a four-tier DQ framework:
| Check Type | Example | Action on Failure |
|---|---|---|
| Completeness | event_id is never null | Route to DLQ, alert |
| Validity | platform in known enum | Flag row, continue |
| Freshness | Silver updated < 15 mins ago | Page on-call |
| Consistency | Row counts Bronze ≥ Silver | Alert, investigate |
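The Freshness and Consistency rows are table-level checks rather than row predicates. The minimal sketch below implements both. It takes the last Silver update time and the Bronze/Silver row counts (measured over the same ingestion window) as plain inputs. How you fetch those values, for example from Delta table history or a count query, is assumed and left out.

```python
from datetime import datetime, timedelta, timezone


def check_freshness(last_updated: datetime, now: datetime,
                    max_age: timedelta = timedelta(minutes=15)) -> dict:
    """Freshness: Silver must have been updated within max_age, else page on-call."""
    age = now - last_updated
    return {"check": "freshness", "passed": age < max_age,
            "age_minutes": round(age.total_seconds() / 60, 1), "action": "page"}


def check_consistency(bronze_rows: int, silver_rows: int) -> dict:
    """Consistency: over the same window, Silver can lose rows (malformed, dedup) but never gain them."""
    return {"check": "consistency", "passed": bronze_rows >= silver_rows,
            "dropped_rows": bronze_rows - silver_rows, "action": "alert"}


now = datetime(2024, 3, 1, 12, 0, tzinfo=timezone.utc)
print(check_freshness(now - timedelta(minutes=22), now))
print(check_consistency(bronze_rows=1_000_000, silver_rows=998_700))
```

A passing consistency check can still hide a problem, so it is worth trending dropped_rows as well. A sudden jump usually means a schema change upstream rather than a real drop in traffic.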
```python
from dataclasses import dataclass
from typing import Callable

from pyspark.sql import Column, DataFrame
from pyspark.sql.functions import col


class DataQualityException(Exception):
    """Raised when a critical check fails and the batch must be blocked."""


@dataclass
class DQCheck:
    name: str
    check_fn: Callable[[DataFrame], bool]
    severity: str  # "critical" | "warning"
    action: str    # "block" | "flag" | "alert"


def run_dq_checks(df: DataFrame, checks: list[DQCheck]) -> dict:
    # Checks call count(), so run them on a batch DataFrame
    # (e.g. inside foreachBatch), not directly on a streaming one
    results = {}
    for check in checks:
        passed = check.check_fn(df)
        results[check.name] = {
            "passed": passed,
            "severity": check.severity,
            "action": check.action,
        }
        if not passed and check.severity == "critical":
            raise DataQualityException(f"Critical DQ check failed: {check.name}")
    return results


def failing_fraction(df: DataFrame, condition: Column) -> float:
    """Share of rows matching `condition`; 0.0 for an empty batch (no division by zero)."""
    total = df.count()
    return df.filter(condition).count() / total if total else 0.0


silver_checks = [
    DQCheck(
        name="no_null_event_ids",
        check_fn=lambda df: df.filter(col("event_id").isNull()).count() == 0,
        severity="critical",
        action="block",
    ),
    DQCheck(
        name="valid_platforms",
        check_fn=lambda df: failing_fraction(
            df, ~col("platform").isin("WEB", "IOS", "ANDROID", "BACKEND")
        ) < 0.001,  # < 0.1% invalid
        severity="warning",
        action="flag",
    ),
    DQCheck(
        name="ingestion_lag_acceptable",
        check_fn=lambda df: failing_fraction(
            df, col("ingestion_lag_seconds") > 3600
        ) < 0.05,  # < 5% over 1hr late
        severity="warning",
        action="alert",
    ),
]
```

Observability and Monitoring
A pipeline without observability is a pipeline you're afraid to touch.
Pipeline Metrics
```python
from prometheus_client import Counter, Histogram, Gauge

events_processed = Counter(
    "pipeline_events_processed_total",
    "Total events processed",
    ["topic", "layer", "status"]
)

processing_latency = Histogram(
    "pipeline_processing_latency_seconds",
    "End-to-end processing latency",
    ["layer"],
    buckets=[1, 5, 10, 30, 60, 120, 300, 600]
)

lag_gauge = Gauge(
    "kafka_consumer_lag",
    "Kafka consumer group lag",
    ["topic", "partition"]
)
```

Key SLOs I Track
| Metric | Target | Alert Threshold |
|---|---|---|
| Bronze ingestion lag | < 30s | > 2 min |
| Silver processing lag | < 5 min | > 15 min |
| Gold refresh latency | < 1 hour | > 2 hours |
| DQ pass rate | > 99.5% | < 98% |
| Dead letter queue depth | < 100 msgs | > 1000 msgs |
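Each SLO has a target and a looser alert threshold, which gives three states. The sketch below encodes the table. Thresholds are copied from it, while the metric keys, units, and the "degraded" label are illustrative assumptions; current measurements are supplied by the caller.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SLO:
    target: float
    alert: float
    higher_is_better: bool = False  # True for rates such as the DQ pass rate


# Values from the SLO table; lags in seconds, pass rate in %, DLQ depth in messages
SLOS = {
    "bronze_ingestion_lag_s": SLO(target=30, alert=120),
    "silver_processing_lag_s": SLO(target=300, alert=900),
    "gold_refresh_latency_s": SLO(target=3600, alert=7200),
    "dq_pass_rate_pct": SLO(target=99.5, alert=98.0, higher_is_better=True),
    "dlq_depth_msgs": SLO(target=100, alert=1000),
}


def evaluate(name: str, value: float) -> str:
    """Classify a measurement as 'ok', 'degraded' (missed target) or 'alert'."""
    slo = SLOS[name]
    if slo.higher_is_better:
        if value < slo.alert:
            return "alert"
        return "ok" if value > slo.target else "degraded"
    if value > slo.alert:
        return "alert"
    return "ok" if value < slo.target else "degraded"


for metric, value in {"silver_processing_lag_s": 420, "dq_pass_rate_pct": 99.6,
                      "dlq_depth_msgs": 1500}.items():
    print(metric, evaluate(metric, value))
```

The gap between target and alert threshold is deliberate. Missing the target is a signal to investigate, while crossing the threshold is what fires an alert.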
Dead Letter Queue
Every malformed event goes to a DLQ topic instead of being silently dropped:
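```python
from pyspark.sql.functions import current_timestamp, lit, struct, to_json


def write_to_dlq(bad_records_df, reason: str):
    # Batch write; from a streaming query, call this inside foreachBatch
    (
        bad_records_df
        .withColumn("dlq_reason", lit(reason))
        .withColumn("dlq_timestamp", current_timestamp())
        # The Kafka sink requires a `value` column, so serialize each row as JSON
        .select(to_json(struct("*")).alias("value"))
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKERS)
        .option("topic", "events.dlq")
        .save()
    )
```

DLQ messages get reviewed daily. Most are schema mismatches from mobile clients on old app versions; some are genuine bugs. The DLQ is your canary.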
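The daily review goes faster when the DLQ is grouped by failure reason and client version. The sketch below works on DLQ records already decoded from JSON into dicts. dlq_reason matches the column write_to_dlq adds, while app_version is a hypothetical field that your payloads may or may not carry.

```python
from collections import Counter


def triage(dlq_records: list[dict], top_n: int = 5) -> list[tuple]:
    """Most common (reason, app_version) pairs, largest first."""
    counts = Counter(
        (r.get("dlq_reason", "unknown"), r.get("app_version", "unknown"))
        for r in dlq_records
    )
    return counts.most_common(top_n)


records = [
    {"dlq_reason": "schema_mismatch", "app_version": "3.1.0"},
    {"dlq_reason": "schema_mismatch", "app_version": "3.1.0"},
    {"dlq_reason": "schema_mismatch", "app_version": "4.0.2"},
    {"dlq_reason": "unparseable_json", "app_version": "4.0.2"},
]
for (reason, version), n in triage(records):
    print(f"{n:>4}  {reason:<18} app {version}")
```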
Performance Tuning
After the initial build, Silver lag was creeping up to 20 minutes at peak. Here's what fixed it.
Spark Configuration
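```python
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
# Databricks class name; open-source Spark 3.2+ ships
# org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.
# Stateful queries record this setting (and shuffle.partitions) in their
# checkpoint, so switching typically requires a fresh checkpoint.
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)
```

Switching to the RocksDB state store was the single biggest win. It keeps streaming state off the JVM heap, which reduced memory pressure by 40% and eliminated the GC pauses that were delaying micro-batches.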
Delta Lake Optimisation
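```sql
-- Run nightly via a Databricks Job.
-- event_date and platform are partition columns, and Delta can't Z-order
-- on partition columns, so cluster files within each partition by event_type.
OPTIMIZE silver.events
ZORDER BY (event_type);

-- Vacuum old versions (keep 7 days for time travel)
VACUUM silver.events RETAIN 168 HOURS;
```

Partition pruning on (event_date, platform), combined with Z-ordering on event_type, reduced Gold query scan time from 45 seconds to 4 seconds on a 90-day window.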
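Clustering cuts scan time because Delta keeps per-file min/max statistics, which let a filter skip any file whose range cannot contain the value. The toy model below illustrates that pruning; it is not Delta's code. It compares files written in arrival order against files whose rows were sorted by event_type first. The event mix is random, seeded data for illustration.

```python
import random


def file_stats(rows: list[str], rows_per_file: int) -> list[tuple[str, str]]:
    """Split rows into files and record each file's (min, max), like per-file column stats."""
    files = [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]
    return [(min(f), max(f)) for f in files]


def files_to_scan(stats: list[tuple[str, str]], value: str) -> int:
    """Files whose [min, max] range could contain `value` cannot be skipped."""
    return sum(1 for lo, hi in stats if lo <= value <= hi)


random.seed(7)
event_types = ["click", "page_view", "refund_completed", "order_completed", "scroll", "signup"]
rows = [random.choice(event_types) for _ in range(60_000)]

unclustered = file_stats(rows, rows_per_file=1_000)
clustered = file_stats(sorted(rows), rows_per_file=1_000)
print(len(unclustered),
      files_to_scan(unclustered, "refund_completed"),
      files_to_scan(clustered, "refund_completed"))
```

In arrival order, every file spans almost the whole value range and none can be skipped. After clustering, only the handful of files that actually hold refund_completed rows are read. Range statistics help only on clustered columns, so the Z-order column should match the filters that Gold queries actually use.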
Results
After 3 months in production:
| Metric | Before | After |
|---|---|---|
| Pipeline reliability | ~82% (silent failures) | 99.7% |
| End-to-end latency | 4–6 hours (batch) | < 45 minutes |
| Data quality pass rate | Unknown | 99.6% |
| Gold query p95 latency | 90s | 6s |
| On-call incidents/month | 8–12 | 1–2 |
The finance team caught a double-billing bug within 48 hours of the Gold revenue table going live — something that had been silently happening for months in the old pipeline. That alone justified the entire rebuild.
Production Checklist
✅ Ingestion:
- Schema Registry for every topic
- acks=all + enable_idempotence=True
- DLQ for malformed events
- Partition by domain key (e.g. user_id)
✅ Bronze:
- Append-only, never mutate
- Store Kafka metadata (offset, partition, timestamp)
- Checkpoint location per topic
✅ Silver:
- Watermark for late arrivals
- Deduplication on event_id
- PII masking before persistence
- Row-level DQ flags
✅ Gold:
- Purpose-built per business domain
- Z-ordered for query patterns
- Nightly OPTIMIZE + VACUUM
✅ Monitoring:
- Consumer lag alerts
- DQ pass rate tracking
- End-to-end latency SLOs
- DLQ volume thresholds
Common Pitfalls
❌ Skipping Bronze - No replay capability when bugs hit production
❌ No Schema Registry - Malformed events corrupt Silver silently
❌ Auto-committing offsets - Data loss on consumer crashes
❌ Processing-time windows - Late mobile events go missing
❌ No DQ checks - Silent bad numbers reach dashboards
❌ One shuffle partition count for every workload - Profile first, tune second
Key Takeaways
- Bronze is insurance - Cheap storage, invaluable when you need replay
- Schema contracts upfront - Every hour on Schema Registry saves ten on debugging
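- Watermarks are tradeoffs - A 2hr watermark means 2hrs of state, events later than that get dropped, and append-mode window aggregates emit only once the watermark passes; know your SLA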
- DQ checks need teeth - Warnings get ignored; route bad data to DLQ
- RocksDB state store - First thing to try if Silver lag keeps climbing
- Measure before tuning - The bottleneck is never where you think it is
Building end-to-end pipelines that survive production requires designing for failure at every layer — not as an afterthought, but as the primary constraint.