
KAFKA-19427: Allow the coordinator to grow its buffer dynamically #20040


Open
wants to merge 19 commits into base: trunk

Conversation

Collaborator

@mingyen066 mingyen066 commented Jun 25, 2025

  • The Coordinator starts with a smaller buffer, which can grow as needed.

  • In freeCurrentBatch, release the appropriate buffer:

    • The Coordinator recycles the expanded buffer
      (currentBatch.builder.buffer()), not currentBatch.buffer, because
      MemoryRecordsBuilder may allocate a new ByteBuffer if the existing
      one isn't large enough.

    • There are two cases where the buffer may exceed maxMessageSize
      (see the sketch after this list):

      1. If there's a single record whose size exceeds maxMessageSize
         (which, so far, is derived from max.message.bytes) and the write
         is in non-atomic mode, the buffer can still grow beyond
         maxMessageSize. In this case, the Coordinator should revert to
         using a smaller buffer afterward.

      2. The Coordinator does not recycle a buffer larger than
         maxMessageSize. If the user dynamically reduces maxMessageSize
         to a value even smaller than INITIAL_BUFFER_SIZE, the
         Coordinator should avoid recycling any buffer larger than
         maxMessageSize so that it can allocate the smaller buffer in
         the next round.

  • Add tests to verify the above scenarios.
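
Below is a minimal, self-contained sketch of the release decision described above. SimpleBufferSupplier, BufferRecycleSketch, and the INITIAL_BUFFER_SIZE value here are illustrative stand-ins, not the actual coordinator classes; the real logic in freeCurrentBatch works against currentBatch.builder.buffer() and the partition's maxMessageSize.

import java.nio.ByteBuffer;

public class BufferRecycleSketch {

    // Hypothetical value for illustration; may differ from the real constant.
    static final int INITIAL_BUFFER_SIZE = 16 * 1024;

    // Illustrative stand-in for Kafka's BufferSupplier: caches at most one buffer.
    static final class SimpleBufferSupplier {
        private ByteBuffer cached;

        ByteBuffer get(int size) {
            if (cached != null && cached.capacity() >= size) {
                ByteBuffer buffer = cached;
                cached = null;
                buffer.clear();
                return buffer;
            }
            return ByteBuffer.allocate(Math.max(size, INITIAL_BUFFER_SIZE));
        }

        void release(ByteBuffer buffer) {
            cached = buffer;
        }
    }

    // Mirrors the idea in freeCurrentBatch: recycle the builder's (possibly grown)
    // buffer, but only if it does not exceed maxMessageSize, so that a dynamically
    // reduced limit can take effect on the next allocation.
    static void freeBatchBuffer(SimpleBufferSupplier supplier, ByteBuffer builderBuffer, int maxMessageSize) {
        if (builderBuffer.capacity() <= maxMessageSize) {
            supplier.release(builderBuffer);
        }
        // Otherwise the oversized buffer is dropped; the next write starts from a
        // fresh, smaller buffer instead of the one that grew past the limit.
    }

    public static void main(String[] args) {
        SimpleBufferSupplier supplier = new SimpleBufferSupplier();
        int maxMessageSize = 1024 * 1024;

        // Case 1: a non-atomic write grew the buffer past maxMessageSize; it is not cached.
        freeBatchBuffer(supplier, ByteBuffer.allocate(2 * 1024 * 1024), maxMessageSize);
        System.out.println(supplier.get(INITIAL_BUFFER_SIZE).capacity()); // 16384

        // Case 2: a buffer within the limit is recycled and handed back out.
        freeBatchBuffer(supplier, ByteBuffer.allocate(512 * 1024), maxMessageSize);
        System.out.println(supplier.get(INITIAL_BUFFER_SIZE).capacity()); // 524288
    }
}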

@github-actions github-actions bot added the triage (PRs from the community) and small (Small PRs) labels Jun 25, 2025
@github-actions github-actions bot removed the triage (PRs from the community) label Jun 26, 2025
Comment on lines 763 to 764
int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
if (currentBatch.builder.buffer().capacity() > maxBatchSize) {
Member

When we create the batch, we set maxMessageSize as the limit, so it should not create a buffer larger than that, since we don't append a record to the batch if there is no room for it.

Member

Ah, or are you doing this in case maxMessageSize is reduced?

Collaborator Author

@mingyen066 mingyen066 Jun 26, 2025

Sorry, I haven’t written the PR description yet.
If there's a single record whose size exceeds maxMessageSize, it's still possible for the buffer to grow larger than maxMessageSize. So in this case, I think we should revert to using a smaller buffer afterward.

Collaborator Author

@mingyen066 mingyen066 Jun 26, 2025

Oh, my fault. I found there is a check to prevent appending a large record:

if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
    throw new RecordTooLargeException("Message batch size is " + estimatedSize +
        " bytes in append to partition " + tp + " which exceeds the maximum " +
        "configured size of " + currentBatch.maxBatchSize + ".");
}

Member

@mingyen066 the code you attached is an example of an atomic write. However, it is possible to write a single large record with non-atomic writes. Additionally, as @dajac commented, maxMessageSize could be dynamically configured to a smaller value. Hence, I think the check is necessary.

Member

Also, the original buffer currentBatch.buffer could be larger than the new maxMessageSize too. Perhaps we should keep only a buffer that has a valid capacity. For example:

            Stream.of(currentBatch.builder.buffer(), currentBatch.buffer)
                    .filter(buf -> buf.capacity() <= maxBatchSize)
                    .max(Comparator.comparing(Buffer::capacity))
                    .ifPresent(bufferSupplier::release);

Collaborator Author

@mingyen066 mingyen066 Jun 26, 2025

Thanks @dajac and @chia7712. I’ve updated the code to check both currentBatch.buffer and currentBatch.builder.buffer().
A small finding while writing the test: I found MockPartitionWriter#append includes another record size check, but CoordinatorPartitionWriter#append does not. So I'm not sure if it's possible to write a single large record with non-atomic writes.

Member

@mingyen066 you could simply override MockPartitionWriter#append to skip the check in the test case. In production code, the size check happens in UnifiedLog.
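
A rough illustration of that suggestion follows; SizeCheckingWriter and NoSizeCheckWriter are hypothetical toy classes, not Kafka's MockPartitionWriter, whose actual signature lives in the coordinator test sources.

// Toy example of overriding an append(...) method to skip its size check in a test,
// which is the pattern being suggested for MockPartitionWriter#append. These classes
// are hypothetical; in production the size check happens in UnifiedLog instead.
class SizeCheckingWriter {
    protected final int maxMessageSize;

    SizeCheckingWriter(int maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    long append(byte[] record) {
        if (record.length > maxMessageSize) {
            throw new IllegalArgumentException("record of " + record.length
                + " bytes exceeds the maximum of " + maxMessageSize);
        }
        return record.length;
    }
}

class NoSizeCheckWriter extends SizeCheckingWriter {
    NoSizeCheckWriter(int maxMessageSize) {
        super(maxMessageSize);
    }

    @Override
    long append(byte[] record) {
        // Skip the size check so the test can exercise a single record
        // that is larger than maxMessageSize via a non-atomic write.
        return record.length;
    }
}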

@mingyen066 mingyen066 marked this pull request as ready for review June 27, 2025 16:58
@github-actions github-actions bot removed the small (Small PRs) label Jun 28, 2025
@chia7712
Member

@mingyen066 this PR also fixes the issue that dynamically decreasing the message size does not re-create the buffer.

Hence, could you please add a test for that scenario?

Member

@FrankYang0529 FrankYang0529 left a comment

Thanks for the nice patch! Overall LGTM. I left one minor comment. Could we change the test case names like the following:

  • testCoordinatorDoNotRetainLargeBuffer -> testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize
  • testCoordinatorRetainExpandedBuffer -> testCoordinatorRetainBufferLessOrEqualToMaxMessageSize

Collaborator

@Yunyung Yunyung left a comment

Thanks for the PR. I left one comment. Also, it would be good to add a test case where maxBatchSize is set smaller than MIN_BUFFER_SIZE.

@mingyen066
Collaborator Author

@mingyen066 this PR also fixes the issue that dynamically decreasing the message size does not re-create the buffer.
Hence, could you please add a test for that scenario?

Thanks @chia7712, I've added a test to cover the scenario where max.message.bytes is dynamically reduced.

Collaborator

@TaiJuWu TaiJuWu left a comment

LGTM.

// Verify that there is no cached buffer.
assertEquals(1, ctx.bufferSupplier.get(1).capacity());

// Write #3.
Member

Could you please add a unit test to ensure the maximum buffer capacity is equal to maxMessageSize?

Collaborator Author

I replaced the previous assertion with assertEquals(mockWriter.config(TP).maxMessageSize(), ctx.bufferSupplier.get(1).capacity()) to ensure that the cached buffer's capacity equals maxMessageSize.


8 participants