85
85
import org .apache .hadoop .hdfs .server .federation .resolver .MountTableResolver ;
86
86
import org .apache .hadoop .hdfs .server .federation .resolver .RemoteLocation ;
87
87
import org .apache .hadoop .hdfs .server .federation .resolver .RouterResolveException ;
88
+ import org .apache .hadoop .hdfs .server .federation .router .async .AsyncErasureCoding ;
89
+ import org .apache .hadoop .hdfs .server .federation .router .async .RouterAsyncCacheAdmin ;
90
+ import org .apache .hadoop .hdfs .server .federation .router .async .RouterAsyncSnapshot ;
91
+ import org .apache .hadoop .hdfs .server .federation .router .async .RouterAsyncStoragePolicy ;
88
92
import org .apache .hadoop .hdfs .server .federation .router .security .RouterSecurityManager ;
89
93
import org .apache .hadoop .hdfs .server .federation .store .records .MountTable ;
90
94
import org .apache .hadoop .hdfs .server .namenode .NameNode ;
@@ -166,7 +170,7 @@ public class RouterClientProtocol implements ClientProtocol {
166
170
/** Router security manager to handle token operations. */
167
171
private RouterSecurityManager securityManager = null ;
168
172
169
- RouterClientProtocol (Configuration conf , RouterRpcServer rpcServer ) {
173
+ public RouterClientProtocol (Configuration conf , RouterRpcServer rpcServer ) {
170
174
this .rpcServer = rpcServer ;
171
175
this .rpcClient = rpcServer .getRPCClient ();
172
176
this .subclusterResolver = rpcServer .getSubclusterResolver ();
@@ -194,10 +198,17 @@ public class RouterClientProtocol implements ClientProtocol {
194
198
this .superGroup = conf .get (
195
199
DFSConfigKeys .DFS_PERMISSIONS_SUPERUSERGROUP_KEY ,
196
200
DFSConfigKeys .DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT );
197
- this .erasureCoding = new ErasureCoding (rpcServer );
198
- this .storagePolicy = new RouterStoragePolicy (rpcServer );
199
- this .snapshotProto = new RouterSnapshot (rpcServer );
200
- this .routerCacheAdmin = new RouterCacheAdmin (rpcServer );
201
+ if (rpcServer .isAsync ()) {
202
+ this .erasureCoding = new AsyncErasureCoding (rpcServer );
203
+ this .storagePolicy = new RouterAsyncStoragePolicy (rpcServer );
204
+ this .snapshotProto = new RouterAsyncSnapshot (rpcServer );
205
+ this .routerCacheAdmin = new RouterAsyncCacheAdmin (rpcServer );
206
+ } else {
207
+ this .erasureCoding = new ErasureCoding (rpcServer );
208
+ this .storagePolicy = new RouterStoragePolicy (rpcServer );
209
+ this .snapshotProto = new RouterSnapshot (rpcServer );
210
+ this .routerCacheAdmin = new RouterCacheAdmin (rpcServer );
211
+ }
201
212
this .securityManager = rpcServer .getRouterSecurityManager ();
202
213
this .rbfRename = new RouterFederationRename (rpcServer , conf );
203
214
this .defaultNameServiceEnabled = conf .getBoolean (
@@ -347,7 +358,7 @@ protected static boolean isUnavailableSubclusterException(
347
358
* @throws IOException If this path is not fault tolerant or the exception
348
359
* should not be retried (e.g., NSQuotaExceededException).
349
360
*/
350
- private List <RemoteLocation > checkFaultTolerantRetry (
361
+ protected List <RemoteLocation > checkFaultTolerantRetry (
351
362
final RemoteMethod method , final String src , final IOException ioe ,
352
363
final RemoteLocation excludeLoc , final List <RemoteLocation > locations )
353
364
throws IOException {
@@ -820,7 +831,7 @@ public void renewLease(String clientName, List<String> namespaces)
820
831
/**
821
832
* For {@link #getListing(String,byte[],boolean) GetLisiting} to sort results.
822
833
*/
823
- private static class GetListingComparator
834
+ protected static class GetListingComparator
824
835
implements Comparator <byte []>, Serializable {
825
836
@ Override
826
837
public int compare (byte [] o1 , byte [] o2 ) {
@@ -831,6 +842,10 @@ public int compare(byte[] o1, byte[] o2) {
831
842
private static GetListingComparator comparator =
832
843
new GetListingComparator ();
833
844
845
+ public static GetListingComparator getComparator () {
846
+ return comparator ;
847
+ }
848
+
834
849
@ Override
835
850
public DirectoryListing getListing (String src , byte [] startAfter ,
836
851
boolean needLocation ) throws IOException {
@@ -1104,7 +1119,7 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
1104
1119
return mergeDtanodeStorageReport (dnSubcluster );
1105
1120
}
1106
1121
1107
- private DatanodeStorageReport [] mergeDtanodeStorageReport (
1122
+ protected DatanodeStorageReport [] mergeDtanodeStorageReport (
1108
1123
Map <String , DatanodeStorageReport []> dnSubcluster ) {
1109
1124
// Avoid repeating machines in multiple subclusters
1110
1125
Map <String , DatanodeStorageReport > datanodesMap = new LinkedHashMap <>();
@@ -1335,20 +1350,23 @@ Map<String, List<RemoteLocation>> getAllLocations(String path) throws IOExceptio
1335
1350
}
1336
1351
1337
1352
/**
1338
- * Get all the locations of the path for {@link this #getContentSummary(String)}.
1353
+ * Get all the locations of the path for {@link RouterClientProtocol #getContentSummary(String)}.
1339
1354
* For example, there are some mount points:
1340
- * /a -> ns0 -> /a
1341
- * /a/b -> ns0 -> /a/b
1342
- * /a/b/c -> ns1 -> /a/b/c
1355
+ * <p>
1356
+ * /a - [ns0 - /a]
1357
+ * /a/b - [ns0 - /a/b]
1358
+ * /a/b/c - [ns1 - /a/b/c]
1359
+ * </p>
1343
1360
* When the path is '/a', the result of locations should be
1344
1361
* [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')]
1345
1362
* When the path is '/b', will throw NoLocationException.
1363
+ *
1346
1364
* @param path the path to get content summary
1347
1365
* @return one list contains all the remote location
1348
- * @throws IOException
1366
+ * @throws IOException if an I/O error occurs
1349
1367
*/
1350
1368
@ VisibleForTesting
1351
- List <RemoteLocation > getLocationsForContentSummary (String path ) throws IOException {
1369
+ protected List <RemoteLocation > getLocationsForContentSummary (String path ) throws IOException {
1352
1370
// Try to get all the locations of the path.
1353
1371
final Map <String , List <RemoteLocation >> ns2Locations = getAllLocations (path );
1354
1372
if (ns2Locations .isEmpty ()) {
@@ -2039,7 +2057,7 @@ public HAServiceProtocol.HAServiceState getHAServiceState() {
2039
2057
* replacement value.
2040
2058
* @throws IOException If the dst paths could not be determined.
2041
2059
*/
2042
- private RemoteParam getRenameDestinations (
2060
+ protected RemoteParam getRenameDestinations (
2043
2061
final List <RemoteLocation > srcLocations ,
2044
2062
final List <RemoteLocation > dstLocations ) throws IOException {
2045
2063
@@ -2087,7 +2105,7 @@ private RemoteLocation getFirstMatchingLocation(RemoteLocation location,
2087
2105
* @param summaries Collection of individual summaries.
2088
2106
* @return Aggregated content summary.
2089
2107
*/
2090
- private ContentSummary aggregateContentSummary (
2108
+ protected ContentSummary aggregateContentSummary (
2091
2109
Collection <ContentSummary > summaries ) {
2092
2110
if (summaries .size () == 1 ) {
2093
2111
return summaries .iterator ().next ();
@@ -2142,7 +2160,7 @@ private ContentSummary aggregateContentSummary(
2142
2160
* everywhere.
2143
2161
* @throws IOException If all the locations throw an exception.
2144
2162
*/
2145
- private HdfsFileStatus getFileInfoAll (final List <RemoteLocation > locations ,
2163
+ protected HdfsFileStatus getFileInfoAll (final List <RemoteLocation > locations ,
2146
2164
final RemoteMethod method ) throws IOException {
2147
2165
return getFileInfoAll (locations , method , -1 );
2148
2166
}
@@ -2157,7 +2175,7 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
2157
2175
* everywhere.
2158
2176
* @throws IOException If all the locations throw an exception.
2159
2177
*/
2160
- private HdfsFileStatus getFileInfoAll (final List <RemoteLocation > locations ,
2178
+ protected HdfsFileStatus getFileInfoAll (final List <RemoteLocation > locations ,
2161
2179
final RemoteMethod method , long timeOutMs ) throws IOException {
2162
2180
2163
2181
// Get the file info from everybody
@@ -2186,12 +2204,11 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
2186
2204
2187
2205
/**
2188
2206
* Get the permissions for the parent of a child with given permissions.
2189
- * Add implicit u+wx permission for parent. This is based on
2190
- * @{FSDirMkdirOp#addImplicitUwx}.
2207
+ * Add implicit u+wx permission for parent. This is based on FSDirMkdirOp#addImplicitUwx.
2191
2208
* @param mask The permission mask of the child.
2192
2209
* @return The permission mask of the parent.
2193
2210
*/
2194
- private static FsPermission getParentPermission (final FsPermission mask ) {
2211
+ protected static FsPermission getParentPermission (final FsPermission mask ) {
2195
2212
FsPermission ret = new FsPermission (
2196
2213
mask .getUserAction ().or (FsAction .WRITE_EXECUTE ),
2197
2214
mask .getGroupAction (),
@@ -2208,7 +2225,7 @@ private static FsPermission getParentPermission(final FsPermission mask) {
2208
2225
* @return New HDFS file status representing a mount point.
2209
2226
*/
2210
2227
@ VisibleForTesting
2211
- HdfsFileStatus getMountPointStatus (
2228
+ protected HdfsFileStatus getMountPointStatus (
2212
2229
String name , int childrenNum , long date ) {
2213
2230
return getMountPointStatus (name , childrenNum , date , true );
2214
2231
}
@@ -2223,7 +2240,7 @@ HdfsFileStatus getMountPointStatus(
2223
2240
* @return New HDFS file status representing a mount point.
2224
2241
*/
2225
2242
@ VisibleForTesting
2226
- HdfsFileStatus getMountPointStatus (
2243
+ protected HdfsFileStatus getMountPointStatus (
2227
2244
String name , int childrenNum , long date , boolean setPath ) {
2228
2245
long modTime = date ;
2229
2246
long accessTime = date ;
@@ -2300,7 +2317,7 @@ HdfsFileStatus getMountPointStatus(
2300
2317
* @param path Name of the path to start checking dates from.
2301
2318
* @return Map with the modification dates for all sub-entries.
2302
2319
*/
2303
- private Map <String , Long > getMountPointDates (String path ) {
2320
+ protected Map <String , Long > getMountPointDates (String path ) {
2304
2321
Map <String , Long > ret = new TreeMap <>();
2305
2322
if (subclusterResolver instanceof MountTableResolver ) {
2306
2323
try {
@@ -2361,9 +2378,15 @@ private long getModifiedTime(Map<String, Long> ret, String path,
2361
2378
}
2362
2379
2363
2380
/**
2364
- * Get listing on remote locations.
2381
+ * Get a partial listing of the indicated directory.
2382
+ *
2383
+ * @param src the directory name
2384
+ * @param startAfter the name to start after
2385
+ * @param needLocation if blockLocations need to be returned
2386
+ * @return a partial listing starting after startAfter
2387
+ * @throws IOException if other I/O error occurred
2365
2388
*/
2366
- private List <RemoteResult <RemoteLocation , DirectoryListing >> getListingInt (
2389
+ protected List <RemoteResult <RemoteLocation , DirectoryListing >> getListingInt (
2367
2390
String src , byte [] startAfter , boolean needLocation ) throws IOException {
2368
2391
try {
2369
2392
List <RemoteLocation > locations =
@@ -2400,9 +2423,9 @@ private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
2400
2423
* @param startAfter starting listing from client, used to define listing
2401
2424
* start boundary
2402
2425
* @param remainingEntries how many entries left from subcluster
2403
- * @return
2426
+ * @return true if should add mount point, otherwise false;
2404
2427
*/
2405
- private static boolean shouldAddMountPoint (
2428
+ protected static boolean shouldAddMountPoint (
2406
2429
byte [] mountPoint , byte [] lastEntry , byte [] startAfter ,
2407
2430
int remainingEntries ) {
2408
2431
if (comparator .compare (mountPoint , startAfter ) > 0 &&
@@ -2425,7 +2448,7 @@ private static boolean shouldAddMountPoint(
2425
2448
* @throws IOException if unable to get the file status.
2426
2449
*/
2427
2450
@ VisibleForTesting
2428
- boolean isMultiDestDirectory (String src ) throws IOException {
2451
+ protected boolean isMultiDestDirectory (String src ) throws IOException {
2429
2452
try {
2430
2453
if (rpcServer .isPathAll (src )) {
2431
2454
List <RemoteLocation > locations ;
@@ -2449,4 +2472,56 @@ boolean isMultiDestDirectory(String src) throws IOException {
2449
2472
public int getRouterFederationRenameCount () {
2450
2473
return rbfRename .getRouterFederationRenameCount ();
2451
2474
}
2475
+
2476
+ public RouterRpcServer getRpcServer () {
2477
+ return rpcServer ;
2478
+ }
2479
+
2480
+ public RouterRpcClient getRpcClient () {
2481
+ return rpcClient ;
2482
+ }
2483
+
2484
+ public FileSubclusterResolver getSubclusterResolver () {
2485
+ return subclusterResolver ;
2486
+ }
2487
+
2488
+ public ActiveNamenodeResolver getNamenodeResolver () {
2489
+ return namenodeResolver ;
2490
+ }
2491
+
2492
+ public long getServerDefaultsLastUpdate () {
2493
+ return serverDefaultsLastUpdate ;
2494
+ }
2495
+
2496
+ public long getServerDefaultsValidityPeriod () {
2497
+ return serverDefaultsValidityPeriod ;
2498
+ }
2499
+
2500
+ public boolean isAllowPartialList () {
2501
+ return allowPartialList ;
2502
+ }
2503
+
2504
+ public long getMountStatusTimeOut () {
2505
+ return mountStatusTimeOut ;
2506
+ }
2507
+
2508
+ public String getSuperUser () {
2509
+ return superUser ;
2510
+ }
2511
+
2512
+ public String getSuperGroup () {
2513
+ return superGroup ;
2514
+ }
2515
+
2516
+ public RouterStoragePolicy getStoragePolicy () {
2517
+ return storagePolicy ;
2518
+ }
2519
+
2520
+ public void setServerDefaultsLastUpdate (long serverDefaultsLastUpdate ) {
2521
+ this .serverDefaultsLastUpdate = serverDefaultsLastUpdate ;
2522
+ }
2523
+
2524
+ public RouterFederationRename getRbfRename () {
2525
+ return rbfRename ;
2526
+ }
2452
2527
}
0 commit comments