Debugging Kafka Consumer Lag in Production: A Real Case Study

2024-07-22 · 10 min read

Consumer lag is Kafka's most deceptive metric. A number sitting at zero for weeks can spike to millions within minutes, and the root cause is almost never what you first suspect. This is the story of how we tracked down a 4.2-million-message lag that was threatening our SLA, and what we did to fix it permanently.

The Incident

At 03:14 AM on a Tuesday, our alerting fired: consumer lag on the user-events topic had crossed 500,000 messages and was climbing fast. By the time the on-call engineer logged in at 03:22, it was at 2.1 million. By 04:00, it had peaked at 4.2 million.

The pipeline processed clickstream events feeding a real-time recommendation engine. Every 100,000 messages of lag translated to roughly 45 seconds of stale recommendations for 2.3 million active users.

We had one hour before the SLA breach window opened.
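
To put the lag-to-staleness ratio above in concrete terms, here is a minimal sketch that converts a lag reading into an estimated staleness. It assumes staleness scales linearly with lag, which is a simplification; the function name is ours, for illustration only.

```python
# Ratio quoted above: roughly 45 seconds of staleness per 100,000 messages of lag.
SECONDS_PER_100K = 45


def staleness_seconds(lag_messages: int) -> float:
    """Estimate recommendation staleness, assuming it scales linearly with lag."""
    return lag_messages / 100_000 * SECONDS_PER_100K


# Lag readings taken from the incident narrative
for lag in (500_000, 2_100_000, 4_200_000):
    secs = staleness_seconds(lag)
    print(f"lag={lag:>9,}  staleness is about {secs:,.0f} s")
```

At the 4.2 million peak, that works out to about 1,890 seconds, or roughly half an hour of stale recommendations.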


Step 1: Establish a Lag Baseline

Before touching anything, get a snapshot of current lag across all consumer groups and partitions:

# View lag for a specific consumer group
kafka-consumer-groups.sh \
  --bootstrap-server kafka-broker:9092 \
  --describe \
  --group user-events-consumer
 
# Output:
# GROUP                TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# user-events-consumer user-events  0          8412901         8901234         488333
# user-events-consumer user-events  1          8100442         12543210        4442768
# user-events-consumer user-events  2          9023011         9187443         164432

The smoking gun was immediately visible: partition 1 had 4.4 million messages of lag, while partitions 0 and 2 trailed by roughly 488,000 and 164,000. This was not an overall throughput problem; it was a partition imbalance problem.
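
Spotting the outlier by eye works for three partitions but not for sixty. The following is a rough sketch of automating that check by parsing the --describe rows shown above. The "more than 5x the median" rule and both function names are our own illustrative choices, not a Kafka feature.

```python
from statistics import median

# Rows copied from the --describe output above
SAMPLE = """\
user-events-consumer user-events  0          8412901         8901234         488333
user-events-consumer user-events  1          8100442         12543210        4442768
user-events-consumer user-events  2          9023011         9187443         164432
"""


def parse_lag(describe_output: str) -> dict[int, int]:
    """Map partition -> lag from whitespace-separated --describe rows."""
    lags = {}
    for line in describe_output.splitlines():
        fields = line.split()
        # Expected columns: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
        if len(fields) < 6 or not fields[2].isdigit() or not fields[5].isdigit():
            continue  # skip headers, blank lines, and rows with "-" placeholders
        lags[int(fields[2])] = int(fields[5])
    return lags


def hot_partitions(lags: dict[int, int], ratio: float = 5.0) -> list[int]:
    """Partitions whose lag exceeds `ratio` times the median lag (illustrative heuristic)."""
    if not lags:
        return []
    baseline = max(median(lags.values()), 1)
    return sorted(p for p, lag in lags.items() if lag > ratio * baseline)


lags = parse_lag(SAMPLE)
print("lag by partition:", lags)
print("hot partitions:", hot_partitions(lags))
```

With the sample rows, only partition 1 is flagged.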

# Check all consumer groups at once
kafka-consumer-groups.sh \
  --bootstrap-server kafka-broker:9092 \
  --list | xargs -I {} kafka-consumer-groups.sh \
  --bootstrap-server kafka-broker:9092 \
  --describe \
  --group {}

Step 2: Identify Which Consumer Owns the Hot Partition

# Show member assignments — which consumer instance owns which partition
kafka-consumer-groups.sh \
  --bootstrap-server kafka-broker:9092 \
  --describe \
  --group user-events-consumer \
  --members \
  --verbose
 
# Output:
# CONSUMER-ID                        HOST            PARTITIONS
# consumer-1-abc123                  10.0.1.14       0
# consumer-2-def456                  10.0.1.15       1          <-- lagging
# consumer-3-ghi789                  10.0.1.16       2

Consumer instance consumer-2-def456 on host 10.0.1.15 owned partition 1. Next step: find out why it was processing so slowly.

# Quick lag monitor script: poll every 30 seconds (uses kafka-python)
import time

from kafka import KafkaConsumer, TopicPartition


def get_lag(bootstrap_servers, group_id, topic):
    # group_id must be the real group so committed() returns its offsets.
    # The consumer never subscribes, so it is not assigned any partitions.
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False
    )
    try:
        # partitions_for_topic() returns None for an unknown topic
        partitions = consumer.partitions_for_topic(topic) or set()
        tps = [TopicPartition(topic, p) for p in sorted(partitions)]

        end_offsets = consumer.end_offsets(tps)
        committed = {tp: consumer.committed(tp) for tp in tps}

        for tp in tps:
            # No committed offset yet: count the whole partition as lag
            lag = end_offsets[tp] - (committed[tp] or 0)
            print(f"Partition {tp.partition}: lag={lag:,}")
    finally:
        consumer.close()


while True:
    get_lag("kafka-broker:9092", "user-events-consumer", "user-events")
    time.sleep(30)

Step 3: Profile the Consumer

We SSH'd into 10.0.1.15 and looked at what the consumer process was actually doing:

# CPU and memory at a glance
top -p $(pgrep -f user-events-consumer)

# Thread dump (JVM-based consumers only)
jstack <pid> | grep -A 20 "kafka-coordinator"

# Check GC pressure (JVM-based consumers only)
jstat -gcutil <pid> 1000 10

For our Python consumer, we used py-spy:

pip install py-spy
py-spy top --pid $(pgrep -f consumer_worker.py)

The output was revealing:

  %Own   %Total  OwnTime  TotalTime  Function (filename)
  78.0%   78.0%   23.50s    23.50s   deserialize (schema_registry_client.py:214)
   8.0%    8.0%    2.40s     2.40s   poll (kafka/consumer/fetcher.py:305)
   4.0%    4.0%    1.20s     1.20s   process_event (consumer_worker.py:87)

78% of CPU time was spent in schema deserialization. The consumer was fetching Avro schemas from the Schema Registry on every single message.


Step 4: Fix the Deserialization Bottleneck

The consumer had a misconfigured schema cache, so every message triggered a network round-trip to the Schema Registry to fetch its schema. Keeping up with the 12,000 messages/second arriving on partition 1 would have required 12,000 HTTP calls per second to a service that could handle maybe 500.

Before:

from confluent_kafka.avro import AvroConsumer
 
consumer = AvroConsumer({
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'user-events-consumer',
    'schema.registry.url': 'http://schema-registry:8081',
    # No cache configured — fetches schema on every message
})

After:

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# One long-lived client per process. SchemaRegistryClient keeps schemas it
# has already fetched in memory, keyed by schema ID, so reusing it avoids a
# registry round-trip per message.
sr_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})

avro_deserializer = AvroDeserializer(
    sr_client,
    schema_str=None,   # Resolve the writer schema from the wire format
)

consumer = DeserializingConsumer({
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'user-events-consumer',
    'value.deserializer': avro_deserializer,
    'fetch.min.bytes': 1024 * 1024,      # Batch fetch 1MB minimum
    'fetch.wait.max.ms': 500,            # librdkafka's name for this setting
})

Impact: Deserialization CPU dropped from 78% to 6%. Partition 1 consumption rate went from 800 msg/s to 14,200 msg/s.
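
Why caching by schema ID makes such a difference can be shown with a toy model that needs no broker or registry. FakeRegistry and consume below are hypothetical stand-ins written for this illustration; they are not part of confluent-kafka, and the lru_cache wrapper only mimics what a caching client does internally.

```python
from functools import lru_cache


class FakeRegistry:
    """Stand-in for a schema registry; counts how many lookups reach it."""

    def __init__(self):
        self.calls = 0

    def get_schema(self, schema_id: int) -> str:
        self.calls += 1  # in a real system this would be an HTTP round-trip
        return f"schema-{schema_id}"


def consume(messages, fetch_schema):
    """'Deserialize' each (schema_id, payload) pair using the given lookup."""
    return [(fetch_schema(schema_id), payload) for schema_id, payload in messages]


# 12,000 messages that share just two schema IDs
messages = [(i % 2 + 1, f"event-{i}") for i in range(12_000)]

uncached = FakeRegistry()
consume(messages, uncached.get_schema)

cached = FakeRegistry()
cached_lookup = lru_cache(maxsize=256)(cached.get_schema)
consume(messages, cached_lookup)

print("registry calls without cache:", uncached.calls)
print("registry calls with cache:   ", cached.calls)
```

Without the cache, the registry sees one call per message. With it, the registry sees one call per distinct schema ID, no matter how many messages arrive.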


Step 5: Address the Rebalance Storm

While we were fixing the deserialization issue, we noticed a secondary problem in the logs: consumer group rebalances were happening every 90–120 seconds:

# Check the consumer group's current state (re-run to watch it change)
kafka-consumer-groups.sh \
  --bootstrap-server kafka-broker:9092 \
  --describe \
  --group user-events-consumer \
  --state

# State kept flipping between:
# Stable → PreparingRebalance → CompletingRebalance → Stable

The cause: our consumer's max.poll.interval.ms was set to the default (5 minutes), but slow deserialization stretched the gap between successive poll() calls to over 6 minutes. Once that interval is exceeded, the consumer is considered failed and leaves the group, which triggers a rebalance.

consumer = DeserializingConsumer({
    'bootstrap.servers': 'kafka-broker:9092',
    'group.id': 'user-events-consumer',
    'value.deserializer': avro_deserializer,

    # Give consumers more time between polls
    'max.poll.interval.ms': 600000,       # 10 minutes
    'session.timeout.ms': 45000,          # 45 seconds
    'heartbeat.interval.ms': 15000,       # 15 seconds (1/3 of session timeout)

    # Fetch larger batches from the broker
    'fetch.min.bytes': 1024 * 1024,       # 1MB
    'fetch.wait.max.ms': 500,             # librdkafka's name for this setting

    # 'max.poll.records': 2000 (was 500) applies to the Java client only;
    # librdkafka-based clients such as this one reject it as unknown.
})

Impact: Rebalances dropped from one every 90 seconds to zero over a 6-hour monitoring window.
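
The relationship between batch size, per-message processing time, and max.poll.interval.ms is simple arithmetic, sketched below. The helper names and the 50% safety factor are our own assumptions, and the 0.75 s per-message figure is a hypothetical value chosen for illustration, not a measurement from the incident.

```python
def exceeds_poll_interval(batch_size: int,
                          per_message_seconds: float,
                          max_poll_interval_ms: int) -> bool:
    """True if processing one batch would take longer than max.poll.interval.ms."""
    return batch_size * per_message_seconds * 1000 > max_poll_interval_ms


def max_batch_size(per_message_seconds: float,
                   max_poll_interval_ms: int,
                   safety_factor: float = 0.5) -> int:
    """Largest batch that fits within a fraction of max.poll.interval.ms."""
    budget_seconds = max_poll_interval_ms / 1000 * safety_factor
    return int(budget_seconds // per_message_seconds)


# Hypothetical: 500 messages per batch at 0.75 s each is 375 s of work
print(exceeds_poll_interval(500, 0.75, 300_000))  # against a 5-minute interval
print(exceeds_poll_interval(500, 0.75, 600_000))  # against a 10-minute interval
print(max_batch_size(0.75, 600_000))              # batch that uses half the interval
```

Raising the interval buys headroom, but the more durable fix is to make per-message work cheaper, which is what the deserialization change did.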


Step 6: Scale Consumers to Match Partition Count

Even with the deserialization fix, partition 1 was still recovering lag slower than we needed. We scaled the consumer group to match partition count exactly:

# kubernetes deployment patch
kubectl scale deployment user-events-consumer --replicas=3
 
# Verify each partition has exactly one consumer
kafka-consumer-groups.sh \
  --bootstrap-server kafka-broker:9092 \
  --describe \
  --group user-events-consumer \
  --members

Rule: You can have at most as many active consumers as partitions. Extra consumers sit idle. For maximum parallelism, num_consumers == num_partitions.
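
The rule is easy to see in a simplified model of partition assignment. This sketch deals partitions out round-robin; it is not one of Kafka's actual assignor implementations, and the consumer names are made up.

```python
def assign_round_robin(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Deal partitions out to consumers one at a time; surplus consumers get none."""
    assignment = {c: [] for c in consumers}
    if not consumers:
        return assignment
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


consumers = [f"consumer-{i}" for i in range(1, 6)]  # 5 hypothetical group members
assignment = assign_round_robin(3, consumers)
idle = [c for c, parts in assignment.items() if not parts]

print(assignment)
print("idle consumers:", idle)
```

With 3 partitions and 5 consumers, two members end up with nothing to read.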

If you need more throughput than your partition count allows, increase partitions first:

# Increase partitions from 3 to 12
kafka-topics.sh \
  --bootstrap-server kafka-broker:9092 \
  --alter \
  --topic user-events \
  --partitions 12

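Warning: Partition increases are irreversible and will cause a full consumer group rebalance. With key-based partitioning they also change which partition existing keys map to, so per-key ordering is not preserved across the change. Do this during a maintenance window.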
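
One reason to plan partition increases carefully is that keyed messages are placed by hashing the key modulo the partition count, so changing the count moves keys. The sketch below illustrates the effect with CRC32 as a stand-in hash; Kafka's default partitioner uses murmur2, so the exact partition numbers will differ, and the keys are hypothetical.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """hash(key) mod N. CRC32 is a stand-in for the real partitioner's hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"user-{i}" for i in range(1000)]  # hypothetical message keys
moved = [k for k in keys if partition_for(k, 3) != partition_for(k, 12)]

print(f"{len(moved)} of {len(keys)} keys map to a different partition after 3 -> 12")
```

Any consumer logic that relies on all events for a key arriving on one partition needs to account for that cutover.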


Permanent Monitoring Setup

After the incident, we wired lag monitoring directly into our alerting stack:

# lag_exporter.py: Prometheus metrics for Kafka consumer lag (kafka-python)
import time

from kafka import KafkaConsumer, TopicPartition
from prometheus_client import Gauge, start_http_server

LAG_GAUGE = Gauge(
    'kafka_consumer_lag',
    'Consumer lag per partition',
    ['group', 'topic', 'partition']
)


def export_lag(bootstrap_servers, group_id, topic, interval=15):
    # One long-lived consumer instead of a new connection every interval.
    # It uses the real group's ID so committed() returns that group's
    # offsets, but it never subscribes, so it is not assigned partitions.
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False
    )
    try:
        while True:
            # Re-read partitions each cycle so newly added ones are picked up
            partitions = consumer.partitions_for_topic(topic) or set()
            tps = [TopicPartition(topic, p) for p in sorted(partitions)]
            end_offsets = consumer.end_offsets(tps)

            for tp in tps:
                committed = consumer.committed(tp) or 0
                LAG_GAUGE.labels(
                    group=group_id,
                    topic=tp.topic,
                    partition=str(tp.partition)
                ).set(end_offsets[tp] - committed)

            time.sleep(interval)
    finally:
        consumer.close()


if __name__ == "__main__":
    start_http_server(8000)
    export_lag("kafka-broker:9092", "user-events-consumer", "user-events")

Alert thresholds we settled on:

  • Warning: lag > 50,000 and growing for 5 minutes
  • Critical: lag > 500,000 or lag doubling every 10 minutes
  • Page: lag > 2,000,000 (SLA breach imminent)
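
The thresholds above can be expressed as a small classifier over recent lag samples. This is one possible reading of them, not our production alert rules: "growing for 5 minutes" is taken to mean every sample in the last 300 seconds is higher than the one before, and "doubling every 10 minutes" compares against the reading from 600 seconds ago. Function names are illustrative.

```python
def lag_at_or_before(samples, t):
    """Most recent lag reading taken at or before time t, or None."""
    earlier = [lag for ts, lag in samples if ts <= t]
    return earlier[-1] if earlier else None


def classify(samples):
    """samples: list of (unix_seconds, total_lag), oldest first. Returns a severity."""
    if not samples:
        return "ok"
    now, lag = samples[-1]

    if lag > 2_000_000:
        return "page"

    ten_min_ago = lag_at_or_before(samples, now - 600)
    doubling = ten_min_ago is not None and ten_min_ago > 0 and lag >= 2 * ten_min_ago
    if lag > 500_000 or doubling:
        return "critical"

    window = [l for ts, l in samples if ts >= now - 300]
    covers_5_min = samples[0][0] <= now - 300
    growing = (covers_5_min and len(window) >= 2
               and all(b > a for a, b in zip(window, window[1:])))
    if lag > 50_000 and growing:
        return "warning"
    return "ok"


# One sample per minute, rising steadily past the warning threshold
rising = [(t * 60, 40_000 + t * 5_000) for t in range(6)]
print(classify(rising))
print(classify([(0, 100_000), (600, 250_000)]))
print(classify([(0, 2_100_000)]))
```

In practice these rules would more likely live in the alerting system's own rule language; the Python version is only meant to pin down the logic.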

Timeline of the Incident

  • 03:14 — Lag alert fires at 500K messages
  • 03:22 — On-call engineer logs in; lag at 2.1M
  • 03:31 — Partition imbalance identified via kafka-consumer-groups.sh
  • 03:38 — Deserialization bottleneck identified via py-spy
  • 03:52 — Schema cache fix deployed; consumption rate climbs to 14K msg/s
  • 04:09 — Rebalance storm root cause identified and patched
  • 04:23 — Consumer group scaled to 3 replicas matching partition count
  • 04:41 — Lag back to zero; SLA window never breached
  • 04:45 — Lag monitoring and alerting improved

Total time to resolution: 91 minutes.
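
As a rough sanity check on the recovery portion of the timeline, drain time is backlog divided by the net rate at which the consumer gains on producers. The sketch below assumes constant rates and treats the 12,000 msg/s figure from Step 4 as partition 1's arrival rate, which is our reading of it; the function name is illustrative.

```python
def drain_seconds(backlog: int, consume_rate: float, produce_rate: float) -> float:
    """Seconds to clear a backlog at constant rates; inf if the consumer cannot gain ground."""
    net = consume_rate - produce_rate
    if net <= 0:
        return float("inf")
    return backlog / net


# Partition 1 backlog from the Step 1 snapshot, rates from Step 4
before = drain_seconds(4_400_000, 800, 12_000)
after = drain_seconds(4_400_000, 14_200, 12_000)

print("before fix:", before)
print(f"after fix: {after:.0f} s (about {after / 60:.0f} minutes)")
```

Before the fix the consumer could never catch up. After it, the estimate is about 33 minutes, the same order of magnitude as the 49 minutes between the 03:52 deploy and lag reaching zero at 04:41.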


Real-World Impact

  • Peak lag: 4,200,000 messages
  • Time to resolve: 91 minutes (SLA was 2 hours)
  • Deserialization CPU: 78% → 6%
  • Consumption rate (partition 1): 800 msg/s → 14,200 msg/s
  • Rebalance frequency: Every 90 seconds → Zero
  • Recommendation staleness: Eliminated entirely post-fix

Key Takeaways

  1. Always look at per-partition lag first — imbalance is a different problem than overall throughput
  2. Profile before optimizing — py-spy revealed the real bottleneck in under 5 minutes
  3. Schema Registry without caching will destroy throughput at scale
  4. Rebalance storms and slow consumers are often the same root cause, just seen from different angles
  5. Batch size per poll (max.poll.records in the Java client) and max.poll.interval.ms need to be tuned together
  6. Never increase partitions without a maintenance window and a rebalance plan

Consumer lag is a symptom. Always dig to the partition level before assuming you need more hardware.

