Skip to content

Commit 0de2b1b

Browse files
garyrussellartembilan
authored andcommitted
Upgrade to MQTT Paho 1.2.2
- fix mock tests for internal client changes - reduce stop wait for completion time **cherry-pick to 5.2.x** * Remove stack trace from test and convert to assertJ # Conflicts: # build.gradle
1 parent ca4c85b commit 0de2b1b

File tree

3 files changed

+41
-18
lines changed

3 files changed

+41
-18
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ ext {
8181
micrometerVersion = '1.3.5'
8282
mockitoVersion = '3.2.4'
8383
mysqlVersion = '8.0.18'
84-
pahoMqttClientVersion = '1.2.0'
84+
pahoMqttClientVersion = '1.2.2'
8585
postgresVersion = '42.2.9'
8686
reactorVersion = 'Dysprosium-SR5'
8787
resilience4jVersion = '1.1.0'

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

+19-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,15 +51,19 @@
5151
public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter
5252
implements MqttCallback, ApplicationEventPublisherAware {
5353

54-
public static final long DEFAULT_COMPLETION_TIMEOUT = 30000L;
54+
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
5555

56-
private static final int DEFAULT_RECOVERY_INTERVAL = 10000;
56+
public static final long STOP_COMPLETION_TIMEOUT = 5_000L;
57+
58+
private static final int DEFAULT_RECOVERY_INTERVAL = 10_000;
5759

5860
private final MqttPahoClientFactory clientFactory;
5961

6062
private int recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
6163

62-
private volatile long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
64+
private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
65+
66+
private long stopCompletionTimeout = STOP_COMPLETION_TIMEOUT;
6367

6468
private volatile IMqttClient client;
6569

@@ -122,6 +126,16 @@ public void setCompletionTimeout(long completionTimeout) {
122126
this.completionTimeout = completionTimeout;
123127
}
124128

129+
/**
130+
* Set the completion timeout wnen stopping. Not settable using the namespace.
131+
* Default {@value #STOP_COMPLETION_TIMEOUT} milliseconds.
132+
* @param completionTimeout The timeout.
133+
* @since 5.2.5
134+
*/
135+
public void setStopCompletionTimeout(long completionTimeout) {
136+
this.stopCompletionTimeout = completionTimeout;
137+
}
138+
125139
/**
126140
* The time (ms) to wait between reconnection attempts.
127141
* Default {@value #DEFAULT_RECOVERY_INTERVAL}.
@@ -170,7 +184,7 @@ protected synchronized void doStop() {
170184
logger.error("Exception while unsubscribing", e);
171185
}
172186
try {
173-
this.client.disconnectForcibly(this.completionTimeout);
187+
this.client.disconnectForcibly(this.stopCompletionTimeout);
174188
}
175189
catch (MqttException e) {
176190
logger.error("Exception while disconnecting", e);

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

+21-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,10 +17,12 @@
1717
package org.springframework.integration.mqtt;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2021
import static org.assertj.core.api.Assertions.fail;
2122
import static org.mockito.ArgumentMatchers.any;
2223
import static org.mockito.ArgumentMatchers.anyLong;
2324
import static org.mockito.ArgumentMatchers.anyString;
25+
import static org.mockito.ArgumentMatchers.isNull;
2426
import static org.mockito.BDDMockito.given;
2527
import static org.mockito.BDDMockito.willAnswer;
2628
import static org.mockito.BDDMockito.willReturn;
@@ -48,8 +50,10 @@
4850

4951
import org.aopalliance.intercept.MethodInterceptor;
5052
import org.apache.commons.logging.Log;
53+
import org.assertj.core.api.Condition;
5154
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
5255
import org.eclipse.paho.client.mqttv3.IMqttClient;
56+
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
5357
import org.eclipse.paho.client.mqttv3.IMqttToken;
5458
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
5559
import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -435,11 +439,13 @@ public void testSubscribeFailure() throws Exception {
435439
new DirectFieldAccessor(client).setPropertyValue("aClient", aClient);
436440
willAnswer(new CallsRealMethods()).given(client).connect(any(MqttConnectOptions.class));
437441
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class));
442+
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class),
443+
(IMqttMessageListener[]) isNull());
438444
willReturn(alwaysComplete).given(aClient).connect(any(MqttConnectOptions.class), any(), any());
439445

440446
IMqttToken token = mock(IMqttToken.class);
441447
given(token.getGrantedQos()).willReturn(new int[] { 0x80 });
442-
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), any(), any());
448+
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), isNull(), isNull(), any());
443449

444450
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("foo", "bar", factory,
445451
"baz", "fix");
@@ -449,15 +455,12 @@ public void testSubscribeFailure() throws Exception {
449455
method.set(m);
450456
}, m -> m.getName().equals("connectAndSubscribe"));
451457
assertThat(method.get()).isNotNull();
452-
try {
453-
method.get().invoke(adapter);
454-
fail("Expected InvocationTargetException");
455-
}
456-
catch (InvocationTargetException e) {
457-
assertThat(e.getCause()).isInstanceOf(MqttException.class);
458-
assertThat(((MqttException) e.getCause()).getReasonCode())
459-
.isEqualTo((int) MqttException.REASON_CODE_SUBSCRIBE_FAILED);
460-
}
458+
Condition<InvocationTargetException> subscribeFailed = new Condition<>(ex ->
459+
((MqttException) ex.getCause()).getReasonCode() == MqttException.REASON_CODE_SUBSCRIBE_FAILED,
460+
"expected the reason code to be REASON_CODE_SUBSCRIBE_FAILED");
461+
assertThatExceptionOfType(InvocationTargetException.class).isThrownBy(() -> method.get().invoke(adapter))
462+
.withCauseInstanceOf(MqttException.class)
463+
.is(subscribeFailed);
461464
}
462465

463466
@Test
@@ -485,11 +488,13 @@ public void testDifferentQos() throws Exception {
485488
new DirectFieldAccessor(client).setPropertyValue("aClient", aClient);
486489
willAnswer(new CallsRealMethods()).given(client).connect(any(MqttConnectOptions.class));
487490
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class));
491+
willAnswer(new CallsRealMethods()).given(client).subscribe(any(String[].class), any(int[].class),
492+
(IMqttMessageListener[]) isNull());
488493
willReturn(alwaysComplete).given(aClient).connect(any(MqttConnectOptions.class), any(), any());
489494

490495
IMqttToken token = mock(IMqttToken.class);
491496
given(token.getGrantedQos()).willReturn(new int[] { 2, 0 });
492-
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), any(), any());
497+
willReturn(token).given(aClient).subscribe(any(String[].class), any(int[].class), isNull(), isNull(), any());
493498

494499
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("foo", "bar", factory,
495500
"baz", "fix");
@@ -506,6 +511,10 @@ public void testDifferentQos() throws Exception {
506511
verify(logger, atLeastOnce())
507512
.warn("Granted QOS different to Requested QOS; topics: [baz, fix] requested: [1, 1] granted: [2, 0]");
508513
verify(client).setTimeToWait(30_000L);
514+
515+
new DirectFieldAccessor(adapter).setPropertyValue("running", Boolean.TRUE);
516+
adapter.stop();
517+
verify(client).disconnectForcibly(5_000L);
509518
}
510519

511520
private MqttPahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttClient client, Boolean cleanSession,

0 commit comments

Comments
 (0)