Skip to content

KAFKA-19427: Allow the coordinator to grow its buffer dynamically #20040

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from 4 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ddb8bf5
KAFKA-19427 Allow the coordinator to grow its buffer dynamically
mingyen066 Jun 25, 2025
7f7de43
Enhance comment
mingyen066 Jun 25, 2025
cbc6390
Update CoordinatorRuntime.java
mingyen066 Jun 25, 2025
cec3164
Adjust initial value
mingyen066 Jun 25, 2025
b707e23
adjust variable initial
mingyen066 Jun 25, 2025
2c85c7a
Remove initialization from constructor
mingyen066 Jun 25, 2025
f36a634
Get maxBatchSize
mingyen066 Jun 25, 2025
3d8a018
initialize buffer size with (min(MIN_BUFFER_SIZE, maxBatchSize)
mingyen066 Jun 25, 2025
beed828
Change MIN_BUFFER_SIZE to 512KB
mingyen066 Jun 26, 2025
c5b58b3
Adjust release buffer logic
mingyen066 Jun 26, 2025
335f0f2
Eliminate Stream usage
mingyen066 Jun 27, 2025
7fcb481
Add test
mingyen066 Jun 27, 2025
39fc849
Simplify buffer release logic
mingyen066 Jun 28, 2025
6f045ec
Add testCoordinatorRetainExpandedBuffer
mingyen066 Jun 28, 2025
24eed02
Remove useless Override
mingyen066 Jun 28, 2025
8c631e9
Enhance test name
mingyen066 Jun 28, 2025
86ea92c
Change MIN_BUFFER_SIZE to INITIAL_BUFFER_SIZE.
mingyen066 Jun 28, 2025
ed6ca44
Add testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize
mingyen066 Jun 28, 2025
89604dc
make sure the buffer size is equals to the decreased maxMessageSize t…
mingyen066 Jul 2, 2025
dff08dd
Remove useless semicolon
mingyen066 Jul 9, 2025
7406620
Merge branch 'trunk' into KAFKA-19427-Allow-the-coordinator-to-grow-i…
mingyen066 Jul 9, 2025
bef299a
Add batchSize check
mingyen066 Jul 15, 2025
3c2cc01
adjust assert
mingyen066 Jul 15, 2025
8f916ca
add comment
mingyen066 Jul 15, 2025
da2f395
address comment
mingyen066 Jul 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,16 @@ class CoordinatorContext {
*/
CoordinatorBatch currentBatch;

/**
* Used for initial buffer size for write operations.
*/
final int minBufferSize;

/**
* The max batch size
*/
final int maxBatchSize;

/**
* Constructor.
*
Expand Down Expand Up @@ -627,6 +637,8 @@ private CoordinatorContext(
defaultWriteTimeout
);
this.bufferSupplier = new BufferSupplier.GrowableBufferSupplier();
this.minBufferSize = 16384; // 16KB
this.maxBatchSize = partitionWriter.config(tp).maxMessageSize();
}

/**
Expand Down Expand Up @@ -759,7 +771,13 @@ private void freeCurrentBatch() {
currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);

// Release the buffer.
bufferSupplier.release(currentBatch.buffer);
if (currentBatch.builder.buffer().capacity() > maxBatchSize) {
// If the buffer exceeds the maxBatchSize, we should revert to using the original smaller buffer to avoid retaining the larger one.
bufferSupplier.release(currentBatch.buffer);
} else {
bufferSupplier.release(currentBatch.builder.buffer());
}


currentBatch = null;
}
Expand Down Expand Up @@ -856,10 +874,8 @@ private void maybeAllocateNewBatch(
long currentTimeMs
) {
if (currentBatch == null) {
LogConfig logConfig = partitionWriter.config(tp);
int maxBatchSize = logConfig.maxMessageSize();
long prevLastWrittenOffset = coordinator.lastWrittenOffset();
ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
ByteBuffer buffer = bufferSupplier.get(minBufferSize);

MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer,
Expand Down Expand Up @@ -1908,11 +1924,6 @@ public void onHighWatermarkUpdated(
}
}

/**
* 16KB. Used for initial buffer size for write operations.
*/
static final int MIN_BUFFER_SIZE = 16384;

/**
* The log prefix.
*/
Expand Down
Loading