Skip to content

KAFKA-18913: Removing _state.updater.enabled_ flag through the Stream… #19275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 34 commits into
base: trunk
Choose a base branch
from

Conversation

janchilling
Copy link
Contributor

@janchilling janchilling commented Mar 24, 2025

…Thread#runOnceWithoutProcessingThreads flow.

Removed code of the method StreamThread#initializeAndRestorePhase, since
it was only used in the negation flow. Also removed the flag entirely
from the StreamThread#runOnceWithoutProcessingThreads method.

Will remove the flag and the related code entirely from the future
commits!

…Thread#runOnceWithoutProcessingThreads flow.

Removed code of the method StreamThread#initializeAndRestorePhase, since it was only used in the negation flow. Also removed the flag entirely from the StreamThread#runOnceWithoutProcessingThreads method.
@github-actions github-actions bot added triage PRs from the community streams small Small PRs labels Mar 24, 2025
@janchilling
Copy link
Contributor Author

janchilling commented Mar 24, 2025

Hi @cadonna ,

Would you be able to verify if the changes I have done are correct, like is this what is required?

Also TaskManager#needsInitializationOrRestoration had only one usage through StreamThread#initializeAndRestorePhase method (which was removed). And TaskManager#tryToCompleteRestoration also had only one usage but lot of test cases around 53 I guess. So wanted to know if I should go levels below and remove the code or just top level code.

I will do the changes to remove the flag from the other places and commit them, in the meantime would be great if you can verify if my approach is correct. Also could you also start the workflow as well?

…Thread#create flow.

Removed the stateUpdaterEnabled dependency completely from the StreamThread#create methods and all the inner methods that use it
@github-actions github-actions bot removed the small Small PRs label Mar 24, 2025
…Thread#maybeGetClientInstanceIds, StreamThread#pollPhase and StreamThread#clientInstanceIds flow.

Removed the stateUpdaterEnabled dependency completely from the StreamThread#maybeGetClientInstanceIds,  StreamThread#pollPhase and StreamThread#clientInstanceIds flow.

Also with this commit, have removed the flag dependency completely from the StreamThread and StreamThreadTest classes
@mjsax mjsax requested a review from cadonna March 27, 2025 23:59
Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @janchilling !

I started the review, but was not able to finish it. Nevertheless, I would like to share the comments up until now with you. Most comments are about additional indentation. We use 4 spaces and not 8. There are also some other comments about the code.

I will proceed with the review next week. Feel free to update the PR in the meanwhile.

