Skip to content

Commit bd40a93

Browse files
committed
Ensure Environment.shutdown() in Reactor2TcpClient
Issue: SPR-14229
1 parent 240f254 commit bd40a93

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> {
8484

8585
private final EventLoopGroup eventLoopGroup;
8686

87+
private final Environment environment;
88+
8789
private final TcpClientFactory<Message<P>, Message<P>> tcpClientSpecFactory;
8890

8991
private final List<TcpClient<Message<P>, Message<P>>> tcpClients =
@@ -108,12 +110,13 @@ public Reactor2TcpClient(final String host, final int port, final Codec<Buffer,
108110
// Reactor 2.0.5 requires NioEventLoopGroup vs 2.0.6+ requires EventLoopGroup
109111
final NioEventLoopGroup nioEventLoopGroup = initEventLoopGroup();
110112
this.eventLoopGroup = nioEventLoopGroup;
113+
this.environment = new Environment(new SynchronousDispatcherConfigReader());
111114

112115
this.tcpClientSpecFactory = new TcpClientFactory<Message<P>, Message<P>>() {
113116
@Override
114117
public TcpClientSpec<Message<P>, Message<P>> apply(TcpClientSpec<Message<P>, Message<P>> spec) {
115118
return spec
116-
.env(new Environment(new SynchronousDispatcherConfigReader()))
119+
.env(environment)
117120
.codec(codec)
118121
.connect(host, port)
119122
.options(createClientSocketOptions());
@@ -139,6 +142,7 @@ public Reactor2TcpClient(TcpClientFactory<Message<P>, Message<P>> tcpClientSpecF
139142
Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
140143
this.tcpClientSpecFactory = tcpClientSpecFactory;
141144
this.eventLoopGroup = null;
145+
this.environment = null;
142146
}
143147

144148

@@ -269,6 +273,15 @@ public void operationComplete(Future<Object> future) throws Exception {
269273
promise = eventLoopPromise;
270274
}
271275

276+
if (this.environment != null) {
277+
promise.onComplete(new Consumer<Promise<Void>>() {
278+
@Override
279+
public void accept(Promise<Void> voidPromise) {
280+
environment.shutdown();
281+
}
282+
});
283+
}
284+
272285
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
273286
}
274287

0 commit comments

Comments
 (0)