-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19463: nextFetchOffset does not take ongoing state transition into account #20080
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, minor comments.
if (state.state != RecordState.ARCHIVED) { | ||
findNextFetchOffset.set(true); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change here makes sense, but do we need to remove findNextFetchOffset.set(true);
at other places where we just started the transaction i.e. in acknowledgement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that makes sense. I have created https://issues.apache.org/jira/browse/KAFKA-19464 to handle it for the future.
@@ -2088,6 +2088,9 @@ void rollbackOrProcessStateUpdates( | |||
state.completeStateTransition(true); | |||
// Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully. | |||
state.cancelAndClearAcquisitionLockTimeoutTask(); | |||
if (state.state != RecordState.ARCHIVED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be:
if (state.state != RecordState.ARCHIVED) { | |
if (state.state == AVAILABLE) { |
// Mocking the persister write state RPC to return future 1 when acknowledgement occurs for offsets 0-9. | ||
// Mocking the persister write state RPC to return future 2 when acknowledgement occurs for offsets 10-19. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Mocking the persister write state RPC to return future 1 when acknowledgement occurs for offsets 0-9. | |
// Mocking the persister write state RPC to return future 2 when acknowledgement occurs for offsets 10-19. | |
// Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for offsets 0-9 and 10-19 respectively. |
// Mocking the persister write state RPC to return future 1 when acknowledgement occurs for offsets 5-9. | ||
// Mocking the persister write state RPC to return future 2 when acknowledgement occurs for offsets 20-24. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, LGTM!
…nto account (apache#20080) ### About `nextFetchOffset` function in `SharePartition` updates the fetch offsets without considering batches/offsets which might be undergoing state transition. This can cause problems in updating to the right fetch offset. ### Testing The new code added has been tested with the help of unit tests. Reviewers: Apoorv Mittal <[email protected]>
About
nextFetchOffset
function inSharePartition
updates the fetch offsets without considering batches/offsets which might be undergoing state transition. This can cause problems in updating to the right fetch offset.Testing
The new code added has been tested with the help of unit tests.
Reviewers: Apoorv Mittal [email protected]