@@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {

final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also remove the internal config _state.updater.enabled_ and all corresponding code from StreamsConfig?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will attend to this within this week

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still a system test that uses the config. It is streams_upgrade_test.test_upgrade_downgrade_state_updater(). There is a comment that says:

Once same-thread state restoration is removed from the code, this test 
should use different versions of the code.

I guess it means to only use a version before 3.8 (e.g. LATEST_3_7) for the from_version and DEV_VERSION for the to_version. You need to choose a version before 3.8 because before 3.8 the state updater was not enabled by default.
@lucasbru did I correctly interpret your comment?

@github-actions github-actions bot removed the triage PRs from the community label Mar 29, 2025
@janchilling
Copy link
Contributor Author

janchilling commented Mar 31, 2025

Hi @cadonna ,

Thank you for the review!

Really sorry that you had to go through the indentation thing, I must have selected the whole file by accident and clicked ctrl+alt+l!

Anyway the indentation thing is now resolved in all places and some other reviews has also been resolved now. I'll attend to the rest(Removing the flag from the rest of the codebase) within this week.

…er to createAndStartStateUpdater since it now always creates and starts the state updater
…pdated a redundant part of a condition check and an indentation update
…er and StateChangeLogReaderTest classes.

Removed the boolean for useNonBlockingPoll since it will always be false(!stateUpdaterEnabled is always false). Therefore, the poll method will always use the polltime. Test cases in the StateChangelogReaderTest has also been updated to match the above scenario. In the Test class the methods shouldPollWithRightTimeoutWithoutStateUpdater and shouldPollWithRightTimeout(final boolean stateUpdaterEnabled, final Task.TaskType type) is redundant since the current default behavior of stateUpdater always true will be checked by shouldPollWithRightTimeoutWithStateUpdaterDefault method.
… the StateChangeLogReader and StateChangeLogReaderTest classes
Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@janchilling Thanks for the updates!

Here my comments.

Comment on lines 619 to 603
if (!stateUpdaterEnabled && changelogReader != null) {
if (changelogReader != null) {
changelogReader.unregister(getAllChangelogTopicPartitions());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since stateUpdaterEnabled is basically always true if we remove the flag, the if-condition should always be false and the code guarded by the if-condition should never be executed.

IMO, the changelog reader can be removed from the ProcessorStateManager since registering changelog topics is done in the state updater. Only the old code path that did not use the state updater needed the changelog reader here.

If the ProcessorStateManager does not need the changelog reader, the active task creator and the standby task creator do also not need the changelog reader.

Comment on lines 667 to 652
if (!stateUpdaterEnabled && changelogReader != null) {
if (changelogReader != null) {
final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
changelogReader.unregister(allChangelogs);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my above comment.

final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
final String restorationThreadId = stateUpdaterEnabled ? stateUpdaterId : threadId;
final String restorationThreadId = stateUpdaterId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could directly use stateUpdaterId instead of restorationThreadId since the distinction between state updater ID and thread ID for restoration does not hold anymore.

Comment on lines 1225 to 1148
if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled) {
if (state == State.PARTITIONS_ASSIGNED) {
// try to fetch some records with zero poll millis
// to unblock the restoration as soon as possible
records = pollRequests(Duration.ZERO);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can get rid of this branch. Polling with duration zero during PARTITIONS_ASSIGNED only applies to the old code path. With the state updater polling should use the configured poll time as stated on line 1153.

properties.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
shouldPollWithRightTimeout(properties, type);
}

@ParameterizedTest
@EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
public void shouldPollWithRightTimeoutWithStateUpdaterDefault(final Task.TaskType type) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please include shouldPollWithRightTimeout() into this test and rename this test to shouldPollWithRightTimeout()?

Copy link
Contributor Author

@janchilling janchilling Apr 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna Not clear. You mean just change the name of shouldPollWithRightTimeoutWithStateUpdaterDefault() to shouldPollWithRightTimeout() ? or bring back shouldPollWithRightTimeout() as it was earlier?

The reason I removed shouldPollWithRightTimeout() method was since shouldPollWithRightTimeoutWithStateUpdaterDefault() also has the same behavior and they were checking the same thing basically. So redundant code.

Copy link
Member

@cadonna cadonna Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to copy the content of shouldPollWithRightTimeout(final Properties properties, final Task.TaskType type) (below this method) into this method since shouldPollWithRightTimeout() is only called in this method. Then rename this method from shouldPollWithRightTimeoutWithStateUpdaterDefault() to shouldPollWithRightTimeout().

Comment on lines 1703 to 1708
() -> thread.taskManager().checkStateUpdater(
mockTime.milliseconds(),
topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1))
),
10 * 1000,
"State updater never returned tasks.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the indentation here.

Comment on lines 1722 to 1766
() -> thread.taskManager().checkStateUpdater(
mockTime.milliseconds(),
topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1))
),
10 * 1000,
"State updater never returned tasks.");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above about indentation.

Comment on lines 2471 to 2472
() -> mockRestoreConsumer.assignment().size() == 0,
"Never get the assignment");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix indentation and could you please also fix the typo? It should be Never got the assignment.

Changes - Indentation fixes in the StoreChangelogReaderTest class
         - Removed restorationThreadId from the StreamThread, instead of it stateUpdaterId used since distinction between state updater ID and thread ID for restoration does not hold anymore.
         - Removed the `if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled)` from the pollPhase() method in the StreamThread since with state updater polling should use the configured poll time.
         - Merged the shouldPollWithRightTimeout test case with shouldPollWithRightTimeoutWithStateUpdaterDefault to shouldPollWithRightTimeout.
… the TaskManager since the method is not used anymore as the initializeAndRestorePhase() method was removed from the StreamThread.
@janchilling
Copy link
Contributor Author

Hi @cadonna ,

I have removed the stateUpdaterEnabled flag and the related code completely from the codebase, and only remaining part is to remove it from a readme file and a python test case. And sorry for the force pushes and the PR being a bit messy!

