Skip to content

Commit 29b4a29

Browse files
artembilangaryrussell
authored andcommitted
INT-4563: Add JMX test for MQTT
JIRA: https://jira.spring.io/browse/INT-4563 * Add a `MqttDslTests` with the JMX configured to be sure that SI managed components are registered in JMX properly. * Also this test covers a Java DSL configuration for MQTT channel adapters * Some polishing for `AbstractMqttMessageHandler` * Document Java DSL configuration for the MQTT channel adapters
1 parent 9832f61 commit 29b4a29

File tree

4 files changed

+192
-7
lines changed

4 files changed

+192
-7
lines changed

build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,8 @@ project('spring-integration-mqtt') {
540540
dependencies {
541541
compile project(":spring-integration-core")
542542
compile "org.eclipse.paho:org.eclipse.paho.client.mqttv3:$pahoMqttClientVersion"
543+
544+
testCompile project(":spring-integration-jmx")
543545
}
544546
}
545547

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
public abstract class AbstractMqttMessageHandler extends AbstractMessageHandler implements Lifecycle {
4545

4646
private static final MessageProcessor<String> DEFAULT_TOPIC_PROCESSOR =
47-
m -> (String) m.getHeaders().get(MqttHeaders.TOPIC);
47+
m -> m.getHeaders().get(MqttHeaders.TOPIC, String.class);
4848

4949
private final AtomicBoolean running = new AtomicBoolean();
5050

@@ -265,7 +265,7 @@ protected void handleMessageInternal(Message<?> message) throws Exception {
265265
throw new MessageHandlingException(message,
266266
"No topic could be determined from the message and no default topic defined");
267267
}
268-
this.publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);
268+
publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);
269269
}
270270

271271
protected abstract void publish(String topic, Object mqttMessage, Message<?> message) throws Exception;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.Set;
22+
23+
import javax.management.MBeanServer;
24+
import javax.management.MalformedObjectNameException;
25+
import javax.management.ObjectName;
26+
27+
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
28+
import org.junit.ClassRule;
29+
import org.junit.Test;
30+
import org.junit.runner.RunWith;
31+
32+
import org.springframework.beans.factory.annotation.Autowired;
33+
import org.springframework.beans.factory.annotation.Qualifier;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.integration.config.EnableIntegration;
37+
import org.springframework.integration.dsl.IntegrationFlow;
38+
import org.springframework.integration.dsl.IntegrationFlows;
39+
import org.springframework.integration.jmx.config.EnableIntegrationMBeanExport;
40+
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
41+
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
42+
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
43+
import org.springframework.integration.mqtt.support.MqttHeaders;
44+
import org.springframework.integration.support.MessageBuilder;
45+
import org.springframework.jmx.support.MBeanServerFactoryBean;
46+
import org.springframework.messaging.Message;
47+
import org.springframework.messaging.MessageChannel;
48+
import org.springframework.messaging.PollableChannel;
49+
import org.springframework.test.annotation.DirtiesContext;
50+
import org.springframework.test.context.junit4.SpringRunner;
51+
52+
/**
53+
* @author Artem Bilan
54+
*
55+
* @since 5.1.2
56+
*/
57+
@RunWith(SpringRunner.class)
58+
@DirtiesContext
59+
public class MqttDslTests {
60+
61+
@ClassRule
62+
public static final BrokerRunning brokerRunning = BrokerRunning.isRunning(1883);
63+
64+
@Autowired
65+
@Qualifier("mqttOutFlow.input")
66+
private MessageChannel mqttOutFlowInput;
67+
68+
@Autowired
69+
private PollableChannel fromMqttChannel;
70+
71+
@Autowired
72+
private MBeanServer server;
73+
74+
@Test
75+
public void testMqttChannelAdaptersAndJmx() throws MalformedObjectNameException {
76+
Set<ObjectName> mbeanNames = this.server.queryNames(
77+
new ObjectName("org.springframework.integration:type=ManagedEndpoint,*"), null);
78+
79+
assertThat(mbeanNames.size()).isEqualTo(1);
80+
ObjectName objectName = mbeanNames.iterator().next();
81+
assertThat(objectName.toString()).contains("name=\"mqttInFlow.mqtt:inbound-channel-adapter#0\"");
82+
83+
String testPayload = "foo";
84+
85+
this.mqttOutFlowInput.send(
86+
MessageBuilder.withPayload(testPayload)
87+
.setHeader(MqttHeaders.TOPIC, "jmxTests")
88+
.build());
89+
90+
Message<?> receive = this.fromMqttChannel.receive(10_000);
91+
92+
assertThat(receive).isNotNull();
93+
assertThat(receive.getPayload()).isEqualTo(testPayload);
94+
}
95+
96+
@Configuration
97+
@EnableIntegration
98+
@EnableIntegrationMBeanExport(server = "mbeanServer")
99+
public static class Config {
100+
101+
@Bean
102+
public static MBeanServerFactoryBean mbeanServer() {
103+
return new MBeanServerFactoryBean();
104+
}
105+
106+
@Bean
107+
public DefaultMqttPahoClientFactory pahoClientFactory() {
108+
DefaultMqttPahoClientFactory pahoClientFactory = new DefaultMqttPahoClientFactory();
109+
MqttConnectOptions connectionOptions = new MqttConnectOptions();
110+
connectionOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
111+
pahoClientFactory.setConnectionOptions(connectionOptions);
112+
return pahoClientFactory;
113+
}
114+
115+
@Bean
116+
public IntegrationFlow mqttOutFlow() {
117+
return f -> f.handle(new MqttPahoMessageHandler("jmxTestOut", pahoClientFactory()));
118+
}
119+
120+
@Bean
121+
public IntegrationFlow mqttInFlow() {
122+
return IntegrationFlows.from(
123+
new MqttPahoMessageDrivenChannelAdapter("jmxTestIn",
124+
pahoClientFactory(), "jmxTests"))
125+
.channel(c -> c.queue("fromMqttChannel"))
126+
.get();
127+
}
128+
129+
}
130+
131+
}

