Skip to content

Commit 7c1ceda

Browse files
committed
Fix:rebase HDFS-17531
1 parent 8a61436 commit 7c1ceda

File tree

3 files changed

+294
-12
lines changed

3 files changed

+294
-12
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.federation.router;
20+
21+
import org.apache.hadoop.fs.CacheFlag;
22+
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
23+
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
24+
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
25+
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
26+
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
27+
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
28+
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
29+
30+
import java.io.IOException;
31+
import java.util.EnumSet;
32+
import java.util.Map;
33+
34+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
35+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
36+
37+
/**
38+
* Module that implements all the asynchronous RPC calls in
39+
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to Cache Admin
40+
* in the {@link RouterRpcServer}.
41+
*/
42+
public class RouterAsyncCacheAdmin extends RouterCacheAdmin {
43+
44+
public RouterAsyncCacheAdmin(RouterRpcServer server) {
45+
super(server);
46+
}
47+
48+
@Override
49+
public long addCacheDirective(
50+
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
51+
invokeAddCacheDirective(path, flags);
52+
asyncApply((ApplyFunction<Map<RemoteLocation, Long>, Long>)
53+
response -> response.values().iterator().next());
54+
return asyncReturn(Long.class);
55+
}
56+
57+
@Override
58+
public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
59+
long prevId, CacheDirectiveInfo filter) throws IOException {
60+
invokeListCacheDirectives(prevId, filter);
61+
asyncApply((ApplyFunction<Map,
62+
BatchedEntries<CacheDirectiveEntry>>)
63+
response -> (BatchedEntries<CacheDirectiveEntry>) response.values().iterator().next());
64+
return asyncReturn(BatchedEntries.class);
65+
}
66+
67+
@Override
68+
public BatchedEntries<CachePoolEntry> listCachePools(String prevKey) throws IOException {
69+
invokeListCachePools(prevKey);
70+
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, BatchedEntries>,
71+
BatchedEntries<CachePoolEntry>>)
72+
results -> results.values().iterator().next());
73+
return asyncReturn(BatchedEntries.class);
74+
}
75+
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterCacheAdmin.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,19 @@ public RouterCacheAdmin(RouterRpcServer server) {
5959

6060
public long addCacheDirective(CacheDirectiveInfo path,
6161
EnumSet<CacheFlag> flags) throws IOException {
62+
Map<RemoteLocation, Long> response = invokeAddCacheDirective(path, flags);
63+
return response.values().iterator().next();
64+
}
65+
66+
protected Map<RemoteLocation, Long> invokeAddCacheDirective(
67+
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
6268
rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
6369
final List<RemoteLocation> locations =
6470
rpcServer.getLocationsForPath(path.getPath().toString(), true, false);
6571
RemoteMethod method = new RemoteMethod("addCacheDirective",
6672
new Class<?>[] {CacheDirectiveInfo.class, EnumSet.class},
6773
new RemoteParam(getRemoteMap(path, locations)), flags);
68-
Map<RemoteLocation, Long> response =
69-
rpcClient.invokeConcurrent(locations, method, false, false, long.class);
70-
return response.values().iterator().next();
74+
return rpcClient.invokeConcurrent(locations, method, false, false, long.class);
7175
}
7276

7377
public void modifyCacheDirective(CacheDirectiveInfo directive,
@@ -100,24 +104,28 @@ public void removeCacheDirective(long id) throws IOException {
100104

101105
public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(long prevId,
102106
CacheDirectiveInfo filter) throws IOException {
107+
Map results = invokeListCacheDirectives(prevId, filter);
108+
return (BatchedEntries<CacheDirectiveEntry>) results.values().iterator().next();
109+
}
110+
111+
protected Map invokeListCacheDirectives(long prevId,
112+
CacheDirectiveInfo filter) throws IOException {
103113
rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
104114
if (filter.getPath() != null) {
105115
final List<RemoteLocation> locations = rpcServer
106116
.getLocationsForPath(filter.getPath().toString(), true, false);
107117
RemoteMethod method = new RemoteMethod("listCacheDirectives",
108118
new Class<?>[] {long.class, CacheDirectiveInfo.class}, prevId,
109119
new RemoteParam(getRemoteMap(filter, locations)));
110-
Map<RemoteLocation, BatchedEntries> response = rpcClient.invokeConcurrent(
120+
return rpcClient.invokeConcurrent(
111121
locations, method, false, false, BatchedEntries.class);
112-
return response.values().iterator().next();
113122
}
114123
RemoteMethod method = new RemoteMethod("listCacheDirectives",
115124
new Class<?>[] {long.class, CacheDirectiveInfo.class}, prevId,
116125
filter);
117126
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
118-
Map<FederationNamespaceInfo, BatchedEntries> results = rpcClient
119-
.invokeConcurrent(nss, method, true, false, BatchedEntries.class);
120-
return results.values().iterator().next();
127+
return rpcClient.invokeConcurrent(
128+
nss, method, true, false, BatchedEntries.class);
121129
}
122130

123131
public void addCachePool(CachePoolInfo info) throws IOException {
@@ -146,13 +154,17 @@ public void removeCachePool(String cachePoolName) throws IOException {
146154

147155
public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
148156
throws IOException {
157+
Map<FederationNamespaceInfo, BatchedEntries> results = invokeListCachePools(prevKey);
158+
return results.values().iterator().next();
159+
}
160+
161+
protected Map<FederationNamespaceInfo, BatchedEntries> invokeListCachePools(
162+
String prevKey) throws IOException {
149163
rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
150164
RemoteMethod method = new RemoteMethod("listCachePools",
151165
new Class<?>[] {String.class}, prevKey);
152166
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
153-
Map<FederationNamespaceInfo, BatchedEntries> results = rpcClient
154-
.invokeConcurrent(nss, method, true, false, BatchedEntries.class);
155-
return results.values().iterator().next();
167+
return rpcClient.invokeConcurrent(nss, method, true, false, BatchedEntries.class);
156168
}
157169

158170
/**
@@ -161,7 +173,7 @@ public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
161173
* @param locations the locations to map.
162174
* @return map with CacheDirectiveInfo mapped to the locations.
163175
*/
164-
private Map<RemoteLocation, CacheDirectiveInfo> getRemoteMap(
176+
protected Map<RemoteLocation, CacheDirectiveInfo> getRemoteMap(
165177
CacheDirectiveInfo path, final List<RemoteLocation> locations) {
166178
final Map<RemoteLocation, CacheDirectiveInfo> dstMap = new HashMap<>();
167179
Iterator<RemoteLocation> iterator = locations.iterator();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.federation.router;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.CacheFlag;
23+
import org.apache.hadoop.fs.FSDataOutputStream;
24+
import org.apache.hadoop.fs.FileSystem;
25+
import org.apache.hadoop.fs.Path;
26+
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
27+
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
28+
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
29+
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
30+
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
31+
import org.apache.hadoop.hdfs.server.federation.MockResolver;
32+
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
33+
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
34+
import org.apache.hadoop.ipc.CallerContext;
35+
import org.junit.After;
36+
import org.junit.AfterClass;
37+
import org.junit.Before;
38+
import org.junit.BeforeClass;
39+
import org.junit.Test;
40+
import org.mockito.Mockito;
41+
42+
import java.io.IOException;
43+
import java.util.EnumSet;
44+
import java.util.concurrent.TimeUnit;
45+
46+
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
47+
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
48+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
49+
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
50+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
51+
import static org.junit.Assert.assertEquals;
52+
import static org.junit.Assert.assertTrue;
53+
54+
public class TestRouterAsyncCacheAdmin {
55+
private static Configuration routerConf;
56+
/** Federated HDFS cluster. */
57+
private static MiniRouterDFSCluster cluster;
58+
private static String ns0;
59+
60+
/** Random Router for this federated cluster. */
61+
private MiniRouterDFSCluster.RouterContext router;
62+
private FileSystem routerFs;
63+
private RouterRpcServer routerRpcServer;
64+
private RouterAsyncCacheAdmin asyncCacheAdmin;
65+
66+
@BeforeClass
67+
public static void setUpCluster() throws Exception {
68+
cluster = new MiniRouterDFSCluster(true, 1, 2,
69+
DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
70+
cluster.setNumDatanodesPerNameservice(3);
71+
72+
cluster.startCluster();
73+
74+
// Making one Namenode active per nameservice
75+
if (cluster.isHighAvailability()) {
76+
for (String ns : cluster.getNameservices()) {
77+
cluster.switchToActive(ns, NAMENODES[0]);
78+
cluster.switchToStandby(ns, NAMENODES[1]);
79+
}
80+
}
81+
// Start routers with only an RPC service
82+
routerConf = new RouterConfigBuilder()
83+
.rpc()
84+
.build();
85+
86+
// Reduce the number of RPC clients threads to overload the Router easy
87+
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
88+
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
89+
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
90+
// We decrease the DN cache times to make the test faster
91+
routerConf.setTimeDuration(
92+
RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
93+
cluster.addRouterOverrides(routerConf);
94+
// Start routers with only an RPC service
95+
cluster.startRouters();
96+
97+
// Register and verify all NNs with all routers
98+
cluster.registerNamenodes();
99+
cluster.waitNamenodeRegistration();
100+
cluster.waitActiveNamespaces();
101+
ns0 = cluster.getNameservices().get(0);
102+
}
103+
104+
@AfterClass
105+
public static void shutdownCluster() throws Exception {
106+
if (cluster != null) {
107+
cluster.shutdown();
108+
}
109+
}
110+
111+
@Before
112+
public void setUp() throws IOException {
113+
router = cluster.getRandomRouter();
114+
routerFs = router.getFileSystem();
115+
routerRpcServer = router.getRouterRpcServer();
116+
routerRpcServer.initAsyncThreadPool();
117+
RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
118+
routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
119+
routerRpcServer.getRPCMonitor(),
120+
routerRpcServer.getRouterStateIdContext());
121+
RouterRpcServer spy = Mockito.spy(routerRpcServer);
122+
Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
123+
asyncCacheAdmin = new RouterAsyncCacheAdmin(spy);
124+
125+
// Create mock locations
126+
MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
127+
resolver.addLocation("/", ns0, "/");
128+
FSDataOutputStream fsDataOutputStream = routerFs.create(
129+
new Path("/testCache.file"), true);
130+
fsDataOutputStream.write(new byte[1024]);
131+
fsDataOutputStream.close();
132+
}
133+
134+
@After
135+
public void tearDown() throws IOException {
136+
// clear client context
137+
CallerContext.setCurrent(null);
138+
boolean delete = routerFs.delete(new Path("/testCache.file"));
139+
assertTrue(delete);
140+
if (routerFs != null) {
141+
routerFs.close();
142+
}
143+
}
144+
145+
@Test
146+
public void testRouterAsyncCacheAdmin() throws Exception {
147+
asyncCacheAdmin.addCachePool(new CachePoolInfo("pool"));
148+
syncReturn(null);
149+
150+
CacheDirectiveInfo path = new CacheDirectiveInfo.Builder().
151+
setPool("pool").
152+
setPath(new Path("/testCache.file")).
153+
build();
154+
asyncCacheAdmin.addCacheDirective(path, EnumSet.of(CacheFlag.FORCE));
155+
long result = syncReturn(long.class);
156+
assertEquals(1, result);
157+
158+
asyncCacheAdmin.listCachePools("");
159+
BatchedEntries<CachePoolEntry> cachePoolEntries = syncReturn(BatchedEntries.class);
160+
assertEquals("pool", cachePoolEntries.get(0).getInfo().getPoolName());
161+
162+
CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder().
163+
setPool("pool").
164+
build();
165+
asyncCacheAdmin.listCacheDirectives(0, filter);
166+
BatchedEntries<CacheDirectiveEntry> cacheDirectiveEntries = syncReturn(BatchedEntries.class);
167+
assertEquals(new Path("/testCache.file"), cacheDirectiveEntries.get(0).getInfo().getPath());
168+
169+
CachePoolInfo pool = new CachePoolInfo("pool").setOwnerName("pool_user");
170+
asyncCacheAdmin.modifyCachePool(pool);
171+
syncReturn(null);
172+
173+
asyncCacheAdmin.listCachePools("");
174+
cachePoolEntries = syncReturn(BatchedEntries.class);
175+
assertEquals("pool_user", cachePoolEntries.get(0).getInfo().getOwnerName());
176+
177+
path = new CacheDirectiveInfo.Builder().
178+
setPool("pool").
179+
setPath(new Path("/testCache.file")).
180+
setReplication((short) 2).
181+
setId(1L).
182+
build();
183+
asyncCacheAdmin.modifyCacheDirective(path, EnumSet.of(CacheFlag.FORCE));
184+
syncReturn(null);
185+
186+
asyncCacheAdmin.listCacheDirectives(0, filter);
187+
cacheDirectiveEntries = syncReturn(BatchedEntries.class);
188+
assertEquals(Short.valueOf((short) 2), cacheDirectiveEntries.get(0).getInfo().getReplication());
189+
190+
asyncCacheAdmin.removeCacheDirective(1L);
191+
syncReturn(null);
192+
asyncCacheAdmin.removeCachePool("pool");
193+
syncReturn(null);
194+
}
195+
}

0 commit comments

Comments
 (0)