@@ -99,8 +99,8 @@ SmokeTestDriver.VerificationResult result() {
// We set 2 timeout condition to fail the test before passing the verification:
// (1) 10 min timeout, (2) 30 tries of polling without getting any data
@ParameterizedTest
@CsvSource({"false, false", "true, false"})
public void shouldWorkWithRebalance(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException {
@CsvSource({"true, false"})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not look right!
It should be

Suggested change
@CsvSource({"true, false"})
@CsvSource({"true", "false"})

You can see it when you run the test. It should have two runs -- one for processing threads enabled and one for processing threads disabled, but it only has one with processing threads enabled.
With my suggestion, you get the two runs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now you might be even able to use @ValueSource(booleans = {true, false}) instead of @CsvSource.

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates, @janchilling !

I had some comments. I also noticed that there are some if-conditions still in the code base like:

if (stateUpdater != null) {
...

Could you please also get rid of those?

There are also conflicts that needs to be resolved. Some (if not all) are caused by one of my commits. I am sorry!

private Properties props(final boolean stateUpdaterEnabled) {
return props(mkObjectProperties(mkMap(mkEntry(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled))));
return Stream.of(Arguments.of("simple"),
Arguments.of("complex"));
}

private Properties props(final Properties extraProperties) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this method is exclusively called with null now, you can remove the parameter.

@@ -264,9 +261,9 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except

@ParameterizedTest
@MethodSource("singleAndMultiTaskParameters")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably be transformed to @ValueSource(strings = {"simple", "complex"})

Comment on lines 158 to 160
if (extraProperties != null) {
streamsConfiguration.putAll(extraProperties);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of passing null could you please do the following:

    private Properties props(final Properties extraProperties) {
        Properties streamsConfiguration = props();
        streamsConfiguration.putAll(extraProperties);
        return streamsConfiguration;
    }

    private Properties props() {
        final Properties streamsConfiguration = new Properties();

        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        streamsConfigurations.add(streamsConfiguration);

        return streamsConfiguration;
    }

@@ -899,6 +840,7 @@ private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final S
return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
}

// This can also be removed, but what about the test cases?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the production code is removed also the test cases are not needed anymore.
If you have comments or questions please add them to the PR instead of inline comments in code. Inline comments are easy to overlook and then we have comments that nobody understands in the codebase.

@@ -364,8 +361,8 @@ public void onChange(final Thread thread,

@ParameterizedTest
@MethodSource("data")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could change this to @ValueSource(booleans = {true, false}).

@@ -981,7 +933,7 @@ public void shouldCreateStandbyTaskDuringAssignment() {
}

@Test
public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithStateUpdaterEnabled() {
public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithStateUpdater() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithStateUpdater() {
public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInit() {

Here and elsewhere.

@cadonna
Copy link
Member

cadonna commented Apr 11, 2025

FYI: I will not be online for the next 1.5 weeks.

…KA-18913/Old-code-removal-gaurded-by-stateUpdaterEnabled-flag

# Conflicts:
#	streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
#	streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
#	streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
…KA-18913/Old-code-removal-gaurded-by-stateUpdaterEnabled-flag

# Conflicts:
#	streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@cadonna
Copy link
Member

cadonna commented May 6, 2025

@janchilling Is the PR ready for re-review?

@janchilling
Copy link
Contributor Author

janchilling commented May 6, 2025

@janchilling Is the PR ready for re-review?

Hi @cadonna Not yet, I am having some university exams and assignments to complete, so could not focus on this for the past few weeks. But I'll make sure this is ready by the end of this week.

Have a few questions also, some test cases in the StreamThreadTest is failing due to this line, https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java/#L252 .
Basically since in some test cases we don't directly create a thread
(https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java/#L3647 ). And I am a bit unsure about what can be done here, so maybe can you give me a pointer?

@cadonna
Copy link
Member

cadonna commented May 9, 2025

Hi @cadonna Not yet, I am having some university exams and assignments to complete, so could not focus on this for the past few weeks. But I'll make sure this is ready by the end of this week.

No worries!

…t, StreamThreadTest, TaskManagerTest classes and TaskManager class.

ProcessorStateManagerTest class changes :
 - Removed the use of ChangelogReader from the class, since it is not used in the StateUpdater enabled default flow
 - Removed ProcessorStateManagerTest#shouldNotRegisterNonLoggedStore and ProcessorStateManagerTest#shouldUnregisterChangelogsDuringClose, since ChangelogReader was removed.
 - Changed the ProcessorStateManagerTest#shouldRegisterNonPersistentStore and ProcessorStateManagerTest#shouldRegisterPersistentStores test cases to verify the registerStore method.

TaskManagerTest class changes :
 - Removed test cases that verified the old code flow which used the TaskManager#handleAssignment with TaskManager#tryToCompleteRestoration since it follows the old code flow.

TaskManager class changes :
 - Removed TaskManager#tryToCompleteRestoration method since it is a part of the old code flow without the StateUpdater.

StreamThreadTest class changes :
 - Changes @MethodSource("data") to @valuesource(booleans = {true, false}) since only now true and false only needs to be passed to the test methods since state updater is default true.
 - Removed tests shouldOnlyCompleteShutdownAfterRebalanceNotInProgress, shouldUpdateStandbyTask, shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater and shouldNotUpdateStandbyTaskWhenPaused test cases since they needed the StateUpdater to be false.

 Note - testNamedTopologyWithStreamsProtocol and testStreamsRebalanceDataWithExtraCopartition test will fail since those verify an IllegalStateException when creating a Stream Thread. But during the StreamThread.create() method we create a StateUpdater thread regardless of whether a StreamThread is created or not. Therefore, during the teardown process, a StateUpdater thread is left hanging(This is a bug). Will correct this using a separate PR.
…KA-18913/Old-code-removal-gaurded-by-stateUpdaterEnabled-flag

# Conflicts:
#	streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
#	streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@janchilling
Copy link
Contributor Author

janchilling commented May 16, 2025

Hi @cadonna ,

I think I have completed all, sorry for the delay though.

Only one problem remains. Which is the StreamThreadTest#testNamedTopologyWithStreamsProtocol and StreamThreadTest#testStreamsRebalanceDataWithExtraCopartition tests will fail since there will be a StateUpdater thread running, which is created through the StreamThread#createAndStartStateUpdater in the StreamThread#create. These tests fails during the teardown process, due to the StateUpdater thread being left to run even though an IllegalStateException thrown before the StreamThread is created from the above 2 test cases. I guess this is a bug since there cannot be a StateUpdater Thread running when a StreamThread is not created.

I will create a separate PR to the above issue with the solution, I guess then we'll have to merge that PR before merging this.

@janchilling janchilling requested a review from cadonna May 16, 2025 15:49
@cadonna
Copy link
Member

cadonna commented May 26, 2025

Hi @janchilling ,

Sorry for the long silence but I was quite busy recently.

Yeah, I agree with you about the bug regarding the state updater thread that is not torn down after the IllegalStateException. However, rather than surrounding the code in the StreamThread#create() with a try-catch-clause, I propose to create an init() method in the TaskManager that starts the state updater. The init() method is then called in the beginning of the run() method of the stream thread. In such a way we have starting and shutting down the state updater thread encapsulated in the TaskManager which ensures that the state updater thread it not started before the stream thread is started.

Could you open a separate PR just for that?
If that solves the issue, we would first merge that PR and then merge this PR on top of the that PR.

Copy link
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @janchilling!
I really appreciate your patience and endurance!

Here my feedback!

Most of the tests in task manager test that you deleted need a rewrite instead of just deleting them.

@@ -446,29 +446,15 @@ private List<String> getTaskIdsAsStrings(final KafkaStreams streams) {

private static Stream<Arguments> singleAndMultiTaskParameters() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name does not really fit anymore. I propose to rename this method to topologyComplexityAndRebalanceProtocol.

@@ -484,7 +470,7 @@ private Properties props(final Properties extraProperties) {
streamsConfiguration.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, TestClientSupplier.class);
streamsConfiguration.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER, TestConsumerWrapper.class);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.putAll(extraProperties);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem right. On line 472 the group protocol config is passed to props(), but here it is ignored.

@@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {

final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still a system test that uses the config. It is streams_upgrade_test.test_upgrade_downgrade_state_updater(). There is a comment that says:

Once same-thread state restoration is removed from the code, this test 
should use different versions of the code.

I guess it means to only use a version before 3.8 (e.g. LATEST_3_7) for the from_version and DEV_VERSION for the to_version. You need to choose a version before 3.8 because before 3.8 the state updater was not enabled by default.
@lucasbru did I correctly interpret your comment?

failedTasks.putAll(collectExceptionsAndFailedTasksFromStateUpdater());
}

handleTasksWithStateUpdater(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please rename this method to handleTasks(). We do not need to distinguish the cases with and without state updater.

tasks.removeTask(oldTask);
tasks.addPendingTasksToInit(Collections.singleton(standbyTask));
} else {
tasks.replaceActiveWithStandby(standbyTask);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method from the TaskRegistry interface is not used anymore. Could you please remove it and its implementations?

Comment on lines -1324 to -1332
public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException {
// The state updater is disabled for this test because this test relies on the fact the mainConsumer.resume()
// is not called. This is not true when the state updater is enabled which leads to
// java.lang.IllegalStateException: No current assignment for partition topic1-2.
// Since this tests verifies an aspect that is independent from the state updater, it is OK to disable
// the state updater and leave the rewriting of the test to later, when the code path for disabled state updater
// is removed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe, you need to rewrite this test as the comment says. Let me know, if you need some help with that.

@@ -1851,29 +1736,7 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater()
}

@Test
public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add the following test as a replacement for the this test:

    @Test
    public void shouldComputeOffsetSumForRunningStatefulTask() {
        final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
            .inState(State.RUNNING).build();
        final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
        when(runningStatefulTask.changelogOffsets())
            .thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffsetOfRunningTask)));
        final TasksRegistry tasks = mock(TasksRegistry.class);
        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));

        assertThat(
            taskManager.taskOffsetSums(),
            is(mkMap(mkEntry(taskId00, changelogOffsetOfRunningTask)))
        );
    }

@@ -1909,7 +1772,7 @@ public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() thro
}

@Test
public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please change

final long changelogOffsetOfRunningTask = 42L;

to

final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;

to make the case more real?

@@ -1940,57 +1803,6 @@ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStat
);
}

@Test
public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please replace this test with the following:

    @Test
    public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() {
        final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
            .inState(State.RESTORING).build();
        final long changelogOffsetOfRestoringStandbyTask = 84L;
        when(restoringStatefulTask.changelogOffsets())
            .thenReturn(mkMap(
                mkEntry(t1p1changelog, changelogOffsetOfRestoringStandbyTask),
                mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
            ));
        final TasksRegistry tasks = mock(TasksRegistry.class);
        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
        when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));

        assertThat(
            taskManager.taskOffsetSums(),
            is(mkMap(
                mkEntry(taskId01, changelogOffsetOfRestoringStandbyTask)
            ))
        );
    }

