Skip to content

Commit 2e7b48e

Browse files
committed
spring-projectsGH-2788: Add MongoDbChangeStreamMessageProducer
Fixes spring-projects#2788 * Introduce a `MessageProducerSupport.subscribeToPublisher(Publisher<Message<?>>)` for components which produces `Flux` for data from their source * Such a component is auto-stopped when subscription to that `Publisher` is canceled * Implement a `MongoDbChangeStreamMessageProducer` based on the reactive support for in Spring Data MongoDb * Implement a Java DSL for `MongoDbChangeStreamMessageProducer` * Disable a test for change stream since it requires server of version 4.x started with 'replSet' option * Add `MongoHeaders` for change stream events
1 parent 6876730 commit 2e7b48e

File tree

8 files changed

+588
-21
lines changed

8 files changed

+588
-21
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.endpoint;
1818

1919
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.Subscription;
2021

2122
import org.springframework.beans.factory.BeanFactory;
2223
import org.springframework.beans.factory.SmartInitializingSingleton;
@@ -39,8 +40,7 @@
3940
import org.springframework.util.Assert;
4041
import org.springframework.util.StringUtils;
4142

42-
import reactor.core.Disposable;
43-
import reactor.core.Disposables;
43+
import reactor.core.publisher.DirectProcessor;
4444
import reactor.core.publisher.Flux;
4545

4646
/**
@@ -54,8 +54,6 @@
5454
public abstract class MessageProducerSupport extends AbstractEndpoint implements MessageProducer, TrackableComponent,
5555
SmartInitializingSingleton, IntegrationPattern {
5656

57-
private final Disposable.Composite reactiveSubscriptions = Disposables.composite();
58-
5957
private final MessagingTemplate messagingTemplate = new MessagingTemplate();
6058

6159
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
@@ -70,6 +68,8 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements
7068

7169
private boolean shouldTrack = false;
7270

71+
private volatile Subscription subscription;
72+
7373
protected MessageProducerSupport() {
7474
this.setPhase(Integer.MAX_VALUE / 2);
7575
}
@@ -193,11 +193,17 @@ protected void doStart() {
193193
}
194194

195195
/**
196-
* Takes no action by default. Subclasses may override this if they
196+
* Cancels {@link #subscription} if any.
197+
* Subclasses may override this if they
197198
* need lifecycle-managed behavior.
198199
*/
199200
@Override
200201
protected void doStop() {
202+
Subscription subs = this.subscription;
203+
if (subs != null) {
204+
subs.cancel();
205+
this.subscription = null;
206+
}
201207
}
202208

