@@ -167,7 +167,7 @@ extension GRPCWebToHTTP2ServerCodec {
167167 case fullyOpen( InboundState , OutboundState )
168168
169169 /// The server has closed the response stream, we may receive other request parts from the client.
170- case clientOpenServerClosed
170+ case clientOpenServerClosed( InboundState )
171171
172172 /// The client has sent everything, the server still needs to close the response stream.
173173 case clientClosedServerOpen( OutboundState )
@@ -304,40 +304,15 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
304304 preconditionFailure ( " Invalid state: haven't received request head " )
305305
306306 case . fullyOpen( var inbound, let outbound) :
307- if inbound. requestBuffer == nil {
308- // We're not dealing with gRPC Web Text: just forward the buffer.
309- return . fireChannelRead( . data( . init( data: . byteBuffer( buffer) ) ) )
310- }
311-
312- if inbound. requestBuffer!. readableBytes == 0 {
313- inbound. requestBuffer = buffer
314- } else {
315- inbound. requestBuffer!. writeBuffer ( & buffer)
316- }
317-
318- let readableBytes = inbound. requestBuffer!. readableBytes
319- // The length of base64 encoded data must be a multiple of 4.
320- let bytesToRead = readableBytes - ( readableBytes % 4 )
321-
322- let action : GRPCWebToHTTP2ServerCodec . StateMachine . Action
323-
324- if bytesToRead > 0 ,
325- let base64Encoded = inbound. requestBuffer!. readString ( length: bytesToRead) ,
326- let base64Decoded = Data ( base64Encoded: base64Encoded) {
327- // Recycle the input buffer and restore the request buffer.
328- buffer. clear ( )
329- buffer. writeContiguousBytes ( base64Decoded)
330- action = . fireChannelRead( . data( . init( data: . byteBuffer( buffer) ) ) )
331- } else {
332- action = . none
333- }
334-
307+ let action = inbound. processInboundData ( buffer: & buffer)
335308 self = . fullyOpen( inbound, outbound)
336309 return action
337310
338- case . clientOpenServerClosed:
339- // The server is already done; so drop the request.
340- return . none
311+ case var . clientOpenServerClosed( inbound) :
312+ // The server is already done, but it's not our place to drop the request.
313+ let action = inbound. processInboundData ( buffer: & buffer)
314+ self = . clientOpenServerClosed( inbound)
315+ return action
341316
342317 case . clientClosedServerOpen:
343318 preconditionFailure ( " End of request stream already received " )
@@ -366,9 +341,13 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
366341 preconditionFailure ( " End of request stream already received " )
367342
368343 case . clientOpenServerClosed:
369- // Both sides are closed now, back to idle.
344+ // Both sides are closed now, back to idle. Don't forget to pass on the .end, as
345+ // it's necessary to communicate to the other peers that the response is done.
370346 self = . idle
371- return . none
347+
348+ // Send an empty DATA frame with the end stream flag set.
349+ let empty = allocator. buffer ( capacity: 0 )
350+ return . fireChannelRead( . data( . init( data: . byteBuffer( empty) , endStream: true ) ) )
372351
373352 case . _modifying:
374353 preconditionFailure ( " Left in modifying state " )
@@ -388,12 +367,12 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
388367 case . idle:
389368 preconditionFailure ( " Invalid state: haven't received request head " )
390369
391- case var . fullyOpen( _ , outbound) :
370+ case . fullyOpen( let inbound , var outbound) :
392371 // Double check these are trailers.
393372 assert ( outbound. responseHeadersSent)
394373
395374 // We haven't seen the end of the request stream yet.
396- self = . clientOpenServerClosed
375+ self = . clientOpenServerClosed( inbound )
397376
398377 // Avoid CoW-ing the buffers.
399378 let responseBuffers = outbound. responseBuffer
@@ -467,9 +446,9 @@ extension GRPCWebToHTTP2ServerCodec.StateMachine.State {
467446 case . idle:
468447 preconditionFailure ( " Invalid state: haven't received request head " )
469448
470- case let . fullyOpen( _ , outbound) :
449+ case let . fullyOpen( inbound , outbound) :
471450 // We still haven't seen the end of the request stream.
472- self = . clientOpenServerClosed
451+ self = . clientOpenServerClosed( inbound )
473452
474453 let head = GRPCWebToHTTP2ServerCodec . makeResponseHead (
475454 hpackHeaders: trailers,
@@ -703,6 +682,42 @@ extension GRPCWebToHTTP2ServerCodec {
703682 }
704683}
705684
685+ extension GRPCWebToHTTP2ServerCodec . StateMachine . InboundState {
686+ fileprivate mutating func processInboundData(
687+ buffer: inout ByteBuffer
688+ ) -> GRPCWebToHTTP2ServerCodec . StateMachine . Action {
689+ if self . requestBuffer == nil {
690+ // We're not dealing with gRPC Web Text: just forward the buffer.
691+ return . fireChannelRead( . data( . init( data: . byteBuffer( buffer) ) ) )
692+ }
693+
694+ if self . requestBuffer!. readableBytes == 0 {
695+ self . requestBuffer = buffer
696+ } else {
697+ self . requestBuffer!. writeBuffer ( & buffer)
698+ }
699+
700+ let readableBytes = self . requestBuffer!. readableBytes
701+ // The length of base64 encoded data must be a multiple of 4.
702+ let bytesToRead = readableBytes - ( readableBytes % 4 )
703+
704+ let action : GRPCWebToHTTP2ServerCodec . StateMachine . Action
705+
706+ if bytesToRead > 0 ,
707+ let base64Encoded = self . requestBuffer!. readString ( length: bytesToRead) ,
708+ let base64Decoded = Data ( base64Encoded: base64Encoded) {
709+ // Recycle the input buffer and restore the request buffer.
710+ buffer. clear ( )
711+ buffer. writeContiguousBytes ( base64Decoded)
712+ action = . fireChannelRead( . data( . init( data: . byteBuffer( buffer) ) ) )
713+ } else {
714+ action = . none
715+ }
716+
717+ return action
718+ }
719+ }
720+
706721extension HTTPHeaders {
707722 fileprivate init ( hpackHeaders headers: HPACKHeaders ) {
708723 self . init ( )
0 commit comments