Skip to content

GH-2788: Add MongoDbChangeStreamMessageProducer #3240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,14 @@

package org.springframework.integration.endpoint;

import org.reactivestreams.Publisher;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationPattern;
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.history.MessageHistory;
Expand All @@ -36,6 +39,8 @@
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import reactor.core.publisher.Flux;

/**
* A support class for producer endpoints that provides a setter for the
* output channel and a convenience method for sending Messages.
Expand Down Expand Up @@ -176,15 +181,17 @@ protected void onInit() {
}

/**
* Takes no action by default. Subclasses may override this if they
* Take no action by default.
* Subclasses may override this if they
* need lifecycle-managed behavior. Protected by 'lifecycleLock'.
*/
@Override
protected void doStart() {
}

/**
* Takes no action by default. Subclasses may override this if they
* Take no action by default.
* Subclasses may override this if they
* need lifecycle-managed behavior.
*/
@Override
Expand All @@ -196,13 +203,10 @@ protected void sendMessage(Message<?> messageArg) {
if (message == null) {
throw new MessagingException("cannot send a null message");
}
if (this.shouldTrack) {
message = MessageHistory.write(message, this, getMessageBuilderFactory());
}
message = trackMessageIfAny(message);
try {
MessageChannel messageChannel = getOutputChannel();
Assert.state(messageChannel != null, "The 'outputChannel' or `outputChannelName` must be configured");
this.messagingTemplate.send(messageChannel, message);
MessageChannel outputChannel = getRequiredOutputChannel();
this.messagingTemplate.send(outputChannel, message);
}
catch (RuntimeException ex) {
if (!sendErrorMessageIfNecessary(message, ex)) {
Expand All @@ -211,14 +215,41 @@ protected void sendMessage(Message<?> messageArg) {
}
}

protected void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
MessageChannel outputChannel = getRequiredOutputChannel();

Flux<? extends Message<?>> messageFlux =
Flux.from(publisher)
.map(this::trackMessageIfAny)
.doOnComplete(this::stop)
.doOnCancel(this::stop)
.takeWhile((message) -> isRunning());

if (outputChannel instanceof ReactiveStreamsSubscribableChannel) {
((ReactiveStreamsSubscribableChannel) outputChannel).subscribeTo(messageFlux);
}
else {
messageFlux
.doOnNext((message) -> {
try {
sendMessage(message);
}
catch (Exception ex) {
logger.error("Error sending a message: " + message, ex);
}
})
.subscribe();
}
}

/**
* Send an error message based on the exception and message.
* @param message the message.
* @param exception the exception.
* @return true if the error channel is available and message sent.
* @since 4.3.10
*/
protected final boolean sendErrorMessageIfNecessary(Message<?> message, RuntimeException exception) {
protected final boolean sendErrorMessageIfNecessary(Message<?> message, Exception exception) {
MessageChannel channel = getErrorChannel();
if (channel != null) {
this.messagingTemplate.send(channel, buildErrorMessage(message, exception));
Expand All @@ -235,9 +266,8 @@ protected final boolean sendErrorMessageIfNecessary(Message<?> message, RuntimeE
* @return the error message.
* @since 4.3.10
*/
protected final ErrorMessage buildErrorMessage(Message<?> message, RuntimeException exception) {
return this.errorMessageStrategy.buildErrorMessage(exception,
getErrorMessageAttributes(message));
protected final ErrorMessage buildErrorMessage(Message<?> message, Exception exception) {
return this.errorMessageStrategy.buildErrorMessage(exception, getErrorMessageAttributes(message));
}

/**
Expand All @@ -252,4 +282,19 @@ protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
return ErrorMessageUtils.getAttributeAccessor(message, null);
}

private MessageChannel getRequiredOutputChannel() {
MessageChannel messageChannel = getOutputChannel();
Assert.state(messageChannel != null, "The 'outputChannel' or `outputChannelName` must be configured");
return messageChannel;
}

private Message<?> trackMessageIfAny(Message<?> message) {
if (this.shouldTrack) {
return MessageHistory.write(message, this, getMessageBuilderFactory());
}
else {
return message;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.endpoint;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

/**
* @author Artem Bilan
*
* @since 5.3
*/
@SpringJUnitConfig
@DirtiesContext
public class ReactiveMessageProducerTests {

@Autowired
public FluxMessageChannel fluxMessageChannel;

@Autowired
public MessageProducerSupport producer;

@Test
public void test() {
assertThat(this.producer.isRunning()).isTrue();

StepVerifier.create(
Flux.from(this.fluxMessageChannel)
.map(Message::getPayload)
.cast(String.class))
.expectNext("test1", "test2")
.thenCancel()
.verify();

assertThat(this.producer.isRunning()).isFalse();
}

@Configuration
@EnableIntegration
public static class Config {

@Bean
public FluxMessageChannel fluxMessageChannel() {
return new FluxMessageChannel();
}

@Bean
public MessageProducerSupport producer() {
MessageProducerSupport producer =
new MessageProducerSupport() {

@Override
protected void doStart() {
subscribeToPublisher(Flux.just("test1", "test2").map(GenericMessage::new));
}

};
producer.setOutputChannel(fluxMessageChannel());
return producer;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.mongodb.inbound.MongoDbChangeStreamMessageProducer;

/**
* Factory class for building MongoDb components
Expand Down Expand Up @@ -140,6 +141,18 @@ public static ReactiveMongoDbMessageSourceSpec reactiveInboundChannelAdapter(Rea
return new ReactiveMongoDbMessageSourceSpec(mongoTemplate, new ValueExpression<>(query));
}

/**
* Create a {@link MongoDbChangeStreamMessageProducerSpec} builder instance
* based on the provided {@link ReactiveMongoOperations}.
* @param mongoOperations the {@link ReactiveMongoOperations} to use.
* @return the {@link MongoDbChangeStreamMessageProducerSpec} instance
* @since 5.3
*/
public static MongoDbChangeStreamMessageProducerSpec changeStreamInboundChannelAdapter(
ReactiveMongoOperations mongoOperations) {

return new MongoDbChangeStreamMessageProducerSpec(new MongoDbChangeStreamMessageProducer(mongoOperations));
}

private MongoDb() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mongodb.dsl;

import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.integration.dsl.MessageProducerSpec;
import org.springframework.integration.mongodb.inbound.MongoDbChangeStreamMessageProducer;

/**
* A {@link MessageProducerSpec} for tne {@link MongoDbChangeStreamMessageProducer}.
*
* @author Artem Bilan
*
* @since 5.3
*/
public class MongoDbChangeStreamMessageProducerSpec
extends MessageProducerSpec<MongoDbChangeStreamMessageProducerSpec, MongoDbChangeStreamMessageProducer> {

/**
* Construct a builder based on an initial {@link MongoDbChangeStreamMessageProducerSpec}.
* @param producer the {@link MongoDbChangeStreamMessageProducerSpec} to use.
*/
public MongoDbChangeStreamMessageProducerSpec(MongoDbChangeStreamMessageProducer producer) {
super(producer);
}

/**
* Configure a domain type to convert change event body into.
* @param domainType the type to use.
* @return the spec.
*/
public MongoDbChangeStreamMessageProducerSpec domainType(Class<?> domainType) {
this.target.setDomainType(domainType);
return this;
}

/**
* Configure a collection to subscribe for change events.
* @param collection the collection to use.
* @return the spec.
*/
public MongoDbChangeStreamMessageProducerSpec collection(String collection) {
this.target.setCollection(collection);
return this;
}

/**
* Configure a {@link ChangeStreamOptions}.
* @param options the {@link ChangeStreamOptions} to use.
* @return the spec.
*/
public MongoDbChangeStreamMessageProducerSpec options(ChangeStreamOptions options) {
this.target.setOptions(options);
return this;
}

/**
* Configure a flag to extract body from a change event or use event as a payload.
* @param extractBody to extract body or not.
* @return the spec.
*/
public MongoDbChangeStreamMessageProducerSpec extractBody(boolean extractBody) {
this.target.setExtractBody(extractBody);
return this;
}

}
Loading