src/reference/asciidoc/mqtt.adoc

+57-5
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ The following listing shows the available attributes:
8080
<1> The client ID.
8181
<2> The broker URL.
8282
<3> A comma-separated list of topics from which this adapter receives messages.
83-
<4> A commap-separated list of QoS values.
83+
<4> A comma-separated list of QoS values.
8484
It can be a single value that is applied to all topics or a value for each topic (in which case, the lists must be the same length).
8585
<5> An `MqttMessageConverter` (optional).
8686
By default, the default `DefaultPahoMessageConverter` produces a message with a `String` payload with the following headers:
@@ -102,13 +102,11 @@ NOTE: Starting with version 4.1, you can omit the URL.
102102
Instead, you can provide the server URIs in the `serverURIs` property of the `DefaultMqttPahoClientFactory`.
103103
Doing so enables, for example, connection to a highly available (HA) cluster.
104104

105-
Starting with version 4.2.2, an `MqttSubscribedEvent` is published when the adapter successfully subscribes to the
106-
topics.
105+
Starting with version 4.2.2, an `MqttSubscribedEvent` is published when the adapter successfully subscribes to the topics.
107106
`MqttConnectionFailedEvent` events are published when the connection or subscription fails.
108107
These events can be received by a bean that implements `ApplicationListener`.
109108

110-
Also, a new property called `recoveryInterval` controls the interval at which the adapter attempts to reconnect after
111-
a failure.
109+
Also, a new property called `recoveryInterval` controls the interval at which the adapter attempts to reconnect after a failure.
112110
It defaults to `10000ms` (ten seconds).
113111

114112
[NOTE]
@@ -196,6 +194,35 @@ public class MqttJavaApplication {
196194
----
197195
====
198196

197+
==== Configuring with the Java DSL
198+
199+
The following Spring Boot application provides an example of configuring the inbound adapter with the Java DSL:
200+
201+
====
202+
[source, java]
203+
----
204+
@SpringBootApplication
205+
public class MqttJavaApplication {
206+
207+
public static void main(String[] args) {
208+
new SpringApplicationBuilder(MqttJavaApplication.class)
209+
.web(false)
210+
.run(args);
211+
}
212+
213+
@Bean
214+
public IntegrationFlow mqttInbound() {
215+
return IntegrationFlows.from(
216+
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
217+
"testClient", "topic1", "topic2");)
218+
.handle(m -> System.out.println(m.getPayload()))
219+
.get();
220+
}
221+
222+
}
223+
----
224+
====
225+
199226
[[mqtt-outbound]]
200227
=== Outbound Channel Adapter
201228

@@ -315,3 +342,28 @@ public class MqttJavaApplication {
315342
}
316343
----
317344
====
345+
346+
==== Configuring with the Java DSL
347+
348+
The following Spring Boot application provides an example of configuring the outbound adapter with the Java DSL:
349+
350+
====
351+
[source, java]
352+
----
353+
@SpringBootApplication
354+
public class MqttJavaApplication {
355+
356+
public static void main(String[] args) {
357+
new SpringApplicationBuilder(MqttJavaApplication.class)
358+
.web(false)
359+
.run(args);
360+
}
361+
362+
@Bean
363+
public IntegrationFlow mqttOutboundFlow() {
364+
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
365+
}
366+
367+
}
368+
----
369+
====

0 commit comments

Comments
 (0)