Building Fault-Tolerant Kafka Pipelines
After running Kafka pipelines processing 10M+ events/day for 2 years, here's what makes them resilient in production.
The Failure Modes
Kafka pipelines fail in predictable ways:
- Consumer lag spikes - Can't keep up with producer rate
- Poison messages - Malformed data crashes consumers
- Network partitions - Brokers become unreachable
- Processing errors - Downstream systems fail
- Data skew - One partition overwhelms a consumer
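Data skew is the hardest of these to spot from aggregate numbers, because total lag can look healthy while a single partition falls behind. The simulation below is only a sketch: it uses `zlib.crc32` as a stand-in for Kafka's real key hashing (the default partitioner uses murmur2), and the tenant keys and the 12-partition count are assumptions chosen for illustration.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 12  # assumption: a 12-partition topic


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition. crc32 is a stand-in for Kafka's murmur2 hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_counts(keys, num_partitions: int = NUM_PARTITIONS) -> Counter:
    """Count how many messages land on each partition."""
    return Counter(partition_for(k, num_partitions) for k in keys)


# 9,000 events from one hot tenant plus 1,000 events from 1,000 other tenants
keys = ["tenant-hot"] * 9000 + [f"tenant-{i}" for i in range(1000)]
counts = partition_counts(keys)
hottest_partition, hottest_count = counts.most_common(1)[0]
print(f"partition {hottest_partition} holds {hottest_count} of {len(keys)} messages")
```

Because every message with the same key goes to the same partition, adding consumers does not help here; the usual remedies are a finer-grained key or a dedicated topic for the hot key.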
Architecture for Resilience
1. Producer Reliability
Configuration:
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    # Durability
    acks='all',  # Wait for all in-sync replicas
    retries=10,  # Retry on transient failures
    max_in_flight_requests_per_connection=1,  # Maintain order across retries
    # Idempotence (needs a client version that supports it)
    enable_idempotence=True,  # Prevent duplicates from producer retries
    # Serialization
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: str(k).encode('utf-8'),
    # Batching for throughput
    batch_size=16384,  # Bytes per partition batch
    linger_ms=10,  # Wait up to 10 ms to fill a batch
    compression_type='snappy'
)

Send with error handling:
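import time

def send_with_retry(topic, key, value, max_retries=3):
    for attempt in range(max_retries):
        try:
            future = producer.send(
                topic,
                key=key,
                value=value
            )
            # Block until the broker acknowledges the write
            metadata = future.get(timeout=10)
            return metadata
        except Exception as e:
            if attempt == max_retries - 1:
                # Out of retries: park the record in the dead letter queue.
                # This producer-side call passes topic/key/value directly, so
                # the DLQ helper needs a signature that accepts them.
                send_to_dlq(topic, key, value, error=str(e))
                raise
            time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, ...

2. Consumer Resilience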
Configuration:
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    group_id='events-processor',
    # Offset management
    enable_auto_commit=False,  # Manual commit for reliability
    auto_offset_reset='earliest',  # With no committed offset, start from the oldest message
    # Session management
    session_timeout_ms=30000,  # 30s without heartbeats before rebalance
    heartbeat_interval_ms=10000,  # 10s heartbeat
    max_poll_interval_ms=300000,  # 5 min max between poll() calls
    # Fetch settings
    max_poll_records=500,  # Max records returned per poll()
    fetch_min_bytes=1024,  # Wait for at least 1 KB of data...
    fetch_max_wait_ms=500,  # ...or 500 ms, whichever comes first
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

Processing loop with error handling:
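import logging
import time
from kafka.structs import OffsetAndMetadata

logger = logging.getLogger(__name__)

class ProcessingError(Exception):
    """Raised by process_message() for input that can never succeed."""

def process_messages():
    while True:
        try:
            messages = consumer.poll(timeout_ms=1000, max_records=500)
            for topic_partition, records in messages.items():
                for message in records:
                    try:
                        process_message(message.value)
                        # Commit after successful processing. Depending on the
                        # kafka-python version, OffsetAndMetadata may take a
                        # third leader_epoch field.
                        consumer.commit({
                            topic_partition: OffsetAndMetadata(
                                message.offset + 1,
                                None
                            )
                        })
                    except ProcessingError as e:
                        # Poison message - send to DLQ
                        send_to_dlq(message, error=str(e))
                        # Commit to skip this message
                        consumer.commit({
                            topic_partition: OffsetAndMetadata(
                                message.offset + 1,
                                None
                            )
                        })
                    except Exception as e:
                        # Unknown error - don't commit. poll() has already
                        # advanced the position, so rewind to this offset to
                        # get the message redelivered, then skip the rest of
                        # this partition's batch.
                        logger.error(f"Processing failed: {e}")
                        consumer.seek(topic_partition, message.offset)
                        time.sleep(5)  # Backoff before the retry
                        break
        except Exception as e:
            logger.error(f"Consumer error: {e}")
            time.sleep(5)  # Backoff before retry

3. Dead Letter Queue (DLQ)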
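Not every failure deserves a trip to the DLQ. One way to separate transient failures (a brief downstream timeout) from true poison messages is to retry a bounded number of times and only then give up. The sketch below is illustrative: `TransientError`, `process_with_retries`, and the `on_give_up` callback are hypothetical names, and in this pipeline `on_give_up` would be something like `send_to_dlq`.

```python
import time


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, throttling)."""


def process_with_retries(handler, message, on_give_up, max_attempts=3,
                         base_delay=0.5, sleep=time.sleep):
    """Run handler(message); retry transient failures, hand off permanent ones.

    Returns True if the message was processed, False if it was handed to
    on_give_up. Either way the caller can commit the offset and move on.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            handler(message)
            return True
        except TransientError as exc:
            if attempt == max_attempts:
                on_give_up(message, exc)
                return False
            sleep(base_delay * 2 ** (attempt - 1))  # Exponential backoff
        except Exception as exc:
            # Anything else is treated as a poison message: retrying won't help
            on_give_up(message, exc)
            return False
```

The `sleep` parameter exists only so the backoff can be replaced in tests; production code would leave it at the default.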
Handle poison messages:
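from datetime import datetime, timezone

dlq_producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def send_to_dlq(message, error):
    # Keys arrive as raw bytes (the consumer has no key_deserializer), and
    # bytes are not JSON-serializable, so decode before building the payload
    key = message.key.decode('utf-8', errors='replace') if message.key is not None else None
    dlq_message = {
        'original_topic': message.topic,
        'original_partition': message.partition,
        'original_offset': message.offset,
        'original_key': key,
        'original_value': message.value,
        'error': str(error),
        'timestamp': datetime.now(timezone.utc).isoformat()
    }
    dlq_producer.send('dlq.events', value=dlq_message)
    # Alert on DLQ threshold (get_dlq_count and send_alert are helpers not shown here)
    dlq_count = get_dlq_count()
    if dlq_count > 100:
        send_alert(f"High DLQ volume: {dlq_count} messages")

4. Backpressure Handling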
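Backpressure can also be applied inside the consumer itself: when the local backlog is full, stop fetching instead of buffering more. The sketch below assumes a consumer object exposing `assignment()`, `pause()`, `resume()`, and `paused()` (kafka-python's `KafkaConsumer` has methods with these names); the `BackpressureGate` class and its watermarks are hypothetical.

```python
class BackpressureGate:
    """Pause fetching above high_watermark, resume below low_watermark.

    Two thresholds (hysteresis) avoid flapping between paused and resumed
    when the backlog hovers around a single limit.
    """

    def __init__(self, consumer, high_watermark=1000, low_watermark=200):
        if low_watermark >= high_watermark:
            raise ValueError("low_watermark must be below high_watermark")
        self.consumer = consumer
        self.high = high_watermark
        self.low = low_watermark

    def update(self, backlog_size):
        """Call once per poll loop with the number of in-flight messages."""
        paused = self.consumer.paused()
        if backlog_size >= self.high and not paused:
            # Paused partitions return no records, but the loop should keep
            # calling poll() so max_poll_interval_ms is not exceeded
            self.consumer.pause(*self.consumer.assignment())
        elif backlog_size <= self.low and paused:
            self.consumer.resume(*paused)
```

Pausing keeps the consumer in the group, so it avoids the rebalance that would follow from simply not polling.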
Monitor consumer lag:
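from kafka import KafkaAdminClient, KafkaConsumer, TopicPartition

def check_consumer_lag():
    admin = KafkaAdminClient(bootstrap_servers=['kafka1:9092'])
    # A consumer without a group_id, used only to look up log-end offsets.
    # (OffsetSpec belongs to other Kafka clients; with kafka-python,
    # KafkaConsumer.end_offsets() serves the same purpose.)
    offset_reader = KafkaConsumer(bootstrap_servers=['kafka1:9092'])
    try:
        # Get current (committed) offsets: {TopicPartition: OffsetAndMetadata}
        consumer_offsets = admin.list_consumer_group_offsets('events-processor')
        # Get latest offsets; assumes the topic has 12 partitions
        partitions = [TopicPartition('events', p) for p in range(12)]
        latest_offsets = offset_reader.end_offsets(partitions)
    finally:
        offset_reader.close()
        admin.close()
    total_lag = 0
    for tp, latest in latest_offsets.items():
        committed = consumer_offsets.get(tp)
        # No commit yet (missing or -1): treat the whole partition as lag
        current = committed.offset if committed is not None and committed.offset >= 0 else 0
        total_lag += max(latest - current, 0)
    # Scale out if lag > threshold
    if total_lag > 1000000:  # 1M messages behind
        scale_consumers(current_count=5, target_count=10)
    return total_lag

Auto-scaling consumers: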
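def scale_consumers(current_count, target_count):
    # Consumers beyond the partition count sit idle, so target_count
    # should not exceed the number of partitions
    if target_count > current_count:
        for i in range(target_count - current_count):
            start_new_consumer()  # Deployment-specific helper, not shown
        logger.info(f"Scaled consumers: {current_count} → {target_count}")

5. Exactly-Once Semantics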
Transactional producer:
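producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092'],
    transactional_id='events-processor-0',  # Unique per instance, stable across restarts
    enable_idempotence=True,
    acks='all'
)
# Initialize transactions (once, before the first begin_transaction)
producer.init_transactions()

def process_with_transactions(messages):
    try:
        producer.begin_transaction()
        for message in messages:
            # Process
            result = transform(message)
            # Produce to output topic
            producer.send('processed-events', value=result)
        # Commit transaction: all sends above become visible atomically.
        # For end-to-end exactly-once, the consumed offsets also have to be
        # committed as part of this transaction, and downstream consumers
        # need to read with read_committed isolation.
        producer.commit_transaction()
    except Exception as e:
        # Abort on failure
        producer.abort_transaction()
        raise

6. Monitoring & Alerting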
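The metrics in this section are only as good as the helpers behind them. As one possible shape for a helper like `get_error_rate()`, the sketch below tracks outcomes over a sliding time window; `RollingErrorRate` and its parameters are hypothetical and not part of any Kafka client.

```python
import time
from collections import deque


class RollingErrorRate:
    """Fraction of failed messages over the last window_seconds."""

    def __init__(self, window_seconds=60, clock=time.monotonic):
        self.window = window_seconds
        self.clock = clock
        self.events = deque()  # (timestamp, failed) pairs, oldest first
        self.failures = 0

    def record(self, failed):
        """Record one processed message; failed is True for an error."""
        self._expire()
        self.events.append((self.clock(), bool(failed)))
        self.failures += bool(failed)

    def rate(self):
        """Return failures / total within the window, or 0.0 if it is empty."""
        self._expire()
        return self.failures / len(self.events) if self.events else 0.0

    def _expire(self):
        # Drop events that have aged out of the window
        cutoff = self.clock() - self.window
        while self.events and self.events[0][0] < cutoff:
            _, failed = self.events.popleft()
            self.failures -= failed
```

A windowed rate recovers on its own once the errors stop, which a lifetime counter never does.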
Key metrics to track:
metrics = {
    'consumer_lag': check_consumer_lag(),
    'dlq_count': get_dlq_count(),
    'processing_rate': get_processing_rate(),
    'error_rate': get_error_rate(),
    'rebalance_count': get_rebalance_count()
}
# Alert thresholds
if metrics['consumer_lag'] > 1000000:
    send_alert("High consumer lag")
if metrics['dlq_count'] > 100:
    send_alert("High DLQ volume")
if metrics['error_rate'] > 0.01:  # 1%
    send_alert("High error rate")

7. Circuit Breaker Pattern
Protect downstream services:
import time

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # Seconds to stay OPEN before allowing a trial call
        self.last_failure = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.monotonic() - self.last_failure > self.timeout:
                self.state = 'HALF_OPEN'  # Let one trial call through
            else:
                raise CircuitOpenError("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            self.failure_count += 1
            self.last_failure = time.monotonic()
            # A failed trial call, or too many consecutive failures, opens the circuit
            if self.state == 'HALF_OPEN' or self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
                logger.error(f"Circuit breaker opened: {e}")
            raise
        # Any success closes the circuit and resets the count, so only
        # consecutive failures trip the breaker
        self.state = 'CLOSED'
        self.failure_count = 0
        return result

# Usage
db_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

def process_message(message):
    # Use circuit breaker for external calls
    result = db_breaker.call(save_to_database, message)
    return result

Production Checklist
✅ Producer:
- acks=all for durability
- enable_idempotence=True
- Retry configuration
- Error handling + DLQ
✅ Consumer:
- Manual offset commits
- Session timeout tuning
- Error handling per message
- DLQ for poison messages
✅ Monitoring:
- Consumer lag alerts
- DLQ volume tracking
- Error rate monitoring
- Rebalance frequency
✅ Resilience:
- Multiple brokers (3+ replicas)
- Circuit breakers for downstream
- Auto-scaling on lag
- Graceful shutdown
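Graceful shutdown is on the checklist but easy to get wrong: a consumer killed mid-batch leaves the group waiting for its session to time out before its partitions are reassigned. A minimal sketch, assuming a consumer with `poll()` and `close()` methods as in kafka-python; `run_until_stopped` and `handle_batch` are hypothetical names.

```python
import signal
import threading

stop_requested = threading.Event()


def request_stop(signum=None, frame=None):
    """Signal handler: ask the loop to exit after the current batch."""
    stop_requested.set()


def install_signal_handlers():
    """Call once from the main thread before starting the loop."""
    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)


def run_until_stopped(consumer, handle_batch, stop_event=stop_requested):
    """Poll until a stop is requested, then leave the group cleanly."""
    try:
        while not stop_event.is_set():
            batch = consumer.poll(timeout_ms=1000)
            if batch:
                handle_batch(batch)  # Expected to commit offsets itself
    finally:
        # close() leaves the consumer group, so partitions are reassigned
        # promptly rather than after the session timeout
        consumer.close()
```

The handler only sets a flag, so the batch in progress finishes and commits before the loop exits.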
Common Pitfalls
❌ Auto-committing offsets - Offsets can be committed before processing finishes, so a crash loses those messages
❌ No DLQ - Poison messages block pipeline
❌ Single broker - No fault tolerance
❌ Ignoring consumer lag - Delays grow unbounded
❌ No backpressure handling - Cascading failures
❌ Missing circuit breakers - Downstream failures propagate
Performance vs Reliability
| Configuration | Throughput | Reliability |
|---|---|---|
| acks=0 | Highest | Lowest |
| acks=1 | Medium | Medium |
| acks=all | Lowest | Highest ✅ |
Our choice: acks=all + batching for throughput
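One caveat on `acks=all`: it waits for the replicas that are currently in sync, so its real guarantee depends on the topic's replication factor and the `min.insync.replicas` setting. The arithmetic below is a sketch of that relationship; the function name is hypothetical, and the values used (replication factor 3, `min.insync.replicas` of 2 or 1) are common pairings rather than settings stated in this post.

```python
def acks_all_guarantees(replication_factor, min_insync_replicas):
    """Summarize what acks=all provides for a given topic configuration."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("need 1 <= min_insync_replicas <= replication_factor")
    return {
        # An acknowledged write is on at least min_insync_replicas brokers,
        # so in the worst case it survives losing all but one of them
        "acked_write_survives_broker_losses": min_insync_replicas - 1,
        # Producers can keep writing while enough replicas remain in sync
        "writes_available_with_replicas_down": replication_factor - min_insync_replicas,
    }


print(acks_all_guarantees(replication_factor=3, min_insync_replicas=2))
print(acks_all_guarantees(replication_factor=3, min_insync_replicas=1))
```

With `min.insync.replicas=1`, `acks=all` can degrade to the durability of `acks=1` whenever the followers fall out of sync, which is why the two settings are usually tuned together.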
Key Takeaways
- Exactly-once semantics - Use transactions for critical data
- Dead letter queues - Handle poison messages gracefully
- Circuit breakers - Protect downstream systems
- Monitor lag - Alert and auto-scale
- Manual commits - Control exactly when offsets advance
- Graceful degradation - DLQ over pipeline failure
Building fault-tolerant Kafka pipelines requires thinking about every failure mode upfront, not after production incidents.