Skip to content

KAFKA-19463: nextFetchOffset does not take ongoing state transition into account #20080

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -641,14 +641,14 @@ public long nextFetchOffset() {
// Check if the state is maintained per offset or batch. If the offsetState
// is not maintained then the batch state is used to determine the offsets state.
if (entry.getValue().offsetState() == null) {
if (entry.getValue().batchState() == RecordState.AVAILABLE) {
if (entry.getValue().batchState() == RecordState.AVAILABLE && !entry.getValue().batchHasOngoingStateTransition()) {
nextFetchOffset = entry.getValue().firstOffset();
break;
}
} else {
// The offset state is maintained hence find the next available offset.
for (Map.Entry<Long, InFlightState> offsetState : entry.getValue().offsetState().entrySet()) {
if (offsetState.getValue().state == RecordState.AVAILABLE) {
if (offsetState.getValue().state == RecordState.AVAILABLE && !offsetState.getValue().hasOngoingStateTransition()) {
nextFetchOffset = offsetState.getKey();
break;
}
Expand Down Expand Up @@ -2088,6 +2088,9 @@ void rollbackOrProcessStateUpdates(
state.completeStateTransition(true);
// Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully.
state.cancelAndClearAcquisitionLockTimeoutTask();
if (state.state == RecordState.AVAILABLE) {
findNextFetchOffset.set(true);
}
});
// Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
Expand Down
171 changes: 171 additions & 0 deletions core/src/test/java/kafka/server/share/SharePartitionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7296,6 +7296,177 @@ public void testFetchLockReleasedByDifferentId() {
assertNull(sharePartition.fetchLock()); // Fetch lock has been released.
}

/**
 * Verifies that a batch which has been released but whose persister write is still pending
 * (i.e. the batch has an ongoing state transition) yields no records to another member, and
 * that the same batch is acquired in full once the pending write completes successfully.
 */
@Test
public void testAcquireWhenBatchHasOngoingTransition() {
    // The write future is left incomplete on purpose so the acknowledgement below stays in flight.
    CompletableFuture<WriteShareGroupStateResult> pendingWrite = new CompletableFuture<>();
    String secondMember = "member-2";

    Persister mockPersister = Mockito.mock(Persister.class);
    SharePartition partition = SharePartitionBuilder.builder()
        .withState(SharePartitionState.ACTIVE)
        .withPersister(mockPersister)
        .build();

    // The first member takes the whole batch of 10 records starting at offset 21.
    fetchAcquiredRecords(
        partition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21,
            fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM),
        10);

    // Nothing is in flight towards the persister yet.
    assertFalse(partition.cachedState().get(21L).batchHasOngoingStateTransition());

    // Hand out the incomplete future for the write triggered by the acknowledgement.
    Mockito.when(mockPersister.writeState(Mockito.any())).thenReturn(pendingWrite);

    // Releasing the batch starts a state transition that cannot finish until the future completes.
    partition.acknowledge(
        MEMBER_ID,
        List.of(new ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.RELEASE.id))));

    // Start offset is unchanged; the single cached batch is AVAILABLE but still transitioning.
    assertEquals(21L, partition.startOffset());
    assertEquals(1, partition.cachedState().size());
    assertTrue(partition.cachedState().get(21L).batchHasOngoingStateTransition());
    assertEquals(RecordState.AVAILABLE, partition.cachedState().get(21L).batchState());
    assertEquals(EMPTY_MEMBER_ID, partition.cachedState().get(21L).batchMemberId());

    // A second member asking for the same batch gets 0 records while the transition is ongoing.
    fetchAcquiredRecords(
        partition.acquire(secondMember, BATCH_SIZE, MAX_FETCH_RECORDS, 21,
            fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM),
        0);

    // The failed acquire attempt must not have altered the batch.
    assertEquals(RecordState.AVAILABLE, partition.cachedState().get(21L).batchState());
    assertEquals(EMPTY_MEMBER_ID, partition.cachedState().get(21L).batchMemberId());

    // Finish the write with a successful result so the acknowledgement completes and the cache is updated.
    WriteShareGroupStateResult successfulWrite = Mockito.mock(WriteShareGroupStateResult.class);
    Mockito.when(successfulWrite.topicsData()).thenReturn(List.of(
        new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
            PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
    pendingWrite.complete(successfulWrite);

    // The second member retries and now receives all 10 records.
    fetchAcquiredRecords(
        partition.acquire(secondMember, BATCH_SIZE, MAX_FETCH_RECORDS, 21,
            fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM),
        10);
    assertEquals(RecordState.ACQUIRED, partition.cachedState().get(21L).batchState());
    assertEquals(secondMember, partition.cachedState().get(21L).batchMemberId());
}

/**
 * Verifies that {@code nextFetchOffset()} skips a released batch whose persister write is
 * still pending and instead reports the first offset of the next batch whose release has
 * already completed.
 */
@Test
public void testNextFetchOffsetWhenBatchHasOngoingTransition() {
    // One future per acknowledgement; only the second one is completed in this test.
    CompletableFuture<WriteShareGroupStateResult> firstWrite = new CompletableFuture<>();
    CompletableFuture<WriteShareGroupStateResult> secondWrite = new CompletableFuture<>();

    Persister mockPersister = Mockito.mock(Persister.class);
    SharePartition partition = SharePartitionBuilder.builder()
        .withState(SharePartitionState.ACTIVE)
        .withPersister(mockPersister)
        .build();

    // Batch 0-9 is acquired by the member.
    fetchAcquiredRecords(
        partition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
            fetchPartitionData(memoryRecords(10, 0)), FETCH_ISOLATION_HWM),
        10);

    // Batch 10-19 is acquired by the same member.
    fetchAcquiredRecords(
        partition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10,
            fetchPartitionData(memoryRecords(10, 10)), FETCH_ISOLATION_HWM),
        10);

    // Both batches are cached as ACQUIRED with no transition in progress.
    assertEquals(2, partition.cachedState().size());
    assertFalse(partition.cachedState().get(0L).batchHasOngoingStateTransition());
    assertFalse(partition.cachedState().get(10L).batchHasOngoingStateTransition());
    assertEquals(RecordState.ACQUIRED, partition.cachedState().get(0L).batchState());
    assertEquals(RecordState.ACQUIRED, partition.cachedState().get(10L).batchState());

    // The first writeState call (for offsets 0-9) gets firstWrite, the second (for 10-19) gets secondWrite.
    Mockito.when(mockPersister.writeState(Mockito.any()))
        .thenReturn(firstWrite)
        .thenReturn(secondWrite);

    // Release both batches; each acknowledgement starts a state transition.
    partition.acknowledge(
        MEMBER_ID,
        List.of(new ShareAcknowledgementBatch(0, 9, List.of(AcknowledgeType.RELEASE.id))));
    partition.acknowledge(
        MEMBER_ID,
        List.of(new ShareAcknowledgementBatch(10, 19, List.of(AcknowledgeType.RELEASE.id))));

    // Only the second write is completed, so only the second acknowledgement can finish.
    WriteShareGroupStateResult successfulWrite = Mockito.mock(WriteShareGroupStateResult.class);
    Mockito.when(successfulWrite.topicsData()).thenReturn(List.of(
        new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
            PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
    secondWrite.complete(successfulWrite);

    // Batch 0-9 is still transitioning (firstWrite is incomplete); batch 10-19 is not.
    assertTrue(partition.cachedState().get(0L).batchHasOngoingStateTransition());
    assertFalse(partition.cachedState().get(10L).batchHasOngoingStateTransition());

    // The next fetch offset must be 10 rather than 0, because batch 0-9 is mid-transition.
    assertEquals(10, partition.nextFetchOffset());
}

/**
 * Verifies that {@code nextFetchOffset()} skips individually tracked offsets whose release is
 * still awaiting the persister write and instead reports the first offset whose release has
 * already completed.
 */
@Test
public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() {
    // One future per acknowledgement; only the second one is completed in this test.
    CompletableFuture<WriteShareGroupStateResult> firstWrite = new CompletableFuture<>();
    CompletableFuture<WriteShareGroupStateResult> secondWrite = new CompletableFuture<>();

    Persister mockPersister = Mockito.mock(Persister.class);
    SharePartition partition = SharePartitionBuilder.builder()
        .withState(SharePartitionState.ACTIVE)
        .withPersister(mockPersister)
        .build();

    // A single batch of 50 records starting at offset 0 (offsets 0-49) is acquired by the member.
    fetchAcquiredRecords(
        partition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
            fetchPartitionData(memoryRecords(50, 0)), FETCH_ISOLATION_HWM),
        50);

    // No transition is in progress right after acquisition.
    assertFalse(partition.cachedState().get(0L).batchHasOngoingStateTransition());

    // The first writeState call (for offsets 5-9) gets firstWrite, the second (for 20-24) gets secondWrite.
    Mockito.when(mockPersister.writeState(Mockito.any()))
        .thenReturn(firstWrite)
        .thenReturn(secondWrite);

    // Release two sub-ranges of the batch; each acknowledgement starts a state transition.
    partition.acknowledge(
        MEMBER_ID,
        List.of(new ShareAcknowledgementBatch(5, 9, List.of(AcknowledgeType.RELEASE.id))));
    partition.acknowledge(
        MEMBER_ID,
        List.of(new ShareAcknowledgementBatch(20, 24, List.of(AcknowledgeType.RELEASE.id))));

    // Only the second write is completed, so only the second acknowledgement can finish.
    WriteShareGroupStateResult successfulWrite = Mockito.mock(WriteShareGroupStateResult.class);
    Mockito.when(successfulWrite.topicsData()).thenReturn(List.of(
        new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
            PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
    secondWrite.complete(successfulWrite);

    // Offsets 5-9 are still transitioning because firstWrite has not completed.
    for (long offset = 5; offset <= 9; offset++) {
        assertTrue(partition.cachedState().get(0L).offsetState().get(offset).hasOngoingStateTransition());
    }
    // Offsets 20-24 are no longer transitioning because secondWrite has completed.
    for (long offset = 20; offset <= 24; offset++) {
        assertFalse(partition.cachedState().get(0L).offsetState().get(offset).hasOngoingStateTransition());
    }

    // The next fetch offset must be 20 rather than 5, because offsets 5-9 are mid-transition.
    assertEquals(20, partition.nextFetchOffset());
}

/**
* This function produces transactional data of a given no. of records followed by a transactional marker (COMMIT/ABORT).
*/
Expand Down