Designing Fault-Tolerant Streaming Systems: Lessons from Production

2024-04-29 · 14 min read

Streaming systems fail in ways that batch systems never do. The failures are subtle, time-sensitive, and often invisible until they've already caused damage downstream. After running Kafka-based pipelines processing 10M+ events/day across several production systems, here's what I've actually learned.

The Problem with Streaming

Batch jobs fail loudly. A pipeline either finishes or it doesn't. You rerun it, fix the bug, move on.

Streaming systems fail silently. They can appear healthy while:

  • Dropping messages without acknowledgment
  • Producing duplicate records downstream
  • Processing events out of order
  • Falling hours behind without alerting anyone

The goal of fault-tolerant design is not to prevent failures — it's to make failures recoverable, observable, and non-destructive.

The Failure Taxonomy

Before designing for fault tolerance, you need to understand what actually fails in streaming systems.

1. Producer Failures

What fails:

  • Application crashes mid-batch
  • Network timeouts before acknowledgment
  • Serialization errors on malformed records

What happens:

Producer → [sends message] → Kafka Broker
         ← [no ack received] ←
Producer → [retries message] → Kafka Broker
         → DUPLICATE in topic

Real example:

from kafka.errors import KafkaError

# Dangerous: fire and forget
producer.send("events", value=payload)
 
# Safe: wait for acknowledgment
future = producer.send("events", value=payload)
try:
    record_metadata = future.get(timeout=10)
except KafkaError as e:
    # Handle, log, retry with backoff
    handle_producer_failure(e, payload)
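
The "retry with backoff" step can be sketched as follows. This is a minimal illustration, not the body of handle_producer_failure: the names backoff_delays and send_with_retry, and the default base and cap values, are assumptions made for the example. Keep in mind that every retry can produce the duplicate shown in the diagram above, so this only makes sense alongside idempotent consumers.

```python
import random
import time


def backoff_delays(max_retries=5, base=0.5, cap=30.0, rng=random.random):
    """Yield one delay per retry: exponential growth, capped, with full jitter."""
    for attempt in range(max_retries):
        ceiling = min(cap, base * (2 ** attempt))
        yield rng() * ceiling  # random point in [0, ceiling)


def send_with_retry(send, payload, max_retries=5, sleep=time.sleep):
    """Call send(payload); on failure, wait and retry up to max_retries times."""
    delays = backoff_delays(max_retries)
    while True:
        try:
            return send(payload)
        except Exception:  # narrow this to your client's retriable errors
            delay = next(delays, None)
            if delay is None:
                raise  # retries exhausted: surface the error to the caller
            sleep(delay)
```

The jitter matters as much as the exponent: without it, every producer that lost its connection at the same moment retries at the same moment too.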

2. Consumer Failures

What fails:

  • Consumer crashes before committing offset
  • Processing logic throws unhandled exception
  • Consumer group rebalance during processing

What happens:

Consumer reads offset 100–110
Consumer processes records
Consumer crashes before committing offset 110
Consumer restarts → reads from offset 100 again
→ Records 100–110 processed TWICE
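
The replay above can be reproduced without a broker. The sketch below is a toy model, not Kafka client code: the log is a plain list whose index stands in for the offset, and consume is a hypothetical helper.

```python
def consume(log, committed, crash_before_commit=False, batch_size=10):
    """Read one batch starting at the committed offset.

    Returns (processed_records, new_committed_offset).
    """
    batch = log[committed:committed + batch_size]
    processed = list(batch)            # side effects would happen here
    if crash_before_commit:
        return processed, committed    # crash: the offset never advances
    return processed, committed + len(batch)


log = [f"event-{i}" for i in range(20)]
seen = []

# First attempt processes a batch, then crashes before committing
batch, committed = consume(log, 0, crash_before_commit=True)
seen += batch

# After the restart, the consumer resumes from the last committed offset
batch, committed = consume(log, committed)
seen += batch
```

After both runs, seen holds the first ten events twice while committed has only moved to 10. That is at-least-once delivery in miniature.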

3. Broker Failures

What fails:

  • Kafka broker goes down
  • Network partition between brokers
  • Disk full on broker node

What happens:

  • Leader election can take 30–60 seconds in bad cases (slow failure detection, many partitions to move)
  • Producers/consumers see connection errors
  • In-flight messages may be lost if replication is misconfigured

4. Consumer Lag Buildup

What fails:

  • Nothing crashes — but consumers fall behind
  • Usually caused by slow processing logic
  • Or sudden spike in producer throughput

What happens:

Producer rate:  10,000 events/sec
Consumer rate:   8,000 events/sec
Lag growth:      2,000 events/sec

After 1 hour:   7.2M events behind
After 1 day:   172M events behind
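
The arithmetic is worth keeping around as a helper, because the same numbers tell you how long recovery takes once the consumer is fixed. A small sketch (lag_after and catch_up_seconds are names made up for this example):

```python
def lag_after(seconds, produce_rate, consume_rate, initial_lag=0):
    """Lag in events after `seconds`, given steady rates in events/sec."""
    return max(0, initial_lag + (produce_rate - consume_rate) * seconds)


def catch_up_seconds(lag, produce_rate, consume_rate):
    """Seconds needed to drain `lag`; None if the consumer never catches up."""
    surplus = consume_rate - produce_rate
    if surplus <= 0:
        return None
    return lag / surplus


one_hour = lag_after(3600, 10_000, 8_000)    # 7,200,000 events
one_day = lag_after(86_400, 10_000, 8_000)   # 172,800,000 events
recovery = catch_up_seconds(one_hour, 10_000, 12_000)  # 3600.0 seconds
```

Note the asymmetry: an hour of falling behind at 2,000 events/sec takes another full hour to recover even if the consumer is sped up to run 2,000 events/sec ahead of the producer.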

Core Fault-Tolerance Patterns

Pattern 1: Idempotent Consumers

The single most important pattern in streaming. If your consumer is idempotent, duplicates don't matter.

Non-idempotent (dangerous):

def process_event(event):
    db.execute("""
        INSERT INTO orders (id, amount, user_id)
        VALUES (%s, %s, %s)
    """, (event.id, event.amount, event.user_id))
    # Duplicate event = duplicate row

Idempotent (safe):

def process_event(event):
    db.execute("""
        INSERT INTO orders (id, amount, user_id)
        VALUES (%s, %s, %s)
        ON CONFLICT (id) DO NOTHING
    """, (event.id, event.amount, event.user_id))
    # Duplicate event = no-op

For aggregations, use event IDs in a dedup table:

def process_event(event):
    dedup_key = f"processed:{event.id}"
    if redis.exists(dedup_key):
        return  # Already processed, skip (fast path)
 
    with db.transaction():
        # Do the actual work
        update_user_balance(event.user_id, event.amount)
        # Mark as processed inside the same transaction. Assuming a unique
        # key on the event ID, a concurrent duplicate fails and rolls back.
        db.execute("INSERT INTO processed_events VALUES (%s)", (event.id,))
 
    # One key per event, so each ID gets its own 24hr TTL.
    # (A TTL on a single shared set would be reset by every new event.)
    redis.set(dedup_key, 1, ex=86400)

Pattern 2: Offset Management Strategy

Never let Kafka auto-commit offsets. Always commit manually after successful processing.

Auto-commit (dangerous):

consumer = KafkaConsumer(
    "events",
    group_id="my-consumer-group",  # Offsets are only committed when a group is set
    enable_auto_commit=True,       # Commits every 5 seconds
    auto_commit_interval_ms=5000   # Regardless of processing state
)
 
for message in consumer:
    result = process(message)  # If this throws, the offset may already be committed
    save_to_db(result)         # Data lost if this fails after the commit

Manual commit (safe):

consumer = KafkaConsumer(
    "events",
    enable_auto_commit=False,
    group_id="my-consumer-group"
)
 
for message in consumer:
    try:
        result = process(message)
        save_to_db(result)
        consumer.commit()  # Only commit after successful processing
    except ProcessingError as e:
        log_error(e, message)
        send_to_dlq(message)  # Dead letter queue
        consumer.commit()     # Still commit — we handled it
    except DatabaseError as e:
        log_error(e, message)
        # Do NOT commit — let it retry on restart
        raise

Pattern 3: Dead Letter Queues

Not every message can be processed. Malformed data, schema mismatches, and logic errors will happen. Without a DLQ, these messages block your pipeline forever.

DLQ Architecture:

Main Topic → Consumer → Success → Downstream
                      → Failure (retry 1)
                               → Failure (retry 2)
                                          → Failure (retry 3)
                                                     → DLQ Topic

Implementation:

import time
 
MAX_RETRIES = 3
 
def process_with_dlq(consumer, dlq_producer, message):
    # Kafka headers arrive as a list of (str, bytes) tuples, not a dict
    headers = dict(message.headers or [])
    retry_count = int(headers.get("retry-count", b"0"))
 
    try:
        result = process(message)
        save_to_db(result)
        consumer.commit()
 
    except RetryableError as e:
        if retry_count < MAX_RETRIES:
            # Republish with incremented retry count
            dlq_producer.send(
                "events-retry",
                value=message.value,
                headers=[
                    ("retry-count", str(retry_count + 1).encode()),
                    ("original-topic", message.topic.encode()),
                    ("error", str(e).encode()),
                    ("timestamp", str(time.time()).encode()),
                ]
            )
        else:
            # Exhausted retries: send to DLQ
            dlq_producer.send(
                "events-dlq",
                value=message.value,
                headers=[
                    ("original-topic", message.topic.encode()),
                    ("final-error", str(e).encode()),
                    ("failed-at", str(time.time()).encode()),
                ]
            )
        dlq_producer.flush()  # Make sure the republish is sent before committing
        consumer.commit()
 
    except PoisonPillError as e:
        # Unrecoverable: go straight to DLQ
        dlq_producer.send("events-dlq", value=message.value)
        dlq_producer.flush()
        consumer.commit()

Pattern 4: Exactly-Once Semantics

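True exactly-once is hard. In Kafka, it requires idempotent producers, transactions, and consumers reading with isolation.level=read_committed, all working together. Even then the guarantee covers Kafka-to-Kafka flows; writes to external systems still need idempotency.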

Kafka Transactions:

producer = KafkaProducer(
    transactional_id="my-transactional-producer",
    enable_idempotence=True,
    acks="all"
)
 
producer.init_transactions()
 
# Assumes a client library with transaction support; check the exact
# signatures for the library and version you run.
def process_and_forward(consumer, producer, messages):
    producer.begin_transaction()
    try:
        offsets = {}
        for message in messages:
            result = transform(message)
            producer.send("output-topic", value=result)
            # Next offset to read, tracked per partition in this batch
            tp = TopicPartition(message.topic, message.partition)
            offsets[tp] = OffsetAndMetadata(message.offset + 1, "")
 
        # Commit consumer offsets as part of the transaction
        producer.send_offsets_to_transaction(
            offsets,
            consumer_group_id="my-group"
        )
        producer.commit_transaction()
 
    except Exception:
        producer.abort_transaction()
        raise

When you truly need exactly-once:

  • Financial transactions (payments, ledger updates)
  • Inventory systems (stock counts)
  • Any aggregation where duplicates cause incorrect totals

When at-least-once + idempotency is good enough:

  • Analytics events (a duplicate pageview is fine)
  • Log aggregation
  • Most notification systems

Pattern 5: Backpressure Handling

When consumers can't keep up with producers, you need backpressure — not unbounded queue growth.

Without backpressure:

Producer: 10k events/sec
Consumer:  8k events/sec
Result:   Kafka lag grows → in-memory buffers fill → OOM → crash → restart loop

With backpressure (rate limiting):

from ratelimit import limits, sleep_and_retry
 
CALLS_PER_SECOND = 8000
 
@sleep_and_retry
@limits(calls=CALLS_PER_SECOND, period=1)
def process_event(event):
    # Processing logic here
    pass

With backpressure (dynamic scaling):

# Kubernetes HPA based on Kafka consumer lag
# keda-scaledobject.yaml
 
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: consumer-scaledobject
spec:
  scaleTargetRef:
    name: my-consumer-deployment
  minReplicaCount: 2
  maxReplicaCount: 20
  triggers:
  - type: kafka
    metadata:
      bootstrapServers: kafka:9092
      consumerGroup: my-consumer-group
      topic: events
      lagThreshold: "1000"
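
Rate limiting and autoscaling act on the consumer as a whole. Inside a single consumer that hands records to worker threads, the same idea shows up as a bounded buffer: when the buffer is full, the reader blocks instead of piling records up in memory. A minimal sketch using only the standard library (run_pipeline and its parameters are hypothetical, and the sleep stands in for real processing):

```python
import queue
import threading
import time


def run_pipeline(n_events, buffer_size, process_seconds):
    """Push n_events through a bounded buffer; return (processed, max_depth)."""
    buffer = queue.Queue(maxsize=buffer_size)
    processed = []
    max_depth = 0

    def worker():
        while True:
            item = buffer.get()
            if item is None:           # sentinel: no more work
                break
            time.sleep(process_seconds)  # simulated slow processing
            processed.append(item)

    thread = threading.Thread(target=worker)
    thread.start()
    for i in range(n_events):
        buffer.put(i)                  # blocks while the buffer is full
        max_depth = max(max_depth, buffer.qsize())
    buffer.put(None)
    thread.join()
    return processed, max_depth


processed, max_depth = run_pipeline(50, buffer_size=5, process_seconds=0.001)
```

However fast the reader is, max_depth never exceeds buffer_size, so memory stays flat and the slowdown shows up as consumer lag, where it can be monitored.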

Failure Recovery Patterns

Checkpointing

For stateful stream processing, checkpoints are your safety net.

Flink Checkpointing:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
// Enable checkpointing every 30 seconds
env.enableCheckpointing(30000);
 
// Set checkpoint mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 
// Minimum time between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
 
// Checkpoint must complete within 60 seconds
env.getCheckpointConfig().setCheckpointTimeout(60000);
 
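// Allow only one checkpoint in flight at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Retain checkpoints when the job is cancelled
// (how many are kept is set by state.checkpoints.num-retained)
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);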

Spark Structured Streaming Checkpointing:

stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://checkpoints/my-stream/") \
    .outputMode("append") \
    .trigger(processingTime="10 seconds") \
    .start("s3://output/table/")
 
# On restart, Spark reads checkpoint and resumes from last committed offset
# No data loss, no duplicates (with Delta)

Reprocessing from Offset

When you need to replay history after a bug fix:

from kafka import KafkaConsumer, TopicPartition
 
def reprocess_from_offset(topic, partition, start_offset, end_offset):
    consumer = KafkaConsumer(
        bootstrap_servers=["kafka:9092"],
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        group_id=None,  # No group: independent offset control
        consumer_timeout_ms=10000  # Stop iterating if the log ends before end_offset
    )
 
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])
    consumer.seek(tp, start_offset)
 
    for message in consumer:
        if message.offset >= end_offset:
            break
        reprocess_event(message)
 
    consumer.close()

Circuit Breaker Pattern

When downstream systems are degraded, fail fast instead of backing up.

import pybreaker
 
db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,           # Open after 5 consecutive failures
    reset_timeout=60      # Try again after 60 seconds
)
 
@db_breaker
def save_to_db(event):
    db.execute("INSERT INTO events VALUES (%s)", (event,))
 
def process_event(event):
    try:
        result = transform(event)
        save_to_db(result)
    except pybreaker.CircuitBreakerError:
        # Circuit is open — downstream is down
        # Buffer to Redis or send to DLQ instead
        buffer_to_redis(result)
        log.warning("Circuit open: buffering event %s", event.id)
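
If the breaker feels like magic, it helps to see the state machine underneath. The sketch below is a stripped-down illustration and not pybreaker's implementation: the CircuitBreaker and CircuitOpenError names here are my own, and the injectable clock exists only to make the behaviour easy to test.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, fail_max=5, reset_timeout=60.0, clock=time.monotonic):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open, failing fast")
            # Timeout elapsed: half-open, let one trial call through
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            # A failed trial call, or too many consecutive failures, opens it
            if self.opened_at is not None or self.failures >= self.fail_max:
                self.opened_at = self.clock()
            raise
        # Success closes the circuit and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

Three states fall out of two fields: closed (opened_at is None), open (timeout not yet elapsed), and half-open (timeout elapsed, one call allowed through to probe the downstream system).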

Observability: What You Must Monitor

A streaming system without observability is a ticking time bomb.

The 5 Metrics That Matter

1. Consumer Lag (most important)

# Alert if lag exceeds 5 minutes of normal throughput
lag_threshold = avg_events_per_second * 300
 
if consumer_lag > lag_threshold:
    alert("Consumer lag critical: {}".format(consumer_lag))
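
Where consumer_lag comes from: per partition, it is the log end offset minus the group's committed offset. The sketch below works on plain dicts so it runs anywhere; in a real consumer the two inputs would come from the client (kafka-python, for instance, exposes end_offsets() and committed() on KafkaConsumer). The function names and the choice to treat an uncommitted partition as unread from offset 0 are assumptions for the example.

```python
def partition_lag(end_offsets, committed_offsets):
    """Lag per partition: log end offset minus committed offset."""
    lag = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition)
        # Assumption: a partition with no commit yet is unread from offset 0
        lag[partition] = max(0, end - (committed or 0))
    return lag


def lag_exceeds(end_offsets, committed_offsets,
                avg_events_per_second, window_seconds=300):
    """True when total lag is more than `window_seconds` of normal throughput."""
    total = sum(partition_lag(end_offsets, committed_offsets).values())
    return total > avg_events_per_second * window_seconds
```

Alerting on lag expressed in seconds of throughput, rather than a raw event count, keeps the threshold meaningful as traffic grows.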

2. Processing Rate vs Ingestion Rate

# Prometheus metric
streaming_events_processed_total{topic="events", consumer_group="my-group"}
streaming_events_ingested_total{topic="events"}

# Alert when processing_rate < 0.95 * ingestion_rate for 5 minutes
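
The "for 5 minutes" part is what keeps this alert from firing on every brief spike. In Prometheus that is the job of the alert rule itself; the logic is sketched here in plain Python, with falling_behind as a hypothetical helper that takes one (processed, ingested) rate sample per minute.

```python
def falling_behind(samples, ratio=0.95, sustained=5):
    """True if processing stayed below ratio * ingestion for the last
    `sustained` samples. `samples` is a list of
    (processed_per_sec, ingested_per_sec) tuples, oldest first."""
    if len(samples) < sustained:
        return False  # not enough history to call it sustained
    recent = samples[-sustained:]
    return all(processed < ratio * ingested for processed, ingested in recent)
```

A single healthy sample inside the window resets the condition, which is the behaviour you want from a sustained-duration alert.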

3. DLQ Message Rate

# Any DLQ messages = something is broken
# Alert immediately on DLQ growth
if dlq_message_count > 0:
    alert("DLQ has {} unprocessed messages".format(dlq_message_count))

4. End-to-End Latency

# Embed event timestamp in message
event = {
    "id": str(uuid4()),
    "payload": data,
    "produced_at": time.time()  # Producer timestamp
}
 
# Measure on consumer side
def process_event(message):
    latency = time.time() - message.value["produced_at"]
    metrics.histogram("event_latency_seconds", latency)
 
    if latency > 60:  # More than 1 minute behind
        alert("High event latency: {}s".format(latency))

5. Error Rate by Error Type

# Classify errors for actionable alerts
ERROR_TYPES = {
    "deserialization": "schema_mismatch",
    "database": "downstream_failure",
    "timeout": "processing_slow",
    "unknown": "investigate_immediately"
}
 
def handle_error(e, message):
    error_type = classify_error(e)
    metrics.increment("processing_errors_total", tags={"type": error_type})
    log.error("Event processing failed", extra={
        "error_type": error_type,
        "topic": message.topic,
        "offset": message.offset,
        "error": str(e)
    })

Distributed Tracing for Streaming

Propagate trace IDs through your entire pipeline:

# Producer: inject trace context
from opentelemetry import trace
from opentelemetry.propagate import inject
 
tracer = trace.get_tracer(__name__)
 
def produce_event(data):
    headers = {}
    with tracer.start_as_current_span("produce_event") as span:
        inject(headers)  # Inject trace context into headers
        # Kafka header values must be bytes
        kafka_headers = [(k, v.encode("utf-8")) for k, v in headers.items()]
        producer.send("events", value=data, headers=kafka_headers)
 
# Consumer: extract and continue trace
from opentelemetry.propagate import extract
 
def process_event(message):
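    # Header values arrive as bytes; decode them before extracting
    context = extract({k: v.decode("utf-8") for k, v in (message.headers or [])})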
    with tracer.start_as_current_span("process_event", context=context) as span:
        span.set_attribute("kafka.topic", message.topic)
        span.set_attribute("kafka.offset", message.offset)
        do_processing(message)

Real Production Failures and What Fixed Them

Failure 1: The Silent Data Loss

What happened:
A consumer was processing events and committing offsets. Everything looked healthy. Downstream dashboards showed data gaps. Investigation revealed the DB insert was silently failing due to a schema mismatch — but the exception was caught and swallowed by an overly broad except Exception: pass.

The fix:

# Before (dangerous)
try:
    save_to_db(event)
    consumer.commit()
except Exception:
    pass  # NEVER DO THIS
 
# After (safe)
try:
    save_to_db(event)
    consumer.commit()
except SchemaValidationError as e:
    send_to_dlq(event, error=e)
    consumer.commit()
    metrics.increment("schema_errors")
except DatabaseError as e:
    # Don't commit — let the message retry
    log.error("DB error, will retry: %s", e)
    raise

Failure 2: The Rebalance Storm

What happened:
A consumer group had 20 consumers. Processing was slow (8 seconds per message). Kafka's default max.poll.interval.ms is 300 seconds, and with the default max.poll.records of 500 a full batch could take 500 × 8 = 4,000 seconds to work through. Under load, some consumers hit the limit and were kicked out of the group. That triggered rebalances, which caused more consumers to miss their poll interval, which triggered more rebalances.

The fix:

consumer = KafkaConsumer(
    "events",
    # Increase poll interval to cover your slowest batch
    max_poll_interval_ms=600000,    # 10 minutes
    # Fewer records per poll, so a batch takes ~80 seconds at 8 s/message
    max_poll_records=10,            # Down from 500
    # Tune session timeout
    session_timeout_ms=45000,
    heartbeat_interval_ms=15000
)
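
The numbers in that config are not arbitrary. The budget is simple: records per poll times seconds per record has to fit inside the poll interval, with room to spare. A small sketch of the calculation (both function names and the 50% headroom default are assumptions for this example):

```python
def poll_batch_seconds(max_poll_records, seconds_per_record):
    """Worst-case time to work through one full poll batch."""
    return max_poll_records * seconds_per_record


def max_safe_poll_records(max_poll_interval_ms, seconds_per_record, headroom=0.5):
    """Largest batch that fits in `headroom` of the poll interval."""
    budget = (max_poll_interval_ms / 1000) * headroom
    return max(1, int(budget // seconds_per_record))


before = poll_batch_seconds(500, 8)   # 4000 seconds, far over a 300 s limit
after = poll_batch_seconds(10, 8)     # 80 seconds, well inside 600 s
```

With a 10 minute interval and 8 seconds per message, max_safe_poll_records(600000, 8) comes out at 37, so 10 leaves a comfortable margin for slow messages.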

Failure 3: The Thundering Herd on Restart

What happened:
A Kubernetes deployment restarted all 15 consumer pods simultaneously after a config change. All 15 consumers tried to reprocess 2 hours of backlog at maximum speed at once. That overloaded the downstream database and exhausted its connection pool, which caused consumers to fail, which caused Kubernetes to restart them again.

The fix:

# Add jitter to startup processing rate
import random
import time
 
def startup_delay():
    # Each pod waits a random 0–30 seconds before starting
    delay = random.uniform(0, 30)
    log.info("Startup delay: %.1f seconds", delay)
    time.sleep(delay)
 
# Also: limit initial processing rate on startup
STARTUP_RATE_LIMIT = 100  # events/sec during warm-up
NORMAL_RATE_LIMIT = 1000  # events/sec in steady state
WARMUP_PERIOD = 120       # seconds
 
startup_time = time.time()
 
def get_rate_limit():
    elapsed = time.time() - startup_time
    if elapsed < WARMUP_PERIOD:
        return STARTUP_RATE_LIMIT
    return NORMAL_RATE_LIMIT

Architecture Checklist

Before Going to Production

Producer side:

  • Idempotent producer enabled (enable.idempotence=true)
  • acks=all for critical topics
  • Retry logic with exponential backoff
  • Serialization errors caught and logged

Consumer side:

  • Manual offset commit (auto-commit disabled)
  • Idempotent processing logic
  • Dead letter queue configured
  • max.poll.interval.ms tuned to actual processing time
  • Graceful shutdown handling (commit on SIGTERM)
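
The graceful shutdown item is the one most often skipped, so here is a rough sketch of it. ShutdownFlag, install_sigterm_handler, and run are hypothetical names; the consumer is assumed to expose poll(timeout_ms=...), commit(), and close() the way kafka-python's KafkaConsumer does.

```python
import signal


class ShutdownFlag:
    """Set by the signal handler, checked by the poll loop."""

    def __init__(self):
        self.requested = False

    def request(self, signum=None, frame=None):
        self.requested = True


def install_sigterm_handler(flag):
    # Kubernetes sends SIGTERM before it kills a pod
    signal.signal(signal.SIGTERM, flag.request)


def run(consumer, process, flag):
    """Poll until shutdown is requested, then close the consumer cleanly."""
    try:
        while not flag.requested:
            batches = consumer.poll(timeout_ms=1000)
            for records in batches.values():
                for message in records:
                    process(message)
            if batches:
                consumer.commit()  # offsets advance only after the batch is done
    finally:
        consumer.close()
```

The handler only flips a flag. The loop finishes the batch it is on, commits, and exits, so a rolling deploy does not turn into a round of reprocessing.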

Kafka cluster:

  • Replication factor ≥ 3 for critical topics
  • min.insync.replicas=2
  • Retention set to at least 7 days for replay capability
  • Topic partition count sized for peak throughput
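
Why replication factor 3 with min.insync.replicas=2: with acks=all, a write is only acknowledged while at least min.insync.replicas replicas are in sync, so the two settings together decide how many brokers you can lose before writes stop. A small sketch of that arithmetic (durability_summary is a name invented for this example):

```python
def durability_summary(replication_factor, min_insync_replicas):
    """What a topic's replication settings buy you when producers use acks=all."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min_insync_replicas must be between 1 and replication_factor")
    return {
        # Replicas that can be offline while writes are still accepted
        "brokers_down_before_writes_fail": replication_factor - min_insync_replicas,
        # Minimum number of replicas holding every acknowledged write
        "copies_guaranteed_per_acked_write": min_insync_replicas,
    }


summary = durability_summary(3, 2)
```

For 3 and 2 this gives one broker of slack for availability and two guaranteed copies of every acknowledged write. Setting min.insync.replicas equal to the replication factor leaves no slack at all: a single broker restart blocks producers.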

Observability:

  • Consumer lag monitored and alerted
  • DLQ size monitored and alerted
  • End-to-end latency tracked
  • Error rate by type tracked
  • Runbook written for each alert

Key Takeaways

  1. Idempotency first — build it in from day one, not as an afterthought
  2. Never auto-commit offsets — manual commit after successful processing is non-negotiable
  3. DLQs are not optional — every pipeline needs a place to park unprocessable messages
  4. Consumer lag is your heartbeat — if you monitor one thing, monitor this
  5. Silent failures are the worst failures — instrument everything, never swallow exceptions
  6. Design for replay — keep enough retention to reprocess from any point in the last 7 days
  7. Test failure modes explicitly — chaos engineering for streaming is worth the investment

Streaming systems reward careful design and punish shortcuts. The patterns above aren't theoretical — every one of them came from a production incident that cost someone a weekend.

