Building Fault-Tolerant Kafka Pipelines

2023-12-05 · 11 min read

After running Kafka pipelines processing 10M+ events/day for 2 years, here's what makes them resilient in production.

The Failure Modes

Kafka pipelines fail in predictable ways:

  1. Consumer lag spikes - Can't keep up with producer rate
  2. Poison messages - Malformed data crashes consumers
  3. Network partitions - Brokers become unreachable
  4. Processing errors - Downstream systems fail
  5. Data skew - One partition overwhelms a consumer

Architecture for Resilience

1. Producer Reliability

Configuration:

from kafka import KafkaProducer
import json
 
producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    
    # Durability
    acks='all',  # Wait for all replicas
    retries=10,  # Retry on transient failures
    max_in_flight_requests_per_connection=1,  # Maintain order
    
    # Idempotence
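    # Requires a kafka-python release with idempotence support (2.0.x rejects this option)
    enable_idempotence=True,  # No duplicates from the producer's internal retries
    
    # Serialization
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    # Leave None keys as None: str(None) would give every keyless record
    # the key b'None' and route them all to a single partition
    key_serializer=lambda k: str(k).encode('utf-8') if k is not None else None,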
    
    # Batching for throughput
    batch_size=16384,
    linger_ms=10,
    compression_type='snappy'
)

Send with error handling:

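import time
from kafka.errors import KafkaError

def send_with_retry(topic, key, value, max_retries=3):
    # These retries sit on top of the producer's own retries=10
    for attempt in range(max_retries):
        try:
            future = producer.send(
                topic,
                key=key,
                value=value
            )
            # Block until the broker acks (this gives up batching throughput)
            metadata = future.get(timeout=10)
            return metadata
        except KafkaError as e:  # Broker/network errors; serializer bugs are not retried
            if attempt == max_retries - 1:
                # Dead letter queue. send_to_producer_dlq() is a placeholder, not
                # defined in this post, ideally backed by a store that does not
                # depend on the failing cluster. The consumer-side send_to_dlq()
                # in section 3 takes a ConsumerRecord, not these arguments.
                send_to_producer_dlq(topic, key, value, error=str(e))
                raise
            time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, ...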
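A fixed 2 ** attempt schedule means producers that failed at the same moment also retry at the same moment. A common refinement, shown here as an alternative rather than what the pipeline above uses, is capped exponential backoff with full jitter: each delay is drawn uniformly from 0 to min(cap, base * 2^attempt). The function name and the base/cap defaults are assumptions of this sketch.

```python
import random


def backoff_delay(attempt, base=1.0, cap=30.0, rng=random):
    """Full-jitter backoff: a uniform delay in [0, min(cap, base * 2**attempt)] seconds."""
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)


if __name__ == "__main__":
    # Seeded generator so the demo schedule is reproducible
    demo_rng = random.Random(42)
    for attempt in range(6):
        print(f"attempt {attempt}: sleep {backoff_delay(attempt, rng=demo_rng):.2f}s")
```

Swapping it in means replacing time.sleep(2 ** attempt) with time.sleep(backoff_delay(attempt)); the rng parameter exists only so tests can inject a seeded or fixed generator.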

2. Consumer Resilience

Configuration:

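from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata
import json
import logging
import time

logger = logging.getLogger(__name__)
 
consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    group_id='events-processor',
    
    # Offset management
    enable_auto_commit=False,  # Manual commit for reliability
    auto_offset_reset='earliest',  # With no committed offset, start from the oldest data
    
    # Session management
    session_timeout_ms=30000,  # 30s without heartbeats triggers a rebalance
    heartbeat_interval_ms=10000,  # 10s heartbeat (typically <= 1/3 of the session timeout)
    max_poll_interval_ms=300000,  # 5 min max between polls, i.e. per batch
    
    # Fetch settings
    max_poll_records=500,  # Batch size: 5 min / 500 records leaves ~600 ms per record
    fetch_min_bytes=1024,  # Wait for data
    fetch_max_wait_ms=500,
    
    # No value_deserializer: an exception raised inside it surfaces from
    # poll(), so one malformed record would stall the consumer instead of
    # reaching the DLQ. JSON is decoded per message in the loop instead.
)

Processing loop with error handling:

class ProcessingError(Exception):
    """Bad input that will never succeed on retry (e.g. malformed JSON)."""

def decode(raw):
    try:
        return json.loads(raw)
    except (TypeError, ValueError) as e:  # ValueError also covers invalid UTF-8; TypeError covers None values
        raise ProcessingError(f"Malformed payload: {e}") from e

def rewind_to_committed():
    # poll() advances positions past every record it returned, so after an
    # unexpected error move back to the last committed offsets; otherwise the
    # failed record (and the rest of the batch) would be skipped until restart
    for tp in consumer.assignment():
        committed = consumer.committed(tp)
        if committed is None:
            consumer.seek_to_beginning(tp)
        else:
            consumer.seek(tp, committed)

def process_messages():
    while True:
        try:
            messages = consumer.poll(timeout_ms=1000, max_records=500)
            
            for topic_partition, records in messages.items():
                for message in records:
                    try:
                        process_message(decode(message.value))
                        
                    except ProcessingError as e:
                        # Poison message - send to DLQ; the commit below skips it
                        send_to_dlq(message, error=str(e))
                        
                    except Exception as e:
                        # Unknown error - don't commit; the outer handler rewinds and retries
                        logger.error(f"Processing failed: {e}")
                        raise
                    
                    # Commit after successful processing (or DLQ hand-off).
                    # Per-message commits cost one broker round trip each;
                    # committing once per batch is a common optimization.
                    consumer.commit({
                        topic_partition: OffsetAndMetadata(message.offset + 1, None)
                    })
                        
        except Exception as e:
            logger.error(f"Consumer error: {e}")
            rewind_to_committed()
            time.sleep(5)  # Backoff before retry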
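Committing after every message, as the loop above does, costs one broker round trip per record. A common alternative is to handle the whole poll() batch and commit once, using the offset after the last handled record in each partition. The helper below is a hypothetical sketch of that calculation; since it only reads offsets, the demo uses stand-in namedtuples instead of a live consumer.

```python
from collections import namedtuple

# Stand-ins for kafka-python's TopicPartition and ConsumerRecord (only the
# fields this sketch reads); with a real consumer, pass poll() results directly.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["offset", "value"])


def next_offsets(handled):
    """Map each partition to the offset to commit: last handled offset + 1.

    `handled` maps a partition to the records that were fully processed (or
    dead-lettered), in the order poll() returned them. Partitions with no
    handled records are left out, so their committed offset does not move.
    """
    return {tp: records[-1].offset + 1 for tp, records in handled.items() if records}


if __name__ == "__main__":
    tp0, tp1 = TopicPartition("events", 0), TopicPartition("events", 1)
    print(next_offsets({tp0: [Record(41, "a"), Record(42, "b")], tp1: []}))
```

With kafka-python, each value would be wrapped as OffsetAndMetadata(offset, None) and the dict passed to a single consumer.commit(...) after the batch. The trade-off is that a crash mid-batch replays up to one batch of already-processed records, so processing has to tolerate duplicates.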

3. Dead Letter Queue (DLQ)

Handle poison messages:

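import base64
import json
from datetime import datetime, timezone

from kafka import KafkaProducer

dlq_producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    acks='all',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
 
def _jsonable(raw):
    # Raw keys (and values, when no deserializer ran) are bytes, which
    # json.dumps rejects; base64 keeps them intact for a later replay
    if isinstance(raw, (bytes, bytearray)):
        return base64.b64encode(raw).decode('ascii')
    return raw
 
def send_to_dlq(message, error):
    dlq_message = {
        'original_topic': message.topic,
        'original_partition': message.partition,
        'original_offset': message.offset,
        'original_key': _jsonable(message.key),
        'original_value': _jsonable(message.value),
        'error': str(error),
        'timestamp': datetime.now(timezone.utc).isoformat()
    }
    
    # Wait for the ack: the caller commits the original offset right after
    # this returns, so an unacknowledged DLQ write could lose the message
    dlq_producer.send('dlq.events', value=dlq_message).get(timeout=10)
    
    # Alert on DLQ threshold (get_dlq_count() and send_alert() are
    # placeholders for your metrics and alerting hooks)
    dlq_count = get_dlq_count()
    if dlq_count > 100:
        send_alert(f"High DLQ volume: {dlq_count} messages")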
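As written, the threshold check runs on every dead-lettered message, so once the count passes 100 each further poison message fires another alert. A minimal throttle, sketched here with hypothetical names and an arbitrary 5-minute default, lets one alert through per cooldown window; the clock is injectable so the behavior can be tested without sleeping.

```python
import time


class AlertThrottle:
    """Let an alert through at most once per `cooldown` seconds."""

    def __init__(self, cooldown=300.0, clock=time.monotonic):
        self.cooldown = cooldown
        self.clock = clock
        self._last_sent = None  # Time of the last alert that was let through

    def should_send(self):
        now = self.clock()
        if self._last_sent is None or now - self._last_sent >= self.cooldown:
            self._last_sent = now
            return True
        return False


if __name__ == "__main__":
    fake_now = [0.0]
    throttle = AlertThrottle(cooldown=300.0, clock=lambda: fake_now[0])
    for t in (0.0, 10.0, 299.0, 300.0, 301.0):
        fake_now[0] = t
        print(t, throttle.should_send())
```

In send_to_dlq, the check would become "if dlq_count > 100 and dlq_alerts.should_send():", with one AlertThrottle instance created at module level.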

4. Backpressure Handling

Monitor consumer lag:

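from kafka import KafkaAdminClient, KafkaConsumer, TopicPartition

BOOTSTRAP = ['kafka1:9092', 'kafka2:9092', 'kafka3:9092']

# Create the clients once; a new admin client per check leaks connections
admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP)
# Group-less consumer used only to look up partition offsets
offset_probe = KafkaConsumer(bootstrap_servers=BOOTSTRAP)
 
def check_consumer_lag(group_id='events-processor', topic='events'):
    # Get current (committed) offsets: {TopicPartition: OffsetAndMetadata}
    consumer_offsets = admin.list_consumer_group_offsets(group_id)
    
    # Get latest offsets for every partition, instead of hardcoding the count
    partitions = [TopicPartition(topic, p)
                  for p in offset_probe.partitions_for_topic(topic) or ()]
    latest_offsets = offset_probe.end_offsets(partitions)
    earliest_offsets = offset_probe.beginning_offsets(partitions)
    
    total_lag = 0
    for tp, latest in latest_offsets.items():
        committed = consumer_offsets.get(tp)
        # No commit yet: auto_offset_reset='earliest' starts from the log start
        if committed is None or committed.offset < 0:
            current = earliest_offsets[tp]
        else:
            current = committed.offset
        total_lag += latest - current
        
    # Scale out if lag > threshold
    if total_lag > 1000000:  # 1M messages behind
        members = admin.describe_consumer_groups([group_id])[0].members
        # Consumers beyond the partition count get no partitions, so cap there
        scale_consumers(current_count=len(members),
                        target_count=min(len(members) * 2, len(partitions)))
        
    return total_lag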
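A single total can hide the data-skew failure mode listed at the top: one overloaded or stuck partition may hold almost all of the lag, and adding consumers does not help it, because within a group each partition is read by only one consumer. The sketch below keeps the per-partition numbers and flags partitions holding more than a given share of the total. The function name and the 50% default are assumptions; the inputs are plain dicts mapping partition to offset, which the lookups above can be reduced to.

```python
def lag_report(latest, committed, share_threshold=0.5):
    """Return (lag per partition, total lag, partitions above share_threshold of the total).

    `latest` maps partition -> log-end offset; `committed` maps partition ->
    committed offset. A partition missing from `committed` is counted from
    offset 0, which overstates its lag if retention already deleted old records.
    """
    per_partition = {p: max(end - committed.get(p, 0), 0) for p, end in latest.items()}
    total = sum(per_partition.values())
    hot = sorted(p for p, lag in per_partition.items()
                 if total and lag / total > share_threshold)
    return per_partition, total, hot


if __name__ == "__main__":
    latest = {0: 1_000, 1: 1_050, 2: 900_000}
    committed = {0: 990, 1: 1_040, 2: 100_000}
    lags, total, hot = lag_report(latest, committed)
    print(lags, total, hot)
```

Alerting on the hot list as well as the total separates "the group is too small" from "one partition is the problem", which call for different fixes.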

Auto-scaling consumers:

def scale_consumers(current_count, target_count):
    if target_count > current_count:
        for _ in range(target_count - current_count):
            start_new_consumer()  # Placeholder: e.g. raise the deployment's replica count
        logger.info(f"Scaled consumers: {current_count} → {target_count}")
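A scale-out decision is easier to reason about, and to test, as a pure function. The sketch below (hypothetical name; the doubling factor and 10-minute cooldown are assumptions) caps the target at the partition count and refuses to scale again within a cooldown, so a lag check that runs every few minutes does not keep adding consumers while the new ones are still catching up.

```python
def target_consumers(current, partitions, total_lag, lag_threshold=1_000_000,
                     seconds_since_last_scale=None, cooldown=600):
    """Desired consumer count for one group; this sketch only ever scales up."""
    if total_lag <= lag_threshold:
        return current
    if seconds_since_last_scale is not None and seconds_since_last_scale < cooldown:
        return current  # The last scale-out may still be catching up
    desired = max(current * 2, 1)
    # Members beyond the partition count would receive no partitions
    return max(current, min(desired, partitions))


if __name__ == "__main__":
    print(target_consumers(current=5, partitions=12, total_lag=2_000_000))
    print(target_consumers(current=8, partitions=12, total_lag=2_000_000))
```

The caller then only has to pass the result to scale_consumers(current, target).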

5. Exactly-Once Semantics

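Transactional producer (this needs a kafka-python release with transaction support; 2.0.x rejects transactional_id). Consumers of the output topic must read with the read_committed isolation level, or they will also see writes from aborted transactions: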

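producer = KafkaProducer(
    bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    transactional_id='events-processor-0',  # Stable across restarts, unique per instance
    enable_idempotence=True,
    acks='all',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
 
# Initialize transactions (also fences off older instances with the same transactional_id)
producer.init_transactions()
 
def process_with_transactions(messages):
    try:
        producer.begin_transaction()
        
        for message in messages:
            # Process
            result = transform(message)
            
            # Produce to output topic
            producer.send('processed-events', value=result)
        
        # For consume-transform-produce exactly-once, the input offsets must be
        # committed inside this same transaction (sendOffsetsToTransaction in the
        # Java client). Committing them separately leaves a window where a crash
        # after commit_transaction() replays the batch and duplicates the output.
        
        # Commit transaction
        producer.commit_transaction()
        
    except Exception:
        # Abort on failure: none of this batch's output becomes visible
        # to read_committed consumers
        producer.abort_transaction()
        raise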

6. Monitoring & Alerting

Key metrics to track:

# get_dlq_count(), get_processing_rate(), get_error_rate(), get_rebalance_count()
# and send_alert() are placeholders for your metrics and alerting stack
metrics = {
    'consumer_lag': check_consumer_lag(),
    'dlq_count': get_dlq_count(),
    'processing_rate': get_processing_rate(),
    'error_rate': get_error_rate(),
    'rebalance_count': get_rebalance_count()
}
 
# Alert thresholds
if metrics['consumer_lag'] > 1000000:
    send_alert("High consumer lag")
 
if metrics['dlq_count'] > 100:
    send_alert("High DLQ volume")
    
if metrics['error_rate'] > 0.01:  # 1% of processed messages
    send_alert("High error rate")
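The chain of if statements grows with every new metric. One option is a small rule table evaluated in a loop. The rules below simply restate the thresholds above; the table layout and function name are hypothetical, and a metric that stops reporting is surfaced as its own alert rather than silently passing.

```python
import operator

# (metric, comparison, threshold, message), restating the thresholds above
ALERT_RULES = [
    ("consumer_lag", operator.gt, 1_000_000, "High consumer lag"),
    ("dlq_count", operator.gt, 100, "High DLQ volume"),
    ("error_rate", operator.gt, 0.01, "High error rate"),
]


def triggered_alerts(metrics, rules=ALERT_RULES):
    """Return one message per rule that fires, plus one per missing metric."""
    alerts = []
    for name, compare, threshold, message in rules:
        value = metrics.get(name)
        if value is None:
            # A collector that stopped reporting should not look healthy
            alerts.append(f"Missing metric: {name}")
        elif compare(value, threshold):
            alerts.append(f"{message} ({name}={value})")
    return alerts


if __name__ == "__main__":
    sample = {"consumer_lag": 2_500_000, "dlq_count": 12, "error_rate": 0.002}
    for text in triggered_alerts(sample):
        print(text)
```

Each returned message would then go to send_alert(); adding a metric becomes one new row instead of another if block.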

7. Circuit Breaker Pattern

Protect downstream services:

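import logging
import time

logger = logging.getLogger(__name__)

class CircuitOpenError(Exception):
    """Raised instead of calling the protected function while the circuit is open.

    Not a ProcessingError, so the consumer retries the message instead of
    dead-lettering it.
    """

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # Seconds to stay OPEN before a trial call
        self.last_failure = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
        
    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.monotonic() - self.last_failure > self.timeout:
                self.state = 'HALF_OPEN'  # Let a trial call through
            else:
                raise CircuitOpenError("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            self.failure_count += 1
            self.last_failure = time.monotonic()
            # A failed trial call reopens immediately; otherwise open at the threshold
            if self.state == 'HALF_OPEN' or self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
                logger.error(f"Circuit breaker opened: {e}")
            raise
        
        # Any success closes the circuit and resets the count,
        # so only consecutive failures trip it
        self.state = 'CLOSED'
        self.failure_count = 0
        return result
 
# Usage
db_breaker = CircuitBreaker(failure_threshold=5, timeout=60)
 
def process_message(message):
    # Use circuit breaker for external calls
    result = db_breaker.call(save_to_database, message)
    return result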

Production Checklist

Producer:

  • acks=all for durability
  • enable_idempotence=True
  • Retry configuration
  • Error handling + DLQ

Consumer:

  • Manual offset commits
  • Session timeout tuning
  • Error handling per message
  • DLQ for poison messages

Monitoring:

  • Consumer lag alerts
  • DLQ volume tracking
  • Error rate monitoring
  • Rebalance frequency

Resilience:

  • 3+ brokers, replication factor 3 and min.insync.replicas=2 (acks=all only waits for the replicas currently in sync)
  • Circuit breakers for downstream
  • Auto-scaling on lag
  • Graceful shutdown
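Graceful shutdown appears on the checklist but not in the code above. One common shape, sketched here with hypothetical names: a SIGTERM/SIGINT handler only sets a flag, the loop checks it between poll cycles so the current batch finishes, and cleanup runs exactly once in a finally block. The poll_once and close callables stand in for one iteration of the processing loop and for consumer.close().

```python
import signal
import threading

stop_requested = threading.Event()


def request_stop(signum, frame):
    # Keep the handler trivial: only record that a stop was requested
    stop_requested.set()


def run(poll_once, close):
    """Call poll_once() until SIGTERM/SIGINT arrives, then call close() once."""
    # signal.signal() may only be called from the main thread
    if threading.current_thread() is threading.main_thread():
        signal.signal(signal.SIGTERM, request_stop)
        signal.signal(signal.SIGINT, request_stop)
    try:
        while not stop_requested.is_set():
            poll_once()  # One poll/process/commit cycle, finished before the flag is rechecked
    finally:
        close()  # Runs on a requested stop and when poll_once() raises
```

With kafka-python, close would be consumer.close(), which leaves the consumer group so its partitions are reassigned without waiting for the session timeout.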

Common Pitfalls

  • Auto-committing offsets - Lose data on crashes
  • No DLQ - Poison messages block pipeline
  • Single broker - No fault tolerance
  • Ignoring consumer lag - Delays grow unbounded
  • No backpressure handling - Cascading failures
  • Missing circuit breakers - Downstream failures propagate

Performance vs Reliability

Configuration   Throughput   Reliability
acks=0          Highest      Lowest
acks=1          Medium       Medium
acks=all        Lowest       Highest ✅

Our choice: acks=all + batching for throughput

Key Takeaways

  1. Exactly-once semantics - Use transactions for critical data
  2. Dead letter queues - Handle poison messages gracefully
  3. Circuit breakers - Protect downstream systems
  4. Monitor lag - Alert and auto-scale
  5. Manual commits - Control exactly when offsets advance
  6. Graceful degradation - DLQ over pipeline failure

Building fault-tolerant Kafka pipelines requires thinking about every failure mode upfront, not after production incidents.

