Skip to content

Add RSocketInboundGateway; refactoring #2923

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.rsocket;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;

/**
* A base connector container for common RSocket client and server functionality.
* <p>
* It accepts {@link IntegrationRSocketEndpoint} instances for mapping registration via an internal
* {@link IntegrationRSocketAcceptor} or performs an auto-detection otherwise, when all bean are ready
* in the application context.
*
* @author Artem Bilan
*
* @since 5.2
*
* @see IntegrationRSocketAcceptor
*/
public abstract class AbstractRSocketConnector
implements ApplicationContextAware, InitializingBean, DisposableBean, SmartInitializingSingleton,
SmartLifecycle {

protected final IntegrationRSocketAcceptor rsocketAcceptor; // NOSONAR - final

private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;

private RSocketStrategies rsocketStrategies =
RSocketStrategies.builder()
.decoder(StringDecoder.allMimeTypes())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory())
.build();

private volatile boolean running;

private ApplicationContext applicationContext;

protected AbstractRSocketConnector(IntegrationRSocketAcceptor rsocketAcceptor) {
this.rsocketAcceptor = rsocketAcceptor;
}

public void setDataMimeType(MimeType dataMimeType) {
Assert.notNull(dataMimeType, "'dataMimeType' must not be null");
this.dataMimeType = dataMimeType;
}

protected MimeType getDataMimeType() {
return this.dataMimeType;
}

public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
Assert.notNull(rsocketStrategies, "'rsocketStrategies' must not be null");
this.rsocketStrategies = rsocketStrategies;
}

public RSocketStrategies getRSocketStrategies() {
return this.rsocketStrategies;
}

public void setEndpoints(IntegrationRSocketEndpoint... endpoints) {
Assert.notNull(endpoints, "'endpoints' must not be null");
for (IntegrationRSocketEndpoint endpoint : endpoints) {
addEndpoint(endpoint);
}
}

public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
this.rsocketAcceptor.addEndpoint(endpoint);
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
this.rsocketAcceptor.setApplicationContext(applicationContext);
}

protected ApplicationContext getApplicationContext() {
return this.applicationContext;
}

@Override
public void afterPropertiesSet() {
this.rsocketAcceptor.setDefaultDataMimeType(this.dataMimeType);
this.rsocketAcceptor.setRSocketStrategies(this.rsocketStrategies);
this.rsocketAcceptor.afterPropertiesSet();
}

@Override
public void afterSingletonsInstantiated() {
this.rsocketAcceptor.detectEndpoints();
}

@Override
public void start() {
if (!this.running) {
this.running = true;
doStart();
}
}

protected abstract void doStart();

@Override
public void stop() {
this.running = false;
}

@Override
public boolean isRunning() {
return this.running;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@

import java.net.URI;
import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;

import io.rsocket.Payload;
import io.rsocket.RSocket;
Expand All @@ -39,7 +35,11 @@
import reactor.core.publisher.Mono;

/**
* A client connector to the RSocket server.
* A client {@link AbstractRSocketConnector} extension to the RSocket server.
* <p>
* Note: the {@link RSocketFactory.ClientRSocketFactory#acceptor(Function)}
* in the provided {@link #factoryConfigurer} is overridden with an internal {@link IntegrationRSocketAcceptor}
* for the proper Spring Integration channel adapter mappings.
*
* @author Artem Bilan
*
Expand All @@ -48,17 +48,17 @@
* @see RSocketFactory.ClientRSocketFactory
* @see RSocketRequester
*/
public class ClientRSocketConnector implements InitializingBean, DisposableBean {
public class ClientRSocketConnector extends AbstractRSocketConnector {

private final ClientTransport clientTransport;

private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
private Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = (clientRSocketFactory) -> { };

private Payload connectPayload = EmptyPayload.INSTANCE;
private String connectRoute;

private RSocketStrategies rsocketStrategies = RSocketStrategies.builder().build();
private String connectData = "";

private Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = (clientRSocketFactory) -> { };
private boolean autoConnect;

private Mono<RSocket> rsocketMono;

Expand All @@ -71,47 +71,51 @@ public ClientRSocketConnector(URI uri) {
}

public ClientRSocketConnector(ClientTransport clientTransport) {
super(new IntegrationRSocketAcceptor());
Assert.notNull(clientTransport, "'clientTransport' must not be null");
this.clientTransport = clientTransport;
}

public void setDataMimeType(MimeType dataMimeType) {
Assert.notNull(dataMimeType, "'dataMimeType' must not be null");
this.dataMimeType = dataMimeType;
}

public void setFactoryConfigurer(Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer) {
Assert.notNull(factoryConfigurer, "'factoryConfigurer' must not be null");
this.factoryConfigurer = factoryConfigurer;
}

public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
Assert.notNull(rsocketStrategies, "'rsocketStrategies' must not be null");
this.rsocketStrategies = rsocketStrategies;
public void setConnectRoute(String connectRoute) {
this.connectRoute = connectRoute;
}

public void setConnectRoute(String connectRoute) {
this.connectPayload = DefaultPayload.create("", connectRoute);
public void setConnectData(String connectData) {
Assert.notNull(connectData, "'connectData' must not be null");
this.connectData = connectData;
}

@Override
public void afterPropertiesSet() {
super.afterPropertiesSet();
RSocketFactory.ClientRSocketFactory clientFactory =
RSocketFactory.connect()
.dataMimeType(this.dataMimeType.toString());
.dataMimeType(getDataMimeType().toString());
this.factoryConfigurer.accept(clientFactory);
clientFactory.setupPayload(this.connectPayload);
clientFactory.acceptor(this.rsocketAcceptor);
Payload connectPayload = EmptyPayload.INSTANCE;
if (this.connectRoute != null) {
connectPayload = DefaultPayload.create(this.connectData, this.connectRoute);
}
clientFactory.setupPayload(connectPayload);
this.rsocketMono = clientFactory.transport(this.clientTransport).start().cache();
}

public void connect() {
this.rsocketMono.subscribe();
@Override
public void afterSingletonsInstantiated() {
this.autoConnect = this.rsocketAcceptor.detectEndpoints();
}

public Mono<RSocketRequester> getRSocketRequester() {
return this.rsocketMono
.map(rsocket -> RSocketRequester.wrap(rsocket, this.dataMimeType, this.rsocketStrategies))
.cache();
@Override
protected void doStart() {
if (this.autoConnect) {
connect();
}
}

@Override
Expand All @@ -121,4 +125,17 @@ public void destroy() {
.subscribe();
}

/**
* Perform subscription into the RSocket server for incoming requests.
*/
public void connect() {
this.rsocketMono.subscribe();
}

public Mono<RSocketRequester> getRSocketRequester() {
return this.rsocketMono
.map(rsocket -> RSocketRequester.wrap(rsocket, getDataMimeType(), getRSocketStrategies()))
.cache();
}

}
Loading