Skip to content

Commit 32c81d1

Browse files
garyrussellartembilan
authored andcommitted
disconnectCompletionTimeout Polishing
- rename from `stopCompletionTimeout` - add to outbound adapter - use in both places we disconnect on inbound **cherry-pick forward to 5.2.x, master** # Conflicts: # spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java
1 parent 0de2b1b commit 32c81d1

File tree

2 files changed

+35
-12
lines changed

2 files changed

+35
-12
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,15 @@
5151
public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter
5252
implements MqttCallback, ApplicationEventPublisherAware {
5353

54+
/**
55+
* The default completion timeout in milliseconds.
56+
*/
5457
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
5558

56-
public static final long STOP_COMPLETION_TIMEOUT = 5_000L;
59+
/**
60+
* The default disconnect completion timeout in milliseconds.
61+
*/
62+
public static final long DISCONNECT_COMPLETION_TIMEOUT = 5_000L;
5763

5864
private static final int DEFAULT_RECOVERY_INTERVAL = 10_000;
5965

@@ -63,7 +69,7 @@ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDriv
6369

6470
private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
6571

66-
private long stopCompletionTimeout = STOP_COMPLETION_TIMEOUT;
72+
private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT;
6773

6874
private volatile IMqttClient client;
6975

@@ -127,13 +133,13 @@ public void setCompletionTimeout(long completionTimeout) {
127133
}
128134

129135
/**
130-
* Set the completion timeout wnen stopping. Not settable using the namespace.
131-
* Default {@value #STOP_COMPLETION_TIMEOUT} milliseconds.
136+
* Set the completion timeout when disconnecting. Not settable using the namespace.
137+
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
132138
* @param completionTimeout The timeout.
133-
* @since 5.2.5
139+
* @since 5.1.10
134140
*/
135-
public void setStopCompletionTimeout(long completionTimeout) {
136-
this.stopCompletionTimeout = completionTimeout;
141+
public void setDisconnectCompletionTimeout(long completionTimeout) {
142+
this.disconnectCompletionTimeout = completionTimeout;
137143
}
138144

139145
/**
@@ -184,7 +190,7 @@ protected synchronized void doStop() {
184190
logger.error("Exception while unsubscribing", e);
185191
}
186192
try {
187-
this.client.disconnectForcibly(this.stopCompletionTimeout);
193+
this.client.disconnectForcibly(this.disconnectCompletionTimeout);
188194
}
189195
catch (MqttException e) {
190196
logger.error("Exception while disconnecting", e);
@@ -276,7 +282,7 @@ private synchronized void connectAndSubscribe() throws MqttException {
276282
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));
277283
}
278284
logger.error("Error connecting or subscribing to " + Arrays.toString(topics), e);
279-
this.client.disconnectForcibly(this.completionTimeout);
285+
this.client.disconnectForcibly(this.disconnectCompletionTimeout);
280286
try {
281287
this.client.setCallback(null);
282288
this.client.close();

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,10 +50,17 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler
5050
/**
5151
* The default completion timeout in milliseconds.
5252
*/
53-
public static final long DEFAULT_COMPLETION_TIMEOUT = 30000L;
53+
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
54+
55+
/**
56+
* The default disconnect completion timeout in milliseconds.
57+
*/
58+
public static final long DISCONNECT_COMPLETION_TIMEOUT = 5_000L;
5459

5560
private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
5661

62+
private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT;
63+
5764
private final MqttPahoClientFactory clientFactory;
5865

5966
private boolean async;
@@ -131,6 +138,16 @@ public void setCompletionTimeout(long completionTimeout) {
131138
this.completionTimeout = completionTimeout;
132139
}
133140

141+
/**
142+
* Set the completion timeout when disconnecting. Not settable using the namespace.
143+
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
144+
* @param completionTimeout The timeout.
145+
* @since 5.1.10
146+
*/
147+
public void setDisconnectCompletionTimeout(long completionTimeout) {
148+
this.disconnectCompletionTimeout = completionTimeout;
149+
}
150+
134151
@Override
135152
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
136153
this.applicationEventPublisher = applicationEventPublisher;
@@ -152,7 +169,7 @@ protected void doStop() {
152169
try {
153170
IMqttAsyncClient theClient = this.client;
154171
if (theClient != null) {
155-
theClient.disconnect().waitForCompletion(this.completionTimeout);
172+
theClient.disconnect().waitForCompletion(this.disconnectCompletionTimeout);
156173
theClient.close();
157174
this.client = null;
158175
}

0 commit comments

Comments
 (0)