KAFKA-16243: Make sure that we do not exceed max poll interval inside poll #15372
… poll

The consumer keeps a poll timer, which is used to ensure liveness of the application thread. In the legacy consumer, the poll timer automatically updates while the `Consumer.poll(Duration)` method is blocked, whereas the newer consumer only updates the poll timer when a new call to `Consumer.poll(Duration)` is issued. This means that the `kafka-console-consumer.sh` tool, which uses a very long timeout by default, behaves differently with the new consumer: the consumer proactively rejoins the group during long poll timeouts.

We solve the problem by (a) repeatedly sending `PollApplicationEvents` to the background thread, not just on the first call of `poll`, and (b) making sure that the application thread doesn't block for so long that it runs out of `max.poll.interval`.

An integration test is added to make sure that we do not rejoin the group when a long poll timeout is used with a low `max.poll.interval`.
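Points (a) and (b) can be pictured as a loop on the application thread. The following is a minimal sketch with simulated time, not the actual Kafka implementation: `PollLoopSketch`, `BackgroundThread`, `onPollEvent`, and every other name in it are hypothetical stand-ins, and the halving of the remaining time is borrowed from the expression discussed in the review comments.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these names are Kafka's real internals. */
public class PollLoopSketch {

    /** Stand-in for the background thread that owns the poll timer. */
    public static class BackgroundThread {
        public final long maxPollIntervalMs;
        public long pollTimerDeadlineMs;
        public boolean pollTimerExpired = false;
        public final List<Long> pollEventTimesMs = new ArrayList<>();

        public BackgroundThread(long maxPollIntervalMs, long nowMs) {
            this.maxPollIntervalMs = maxPollIntervalMs;
            this.pollTimerDeadlineMs = nowMs + maxPollIntervalMs;
        }

        /** Called when the application thread reports that it is still polling. */
        public void onPollEvent(long nowMs) {
            if (nowMs >= pollTimerDeadlineMs) {
                // In a real consumer this is where the member would leave and rejoin the group.
                pollTimerExpired = true;
            }
            pollEventTimesMs.add(nowMs);
            pollTimerDeadlineMs = nowMs + maxPollIntervalMs; // reset the poll timer
        }

        public long remainingMs(long nowMs) {
            return Math.max(0, pollTimerDeadlineMs - nowMs);
        }
    }

    /**
     * Simulates a single poll(timeoutMs) call that never receives records.
     * Returns the simulated time at which the call returns.
     */
    public static long poll(BackgroundThread background, long startMs, long timeoutMs) {
        long nowMs = startMs;
        long deadlineMs = startMs + timeoutMs;
        do {
            // (a) send a poll event on every iteration, not only on the first one
            background.onPollEvent(nowMs);
            // (b) never block longer than half of what is left on the poll timer
            long waitMs = Math.min(deadlineMs - nowMs, background.remainingMs(nowMs) / 2);
            nowMs += Math.max(1, waitMs); // simulated blocking wait, always makes progress
        } while (nowMs < deadlineMs);
        return nowMs;
    }

    public static void main(String[] args) {
        // A poll timeout ten times longer than the (assumed) max poll interval.
        BackgroundThread background = new BackgroundThread(1_000, 0);
        long end = poll(background, 0, 10_000);
        System.out.println("poll returned at " + end + " ms, events sent: "
                + background.pollEventTimesMs.size()
                + ", timer expired: " + background.pollTimerExpired);
    }
}
```

In this simulation the application thread wakes up every 500 ms, so the timer is reset twenty times and never expires, even though the poll timeout is ten times the interval.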
lgtm
@cadonna could you please have a look?

val initialAssignedCalls = listener.callsToAssigned

consumer.poll(Duration.ofMillis(2000));
nit: semicolon not needed here, same in the next 2 lines
Done
thanks for the changes! lgtm.
I like the last commit msg :)
consumer.poll(Duration.ofMillis(2000))

// Give enough time to rejoin
nit: This comment is a bit confusing. What is it supposed to clarify?
Done
consumer.poll(Duration.ofMillis(500))
consumer.poll(Duration.ofMillis(500))

// Check that we did not rejoin
Do we need this comment? I think it would be better to delete it and to rename `initialAssignedCalls` to something more meaningful like `callsToAssignedAfterFirstRebalance` or `callsToAssignedBeforePolls`.
Done
) {
    return 0L;
}
return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
For my own education: why `/ 2`?
It's somewhat arbitrary. We need to make sure that the application thread doesn't block so long that the poll timer expires. We want it to unblock some time before the timer expires, send an event telling the background thread that it's still polling, and leave the background thread enough time to process the event and reset the poll timer.
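To make the effect of the `/ 2` concrete, here is a small worked example of the same `Math.min` expression. It is only an illustration: `MaxTimeToWaitSketch` and `maximumTimeToWait` are hypothetical names, and the millisecond values are made up rather than taken from the PR.

```java
/** Hypothetical helper mirroring the expression under review; not Kafka's real API. */
public class MaxTimeToWaitSketch {

    /**
     * Wait no longer than half of the remaining poll timer,
     * and no longer than the time until the next heartbeat.
     */
    public static long maximumTimeToWait(long pollTimerRemainingMs, long nextHeartbeatMs) {
        return Math.min(pollTimerRemainingMs / 2, nextHeartbeatMs);
    }

    public static void main(String[] args) {
        // Plenty of time left on the poll timer: the heartbeat is the binding limit.
        System.out.println(maximumTimeToWait(300_000, 3_000)); // prints 3000

        // Poll timer close to expiry: half of its remainder is the binding limit,
        // which leaves the other half for the poll event to be sent and processed.
        System.out.println(maximumTimeToWait(1_000, 3_000)); // prints 500
    }
}
```

Any divisor greater than one would leave some slack before expiry; halving simply splits the remaining time evenly between blocking and the round trip to the background thread.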
… poll (apache#15372)

The consumer keeps a poll timer, which is used to ensure liveness of the application thread. In the legacy consumer, the poll timer automatically updates while the Consumer.poll(Duration) method is blocked, whereas the newer consumer only updates the poll timer when a new call to Consumer.poll(Duration) is issued. This means that the kafka-console-consumer.sh tool, which uses a very long timeout by default, behaves differently with the new consumer: the consumer proactively rejoins the group during long poll timeouts.

This change solves the problem by (a) repeatedly sending PollApplicationEvents to the background thread, not just on the first call of poll, and (b) making sure that the application thread doesn't block for so long that it runs out of max.poll.interval.

An integration test is added to make sure that we do not rejoin the group when a long poll timeout is used with a low max.poll.interval.

Reviewers: Lianet Magrans <[email protected]>, Andrew Schofield <[email protected]>, Bruno Cadonna <[email protected]>