Designing Fault-Tolerant Streaming Systems: Lessons from Production
Streaming systems fail in ways that batch systems never do. The failures are subtle, time-sensitive, and often invisible until they've already caused damage downstream. After running Kafka-based pipelines processing 10M+ events/day across several production systems, here's what I've actually learned.
The Problem with Streaming
Batch jobs fail loudly. A pipeline either finishes or it doesn't. You rerun it, fix the bug, move on.
Streaming systems fail silently. They can appear healthy while:
- Dropping messages without acknowledgment
- Producing duplicate records downstream
- Processing events out of order
- Falling hours behind without alerting anyone
The goal of fault-tolerant design is not to prevent failures — it's to make failures recoverable, observable, and non-destructive.
The Failure Taxonomy
Before designing for fault tolerance, you need to understand what actually fails in streaming systems.
1. Producer Failures
What fails:
- Application crashes mid-batch
- Network timeouts before acknowledgment
- Serialization errors on malformed records
What happens:
Producer → [sends message] → Kafka Broker
← [no ack received] ←
Producer → [retries message] → Kafka Broker
→ DUPLICATE in topic
Real example:
from kafka.errors import KafkaError
# Dangerous: fire and forget
producer.send("events", value=payload)
# Safe: wait for acknowledgment
future = producer.send("events", value=payload)
try:
    record_metadata = future.get(timeout=10)
except KafkaError as e:
    # Handle, log, retry with backoff
    handle_producer_failure(e, payload)
2. Consumer Failures
What fails:
- Consumer crashes before committing offset
- Processing logic throws unhandled exception
- Consumer group rebalance during processing
What happens:
Consumer reads offset 100–110
Consumer processes records
Consumer crashes before committing offset 110
Consumer restarts → reads from offset 100 again
→ Records 100–110 processed TWICE
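The replay above can be reproduced with a toy model. This is a minimal sketch, not Kafka itself: the log is a plain list, the committed offset is an index into it, and `consume` is a hypothetical helper that "crashes" by returning without advancing the offset.

```python
def consume(log, committed, crash_before_commit=False):
    """Process everything after the committed position.

    Returns (processed_records, new_committed_position).
    """
    batch = log[committed:]
    processed = list(batch)              # "process" every record in the batch
    if crash_before_commit:
        return processed, committed      # crash: the offset never advances
    return processed, committed + len(batch)


log = [f"r{n}" for n in range(100, 111)]   # the 11 records from the diagram

first_run, committed = consume(log, 0, crash_before_commit=True)
second_run, committed = consume(log, committed)   # restart: same records again

seen = first_run + second_run
duplicates = len(seen) - len(set(seen))
```

Every record in the uncommitted batch is delivered twice, which is why at-least-once delivery has to be paired with idempotent processing.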
3. Broker Failures
What fails:
- Kafka broker goes down
- Network partition between brokers
- Disk full on broker node
What happens:
- Leader election takes 30–60 seconds
- Producers/consumers see connection errors
- In-flight messages may be lost if replication is misconfigured
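What "misconfigured" means can be made concrete with a little arithmetic. The sketch below assumes producers use acks=all and unclean leader election is disabled; `replication_headroom` is a hypothetical helper, not part of any Kafka client.

```python
def replication_headroom(replication_factor: int, min_insync_replicas: int) -> dict:
    """Rough failure budget for a topic, assuming acks=all on the producer."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min_insync_replicas must be between 1 and replication_factor")
    return {
        # Writes are accepted while at least min_insync_replicas replicas are in sync.
        "brokers_down_while_still_writable": replication_factor - min_insync_replicas,
        # An acknowledged write exists on at least min_insync_replicas replicas.
        "replica_losses_an_acked_write_survives": min_insync_replicas - 1,
    }


recommended = replication_headroom(3, 2)
risky = replication_headroom(3, 1)
```

With a replication factor of 3 and min.insync.replicas=1, an acknowledged write may exist on a single broker, so losing that one broker can lose the message.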
4. Consumer Lag Buildup
What fails:
- Nothing crashes — but consumers fall behind
- Usually caused by slow processing logic
- Or sudden spike in producer throughput
What happens:
Producer rate: 10,000 events/sec
Consumer rate: 8,000 events/sec
Lag growth: 2,000 events/sec
After 1 hour: 7.2M events behind
After 1 day: 172M events behind
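The same arithmetic is worth keeping as a small helper, because the more useful question is usually how long recovery takes once capacity is added. A minimal sketch with hypothetical function names:

```python
def lag_after(produce_rate, consume_rate, seconds, initial_lag=0):
    """Lag in events after `seconds`, given steady rates in events/sec."""
    return max(0, initial_lag + (produce_rate - consume_rate) * seconds)


def catch_up_seconds(lag, produce_rate, consume_rate):
    """Seconds needed to drain `lag`, or None if consumers never catch up."""
    surplus = consume_rate - produce_rate
    if surplus <= 0:
        return None
    return lag / surplus


one_hour = lag_after(10_000, 8_000, 3_600)
one_day = lag_after(10_000, 8_000, 86_400)
recovery = catch_up_seconds(one_hour, 10_000, 12_000)
```

An hour of falling behind at 2,000 events/sec takes another full hour to drain even after consumers are scaled to 12,000 events/sec, because producers keep writing during the recovery.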
Core Fault-Tolerance Patterns
Pattern 1: Idempotent Consumers
The single most important pattern in streaming. If your consumer is idempotent, duplicates don't matter.
Non-idempotent (dangerous):
def process_event(event):
    db.execute("""
        INSERT INTO orders (id, amount, user_id)
        VALUES (%s, %s, %s)
    """, (event.id, event.amount, event.user_id))
    # Duplicate event = duplicate row
Idempotent (safe):
def process_event(event):
    db.execute("""
        INSERT INTO orders (id, amount, user_id)
        VALUES (%s, %s, %s)
        ON CONFLICT (id) DO NOTHING
    """, (event.id, event.amount, event.user_id))
    # Duplicate event = no-op
For aggregations, use event IDs in a dedup table:
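def process_event(event):
    dedup_key = f"processed_events:{event.id}"
    if redis.exists(dedup_key):
        return  # Already processed, skip
    with db.transaction():
        # Do the actual work
        update_user_balance(event.user_id, event.amount)
        # Mark as processed inside the same transaction.
        # Assumes processed_events has a unique key on the event ID, so a replay
        # that slips past Redis fails here and rolls back the balance update.
        db.execute("INSERT INTO processed_events VALUES (%s)", (event.id,))
    # One key per event, so each ID gets its own 24hr TTL. A TTL on a single
    # shared set is reset by every new event and the set never expires.
    redis.set(dedup_key, 1, ex=86400)
Pattern 2: Offset Management Strategy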
Never let Kafka auto-commit offsets. Always commit manually after successful processing.
Auto-commit (dangerous):
consumer = KafkaConsumer(
    "events",
    group_id="my-consumer-group",  # Auto-commit only applies when a group is set
    enable_auto_commit=True,  # Commits every 5 seconds
    auto_commit_interval_ms=5000  # Regardless of processing state
)
for message in consumer:
    result = process(message)  # If this throws, the offset may already be committed
    save_to_db(result)  # Data lost if this fails
Manual commit (safe):
consumer = KafkaConsumer(
    "events",
    enable_auto_commit=False,
    group_id="my-consumer-group"
)
for message in consumer:
    try:
        result = process(message)
        save_to_db(result)
        consumer.commit()  # Only commit after successful processing
    except ProcessingError as e:
        log_error(e, message)
        send_to_dlq(message)  # Dead letter queue
        consumer.commit()  # Still commit: we handled it
    except DatabaseError as e:
        log_error(e, message)
        # Do NOT commit: let it retry on restart
        raise
Pattern 3: Dead Letter Queues
Not every message can be processed. Malformed data, schema mismatches, and logic errors will happen. Without a DLQ, these messages block your pipeline forever.
DLQ Architecture:
Main Topic → Consumer → Success → Downstream
→ Failure (retry 1)
→ Failure (retry 2)
→ Failure (retry 3)
→ DLQ Topic
Implementation:
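import time
MAX_RETRIES = 3
def encode_headers(headers):
    # Kafka header values must be bytes; kafka-python takes a list of (str, bytes) tuples
    return [(key, str(value).encode("utf-8")) for key, value in headers.items()]
def process_with_dlq(consumer, dlq_producer, message):
    # message.headers is a list of (key, bytes) tuples, not a dict
    headers = dict(message.headers or [])
    retry_count = int(headers.get("retry-count", b"0"))
    try:
        result = process(message)
        save_to_db(result)
        consumer.commit()
    except RetryableError as e:
        if retry_count < MAX_RETRIES:
            # Republish with incremented retry count
            dlq_producer.send(
                "events-retry",
                value=message.value,
                headers=encode_headers({
                    "retry-count": retry_count + 1,
                    "original-topic": message.topic,
                    "error": str(e),
                    "timestamp": time.time()
                })
            )
        else:
            # Exhausted retries: send to DLQ
            dlq_producer.send(
                "events-dlq",
                value=message.value,
                headers=encode_headers({
                    "original-topic": message.topic,
                    "final-error": str(e),
                    "failed-at": time.time()
                })
            )
        # Make sure the republished record is written before moving the offset
        dlq_producer.flush()
        consumer.commit()
    except PoisonPillError as e:
        # Unrecoverable: go straight to DLQ
        dlq_producer.send("events-dlq", value=message.value)
        dlq_producer.flush()
        consumer.commit()
Pattern 4: Exactly-Once Semantics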
True exactly-once is hard. In Kafka, it requires idempotent producers, transactions, and downstream consumers that read with isolation.level=read_committed, all working together.
Kafka Transactions:
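from kafka import KafkaProducer, TopicPartition
from kafka.structs import OffsetAndMetadata
# Transactional producer support depends on the client library and version; check yours
producer = KafkaProducer(
    transactional_id="my-transactional-producer",
    enable_idempotence=True,
    acks="all"
)
producer.init_transactions()
def process_and_forward(consumer, producer, messages):
    producer.begin_transaction()
    try:
        next_offsets = {}
        for message in messages:
            result = transform(message)
            producer.send("output-topic", value=result)
            # Track the next offset to consume for each input partition
            # (some client versions take an extra leader_epoch argument here)
            tp = TopicPartition(message.topic, message.partition)
            next_offsets[tp] = OffsetAndMetadata(message.offset + 1, "")
        # Commit consumer offsets as part of the transaction
        producer.send_offsets_to_transaction(
            next_offsets,
            consumer_group_id="my-group"
        )
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
        raise
When you truly need exactly-once: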
- Financial transactions (payments, ledger updates)
- Inventory systems (stock counts)
- Any aggregation where duplicates cause incorrect totals
When at-least-once + idempotency is good enough:
- Analytics events (a duplicate pageview is fine)
- Log aggregation
- Most notification systems
Pattern 5: Backpressure Handling
When consumers can't keep up with producers, you need backpressure — not unbounded queue growth.
Without backpressure:
Producer: 10k events/sec
Consumer: 8k events/sec
Result: Kafka lag grows → OOM → crash → restart loop
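The core idea of backpressure can be sketched in-process with a bounded buffer: when the buffer is full, the fetching side blocks instead of piling up more work in memory. This is an illustrative sketch using only the standard library; `run_pipeline` and its parameters are hypothetical, and in a real Kafka consumer the equivalent move is to stop fetching (for example by pausing partitions) while the buffer is full.

```python
import queue
import threading
import time


def run_pipeline(n_events, buffer_size, process_seconds):
    """Push n_events through a bounded buffer; return the deepest it ever got."""
    buf = queue.Queue(maxsize=buffer_size)
    done = object()   # sentinel that tells the worker to stop

    def worker():
        while True:
            item = buf.get()
            if item is done:
                return
            time.sleep(process_seconds)   # simulate slow processing

    thread = threading.Thread(target=worker)
    thread.start()

    max_depth = 0
    for i in range(n_events):
        buf.put(i)    # blocks while the buffer is full: this is the backpressure
        max_depth = max(max_depth, buf.qsize())
    buf.put(done)
    thread.join()
    return max_depth


max_depth = run_pipeline(n_events=50, buffer_size=5, process_seconds=0.001)
```

Memory use is capped by `buffer_size` no matter how far ahead the producer is; the backlog stays in Kafka, where it shows up as lag you can alert on, rather than in the consumer's heap.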
With backpressure (rate limiting):
from ratelimit import limits, sleep_and_retry
CALLS_PER_SECOND = 8000
@sleep_and_retry
@limits(calls=CALLS_PER_SECOND, period=1)
def process_event(event):
    # Processing logic here
    pass
With backpressure (dynamic scaling):
# Kubernetes HPA based on Kafka consumer lag
# keda-scaledobject.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: consumer-scaledobject
spec:
  scaleTargetRef:
    name: my-consumer-deployment
  minReplicaCount: 2
  maxReplicaCount: 20
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: kafka:9092
        consumerGroup: my-consumer-group
        topic: events
        lagThreshold: "1000"
Failure Recovery Patterns
Checkpointing
For stateful stream processing, checkpoints are your safety net.
Flink Checkpointing:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing every 30 seconds
env.enableCheckpointing(30000);
// Set checkpoint mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Minimum time between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
// Checkpoint must complete within 60 seconds
env.getCheckpointConfig().setCheckpointTimeout(60000);
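// Allow only one checkpoint in flight at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Retain externalized checkpoints when the job is cancelled
// (how many are kept is set separately, via state.checkpoints.num-retained)
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
Spark Structured Streaming Checkpointing: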
stream.writeStream \
.format("delta") \
.option("checkpointLocation", "s3://checkpoints/my-stream/") \
.outputMode("append") \
.trigger(processingTime="10 seconds") \
.start("s3://output/table/")
# On restart, Spark reads checkpoint and resumes from last committed offset
# No data loss, no duplicates (with Delta)
Reprocessing from Offset
When you need to replay history after a bug fix:
from kafka import KafkaConsumer, TopicPartition
def reprocess_from_offset(topic, partition, start_offset, end_offset):
    consumer = KafkaConsumer(
        bootstrap_servers=["kafka:9092"],
        auto_offset_reset="earliest",
        enable_auto_commit=False,
        group_id=None,  # No group: independent offset control
        consumer_timeout_ms=10000  # Stop iterating if nothing arrives for 10 seconds
    )
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])
    consumer.seek(tp, start_offset)
    try:
        for message in consumer:
            if message.offset >= end_offset:
                break
            reprocess_event(message)
    finally:
        # Close even if reprocess_event raises
        consumer.close()
Circuit Breaker Pattern
When downstream systems are degraded, fail fast instead of backing up.
import pybreaker
db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,  # Open after 5 consecutive failures
    reset_timeout=60  # Try again after 60 seconds
)
@db_breaker
def save_to_db(event):
    db.execute("INSERT INTO events VALUES (%s)", (event,))
def process_event(event):
    try:
        result = transform(event)
        save_to_db(result)
    except pybreaker.CircuitBreakerError:
        # Circuit is open: downstream is down
        # Buffer to Redis or send to DLQ instead
        buffer_to_redis(result)
        log.warning("Circuit open: buffering event %s", event.id)
Observability: What You Must Monitor
A streaming system without observability is a ticking time bomb.
The 5 Metrics That Matter
1. Consumer Lag (most important)
# Alert if lag exceeds 5 minutes of normal throughput
lag_threshold = avg_events_per_second * 300
if consumer_lag > lag_threshold:
    alert("Consumer lag critical: {}".format(consumer_lag))
2. Processing Rate vs Ingestion Rate
# Prometheus metric
streaming_events_processed_total{topic="events", consumer_group="my-group"}
streaming_events_ingested_total{topic="events"}
# Alert when processing_rate < 0.95 * ingestion_rate for 5 minutes
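The "for 5 minutes" condition is what keeps this alert from firing on every short burst. In practice this would usually live in the alerting system's rule language; the sketch below shows the same logic in Python, assuming one rate sample per minute. `RateRatioAlert` is a hypothetical helper.

```python
from collections import deque


class RateRatioAlert:
    """Fire only when processing has trailed ingestion for a full window."""

    def __init__(self, threshold=0.95, window=5):
        self.threshold = threshold
        self.samples = deque(maxlen=window)   # one ratio per minute

    def observe(self, processed_per_sec, ingested_per_sec):
        """Record one sample; return True if the alert should fire."""
        if ingested_per_sec:
            ratio = processed_per_sec / ingested_per_sec
        else:
            ratio = 1.0   # nothing ingested, so nothing to fall behind on
        self.samples.append(ratio)
        window_full = len(self.samples) == self.samples.maxlen
        return window_full and all(r < self.threshold for r in self.samples)


alert_rule = RateRatioAlert()
fired = [alert_rule.observe(8_000, 10_000) for _ in range(5)]
recovered = alert_rule.observe(10_000, 10_000)
```

A single healthy sample resets the condition, so the alert tracks sustained under-processing rather than momentary dips.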
3. DLQ Message Rate
# Any DLQ messages = something is broken
# Alert immediately on DLQ growth
if dlq_message_count > 0:
    alert("DLQ has {} unprocessed messages".format(dlq_message_count))
4. End-to-End Latency
# Embed event timestamp in message
event = {
    "id": str(uuid4()),
    "payload": data,
    "produced_at": time.time()  # Producer timestamp
}
# Measure on consumer side
def process_event(message):
    latency = time.time() - message.value["produced_at"]
    metrics.histogram("event_latency_seconds", latency)
    if latency > 60:  # More than 1 minute behind
        alert("High event latency: {}s".format(latency))
5. Error Rate by Error Type
# Classify errors for actionable alerts
ERROR_TYPES = {
    "deserialization": "schema_mismatch",
    "database": "downstream_failure",
    "timeout": "processing_slow",
    "unknown": "investigate_immediately"
}
def handle_error(e, message):
    error_type = classify_error(e)
    metrics.increment("processing_errors_total", tags={"type": error_type})
    log.error("Event processing failed", extra={
        "error_type": error_type,
        "topic": message.topic,
        "offset": message.offset,
        "error": str(e)
    })
Distributed Tracing for Streaming
Propagate trace IDs through your entire pipeline:
# Producer: inject trace context
from opentelemetry import trace
from opentelemetry.propagate import inject
tracer = trace.get_tracer(__name__)
def produce_event(data):
    carrier = {}
    with tracer.start_as_current_span("produce_event"):
        inject(carrier)  # Inject trace context into the carrier dict
        # Kafka header values must be bytes
        headers = [(key, value.encode("utf-8")) for key, value in carrier.items()]
        producer.send("events", value=data, headers=headers)
# Consumer: extract and continue trace
from opentelemetry.propagate import extract
def process_event(message):
    # Decode header bytes back to strings before extracting the context
    carrier = {key: value.decode("utf-8") for key, value in (message.headers or [])}
    context = extract(carrier)
    with tracer.start_as_current_span("process_event", context=context) as span:
        span.set_attribute("kafka.topic", message.topic)
        span.set_attribute("kafka.offset", message.offset)
        do_processing(message)
Real Production Failures and What Fixed Them
Failure 1: The Silent Data Loss
What happened:
A consumer was processing events and committing offsets. Everything looked healthy. Downstream dashboards showed data gaps. Investigation revealed the DB insert was silently failing due to a schema mismatch — but the exception was caught and swallowed by an overly broad except Exception: pass.
The fix:
# Before (dangerous)
try:
    save_to_db(event)
    consumer.commit()
except Exception:
    pass  # NEVER DO THIS
# After (safe)
try:
    save_to_db(event)
    consumer.commit()
except SchemaValidationError as e:
    send_to_dlq(event, error=e)
    consumer.commit()
    metrics.increment("schema_errors")
except DatabaseError as e:
    # Don't commit, so the message is retried
    log.error("DB error, will retry: %s", e)
    raise
Failure 2: The Rebalance Storm
What happened:
A consumer group had 20 consumers. Processing was slow (8 seconds per message). Kafka's default max.poll.interval.ms is 300 seconds, and with the default max.poll.records of 500, a full batch at 8 seconds per message takes far longer than that. Under load, some consumers hit this limit and were kicked out of the group, which triggered rebalances, which caused more consumers to miss their poll interval, which triggered more rebalances.
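The timing can be checked with a few lines of arithmetic. This is a sketch with hypothetical helper names; the only inputs are the per-message processing time from the incident and the client defaults of 500 records per poll and a 300-second poll interval.

```python
def batch_seconds(max_poll_records, seconds_per_message):
    """Worst-case time between two poll() calls when a full batch is returned."""
    return max_poll_records * seconds_per_message


def exceeds_poll_interval(max_poll_records, seconds_per_message,
                          max_poll_interval_ms=300_000):
    """True if a full batch cannot finish before the consumer is evicted."""
    return batch_seconds(max_poll_records, seconds_per_message) * 1000 > max_poll_interval_ms


default_batch = batch_seconds(500, 8)            # seconds for a full default batch
default_at_risk = exceeds_poll_interval(500, 8)
tuned_at_risk = exceeds_poll_interval(10, 8, max_poll_interval_ms=600_000)
```

A full default batch needs 4,000 seconds against a 300-second limit, so any consumer that received a large batch was guaranteed to be evicted. Ten records per poll brings the worst case down to 80 seconds.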
The fix:
consumer = KafkaConsumer(
    "events",
    # Give each batch more time: the poll interval must cover a full batch
    max_poll_interval_ms=600000,  # 10 minutes
    # Fewer records per poll, so a batch finishes well inside the interval
    # (10 records x 8 seconds = 80 seconds)
    max_poll_records=10,  # Down from 500
    # Tune session timeout
    session_timeout_ms=45000,
    heartbeat_interval_ms=15000
)
Failure 3: The Thundering Herd on Restart
What happened:
A Kubernetes deployment restarted all 15 consumer pods simultaneously after a config change. All 15 consumers tried to reprocess 2 hours of backlog at maximum speed simultaneously, overloaded the downstream database, caused DB connection pool exhaustion, which caused consumers to fail, which caused Kubernetes to restart them again.
The fix:
# Add jitter to startup processing rate
import random
import time
def startup_delay():
    # Each pod waits a random 0–30 seconds before starting
    delay = random.uniform(0, 30)
    log.info("Startup delay: %.1f seconds", delay)
    time.sleep(delay)
# Also: limit initial processing rate on startup
STARTUP_RATE_LIMIT = 100  # events/sec during warmup
NORMAL_RATE_LIMIT = 1000  # events/sec in steady state
WARMUP_PERIOD = 120  # seconds
startup_time = time.time()
def get_rate_limit():
    elapsed = time.time() - startup_time
    if elapsed < WARMUP_PERIOD:
        return STARTUP_RATE_LIMIT
    return NORMAL_RATE_LIMIT
Architecture Checklist
Before Going to Production
Producer side:
- Idempotent producer enabled (enable.idempotence=true)
- acks=all for critical topics
- Retry logic with exponential backoff
- Serialization errors caught and logged
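For the backoff item, a minimal sketch of exponential backoff with full jitter follows. The function names, defaults, and the choice of which exceptions count as retryable are all assumptions for illustration; `send` stands in for whatever call publishes one record and waits for its acknowledgment.

```python
import random
import time


def backoff_delays(retries, base=0.5, cap=30.0, rng=random.random):
    """Delays for each retry: a random fraction of an exponentially growing cap."""
    return [rng() * min(cap, base * 2 ** attempt) for attempt in range(retries)]


def send_with_retry(send, payload, retries=5,
                    retry_on=(ConnectionError, TimeoutError), sleep=time.sleep):
    """Call send(payload); on a retryable error, wait and try again.

    Makes one initial attempt plus up to `retries` retries, then re-raises
    the last error so the failure is never swallowed.
    """
    last_error = None
    for delay in [0.0] + backoff_delays(retries):
        sleep(delay)
        try:
            return send(payload)
        except retry_on as exc:
            last_error = exc
    raise last_error


# Usage with a stand-in sender that fails twice before succeeding.
attempts = []

def flaky_send(payload):
    attempts.append(payload)
    if len(attempts) < 3:
        raise ConnectionError("broker unavailable")
    return "acked"

result = send_with_retry(flaky_send, {"id": 1}, sleep=lambda seconds: None)
```

The jitter matters as much as the exponent: without it, every producer that failed at the same moment retries at the same moment.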
Consumer side:
- Manual offset commit (auto-commit disabled)
- Idempotent processing logic
- Dead letter queue configured
- max.poll.interval.ms tuned to actual processing time
- Graceful shutdown handling (commit on SIGTERM)
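Graceful shutdown is the item most often skipped, so here is one possible shape for it. This is a sketch under stated assumptions: `FakeConsumer` and its `poll_batch` method are stand-ins invented so the example runs without a broker, and `ConsumerLoop` is a hypothetical wrapper. The pattern is the point: the signal handler only flips a flag, and the loop finishes the current batch, commits, and closes.

```python
import signal


class FakeConsumer:
    """Stand-in for a real client so the sketch runs without a broker."""

    def __init__(self, batches):
        self.batches = list(batches)
        self.commits = 0
        self.closed = False

    def poll_batch(self):
        return self.batches.pop(0) if self.batches else []

    def commit(self):
        self.commits += 1

    def close(self):
        self.closed = True


class ConsumerLoop:
    def __init__(self, consumer, handle):
        self.consumer = consumer
        self.handle = handle
        self.running = True

    def request_stop(self, signum=None, frame=None):
        # Do as little as possible in a signal handler: just flip a flag.
        self.running = False

    def install_signal_handlers(self):
        # Call from the main thread before run(). Kubernetes sends SIGTERM
        # when it stops a pod.
        signal.signal(signal.SIGTERM, self.request_stop)

    def run(self):
        try:
            while self.running:
                for message in self.consumer.poll_batch():
                    self.handle(message)
                # Commit only after the whole batch has been handled.
                self.consumer.commit()
        finally:
            self.consumer.close()


handled = []
consumer = FakeConsumer([["a", "b"], ["c"]])
loop = ConsumerLoop(consumer, handle=None)

def handle(message):
    handled.append(message)
    if message == "b":
        loop.request_stop()   # simulate SIGTERM arriving mid-batch

loop.handle = handle
loop.run()
```

The in-flight batch is finished and committed, the next batch is never fetched, and the consumer is closed so the group can rebalance promptly instead of waiting for a session timeout.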
Kafka cluster:
- Replication factor ≥ 3 for critical topics
- min.insync.replicas=2
- Retention set to at least 7 days for replay capability
- Topic partition count sized for peak throughput
Observability:
- Consumer lag monitored and alerted
- DLQ size monitored and alerted
- End-to-end latency tracked
- Error rate by type tracked
- Runbook written for each alert
Key Takeaways
- Idempotency first — build it in from day one, not as an afterthought
- Never auto-commit offsets — manual commit after successful processing is non-negotiable
- DLQs are not optional — every pipeline needs a place to park unprocessable messages
- Consumer lag is your heartbeat — if you monitor one thing, monitor this
- Silent failures are the worst failures — instrument everything, never swallow exceptions
- Design for replay — keep enough retention to reprocess from any point in the last 7 days
- Test failure modes explicitly — chaos engineering for streaming is worth the investment
Streaming systems reward careful design and punish shortcuts. The patterns above aren't theoretical — every one of them came from a production incident that cost someone a weekend.