Skip to content

Commit 1fea71b

Browse files
author
fuchaohong
committed
RBF: RouterObserverReadProxyProvider should perform an msync before executing the first read.
1 parent b60497f commit 1fea71b

File tree

2 files changed

+83
-1
lines changed

2 files changed

+83
-1
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,15 @@ public class RouterObserverReadProxyProvider<T> extends AbstractNNFailoverProxyP
8282
*/
8383
private volatile long lastMsyncTimeMs = -1;
8484

85+
/**
86+
* A client using RouterObserverReadProxyProvider should first sync with the
87+
* Active NameNode on startup. This ensures that the client reads data which
88+
* is consistent with the state of the world as of the time of its
89+
* instantiation. This variable will be true after this initial sync has
90+
* been performed.
91+
*/
92+
private volatile boolean msynced = false;
93+
8594
public RouterObserverReadProxyProvider(Configuration conf, URI uri, Class<T> xface,
8695
HAProxyFactory<T> factory) {
8796
this(conf, uri, xface, factory, new IPFailoverProxyProvider<>(conf, uri, xface, factory));
@@ -154,6 +163,23 @@ private ClientProtocol getProxyAsClientProtocol(T proxy) {
154163
return (ClientProtocol) proxy;
155164
}
156165

166+
/**
167+
* This will call {@link ClientProtocol#msync()} on the active NameNode
168+
* (via the {@link #innerProxy}) to initialize the state of this client.
169+
* Calling it multiple times is a no-op; only the first will perform an
170+
* msync.
171+
*
172+
* @see #msynced
173+
*/
174+
private synchronized void initializeMsync() throws IOException {
175+
if (msynced) {
176+
return; // No need for an msync
177+
}
178+
getProxyAsClientProtocol(innerProxy.getProxy().proxy).msync();
179+
msynced = true;
180+
lastMsyncTimeMs = Time.monotonicNow();
181+
}
182+
157183
/**
158184
* This will call {@link ClientProtocol#msync()} on the active NameNode
159185
* (via the {@link #innerProxy}) to update the state of this client, only
@@ -209,7 +235,13 @@ public void close() throws IOException {
209235
@Override
210236
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
211237
if (observerReadEnabled && isRead(method)) {
212-
autoMsyncIfNecessary();
238+
if (!msynced) {
239+
// An msync() must first be performed to ensure that this client is
240+
// up-to-date with the active's state. This will only be done once.
241+
initializeMsync();
242+
} else {
243+
autoMsyncIfNecessary();
244+
}
213245
}
214246

215247
try {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,56 @@ public void testPeriodicStateRefreshUsingActiveNamenode(ConfigSetting configSett
815815
initialLengthOfRootListing + 10, rootFolderAfterMkdir.length);
816816
}
817817

818+
@EnumSource(ConfigSetting.class)
819+
@ParameterizedTest
820+
public void testAutoMsyncDefault(ConfigSetting configSetting) throws Exception {
821+
Configuration clientConfiguration = getConfToEnableObserverReads(configSetting);
822+
fileSystem = routerContext.getFileSystem(clientConfiguration);
823+
824+
List<? extends FederationNamenodeContext> namenodes = routerContext
825+
.getRouter().getNamenodeResolver()
826+
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true);
827+
assertEquals("First namenode should be observer", namenodes.get(0).getState(),
828+
FederationNamenodeServiceState.OBSERVER);
829+
Path path = new Path("/");
830+
831+
long rpcCountForActive;
832+
long rpcCountForObserver;
833+
834+
// Send read requests
835+
int numListings = 15;
836+
for (int i = 0; i < numListings; i++) {
837+
fileSystem.listFiles(path, false);
838+
}
839+
fileSystem.close();
840+
841+
rpcCountForActive = routerContext.getRouter().getRpcServer()
842+
.getRPCMetrics().getActiveProxyOps();
843+
844+
rpcCountForObserver = routerContext.getRouter().getRpcServer()
845+
.getRPCMetrics().getObserverProxyOps();
846+
847+
switch (configSetting) {
848+
case USE_NAMENODE_PROXY_FLAG:
849+
// First read goes to active.
850+
assertEquals("Calls sent to the active", 1, rpcCountForActive);
851+
// The rest of the reads are sent to the observer.
852+
assertEquals("Reads sent to observer", numListings - 1, rpcCountForObserver);
853+
break;
854+
case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER:
855+
case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER:
856+
// An msync is sent to each active namenode for each read.
857+
// Total msyncs will be (1 * num_of_nameservices).
858+
assertEquals("Msyncs sent to the active namenodes",
859+
NUM_NAMESERVICES * 1, rpcCountForActive);
860+
// All reads should be sent of the observer.
861+
assertEquals("Reads sent to observer", numListings, rpcCountForObserver);
862+
break;
863+
default:
864+
Assertions.fail("Unknown config setting: " + configSetting);
865+
}
866+
}
867+
818868
@EnumSource(ConfigSetting.class)
819869
@ParameterizedTest
820870
public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exception {

0 commit comments

Comments
 (0)