Skip to content

Commit 070667a

Browse files
garyrussellartembilan
authored andcommitted
GH-3215: MQTT Event for failed connection outbound
Resolves #3215 * Add docs; publish an event for an initial connection failure too. **Cherry-pick to 5.2.x**
1 parent 438e221 commit 070667a

File tree

3 files changed

+28
-1
lines changed

3 files changed

+28
-1
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java

+7
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.context.ApplicationEventPublisherAware;
2828
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
2929
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
30+
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
3031
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
3132
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
3233
import org.springframework.integration.mqtt.support.MqttMessageConverter;
@@ -201,6 +202,9 @@ private synchronized IMqttAsyncClient checkConnection() throws MqttException {
201202
this.client.close();
202203
this.client = null;
203204
}
205+
if (this.applicationEventPublisher != null) {
206+
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));
207+
}
204208
throw new MessagingException("Failed to connect", e);
205209
}
206210
}
@@ -247,6 +251,9 @@ public synchronized void connectionLost(Throwable cause) {
247251
// NOSONAR
248252
}
249253
this.client = null;
254+
if (this.applicationEventPublisher != null) {
255+
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
256+
}
250257
}
251258
}
252259

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

+9
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ public void testOutboundOptionsApplied() throws Exception {
198198

199199
verify(client, times(1)).connect(any(MqttConnectOptions.class));
200200
assertThat(connectCalled.get()).isTrue();
201+
AtomicReference<Object> failed = new AtomicReference<>();
202+
handler.setApplicationEventPublisher(event -> failed.set(event));
203+
handler.connectionLost(new IllegalStateException());
204+
assertThat(failed.get()).isInstanceOf(MqttConnectionFailedEvent.class);
205+
handler.stop();
201206
}
202207

203208
@Test
@@ -410,6 +415,10 @@ public void testReconnect() throws Exception {
410415
Thread.sleep(1000);
411416
// the following assertion should be equalTo, but leq to protect against a slow CI server
412417
assertThat(attemptingReconnectCount.get()).isLessThanOrEqualTo(2);
418+
AtomicReference<Object> failed = new AtomicReference<>();
419+
adapter.setApplicationEventPublisher(event -> failed.set(event));
420+
adapter.connectionLost(new IllegalStateException());
421+
assertThat(failed.get()).isInstanceOf(MqttConnectionFailedEvent.class);
413422
adapter.stop();
414423
taskScheduler.destroy();
415424
}

src/reference/asciidoc/mqtt.adoc

+12-1
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ The default is `headers['mqtt_topic']`.
278278
<11> When `true`, the caller does not block.
279279
Rather, it waits for delivery confirmation when a message is sent.
280280
The default is `false` (the send blocks until delivery is confirmed).
281-
<12> When `async` and `async-events` are both `true`, an `MqttMessageSentEvent` is emitted.
281+
<12> When `async` and `async-events` are both `true`, an `MqttMessageSentEvent` is emitted (See <<events>>).
282282
It contains the message, the topic, the `messageId` generated by the client library, the `clientId`, and the `clientInstance` (incremented each time the client is connected).
283283
When the delivery is confirmed by the client library, an `MqttMessageDeliveredEvent` is emitted.
284284
It contains the the `messageId`, the `clientId`, and the `clientInstance`, enabling delivery to be correlated with the send.
@@ -372,3 +372,14 @@ public class MqttJavaApplication {
372372
}
373373
----
374374
====
375+
376+
[[events]]
377+
=== Events
378+
379+
Certain application events are published by the adapters.
380+
381+
* `MqttConnectionFailedEvent` - published by both adapters if we fail to connect or a connection is subsequently lost.
382+
* `MqttMessageSentEvent` - published by the outbound adapter when a message has been sent, if running in asynchronous mode.
383+
* `MqttMessageDeliveredEvent` - published by the outbound adapter when the client indicates that a message has been delivered, if running in asynchronous mode.
384+
385+
These events can be received by an `ApplicationListener<MqttIntegrationEvent>` or with an `@EventListener` method.

0 commit comments

Comments
 (0)