Skip to content

Commit ac715c5

Browse files
authored
Coalesce final DATA frames for some RPCs (#2274)
Motivation: For clients, end_stream is set on an empty DATA frame to signal the client half closing. This is completely valid, however, nginx handles this poorly and can reset the stream if it receives the response before end_stream for the client. Modifications: - When writing messages set end stream of the final DATA frame. - When flushing, don't emit writes if the client sends a single message, wait for the end part first. Result: Fewer DATA frames, fewer nginx issues
1 parent 956259f commit ac715c5

File tree

1 file changed

+53
-26
lines changed

1 file changed

+53
-26
lines changed

Sources/GRPC/GRPCClientChannelHandler.swift

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ internal final class GRPCClientChannelHandler {
287287
private let logger: Logger
288288
private var stateMachine: GRPCClientStateMachine
289289
private let maximumReceiveMessageLength: Int
290+
private let holdWritesUntilEOS: Bool
290291

291292
/// Creates a new gRPC channel handler for clients to translate HTTP/2 frames to gRPC messages.
292293
///
@@ -304,12 +305,16 @@ internal final class GRPCClientChannelHandler {
304305
switch callType {
305306
case .unary:
306307
self.stateMachine = .init(requestArity: .one, responseArity: .one)
308+
self.holdWritesUntilEOS = true
307309
case .clientStreaming:
308310
self.stateMachine = .init(requestArity: .many, responseArity: .one)
311+
self.holdWritesUntilEOS = false
309312
case .serverStreaming:
310313
self.stateMachine = .init(requestArity: .one, responseArity: .many)
314+
self.holdWritesUntilEOS = true
311315
case .bidirectionalStreaming:
312316
self.stateMachine = .init(requestArity: .many, responseArity: .many)
317+
self.holdWritesUntilEOS = false
313318
}
314319
}
315320
}
@@ -564,22 +569,36 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
564569

565570
case .end:
566571
// About to send end: write any outbound messages first.
567-
while let (result, promise) = self.stateMachine.nextRequest() {
572+
var next = self.stateMachine.nextRequest()
573+
var wroteEOS = false
574+
575+
while let (result, bufferedPromise) = next {
576+
next = self.stateMachine.nextRequest()
577+
568578
switch result {
569579
case let .success(buffer):
580+
// Write EOS if this is the final frame.
581+
let setEOS = next == nil
582+
var effectivePromise = bufferedPromise
583+
584+
if setEOS {
585+
effectivePromise.setOrCascade(to: promise)
586+
}
587+
588+
wroteEOS = wroteEOS || setEOS
570589
let framePayload: HTTP2Frame.FramePayload = .data(
571-
.init(data: .byteBuffer(buffer), endStream: false)
590+
.init(data: .byteBuffer(buffer), endStream: setEOS)
572591
)
573592

574593
self.logger.trace(
575594
"writing HTTP2 frame",
576595
metadata: [
577596
MetadataKey.h2Payload: "DATA",
578597
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
579-
MetadataKey.h2EndStream: "false",
598+
MetadataKey.h2EndStream: "\(setEOS)",
580599
]
581600
)
582-
context.write(self.wrapOutboundOut(framePayload), promise: promise)
601+
context.write(self.wrapOutboundOut(framePayload), promise: effectivePromise)
583602

584603
case let .failure(error):
585604
context.fireErrorCaught(error)
@@ -590,8 +609,9 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
590609

591610
// Okay: can we close the request stream?
592611
switch self.stateMachine.sendEndOfRequestStream() {
593-
case .success:
594-
// We can. Send an empty DATA frame with end-stream set.
612+
case .success where !wroteEOS:
613+
// EOS wasn't included in any buffered DATA frame above; send an empty DATA frame with
614+
// end-stream set.
595615
let empty = context.channel.allocator.buffer(capacity: 0)
596616
let framePayload: HTTP2Frame.FramePayload = .data(
597617
.init(data: .byteBuffer(empty), endStream: true)
@@ -607,6 +627,10 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
607627
)
608628
context.write(self.wrapOutboundOut(framePayload), promise: promise)
609629

630+
case .success:
631+
// EOS was already included in the final buffered DATA frame; nothing more to send.
632+
break
633+
610634
case let .failure(error):
611635
// Why can't we close the request stream?
612636
switch error {
@@ -628,28 +652,31 @@ extension GRPCClientChannelHandler: ChannelOutboundHandler {
628652
}
629653

630654
func flush(context: ChannelHandlerContext) {
631-
// Drain any requests.
632-
while let (result, promise) = self.stateMachine.nextRequest() {
633-
switch result {
634-
case let .success(buffer):
635-
let framePayload: HTTP2Frame.FramePayload = .data(
636-
.init(data: .byteBuffer(buffer), endStream: false)
637-
)
655+
// Drain any requests unless holding them until EOS (so that end stream isn't emitted in a
656+
// separate frame).
657+
if !self.holdWritesUntilEOS {
658+
while let (result, promise) = self.stateMachine.nextRequest() {
659+
switch result {
660+
case let .success(buffer):
661+
let framePayload: HTTP2Frame.FramePayload = .data(
662+
.init(data: .byteBuffer(buffer), endStream: false)
663+
)
638664

639-
self.logger.trace(
640-
"writing HTTP2 frame",
641-
metadata: [
642-
MetadataKey.h2Payload: "DATA",
643-
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
644-
MetadataKey.h2EndStream: "false",
645-
]
646-
)
647-
context.write(self.wrapOutboundOut(framePayload), promise: promise)
665+
self.logger.trace(
666+
"writing HTTP2 frame",
667+
metadata: [
668+
MetadataKey.h2Payload: "DATA",
669+
MetadataKey.h2DataBytes: "\(buffer.readableBytes)",
670+
MetadataKey.h2EndStream: "false",
671+
]
672+
)
673+
context.write(self.wrapOutboundOut(framePayload), promise: promise)
648674

649-
case let .failure(error):
650-
context.fireErrorCaught(error)
651-
promise?.fail(error)
652-
return
675+
case let .failure(error):
676+
context.fireErrorCaught(error)
677+
promise?.fail(error)
678+
return
679+
}
653680
}
654681
}
655682

0 commit comments

Comments
 (0)