Skip to content

Map HTTP/2 RST_STREAM codes back to RPC codes #321

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ func (d *duplexHTTPCall) Read(data []byte) (int, error) {
if d.response == nil {
return 0, fmt.Errorf("nil response from %v", d.request.URL)
}
return d.response.Body.Read(data)
n, err := d.response.Body.Read(data)
return n, wrapIfRSTError(err)
}

func (d *duplexHTTPCall) CloseRead() error {
Expand All @@ -162,9 +163,9 @@ func (d *duplexHTTPCall) CloseRead() error {
return nil
}
if err := discard(d.response.Body); err != nil {
return err
return wrapIfRSTError(err)
}
return d.response.Body.Close()
return wrapIfRSTError(d.response.Body.Close())
}

// ResponseStatusCode is the response's HTTP status code.
Expand Down Expand Up @@ -245,6 +246,7 @@ func (d *duplexHTTPCall) makeRequest() {
err = wrapIfContextError(err)
err = wrapIfLikelyH2CNotConfiguredError(d.request, err)
err = wrapIfLikelyWithGRPCNotUsedError(err)
err = wrapIfRSTError(err)
if _, ok := asError(err); !ok {
err = NewError(CodeUnavailable, err)
}
Expand Down
57 changes: 57 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"strings"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -247,3 +248,59 @@ func wrapIfLikelyWithGRPCNotUsedError(err error) error {
}
return err
}

// HTTP/2 has its own set of error codes, which it sends in RST_STREAM frames.
// When the server sends one of these errors, we should map it back into our
// RPC error codes following
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#http2-transport-mapping.
//
// This would be vastly simpler if we were using x/net/http2 directly, since
// the StreamError type is exported. When x/net/http2 gets vendored into
// net/http, though, all these types become unexported...so we're left with
// string munging.
func wrapIfRSTError(err error) error {
const (
streamErrPrefix = "stream error: "
fromPeerSuffix = "; received from peer"
)
if err == nil {
return nil
}
if _, ok := asError(err); ok {
return err
}
if urlErr := new(url.Error); errors.As(err, &urlErr) {
// If we get an RST_STREAM error from http.Client.Do, it's wrapped in a
// *url.Error.
err = urlErr.Unwrap()
}
msg := err.Error()
if !strings.HasPrefix(msg, streamErrPrefix) {
return err
}
if !strings.HasSuffix(msg, fromPeerSuffix) {
return err
}
msg = strings.TrimSuffix(msg, fromPeerSuffix)
i := strings.LastIndex(msg, ";")
if i < 0 || i >= len(msg)-1 {
return err
}
msg = msg[i+1:]
msg = strings.TrimSpace(msg)
switch msg {
case "NO_ERROR", "PROTOCOL_ERROR", "INTERNAL_ERROR", "FLOW_CONTROL_ERROR",
"SETTINGS_TIMEOUT", "FRAME_SIZE_ERROR", "COMPRESSION_ERROR", "CONNECT_ERROR":
return NewError(CodeInternal, err)
case "REFUSED_STREAM":
return NewError(CodeUnavailable, err)
case "CANCEL":
return NewError(CodeCanceled, err)
case "ENHANCE_YOUR_CALM":
return NewError(CodeResourceExhausted, fmt.Errorf("bandwidth exhausted: %w", err))
case "INADEQUATE_SECURITY":
return NewError(CodePermissionDenied, fmt.Errorf("transport protocol insecure: %w", err))
default:
return err
}
}
18 changes: 11 additions & 7 deletions recover_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ func (s *panicPingServer) Ping(
}

func (s *panicPingServer) CountUp(
context.Context,
*connect.Request[pingv1.CountUpRequest],
*connect.ServerStream[pingv1.CountUpResponse],
_ context.Context,
_ *connect.Request[pingv1.CountUpRequest],
stream *connect.ServerStream[pingv1.CountUpResponse],
) error {
if err := stream.Send(&pingv1.CountUpResponse{}); err != nil {
return err
}
panic(s.panicWith) // nolint:forbidigo
}

Expand All @@ -60,14 +63,15 @@ func TestWithRecover(t *testing.T) {
}
assertNotHandled := func(err error) {
t.Helper()
if err != nil {
assert.NotEqual(t, connect.CodeOf(err), connect.CodeFailedPrecondition)
}
// When HTTP/2 handlers panic, net/http sends an RST_STREAM frame with code
// INTERNAL_ERROR. We should be mapping this back to CodeInternal.
assert.Equal(t, connect.CodeOf(err), connect.CodeInternal)
}
drainStream := func(stream *connect.ServerStreamForClient[pingv1.CountUpResponse]) error {
t.Helper()
defer stream.Close()
assert.False(t, stream.Receive())
assert.True(t, stream.Receive()) // expect one response msg
assert.False(t, stream.Receive()) // expect panic before second response msg
return stream.Err()
}
pinger := &panicPingServer{}
Expand Down