-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19272: Add initPid Response handling when keepPrepared is set to true (KIP-939) #20039
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
KAFKA-19272: Add initPid Response handling when keepPrepared is set to true (KIP-939) #20039
Conversation
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
Outdated
Show resolved
Hide resolved
@@ -29,6 +29,7 @@ | |||
import org.apache.kafka.clients.producer.internals.BufferPool; | |||
import org.apache.kafka.clients.producer.internals.BuiltInPartitioner; | |||
import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics; | |||
import org.apache.kafka.clients.producer.internals.PreparedTxnState; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is public class, it shouldn't be in internals.
@@ -349,7 +348,7 @@ public synchronized void prepareTransaction() { | |||
maybeFailWithError(); | |||
transitionTo(State.PREPARED_TRANSACTION); | |||
this.preparedTxnState = new PreparedTxnState( | |||
this.producerIdAndEpoch.producerId + ":" + | |||
this.producerIdAndEpoch.producerId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's just change the type of preparedTxnState
member to be private volatile ProducerIdAndEpoch preparedTxnState
then we can just assign numbers to it (it even has a convenient NONE
constant). Then the KafkaProducer
would create the corresponding PreparedTxnState
object.
When initPid(keepPrepared = true) is called after a client crashes, several situations should be considered.
When there's an ongoing transaction, we can transition it to the newly added PREPARED_TRANSACTION state. However, what if there's no ongoing transaction?
Another scenario could be:
Solution:
This is a perfectly valid scenario as the external transaction coordinator for the 2PC transaction will keep committing participants, and the participants need to eventually return success (that's a guarantee for a prepared transaction).
Rejected Alt 1 -> Return an InvalidTxnStateException : Returning an error would break the above scenario.
Rejected Alt 2 -> Then the next thought is that we should somehow validate if the state is expected, but we don't have data to validate the result against.
Final Solution: Just returning the success and transitioning to READY is the proper handling of this condition.