Skip to content

Commit 5defb02

Browse files
authored
http (fix): Properly report exception stack traces to server-side logs (#3358)
The server-side stack trace was not recorded in airframe-http-netty. This PR fixes this issue.
1 parent 76dd486 commit 5defb02

File tree

8 files changed

+79
-94
lines changed

8 files changed

+79
-94
lines changed

airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,18 @@ import io.netty.handler.stream.ChunkedWriteHandler
2525
import wvlet.airframe.codec.{MessageCodec, MessageCodecFactory}
2626
import wvlet.airframe.control.ThreadUtil
2727
import wvlet.airframe.http.HttpMessage.Response
28-
import wvlet.airframe.http.{HttpMessage, *}
2928
import wvlet.airframe.http.client.{AsyncClient, SyncClient}
30-
import wvlet.airframe.http.internal.{LogRotationHttpLogger, RPCLoggingFilter, RPCResponseFilter}
29+
import wvlet.airframe.http.internal.{LogRotationHttpLogger, RPCResponseFilter}
3130
import wvlet.airframe.http.router.{ControllerProvider, HttpRequestDispatcher}
31+
import wvlet.airframe.http.{HttpMessage, *}
3232
import wvlet.airframe.rx.Rx
3333
import wvlet.airframe.surface.Surface
3434
import wvlet.airframe.{Design, Session}
3535
import wvlet.log.LogSupport
3636
import wvlet.log.io.IOUtil
3737

38-
import java.util.concurrent.{Executors, TimeUnit}
3938
import java.util.concurrent.atomic.AtomicBoolean
39+
import java.util.concurrent.{Executors, TimeUnit}
4040
import javax.annotation.PostConstruct
4141
import scala.collection.immutable.ListMap
4242
import scala.concurrent.ExecutionContext
@@ -52,7 +52,6 @@ case class NettyServerConfig(
5252
httpLoggerProvider: HttpLoggerConfig => HttpLogger = { (config: HttpLoggerConfig) =>
5353
new LogRotationHttpLogger(config)
5454
},
55-
loggingFilter: HttpLogger => RxHttpFilter = { new RPCLoggingFilter(_) },
5655
customCodec: PartialFunction[Surface, MessageCodec[_]] = PartialFunction.empty,
5756
// Thread manager for handling Future[_] responses
5857
executionContext: ExecutionContext = {
@@ -86,7 +85,6 @@ case class NettyServerConfig(
8685
}
8786
def noLogging: NettyServerConfig = {
8887
this.copy(
89-
loggingFilter = { _ => RxHttpFilter.identity },
9088
httpLoggerProvider = HttpLogger.emptyLogger(_)
9189
)
9290
}
@@ -138,8 +136,8 @@ case class NettyServerConfig(
138136

139137
class NettyServer(config: NettyServerConfig, session: Session) extends HttpServer with LogSupport {
140138

141-
private val httpLogger: HttpLogger = config.newHttpLogger
142-
private val loggingFilter: RxHttpFilter = config.loggingFilter(httpLogger)
139+
private val httpLogger: HttpLogger = config.newHttpLogger
140+
private val rpcFilter: RxHttpFilter = new RPCResponseFilter(httpLogger)
143141

144142
private val bossGroup = {
145143
val tf = ThreadUtil.newDaemonThreadFactory("airframe-netty-boss")
@@ -235,8 +233,7 @@ class NettyServer(config: NettyServerConfig, session: Session) extends HttpServe
235233
NettyBackend
236234
.rxFilterAdapter(
237235
attachContextFilter
238-
.andThen(loggingFilter)
239-
.andThen(RPCResponseFilter)
236+
.andThen(rpcFilter)
240237
)
241238
.andThen(
242239
HttpRequestDispatcher.newDispatcher(

airframe-http/src/main/scala/wvlet/airframe/http/HttpLogger.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import wvlet.log.LogSupport
2121
* Interface for writing HTTP request/response logs
2222
*/
2323
trait HttpLogger extends AutoCloseable {
24+
// Headers to exclude from the logs
25+
val excludeHeaders: HttpMultiMap = HttpMultiMap.fromHeaderNames(config.excludeHeaders)
26+
2427
def config: HttpLoggerConfig
2528

2629
final def write(log: Map[String, Any]): Unit = {

airframe-http/src/main/scala/wvlet/airframe/http/RPCStatus.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,7 @@ sealed abstract class RPCStatus(
439439

440440
def shouldReportStackTrace: Boolean = {
441441
this match {
442+
// Do not report stack traces for non-authorized user requests by default
442443
case UNAUTHENTICATED_U13 | PERMISSION_DENIED_U14 => false
443444
case _ => true
444445
}

airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientLoggingFilter.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,27 @@ import wvlet.airframe.http.{HttpLogger, HttpLoggerConfig, HttpMultiMap, RxHttpEn
1818
import wvlet.airframe.rx.Rx
1919
import wvlet.log.LogSupport
2020

21+
import scala.util.{Failure, Success}
22+
2123
/**
2224
* A client-side filter for logging HTTP requests and responses
2325
*/
2426
class HttpClientLoggingFilter(httpLogger: HttpLogger) extends HttpClientFilter with AutoCloseable with LogSupport {
25-
26-
private val excludeHeaders = HttpMultiMap.fromHeaderNames(httpLogger.config.excludeHeaders)
27-
2827
override def close(): Unit = {
2928
httpLogger.close()
3029
}
3130

3231
def apply(context: HttpClientContext): RxHttpFilter = new RxHttpFilter {
3332
override def apply(request: Request, next: RxHttpEndpoint): Rx[Response] = {
34-
HttpLogs.reportLog(httpLogger, excludeHeaders, request, next, Some(context))
33+
val logContext = new HttpLogs.LogContext(request, httpLogger, Some(context), None)
34+
next(request)
35+
.tap { resp =>
36+
// TODO Record exceptions returned from the server
37+
logContext.logResponse(resp, None)
38+
}
39+
.tapOnFailure { ex =>
40+
logContext.logError(ex)
41+
}
3542
}
3643
}
3744
}

airframe-http/src/main/scala/wvlet/airframe/http/internal/HttpLogs.scala

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,69 +19,68 @@ import wvlet.airframe.http.client.HttpClientContext
1919
import wvlet.airframe.rx.Rx
2020
import wvlet.airframe.surface.{Parameter, Surface, TypeName}
2121
import wvlet.airframe.ulid.ULID
22-
import wvlet.log.LogTimestampFormatter
22+
import wvlet.log.{LogSupport, LogTimestampFormatter}
2323

2424
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
2525
import scala.annotation.tailrec
2626
import scala.collection.immutable.ListMap
2727
import scala.concurrent.ExecutionException
28-
import scala.util.Try
28+
import scala.util.{Failure, Success, Try}
2929

3030
/**
3131
* Internal utilities for HTTP request/response logging
3232
*/
33-
object HttpLogs {
33+
object HttpLogs extends LogSupport {
3434

35-
def reportLog(
36-
httpLogger: HttpLogger,
37-
excludeHeaders: HttpMultiMap,
35+
class LogContext(
3836
request: Request,
39-
next: RxHttpEndpoint,
40-
clientContext: Option[HttpClientContext] = None,
41-
rpcContext: Option[RPCContext] = None
42-
): Rx[Response] = {
43-
val baseTime = System.currentTimeMillis()
44-
val start = System.nanoTime()
45-
val m = ListMap.newBuilder[String, Any]
46-
m ++= unixTimeLogs(baseTime)
47-
m ++= commonRequestLogs(request)
48-
m ++= requestHeaderLogs(request, excludeHeaders)
49-
50-
def reportLogs: Unit = {
51-
// Finally, write the log
52-
httpLogger.write(httpLogger.config.logFilter(m.result()))
53-
}
37+
httpLogger: HttpLogger,
38+
clientContext: Option[HttpClientContext],
39+
rpcContext: Option[RPCContext]
40+
) {
41+
private val baseTime = System.currentTimeMillis()
42+
private val start = System.nanoTime()
43+
private val m = ListMap.newBuilder[String, Any]
5444

55-
def rpcCallLogs(): Unit = {
56-
// RPC call context will be set to a TLS after dispatching an event
57-
// TODO Pass RPC call context without using TLS
45+
init()
46+
47+
private def init(): Unit = {
48+
m ++= unixTimeLogs(baseTime)
49+
m ++= commonRequestLogs(request)
50+
m ++= requestHeaderLogs(request, httpLogger.excludeHeaders)
51+
52+
// Log RPC context in the client side
53+
clientContext.foreach {
54+
_.rpcMethod.map { rpc => m ++= rpcMethodLogs(rpc) }
55+
}
56+
// Log RPC context in the server side
5857
rpcContext.flatMap(_.rpcCallContext).foreach { rcc =>
5958
m ++= rpcLogs(rcc)
6059
}
6160
}
6261

63-
clientContext.foreach {
64-
_.rpcMethod.map { rpc => m ++= rpcMethodLogs(rpc) }
62+
private def logResponse(): Unit = {
63+
m ++= durationLogs(baseTime, start)
6564
}
6665

67-
next
68-
.apply(request)
69-
.toRx
70-
.map { resp =>
71-
m ++= durationLogs(baseTime, start)
72-
rpcCallLogs()
73-
m ++= commonResponseLogs(resp)
74-
m ++= responseHeaderLogs(resp, excludeHeaders)
75-
reportLogs
76-
resp
77-
}
78-
.recoverWith { case e: Throwable =>
79-
m ++= durationLogs(baseTime, start)
80-
rpcCallLogs()
81-
m ++= errorLogs(e)
82-
reportLogs
83-
Rx.exception(e)
84-
}
66+
def logResponse(response: Response, exception: Option[Throwable]): Response = {
67+
logResponse()
68+
m ++= commonResponseLogs(response)
69+
m ++= responseHeaderLogs(response, httpLogger.excludeHeaders)
70+
exception.foreach(ex => m ++= errorLogs(ex))
71+
72+
// Write the log
73+
httpLogger.write(httpLogger.config.logFilter(m.result()))
74+
response
75+
}
76+
77+
def logError(e: Throwable): Unit = {
78+
logResponse()
79+
m ++= errorLogs(e)
80+
81+
// Write the log
82+
httpLogger.write(httpLogger.config.logFilter(m.result()))
83+
}
8584
}
8685

8786
def durationLogs(baseTime: Long, sinceNano: Long): ListMap[String, Any] = {

airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCCallContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ case class RPCCallContext(
2424
// The full class name of the RPC implementation class
2525
def rpcClassName: String = TypeName.sanitizeTypeName(rpcMethodSurface.owner.fullName)
2626
def rpcInterfaceName: String = rpcMethod.rpcInterfaceName
27-
def rpcMethodName: String = rpcMethodSurface.name
27+
def rpcMethodName: String = rpcMethod.methodName
2828
def withRPCArgs(rpcArgs: Seq[Any]): RPCCallContext = this.copy(rpcArgs = rpcArgs)
2929
}

airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCLoggingFilter.scala

Lines changed: 0 additions & 30 deletions
This file was deleted.

airframe-http/src/main/scala/wvlet/airframe/http/internal/RPCResponseFilter.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ package wvlet.airframe.http.internal
1616
import wvlet.airframe.http.{
1717
Http,
1818
HttpHeader,
19+
HttpLogger,
1920
HttpMessage,
21+
HttpMultiMap,
2022
HttpServerException,
2123
HttpStatus,
24+
RPCContext,
2225
RPCException,
2326
RPCStatus,
2427
RxHttpEndpoint,
@@ -30,23 +33,28 @@ import wvlet.log.LogSupport
3033
import scala.util.{Failure, Success}
3134

3235
/**
33-
* Add RPCStatus to the response header and embed the error message to the request body
36+
* A filter for managing RPC status header, logs, and errors. Exception messages will be embedded to the response body.
3437
*/
35-
object RPCResponseFilter extends RxHttpFilter with LogSupport {
38+
class RPCResponseFilter(httpLogger: HttpLogger) extends RxHttpFilter with LogSupport {
3639
override def apply(request: HttpMessage.Request, next: RxHttpEndpoint): Rx[HttpMessage.Response] = {
40+
41+
val logContext = new HttpLogs.LogContext(request, httpLogger, None, Some(RPCContext.current))
42+
3743
next(request)
3844
.transform {
3945
case Success(resp) =>
40-
setRPCStatus(resp)
46+
logContext.logResponse(setRPCStatus(resp), None)
4147
case Failure(e) =>
4248
e match {
4349
case ex: HttpServerException =>
4450
val re = RPCStatus.fromHttpStatus(ex.status).newException(ex.getMessage, ex.getCause)
45-
re.toResponse
51+
logContext.logResponse(re.toResponse, Some(re))
4652
case ex: RPCException =>
47-
ex.toResponse
53+
logContext.logResponse(ex.toResponse, Some(ex))
4854
case other =>
49-
RPCStatus.INTERNAL_ERROR_I0.newException(other.getMessage, other).toResponse
55+
val ex = RPCStatus.INTERNAL_ERROR_I0.newException(other.getMessage, other)
56+
// Report the original error to the log
57+
logContext.logResponse(ex.toResponse, Some(other))
5058
}
5159
}
5260
}

0 commit comments

Comments
 (0)