Building Fault-Tolerant Kafka Pipelines
Streaming pipelines fail. Networks partition, services crash, and bad data arrives. The question isn't whether failures will happen, but how your system handles them.
The Challenge
We built a real-time event processing pipeline handling 1M+ events per day. Early versions had three critical problems:
- Duplicate processing during consumer restarts
- Lost events when downstream services failed
- Pipeline stalls from poison messages
Here's how we fixed each issue to reach 99.99% availability.
Exactly-Once Semantics
The Problem
```python
# At-least-once: can process duplicates
while True:
    messages = consumer.poll(timeout_ms=1000)
    process(messages)
    # What if a crash happens here, after process() but before commit()?
    consumer.commit()
```

If the consumer crashes after processing but before committing, messages get reprocessed on restart.
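A small stand-in makes the failure mode concrete. This sketch replaces the real database with SQLite from the standard library. It simulates a crash between processing and the offset commit by delivering the same batch twice. The table names and the `deliver` and `count` helpers are hypothetical. A plain append ends up with duplicates, while a table keyed on the event ID absorbs the redelivery.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE plain (id TEXT, data TEXT)")              # no key: appends blindly
conn.execute("CREATE TABLE keyed (id TEXT PRIMARY KEY, data TEXT)")  # event ID is unique


def deliver(table, batch, idempotent):
    """Write (event_id, payload) pairs; idempotent mode skips IDs already stored."""
    verb = "INSERT OR IGNORE" if idempotent else "INSERT"
    # table is one of the trusted names above, so formatting it in is safe here
    conn.executemany(f"{verb} INTO {table} (id, data) VALUES (?, ?)", batch)
    conn.commit()


def count(table):
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


batch = [("evt-1", "a"), ("evt-2", "b")]
for table, idempotent in (("plain", False), ("keyed", True)):
    deliver(table, batch, idempotent)  # processed, then crash before the offset commit
    deliver(table, batch, idempotent)  # same batch redelivered after restart

print(count("plain"), count("keyed"))  # → 4 2
```

`INSERT OR IGNORE` is SQLite's spelling. In PostgreSQL the equivalent is `INSERT ... ON CONFLICT (id) DO NOTHING`, which also avoids the exception-and-rollback path used in Pattern 1.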
The Solution: Idempotent Processing
Pattern 1: Idempotency Keys
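```python
from kafka import KafkaConsumer
import psycopg2

# Connection string and group ID are placeholders
conn = psycopg2.connect("dbname=pipeline")
cursor = conn.cursor()

consumer = KafkaConsumer(
    'events',
    bootstrap_servers='localhost:9092',
    group_id='event-processor',  # commit() requires a consumer group
    enable_auto_commit=False,
    isolation_level='read_committed'
)

for message in consumer:
    event_id = message.key.decode('utf-8')
    # events.id has a unique constraint, so a redelivered event fails the insert
    try:
        cursor.execute(
            "INSERT INTO events (id, data) VALUES (%s, %s)",
            (event_id, message.value)
        )
        conn.commit()
        consumer.commit()
    except psycopg2.IntegrityError:
        # Already processed: undo the failed insert and just advance the offset
        conn.rollback()
        consumer.commit()
```

Pattern 2: Transactional Writes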
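```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("events-pipeline").getOrCreate()

# Spark Structured Streaming: consumed offsets are checkpointed per micro-batch
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Keep only the key/value columns the Kafka sink writes
query = df.selectExpr("key", "value").writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "processed") \
    .option("checkpointLocation", "s3://checkpoints/") \
    .start()
```

The checkpoint location records which offsets each micro-batch consumed, so a restarted query resumes where it stopped. The Kafka sink itself, however, is at-least-once: Spark's documentation warns that records may be duplicated when a batch is retried. End-to-end exactly-once therefore needs an idempotent or transactional sink (Spark's file sink, for example) or downstream deduplication such as the idempotency keys in Pattern 1.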
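When both input and output live in Kafka, Kafka's own transactions are the usual route to exactly-once outside Spark. The producer writes the results and the consumer's offsets in a single transaction, so `read_committed` consumers see either all of a batch or none of it.

The sketch below is written against the confluent-kafka Python client (`begin_transaction`, `produce`, `send_offsets_to_transaction`, `commit_transaction`, `abort_transaction`). The function name, batch size, and topic names are hypothetical. The clients are passed in, so the logic can be exercised without a broker. Error handling is simplified: the client distinguishes retriable, abortable, and fatal `KafkaError`s, and production code should check which kind it got.

```python
# One-time setup (needs a broker, shown for context only):
#   from confluent_kafka import Consumer, Producer
#   producer = Producer({"bootstrap.servers": "localhost:9092",
#                        "transactional.id": "events-processor-1"})
#   producer.init_transactions()
#   consumer = Consumer({"bootstrap.servers": "localhost:9092",
#                        "group.id": "events-processor",
#                        "enable.auto.commit": False,
#                        "isolation.level": "read_committed"})
#   consumer.subscribe(["events"])


def transactional_batch(consumer, producer, transform, out_topic, batch_size=100):
    """Consume a batch, produce transformed records, and commit the input
    offsets in one Kafka transaction. Returns the number of records handled."""
    batch = [m for m in consumer.consume(num_messages=batch_size, timeout=1.0)
             if m.error() is None]
    if not batch:
        return 0

    producer.begin_transaction()
    try:
        for msg in batch:
            producer.produce(out_topic, key=msg.key(), value=transform(msg.value()))
        # Input offsets join the transaction, so outputs and progress commit together
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata(),
        )
        producer.commit_transaction()
    except Exception:
        # Nothing from this batch becomes visible to read_committed readers.
        # The consumer's in-memory position has still advanced, so rewind it
        # (e.g. seek to the committed offsets) or restart before retrying.
        producer.abort_transaction()
        raise
    return len(batch)
```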
Dead Letter Queues
The Problem
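A single malformed message can stall your entire pipeline. If processing it always fails and its offset is never committed, the consumer retries it forever, and every message behind it in that partition waits.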
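One way to keep a bad record from holding up everything behind it is to bound retries. Back off exponentially on errors that might clear on their own, and give up after a fixed number of attempts so the message can be parked rather than retried forever.

A minimal sketch follows. `TransientError` and `retry_with_backoff` are hypothetical names, and the injectable `sleep` parameter exists only to make the helper testable. It shows one possible shape for a helper like the `retry_with_exponential_backoff` call in the handler that follows; it is not the author's implementation.

```python
import random
import time


class TransientError(Exception):
    """Hypothetical marker for errors that may clear on retry (timeouts, 503s)."""


def retry_with_backoff(fn, *, max_attempts=5, base_delay=0.5, max_delay=30.0,
                       sleep=time.sleep):
    """Call fn() up to max_attempts times, retrying only on TransientError.

    Before retry n (counting from 0) it sleeps a random time in
    [0, min(max_delay, base_delay * 2**n)] ("full jitter"). Any other exception
    propagates immediately, and the last TransientError is re-raised once the
    attempts run out, so the caller can dead-letter the message instead of
    blocking the partition.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except TransientError:
            if attempt == max_attempts - 1:
                raise
            sleep(random.uniform(0, min(max_delay, base_delay * 2 ** attempt)))


# Usage: a write that times out twice, then succeeds
attempts = []

def flaky_write():
    attempts.append(1)
    if len(attempts) < 3:
        raise TransientError("database timeout")
    return "stored"

print(retry_with_backoff(flaky_write, sleep=lambda _: None))  # → stored
```

The random jitter keeps many consumers that failed at the same moment from retrying in lockstep against a recovering dependency.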
The Solution: Graceful Degradation
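```python
from datetime import datetime, timezone

# consumer and producer are kafka-python KafkaConsumer/KafkaProducer instances;
# parse_event, validate_schema, store_in_database, ValidationError,
# TransientError and retry_with_exponential_backoff are application-specific.

def process_with_dlq(message, attempt=0):
    try:
        # Main processing logic
        event = parse_event(message.value)
        validate_schema(event)
        store_in_database(event)
        consumer.commit()
    except ValidationError as e:
        # Permanent error: send to DLQ for manual inspection
        producer.send(
            'events-dlq',
            key=message.key,
            value=message.value,
            headers=[  # kafka-python header values must be bytes
                ('error_type', b'ValidationError'),
                ('error_message', str(e).encode('utf-8')),
                ('original_topic', message.topic.encode('utf-8')),
                ('timestamp', datetime.now(timezone.utc).isoformat().encode('utf-8')),
            ]
        )
        producer.flush()   # make sure the DLQ copy is written before moving on
        consumer.commit()  # Don't retry validation errors
    except TransientError:
        # Transient error: leave the offset uncommitted and retry with backoff
        retry_with_exponential_backoff(message, attempt + 1)
```

DLQ Monitoring: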
```python
# Alert on DLQ buildup
dlq_count = get_dlq_message_count()
if dlq_count > THRESHOLD:
    send_alert("DLQ threshold exceeded", dlq_count)
```

Handling Backpressure
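When consumers can't keep up with producers, consumer lag (the gap between a partition's latest offset and the group's committed offset) grows. Monitor it and scale out when it climbs: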
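```python
# Monitor consumer lag
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
# Group-less consumer used only to look up each partition's latest offset
offset_reader = KafkaConsumer(bootstrap_servers='localhost:9092')

def get_consumer_lag():
    """Return {group_id: total lag}, where lag = latest_offset - committed_offset."""
    lag_info = {}
    for group_id, _protocol_type in admin.list_consumer_groups():
        committed = admin.list_consumer_group_offsets(group_id)  # {TopicPartition: OffsetAndMetadata}
        latest = offset_reader.end_offsets(list(committed))      # {TopicPartition: int}
        lag_info[group_id] = sum(
            latest[tp] - meta.offset
            for tp, meta in committed.items()
            if meta.offset >= 0  # skip partitions with no committed offset
        )
    return lag_info

# Auto-scale consumers based on lag ('event-processor', LAG_THRESHOLD and
# scale_up_consumers are placeholders). Scaling helps only up to one consumer per partition.
lag = get_consumer_lag().get('event-processor', 0)
if lag > LAG_THRESHOLD:
    scale_up_consumers()
```

Schema Evolution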
Avro with Schema Registry
```python
# Legacy confluent_kafka.avro API; newer releases provide SerializingProducer + AvroSerializer
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Define schema with defaults for backward compatibility
value_schema = avro.loads('''
{
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "data", "type": "string"},
        {"name": "version", "type": "int", "default": 1}
    ]
}
''')

producer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_value_schema=value_schema)
```

Compatibility Modes:
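- Backward: New consumers can read old data (Schema Registry's default)
- Forward: Old consumers can read new data
- Full: Both backward and forward

Adding a field with a default, like version above, is fully compatible. New readers fill in the default when reading old records, and old readers simply ignore the extra field.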
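Avro applies these rules when a reader's schema differs from the writer's. Fields the reader doesn't know are skipped. Fields the reader expects but the writer didn't write take the reader's default, and if there is no default, decoding fails.

The toy function below mimics just that rule on already-decoded dicts, to show why the version default keeps both directions working. It is an illustration, not an Avro decoder, and `resolve`, `EVENT_V1`, `EVENT_V2` and `EVENT_V3` are hypothetical names.

```python
# Toy model of Avro schema resolution for record fields (not a real decoder)
EVENT_V1 = {"type": "record", "name": "Event", "fields": [
    {"name": "id", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "data", "type": "string"},
]}
# V2 adds a field with a default, like the schema above
EVENT_V2 = {**EVENT_V1, "fields": EVENT_V1["fields"] + [
    {"name": "version", "type": "int", "default": 1},
]}
# V3 adds a field WITHOUT a default, which breaks backward compatibility
EVENT_V3 = {**EVENT_V1, "fields": EVENT_V1["fields"] + [
    {"name": "priority", "type": "int"},
]}


def resolve(record, reader_schema):
    """Project a record written with some schema onto reader_schema."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:             # writer supplied it
            resolved[name] = record[name]
        elif "default" in field:       # reader-only field: use its default
            resolved[name] = field["default"]
        else:                          # nothing to fill it with
            raise ValueError(f"no value or default for field {name!r}")
    return resolved                    # writer-only fields are dropped


old_event = {"id": "evt-1", "timestamp": 1700000000000, "data": "x"}
new_event = {**old_event, "version": 2}

print(resolve(old_event, EVENT_V2))  # backward: new reader, old data, version defaults to 1
print(resolve(new_event, EVENT_V1))  # forward: old reader, new data, version is ignored
```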
Circuit Breaker Pattern
Prevent cascading failures when downstream services are unavailable:
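```python
from pybreaker import CircuitBreaker, CircuitBreakerError

# Open after 5 failures; after 60 seconds, allow a trial call (half-open)
# and close again if it succeeds. pybreaker counts every exception as a
# failure unless it is listed in `exclude`.
db_breaker = CircuitBreaker(
    fail_max=5,
    reset_timeout=60
)

@db_breaker
def write_to_database(event):
    # This call is protected by the circuit breaker
    db.insert(event)

# db, DatabaseError, write_to_s3_backup and producer are application placeholders
def handle(event):
    try:
        write_to_database(event)
    except (CircuitBreakerError, DatabaseError):
        # Database is failing or the breaker is open: route to backup
        write_to_s3_backup(event)
        producer.send('events-retry', event)
```

Monitoring & Observability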
Key Metrics:
```
# Producer metrics
- messages_sent_rate
- error_rate
- request_latency_p99

# Consumer metrics
- consumer_lag
- messages_consumed_rate
- processing_time_p95
- dlq_message_count

# Broker metrics
- under_replicated_partitions
- active_controller_count   (should be exactly 1 across the cluster)
- offline_partitions_count
```

Alerting Strategy:
```python
alerts = {
    'critical': {
        'consumer_lag > 10000': 'page',
        'offline_partitions > 0': 'page',
        'dlq_growth_rate > 100/min': 'page'
    },
    'warning': {
        'consumer_lag > 5000': 'slack',
        'processing_latency_p99 > 5s': 'slack'
    }
}
```

Production Checklist
✅ Exactly-once semantics via idempotent processing or transactions
✅ Dead letter queue for poison messages
✅ Schema registry for evolution
✅ Circuit breakers for downstream dependencies
✅ Comprehensive monitoring with actionable alerts
✅ Backpressure handling with auto-scaling
✅ Replay capability from any offset
✅ Data encryption at rest and in transit
Real-World Results
After implementing these patterns:
- Availability: 99.9% → 99.99%
- Data loss incidents: 5/month → 0
- Mean time to recovery: 45 min → 5 min
- Duplicate processing rate: 2% → 0.001%
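For scale, here is the monthly downtime budget that each availability figure allows, assuming a 30-day month (30 × 24 × 60 = 43,200 minutes):

```latex
\text{allowed downtime} = (1 - A) \times 43{,}200\ \text{min}
A = 0.999  \;\Rightarrow\; 0.001  \times 43{,}200 = 43.2\ \text{min/month}
A = 0.9999 \;\Rightarrow\; 0.0001 \times 43{,}200 = 4.32\ \text{min/month}
```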
Key Takeaways
- Design for failure from day one
- Idempotency is non-negotiable for exactly-once
- DLQs prevent pipeline stalls
- Monitor consumer lag aggressively
- Test failure scenarios regularly
Build systems that fail gracefully.