Skip to content

Commit c48db62

Browse files
HDFS-17640.[ARR] RouterClientProtocol supports asynchronous rpc. (#7188)
Co-authored-by: Jian Zhang <[email protected]>
1 parent be06adc commit c48db62

File tree

7 files changed

+1352
-38
lines changed

7 files changed

+1352
-38
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

Lines changed: 104 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@
8585
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
8686
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
8787
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
88+
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncErasureCoding;
89+
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncCacheAdmin;
90+
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncSnapshot;
91+
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncStoragePolicy;
8892
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
8993
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
9094
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -166,7 +170,7 @@ public class RouterClientProtocol implements ClientProtocol {
166170
/** Router security manager to handle token operations. */
167171
private RouterSecurityManager securityManager = null;
168172

169-
RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
173+
public RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
170174
this.rpcServer = rpcServer;
171175
this.rpcClient = rpcServer.getRPCClient();
172176
this.subclusterResolver = rpcServer.getSubclusterResolver();
@@ -194,10 +198,17 @@ public class RouterClientProtocol implements ClientProtocol {
194198
this.superGroup = conf.get(
195199
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
196200
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
197-
this.erasureCoding = new ErasureCoding(rpcServer);
198-
this.storagePolicy = new RouterStoragePolicy(rpcServer);
199-
this.snapshotProto = new RouterSnapshot(rpcServer);
200-
this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
201+
if (rpcServer.isAsync()) {
202+
this.erasureCoding = new AsyncErasureCoding(rpcServer);
203+
this.storagePolicy = new RouterAsyncStoragePolicy(rpcServer);
204+
this.snapshotProto = new RouterAsyncSnapshot(rpcServer);
205+
this.routerCacheAdmin = new RouterAsyncCacheAdmin(rpcServer);
206+
} else {
207+
this.erasureCoding = new ErasureCoding(rpcServer);
208+
this.storagePolicy = new RouterStoragePolicy(rpcServer);
209+
this.snapshotProto = new RouterSnapshot(rpcServer);
210+
this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
211+
}
201212
this.securityManager = rpcServer.getRouterSecurityManager();
202213
this.rbfRename = new RouterFederationRename(rpcServer, conf);
203214
this.defaultNameServiceEnabled = conf.getBoolean(
@@ -347,7 +358,7 @@ protected static boolean isUnavailableSubclusterException(
347358
* @throws IOException If this path is not fault tolerant or the exception
348359
* should not be retried (e.g., NSQuotaExceededException).
349360
*/
350-
private List<RemoteLocation> checkFaultTolerantRetry(
361+
protected List<RemoteLocation> checkFaultTolerantRetry(
351362
final RemoteMethod method, final String src, final IOException ioe,
352363
final RemoteLocation excludeLoc, final List<RemoteLocation> locations)
353364
throws IOException {
@@ -820,7 +831,7 @@ public void renewLease(String clientName, List<String> namespaces)
820831
/**
821832
* For {@link #getListing(String,byte[],boolean) GetLisiting} to sort results.
822833
*/
823-
private static class GetListingComparator
834+
protected static class GetListingComparator
824835
implements Comparator<byte[]>, Serializable {
825836
@Override
826837
public int compare(byte[] o1, byte[] o2) {
@@ -831,6 +842,10 @@ public int compare(byte[] o1, byte[] o2) {
831842
private static GetListingComparator comparator =
832843
new GetListingComparator();
833844

845+
public static GetListingComparator getComparator() {
846+
return comparator;
847+
}
848+
834849
@Override
835850
public DirectoryListing getListing(String src, byte[] startAfter,
836851
boolean needLocation) throws IOException {
@@ -1104,7 +1119,7 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
11041119
return mergeDtanodeStorageReport(dnSubcluster);
11051120
}
11061121

1107-
private DatanodeStorageReport[] mergeDtanodeStorageReport(
1122+
protected DatanodeStorageReport[] mergeDtanodeStorageReport(
11081123
Map<String, DatanodeStorageReport[]> dnSubcluster) {
11091124
// Avoid repeating machines in multiple subclusters
11101125
Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
@@ -1335,20 +1350,23 @@ Map<String, List<RemoteLocation>> getAllLocations(String path) throws IOExceptio
13351350
}
13361351

13371352
/**
1338-
* Get all the locations of the path for {@link this#getContentSummary(String)}.
1353+
* Get all the locations of the path for {@link RouterClientProtocol#getContentSummary(String)}.
13391354
* For example, there are some mount points:
1340-
* /a -> ns0 -> /a
1341-
* /a/b -> ns0 -> /a/b
1342-
* /a/b/c -> ns1 -> /a/b/c
1355+
* <p>
1356+
* /a - [ns0 - /a]
1357+
* /a/b - [ns0 - /a/b]
1358+
* /a/b/c - [ns1 - /a/b/c]
1359+
* </p>
13431360
* When the path is '/a', the result of locations should be
13441361
* [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')]
13451362
* When the path is '/b', will throw NoLocationException.
1363+
*
13461364
* @param path the path to get content summary
13471365
* @return one list contains all the remote location
1348-
* @throws IOException
1366+
* @throws IOException if an I/O error occurs
13491367
*/
13501368
@VisibleForTesting
1351-
List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
1369+
protected List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
13521370
// Try to get all the locations of the path.
13531371
final Map<String, List<RemoteLocation>> ns2Locations = getAllLocations(path);
13541372
if (ns2Locations.isEmpty()) {
@@ -2039,7 +2057,7 @@ public HAServiceProtocol.HAServiceState getHAServiceState() {
20392057
* replacement value.
20402058
* @throws IOException If the dst paths could not be determined.
20412059
*/
2042-
private RemoteParam getRenameDestinations(
2060+
protected RemoteParam getRenameDestinations(
20432061
final List<RemoteLocation> srcLocations,
20442062
final List<RemoteLocation> dstLocations) throws IOException {
20452063

@@ -2087,7 +2105,7 @@ private RemoteLocation getFirstMatchingLocation(RemoteLocation location,
20872105
* @param summaries Collection of individual summaries.
20882106
* @return Aggregated content summary.
20892107
*/
2090-
private ContentSummary aggregateContentSummary(
2108+
protected ContentSummary aggregateContentSummary(
20912109
Collection<ContentSummary> summaries) {
20922110
if (summaries.size() == 1) {
20932111
return summaries.iterator().next();
@@ -2142,7 +2160,7 @@ private ContentSummary aggregateContentSummary(
21422160
* everywhere.
21432161
* @throws IOException If all the locations throw an exception.
21442162
*/
2145-
private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
2163+
protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
21462164
final RemoteMethod method) throws IOException {
21472165
return getFileInfoAll(locations, method, -1);
21482166
}
@@ -2157,7 +2175,7 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
21572175
* everywhere.
21582176
* @throws IOException If all the locations throw an exception.
21592177
*/
2160-
private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
2178+
protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
21612179
final RemoteMethod method, long timeOutMs) throws IOException {
21622180

21632181
// Get the file info from everybody
@@ -2186,12 +2204,11 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
21862204

21872205
/**
21882206
* Get the permissions for the parent of a child with given permissions.
2189-
* Add implicit u+wx permission for parent. This is based on
2190-
* @{FSDirMkdirOp#addImplicitUwx}.
2207+
* Add implicit u+wx permission for parent. This is based on FSDirMkdirOp#addImplicitUwx.
21912208
* @param mask The permission mask of the child.
21922209
* @return The permission mask of the parent.
21932210
*/
2194-
private static FsPermission getParentPermission(final FsPermission mask) {
2211+
protected static FsPermission getParentPermission(final FsPermission mask) {
21952212
FsPermission ret = new FsPermission(
21962213
mask.getUserAction().or(FsAction.WRITE_EXECUTE),
21972214
mask.getGroupAction(),
@@ -2208,7 +2225,7 @@ private static FsPermission getParentPermission(final FsPermission mask) {
22082225
* @return New HDFS file status representing a mount point.
22092226
*/
22102227
@VisibleForTesting
2211-
HdfsFileStatus getMountPointStatus(
2228+
protected HdfsFileStatus getMountPointStatus(
22122229
String name, int childrenNum, long date) {
22132230
return getMountPointStatus(name, childrenNum, date, true);
22142231
}
@@ -2223,7 +2240,7 @@ HdfsFileStatus getMountPointStatus(
22232240
* @return New HDFS file status representing a mount point.
22242241
*/
22252242
@VisibleForTesting
2226-
HdfsFileStatus getMountPointStatus(
2243+
protected HdfsFileStatus getMountPointStatus(
22272244
String name, int childrenNum, long date, boolean setPath) {
22282245
long modTime = date;
22292246
long accessTime = date;
@@ -2300,7 +2317,7 @@ HdfsFileStatus getMountPointStatus(
23002317
* @param path Name of the path to start checking dates from.
23012318
* @return Map with the modification dates for all sub-entries.
23022319
*/
2303-
private Map<String, Long> getMountPointDates(String path) {
2320+
protected Map<String, Long> getMountPointDates(String path) {
23042321
Map<String, Long> ret = new TreeMap<>();
23052322
if (subclusterResolver instanceof MountTableResolver) {
23062323
try {
@@ -2361,9 +2378,15 @@ private long getModifiedTime(Map<String, Long> ret, String path,
23612378
}
23622379

23632380
/**
2364-
* Get listing on remote locations.
2381+
* Get a partial listing of the indicated directory.
2382+
*
2383+
* @param src the directory name
2384+
* @param startAfter the name to start after
2385+
* @param needLocation if blockLocations need to be returned
2386+
* @return a partial listing starting after startAfter
2387+
* @throws IOException if other I/O error occurred
23652388
*/
2366-
private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
2389+
protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
23672390
String src, byte[] startAfter, boolean needLocation) throws IOException {
23682391
try {
23692392
List<RemoteLocation> locations =
@@ -2400,9 +2423,9 @@ private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
24002423
* @param startAfter starting listing from client, used to define listing
24012424
* start boundary
24022425
* @param remainingEntries how many entries left from subcluster
2403-
* @return
2426+
* @return true if should add mount point, otherwise false;
24042427
*/
2405-
private static boolean shouldAddMountPoint(
2428+
protected static boolean shouldAddMountPoint(
24062429
byte[] mountPoint, byte[] lastEntry, byte[] startAfter,
24072430
int remainingEntries) {
24082431
if (comparator.compare(mountPoint, startAfter) > 0 &&
@@ -2425,7 +2448,7 @@ private static boolean shouldAddMountPoint(
24252448
* @throws IOException if unable to get the file status.
24262449
*/
24272450
@VisibleForTesting
2428-
boolean isMultiDestDirectory(String src) throws IOException {
2451+
protected boolean isMultiDestDirectory(String src) throws IOException {
24292452
try {
24302453
if (rpcServer.isPathAll(src)) {
24312454
List<RemoteLocation> locations;
@@ -2449,4 +2472,56 @@ boolean isMultiDestDirectory(String src) throws IOException {
24492472
public int getRouterFederationRenameCount() {
24502473
return rbfRename.getRouterFederationRenameCount();
24512474
}
2475+
2476+
public RouterRpcServer getRpcServer() {
2477+
return rpcServer;
2478+
}
2479+
2480+
public RouterRpcClient getRpcClient() {
2481+
return rpcClient;
2482+
}
2483+
2484+
public FileSubclusterResolver getSubclusterResolver() {
2485+
return subclusterResolver;
2486+
}
2487+
2488+
public ActiveNamenodeResolver getNamenodeResolver() {
2489+
return namenodeResolver;
2490+
}
2491+
2492+
public long getServerDefaultsLastUpdate() {
2493+
return serverDefaultsLastUpdate;
2494+
}
2495+
2496+
public long getServerDefaultsValidityPeriod() {
2497+
return serverDefaultsValidityPeriod;
2498+
}
2499+
2500+
public boolean isAllowPartialList() {
2501+
return allowPartialList;
2502+
}
2503+
2504+
public long getMountStatusTimeOut() {
2505+
return mountStatusTimeOut;
2506+
}
2507+
2508+
public String getSuperUser() {
2509+
return superUser;
2510+
}
2511+
2512+
public String getSuperGroup() {
2513+
return superGroup;
2514+
}
2515+
2516+
public RouterStoragePolicy getStoragePolicy() {
2517+
return storagePolicy;
2518+
}
2519+
2520+
public void setServerDefaultsLastUpdate(long serverDefaultsLastUpdate) {
2521+
this.serverDefaultsLastUpdate = serverDefaultsLastUpdate;
2522+
}
2523+
2524+
public RouterFederationRename getRbfRename() {
2525+
return rbfRename;
2526+
}
24522527
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public RouterFederationRename(RouterRpcServer rpcServer, Configuration conf) {
9393
* @throws IOException if rename fails.
9494
* @return true if rename succeeds.
9595
*/
96-
boolean routerFedRename(final String src, final String dst,
96+
public boolean routerFedRename(final String src, final String dst,
9797
final List<RemoteLocation> srcLocations,
9898
final List<RemoteLocation> dstLocations) throws IOException {
9999
if (!rpcServer.isEnableRenameAcrossNamespace()) {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1969,7 +1969,7 @@ protected boolean isObserverReadEligible(String nsId, Method method) {
19691969
* @param nsId namespaceID
19701970
* @return whether the 'namespace' has observer reads enabled.
19711971
*/
1972-
boolean isNamespaceObserverReadEligible(String nsId) {
1972+
public boolean isNamespaceObserverReadEligible(String nsId) {
19731973
return observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
19741974
}
19751975

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,11 @@
7575
import org.apache.hadoop.hdfs.HAUtil;
7676
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
7777
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
78+
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncQuota;
79+
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncClientProtocol;
80+
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncNamenodeProtocol;
7881
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient;
82+
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncUserProtocol;
7983
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
8084
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
8185
import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
@@ -288,6 +292,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
288292
* @param fileResolver File resolver to resolve file paths to subclusters.
289293
* @throws IOException If the RPC server could not be created.
290294
*/
295+
@SuppressWarnings("checkstyle:MethodLength")
291296
public RouterRpcServer(Configuration conf, Router router,
292297
ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver)
293298
throws IOException {
@@ -424,14 +429,19 @@ public RouterRpcServer(Configuration conf, Router router,
424429
if (this.enableAsync) {
425430
this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
426431
this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
432+
this.clientProto = new RouterAsyncClientProtocol(conf, this);
433+
this.nnProto = new RouterAsyncNamenodeProtocol(this);
434+
this.routerProto = new RouterAsyncUserProtocol(this);
435+
this.quotaCall = new AsyncQuota(this.router, this);
427436
} else {
428437
this.rpcClient = new RouterRpcClient(this.conf, this.router,
429438
this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
439+
this.clientProto = new RouterClientProtocol(conf, this);
440+
this.nnProto = new RouterNamenodeProtocol(this);
441+
this.routerProto = new RouterUserProtocol(this);
442+
this.quotaCall = new Quota(this.router, this);
430443
}
431-
this.nnProto = new RouterNamenodeProtocol(this);
432-
this.quotaCall = new Quota(this.router, this);
433-
this.clientProto = new RouterClientProtocol(conf, this);
434-
this.routerProto = new RouterUserProtocol(this);
444+
435445
long dnCacheExpire = conf.getTimeDuration(
436446
DN_REPORT_CACHE_EXPIRE,
437447
DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
@@ -2193,7 +2203,7 @@ public FederationRPCMetrics getRPCMetrics() {
21932203
* @param path Path to check.
21942204
* @return If a path should be in all subclusters.
21952205
*/
2196-
boolean isPathAll(final String path) {
2206+
public boolean isPathAll(final String path) {
21972207
MountTable entry = getMountTable(path);
21982208
return entry != null && entry.isAll();
21992209
}

0 commit comments

Comments
 (0)