where

    private final TopicPartition t1p1changelog2 = new TopicPartition("changelog2", 1);

@@ -2097,105 +1909,9 @@ public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception
assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
}

@Test
public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you cannot just delete all the tests that contain tryToCompleteRetoration(). You need to rewrite them. Let me know if you need help.

@lucasbru
Copy link
Member

Hey @janchilling are you still working on this? Just checking.

@cadonna
Copy link
Member

cadonna commented Jun 10, 2025

@lucasbru Yes, they are. They were sidetracked by the following PR: #19889 (comment)
See also the comment about availability in the next two weeks.

…-removal-gaurded-by-stateUpdaterEnabled-flag

# Conflicts:
#	streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
#	streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
…n StreamThreadTest fix

Updated the StreamThread#shouldOnlyCompleteShutdownAfterRebalanceNotInProgress test case and some other changes that were requested.
@janchilling
Copy link
Contributor Author

Hi @cadonna ,

I rewrote the StreamThreadTest#shouldOnlyCompleteShutdownAfterRebalanceNotInProgress . Can you check if what I have written is correct? I specifically had to check if the StateUpdater thread is running, because otherwise the test fail. Therefore I checked if a StateUpdater thread is present from the Thread Stacktraces.

Also, can you help me identify the test cases that needs a rewrite from the TaskManagerTest class (This would be of great help). I will be available from today onwards and will give full focus to this and try to finish this by next week.

