Skip to content
This repository was archived by the owner on Mar 24, 2021. It is now read-only.
This repository was archived by the owner on Mar 24, 2021. It is now read-only.

The ManagedBalancedConsumer is not resilient to kafka node failures #517

@thebigw4lrus

Description

@thebigw4lrus

PyKafka version: master ( commit - 9ae97cb )
Kafka version: 0.9.0

We have Kafka configured with a 3 node cluster. Also we have a producer producing messages, and a managed consumer consuming them. It turned out that whenever a kafka node fails for whatever reason(including because we stop one of them due to maintenance), the client is not able to overcome this.

This is the way we reproduce this problem:

from pykafka import PyKafkaClient
from time import sleep
HOSTS = 'server1:9092,server2:9092,server3:9092'

client = PyKafkaClient(HOSTS)
consumer = client.topics['test_topic'].get_balanced_consumer(consumer_group='my_consumer',
                                                             managed=True,
                                                             auto_commit=False)

for message in consumer.messages():
    print(message.value)
    consumer.commit_offsets()
    print(consumer.consumer.held_offsets)
    sleep(4)

And the error that we are actually seeing is:

ERROR:pykafka.balancedconsumer:Exception encountered in worker thread:
  File "/usr/local/lib/python2.7/dist-packages/pykafka/managedbalancedconsumer.py", line 218, in fetcher                                                                  
    self._consumer_id)                                                                                                                   
    ReplicaFetcherManager)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/broker.py", line 458, in heartbeat                                                                                 
  ReplicaFetcherManager)
    return future.get(HeartbeatResponse)                                                                                                                                 
    ReplicaFetcherManager)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/handlers.py", line 65, in get                                                                                    
  ReplicaFetcherManager)
    raise self.error

So it seems that the broker's heartbeat method is not able to handle when a broker goes down. We were expecting the heartbeat method (and the client in general) be resilient to a kafka node failure (having in mind that we have another 2 nodes in a good state).

Also we tried pykafka 2.2.1, and we saw a similar problem, but we got SocketDisconnectedError raised by this method. This we managed to fix by choosing a random broker for each retry. The latest pykafka version, however, deals with broker failures slightly differently.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions