Skip to content

HDFS-17640.[ARR] RouterClientProtocol supports asynchronous rpc. #7188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncErasureCoding;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncCacheAdmin;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncSnapshot;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncStoragePolicy;
import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
Expand Down Expand Up @@ -166,7 +170,7 @@ public class RouterClientProtocol implements ClientProtocol {
/** Router security manager to handle token operations. */
private RouterSecurityManager securityManager = null;

RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
public RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
this.rpcServer = rpcServer;
this.rpcClient = rpcServer.getRPCClient();
this.subclusterResolver = rpcServer.getSubclusterResolver();
Expand Down Expand Up @@ -194,10 +198,17 @@ public class RouterClientProtocol implements ClientProtocol {
this.superGroup = conf.get(
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.erasureCoding = new ErasureCoding(rpcServer);
this.storagePolicy = new RouterStoragePolicy(rpcServer);
this.snapshotProto = new RouterSnapshot(rpcServer);
this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
if (rpcServer.isAsync()) {
this.erasureCoding = new AsyncErasureCoding(rpcServer);
this.storagePolicy = new RouterAsyncStoragePolicy(rpcServer);
this.snapshotProto = new RouterAsyncSnapshot(rpcServer);
this.routerCacheAdmin = new RouterAsyncCacheAdmin(rpcServer);
} else {
this.erasureCoding = new ErasureCoding(rpcServer);
this.storagePolicy = new RouterStoragePolicy(rpcServer);
this.snapshotProto = new RouterSnapshot(rpcServer);
this.routerCacheAdmin = new RouterCacheAdmin(rpcServer);
}
this.securityManager = rpcServer.getRouterSecurityManager();
this.rbfRename = new RouterFederationRename(rpcServer, conf);
this.defaultNameServiceEnabled = conf.getBoolean(
Expand Down Expand Up @@ -347,7 +358,7 @@ protected static boolean isUnavailableSubclusterException(
* @throws IOException If this path is not fault tolerant or the exception
* should not be retried (e.g., NSQuotaExceededException).
*/
private List<RemoteLocation> checkFaultTolerantRetry(
protected List<RemoteLocation> checkFaultTolerantRetry(
final RemoteMethod method, final String src, final IOException ioe,
final RemoteLocation excludeLoc, final List<RemoteLocation> locations)
throws IOException {
Expand Down Expand Up @@ -820,7 +831,7 @@ public void renewLease(String clientName, List<String> namespaces)
/**
* For {@link #getListing(String,byte[],boolean) GetListing} to sort results.
*/
private static class GetListingComparator
protected static class GetListingComparator
implements Comparator<byte[]>, Serializable {
@Override
public int compare(byte[] o1, byte[] o2) {
Expand All @@ -831,6 +842,10 @@ public int compare(byte[] o1, byte[] o2) {
private static GetListingComparator comparator =
new GetListingComparator();

/**
* Get the shared {@link GetListingComparator} instance held in the static
* {@code comparator} field.
*
* @return the comparator used to order listing entries given as byte arrays.
*/
public static GetListingComparator getComparator() {
return comparator;
}

@Override
public DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation) throws IOException {
Expand Down Expand Up @@ -1104,7 +1119,7 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
return mergeDtanodeStorageReport(dnSubcluster);
}

private DatanodeStorageReport[] mergeDtanodeStorageReport(
protected DatanodeStorageReport[] mergeDtanodeStorageReport(
Map<String, DatanodeStorageReport[]> dnSubcluster) {
// Avoid repeating machines in multiple subclusters
Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
Expand Down Expand Up @@ -1335,20 +1350,23 @@ Map<String, List<RemoteLocation>> getAllLocations(String path) throws IOExceptio
}

/**
* Get all the locations of the path for {@link this#getContentSummary(String)}.
* Get all the locations of the path for {@link RouterClientProtocol#getContentSummary(String)}.
* For example, there are some mount points:
* /a -> ns0 -> /a
* /a/b -> ns0 -> /a/b
* /a/b/c -> ns1 -> /a/b/c
* <p>
* /a - [ns0 - /a]
* /a/b - [ns0 - /a/b]
* /a/b/c - [ns1 - /a/b/c]
* </p>
* When the path is '/a', the result of locations should be
* [RemoteLocation('/a', ns0, '/a'), RemoteLocation('/a/b/c', ns1, '/a/b/c')]
* When the path is '/b', will throw NoLocationException.
*
* @param path the path to get content summary
* @return a list containing all the remote locations
* @throws IOException
* @throws IOException if an I/O error occurs
*/
@VisibleForTesting
List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
protected List<RemoteLocation> getLocationsForContentSummary(String path) throws IOException {
// Try to get all the locations of the path.
final Map<String, List<RemoteLocation>> ns2Locations = getAllLocations(path);
if (ns2Locations.isEmpty()) {
Expand Down Expand Up @@ -2039,7 +2057,7 @@ public HAServiceProtocol.HAServiceState getHAServiceState() {
* replacement value.
* @throws IOException If the dst paths could not be determined.
*/
private RemoteParam getRenameDestinations(
protected RemoteParam getRenameDestinations(
final List<RemoteLocation> srcLocations,
final List<RemoteLocation> dstLocations) throws IOException {

Expand Down Expand Up @@ -2087,7 +2105,7 @@ private RemoteLocation getFirstMatchingLocation(RemoteLocation location,
* @param summaries Collection of individual summaries.
* @return Aggregated content summary.
*/
private ContentSummary aggregateContentSummary(
protected ContentSummary aggregateContentSummary(
Collection<ContentSummary> summaries) {
if (summaries.size() == 1) {
return summaries.iterator().next();
Expand Down Expand Up @@ -2142,7 +2160,7 @@ private ContentSummary aggregateContentSummary(
* everywhere.
* @throws IOException If all the locations throw an exception.
*/
private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method) throws IOException {
return getFileInfoAll(locations, method, -1);
}
Expand All @@ -2157,7 +2175,7 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
* everywhere.
* @throws IOException If all the locations throw an exception.
*/
private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method, long timeOutMs) throws IOException {

// Get the file info from everybody
Expand Down Expand Up @@ -2186,12 +2204,11 @@ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,

/**
* Get the permissions for the parent of a child with given permissions.
* Add implicit u+wx permission for parent. This is based on
* @{FSDirMkdirOp#addImplicitUwx}.
* Add implicit u+wx permission for parent. This is based on FSDirMkdirOp#addImplicitUwx.
* @param mask The permission mask of the child.
* @return The permission mask of the parent.
*/
private static FsPermission getParentPermission(final FsPermission mask) {
protected static FsPermission getParentPermission(final FsPermission mask) {
FsPermission ret = new FsPermission(
mask.getUserAction().or(FsAction.WRITE_EXECUTE),
mask.getGroupAction(),
Expand All @@ -2208,7 +2225,7 @@ private static FsPermission getParentPermission(final FsPermission mask) {
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
HdfsFileStatus getMountPointStatus(
protected HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date) {
return getMountPointStatus(name, childrenNum, date, true);
}
Expand All @@ -2223,7 +2240,7 @@ HdfsFileStatus getMountPointStatus(
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
HdfsFileStatus getMountPointStatus(
protected HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date, boolean setPath) {
long modTime = date;
long accessTime = date;
Expand Down Expand Up @@ -2300,7 +2317,7 @@ HdfsFileStatus getMountPointStatus(
* @param path Name of the path to start checking dates from.
* @return Map with the modification dates for all sub-entries.
*/
private Map<String, Long> getMountPointDates(String path) {
protected Map<String, Long> getMountPointDates(String path) {
Map<String, Long> ret = new TreeMap<>();
if (subclusterResolver instanceof MountTableResolver) {
try {
Expand Down Expand Up @@ -2361,9 +2378,15 @@ private long getModifiedTime(Map<String, Long> ret, String path,
}

/**
* Get listing on remote locations.
* Get a partial listing of the indicated directory.
*
* @param src the directory name
* @param startAfter the name to start after
* @param needLocation if blockLocations need to be returned
* @return a partial listing starting after startAfter
* @throws IOException if an I/O error occurs
*/
private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
String src, byte[] startAfter, boolean needLocation) throws IOException {
try {
List<RemoteLocation> locations =
Expand Down Expand Up @@ -2400,9 +2423,9 @@ private List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
* @param startAfter starting listing from client, used to define listing
* start boundary
* @param remainingEntries how many entries left from subcluster
* @return
* @return true if the mount point should be added, otherwise false.
*/
private static boolean shouldAddMountPoint(
protected static boolean shouldAddMountPoint(
byte[] mountPoint, byte[] lastEntry, byte[] startAfter,
int remainingEntries) {
if (comparator.compare(mountPoint, startAfter) > 0 &&
Expand All @@ -2425,7 +2448,7 @@ private static boolean shouldAddMountPoint(
* @throws IOException if unable to get the file status.
*/
@VisibleForTesting
boolean isMultiDestDirectory(String src) throws IOException {
protected boolean isMultiDestDirectory(String src) throws IOException {
try {
if (rpcServer.isPathAll(src)) {
List<RemoteLocation> locations;
Expand All @@ -2449,4 +2472,56 @@ boolean isMultiDestDirectory(String src) throws IOException {
public int getRouterFederationRenameCount() {
return rbfRename.getRouterFederationRenameCount();
}

/**
* Get the Router RPC server that was passed to the constructor.
*
* @return the {@link RouterRpcServer} backing this protocol handler.
*/
public RouterRpcServer getRpcServer() {
return rpcServer;
}

/**
* Get the RPC client, which the constructor obtains from the RPC server via
* {@code rpcServer.getRPCClient()}.
*
* @return the {@link RouterRpcClient} held by this instance.
*/
public RouterRpcClient getRpcClient() {
return rpcClient;
}

/**
* Get the subcluster resolver, which the constructor obtains from the RPC
* server via {@code rpcServer.getSubclusterResolver()}.
*
* @return the {@link FileSubclusterResolver} held by this instance.
*/
public FileSubclusterResolver getSubclusterResolver() {
return subclusterResolver;
}

/**
* Get the namenode resolver held by this instance.
*
* @return the {@link ActiveNamenodeResolver} stored in {@code namenodeResolver}.
*/
public ActiveNamenodeResolver getNamenodeResolver() {
return namenodeResolver;
}

/**
* Get the stored last-update value for the server defaults.
*
* @return the current value of {@code serverDefaultsLastUpdate}
*         (NOTE(review): presumably a timestamp in milliseconds; confirm
*         against where the field is written).
*/
public long getServerDefaultsLastUpdate() {
return serverDefaultsLastUpdate;
}

/**
* Get the configured validity period for the server defaults.
*
* @return the current value of {@code serverDefaultsValidityPeriod}
*         (NOTE(review): presumably a duration in milliseconds; confirm
*         against the constructor).
*/
public long getServerDefaultsValidityPeriod() {
return serverDefaultsValidityPeriod;
}

/**
* Get the {@code allowPartialList} flag.
*
* @return the current value of {@code allowPartialList}
*         (NOTE(review): presumably controls whether listings may be
*         returned when some subclusters fail; confirm against its use).
*/
public boolean isAllowPartialList() {
return allowPartialList;
}

/**
* Get the mount status time-out.
*
* @return the current value of {@code mountStatusTimeOut}
*         (NOTE(review): unit is not visible here, presumably
*         milliseconds; confirm against the constructor).
*/
public long getMountStatusTimeOut() {
return mountStatusTimeOut;
}

/**
* Get the super user name held by this instance.
*
* @return the current value of {@code superUser}.
*/
public String getSuperUser() {
return superUser;
}

/**
* Get the super group name, which the constructor reads from
* {@code DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY} (falling back to
* {@code DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT}).
*
* @return the current value of {@code superGroup}.
*/
public String getSuperGroup() {
return superGroup;
}

/**
* Get the storage policy module. The constructor creates a
* {@code RouterAsyncStoragePolicy} when {@code rpcServer.isAsync()} is true
* and a plain {@link RouterStoragePolicy} otherwise.
*
* @return the {@link RouterStoragePolicy} held by this instance.
*/
public RouterStoragePolicy getStoragePolicy() {
return storagePolicy;
}

/**
* Overwrite the stored last-update value for the server defaults.
*
* @param serverDefaultsLastUpdate the new value to store in the
*        {@code serverDefaultsLastUpdate} field.
*/
public void setServerDefaultsLastUpdate(long serverDefaultsLastUpdate) {
this.serverDefaultsLastUpdate = serverDefaultsLastUpdate;
}

/**
* Get the federation rename module, which the constructor creates from the
* RPC server and the configuration.
*
* @return the {@link RouterFederationRename} held by this instance.
*/
public RouterFederationRename getRbfRename() {
return rbfRename;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public RouterFederationRename(RouterRpcServer rpcServer, Configuration conf) {
* @throws IOException if rename fails.
* @return true if rename succeeds.
*/
boolean routerFedRename(final String src, final String dst,
public boolean routerFedRename(final String src, final String dst,
final List<RemoteLocation> srcLocations,
final List<RemoteLocation> dstLocations) throws IOException {
if (!rpcServer.isEnableRenameAcrossNamespace()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1969,7 +1969,7 @@ protected boolean isObserverReadEligible(String nsId, Method method) {
* @param nsId namespaceID
* @return whether the 'namespace' has observer reads enabled.
*/
boolean isNamespaceObserverReadEligible(String nsId) {
public boolean isNamespaceObserverReadEligible(String nsId) {
// The overrides collection holds the namespaces that deviate from the
// default, so membership flips the default value (a logical XOR).
return observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncQuota;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncClientProtocol;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncNamenodeProtocol;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncUserProtocol;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
Expand Down Expand Up @@ -288,6 +292,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
* @param fileResolver File resolver to resolve file paths to subclusters.
* @throws IOException If the RPC server could not be created.
*/
@SuppressWarnings("checkstyle:MethodLength")
public RouterRpcServer(Configuration conf, Router router,
ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver)
throws IOException {
Expand Down Expand Up @@ -424,14 +429,19 @@ public RouterRpcServer(Configuration conf, Router router,
if (this.enableAsync) {
this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
this.clientProto = new RouterAsyncClientProtocol(conf, this);
this.nnProto = new RouterAsyncNamenodeProtocol(this);
this.routerProto = new RouterAsyncUserProtocol(this);
this.quotaCall = new AsyncQuota(this.router, this);
} else {
this.rpcClient = new RouterRpcClient(this.conf, this.router,
this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
this.clientProto = new RouterClientProtocol(conf, this);
this.nnProto = new RouterNamenodeProtocol(this);
this.routerProto = new RouterUserProtocol(this);
this.quotaCall = new Quota(this.router, this);
}
this.nnProto = new RouterNamenodeProtocol(this);
this.quotaCall = new Quota(this.router, this);
this.clientProto = new RouterClientProtocol(conf, this);
this.routerProto = new RouterUserProtocol(this);

long dnCacheExpire = conf.getTimeDuration(
DN_REPORT_CACHE_EXPIRE,
DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -2193,7 +2203,7 @@ public FederationRPCMetrics getRPCMetrics() {
* @param path Path to check.
* @return If a path should be in all subclusters.
*/
boolean isPathAll(final String path) {
public boolean isPathAll(final String path) {
MountTable entry = getMountTable(path);
// A path without a mount table entry is never reported as "all".
return entry != null && entry.isAll();
}
Expand Down
Loading
Loading