… and updated to test cases in the ProcessorStateManagerTest class

StreamThreadTest changes : Removed two unused methods addStandbyRecordsToRestoreConsumer() and addActiveRecordsToRestoreConsumer()

TaskManagerTest changes : Added methods shouldComputeOffsetSumForRunningStatefulTask() as a replacement for shouldReportLatestOffsetAsOffsetSumForRunningTask() and shouldSkipUnknownOffsetsWhenComputingOffsetSum() as a replacement for shouldSkipUnknownOffsetsWhenComputingOffsetSum().

TasksRegistry and Tasks changes : Removed unused method replaceActiveWithStandby()

ProcessorStateManagerTest changes : Updated the method shouldUnregisterChangelogsDuringClose(), shouldRecycleStoreAndReregisterChangelog() to shouldCloseStateStoresOnStateManagerClose(), shouldRecycleAndReinitializeStore() respectively since the changelogReader was removed by the ProcessorStateManager.
@janchilling
Copy link
Contributor Author

Hi @cadonna ,

I kinda need help with updating the Test cases in the TaskManagerTest class.

@cadonna
Copy link
Member

cadonna commented Jul 15, 2025

@janchilling Thank you for your patience!
I am sorry, I am quite busy at the moment and I am not able to carve out some time to review this PR. \cc @lucasbru @mjsax @bbejeck

@mjsax
Copy link
Member

mjsax commented Jul 30, 2025

@janchilling -- Had a very brief look into this PR, and it is very large. I would propose to split it up, into multiple smaller PRs to allow us to make incremental progress. For example, a first PR could just update test code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants