Skip to content

Commit 09f1110

Browse files
committed
INT-4568: Add reactive MongoDbMH
JIRA: https://jira.spring.io/browse/INT-4568 * Refactor `AbstractMessageHandler` and implement `MongodbReactiveMessageHandler` * Rename `AbstractBaseMessageHandler` to `MessageHandlerSupport` * Clean up code style and resolve possible Sonar smells
1 parent 2cbc56c commit 09f1110

File tree

10 files changed

+856
-301
lines changed

10 files changed

+856
-301
lines changed

build.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ ext {
8080
log4jVersion = '2.12.1'
8181
micrometerVersion = '1.3.2'
8282
mockitoVersion = '3.2.0'
83+
mongodbReactiveDriverVersion = '1.12.0'
8384
mysqlVersion = '8.0.18'
8485
pahoMqttClientVersion = '1.2.0'
8586
postgresVersion = '42.2.8'
@@ -574,6 +575,8 @@ project('spring-integration-mongodb') {
574575
compile('org.springframework.data:spring-data-mongodb') {
575576
exclude group: 'org.springframework'
576577
}
578+
compile("org.mongodb:mongodb-driver-reactivestreams:$mongodbReactiveDriverVersion", optional)
579+
testCompile 'io.projectreactor:reactor-test'
577580
}
578581
}
579582

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java

+20-276
Original file line numberDiff line numberDiff line change
@@ -16,170 +16,52 @@
1616

1717
package org.springframework.integration.handler;
1818

19-
import java.util.Set;
20-
import java.util.concurrent.ConcurrentHashMap;
21-
2219
import org.reactivestreams.Subscription;
2320

24-
import org.springframework.core.Ordered;
25-
import org.springframework.integration.IntegrationPattern;
26-
import org.springframework.integration.IntegrationPatternType;
27-
import org.springframework.integration.context.IntegrationObjectSupport;
28-
import org.springframework.integration.context.Orderable;
2921
import org.springframework.integration.history.MessageHistory;
3022
import org.springframework.integration.support.management.AbstractMessageHandlerMetrics;
31-
import org.springframework.integration.support.management.ConfigurableMetricsAware;
32-
import org.springframework.integration.support.management.DefaultMessageHandlerMetrics;
33-
import org.springframework.integration.support.management.IntegrationManagedResource;
34-
import org.springframework.integration.support.management.TrackableComponent;
35-
import org.springframework.integration.support.management.metrics.MeterFacade;
3623
import org.springframework.integration.support.management.metrics.MetricsCaptor;
3724
import org.springframework.integration.support.management.metrics.SampleFacade;
38-
import org.springframework.integration.support.management.metrics.TimerFacade;
3925
import org.springframework.integration.support.utils.IntegrationUtils;
40-
import org.springframework.lang.Nullable;
4126
import org.springframework.messaging.Message;
4227
import org.springframework.messaging.MessageHandler;
4328
import org.springframework.util.Assert;
4429

4530
import reactor.core.CoreSubscriber;
4631

4732
/**
48-
* Base class for MessageHandler implementations that provides basic validation
49-
* and error handling capabilities. Asserts that the incoming Message is not
50-
* null and that it does not contain a null payload. Converts checked exceptions
51-
* into runtime {@link org.springframework.messaging.MessagingException}s.
33+
* Base class for {@link MessageHandler} implementations.
5234
*
53-
* @author Mark Fisher
54-
* @author Oleg Zhurakousky
55-
* @author Gary Russell
35+
* @author David Turanski
5636
* @author Artem Bilan
57-
* @author Amit Sadafule
5837
*/
59-
@SuppressWarnings("deprecation")
60-
@IntegrationManagedResource
61-
public abstract class AbstractMessageHandler extends IntegrationObjectSupport
62-
implements MessageHandler,
63-
org.springframework.integration.support.management.MessageHandlerMetrics,
64-
ConfigurableMetricsAware<AbstractMessageHandlerMetrics>,
65-
TrackableComponent, Orderable, CoreSubscriber<Message<?>>,
66-
IntegrationPattern {
67-
68-
private final ManagementOverrides managementOverrides = new ManagementOverrides();
69-
70-
private final Set<TimerFacade> timers = ConcurrentHashMap.newKeySet();
71-
72-
private volatile boolean shouldTrack = false;
73-
74-
private volatile int order = Ordered.LOWEST_PRECEDENCE;
75-
76-
private volatile AbstractMessageHandlerMetrics handlerMetrics = new DefaultMessageHandlerMetrics();
77-
78-
private volatile boolean statsEnabled;
79-
80-
private volatile boolean countsEnabled;
81-
82-
private volatile String managedName;
83-
84-
private volatile String managedType;
85-
86-
private volatile boolean loggingEnabled = true;
87-
88-
private MetricsCaptor metricsCaptor;
89-
90-
private TimerFacade successTimer;
91-
92-
@Override
93-
public boolean isLoggingEnabled() {
94-
return this.loggingEnabled;
95-
}
96-
97-
@Override
98-
public void setLoggingEnabled(boolean loggingEnabled) {
99-
this.loggingEnabled = loggingEnabled;
100-
this.managementOverrides.loggingConfigured = true;
101-
}
102-
103-
@Override
104-
public void registerMetricsCaptor(MetricsCaptor metricsCaptorToRegister) {
105-
this.metricsCaptor = metricsCaptorToRegister;
106-
}
107-
108-
@Nullable
109-
protected MetricsCaptor getMetricsCaptor() {
110-
return this.metricsCaptor;
111-
}
112-
113-
@Override
114-
public void setOrder(int order) {
115-
this.order = order;
116-
}
117-
118-
@Override
119-
public int getOrder() {
120-
return this.order;
121-
}
122-
123-
@Override
124-
public String getComponentType() {
125-
return "message-handler";
126-
}
127-
128-
@Override
129-
public void setShouldTrack(boolean shouldTrack) {
130-
this.shouldTrack = shouldTrack;
131-
}
132-
133-
@Override
134-
public void configureMetrics(AbstractMessageHandlerMetrics metrics) {
135-
Assert.notNull(metrics, "'metrics' must not be null");
136-
this.handlerMetrics = metrics;
137-
this.managementOverrides.metricsConfigured = true;
138-
}
139-
140-
@Override
141-
public ManagementOverrides getOverrides() {
142-
return this.managementOverrides;
143-
}
144-
145-
@Override
146-
public IntegrationPatternType getIntegrationPatternType() {
147-
return IntegrationPatternType.outbound_channel_adapter;
148-
}
149-
150-
@Override
151-
protected void onInit() {
152-
if (this.statsEnabled) {
153-
this.handlerMetrics.setFullStatsEnabled(true);
154-
}
155-
}
38+
public abstract class AbstractMessageHandler extends MessageHandlerSupport
39+
implements MessageHandler, CoreSubscriber<Message<?>> {
15640

157-
@Override
158-
public void handleMessage(Message<?> messageArg) {
159-
Message<?> message = messageArg;
41+
@SuppressWarnings("deprecation")
42+
public void handleMessage(Message<?> message) {
16043
Assert.notNull(message, "Message must not be null");
161-
Assert.notNull(message.getPayload(), "Message payload must not be null"); //NOSONAR - false positive
162-
if (this.loggingEnabled && this.logger.isDebugEnabled()) {
44+
if (isLoggingEnabled() && this.logger.isDebugEnabled()) {
16345
this.logger.debug(this + " received message: " + message);
16446
}
16547
org.springframework.integration.support.management.MetricsContext start = null;
166-
boolean countsAreEnabled = this.countsEnabled;
167-
AbstractMessageHandlerMetrics metrics = this.handlerMetrics;
16848
SampleFacade sample = null;
169-
if (countsAreEnabled && this.metricsCaptor != null) {
170-
sample = this.metricsCaptor.start();
49+
MetricsCaptor metricsCaptor = getMetricsCaptor();
50+
if (metricsCaptor != null && isCountsEnabled()) {
51+
sample = metricsCaptor.start();
17152
}
17253
try {
173-
if (this.shouldTrack) {
54+
if (shouldTrack()) {
17455
message = MessageHistory.write(message, this, getMessageBuilderFactory());
17556
}
176-
if (countsAreEnabled) {
177-
start = metrics.beforeHandle();
57+
AbstractMessageHandlerMetrics handlerMetrics = getHandlerMetrics();
58+
if (isCountsEnabled()) {
59+
start = handlerMetrics.beforeHandle();
17860
handleMessageInternal(message);
17961
if (sample != null) {
18062
sample.stop(sendTimer());
18163
}
182-
metrics.afterHandle(start, true);
64+
handlerMetrics.afterHandle(start, true);
18365
}
18466
else {
18567
handleMessageInternal(message);
@@ -189,44 +71,20 @@ public void handleMessage(Message<?> messageArg) {
18971
if (sample != null) {
19072
sample.stop(buildSendTimer(false, e.getClass().getSimpleName()));
19173
}
192-
if (countsAreEnabled) {
193-
metrics.afterHandle(start, false);
74+
if (isCountsEnabled()) {
75+
getHandlerMetrics().afterHandle(start, false);
19476
}
19577
throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(message,
19678
() -> "error occurred in message handler [" + this + "]", e);
19779
}
19880
}
19981

200-
private TimerFacade sendTimer() {
201-
if (this.successTimer == null) {
202-
this.successTimer = buildSendTimer(true, "none");
203-
}
204-
return this.successTimer;
205-
}
206-
207-
private TimerFacade buildSendTimer(boolean success, String exception) {
208-
TimerFacade timer = this.metricsCaptor.timerBuilder(SEND_TIMER_NAME)
209-
.tag("type", "handler")
210-
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
211-
.tag("result", success ? "success" : "failure")
212-
.tag("exception", exception)
213-
.description("Send processing time")
214-
.build();
215-
this.timers.add(timer);
216-
return timer;
217-
}
218-
21982
@Override
22083
public void onSubscribe(Subscription subscription) {
22184
Assert.notNull(subscription, "'subscription' must not be null");
22285
subscription.request(Long.MAX_VALUE);
22386
}
22487

225-
@Override
226-
public void onNext(Message<?> message) {
227-
handleMessage(message);
228-
}
229-
23088
@Override
23189
public void onError(Throwable throwable) {
23290

@@ -237,125 +95,11 @@ public void onComplete() {
23795

23896
}
23997

240-
protected abstract void handleMessageInternal(Message<?> message);
241-
24298
@Override
243-
public void reset() {
244-
this.handlerMetrics.reset();
245-
}
246-
247-
@Override
248-
public long getHandleCountLong() {
249-
return this.handlerMetrics.getHandleCountLong();
250-
}
251-
252-
@Override
253-
public int getHandleCount() {
254-
return this.handlerMetrics.getHandleCount();
255-
}
256-
257-
@Override
258-
public int getErrorCount() {
259-
return this.handlerMetrics.getErrorCount();
260-
}
261-
262-
@Override
263-
public long getErrorCountLong() {
264-
return this.handlerMetrics.getErrorCountLong();
265-
}
266-
267-
@Override
268-
public double getMeanDuration() {
269-
return this.handlerMetrics.getMeanDuration();
270-
}
271-
272-
@Override
273-
public double getMinDuration() {
274-
return this.handlerMetrics.getMinDuration();
275-
}
276-
277-
@Override
278-
public double getMaxDuration() {
279-
return this.handlerMetrics.getMaxDuration();
280-
}
281-
282-
@Override
283-
public double getStandardDeviationDuration() {
284-
return this.handlerMetrics.getStandardDeviationDuration();
285-
}
286-
287-
@Override
288-
public int getActiveCount() {
289-
return this.handlerMetrics.getActiveCount();
290-
}
291-
292-
@Override
293-
public long getActiveCountLong() {
294-
return this.handlerMetrics.getActiveCountLong();
295-
}
296-
297-
@Override
298-
public org.springframework.integration.support.management.Statistics getDuration() {
299-
return this.handlerMetrics.getDuration();
300-
}
301-
302-
@Override
303-
public void setStatsEnabled(boolean statsEnabled) {
304-
if (statsEnabled) {
305-
this.countsEnabled = true;
306-
this.managementOverrides.countsConfigured = true;
307-
}
308-
this.statsEnabled = statsEnabled;
309-
if (this.handlerMetrics != null) {
310-
this.handlerMetrics.setFullStatsEnabled(statsEnabled);
311-
}
312-
this.managementOverrides.statsConfigured = true;
313-
}
314-
315-
@Override
316-
public boolean isStatsEnabled() {
317-
return this.statsEnabled;
318-
}
319-
320-
@Override
321-
public void setCountsEnabled(boolean countsEnabled) {
322-
this.countsEnabled = countsEnabled;
323-
this.managementOverrides.countsConfigured = true;
324-
if (!countsEnabled) {
325-
this.statsEnabled = false;
326-
this.managementOverrides.statsConfigured = true;
327-
}
328-
}
329-
330-
@Override
331-
public boolean isCountsEnabled() {
332-
return this.countsEnabled;
333-
}
334-
335-
@Override
336-
public void setManagedName(String managedName) {
337-
this.managedName = managedName;
338-
}
339-
340-
@Override
341-
public String getManagedName() {
342-
return this.managedName;
343-
}
344-
345-
@Override
346-
public void setManagedType(String managedType) {
347-
this.managedType = managedType;
348-
}
349-
350-
@Override
351-
public String getManagedType() {
352-
return this.managedType;
99+
public void onNext(Message<?> message) {
100+
handleMessage(message);
353101
}
354102

355-
@Override
356-
public void destroy() {
357-
this.timers.forEach(MeterFacade::remove);
358-
this.timers.clear();
359-
}
103+
protected abstract void handleMessageInternal(Message<?> message);
360104

361105
}

0 commit comments

Comments
 (0)