KAFKA-16243: Make sure that we do not exceed max poll interval inside poll (#15372)

The consumer keeps a poll timer, which is used to ensure liveness of the application thread. In the legacy consumer, the poll timer is updated automatically while the Consumer.poll(Duration) method is blocked, whereas the newer consumer only updates the poll timer when a new call to Consumer.poll(Duration) is issued. This means that the kafka-console-consumer.sh tool, which uses a very long timeout by default, behaves differently with the new consumer: the consumer proactively rejoins the group during long poll timeouts.

This change solves the problem by (a) repeatedly sending PollApplicationEvents to the background thread while poll is blocked, not just once at the start of the call, and (b) making sure that the application thread doesn't block for so long that it exceeds max.poll.interval.ms.

An integration test is added to make sure that we do not rejoin the group when a long poll timeout is used with a low max.poll.interval.ms.
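
The two parts of the fix described above can be illustrated with a small simulation on a virtual clock. This is only a sketch under simplifying assumptions, not the consumer's actual code: the class and method names (PollLoopSketch, pollTimerExpires) are hypothetical, heartbeats are ignored, and a poll event is assumed to restart the poll timer immediately.

```java
final class PollLoopSketch {

    /**
     * Simulates one blocking poll on a virtual clock and reports whether the
     * poll timer expired before the poll returned. All names are illustrative.
     */
    static boolean pollTimerExpires(long maxPollIntervalMs, long pollTimeoutMs, boolean resendInLoop) {
        long now = 0L;
        long pollEnd = now + pollTimeoutMs;
        // The poll event sent at the start of poll() restarts the poll timer.
        long deadline = now + maxPollIntervalMs;

        while (now < pollEnd) {
            long block;
            if (resendInLoop) {
                // (a) Send another poll event on every iteration, restarting the timer.
                deadline = now + maxPollIntervalMs;
                // (b) Block for at most half of the remaining poll interval.
                long cap = Math.max(1L, (deadline - now) / 2);
                block = Math.min(pollEnd - now, cap);
            } else {
                // Single event only: block for the entire remaining poll timeout.
                block = pollEnd - now;
            }
            now += block;
            if (now >= deadline) {
                return true; // the timer ran out while the poll was still blocked
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Values taken from the integration test: max.poll.interval.ms = 1000, poll timeout = 2000 ms.
        System.out.println(pollTimerExpires(1000, 2000, false)); // true
        System.out.println(pollTimerExpires(1000, 2000, true));  // false
    }
}
```

With a single event per poll, the 2000 ms poll outlives the 1000 ms interval; with the event resent on each iteration and the block time capped, the timer is restarted every 500 ms and does not expire.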

Reviewers: Lianet Magrans, Andrew Schofield, Bruno Cadonna
lucasbru authored Feb 20, 2024
1 parent a26a1d8 commit 5854139
Showing 3 changed files with 41 additions and 4 deletions.
@@ -698,9 +698,11 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}

- applicationEventHandler.add(new PollApplicationEvent(timer.currentTimeMs()));
-
do {
+
+ // Make sure to let the background thread know that we are still polling.
+ applicationEventHandler.add(new PollApplicationEvent(timer.currentTimeMs()));
+
// We must not allow wake-ups between polling for fetches and returning the records.
// If the polled fetches are not empty the consumed position has already been updated in the polling
// of the fetches. A wakeup between returned fetches and returning records would lead to never

@@ -232,13 +232,22 @@ public MembershipManager membershipManager() {
* are sent, so blocking for longer than the heartbeat interval might mean the application thread is not
* responsive to changes.
*
+ * Similarly, we may have to unblock the application thread to send a `PollApplicationEvent` to make sure
+ * our poll timer will not expire while we are polling.
+ *
* <p>In the event that heartbeats are currently being skipped, this still returns the next heartbeat
* delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive.
*/
@Override
public long maximumTimeToWait(long currentTimeMs) {
- boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
- return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
+ pollTimer.update(currentTimeMs);
+ if (
+ pollTimer.isExpired() ||
+ (membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight())
+ ) {
+ return 0L;
+ }
+ return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
}

/**
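
The new return value of maximumTimeToWait in the hunk above can be worked through with plain numbers. The following is a minimal sketch that mirrors only the decision logic, with the Kafka objects replaced by primitive parameters; the class name MaxWaitSketch and all parameter names are hypothetical, and an expired poll timer is modeled as zero remaining time.

```java
final class MaxWaitSketch {

    /**
     * Mirrors the decision in the changed method using plain values.
     * pollRemainingMs stands in for pollTimer.remainingMs(), and
     * nextHeartbeatMs for heartbeatRequestState.nextHeartbeatMs(currentTimeMs).
     */
    static long maximumTimeToWait(long pollRemainingMs,
                                  boolean shouldHeartbeatNow,
                                  boolean requestInFlight,
                                  long nextHeartbeatMs) {
        boolean pollExpired = pollRemainingMs <= 0;
        if (pollExpired || (shouldHeartbeatNow && !requestInFlight)) {
            return 0L; // do not block at all
        }
        // Wake up at half of the remaining poll time or at the next heartbeat, whichever is sooner.
        return Math.min(pollRemainingMs / 2, nextHeartbeatMs);
    }

    public static void main(String[] args) {
        System.out.println(maximumTimeToWait(1000, false, false, 500)); // 500
        System.out.println(maximumTimeToWait(600, false, false, 500));  // 300
        System.out.println(maximumTimeToWait(0, false, false, 500));    // 0
        System.out.println(maximumTimeToWait(1000, true, false, 500));  // 0
        System.out.println(maximumTimeToWait(1000, true, true, 500));   // 500
    }
}
```

In the second call the poll timer, not the heartbeat, bounds the wait: with 600 ms left on the poll timer the application thread is unblocked after 300 ms, which gives it a chance to send another PollApplicationEvent before the timer runs out.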

@@ -263,6 +263,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
ensureNoRebalance(consumer, listener)
}

+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = {
+ this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
+ this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
+
+ val consumer = createConsumer()
+ val listener = new TestConsumerReassignmentListener
+ consumer.subscribe(List(topic).asJava, listener)
+
+ // rebalance to get the initial assignment
+ awaitRebalance(consumer, listener)
+
+ val callsToAssignedAfterFirstRebalance = listener.callsToAssigned
+
+ consumer.poll(Duration.ofMillis(2000))
+
+ // If the poll above times out, it would trigger a rebalance.
+ // Leave some time for the rebalance to happen and check for the rebalance event.
+ consumer.poll(Duration.ofMillis(500))
+ consumer.poll(Duration.ofMillis(500))
+
+ assertEquals(callsToAssignedAfterFirstRebalance, listener.callsToAssigned)
+ }
+
+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testAutoCommitOnClose(quorum: String, groupProtocol: String): Unit = {