Skip to content

Add ability for a user to set mqtt's quiescentTimeout for forceable shutdowns #10210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T

private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;

private long quiescentTimeout = ClientManager.QUIESCENT_TIMEOUT;

private boolean manualAcks;

private ApplicationEventPublisher applicationEventPublisher;
Expand Down Expand Up @@ -139,6 +141,20 @@ protected long getDisconnectCompletionTimeout() {
return this.disconnectCompletionTimeout;
}

/**
* Set the quiescentTimeout timeout when disconnecting.
* Default is {@link ClientManager#QUIESCENT_TIMEOUT} milliseconds.
* @param quiescentTimeout The timeout.
* @since 7.0.0
*/
public void setQuiescentTimeout(long quiescentTimeout) {
this.quiescentTimeout = quiescentTimeout;
}

protected long getQuiescentTimeout() {
return this.quiescentTimeout;
}

@Override
public boolean isManualAcks() {
return this.manualAcks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {
*/
long DEFAULT_COMPLETION_TIMEOUT = 30_000L;

Long QUIESCENT_TIMEOUT = 30_000L;

/**
* The default disconnect completion timeout in milliseconds.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void stop() {
return;
}
try {
client.disconnectForcibly(getDisconnectCompletionTimeout());
client.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout());
if (getConnectionInfo().isAutomaticReconnect()) {
MqttUtils.stopClientReconnectCycle(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;

import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.support.MqttUtils;
Expand Down Expand Up @@ -153,7 +154,8 @@ public void stop() {
}

try {
client.disconnectForcibly(getDisconnectCompletionTimeout());
client.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout(),
MqttReturnCode.RETURN_CODE_SUCCESS, new MqttProperties());
if (getConnectionInfo().isAutomaticReconnect()) {
MqttUtils.stopClientReconnectCycle(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* @author Mikhail Polivakha
* @author Artem Vozhdayenko
* @author Jiri Soucek
* @author Glenn Renfro
*
* @since 4.0
*
Expand All @@ -73,6 +74,8 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T, C> extends Mess

private long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;

private long quiescentTimeout = ClientManager.QUIESCENT_TIMEOUT;

private boolean manualAcks;

private ApplicationEventPublisher applicationEventPublisher;
Expand Down Expand Up @@ -199,6 +202,20 @@ protected long getDisconnectCompletionTimeout() {
return this.disconnectCompletionTimeout;
}

/**
* Set the quiescentTimeout timeout when disconnecting.
* Default is {@link ClientManager#QUIESCENT_TIMEOUT} milliseconds.
* @param quiescentTimeout The timeout.
* @since 7.0.0
*/
public void setQuiescentTimeout(long quiescentTimeout) {
this.quiescentTimeout = quiescentTimeout;
}

protected long getQuiescentTimeout() {
return this.quiescentTimeout;
}

@Override
protected void onInit() {
super.onInit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Artem Vozhdayenko
* @author Glenn Renfro
*
* @since 4.0
*
Expand Down Expand Up @@ -227,7 +228,7 @@ protected void doStop() {
}

try {
this.client.disconnectForcibly(getDisconnectCompletionTimeout());
this.client.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout());
if (getConnectionInfo().isAutomaticReconnect()) {
MqttUtils.stopClientReconnectCycle(this.client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;

import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.ApplicationEventPublisher;
Expand Down Expand Up @@ -81,6 +82,7 @@
* @author Lucas Bowler
* @author Artem Vozhdayenko
* @author Matthias Thoma
* @author Glenn Renfro
*
* @since 5.5.5
*
Expand Down Expand Up @@ -296,7 +298,8 @@ protected void doStop() {

}
if (getClientManager() == null) {
this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout());
this.mqttClient.disconnectForcibly(getQuiescentTimeout(), getDisconnectCompletionTimeout(),
MqttReturnCode.RETURN_CODE_SUCCESS, new MqttProperties());
if (getConnectionInfo().isAutomaticReconnect()) {
MqttUtils.stopClientReconnectCycle(this.mqttClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Glenn Renfro
*
* @since 4.0
*
Expand All @@ -73,6 +74,10 @@
@DirtiesContext
public class BackToBackAdapterTests implements MosquittoContainerTest {

private static final long QUIESCENT_TIMEOUT = 1;

private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L;

@TempDir
static File folder;

Expand Down Expand Up @@ -108,9 +113,7 @@ public void testSingleTopic() {
MqttPahoMessageDrivenChannelAdapter inbound =
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo");
QueueChannel outputChannel = new QueueChannel();
inbound.setOutputChannel(outputChannel);
inbound.setTaskScheduler(taskScheduler);
inbound.setBeanFactory(mock(BeanFactory.class));
initializeInboundAdapter(inbound, outputChannel);
inbound.afterPropertiesSet();
inbound.start();
adapter.handleMessage(new GenericMessage<>("foo"));
Expand Down Expand Up @@ -147,9 +150,7 @@ private void testJsonCommon(String... trusted) {
MqttPahoMessageDrivenChannelAdapter inbound =
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo");
QueueChannel outputChannel = new QueueChannel();
inbound.setOutputChannel(outputChannel);
inbound.setTaskScheduler(taskScheduler);
inbound.setBeanFactory(mock(BeanFactory.class));
initializeInboundAdapter(inbound, outputChannel);
inbound.setConverter(converter);
inbound.afterPropertiesSet();
inbound.start();
Expand Down Expand Up @@ -178,9 +179,7 @@ public void testAddRemoveTopic() {
MqttPahoMessageDrivenChannelAdapter inbound =
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in");
QueueChannel outputChannel = new QueueChannel();
inbound.setOutputChannel(outputChannel);
inbound.setTaskScheduler(taskScheduler);
inbound.setBeanFactory(mock(BeanFactory.class));
initializeInboundAdapter(inbound, outputChannel);
inbound.afterPropertiesSet();
inbound.start();
inbound.addTopic("mqtt-foo");
Expand Down Expand Up @@ -226,9 +225,7 @@ public void testTwoTopics() {
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(),
"si-test-in", "mqtt-foo", "mqtt-bar");
QueueChannel outputChannel = new QueueChannel();
inbound.setOutputChannel(outputChannel);
inbound.setTaskScheduler(taskScheduler);
inbound.setBeanFactory(mock(BeanFactory.class));
initializeInboundAdapter(inbound, outputChannel);
inbound.afterPropertiesSet();
inbound.start();
adapter.handleMessage(new GenericMessage<>("foo"));
Expand Down Expand Up @@ -261,9 +258,7 @@ public void testAsync() throws Exception {
MqttPahoMessageDrivenChannelAdapter inbound =
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo");
QueueChannel outputChannel = new QueueChannel();
inbound.setOutputChannel(outputChannel);
inbound.setTaskScheduler(taskScheduler);
inbound.setBeanFactory(mock(BeanFactory.class));
initializeInboundAdapter(inbound, outputChannel);
inbound.afterPropertiesSet();
inbound.start();
GenericMessage<String> message = new GenericMessage<>("foo");
Expand Down Expand Up @@ -299,9 +294,7 @@ public void testAsyncPersisted() throws Exception {
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(),
"si-test-in", "mqtt-foo", "mqtt-bar");
QueueChannel outputChannel = new QueueChannel();
inbound.setOutputChannel(outputChannel);
inbound.setTaskScheduler(taskScheduler);
inbound.setBeanFactory(mock(BeanFactory.class));
initializeInboundAdapter(inbound, outputChannel);
inbound.afterPropertiesSet();
inbound.start();
Message<String> message1 = new GenericMessage<>("foo");
Expand Down Expand Up @@ -396,6 +389,14 @@ public void onApplicationEvent(MqttSubscribedEvent event) {

}

private static void initializeInboundAdapter(MqttPahoMessageDrivenChannelAdapter inbound, QueueChannel outputChannel) {
inbound.setOutputChannel(outputChannel);
inbound.setTaskScheduler(taskScheduler);
inbound.setQuiescentTimeout(QUIESCENT_TIMEOUT);
inbound.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
inbound.setBeanFactory(mock(BeanFactory.class));
}

private class EventPublisher implements ApplicationEventPublisher {

private volatile MqttMessageDeliveredEvent delivered;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
*/
class ClientManagerBackToBackTests implements MosquittoContainerTest {

private static final long QUIESCENT_TIMEOUT = 1L;

private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L;

@Test
void testSameV3ClientIdWorksForPubAndSub() throws Exception {
testSubscribeAndPublish(Mqttv3Config.class, Mqttv3Config.TOPIC_NAME, Mqttv3Config.subscribedLatch);
Expand Down Expand Up @@ -191,7 +195,10 @@ public Mqttv3ClientManager mqttv3ClientManager() {
MqttConnectOptions connectionOptions = new MqttConnectOptions();
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});
connectionOptions.setAutomaticReconnect(true);
return new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3");
Mqttv3ClientManager result = new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3");
result.setQuiescentTimeout(QUIESCENT_TIMEOUT);
result.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
return result;
}

@Bean
Expand Down Expand Up @@ -234,7 +241,10 @@ public Mqttv3ClientManager mqttv3ClientManager() {
MqttConnectOptions connectionOptions = new MqttConnectOptions();
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});
connectionOptions.setAutomaticReconnect(true);
return new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3-reconnect");
Mqttv3ClientManager result = new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3-reconnect");
result.setQuiescentTimeout(QUIESCENT_TIMEOUT);
result.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
return result;
}

@Bean
Expand Down Expand Up @@ -269,7 +279,10 @@ public Mqttv3ClientManager mqttv3ClientManager() {
MqttConnectOptions connectionOptions = new MqttConnectOptions();
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});
connectionOptions.setAutomaticReconnect(true);
return new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3");
Mqttv3ClientManager result = new Mqttv3ClientManager(connectionOptions, "client-manager-client-id-v3");
result.setQuiescentTimeout(QUIESCENT_TIMEOUT);
result.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
return result;
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Artem Vozhdayenko
* @author Glenn Renfro
*
* @since 4.0
*
Expand Down Expand Up @@ -519,7 +520,7 @@ public void testDifferentQos() throws Exception {

new DirectFieldAccessor(adapter).setPropertyValue("running", Boolean.TRUE);
adapter.stop();
verify(client).disconnectForcibly(5_000L);
verify(client).disconnectForcibly(30_000L, 5_000L);
}

@Test
Expand Down Expand Up @@ -589,14 +590,14 @@ private void verifyUnsubscribe(IMqttAsyncClient client) throws Exception {
verify(client).connect(any(MqttConnectOptions.class));
verify(client).subscribe(any(String[].class), any(int[].class), any());
verify(client).unsubscribe(any(String[].class));
verify(client).disconnectForcibly(anyLong());
verify(client).disconnectForcibly(anyLong(), anyLong());
}

private void verifyNotUnsubscribe(IMqttAsyncClient client) throws Exception {
verify(client).connect(any(MqttConnectOptions.class));
verify(client).subscribe(any(String[].class), any(int[].class), any());
verify(client, never()).unsubscribe(any(String[].class));
verify(client).disconnectForcibly(anyLong());
verify(client).disconnectForcibly(anyLong(), anyLong());
}

@Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Mikhail Polivakha
* @author Glenn Renfro
*
* @since 5.5.5
*
Expand All @@ -65,6 +66,10 @@
@DirtiesContext
public class Mqttv5BackToBackTests implements MosquittoContainerTest {

private static final long QUIESCENT_TIMEOUT = 1;

private static final long DISCONNECT_COMPLETION_TIMEOUT = 1L;

@Autowired
@Qualifier("mqttOutFlow.input")
private MessageChannel mqttOutFlowInput;
Expand Down Expand Up @@ -94,7 +99,7 @@ public void testNoNpeIsNotThrownInCaseDoInitIsNotInvokedBeforeTopicRemoval() {

@Test
public void testSimpleMqttv5Interaction() {
String testPayload = "foo";
String testPayload = "datakey";

this.mqttOutFlowInput.send(
MessageBuilder.withPayload(testPayload)
Expand Down Expand Up @@ -213,6 +218,8 @@ public IntegrationFlow mqttInFlow() {
new Mqttv5PahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "mqttv5SIin",
mqttSubscription);
messageProducer.setPayloadType(String.class);
messageProducer.setQuiescentTimeout(QUIESCENT_TIMEOUT);
messageProducer.setDisconnectCompletionTimeout(DISCONNECT_COMPLETION_TIMEOUT);
messageProducer.setMessageConverter(mqttStringToBytesConverter());
messageProducer.setManualAcks(true);

Expand Down
6 changes: 6 additions & 0 deletions src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,9 @@ See xref:redis.adoc#redis-lock-registry[Redis Lock Registry] for more informatio
=== Hazelcast Changes

Previously deprecated classes in the `spring-integation-hazelcast` module, such as `LeaderInitiator`, `HazelcastMembershipListener`, `HazelcastLocalInstanceRegistrar` and `HazelcastLockRegistry`, are now removed due to not supported CP-subsystem in Hazelcast library for Open Source.

[[x7.0-mqtt-changes]]
=== MQTT Changes

The `AbstractMqttMessageDrivenChannelAdapter` and `ClientManager` implementations now expose a `quiescentTimeout` option which is propagated in their `stop()` method down to the `disconnectForcibly()` API of the MQTT Paho clients.
See xref:mqtt.adoc[] for more information.