@@ -305,8 +305,9 @@ void checkResponse(RpcResponseHeaderProto header) throws IOException {
305
305
}
306
306
}
307
307
308
- Call createCall (RPC .RpcKind rpcKind , Writable rpcRequest ) {
309
- return new Call (rpcKind , rpcRequest );
308
+ Call createCall (RPC .RpcKind rpcKind , Writable rpcRequest ,
309
+ AlignmentContext alignmentContext ) {
310
+ return new Call (rpcKind , rpcRequest , alignmentContext );
310
311
}
311
312
312
313
/**
@@ -319,11 +320,13 @@ static class Call {
319
320
private final CompletableFuture <Writable > rpcResponseFuture ;
320
321
final RPC .RpcKind rpcKind ; // Rpc EngineKind
321
322
private final Object externalHandler ;
322
- private AlignmentContext alignmentContext ;
323
+ private final AlignmentContext alignmentContext ;
323
324
324
- private Call (RPC .RpcKind rpcKind , Writable param ) {
325
+ private Call (RPC .RpcKind rpcKind , Writable param ,
326
+ AlignmentContext alignmentContext ) {
325
327
this .rpcKind = rpcKind ;
326
328
this .rpcRequest = param ;
329
+ this .alignmentContext = alignmentContext ;
327
330
328
331
final Integer id = callId .get ();
329
332
if (id == null ) {
@@ -351,44 +354,32 @@ public String toString() {
351
354
352
355
/** Indicate when the call is complete and the
353
356
* value or error are available. Notifies by default. */
354
- protected synchronized void callComplete (Writable rpcResponse , IOException error ) {
355
- if (error != null ) {
356
- rpcResponseFuture .completeExceptionally (error );
357
- } else {
358
- rpcResponseFuture .complete (rpcResponse );
359
- }
357
+ protected void callComplete () {
360
358
if (externalHandler != null ) {
361
359
synchronized (externalHandler ) {
362
360
externalHandler .notify ();
363
361
}
364
362
}
365
363
}
366
364
367
- /**
368
- * Set an AlignmentContext for the call to update when call is done.
369
- *
370
- * @param ac alignment context to update.
371
- */
372
- public synchronized void setAlignmentContext (AlignmentContext ac ) {
373
- this .alignmentContext = ac ;
374
- }
375
-
376
365
/** Set the exception when there is an error.
377
366
* Notify the caller the call is done.
378
367
*
379
368
* @param error exception thrown by the call; either local or remote
380
369
*/
381
- public synchronized void setException (IOException error ) {
382
- callComplete (null , error );
370
+ public void setException (IOException error ) {
371
+ rpcResponseFuture .completeExceptionally (error );
372
+ callComplete ();
383
373
}
384
374
385
375
/** Set the return value when there is no error.
386
376
* Notify the caller the call is done.
387
377
*
388
378
* @param rpcResponse return value of the rpc call.
389
379
*/
390
- public synchronized void setRpcResponse (Writable rpcResponse ) {
391
- callComplete (rpcResponse , null );
380
+ public void setRpcResponse (Writable rpcResponse ) {
381
+ rpcResponseFuture .complete (rpcResponse );
382
+ callComplete ();
392
383
}
393
384
}
394
385
@@ -1503,8 +1494,7 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
1503
1494
ConnectionId remoteId , int serviceClass ,
1504
1495
AtomicBoolean fallbackToSimpleAuth , AlignmentContext alignmentContext )
1505
1496
throws IOException {
1506
- final Call call = createCall (rpcKind , rpcRequest );
1507
- call .setAlignmentContext (alignmentContext );
1497
+ final Call call = createCall (rpcKind , rpcRequest , alignmentContext );
1508
1498
final Connection connection = getConnection (remoteId , call , serviceClass ,
1509
1499
fallbackToSimpleAuth );
1510
1500
0 commit comments