203209
protected void sendMessage(Message<?> messageArg) {
@@ -219,24 +225,28 @@ protected void sendMessage(Message<?> messageArg) {
219225

220226
protected void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
221227
MessageChannel outputChannel = getRequiredOutputChannel();
222-
Flux<Message<?>> messageFlux =
228+
229+
Flux<? extends Message<?>> messageFlux =
223230
Flux.from(publisher)
224-
.map(this::trackMessageIfAny);
231+
.map(this::trackMessageIfAny)
232+
.doOnSubscribe(subscription -> this.subscription = subscription)
233+
.doOnComplete(this::stop)
234+
.doOnCancel(this::stop);
235+
225236
if (outputChannel instanceof ReactiveStreamsSubscribableChannel) {
226237
((ReactiveStreamsSubscribableChannel) outputChannel).subscribeTo(messageFlux);
227238
}
228239
else {
229-
this.reactiveSubscriptions.add(
230-
messageFlux
231-
.doOnNext((message) -> {
232-
try {
233-
sendMessage(message);
234-
}
235-
catch (Exception ex) {
236-
logger.error("Error sending a message: " + message, ex);
237-
}
238-
})
239-
.subscribe());
240+
messageFlux
241+
.doOnNext((message) -> {
242+
try {
243+
sendMessage(message);
244+
}
245+
catch (Exception ex) {
246+
logger.error("Error sending a message: " + message, ex);
247+
}
248+
})
249+
.subscribe();
240250
}
241251
}
242252

@@ -297,7 +307,11 @@ private Message<?> trackMessageIfAny(Message<?> message) {
297307

298308
@Override
299309
public void destroy() {
300-
this.reactiveSubscriptions.dispose();
310+
Subscription subs = this.subscription;
311+
if (subs != null) {
312+
subs.cancel();
313+
this.subscription = null;
314+
}
301315
super.destroy();
302316
}
303317

spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.endpoint;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
20+
1921
import org.junit.jupiter.api.Test;
2022

2123
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,15 +45,22 @@ public class ReactiveMessageProducerTests {
4345
@Autowired
4446
public FluxMessageChannel fluxMessageChannel;
4547

48+
@Autowired
49+
public MessageProducerSupport producer;
50+
4651
@Test
4752
public void test() {
53+
assertThat(this.producer.isRunning()).isTrue();
54+
4855
StepVerifier.create(
4956
Flux.from(this.fluxMessageChannel)
5057
.map(Message::getPayload)
5158
.cast(String.class))
5259
.expectNext("test1", "test2")
5360
.thenCancel()
5461
.verify();
62+
63+
assertThat(this.producer.isRunning()).isFalse();
5564
}
5665

5766
@Configuration

spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/dsl/MongoDb.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.data.mongodb.core.query.Query;
2525
import org.springframework.expression.common.LiteralExpression;
2626
import org.springframework.integration.expression.ValueExpression;
27+
import org.springframework.integration.mongodb.inbound.MongoDbChangeStreamMessageProducer;
2728

2829
/**
2930
* Factory class for building MongoDb components
@@ -140,6 +141,18 @@ public static ReactiveMongoDbMessageSourceSpec reactiveInboundChannelAdapter(Rea
140141
return new ReactiveMongoDbMessageSourceSpec(mongoTemplate, new ValueExpression<>(query));
141142
}
142143

144+
/**
145+
* Create a {@link MongoDbChangeStreamMessageProducerSpec} builder instance
146+
* based on the provided {@link ReactiveMongoOperations}.
147+
* @param mongoOperations the {@link ReactiveMongoOperations} to use.
148+
* @return the {@link MongoDbChangeStreamMessageProducerSpec} instance
149+
* @since 5.3
150+
*/
151+
public static MongoDbChangeStreamMessageProducerSpec changeStreamInboundChannelAdapter(
152+
ReactiveMongoOperations mongoOperations) {
153+
154+
return new MongoDbChangeStreamMessageProducerSpec(new MongoDbChangeStreamMessageProducer(mongoOperations));
155+
}
143156

144157
private MongoDb() {
145158
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mongodb.dsl;
18+
19+
import org.springframework.data.mongodb.core.ChangeStreamOptions;
20+
import org.springframework.integration.dsl.MessageProducerSpec;
21+
import org.springframework.integration.mongodb.inbound.MongoDbChangeStreamMessageProducer;
22+
23+
/**
24+
* A {@link MessageProducerSpec} for tne {@link MongoDbChangeStreamMessageProducer}.
25+
*
26+
* @author Artem Bilan
27+
*
28+
* @since 5.3
29+
*/
30+
public class MongoDbChangeStreamMessageProducerSpec
31+
extends MessageProducerSpec<MongoDbChangeStreamMessageProducerSpec, MongoDbChangeStreamMessageProducer> {
32+
33+
/**
34+
* Construct a builder based on an initial {@link MongoDbChangeStreamMessageProducerSpec}.
35+
* @param producer the {@link MongoDbChangeStreamMessageProducerSpec} to use.
36+
*/
37+
public MongoDbChangeStreamMessageProducerSpec(MongoDbChangeStreamMessageProducer producer) {
38+
super(producer);
39+
}
40+
41+
/**
42+
* Configure a domain type to convert change event body into.
43+
* @param domainType the type to use.
44+
* @return the spec.
45+
*/
46+
public MongoDbChangeStreamMessageProducerSpec domainType(Class<?> domainType) {
47+
this.target.setDomainType(domainType);
48+
return this;
49+
}
50+
51+
/**
52+
* Configure a collection to subscribe for change events.
53+
* @param collection the collection to use.
54+
* @return the spec.
55+
*/
56+
public MongoDbChangeStreamMessageProducerSpec collection(String collection) {
57+
this.target.setCollection(collection);
58+
return this;
59+
}
60+
61+
/**
62+
* Configure a {@link ChangeStreamOptions}.
63+
* @param options the {@link ChangeStreamOptions} to use.
64+
* @return the spec.
65+
*/
66+
public MongoDbChangeStreamMessageProducerSpec options(ChangeStreamOptions options) {
67+
this.target.setOptions(options);
68+
return this;
69+
}
70+
71+
/**
72+
* Configure a flag to extract body from a change event or use event as a payload.
73+
* @param extractBody to extract body or not.
74+
* @return the spec.
75+
*/
76+
public MongoDbChangeStreamMessageProducerSpec extractBody(boolean extractBody) {
77+
this.target.setExtractBody(extractBody);
78+
return this;
79+
}
80+
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mongodb.inbound;
18+
19+
import org.bson.Document;
20+
import org.reactivestreams.Publisher;
21+
22+
import org.springframework.data.mongodb.core.ChangeStreamEvent;
23+
import org.springframework.data.mongodb.core.ChangeStreamOptions;
24+
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
25+
import org.springframework.integration.endpoint.MessageProducerSupport;
26+
import org.springframework.integration.mongodb.support.MongoHeaders;
27+
import org.springframework.integration.support.MessageBuilder;
28+
import org.springframework.lang.Nullable;
29+
import org.springframework.messaging.Message;
30+
import org.springframework.util.Assert;
31+
32+
import reactor.core.publisher.Flux;
33+
34+
/**
35+
* A {@link MessageProducerSupport} for MongoDB Change Stream implementation.
36+
* The functionality is based on the
37+
* {@link ReactiveMongoOperations#changeStream(String, ChangeStreamOptions, Class)}
38+
* and {@link MessageProducerSupport#subscribeToPublisher(Publisher)} consumption.
39+
*
40+
* @author Artem Bilan
41+
*
42+
* @since 5.3
43+
*/
44+
public class MongoDbChangeStreamMessageProducer extends MessageProducerSupport {
45+
46+
private final ReactiveMongoOperations mongoOperations;
47+
48+
private Class<?> domainType = Document.class;
49+
50+
@Nullable
51+
private String collection;
52+
53+
private ChangeStreamOptions options = ChangeStreamOptions.empty();
54+
55+
private boolean extractBody = true;
56+
57+
/**
58+
* Create an instance based on the provided {@link ReactiveMongoOperations}.
59+
* @param mongoOperations the {@link ReactiveMongoOperations} to use.
60+
* @see ReactiveMongoOperations#changeStream(String, ChangeStreamOptions, Class)
61+
*/
62+
public MongoDbChangeStreamMessageProducer(ReactiveMongoOperations mongoOperations) {
63+
Assert.notNull(mongoOperations, "'mongoOperations' must not be null");
64+
this.mongoOperations = mongoOperations;
65+
}
66+
67+
/**
68+
* Specify an object type to convert an event body to.
69+
* Defaults to {@link Document} class.
70+
* @param domainType the class for event body conversion.
71+
* @see ReactiveMongoOperations#changeStream(String, ChangeStreamOptions, Class)
72+
*/
73+
public void setDomainType(Class<?> domainType) {
74+
Assert.notNull(domainType, "'domainType' must not be null");
75+
this.domainType = domainType;
76+
}
77+
78+
/**
79+
* Specify a collection name to track change events from.
80+
* By default tracks all the collection in the {@link #mongoOperations} configured database.
81+
* @param collection a collection to use.
82+
* @see ReactiveMongoOperations#changeStream(String, ChangeStreamOptions, Class)
83+
*/
84+
public void setCollection(String collection) {
85+
this.collection = collection;
86+
}
87+
88+
/**
89+
* Specify a {@link ChangeStreamOptions}.
90+
* @param options the {@link ChangeStreamOptions} to use.
91+
* @see ReactiveMongoOperations#changeStream(String, ChangeStreamOptions, Class)
92+
*/
93+
public void setOptions(ChangeStreamOptions options) {
94+
Assert.notNull(options, "'options' must not be null");
95+
this.options = options;
96+
}
97+
98+
/**
99+
* Configure this channel adapter to build a {@link Message} to produce
100+
* with a payload based on a {@link ChangeStreamEvent#getBody()} (by default)
101+
* or use a whole {@link ChangeStreamEvent} as a payload.
102+
* @param extractBody to extract {@link ChangeStreamEvent#getBody()} or not.
103+
*/
104+
public void setExtractBody(boolean extractBody) {
105+
this.extractBody = extractBody;
106+
}
107+
108+
@Override
109+
public String getComponentType() {
110+
return "mongo:change-stream-inbound-channel-adapter";
111+
}
112+
113+
@Override
114+
protected void doStart() {
115+
Flux<Message<?>> changeStreamFlux =
116+
this.mongoOperations.changeStream(this.collection, this.options, this.domainType)
117+
.map(event ->
118+
MessageBuilder
119+
.withPayload(
120+
!this.extractBody || event.getBody() == null
121+
? event
122+
: event.getBody())
123+
.setHeader(MongoHeaders.COLLECTION_NAME, event.getCollectionName())
124+
.setHeader(MongoHeaders.CHANGE_STREAM_OPERATION_TYPE, event.getOperationType())
125+
.setHeader(MongoHeaders.CHANGE_STREAM_TIMESTAMP, event.getTimestamp())
126+
.setHeader(MongoHeaders.CHANGE_STREAM_RESUME_TOKEN, event.getResumeToken())
127+
.build());
128+
subscribeToPublisher(changeStreamFlux);
129+
}
130+
131+
}

0 commit comments

Comments
 (0)