kafka · streaming · reliability · fault-tolerance

Building Fault-Tolerant Kafka Pipelines

2024-02-28 · 10 min read


Streaming pipelines fail. Networks partition, services crash, and bad data arrives. The question isn't whether failures will happen, but how your system handles them.

The Challenge

We built a real-time event processing pipeline handling 1M+ events per day. Early versions had three critical problems:

  1. Duplicate processing during consumer restarts
  2. Lost events when downstream services failed
  3. Pipeline stalls from poison messages

Here's how we fixed each issue to reach 99.99% availability.

Exactly-Once Semantics

The Problem

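# At-least-once: offsets are committed only after processing, so duplicates are possible
while True:
    messages = consumer.poll(timeout_ms=1000)  # kafka-python: {TopicPartition: [records]}
    process(messages)
    # A crash here (after process, before commit) means the batch is redelivered
    consumer.commit()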

If the consumer crashes after processing but before committing, messages get reprocessed on restart.

The Solution: Idempotent Processing

Pattern 1: Idempotency Keys

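from kafka import KafkaConsumer
import psycopg2

# Placeholder connection string; events.id has a UNIQUE / PRIMARY KEY constraint
conn = psycopg2.connect("dbname=pipeline")
cursor = conn.cursor()

consumer = KafkaConsumer(
    'events',
    bootstrap_servers='localhost:9092',
    group_id='event-processor',        # placeholder; a group is required for commit()
    enable_auto_commit=False,          # commit offsets only after the DB write
    isolation_level='read_committed'   # skip records from aborted producer transactions
)

for message in consumer:
    # Assumes producers key every event by its unique ID
    event_id = message.key.decode('utf-8')

    # Insert with unique constraint on event_id
    try:
        cursor.execute(
            "INSERT INTO events (id, data) VALUES (%s, %s)",
            (event_id, message.value)
        )
        conn.commit()
        consumer.commit()
    except psycopg2.IntegrityError:
        # Already processed (e.g. redelivered after a crash) - skip
        conn.rollback()
        consumer.commit()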
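To see why a redelivered message is harmless, the sketch below replays the same pattern against an in-memory SQLite database standing in for Postgres. The `process_batch` helper and the `(event_id, payload)` message shape are assumptions for illustration; as in the Postgres version, a duplicate key raises `IntegrityError` and the message is skipped.

```python
import sqlite3


def make_db():
    """In-memory stand-in for the `events` table; id is the idempotency key."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id TEXT PRIMARY KEY, data BLOB)")
    return conn


def process_batch(conn, messages):
    """Insert each (event_id, payload) at most once; return how many were new."""
    inserted = 0
    for event_id, payload in messages:
        try:
            with conn:  # commits on success, rolls back and re-raises on error
                conn.execute("INSERT INTO events (id, data) VALUES (?, ?)",
                             (event_id, payload))
            inserted += 1
        except sqlite3.IntegrityError:
            pass  # already processed: safe to skip and commit the offset
    return inserted


conn = make_db()
batch = [("evt-1", b"a"), ("evt-2", b"b")]
first = process_batch(conn, batch)
# Simulate a crash before the Kafka commit: the same batch is redelivered
second = process_batch(conn, batch)
rows = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(first, second, rows)  # 2 0 2
```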

Pattern 2: Transactional Writes

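# Spark Structured Streaming: source offsets are tracked in the checkpoint
# (requires the spark-sql-kafka-0-10 package on the classpath)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# The Kafka sink expects a `value` column (and optionally `key`)
query = df.selectExpr("key", "value").writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "processed") \
    .option("checkpointLocation", "s3://checkpoints/") \
    .start()

query.awaitTermination()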

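The checkpoint records the Kafka offsets of each micro-batch, so after a failure Spark replays any batch that had not been committed. Note that Spark's Kafka sink only guarantees at-least-once delivery: a replayed batch can land in `processed` twice, so downstream consumers still need idempotency (Pattern 1). For exactly-once output, use a sink that commits atomically with the checkpoint, such as Spark's file sink.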
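The same idea works with any transactional sink: commit the output and the next offset to read in one transaction, then on restart resume from the stored offset rather than from Kafka's committed offset. Here is a minimal sketch with SQLite standing in for the sink; the table layout, the `write_batch` helper, and the `(offset, value)` record shape are assumptions for illustration. With kafka-python, resuming would mean calling `consumer.seek(TopicPartition(topic, part), stored_offset(...))` once partitions are assigned.

```python
import sqlite3


class SimulatedCrash(Exception):
    """Stands in for the process dying mid-batch."""


def init(conn):
    conn.execute("CREATE TABLE IF NOT EXISTS output (value TEXT)")
    # One row per partition: the next offset this pipeline should read
    conn.execute("CREATE TABLE IF NOT EXISTS offsets "
                 "(part INTEGER PRIMARY KEY, next_offset INTEGER)")


def stored_offset(conn, part):
    row = conn.execute("SELECT next_offset FROM offsets WHERE part = ?",
                       (part,)).fetchone()
    return row[0] if row else 0


def write_batch(conn, part, records, crash=False):
    """Write results and advance the stored offset in ONE transaction.

    records: (offset, value) pairs from one partition, in offset order.
    """
    with conn:  # commit both writes, or roll both back on any exception
        conn.executemany("INSERT INTO output (value) VALUES (?)",
                         [(value.upper(),) for _, value in records])
        if crash:
            raise SimulatedCrash("died before the offset update")
        conn.execute("INSERT OR REPLACE INTO offsets (part, next_offset) "
                     "VALUES (?, ?)", (part, records[-1][0] + 1))


conn = sqlite3.connect(":memory:")
init(conn)
log = [(0, "a"), (1, "b"), (2, "c")]  # (offset, value) for partition 0

write_batch(conn, 0, log[:2])
try:
    write_batch(conn, 0, log[2:], crash=True)  # rolled back: no partial output
except SimulatedCrash:
    pass

# On restart, resume from the offset stored alongside the output
start = stored_offset(conn, 0)
write_batch(conn, 0, [r for r in log if r[0] >= start])
values = [v for (v,) in conn.execute("SELECT value FROM output ORDER BY rowid")]
print(start, values)  # 2 ['A', 'B', 'C']
```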

Dead Letter Queues

The Problem

A single malformed message can stall your entire pipeline.

The Solution: Graceful Degradation

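from datetime import datetime, timezone

# parse_event, validate_schema, store_in_database, ValidationError,
# TransientError and retry_with_exponential_backoff are application-defined;
# consumer and producer are kafka-python clients created elsewhere.
def process_with_dlq(message, attempt_count=0):
    try:
        # Main processing logic
        event = parse_event(message.value)
        validate_schema(event)
        store_in_database(event)
        consumer.commit()

    except ValidationError as e:
        # Permanent error: send to DLQ for manual inspection
        # (kafka-python header values must be bytes)
        future = producer.send(
            'events-dlq',
            key=message.key,
            value=message.value,
            headers=[
                ('error_type', b'ValidationError'),
                ('error_message', str(e).encode('utf-8')),
                ('original_topic', message.topic.encode('utf-8')),
                ('timestamp', datetime.now(timezone.utc).isoformat().encode('utf-8'))
            ]
        )
        future.get(timeout=10)  # make sure the DLQ write landed before committing
        consumer.commit()  # Don't retry validation errors

    except TransientError:
        # Transient errors: retry with backoff; the offset is not committed yet
        retry_with_exponential_backoff(message, attempt_count + 1)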
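`retry_with_exponential_backoff` is not defined in the snippet above. One possible implementation is sketched below: it retries a callable with capped exponential backoff plus "full jitter", then re-raises so the caller can fall back to the DLQ. The signature (a zero-argument callable instead of `(message, attempt_count)`), the default limits, and the `TransientError` class are assumptions.

```python
import random
import time


class TransientError(Exception):
    """Hypothetical marker for retryable failures (timeouts, 503s, ...)."""


def retry_with_exponential_backoff(fn, max_attempts=5, base_delay=0.5,
                                   max_delay=30.0, sleep=time.sleep):
    """Call fn() until it succeeds or max_attempts is exhausted.

    Before retry n (1-based), sleep a random time in
    [0, min(max_delay, base_delay * 2 ** (n - 1))] ("full jitter"), which
    spreads out retries from many consumers instead of synchronizing them.
    The last TransientError is re-raised so the caller can route to the DLQ;
    any other exception propagates immediately without retrying.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_attempts:
                raise
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, cap))
```

Because retries block the poll loop, keep the worst-case total backoff well below the consumer's `max.poll.interval.ms` (5 minutes by default), or the group will rebalance mid-retry. With the defaults above the total sleep is at most 7.5 seconds.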

DLQ Monitoring:

# Alert on DLQ buildup
dlq_count = get_dlq_message_count()
if dlq_count > THRESHOLD:
    send_alert("DLQ threshold exceeded", dlq_count)
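A fixed count threshold reacts slowly to a sudden burst and keeps firing long after one. Tracking growth rate, as the `dlq_growth_rate > 100/min` alert later in this post does, is one alternative. The sketch below is an assumption-laden illustration: `GrowthRateMonitor` is a hypothetical helper, the 5-minute window is arbitrary, and it should be fed a monotonically increasing value such as the summed log-end offsets of the DLQ partitions, because a retained-message count can drop as retention deletes old segments.

```python
from collections import deque


class GrowthRateMonitor:
    """Tracks (timestamp_seconds, value) samples in a sliding window and
    reports growth in messages per minute."""

    def __init__(self, window_seconds=300):
        self.window = window_seconds
        self.samples = deque()

    def add(self, ts, value):
        self.samples.append((ts, value))
        # Drop samples older than the window, but always keep two for a rate
        while len(self.samples) > 2 and ts - self.samples[0][0] > self.window:
            self.samples.popleft()

    def per_minute(self):
        if len(self.samples) < 2:
            return 0.0
        (t0, v0), (t1, v1) = self.samples[0], self.samples[-1]
        if t1 <= t0:
            return 0.0
        return (v1 - v0) * 60.0 / (t1 - t0)


mon = GrowthRateMonitor(window_seconds=300)
for ts, end_offset_sum in [(0, 100), (60, 250), (120, 400)]:
    mon.add(ts, end_offset_sum)

if mon.per_minute() > 100:  # same threshold as the critical DLQ alert
    print("DLQ growing at", mon.per_minute(), "msgs/min")
```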

Handling Backpressure

When consumers can't keep up with producers:

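# Monitor consumer lag
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
# Group-less consumer used only to look up log-end offsets
offset_reader = KafkaConsumer(bootstrap_servers='localhost:9092')

def get_consumer_lag():
    """Return {group_id: total lag across the group's partitions}."""
    lag_info = {}

    for group_id, _protocol_type in admin.list_consumer_groups():
        committed = admin.list_consumer_group_offsets(group_id)
        end_offsets = offset_reader.end_offsets(list(committed))
        # Lag per partition = latest (log-end) offset - committed offset
        lag_info[group_id] = sum(
            end_offsets[tp] - meta.offset for tp, meta in committed.items()
        )

    return lag_info

# Auto-scale consumers based on the worst group's lag
# (THRESHOLD and scale_up_consumers() are placeholders for your autoscaler)
lag = max(get_consumer_lag().values(), default=0)
if lag > THRESHOLD:
    scale_up_consumers()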
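`scale_up_consumers()` hides one hard constraint: within a consumer group each partition is assigned to at most one consumer, so consumers beyond the partition count sit idle. A minimal sizing sketch follows; `desired_consumers` is a hypothetical helper, and it assumes each consumer can work off a roughly fixed amount of lag (the `drain_per_consumer` default of 5000 is a placeholder to replace with measured throughput).

```python
import math


def desired_consumers(total_lag, partitions, drain_per_consumer=5000,
                      min_consumers=1):
    """Suggest a consumer count for one group from its total lag.

    The result is capped at the partition count, because extra consumers
    in the same group would receive no partitions.
    """
    needed = math.ceil(total_lag / drain_per_consumer)
    return max(min_consumers, min(needed, partitions))


print(desired_consumers(0, 12))          # 1
print(desired_consumers(12_000, 12))     # 3
print(desired_consumers(1_000_000, 12))  # 12 (bounded by partitions)
```

Once the cap is reached, more parallelism requires more partitions or faster per-message processing. A cooldown before scaling down helps avoid flapping.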

Schema Evolution

Avro with Schema Registry

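# confluent_kafka.avro is the legacy Avro API; newer confluent-kafka releases
# recommend SerializingProducer with confluent_kafka.schema_registry.avro.AvroSerializer
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer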
 
# Define schema with defaults for backward compatibility
value_schema = avro.loads('''
{
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "data", "type": "string"},
        {"name": "version", "type": "int", "default": 1}
    ]
}
''')
 
producer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_value_schema=value_schema)

Compatibility Modes:

  • Backward: New consumers can read old data
  • Forward: Old consumers can read new data
  • Full: Both backward and forward
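These modes follow from Avro schema resolution: a reader ignores writer fields it does not know, and fills reader fields the writer lacks from the reader's default. The sketch below is a deliberately simplified model of that rule for flat record schemas (exact type matches only; no type promotion, unions, or aliases), not Schema Registry's actual checker. It shows why the `version` field in the `Event` schema carries a default; the `source` field is a hypothetical example.

```python
def backward_compatible(old_fields, new_fields):
    """Can a reader on the NEW schema decode data written with the OLD one?

    Each field is a dict like {"name": "id", "type": "string"} with an
    optional "default".
    """
    writer = {f["name"]: f for f in old_fields}
    for field in new_fields:
        if field["name"] in writer:
            if writer[field["name"]]["type"] != field["type"]:
                return False  # simplification: exact type match only
        elif "default" not in field:
            return False  # reader needs a value the writer never wrote
    return True


def forward_compatible(old_fields, new_fields):
    # Old readers, new data: same check with reader and writer swapped
    return backward_compatible(new_fields, old_fields)


def full_compatible(old_fields, new_fields):
    return (backward_compatible(old_fields, new_fields)
            and forward_compatible(old_fields, new_fields))


v1 = [{"name": "id", "type": "string"},
      {"name": "timestamp", "type": "long"},
      {"name": "data", "type": "string"}]
v2 = v1 + [{"name": "version", "type": "int", "default": 1}]  # as in Event
v3 = v1 + [{"name": "source", "type": "string"}]              # no default

print(full_compatible(v1, v2))      # True
print(backward_compatible(v1, v3))  # False
print(forward_compatible(v1, v3))   # True
```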

Circuit Breaker Pattern

Prevent cascade failures when downstream services are unavailable:

from pybreaker import CircuitBreaker, CircuitBreakerError

# Configure circuit breaker
db_breaker = CircuitBreaker(
    fail_max=5,        # Open after 5 consecutive failures
    reset_timeout=60,  # Stay open 60 s, then allow a trial call (half-open)
)

@db_breaker
def write_to_database(event):
    # This call is protected by the circuit breaker
    db.insert(event)

try:
    write_to_database(event)
except (CircuitBreakerError, DatabaseError):
    # Breaker is open, or the database call itself failed - route to backup
    write_to_s3_backup(event)
    producer.send('events-retry', event)  # assumes a value_serializer is configured
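pybreaker does the bookkeeping, but the underlying state machine is small. The sketch below is a simplified stand-alone breaker, not pybreaker's implementation: closed → open after `fail_max` consecutive failures, open → half-open once `reset_timeout` seconds pass, then one success closes it and one failure re-opens it. `SimpleBreaker` and `CircuitOpenError` are hypothetical names, the clock is injectable for testing, and the class is not thread-safe.

```python
import time


class CircuitOpenError(Exception):
    """Raised instead of calling the protected function while open."""


class SimpleBreaker:
    def __init__(self, fail_max=5, reset_timeout=60.0, clock=time.monotonic):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means closed

    @property
    def state(self):
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"
        return "open"

    def call(self, fn, *args, **kwargs):
        state = self.state
        if state == "open":
            raise CircuitOpenError("circuit open; skipping call")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            # A failed trial call in half-open re-opens immediately
            if state == "half-open" or self.failures >= self.fail_max:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

Catching `CircuitOpenError` is the natural place for the S3 fallback shown above.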

Monitoring & Observability

Key Metrics:

# Producer metrics
- messages_sent_rate
- error_rate
- request_latency_p99
 
# Consumer metrics
- consumer_lag
- messages_consumed_rate
- processing_time_p95
- dlq_message_count
 
# Broker metrics
- under_replicated_partitions
- active_controller_count
- offline_partitions_count

Alerting Strategy:

alerts = {
    'critical': {
        'consumer_lag > 10000': 'page',
        'offline_partitions > 0': 'page',
        'dlq_growth_rate > 100/min': 'page'
    },
    'warning': {
        'consumer_lag > 5000': 'slack',
        'processing_latency_p99 > 5s': 'slack'
    }
}
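The string keys above read well but cannot be evaluated directly. One way to make the same policy executable is to express each rule as data. This sketch assumes metrics arrive as a flat dict snapshot; names such as `dlq_growth_per_min` and `processing_latency_p99_s` are hypothetical renamings with units folded in. A metric that breaches several rules reports only its most severe one, so a lag of 12000 pages once instead of also posting to Slack.

```python
# (metric, threshold, severity, channel), mirroring the alerts dict above
RULES = [
    ("consumer_lag", 10_000, "critical", "page"),
    ("consumer_lag", 5_000, "warning", "slack"),
    ("offline_partitions_count", 0, "critical", "page"),
    ("dlq_growth_per_min", 100, "critical", "page"),
    ("processing_latency_p99_s", 5, "warning", "slack"),
]

SEVERITY_RANK = {"warning": 1, "critical": 2}


def evaluate(metrics, rules=RULES):
    """Return {metric: (severity, channel)} for every breached metric.

    A rule fires when the value is strictly above its threshold; metrics
    missing from the snapshot are skipped.
    """
    fired = {}
    for metric, threshold, severity, channel in rules:
        value = metrics.get(metric)
        if value is None or value <= threshold:
            continue
        current = fired.get(metric)
        if current is None or SEVERITY_RANK[severity] > SEVERITY_RANK[current[0]]:
            fired[metric] = (severity, channel)
    return fired


print(evaluate({"consumer_lag": 12_000, "offline_partitions_count": 0,
                "processing_latency_p99_s": 7.5}))
```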

Production Checklist

  • Exactly-once semantics via idempotent processing or transactions
  • Dead letter queue for poison messages
  • Schema registry for evolution
  • Circuit breakers for downstream dependencies
  • Comprehensive monitoring with actionable alerts
  • Backpressure handling with auto-scaling
  • Replay capability from any offset
  • Data encryption at rest and in transit

Real-World Results

After implementing these patterns:

  • Availability: 99.9% → 99.99%
  • Data loss incidents: 5/month → 0
  • Mean time to recovery: 45 min → 5 min
  • Duplicate processing rate: 2% → 0.001%

Key Takeaways

  1. Design for failure from day one
  2. Idempotency is non-negotiable for exactly-once
  3. DLQs prevent pipeline stalls
  4. Monitor consumer lag aggressively
  5. Test failure scenarios regularly

Build systems that fail gracefully.
