diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 5018c55571..3e765aa90f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -565,6 +566,12 @@ protected void checkTopics() { .stream() .filter(entry -> AdminClientConfig.configNames().contains(entry.getKey())) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + Properties overrides = propertiesFromConsumerPropertyOverrides(); + overrides.forEach((key, value) -> { + if (key instanceof String) { + configs.put((String) key, value); + } + }); List missing = null; try (AdminClient client = AdminClient.create(configs)) { // NOSONAR - false positive null check if (client != null) { @@ -740,4 +747,23 @@ protected void publishContainerStoppedEvent() { return this; } + /** + * Make any default consumer override properties explicit properties. + * @return the properties. + * @since 2.9.11 + */ + protected Properties propertiesFromConsumerPropertyOverrides() { + Properties propertyOverrides = this.containerProperties.getKafkaConsumerProperties(); + Properties props = new Properties(); + props.putAll(propertyOverrides); + Set stringPropertyNames = propertyOverrides.stringPropertyNames(); + // User might have provided properties as defaults + stringPropertyNames.forEach((name) -> { + if (!props.contains(name)) { + props.setProperty(name, propertyOverrides.getProperty(name)); + } + }); + return props; + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 91dc791308..289a30db6c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -858,7 +858,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume ObservationRegistry observationRegistry) { this.observationRegistry = observationRegistry; - Properties consumerProperties = propertiesFromProperties(); + Properties consumerProperties = propertiesFromConsumerPropertyOverrides(); checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory); this.autoCommit = determineAutoCommit(consumerProperties); this.consumer = @@ -1048,20 +1048,6 @@ private CommonErrorHandler determineCommonErrorHandler() { } } - private Properties propertiesFromProperties() { - Properties propertyOverrides = this.containerProperties.getKafkaConsumerProperties(); - Properties props = new Properties(); - props.putAll(propertyOverrides); - Set stringPropertyNames = propertyOverrides.stringPropertyNames(); - // User might have provided properties as defaults - stringPropertyNames.forEach((name) -> { - if (!props.contains(name)) { - props.setProperty(name, propertyOverrides.getProperty(name)); - } - }); - return props; - } - String getClientId() { return this.clientId; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java index 54747fdd6b..1c8e32cb5b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java @@ -254,17 +254,17 @@ public static class Listener { private final String topic; - private final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch1 = new CountDownLatch(1); - private final CountDownLatch latch2 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); private final KafkaListenerContainerFactory cf; - private List received; + volatile List received; - private List receivedTopics; + volatile List receivedTopics; - private List receivedPartitions; + volatile List receivedPartitions; public Listener(String topic, KafkaListenerContainerFactory cf) { this.topic = topic; @@ -302,9 +302,9 @@ public String getTopic() { public static class Listener3 { - private final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch1 = new CountDownLatch(1); - private List> received; + volatile List> received; @KafkaListener(topics = "blc3", groupId = "blc3") public void listen1(List> foos) { @@ -318,11 +318,11 @@ public void listen1(List> foos) { public static class Listener4 { - private final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch1 = new CountDownLatch(1); - private List received; + volatile List received; - private List replies; + volatile List replies; @KafkaListener(topics = "blc4", groupId = "blc4") @SendTo @@ -351,7 +351,7 @@ public static class Listener5 { final CountDownLatch latch2 = new CountDownLatch(1); - final List received = new ArrayList<>(); + final List received = Collections.synchronizedList(new ArrayList<>()); volatile String dlt; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/MissingTopicCheckOverrideAdminConfigTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/MissingTopicCheckOverrideAdminConfigTests.java new file mode 100644 index 0000000000..d6bfef9a25 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/MissingTopicCheckOverrideAdminConfigTests.java @@ -0,0 +1,101 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +@EmbeddedKafka(topics = "mtccac") +public class MissingTopicCheckOverrideAdminConfigTests { + + @Test + void configOverride(EmbeddedKafkaBroker broker) { + Map consumerProps = KafkaTestUtils.consumerProps("grp", "false", broker); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "junkjunk"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); + ContainerProperties props = new ContainerProperties("mtccac"); + props.setMissingTopicsFatal(true); + props.getKafkaConsumerProperties().setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + broker.getBrokersAsString()); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props) { + + @Override + public void checkTopics() { + super.checkTopics(); + } + + }; + LogAccessor logger = spy(new LogAccessor(LogFactory.getLog(getClass()))); + new DirectFieldAccessor(container).setPropertyValue("logger", logger); + assertThatNoException().isThrownBy(() -> container.checkTopics()); + verify(logger, never()).error(any(), anyString()); + } + + @Test + void configOverrideDefault(EmbeddedKafkaBroker broker) { + Map consumerProps = KafkaTestUtils.consumerProps("grp", "false", broker); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "junkjunk"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); + ContainerProperties props = new ContainerProperties("mtccac"); + props.setMissingTopicsFatal(true); + /* + * Ensure this works if there are property defaults. + * We have to iterate over the hash table because the user might have + * used put() instead of setProperty(). + */ + Properties defaultProperties = new Properties(); + defaultProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + Properties properties = new Properties(defaultProperties); + props.setKafkaConsumerProperties(properties); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props) { + + @Override + public void checkTopics() { + super.checkTopics(); + } + + }; + LogAccessor logger = spy(new LogAccessor(LogFactory.getLog(getClass()))); + new DirectFieldAccessor(container).setPropertyValue("logger", logger); + assertThatNoException().isThrownBy(() -> container.checkTopics()); + verify(logger, never()).error(any(), anyString()); + } + +}