Skip to content

Commit 1f842eb

Browse files
committed
RPC client uses CompletableFuture to support asynchronous operations.
1 parent 2ee0bf9 commit 1f842eb

File tree

2 files changed

+116
-0
lines changed

2 files changed

+116
-0
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ public class Client implements AutoCloseable {
9696
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
9797
private static final ThreadLocal<Object> EXTERNAL_CALL_HANDLER
9898
= new ThreadLocal<>();
99+
public static final ThreadLocal<CompletableFuture<Object>> CALL_FUTURE_THREAD_LOCAL
100+
= new ThreadLocal<>();
99101
private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
100102
ASYNC_RPC_RESPONSE = new ThreadLocal<>();
101103
private static final ThreadLocal<Boolean> asynchronousMode =
@@ -283,6 +285,7 @@ static class Call {
283285
boolean done; // true when call is done
284286
private final Object externalHandler;
285287
private AlignmentContext alignmentContext;
288+
private final CompletableFuture<Object> completableFuture;
286289

287290
private Call(RPC.RpcKind rpcKind, Writable param) {
288291
this.rpcKind = rpcKind;
@@ -304,6 +307,8 @@ private Call(RPC.RpcKind rpcKind, Writable param) {
304307
}
305308

306309
this.externalHandler = EXTERNAL_CALL_HANDLER.get();
310+
this.completableFuture = CALL_FUTURE_THREAD_LOCAL.get();
311+
CALL_FUTURE_THREAD_LOCAL.remove();
307312
}
308313

309314
@Override
@@ -322,6 +327,9 @@ protected synchronized void callComplete() {
322327
externalHandler.notify();
323328
}
324329
}
330+
if (completableFuture != null) {
331+
completableFuture.complete(this);
332+
}
325333
}
326334

327335
/**

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
2929
import org.apache.hadoop.net.NetUtils;
3030
import org.apache.hadoop.util.StringUtils;
31+
import org.apache.hadoop.util.Time;
3132
import org.apache.hadoop.util.concurrent.AsyncGetFuture;
3233
import org.junit.Assert;
3334
import org.junit.Before;
@@ -38,13 +39,17 @@
3839
import java.io.IOException;
3940
import java.net.InetSocketAddress;
4041
import java.util.*;
42+
import java.util.concurrent.CompletableFuture;
43+
import java.util.concurrent.CompletionException;
4144
import java.util.concurrent.ExecutionException;
4245
import java.util.concurrent.Future;
4346
import java.util.concurrent.TimeUnit;
4447
import java.util.concurrent.TimeoutException;
4548

4649
import static org.junit.Assert.assertEquals;
4750
import static org.junit.Assert.assertFalse;
51+
import static org.junit.Assert.assertTrue;
52+
import static org.junit.Assert.fail;
4853

4954
public class TestAsyncIPC {
5055

@@ -137,6 +142,77 @@ void assertReturnValues(long timeout, TimeUnit unit)
137142
}
138143
}
139144

145+
/**
146+
* For testing the asynchronous calls of the RPC client
147+
* implemented with CompletableFuture.
148+
*/
149+
static class AsyncCompletableFutureCaller extends Thread {
150+
private final Client client;
151+
private final InetSocketAddress server;
152+
private final int count;
153+
private final List<CompletableFuture<Object>> completableFutures = new ArrayList<>();
154+
private final List<Long> expectedValues = new ArrayList<>();
155+
156+
AsyncCompletableFutureCaller(Client client, InetSocketAddress server, int count) {
157+
this.client = client;
158+
this.server = server;
159+
this.count = count;
160+
setName("Async CompletableFuture Caller");
161+
}
162+
163+
@Override
164+
public void run() {
165+
// Set the RPC client to use asynchronous mode.
166+
Client.setAsynchronousMode(true);
167+
long startTime = Time.monotonicNow();
168+
try {
169+
for (int i = 0; i < count; i++) {
170+
final long param = TestIPC.RANDOM.nextLong();
171+
// Set the CompletableFuture object for the current Client.Call.
172+
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
173+
Client.CALL_FUTURE_THREAD_LOCAL.set(completableFuture);
174+
// Execute asynchronous call.
175+
TestIPC.call(client, param, server, conf);
176+
expectedValues.add(param);
177+
// After the call is completed, the response thread
178+
// (currently the Client.connection thread) retrieves the response
179+
// using the AsyncGetFuture<Writable, IOException> object.
180+
AsyncGetFuture<Writable, IOException> asyncRpcResponse = getAsyncRpcResponseFuture();
181+
completableFuture = completableFuture.thenApply(call -> {
182+
LOG.info("[{}] Async response for {}", Thread.currentThread().getName(), call);
183+
assertTrue(Thread.currentThread().getName().contains("connection"));
184+
try {
185+
// Since the current call has already been completed,
186+
// this method does not need to block.
187+
return asyncRpcResponse.get();
188+
} catch (Exception e) {
189+
throw new CompletionException(e);
190+
}
191+
});
192+
completableFutures.add(completableFuture);
193+
}
194+
// Since the run method is asynchronous,
195+
// it does not need to wait for a response after sending a request,
196+
// so the time taken by the run method is less than count * 100
197+
// (where 100 is the time taken by the server to process a request).
198+
long cost = Time.monotonicNow() - startTime;
199+
assertTrue(cost < count * 100);
200+
LOG.info("[{}] run cost {}ms", Thread.currentThread().getName(), cost);
201+
} catch (Exception e) {
202+
fail();
203+
}
204+
}
205+
206+
public void assertReturnValues()
207+
throws InterruptedException, ExecutionException {
208+
for (int i = 0; i < count; i++) {
209+
LongWritable value = (LongWritable) completableFutures.get(i).get();
210+
Assert.assertEquals("call" + i + " failed.",
211+
expectedValues.get(i).longValue(), value.get());
212+
}
213+
}
214+
}
215+
140216
static class AsyncLimitlCaller extends Thread {
141217
private Client client;
142218
private InetSocketAddress server;
@@ -538,4 +614,36 @@ public void run() {
538614
assertEquals(startID + i, callIds.get(i).intValue());
539615
}
540616
}
617+
618+
@Test(timeout = 60000)
619+
public void testAsyncCallWithCompletableFuture() throws IOException,
620+
InterruptedException, ExecutionException {
621+
// Override client to store the call id
622+
final Client client = new Client(LongWritable.class, conf);
623+
624+
// Construct an RPC server, which includes a handler thread.
625+
final TestServer server = new TestIPC.TestServer(1, false, conf);
626+
server.callListener = () -> {
627+
try {
628+
// The server requires at least 100 milliseconds to process a request.
629+
Thread.sleep(100);
630+
} catch (InterruptedException e) {
631+
throw new RuntimeException(e);
632+
}
633+
};
634+
635+
try {
636+
InetSocketAddress addr = NetUtils.getConnectAddress(server);
637+
server.start();
638+
// Send 10 asynchronous requests.
639+
final AsyncCompletableFutureCaller caller =
640+
new AsyncCompletableFutureCaller(client, addr, 10);
641+
caller.run();
642+
// Check if the values returned by the asynchronous call meet the expected values.
643+
caller.assertReturnValues();
644+
} finally {
645+
client.stop();
646+
server.stop();
647+
}
648+
}
541649
}

0 commit comments

Comments
 (0)