Skip to content

Commit 7631440

Browse files
committed
HDFS-17659.[ARR]Router Quota supports asynchronous rpc.
1 parent 97b37c0 commit 7631440

File tree

4 files changed

+173
-4
lines changed

4 files changed

+173
-4
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/AsyncQuota.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.concurrent.CompletionException;
2828

2929
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
30-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
3130
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
3231

3332
public class AsyncQuota extends Quota {
@@ -48,7 +47,7 @@ public QuotaUsage getQuotaUsage(String path) throws IOException {
4847
asyncApply(o -> {
4948
Map<RemoteLocation, QuotaUsage> results = (Map<RemoteLocation, QuotaUsage>) o;
5049
try {
51-
return AsyncQuota.super.aggregateQuota(path, results);
50+
return aggregateQuota(path, results);
5251
} catch (IOException e) {
5352
throw new CompletionException(e);
5453
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
4343

44+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
45+
4446
/**
4547
* Service to periodically update the {@link RouterQuotaUsage}
4648
* cached information in the {@link Router}.
@@ -100,7 +102,7 @@ protected void periodicInvoke() {
100102
// For other mount entry get current quota usage
101103
HdfsFileStatus ret = this.rpcServer.getFileInfo(src);
102104
if (rpcServer.isAsync()) {
103-
ret = (HdfsFileStatus) syncReturn(HdfsFileStatus.class);
105+
ret = syncReturn(HdfsFileStatus.class);
104106
}
105107
if (ret == null || ret.getModificationTime() == 0) {
106108
long[] zeroConsume = new long[StorageType.values().length];
@@ -142,6 +144,8 @@ protected void periodicInvoke() {
142144
}
143145
} catch (IOException e) {
144146
LOG.error("Quota cache updated error.", e);
147+
} catch (Exception e) {
148+
LOG.error(e.toString());
145149
}
146150
}
147151

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.FSDataOutputStream;
22+
import org.apache.hadoop.fs.FileSystem;
23+
import org.apache.hadoop.fs.Path;
24+
import org.apache.hadoop.fs.QuotaUsage;
25+
import org.apache.hadoop.fs.StorageType;
26+
import org.apache.hadoop.fs.permission.FsPermission;
27+
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
28+
import org.apache.hadoop.hdfs.server.federation.MockResolver;
29+
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
30+
import org.apache.hadoop.ipc.CallerContext;
31+
import org.junit.After;
32+
import org.junit.AfterClass;
33+
import org.junit.Assert;
34+
import org.junit.Before;
35+
import org.junit.BeforeClass;
36+
import org.junit.Test;
37+
import org.mockito.Mockito;
38+
39+
import java.io.IOException;
40+
import java.util.concurrent.TimeUnit;
41+
42+
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
43+
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
44+
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
45+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
46+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
47+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
48+
import static org.junit.Assert.assertTrue;
49+
50+
public class TestRouterAsyncQuota {
51+
private static Configuration routerConf;
52+
/** Federated HDFS cluster. */
53+
private static MiniRouterDFSCluster cluster;
54+
private static String ns0;
55+
56+
/** Random Router for this federated cluster. */
57+
private MiniRouterDFSCluster.RouterContext router;
58+
private FileSystem routerFs;
59+
private RouterRpcServer routerRpcServer;
60+
private AsyncQuota asyncQuota;
61+
62+
private final String testfilePath = "/testdir/testAsyncQuota.file";
63+
64+
@BeforeClass
65+
public static void setUpCluster() throws Exception {
66+
cluster = new MiniRouterDFSCluster(true, 1, 2,
67+
DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
68+
cluster.setNumDatanodesPerNameservice(3);
69+
cluster.setRacks(
70+
new String[] {"/rack1", "/rack2", "/rack3"});
71+
cluster.startCluster();
72+
73+
// Making one Namenode active per nameservice
74+
if (cluster.isHighAvailability()) {
75+
for (String ns : cluster.getNameservices()) {
76+
cluster.switchToActive(ns, NAMENODES[0]);
77+
cluster.switchToStandby(ns, NAMENODES[1]);
78+
}
79+
}
80+
// Start routers with only an RPC service
81+
routerConf = new RouterConfigBuilder()
82+
.rpc()
83+
.quota(true)
84+
.build();
85+
86+
// Reduce the number of RPC clients threads to overload the Router easy
87+
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
88+
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
89+
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
90+
// We decrease the DN cache times to make the test faster
91+
routerConf.setTimeDuration(
92+
RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
93+
routerConf.setBoolean(DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY, true);
94+
cluster.addRouterOverrides(routerConf);
95+
// Start routers with only an RPC service
96+
cluster.startRouters();
97+
98+
// Register and verify all NNs with all routers
99+
cluster.registerNamenodes();
100+
cluster.waitNamenodeRegistration();
101+
cluster.waitActiveNamespaces();
102+
ns0 = cluster.getNameservices().get(0);
103+
}
104+
105+
@AfterClass
106+
public static void shutdownCluster() throws Exception {
107+
if (cluster != null) {
108+
cluster.shutdown();
109+
}
110+
}
111+
112+
@Before
113+
public void setUp() throws IOException {
114+
router = cluster.getRandomRouter();
115+
routerFs = router.getFileSystem();
116+
routerRpcServer = router.getRouterRpcServer();
117+
routerRpcServer.initAsyncThreadPool();
118+
RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
119+
routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
120+
routerRpcServer.getRPCMonitor(),
121+
routerRpcServer.getRouterStateIdContext());
122+
RouterRpcServer spy = Mockito.spy(routerRpcServer);
123+
Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
124+
asyncQuota = new AsyncQuota(router.getRouter(), spy);
125+
126+
// Create mock locations
127+
MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
128+
resolver.addLocation("/", ns0, "/");
129+
FsPermission permission = new FsPermission("705");
130+
routerFs.mkdirs(new Path("/testdir"), permission);
131+
FSDataOutputStream fsDataOutputStream = routerFs.create(
132+
new Path(testfilePath), true);
133+
fsDataOutputStream.write(new byte[1024]);
134+
fsDataOutputStream.close();
135+
}
136+
137+
@After
138+
public void tearDown() throws IOException {
139+
// clear client context
140+
CallerContext.setCurrent(null);
141+
boolean delete = routerFs.delete(new Path("/testdir"));
142+
assertTrue(delete);
143+
if (routerFs != null) {
144+
routerFs.close();
145+
}
146+
}
147+
148+
@Test
149+
public void testRouterAsyncGetQuotaUsage() throws Exception {
150+
asyncQuota.getQuotaUsage("/testdir");
151+
QuotaUsage quotaUsage = syncReturn(QuotaUsage.class);
152+
// 3-replication.
153+
Assert.assertEquals(3 * 1024, quotaUsage.getSpaceConsumed());
154+
// We have one directory and one file.
155+
Assert.assertEquals(2, quotaUsage.getFileAndDirectoryCount());
156+
}
157+
158+
@Test
159+
public void testRouterAsyncSetQuotaUsage() throws Exception {
160+
asyncQuota.setQuota("/testdir", Long.MAX_VALUE, 8096, StorageType.DISK, false);
161+
syncReturn(void.class);
162+
asyncQuota.getQuotaUsage("/testdir");
163+
QuotaUsage quotaUsage = syncReturn(QuotaUsage.class);
164+
Assert.assertEquals(8096, quotaUsage.getTypeQuota(StorageType.DISK));
165+
}
166+
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ static INodeDirectory unprotectedSetQuota(
342342
if ((type != null) && (!fsd.isQuotaByStorageTypeEnabled() ||
343343
nsQuota != HdfsConstants.QUOTA_DONT_SET)) {
344344
throw new UnsupportedActionException(
345-
"Failed to set quota by storage type because either" +
345+
"Failed to set quota by storage type because either " +
346346
DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY + " is set to " +
347347
fsd.isQuotaByStorageTypeEnabled() + " or nsQuota value is illegal " +
348348
nsQuota);

0 commit comments

Comments
 (0)