Skip to content

Commit f8fb2fe

Browse files
committed
Add:RouterCacheAdmin supports asynchronous rpc
1 parent 4e0b405 commit f8fb2fe

File tree

15 files changed

+1741
-50
lines changed

15 files changed

+1741
-50
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,52 +18,91 @@
1818

1919
package org.apache.hadoop.hdfs.protocolPB;
2020

21+
import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
2122
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
2223
import org.apache.hadoop.io.Writable;
23-
import org.apache.hadoop.ipc.CallerContext;
2424
import org.apache.hadoop.ipc.Client;
2525
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
26-
import org.apache.hadoop.ipc.Server;
2726
import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
2827
import org.apache.hadoop.util.concurrent.AsyncGet;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130

3231
import java.io.IOException;
3332
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.Executor;
3434

3535
import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
36-
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
3736
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCompleteWith;
3837
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
3938
import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
4039

40+
/**
41+
* <p>This utility class encapsulates the logic required to initiate asynchronous RPCs,
42+
* handle responses, and propagate exceptions. It works in conjunction with
43+
* {@link ProtobufRpcEngine2} and {@link Client} to facilitate the asynchronous
44+
* nature of the operations.
45+
*
46+
* @see ProtobufRpcEngine2
47+
* @see Client
48+
* @see CompletableFuture
49+
*/
4150
public final class AsyncRpcProtocolPBUtil {
4251
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
52+
/** The executor used for handling responses asynchronously. */
53+
private static Executor worker;
4354

4455
private AsyncRpcProtocolPBUtil() {}
4556

57+
/**
58+
* Asynchronously invokes an RPC call and applies a response transformation function
59+
* to the result. This method is generic and can be used to handle any type of
60+
* RPC call.
61+
*
62+
* <p>The method uses the {@link ShadedProtobufHelper.IpcCall} to prepare the RPC call
63+
* and the {@link ApplyFunction} to process the response. It also handles exceptions
64+
* that may occur during the RPC call and wraps them in a user-friendly manner.
65+
*
66+
* @param call The IPC call encapsulating the RPC request.
67+
* @param response The function to apply to the response of the RPC call.
68+
* @param clazz The class object representing the type {@code R} of the response.
69+
* @param <T> Type of the call's result.
70+
* @param <R> Type of method return.
71+
* @return An object of type {@code R} that is the result of applying the response
72+
* function to the RPC call result.
73+
* @throws IOException If an I/O error occurs during the asynchronous RPC call.
74+
*/
4675
public static <T, R> R asyncIpcClient(
4776
ShadedProtobufHelper.IpcCall<T> call, ApplyFunction<T, R> response,
4877
Class<R> clazz) throws IOException {
4978
ipc(call);
5079
AsyncGet<T, Exception> asyncReqMessage =
5180
(AsyncGet<T, Exception>) ProtobufRpcEngine2.getAsyncReturnMessage();
5281
CompletableFuture<Writable> responseFuture = Client.getResponseFuture();
53-
// transfer originCall & callerContext to worker threads of executor.
54-
final Server.Call originCall = Server.getCurCall().get();
55-
final CallerContext originContext = CallerContext.getCurrent();
56-
asyncCompleteWith(responseFuture);
57-
asyncApply(o -> {
82+
// transfer thread local context to worker threads of executor.
83+
ThreadLocalContext threadLocalContext = new ThreadLocalContext();
84+
asyncCompleteWith(responseFuture.handleAsync((result, e) -> {
85+
threadLocalContext.transfer();
86+
if (e != null) {
87+
throw warpCompletionException(e);
88+
}
5889
try {
59-
Server.getCurCall().set(originCall);
60-
CallerContext.setCurrent(originContext);
6190
T res = asyncReqMessage.get(-1, null);
6291
return response.apply(res);
63-
} catch (Exception e) {
64-
throw warpCompletionException(e);
92+
} catch (Exception ex) {
93+
throw warpCompletionException(ex);
6594
}
66-
});
95+
}, worker));
6796
return asyncReturn(clazz);
6897
}
98+
99+
/**
100+
* Sets the executor used for handling responses asynchronously within
101+
* the utility class.
102+
*
103+
* @param worker The executor to be used for handling responses asynchronously.
104+
*/
105+
public static void setWorker(Executor worker) {
106+
AsyncRpcProtocolPBUtil.worker = worker;
107+
}
69108
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
5252

5353

5454
/** Time for an operation to be received in the Router. */
55-
private static final ThreadLocal<Long> START_TIME = new ThreadLocal<>();
55+
private static final ThreadLocal<Long> START_TIME = ThreadLocal.withInitial(() -> -1L);
5656
/** Time for an operation to be sent to the Namenode. */
57-
private static final ThreadLocal<Long> PROXY_TIME = new ThreadLocal<>();
57+
private static final ThreadLocal<Long> PROXY_TIME = ThreadLocal.withInitial(() -> -1L);
5858

5959
/** Configuration for the performance monitor. */
6060
private Configuration conf;
@@ -141,6 +141,14 @@ public void startOp() {
141141
START_TIME.set(monotonicNow());
142142
}
143143

144+
public static long getStartOpTime() {
145+
return START_TIME.get();
146+
}
147+
148+
public static void setStartOpTime(long startOpTime) {
149+
START_TIME.set(startOpTime);
150+
}
151+
144152
@Override
145153
public long proxyOp() {
146154
PROXY_TIME.set(monotonicNow());
@@ -151,6 +159,14 @@ public long proxyOp() {
151159
return Thread.currentThread().getId();
152160
}
153161

162+
public static long getProxyOpTime() {
163+
return PROXY_TIME.get();
164+
}
165+
166+
public static void setProxyOpTime(long proxyOpTime) {
167+
PROXY_TIME.set(proxyOpTime);
168+
}
169+
154170
@Override
155171
public void proxyOpComplete(boolean success, String nsId,
156172
FederationNamenodeServiceState state) {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
7272
public static final String DFS_ROUTER_RPC_ENABLE =
7373
FEDERATION_ROUTER_PREFIX + "rpc.enable";
7474
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
75+
public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
76+
FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
77+
public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
78+
public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
79+
FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
80+
public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
81+
public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
82+
FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
83+
public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;
7584

7685
public static final String DFS_ROUTER_METRICS_ENABLE =
7786
FEDERATION_ROUTER_PREFIX + "metrics.enable";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.federation.router;
20+
21+
import org.apache.hadoop.fs.CacheFlag;
22+
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
23+
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
24+
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
25+
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
26+
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
27+
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
28+
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
29+
30+
import java.io.IOException;
31+
import java.util.EnumSet;
32+
import java.util.Map;
33+
34+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
35+
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
36+
37+
/**
38+
* Module that implements all the asynchronous RPC calls in
39+
* {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to Cache Admin
40+
* in the {@link RouterRpcServer}.
41+
*/
42+
public class RouterAsyncCacheAdmin extends RouterCacheAdmin{
43+
44+
public RouterAsyncCacheAdmin(RouterRpcServer server) {
45+
super(server);
46+
}
47+
48+
@Override
49+
public long addCacheDirective(
50+
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
51+
invokeAddCacheDirective(path, flags);
52+
asyncApply((ApplyFunction<Map<RemoteLocation, Long>, Long>)
53+
response -> response.values().iterator().next());
54+
return asyncReturn(Long.class);
55+
}
56+
57+
@Override
58+
public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
59+
long prevId, CacheDirectiveInfo filter) throws IOException {
60+
invokeListCacheDirectives(prevId, filter);
61+
asyncApply((ApplyFunction<Map,
62+
BatchedEntries<CacheDirectiveEntry>>)
63+
response -> (BatchedEntries<CacheDirectiveEntry>) response.values().iterator().next());
64+
return asyncReturn(BatchedEntries.class);
65+
}
66+
67+
@Override
68+
public BatchedEntries<CachePoolEntry> listCachePools(String prevKey) throws IOException {
69+
invokeListCachePools(prevKey);
70+
asyncApply((ApplyFunction<Map<FederationNamespaceInfo, BatchedEntries>,
71+
BatchedEntries<CachePoolEntry>>)
72+
results -> results.values().iterator().next());
73+
return asyncReturn(BatchedEntries.class);
74+
}
75+
}

0 commit comments

Comments
 (0)