Skip to content

Commit 24c1de0

Browse files
committed
HADOOP-19362. RPC metrics should be updated correctly when call is deferred.
1 parent be06adc commit 24c1de0

File tree

3 files changed

+87
-19
lines changed

3 files changed

+87
-19
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hadoop.classification.InterfaceAudience;
2727
import org.apache.hadoop.classification.InterfaceStability;
2828
import org.apache.hadoop.classification.InterfaceStability.Unstable;
29+
import org.apache.hadoop.classification.VisibleForTesting;
2930
import org.apache.hadoop.conf.Configuration;
3031
import org.apache.hadoop.io.Writable;
3132
import org.apache.hadoop.io.retry.RetryPolicy;
@@ -34,7 +35,6 @@
3435
import org.apache.hadoop.security.UserGroupInformation;
3536
import org.apache.hadoop.security.token.SecretManager;
3637
import org.apache.hadoop.security.token.TokenIdentifier;
37-
import org.apache.hadoop.classification.VisibleForTesting;
3838
import org.apache.hadoop.tracing.TraceScope;
3939
import org.apache.hadoop.tracing.Tracer;
4040
import org.apache.hadoop.util.Time;
@@ -393,28 +393,39 @@ static class ProtobufRpcEngineCallbackImpl
393393
private final RPC.Server server;
394394
private final Call call;
395395
private final String methodName;
396-
private final long setupTime;
396+
private final long callStartNanos;
397397

398398
public ProtobufRpcEngineCallbackImpl() {
399399
this.server = CURRENT_CALL_INFO.get().getServer();
400400
this.call = Server.getCurCall().get();
401401
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
402-
this.setupTime = Time.monotonicNow();
402+
this.callStartNanos = Server.getCurCallStartNanos().get();
403+
}
404+
405+
private void updateProcessingDetails(Call call, long deltaNanos) {
406+
ProcessingDetails details = call.getProcessingDetails();
407+
call.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
408+
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
409+
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
410+
deltaNanos -= details.get(ProcessingDetails.Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
411+
details.set(ProcessingDetails.Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
403412
}
404413

405414
@Override
406415
public void setResponse(Message message) {
407-
long processingTime = Time.monotonicNow() - setupTime;
416+
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
417+
updateProcessingDetails(call, deltaNanos);
408418
call.setDeferredResponse(RpcWritable.wrap(message));
409-
server.updateDeferredMetrics(methodName, processingTime);
419+
server.updateDeferredMetrics(call, methodName, deltaNanos);
410420
}
411421

412422
@Override
413423
public void error(Throwable t) {
414-
long processingTime = Time.monotonicNow() - setupTime;
415-
String detailedMetricsName = t.getClass().getSimpleName();
416-
server.updateDeferredMetrics(detailedMetricsName, processingTime);
424+
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
425+
updateProcessingDetails(call, deltaNanos);
417426
call.setDeferredError(t);
427+
String detailedMetricsName = t.getClass().getSimpleName();
428+
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
418429
}
419430
}
420431

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.io.Writable;
2626
import org.apache.hadoop.io.retry.RetryPolicy;
2727
import org.apache.hadoop.ipc.Client.ConnectionId;
28+
import org.apache.hadoop.ipc.ProcessingDetails.Timing;
2829
import org.apache.hadoop.ipc.RPC.RpcInvoker;
2930
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngine2Protos.RequestHeaderProto;
3031
import org.apache.hadoop.security.UserGroupInformation;
@@ -425,28 +426,39 @@ static class ProtobufRpcEngineCallbackImpl
425426
private final RPC.Server server;
426427
private final Call call;
427428
private final String methodName;
428-
private final long setupTime;
429+
private final long callStartNanos;
429430

430431
ProtobufRpcEngineCallbackImpl() {
431432
this.server = CURRENT_CALL_INFO.get().getServer();
432433
this.call = Server.getCurCall().get();
433434
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
434-
this.setupTime = Time.monotonicNow();
435+
this.callStartNanos = Server.getCurCallStartNanos().get();
435436
}
436437

438+
private void updateProcessingDetails(Call call, long deltaNanos) {
439+
ProcessingDetails details = call.getProcessingDetails();
440+
call.getProcessingDetails().set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
441+
deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
442+
deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
443+
deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
444+
details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
445+
}
446+
437447
@Override
438448
public void setResponse(Message message) {
439-
long processingTime = Time.monotonicNow() - setupTime;
449+
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
450+
updateProcessingDetails(call, deltaNanos);
440451
call.setDeferredResponse(RpcWritable.wrap(message));
441-
server.updateDeferredMetrics(methodName, processingTime);
452+
server.updateDeferredMetrics(call, methodName, deltaNanos);
442453
}
443454

444455
@Override
445456
public void error(Throwable t) {
446-
long processingTime = Time.monotonicNow() - setupTime;
447-
String detailedMetricsName = t.getClass().getSimpleName();
448-
server.updateDeferredMetrics(detailedMetricsName, processingTime);
457+
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
458+
updateProcessingDetails(call, deltaNanos);
449459
call.setDeferredError(t);
460+
String detailedMetricsName = t.getClass().getSimpleName();
461+
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
450462
}
451463
}
452464

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -351,13 +351,19 @@ public static Server get() {
351351
* after the call returns.
352352
*/
353353
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
354-
354+
355+
private static final ThreadLocal<Long> CurCallStartNanos = new ThreadLocal<Long>();
356+
355357
/** @return Get the current call. */
356358
@VisibleForTesting
357359
public static ThreadLocal<Call> getCurCall() {
358360
return CurCall;
359361
}
360-
362+
363+
public static ThreadLocal<Long> getCurCallStartNanos() {
364+
return CurCallStartNanos;
365+
}
366+
361367
/**
362368
* Returns the currently active RPC call's sequential ID number. A negative
363369
* call ID indicates an invalid value, such as if there is no currently active
@@ -638,7 +644,8 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
638644
rpcMetrics.addRpcQueueTime(queueTime);
639645

640646
if (call.isResponseDeferred() || connDropped) {
641-
// call was skipped; don't include it in processing metrics
647+
// The call was skipped; don't include it in processing metrics.
648+
// Will update metrics in method updateDeferredMetrics.
642649
return;
643650
}
644651

@@ -668,9 +675,41 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
668675
}
669676
}
670677

671-
void updateDeferredMetrics(String name, long processingTime) {
678+
/**
679+
* Update RPC metrics for deferred calls.
680+
* @param call The Rpc Call
681+
* @param name Rpc method name
682+
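* @param processingTime elapsed processing time of the call. NOTE(review): the callers visible in this change pass a nanosecond delta (Time.monotonicNowNanos() - callStartNanos), not milliseconds - confirm the intended unit against rpcMetrics.getMetricsTimeUnit().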
683+
*/
684+
void updateDeferredMetrics(Call call, String name, long processingTime) {
685+
long completionTimeNanos = Time.monotonicNowNanos();
686+
long arrivalTimeNanos = call.timestampNanos;
687+
688+
ProcessingDetails details = call.getProcessingDetails();
689+
long waitTime =
690+
details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
691+
long responseTime =
692+
details.get(Timing.RESPONSE, rpcMetrics.getMetricsTimeUnit());
693+
rpcMetrics.addRpcLockWaitTime(waitTime);
694+
rpcMetrics.addRpcProcessingTime(processingTime);
695+
rpcMetrics.addRpcResponseTime(responseTime);
672696
rpcMetrics.addDeferredRpcProcessingTime(processingTime);
673697
rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime);
698+
// don't include lock wait for detailed metrics.
699+
processingTime -= waitTime;
700+
rpcDetailedMetrics.addProcessingTime(name, processingTime);
701+
702+
// Overall processing time is from arrival to completion.
703+
long overallProcessingTime = rpcMetrics.getMetricsTimeUnit()
704+
.convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS);
705+
rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime);
706+
callQueue.addResponseTime(name, call, details);
707+
if (isLogSlowRPC()) {
708+
logSlowRpcCalls(name, call, details);
709+
}
710+
if (details.getReturnStatus() == RpcStatusProto.SUCCESS) {
711+
rpcMetrics.incrRpcCallSuccesses();
712+
}
674713
}
675714

676715
/**
@@ -1243,6 +1282,8 @@ public Void run() throws Exception {
12431282
}
12441283

12451284
long startNanos = Time.monotonicNowNanos();
1285+
// TODO ZHB: this start timestamp is used to measure the call's processing time.
1286+
CurCallStartNanos.set(startNanos);
12461287
Writable value = null;
12471288
ResponseParams responseParams = new ResponseParams();
12481289

@@ -1331,6 +1372,7 @@ void doResponse(Throwable t, RpcStatusProto status) throws IOException {
13311372
* Send a deferred response, ignoring errors.
13321373
*/
13331374
private void sendDeferedResponse() {
1375+
long startNanos = Time.monotonicNowNanos();
13341376
try {
13351377
connection.sendResponse(this);
13361378
} catch (Exception e) {
@@ -1342,6 +1384,8 @@ private void sendDeferedResponse() {
13421384
.currentThread().getName() + ", CallId="
13431385
+ callId + ", hostname=" + getHostAddress());
13441386
}
1387+
getProcessingDetails().set(Timing.RESPONSE,
1388+
Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS);
13451389
}
13461390

13471391
@Override
@@ -3220,6 +3264,7 @@ public void run() {
32203264
}
32213265
} finally {
32223266
CurCall.set(null);
3267+
CurCallStartNanos.set(null);
32233268
numInProcessHandler.decrementAndGet();
32243269
IOUtils.cleanupWithLogger(LOG, traceScope);
32253270
if (call != null) {

0 commit comments

Comments
 (0)