Skip to content

Commit cd37abb

Browse files
committed
HDFS-17672. [ARR] Move asynchronous related classes to the async package. (apache#7184). Contributed by Jian Zhang.
1 parent ee1dd54 commit cd37abb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+649
-787
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.hadoop.hdfs.protocolPB;
2020

2121
import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
22-
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
22+
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
2323
import org.apache.hadoop.io.Writable;
2424
import org.apache.hadoop.ipc.Client;
2525
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
@@ -32,9 +32,9 @@
3232
import java.util.concurrent.CompletableFuture;
3333
import java.util.concurrent.Executor;
3434

35-
import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
36-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
37-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
35+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
36+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
37+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
3838
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
3939

4040
/**

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public QuotaUsage getQuotaUsage(String path) throws IOException {
139139
* @return quota usage for each remote location.
140140
* @throws IOException If the quota system is disabled.
141141
*/
142-
Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
142+
protected Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
143143
throws IOException {
144144
rpcServer.checkOperation(OperationCategory.READ);
145145
if (!router.isQuotaEnabled()) {
@@ -252,8 +252,9 @@ protected List<RemoteLocation> getValidQuotaLocations(String path)
252252
* @param path Federation path of the results.
253253
* @param results Quota query result.
254254
* @return Aggregated Quota.
255+
* @throws IOException If the quota system is disabled.
255256
*/
256-
QuotaUsage aggregateQuota(String path,
257+
protected QuotaUsage aggregateQuota(String path,
257258
Map<RemoteLocation, QuotaUsage> results) throws IOException {
258259
long nsCount = 0;
259260
long ssCount = 0;

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
2222
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
2323
import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER;
24-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
24+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
2525

2626
import java.io.IOException;
2727
import java.net.InetSocketAddress;

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
4343

44-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
44+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
4545

4646
/**
4747
* Service to periodically update the {@link RouterQuotaUsage}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1940,7 +1940,7 @@ private String getCurrentFairnessPolicyControllerClassName() {
19401940
* @return A prioritized list of NNs to use for communication.
19411941
* @throws IOException If a NN cannot be located for the nameservice ID.
19421942
*/
1943-
protected List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
1943+
public List<? extends FederationNamenodeContext> getOrderedNamenodes(String nsId,
19441944
boolean isObserverRead) throws IOException {
19451945
final List<? extends FederationNamenodeContext> namenodes;
19461946

@@ -2047,39 +2047,39 @@ protected static class ExecutionStatus {
20472047
private static final byte SHOULD_USE_OBSERVER_BIT = 2;
20482048
private static final byte COMPLETE_BIT = 4;
20492049

2050-
ExecutionStatus() {
2050+
public ExecutionStatus() {
20512051
this(false, false);
20522052
}
20532053

2054-
ExecutionStatus(boolean failOver, boolean shouldUseObserver) {
2054+
public ExecutionStatus(boolean failOver, boolean shouldUseObserver) {
20552055
this.flag = 0;
20562056
setFailOver(failOver);
20572057
setShouldUseObserver(shouldUseObserver);
20582058
setComplete(false);
20592059
}
20602060

2061-
private void setFailOver(boolean failOver) {
2061+
public void setFailOver(boolean failOver) {
20622062
flag = (byte) (failOver ? (flag | FAIL_OVER_BIT) : (flag & ~FAIL_OVER_BIT));
20632063
}
20642064

2065-
private void setShouldUseObserver(boolean shouldUseObserver) {
2065+
public void setShouldUseObserver(boolean shouldUseObserver) {
20662066
flag = (byte) (shouldUseObserver ?
20672067
(flag | SHOULD_USE_OBSERVER_BIT) : (flag & ~SHOULD_USE_OBSERVER_BIT));
20682068
}
20692069

2070-
void setComplete(boolean complete) {
2070+
public void setComplete(boolean complete) {
20712071
flag = (byte) (complete ? (flag | COMPLETE_BIT) : (flag & ~COMPLETE_BIT));
20722072
}
20732073

2074-
boolean isFailOver() {
2074+
public boolean isFailOver() {
20752075
return (flag & FAIL_OVER_BIT) != 0;
20762076
}
20772077

2078-
boolean isShouldUseObserver() {
2078+
public boolean isShouldUseObserver() {
20792079
return (flag & SHOULD_USE_OBSERVER_BIT) != 0;
20802080
}
20812081

2082-
boolean isComplete() {
2082+
public boolean isComplete() {
20832083
return (flag & COMPLETE_BIT) != 0;
20842084
}
20852085
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@
3737
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
3838
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
3939
import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
40-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
41-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
42-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
43-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
44-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
45-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
40+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
41+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
42+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
43+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
44+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
45+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
4646
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
4747

4848
import java.io.FileNotFoundException;
@@ -75,9 +75,10 @@
7575
import org.apache.hadoop.hdfs.HAUtil;
7676
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
7777
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
78-
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
79-
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
80-
import org.apache.hadoop.hdfs.server.federation.router.async.CatchFunction;
78+
import org.apache.hadoop.hdfs.server.federation.router.async.RouterAsyncRpcClient;
79+
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
80+
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncCatchFunction;
81+
import org.apache.hadoop.hdfs.server.federation.router.async.utils.CatchFunction;
8182
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
8283
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
8384
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
@@ -686,7 +687,7 @@ public InetSocketAddress getRpcAddress() {
686687
* client requests.
687688
* @throws UnsupportedOperationException If the operation is not supported.
688689
*/
689-
void checkOperation(OperationCategory op, boolean supported)
690+
public void checkOperation(OperationCategory op, boolean supported)
690691
throws StandbyException, UnsupportedOperationException {
691692
checkOperation(op);
692693

@@ -1032,7 +1033,7 @@ RemoteLocation getCreateLocation(
10321033
* @return The remote location for this file.
10331034
* @throws IOException If the file has no creation location.
10341035
*/
1035-
RemoteLocation getCreateLocationAsync(
1036+
public RemoteLocation getCreateLocationAsync(
10361037
final String src, final List<RemoteLocation> locations)
10371038
throws IOException {
10381039

@@ -1995,7 +1996,7 @@ public Long getNextSPSPath() throws IOException {
19951996
* @return Prioritized list of locations in the federated cluster.
19961997
* @throws IOException If the location for this path cannot be determined.
19971998
*/
1998-
protected List<RemoteLocation> getLocationsForPath(String path,
1999+
public List<RemoteLocation> getLocationsForPath(String path,
19992000
boolean failIfLocked) throws IOException {
20002001
return getLocationsForPath(path, failIfLocked, true);
20012002
}
@@ -2010,7 +2011,7 @@ protected List<RemoteLocation> getLocationsForPath(String path,
20102011
* @return Prioritized list of locations in the federated cluster.
20112012
* @throws IOException If the location for this path cannot be determined.
20122013
*/
2013-
protected List<RemoteLocation> getLocationsForPath(String path,
2014+
public List<RemoteLocation> getLocationsForPath(String path,
20142015
boolean failIfLocked, boolean needQuotaVerify) throws IOException {
20152016
try {
20162017
if (failIfLocked) {
@@ -2227,9 +2228,9 @@ private MountTable getMountTable(final String path){
22272228
* mount entry.
22282229
* @param path The path on which the operation need to be invoked.
22292230
* @return true if the call is supposed to invoked on all locations.
2230-
* @throws IOException
2231+
* @throws IOException If an I/O error occurs.
22312232
*/
2232-
boolean isInvokeConcurrent(final String path) throws IOException {
2233+
public boolean isInvokeConcurrent(final String path) throws IOException {
22332234
if (subclusterResolver instanceof MountTableResolver) {
22342235
MountTableResolver mountTableResolver =
22352236
(MountTableResolver) subclusterResolver;

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
*/
4949
@InterfaceAudience.Private
5050
@InterfaceStability.Evolving
51-
class RouterStateIdContext implements AlignmentContext {
51+
public class RouterStateIdContext implements AlignmentContext {
5252

5353
private final HashSet<String> coordinatedMethods;
5454
/**
@@ -93,6 +93,8 @@ class RouterStateIdContext implements AlignmentContext {
9393

9494
/**
9595
* Adds the {@link #namespaceIdMap} to the response header that will be sent to a client.
96+
*
97+
* @param headerBuilder the response header that will be sent to a client.
9698
*/
9799
public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) {
98100
if (namespaceIdMap.isEmpty()) {
@@ -110,7 +112,8 @@ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder)
110112
}
111113

112114
public LongAccumulator getNamespaceStateId(String nsId) {
113-
return namespaceIdMap.computeIfAbsent(nsId, key -> new LongAccumulator(Math::max, Long.MIN_VALUE));
115+
return namespaceIdMap.computeIfAbsent(nsId,
116+
key -> new LongAccumulator(Math::max, Long.MIN_VALUE));
114117
}
115118

116119
public List<String> getNamespaces() {
@@ -127,6 +130,9 @@ public void removeNamespaceStateId(String nsId) {
127130

128131
/**
129132
* Utility function to parse routerFederatedState field in RPC headers.
133+
*
134+
* @param byteString the byte string of routerFederatedState.
135+
* @return the router federated state map.
130136
*/
131137
public static Map<String, Long> getRouterFederatedStateMap(ByteString byteString) {
132138
if (byteString != null) {
@@ -148,7 +154,8 @@ public static long getClientStateIdFromCurrentCall(String nsId) {
148154
if (call != null) {
149155
ByteString callFederatedNamespaceState = call.getFederatedNamespaceState();
150156
if (callFederatedNamespaceState != null) {
151-
Map<String, Long> clientFederatedStateIds = getRouterFederatedStateMap(callFederatedNamespaceState);
157+
Map<String, Long> clientFederatedStateIds =
158+
getRouterFederatedStateMap(callFederatedNamespaceState);
152159
clientStateID = clientFederatedStateIds.getOrDefault(nsId, Long.MIN_VALUE);
153160
}
154161
}
Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.hadoop.hdfs.server.federation.router;
18+
package org.apache.hadoop.hdfs.server.federation.router.async;
1919

2020
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
2121
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
@@ -25,7 +25,12 @@
2525
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
2626
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
2727
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
28-
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
28+
import org.apache.hadoop.hdfs.server.federation.router.ErasureCoding;
29+
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
30+
import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
31+
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
32+
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
33+
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
2934
import org.apache.hadoop.hdfs.server.namenode.NameNode;
3035

3136
import java.io.IOException;
@@ -36,9 +41,15 @@
3641
import java.util.Set;
3742

3843
import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
39-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
40-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
44+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
45+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
4146

47+
/**
48+
* Provides asynchronous operations for erasure coding in HDFS Federation.
49+
* This class extends {@link org.apache.hadoop.hdfs.server.federation.router.ErasureCoding}
50+
* and overrides its methods to perform erasure coding operations in a non-blocking manner,
51+
* allowing for concurrent execution and improved performance.
52+
*/
4253
public class AsyncErasureCoding extends ErasureCoding {
4354
/** RPC server to receive client calls. */
4455
private final RouterRpcServer rpcServer;
@@ -54,6 +65,17 @@ public AsyncErasureCoding(RouterRpcServer server) {
5465
this.namenodeResolver = this.rpcClient.getNamenodeResolver();
5566
}
5667

68+
/**
69+
* Asynchronously get an array of all erasure coding policies.
70+
* This method checks the operation category and then invokes the
71+
* getErasureCodingPolicies method concurrently across all namespaces.
72+
* <p>
73+
* The results are merged and returned as an array of ErasureCodingPolicyInfo.
74+
*
75+
* @return Array of ErasureCodingPolicyInfo.
76+
* @throws IOException If an I/O error occurs.
77+
*/
78+
@Override
5779
public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
5880
throws IOException {
5981
rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -70,6 +92,16 @@ public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
7092
return asyncReturn(ErasureCodingPolicyInfo[].class);
7193
}
7294

95+
/**
96+
* Asynchronously get the erasure coding codecs available.
97+
* This method checks the operation category and then invokes the
98+
* getErasureCodingCodecs method concurrently across all namespaces.
99+
* <p>
100+
* The results are merged into a single map of codec names to codec properties.
101+
*
102+
* @return Map of erasure coding codecs.
103+
* @throws IOException If an I/O error occurs.
104+
*/
73105
@Override
74106
public Map<String, String> getErasureCodingCodecs() throws IOException {
75107
rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -97,6 +129,17 @@ public Map<String, String> getErasureCodingCodecs() throws IOException {
97129
return asyncReturn(Map.class);
98130
}
99131

132+
/**
133+
* Asynchronously add an array of erasure coding policies.
134+
* This method checks the operation category and then invokes the
135+
* addErasureCodingPolicies method concurrently across all namespaces.
136+
* <p>
137+
* The results are merged and returned as an array of AddErasureCodingPolicyResponse.
138+
*
139+
* @param policies Array of erasure coding policies to add.
140+
* @return Array of AddErasureCodingPolicyResponse.
141+
* @throws IOException If an I/O error occurs.
142+
*/
100143
@Override
101144
public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
102145
ErasureCodingPolicy[] policies) throws IOException {
@@ -117,6 +160,17 @@ public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
117160
return asyncReturn(AddErasureCodingPolicyResponse[].class);
118161
}
119162

163+
/**
164+
* Asynchronously get the erasure coding policy for a given source path.
165+
* This method checks the operation category and then invokes the
166+
* getErasureCodingPolicy method sequentially for the given path.
167+
* <p>
168+
* The result is returned as an ErasureCodingPolicy object.
169+
*
170+
* @param src Source path to get the erasure coding policy for.
171+
* @return ErasureCodingPolicy for the given path.
172+
* @throws IOException If an I/O error occurs.
173+
*/
120174
@Override
121175
public ErasureCodingPolicy getErasureCodingPolicy(String src)
122176
throws IOException {
@@ -136,6 +190,17 @@ public ErasureCodingPolicy getErasureCodingPolicy(String src)
136190
return asyncReturn(ErasureCodingPolicy.class);
137191
}
138192

193+
/**
194+
* Asynchronously get the EC topology result for the given policies.
195+
* This method checks the operation category and then invokes the
196+
* getECTopologyResultForPolicies method concurrently across all namespaces.
197+
* <p>
198+
* The results are merged and the first unsupported result is returned.
199+
*
200+
* @param policyNames Array of policy names to check.
201+
* @return ECTopologyVerifierResult for the policies.
202+
* @throws IOException If an I/O error occurs.
203+
*/
139204
@Override
140205
public ECTopologyVerifierResult getECTopologyResultForPolicies(
141206
String[] policyNames) throws IOException {
@@ -162,6 +227,16 @@ public ECTopologyVerifierResult getECTopologyResultForPolicies(
162227
return asyncReturn(ECTopologyVerifierResult.class);
163228
}
164229

230+
/**
231+
* Asynchronously get the erasure coding block group statistics.
232+
* This method checks the operation category and then invokes the
233+
* getECBlockGroupStats method concurrently across all namespaces.
234+
* <p>
235+
* The results are merged and returned as an ECBlockGroupStats object.
236+
*
237+
* @return ECBlockGroupStats for the erasure coding block groups.
238+
* @throws IOException If an I/O error occurs.
239+
*/
165240
@Override
166241
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
167242
rpcServer.checkOperation(NameNode.OperationCategory.READ);

0 commit comments

Comments
 (0)