Skip to content

Commit fa0caeb

Browse files
committed
GH-3199: FailoverClientCF - Fail Back Option
Resolves: #3199 Previously, the FCCF did not cache a shared connection; if server 1 is down and server 2 is up, this caused an attempt to connect to server 1 every time we got the connection. Add 2 options: `refreshSharedInterval` and `closeOnRefresh`, defaulting to 0 and false respectively, to maintain the same behavior as before the options existed. Disallow caching of the single shared connection if the delegate factories are `CachingClientConnectionFactory` instances. **cherry-pick to 5.2.x** I will backport to 5.1.x, 4.3.x after review/merge. * Polish javadocs and fix typo in docs
1 parent 7ced027 commit fa0caeb

File tree

3 files changed

+162
-12
lines changed

3 files changed

+162
-12
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java

+84-10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.ip.tcp.connection;
1818

19+
import java.util.ArrayList;
1920
import java.util.Iterator;
2021
import java.util.List;
2122
import java.util.UUID;
@@ -34,18 +35,76 @@
3435
* Given a list of connection factories, serves up {@link TcpConnection}s
3536
* that can iterate over a connection from each factory until the write
3637
* succeeds or the list is exhausted.
38+
*
3739
* @author Gary Russell
3840
* @since 2.2
3941
*
4042
*/
4143
public class FailoverClientConnectionFactory extends AbstractClientConnectionFactory {
4244

45+
private static final long DEFAULT_REFRESH_SHARED_INTERVAL = 0L;
46+
4347
private final List<AbstractClientConnectionFactory> factories;
4448

49+
private final boolean cachingDelegates;
50+
51+
private long refreshSharedInterval = DEFAULT_REFRESH_SHARED_INTERVAL;
52+
53+
private boolean closeOnRefresh;
54+
55+
private volatile long creationTime;
56+
57+
/**
58+
* Construct an instance with the provided delegate factories.
59+
* @param factories the delegates.
60+
*/
4561
public FailoverClientConnectionFactory(List<AbstractClientConnectionFactory> factories) {
4662
super("", 0);
4763
Assert.notEmpty(factories, "At least one factory is required");
48-
this.factories = factories;
64+
this.factories = new ArrayList<AbstractClientConnectionFactory>(factories);
65+
boolean cachingDelegates = false;
66+
for (AbstractClientConnectionFactory factory : factories) {
67+
if (factory instanceof CachingClientConnectionFactory) {
68+
cachingDelegates = true;
69+
break;
70+
}
71+
}
72+
this.cachingDelegates = cachingDelegates;
73+
}
74+
75+
/**
76+
* When using a shared connection {@link #setSingleUse(boolean) singleUse} is false,
77+
* specify how long to wait before trying to fail back to start from the beginning of
78+
* the factory list. Default is 0 for backwards compatibility to always try to get a
79+
* connection to the primary server. If you don't want to fail back until the current
80+
* connection is closed, set this to {@link Long#MAX_VALUE}.
81+
* Cannot be changed when using {@link CachingClientConnectionFactory} delegates.
82+
* @param refreshSharedInterval the interval in milliseconds.
83+
* @since 4.3.22
84+
* @see #setSingleUse(boolean)
85+
* @see #setCloseOnRefresh(boolean)
86+
*/
87+
public void setRefreshSharedInterval(long refreshSharedInterval) {
88+
Assert.isTrue(!this.cachingDelegates,
89+
"'refreshSharedInterval' cannot be changed when using 'CachingClientConnectionFactory` delegates");
90+
this.refreshSharedInterval = refreshSharedInterval;
91+
}
92+
93+
/**
94+
* When using a shared connection {@link #setSingleUse(boolean) singleUse} is false,
95+
* set this to true to close the old shared connection after a refresh. If this is
96+
* false, the connection will remain open, but unused until its connection factory is
97+
* again used to get a connection. Default is false for backwards compatibility.
98+
* Cannot be changed when using {@link CachingClientConnectionFactory} delegates.
99+
* @param closeOnRefresh true to close.
100+
* @since 4.3.22
101+
* @see #setSingleUse(boolean)
102+
* @see #setRefreshSharedInterval(long)
103+
*/
104+
public void setCloseOnRefresh(boolean closeOnRefresh) {
105+
Assert.isTrue(!this.cachingDelegates,
106+
"'closeOnRefresh' cannot be changed when using 'CachingClientConnectionFactory` delegates");
107+
this.closeOnRefresh = closeOnRefresh;
49108
}
50109

51110
@Override
@@ -93,27 +152,42 @@ public void registerSender(TcpSender sender) {
93152

94153
@Override
95154
protected TcpConnectionSupport obtainConnection() throws Exception {
96-
TcpConnectionSupport connection = this.getTheConnection();
97-
if (connection != null && connection.isOpen()) {
98-
((FailoverTcpConnection) connection).incrementEpoch();
99-
return connection;
155+
FailoverTcpConnection sharedConnection = (FailoverTcpConnection) getTheConnection();
156+
boolean shared = !isSingleUse() && !this.cachingDelegates;
157+
boolean refreshShared = shared
158+
&& sharedConnection != null
159+
&& System.currentTimeMillis() > this.creationTime + this.refreshSharedInterval;
160+
if (sharedConnection != null && sharedConnection.isOpen() && !refreshShared) {
161+
sharedConnection.incrementEpoch();
162+
return sharedConnection;
100163
}
101164
FailoverTcpConnection failoverTcpConnection = new FailoverTcpConnection(this.factories);
102165
if (getListener() != null) {
103166
failoverTcpConnection.registerListener(getListener());
104167
}
105168
failoverTcpConnection.incrementEpoch();
169+
if (shared) {
170+
this.creationTime = System.currentTimeMillis();
171+
/*
172+
* We may have simply wrapped the same connection in a new wrapper; don't close.
173+
*/
174+
if (refreshShared && this.closeOnRefresh
175+
&& !sharedConnection.delegate.equals(failoverTcpConnection.delegate)
176+
&& sharedConnection.isOpen()) {
177+
sharedConnection.close();
178+
}
179+
setTheConnection(failoverTcpConnection);
180+
}
106181
return failoverTcpConnection;
107182
}
108183

109-
110184
@Override
111185
public void start() {
112186
for (AbstractClientConnectionFactory factory : this.factories) {
113187
factory.enableManualListenerRegistration();
114188
factory.start();
115189
}
116-
this.setActive(true);
190+
setActive(true);
117191
super.start();
118192
}
119193

@@ -150,16 +224,16 @@ private final class FailoverTcpConnection extends TcpConnectionSupport implement
150224

151225
private final String connectionId;
152226

227+
private final AtomicLong epoch = new AtomicLong();
228+
153229
private volatile Iterator<AbstractClientConnectionFactory> factoryIterator;
154230

155231
private volatile AbstractClientConnectionFactory currentFactory;
156232

157-
private volatile TcpConnectionSupport delegate;
233+
volatile TcpConnectionSupport delegate; // NOSONAR visibility
158234

159235
private volatile boolean open = true;
160236

161-
private final AtomicLong epoch = new AtomicLong();
162-
163237
private FailoverTcpConnection(List<AbstractClientConnectionFactory> factories) throws Exception {
164238
this.factories = factories;
165239
this.factoryIterator = factories.iterator();

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java

+58-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,12 +23,16 @@
2323
import static org.junit.Assert.assertTrue;
2424
import static org.junit.Assert.fail;
2525
import static org.mockito.Mockito.doAnswer;
26+
import static org.mockito.Mockito.doReturn;
2627
import static org.mockito.Mockito.doThrow;
28+
import static org.mockito.Mockito.inOrder;
2729
import static org.mockito.Mockito.mock;
30+
import static org.mockito.Mockito.never;
2831
import static org.mockito.Mockito.times;
2932
import static org.mockito.Mockito.when;
3033

3134
import java.io.IOException;
35+
import java.io.UncheckedIOException;
3236
import java.net.Socket;
3337
import java.nio.channels.SocketChannel;
3438
import java.util.ArrayList;
@@ -44,6 +48,7 @@
4448
import org.apache.log4j.Level;
4549
import org.junit.Rule;
4650
import org.junit.Test;
51+
import org.mockito.InOrder;
4752
import org.mockito.Mockito;
4853
import org.mockito.invocation.InvocationOnMock;
4954
import org.mockito.stubbing.Answer;
@@ -119,6 +124,58 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
119124
Mockito.verify(conn2).send(message);
120125
}
121126

127+
@Test
128+
public void testRefreshShared() throws Exception {
129+
testRefreshShared(false);
130+
}
131+
132+
@Test
133+
public void testRefreshSharedCloseOnRefresh() throws Exception {
134+
testRefreshShared(true);
135+
}
136+
137+
private void testRefreshShared(boolean closeOnRefresh) throws Exception {
138+
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
139+
AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class);
140+
List<AbstractClientConnectionFactory> factories = new ArrayList<AbstractClientConnectionFactory>();
141+
factories.add(factory1);
142+
factories.add(factory2);
143+
TcpConnectionSupport conn1 = makeMockConnection();
144+
doReturn("conn1").when(conn1).getConnectionId();
145+
TcpConnectionSupport conn2 = makeMockConnection();
146+
doReturn("conn2").when(conn2).getConnectionId();
147+
doThrow(new UncheckedIOException(new IOException("fail")))
148+
.when(factory1).getConnection();
149+
if (closeOnRefresh) {
150+
when(factory2.getConnection()).thenReturn(conn1, conn2);
151+
}
152+
else {
153+
when(factory2.getConnection()).thenReturn(conn1);
154+
}
155+
when(factory1.isActive()).thenReturn(true);
156+
when(factory2.isActive()).thenReturn(true);
157+
FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories);
158+
failoverFactory.setCloseOnRefresh(closeOnRefresh);
159+
failoverFactory.start();
160+
TcpConnectionSupport connection = failoverFactory.getConnection();
161+
assertNotNull(TestUtils.getPropertyValue(failoverFactory, "theConnection"));
162+
failoverFactory.setRefreshSharedInterval(10_000);
163+
assertSame(failoverFactory.getConnection(), connection);
164+
failoverFactory.setRefreshSharedInterval(-1);
165+
assertNotSame(failoverFactory.getConnection(), connection);
166+
InOrder inOrder = inOrder(factory1, factory2, conn1);
167+
inOrder.verify(factory1).getConnection();
168+
inOrder.verify(factory2).getConnection();
169+
inOrder.verify(factory1).getConnection();
170+
inOrder.verify(factory2).getConnection();
171+
if (closeOnRefresh) {
172+
inOrder.verify(conn1).close();
173+
}
174+
else {
175+
inOrder.verify(conn1, never()).close();
176+
}
177+
}
178+
122179
@Test(expected = IOException.class)
123180
public void testFailoverAllDead() throws Exception {
124181
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
@@ -665,4 +722,3 @@ private static class Holder {
665722
}
666723

667724
}
668-

src/reference/asciidoc/ip.adoc

+20
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,26 @@ Initially, the first factory in the configured list is used; if a connection sub
361361
</constructor-arg>
362362
</bean>
363363
----
364+
====
365+
366+
NOTE: When using the failover connection factory, the `singleUse` property must be consistent between the factory itself and the list of factories it is configured to use.
367+
368+
The connection factory has two properties when used with a shared connection (`singleUse=false`):
369+
370+
* `refreshSharedInterval`
371+
* `closeOnRefresh`
372+
373+
These are `0` and `false` to retain the same behavior that existed before the properties were added.
374+
375+
Consider the following scenario based on the above configuration:
376+
Let's say `clientFactory1` cannot establish a connection but `clientFactory2` can.
377+
Each time the `failCF` `getConnection()` method is called, we will again attempt to connect using `clientFactory1`; if successful, the "old" connection will remain open and may be reused in future if the first factory fails once more.
378+
379+
Set `refreshSharedInterval` to only attempt to reconnect with the first factory after that time has expired; set it to `Long.MAX_VALUE` if you only want to fail back to the first factory when the current connection fails.
380+
381+
Set `closeOnRefresh` to close the "old" connection after a refresh actually creates a new connection.
382+
383+
IMPORTANT: These properties do not apply if any of the delegate factories is a `CachingClientConnectionFactory` because the connection caching is handled there; in that case the list of connection factories will always be consulted to get a connection.
364384
365385
NOTE: When using the failover connection factory, the singleUse property must be consistent between the factory itself and the list of factories it is configured to use.
366386

0 commit comments

Comments
 (0)