Skip to content

Commit 14ea11d

Browse files
jiafu1115stroller.fu
andauthored
KAFKA-19371: Don't create the __remote_log_metadata topic when it already exists during broker restarts (#19899)
* The CREATE_TOPIC request gets issued only when it is clear that the topic does not exist in the cluster. * When the request to describe the topic gets timed-out or any exception thrown other than UnknownTopicOrPartitionException, then the same gets re-thrown and the describe/create topic request gets retried in the next iteration until the initializationRetryMaxTimeoutMs gets breached. Fixes: https://issues.apache.org/jira/browse/KAFKA-19371 Reviewers: Luke Chen <[email protected]>, Kamal Chandraprakash <[email protected]> --------- Co-authored-by: stroller.fu <[email protected]>
1 parent eaa55c4 commit 14ea11d

File tree

2 files changed

+35
-8
lines changed

2 files changed

+35
-8
lines changed

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.common.config.TopicConfig;
2727
import org.apache.kafka.common.errors.RetriableException;
2828
import org.apache.kafka.common.errors.TopicExistsException;
29+
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
2930
import org.apache.kafka.common.internals.FatalExitError;
3031
import org.apache.kafka.common.utils.KafkaThread;
3132
import org.apache.kafka.common.utils.Time;
@@ -468,7 +469,7 @@ private void initializeResources() {
468469
}
469470
}
470471

471-
boolean doesTopicExist(Admin adminClient, String topic) {
472+
boolean doesTopicExist(Admin adminClient, String topic) throws ExecutionException, InterruptedException {
472473
try {
473474
TopicDescription description = adminClient.describeTopics(Set.of(topic))
474475
.topicNameValues()
@@ -477,13 +478,14 @@ boolean doesTopicExist(Admin adminClient, String topic) {
477478
if (description != null) {
478479
log.info("Topic {} exists. TopicId: {}, numPartitions: {}, ", topic,
479480
description.topicId(), description.partitions().size());
480-
} else {
481-
log.info("Topic {} does not exist.", topic);
482481
}
483-
return description != null;
482+
return true;
484483
} catch (ExecutionException | InterruptedException ex) {
485-
log.info("Topic {} does not exist. Error: {}", topic, ex.getCause().getMessage());
486-
return false;
484+
if (ex.getCause() instanceof UnknownTopicOrPartitionException) {
485+
log.info("Topic {} does not exist", topic);
486+
return false;
487+
}
488+
throw ex;
487489
}
488490
}
489491

@@ -544,7 +546,7 @@ private boolean createTopic(Admin adminClient, NewTopic newTopic) {
544546
log.info("Topic [{}] already exists", topic);
545547
doesTopicExist = true;
546548
} else {
547-
log.error("Encountered error while creating {} topic.", topic, e);
549+
log.error("Encountered error while querying or creating {} topic.", topic, e);
548550
}
549551
}
550552
return doesTopicExist;

storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
package org.apache.kafka.server.log.remote.metadata.storage;
1818

1919
import org.apache.kafka.clients.admin.Admin;
20+
import org.apache.kafka.clients.admin.DescribeTopicsResult;
2021
import org.apache.kafka.clients.admin.NewTopic;
22+
import org.apache.kafka.clients.admin.TopicDescription;
23+
import org.apache.kafka.common.KafkaFuture;
2124
import org.apache.kafka.common.TopicIdPartition;
2225
import org.apache.kafka.common.TopicPartition;
2326
import org.apache.kafka.common.Uuid;
@@ -46,9 +49,12 @@
4649
import static org.junit.jupiter.api.Assertions.assertThrows;
4750
import static org.junit.jupiter.api.Assertions.assertTrue;
4851
import static org.mockito.ArgumentMatchers.any;
52+
import static org.mockito.ArgumentMatchers.anySet;
4953
import static org.mockito.Mockito.doAnswer;
54+
import static org.mockito.Mockito.mock;
5055
import static org.mockito.Mockito.spy;
5156
import static org.mockito.Mockito.verify;
57+
import static org.mockito.Mockito.when;
5258

5359
@ClusterTestDefaults(brokers = 3)
5460
public class TopicBasedRemoteLogMetadataManagerTest {
@@ -89,14 +95,33 @@ public void testDoesTopicExist() throws ExecutionException, InterruptedException
8995
}
9096

9197
@ClusterTest
92-
public void testTopicDoesNotExist() {
98+
public void testTopicDoesNotExist() throws ExecutionException, InterruptedException {
9399
try (Admin admin = clusterInstance.admin()) {
94100
String topic = "dummy-test-topic";
95101
boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic);
96102
assertFalse(doesTopicExist);
97103
}
98104
}
99105

106+
@ClusterTest
107+
public void testDoesTopicExistWithAdminClientExecutionError() throws ExecutionException, InterruptedException {
108+
// Create a mock Admin client that throws an ExecutionException (not UnknownTopicOrPartitionException)
109+
Admin mockAdmin = mock(Admin.class);
110+
DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
111+
KafkaFuture<TopicDescription> mockFuture = mock(KafkaFuture.class);
112+
113+
String topic = "test-topic";
114+
115+
// Set up the mock to throw a RuntimeException wrapped in ExecutionException
116+
when(mockAdmin.describeTopics(anySet())).thenReturn(mockDescribeTopicsResult);
117+
when(mockDescribeTopicsResult.topicNameValues()).thenReturn(Map.of(topic, mockFuture));
118+
when(mockFuture.get()).thenThrow(new ExecutionException("Admin client connection error", new RuntimeException("Connection failed")));
119+
120+
// The method should re-throw the ExecutionException since it's not an UnknownTopicOrPartitionException
121+
TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
122+
assertThrows(ExecutionException.class, () -> rlmm.doesTopicExist(mockAdmin, topic));
123+
}
124+
100125
@ClusterTest
101126
public void testWithNoAssignedPartitions() {
102127
// This test checks simple lifecycle of TopicBasedRemoteLogMetadataManager with out assigning any leader/follower partitions.

0 commit comments

Comments
 (0)