-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Ensure timeout is checked after each fetch position update #2668
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
@@ -781,7 +781,7 @@ def position(self, partition, timeout_ms=None): | |||
# batch update fetch positions for any partitions without a valid position | |||
if self._update_fetch_positions(timeout_ms=timer.timeout_ms): | |||
position = self._subscription.assignment[partition].position | |||
elif timer.expired: | |||
if timer.expired: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if position is None and time.expired
?
Do you have any more details on this case? The internal contract is/should be that |
Yes, |
Here is a minimal example. FROM apache/kafka:4.0.0
COPY server.properties /kafka/config/kraft/
EXPOSE 9092 with
build and run it: docker build -t kafka .
docker run --rm --name kafka -p 9092:9092 -it kafka And python code import time
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
prod = KafkaProducer(bootstrap_servers='localhost:9092')
cons = KafkaConsumer(
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
session_timeout_ms=1000,
)
partitions = cons.partitions_for_topic('test')
cons.assign([TopicPartition('test', p) for p in partitions])
end = cons.end_offsets(list(cons.assignment()))
for partition in cons.assignment():
print(1)
while (pos := cons.position(partition, timeout_ms=1000)) \
is not None and pos < end[partition]:
print(2)
prod.send(
topic='test',
value=b'0000',
key=b'1111',
timestamp_ms=int(time.time() * 1000))
prod.flush() First run finishes normally, on 2nd run it hangs on |
Since kafka-python 2.2.0 the api has changed. In KafkaConsumer.position method a while loop was added, so to avoid eternal loop a `timeout_ms` parameter can be passed. I pass 1000 ms timeout, for it is long enough for our back-end to respond and smaller than our 10 s cache response timeout. The timeout parameter was not supported in versions before 2.1.0. However, this fix works until kafka-python 2.2.9, where I believe there is a bug, which will be hopefully resolved soon dpkp/kafka-python#2668 Until then, kafka-python version should be fixed. Change-Id: I45d95e03107904e60412996f64253429dcf7fde4 Reviewed-on: https://forge.frm2.tum.de/review/c/frm2/nicos/nicos/+/36950 Tested-by: Jenkins Automated Tests <[email protected]> Reviewed-by: Bjoern Pedersen <[email protected]>
For
Consumer.position()
I had a case whenself._update_fetch_positions
always resulted inTrue
andposition
was alwaysNone
. This hangs the loop for ~305 seconds, and it appears to me that the lineis never reached.
The root cause is probably how
timeout_ms
value is propagated and checked in the following methods, however I believe my change is still valid as the timeout check should work disregarding ofself._update_fetch_positions
value.