Skip to content

GH-3199: FailoverClientCF - Fail Back Option #3200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.integration.ip.tcp.connection;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
Expand All @@ -34,18 +35,70 @@
* Given a list of connection factories, serves up {@link TcpConnection}s
* that can iterate over a connection from each factory until the write
* succeeds or the list is exhausted.
*
* @author Gary Russell
* @since 2.2
*
*/
public class FailoverClientConnectionFactory extends AbstractClientConnectionFactory {

private static final long DEFAULT_REFRESH_SHARED_INTERVAL = 0L;

private final List<AbstractClientConnectionFactory> factories;

private final boolean cachingDelegates;

private long refreshSharedInterval = DEFAULT_REFRESH_SHARED_INTERVAL;

private boolean closeOnRefresh;

private volatile long creationTime;

/**
* Construct an instance with the provided delegate factories.
* @param factories the delegates.
*/
public FailoverClientConnectionFactory(List<AbstractClientConnectionFactory> factories) {
super("", 0);
Assert.notEmpty(factories, "At least one factory is required");
this.factories = factories;
this.factories = new ArrayList<>(factories);
this.cachingDelegates = factories.stream()
.anyMatch(factory -> factory instanceof CachingClientConnectionFactory);
}

/**
* When using a shared connection {@link #setSingleUse(boolean) singleUse} is false,
* specify how long to wait before trying to fail back to start from the beginning of
* the factory list. Default is 0 for backwards compatibility to always try to get a
* connection to the primary server. If you don't want to fail back until the current
* connection is closed, set this to {@link Long#MAX_VALUE}.
* Cannot be changed when using {@link CachingClientConnectionFactory} delegates.
* @param refreshSharedInterval the interval in milliseconds.
* @since 4.3.22
* @see #setSingleUse(boolean)
* @see #setCloseOnRefresh(boolean)
*/
public void setRefreshSharedInterval(long refreshSharedInterval) {
Assert.isTrue(!this.cachingDelegates,
"'refreshSharedInterval' cannot be changed when using 'CachingClientConnectionFactory` delegates");
this.refreshSharedInterval = refreshSharedInterval;
}

/**
* When using a shared connection {@link #setSingleUse(boolean) singleUse} is false,
* set this to true to close the old shared connection after a refresh. If this is
* false, the connection will remain open, but unused until its connection factory is
* again used to get a connection. Default is false for backwards compatibility.
* Cannot be changed when using {@link CachingClientConnectionFactory} delegates.
* @param closeOnRefresh true to close.
* @since 4.3.22
* @see #setSingleUse(boolean)
* @see #setRefreshSharedInterval(long)
*/
public void setCloseOnRefresh(boolean closeOnRefresh) {
Assert.isTrue(!this.cachingDelegates,
"'closeOnRefresh' cannot be changed when using 'CachingClientConnectionFactory` delegates");
this.closeOnRefresh = closeOnRefresh;
}

@Override
Expand Down Expand Up @@ -93,27 +146,42 @@ public void registerSender(TcpSender sender) {

@Override
protected TcpConnectionSupport obtainConnection() throws InterruptedException {
TcpConnectionSupport connection = this.getTheConnection();
if (connection != null && connection.isOpen()) {
((FailoverTcpConnection) connection).incrementEpoch();
return connection;
FailoverTcpConnection sharedConnection = (FailoverTcpConnection) getTheConnection();
boolean shared = !isSingleUse() && !this.cachingDelegates;
boolean refreshShared = shared
&& sharedConnection != null
&& System.currentTimeMillis() > this.creationTime + this.refreshSharedInterval;
if (sharedConnection != null && sharedConnection.isOpen() && !refreshShared) {
sharedConnection.incrementEpoch();
return sharedConnection;
}
FailoverTcpConnection failoverTcpConnection = new FailoverTcpConnection(this.factories);
if (getListener() != null) {
failoverTcpConnection.registerListener(getListener());
}
failoverTcpConnection.incrementEpoch();
this.creationTime = System.currentTimeMillis();
if (shared) {
/*
* We may have simply wrapped the same connection in a new wrapper; don't close.
*/
if (refreshShared && this.closeOnRefresh
&& !sharedConnection.delegate.equals(failoverTcpConnection.delegate)
&& sharedConnection.isOpen()) {
sharedConnection.close();
}
setTheConnection(failoverTcpConnection);
}
return failoverTcpConnection;
}


@Override
public void start() {
for (AbstractClientConnectionFactory factory : this.factories) {
factory.enableManualListenerRegistration();
factory.start();
}
this.setActive(true);
setActive(true);
super.start();
}

Expand Down Expand Up @@ -150,16 +218,16 @@ private final class FailoverTcpConnection extends TcpConnectionSupport implement

private final String connectionId;

private final AtomicLong epoch = new AtomicLong();

private volatile Iterator<AbstractClientConnectionFactory> factoryIterator;

private volatile AbstractClientConnectionFactory currentFactory;

private volatile TcpConnectionSupport delegate;
volatile TcpConnectionSupport delegate; // NOSONAR visibility

private volatile boolean open = true;

private final AtomicLong epoch = new AtomicLong();

private FailoverTcpConnection(List<AbstractClientConnectionFactory> factories) throws InterruptedException {
this.connectionFactories = factories;
this.factoryIterator = factories.iterator();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,8 +19,11 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

Expand All @@ -39,6 +42,7 @@

import org.junit.Rule;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;

import org.springframework.beans.factory.BeanFactory;
Expand Down Expand Up @@ -111,6 +115,58 @@ public void testFailoverGood() throws Exception {
Mockito.verify(conn2).send(message);
}

@Test
public void testRefreshShared() throws Exception {
testRefreshShared(false);
}

@Test
public void testRefreshSharedCloseOnRefresh() throws Exception {
testRefreshShared(true);
}

private void testRefreshShared(boolean closeOnRefresh) throws Exception {
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class);
List<AbstractClientConnectionFactory> factories = new ArrayList<AbstractClientConnectionFactory>();
factories.add(factory1);
factories.add(factory2);
TcpConnectionSupport conn1 = makeMockConnection();
doReturn("conn1").when(conn1).getConnectionId();
TcpConnectionSupport conn2 = makeMockConnection();
doReturn("conn2").when(conn2).getConnectionId();
doThrow(new UncheckedIOException(new IOException("fail")))
.when(factory1).getConnection();
if (closeOnRefresh) {
when(factory2.getConnection()).thenReturn(conn1, conn2);
}
else {
when(factory2.getConnection()).thenReturn(conn1);
}
when(factory1.isActive()).thenReturn(true);
when(factory2.isActive()).thenReturn(true);
FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories);
failoverFactory.setCloseOnRefresh(closeOnRefresh);
failoverFactory.start();
TcpConnectionSupport connection = failoverFactory.getConnection();
assertThat(TestUtils.getPropertyValue(failoverFactory, "theConnection")).isNotNull();
failoverFactory.setRefreshSharedInterval(10_000);
assertThat(failoverFactory.getConnection()).isSameAs(connection);
failoverFactory.setRefreshSharedInterval(-1);
assertThat(failoverFactory.getConnection()).isNotSameAs(connection);
InOrder inOrder = inOrder(factory1, factory2, conn1);
inOrder.verify(factory1).getConnection();
inOrder.verify(factory2).getConnection();
inOrder.verify(factory1).getConnection();
inOrder.verify(factory2).getConnection();
if (closeOnRefresh) {
inOrder.verify(conn1).close();
}
else {
inOrder.verify(conn1, never()).close();
}
}

@Test(expected = UncheckedIOException.class)
public void testFailoverAllDead() throws Exception {
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
Expand Down
17 changes: 17 additions & 0 deletions src/reference/asciidoc/ip.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,23 @@ The following example shows how to configure a failover client connection factor

NOTE: When using the failover connection factory, the `singleUse` property must be consistent between the factory itself and the list of factories it is configured to use.

The connnection factory has two properties when used with a shared connection (`singleUse=false`):

* `refreshSharedInterval`
* `closeOnRefresh`

These are `0` and `false` to retain the same behavior that existed before the properties were added.

Consider the following scenario based on the above configuration:
Let's say `clientFactory1` cannot establish a connection but `clientFactory2` can.
Each time the `failCF` `getConnection()` method is called, we will again attempt to connect using `clientFactory1`; if successful, the "old" connection will remain open and may be reused in future if the first factory fails once more.

Set `refreshSharedInterval` to only attempt to reconnect with the first factory after that time has expired; set it to `Long.MAX_VALUE` if you only want to fail back to the first factory when the current connection fails.

Set `closeOnRefresh` to close the "old" connection after a refresh actually creates a new connection.

IMPORTANT: These properties do not apply if any of the delegate factories is a `CachingClientConnectionFactory` because the connection caching is handled there; in that case the list of connection factories will always be consulted to get a connection.

[[tcp-affinity-cf]]
==== TCP Thread Affinity Connection Factory

Expand Down