Skip to content

Commit ac7441a

Browse files
committed
GH-3199: FailoverClientCF - Fail Back Option
Resolves: #3199 Previously, the FCCF did not cache a shared connection; if server 1 is down and server 2 is up, this caused an attempt to connect to server 1 every time we got the connection. Add 2 options: `refreshSharedInterval` and `closeOnRefresh`, defaulting to 0 and false respectively, to maintain the same behavior as before the options existed. Disallow caching of the single shared connection if the delegate factories are `CachingClientConnectionFactory` instances. **cherry-pick to 5.2.x** I will backport to 5.1.x, 4.3.x after review/merge. * Polish javadocs and fix typo in docs
1 parent 32c81d1 commit ac7441a

File tree

3 files changed

+153
-12
lines changed

3 files changed

+153
-12
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java

Lines changed: 79 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.ip.tcp.connection;
1818

19+
import java.util.ArrayList;
1920
import java.util.Iterator;
2021
import java.util.List;
2122
import java.util.UUID;
@@ -34,18 +35,70 @@
3435
* Given a list of connection factories, serves up {@link TcpConnection}s
3536
* that can iterate over a connection from each factory until the write
3637
* succeeds or the list is exhausted.
38+
*
3739
* @author Gary Russell
3840
* @since 2.2
3941
*
4042
*/
4143
public class FailoverClientConnectionFactory extends AbstractClientConnectionFactory {
4244

45+
private static final long DEFAULT_REFRESH_SHARED_INTERVAL = 0L;
46+
4347
private final List<AbstractClientConnectionFactory> factories;
4448

49+
private final boolean cachingDelegates;
50+
51+
private long refreshSharedInterval = DEFAULT_REFRESH_SHARED_INTERVAL;
52+
53+
private boolean closeOnRefresh;
54+
55+
private volatile long creationTime;
56+
57+
/**
58+
* Construct an instance with the provided delegate factories.
59+
* @param factories the delegates.
60+
*/
4561
public FailoverClientConnectionFactory(List<AbstractClientConnectionFactory> factories) {
4662
super("", 0);
4763
Assert.notEmpty(factories, "At least one factory is required");
48-
this.factories = factories;
64+
this.factories = new ArrayList<>(factories);
65+
this.cachingDelegates = factories.stream()
66+
.anyMatch(factory -> factory instanceof CachingClientConnectionFactory);
67+
}
68+
69+
/**
70+
* When using a shared connection {@link #setSingleUse(boolean) singleUse} is false,
71+
* specify how long to wait before trying to fail back to start from the beginning of
72+
* the factory list. Default is 0 for backwards compatibility to always try to get a
73+
* connection to the primary server. If you don't want to fail back until the current
74+
* connection is closed, set this to {@link Long#MAX_VALUE}.
75+
* Cannot be changed when using {@link CachingClientConnectionFactory} delegates.
76+
* @param refreshSharedInterval the interval in milliseconds.
77+
* @since 4.3.22
78+
* @see #setSingleUse(boolean)
79+
* @see #setCloseOnRefresh(boolean)
80+
*/
81+
public void setRefreshSharedInterval(long refreshSharedInterval) {
82+
Assert.isTrue(!this.cachingDelegates,
83+
"'refreshSharedInterval' cannot be changed when using 'CachingClientConnectionFactory` delegates");
84+
this.refreshSharedInterval = refreshSharedInterval;
85+
}
86+
87+
/**
88+
* When using a shared connection {@link #setSingleUse(boolean) singleUse} is false,
89+
* set this to true to close the old shared connection after a refresh. If this is
90+
* false, the connection will remain open, but unused until its connection factory is
91+
* again used to get a connection. Default is false for backwards compatibility.
92+
* Cannot be changed when using {@link CachingClientConnectionFactory} delegates.
93+
* @param closeOnRefresh true to close.
94+
* @since 4.3.22
95+
* @see #setSingleUse(boolean)
96+
* @see #setRefreshSharedInterval(long)
97+
*/
98+
public void setCloseOnRefresh(boolean closeOnRefresh) {
99+
Assert.isTrue(!this.cachingDelegates,
100+
"'closeOnRefresh' cannot be changed when using 'CachingClientConnectionFactory` delegates");
101+
this.closeOnRefresh = closeOnRefresh;
49102
}
50103

51104
@Override
@@ -93,27 +146,42 @@ public void registerSender(TcpSender sender) {
93146

94147
@Override
95148
protected TcpConnectionSupport obtainConnection() throws InterruptedException {
96-
TcpConnectionSupport connection = this.getTheConnection();
97-
if (connection != null && connection.isOpen()) {
98-
((FailoverTcpConnection) connection).incrementEpoch();
99-
return connection;
149+
FailoverTcpConnection sharedConnection = (FailoverTcpConnection) getTheConnection();
150+
boolean shared = !isSingleUse() && !this.cachingDelegates;
151+
boolean refreshShared = shared
152+
&& sharedConnection != null
153+
&& System.currentTimeMillis() > this.creationTime + this.refreshSharedInterval;
154+
if (sharedConnection != null && sharedConnection.isOpen() && !refreshShared) {
155+
sharedConnection.incrementEpoch();
156+
return sharedConnection;
100157
}
101158
FailoverTcpConnection failoverTcpConnection = new FailoverTcpConnection(this.factories);
102159
if (getListener() != null) {
103160
failoverTcpConnection.registerListener(getListener());
104161
}
105162
failoverTcpConnection.incrementEpoch();
163+
this.creationTime = System.currentTimeMillis();
164+
if (shared) {
165+
/*
166+
* We may have simply wrapped the same connection in a new wrapper; don't close.
167+
*/
168+
if (refreshShared && this.closeOnRefresh
169+
&& !sharedConnection.delegate.equals(failoverTcpConnection.delegate)
170+
&& sharedConnection.isOpen()) {
171+
sharedConnection.close();
172+
}
173+
setTheConnection(failoverTcpConnection);
174+
}
106175
return failoverTcpConnection;
107176
}
108177

109-
110178
@Override
111179
public void start() {
112180
for (AbstractClientConnectionFactory factory : this.factories) {
113181
factory.enableManualListenerRegistration();
114182
factory.start();
115183
}
116-
this.setActive(true);
184+
setActive(true);
117185
super.start();
118186
}
119187

@@ -150,16 +218,16 @@ private final class FailoverTcpConnection extends TcpConnectionSupport implement
150218

151219
private final String connectionId;
152220

221+
private final AtomicLong epoch = new AtomicLong();
222+
153223
private volatile Iterator<AbstractClientConnectionFactory> factoryIterator;
154224

155225
private volatile AbstractClientConnectionFactory currentFactory;
156226

157-
private volatile TcpConnectionSupport delegate;
227+
volatile TcpConnectionSupport delegate; // NOSONAR visibility
158228

159229
private volatile boolean open = true;
160230

161-
private final AtomicLong epoch = new AtomicLong();
162-
163231
private FailoverTcpConnection(List<AbstractClientConnectionFactory> factories) throws InterruptedException {
164232
this.connectionFactories = factories;
165233
this.factoryIterator = factories.iterator();

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,8 +19,11 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121
import static org.mockito.Mockito.doAnswer;
22+
import static org.mockito.Mockito.doReturn;
2223
import static org.mockito.Mockito.doThrow;
24+
import static org.mockito.Mockito.inOrder;
2325
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.never;
2427
import static org.mockito.Mockito.times;
2528
import static org.mockito.Mockito.when;
2629

@@ -39,6 +42,7 @@
3942

4043
import org.junit.Rule;
4144
import org.junit.Test;
45+
import org.mockito.InOrder;
4246
import org.mockito.Mockito;
4347

4448
import org.springframework.beans.factory.BeanFactory;
@@ -111,6 +115,58 @@ public void testFailoverGood() throws Exception {
111115
Mockito.verify(conn2).send(message);
112116
}
113117

118+
@Test
119+
public void testRefreshShared() throws Exception {
120+
testRefreshShared(false);
121+
}
122+
123+
@Test
124+
public void testRefreshSharedCloseOnRefresh() throws Exception {
125+
testRefreshShared(true);
126+
}
127+
128+
private void testRefreshShared(boolean closeOnRefresh) throws Exception {
129+
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
130+
AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class);
131+
List<AbstractClientConnectionFactory> factories = new ArrayList<AbstractClientConnectionFactory>();
132+
factories.add(factory1);
133+
factories.add(factory2);
134+
TcpConnectionSupport conn1 = makeMockConnection();
135+
doReturn("conn1").when(conn1).getConnectionId();
136+
TcpConnectionSupport conn2 = makeMockConnection();
137+
doReturn("conn2").when(conn2).getConnectionId();
138+
doThrow(new UncheckedIOException(new IOException("fail")))
139+
.when(factory1).getConnection();
140+
if (closeOnRefresh) {
141+
when(factory2.getConnection()).thenReturn(conn1, conn2);
142+
}
143+
else {
144+
when(factory2.getConnection()).thenReturn(conn1);
145+
}
146+
when(factory1.isActive()).thenReturn(true);
147+
when(factory2.isActive()).thenReturn(true);
148+
FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories);
149+
failoverFactory.setCloseOnRefresh(closeOnRefresh);
150+
failoverFactory.start();
151+
TcpConnectionSupport connection = failoverFactory.getConnection();
152+
assertThat(TestUtils.getPropertyValue(failoverFactory, "theConnection")).isNotNull();
153+
failoverFactory.setRefreshSharedInterval(10_000);
154+
assertThat(failoverFactory.getConnection()).isSameAs(connection);
155+
failoverFactory.setRefreshSharedInterval(-1);
156+
assertThat(failoverFactory.getConnection()).isNotSameAs(connection);
157+
InOrder inOrder = inOrder(factory1, factory2, conn1);
158+
inOrder.verify(factory1).getConnection();
159+
inOrder.verify(factory2).getConnection();
160+
inOrder.verify(factory1).getConnection();
161+
inOrder.verify(factory2).getConnection();
162+
if (closeOnRefresh) {
163+
inOrder.verify(conn1).close();
164+
}
165+
else {
166+
inOrder.verify(conn1, never()).close();
167+
}
168+
}
169+
114170
@Test(expected = UncheckedIOException.class)
115171
public void testFailoverAllDead() throws Exception {
116172
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);

src/reference/asciidoc/ip.adoc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,23 @@ The following example shows how to configure a failover client connection factor
536536

537537
NOTE: When using the failover connection factory, the `singleUse` property must be consistent between the factory itself and the list of factories it is configured to use.
538538

539+
The connnection factory has two properties when used with a shared connection (`singleUse=false`):
540+
541+
* `refreshSharedInterval`
542+
* `closeOnRefresh`
543+
544+
These are `0` and `false` to retain the same behavior that existed before the properties were added.
545+
546+
Consider the following scenario based on the above configuration:
547+
Let's say `clientFactory1` cannot establish a connection but `clientFactory2` can.
548+
Each time the `failCF` `getConnection()` method is called, we will again attempt to connect using `clientFactory1`; if successful, the "old" connection will remain open and may be reused in future if the first factory fails once more.
549+
550+
Set `refreshSharedInterval` to only attempt to reconnect with the first factory after that time has expired; set it to `Long.MAX_VALUE` if you only want to fail back to the first factory when the current connection fails.
551+
552+
Set `closeOnRefresh` to close the "old" connection after a refresh actually creates a new connection.
553+
554+
IMPORTANT: These properties do not apply if any of the delegate factories is a `CachingClientConnectionFactory` because the connection caching is handled there; in that case the list of connection factories will always be consulted to get a connection.
555+
539556
[[tcp-affinity-cf]]
540557
==== TCP Thread Affinity Connection Factory
541558

0 commit comments

Comments
 (0)