Skip to content

Commit e70658e

Browse files
Tpuljakpellareddmathieu
authored
fix: support getBody in otelploghttp (#8096)
Based on #7931 Adding another `getBody` PR to the chain 😄. This one is harder to catch if it's a problem since logs aren't being sent in case of error. --------- Signed-off-by: Toma Puljak <toma.puljak@hotmail.com> Co-authored-by: Robert Pająk <pellared@hotmail.com> Co-authored-by: Damien Mathieu <42@dmathieu.com>
1 parent 4afe468 commit e70658e

File tree

3 files changed

+63
-0
lines changed

3 files changed

+63
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
3535
- Return spec-compliant `TraceIdRatioBased` description. This is a breaking behavioral change, but it is necessary to
3636
make the implementation [spec-compliant](https://opentelemetry.io/docs/specs/otel/trace/sdk/#traceidratiobased). (#8027)
3737
- Fix a race condition in `go.opentelemetry.io/otel/sdk/metric` where the lastvalue aggregation could collect the value 0 even when no zero-value measurements were recorded. (#8056)
38+
- Fix missing `request.GetBody` in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp` to correctly handle HTTP2 GOAWAY frame. (#8096)
3839

3940
<!-- Released section -->
4041
<!-- Don't change this section unless doing release -->

exporters/otlp/otlplog/otlploghttp/client.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ func (c *httpClient) newRequest(ctx context.Context, body []byte) (request, erro
263263
case NoCompression:
264264
r.ContentLength = int64(len(body))
265265
req.bodyReader = bodyReader(body)
266+
req.GetBody = bodyReaderErr(body)
266267
case GzipCompression:
267268
// Ensure the content length is not used.
268269
r.ContentLength = -1
@@ -283,6 +284,7 @@ func (c *httpClient) newRequest(ctx context.Context, body []byte) (request, erro
283284
}
284285

285286
req.bodyReader = bodyReader(b.Bytes())
287+
req.GetBody = bodyReaderErr(body)
286288
}
287289

288290
return req, nil
@@ -295,6 +297,13 @@ func bodyReader(buf []byte) func() io.ReadCloser {
295297
}
296298
}
297299

300+
// bodyReaderErr returns a closure returning a new reader for buf.
301+
func bodyReaderErr(buf []byte) func() (io.ReadCloser, error) {
302+
return func() (io.ReadCloser, error) {
303+
return io.NopCloser(bytes.NewReader(buf)), nil
304+
}
305+
}
306+
298307
// request wraps an http.Request with a resettable body reader.
299308
type request struct {
300309
*http.Request

exporters/otlp/otlplog/otlploghttp/client_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"math/big"
2121
"net"
2222
"net/http"
23+
"net/http/httptest"
2324
"net/url"
2425
"strings"
2526
"sync"
@@ -833,6 +834,58 @@ func TestConfig(t *testing.T) {
833834
})
834835
}
835836

837+
func TestGetBodyCalledOnRedirect(t *testing.T) {
838+
// Test that req.GetBody is set correctly, allowing the HTTP transport
839+
// to re-send the body on 307 redirects.
840+
var mu sync.Mutex
841+
var requestBodies [][]byte
842+
843+
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
844+
body, err := io.ReadAll(r.Body)
845+
if err != nil {
846+
w.WriteHeader(http.StatusInternalServerError)
847+
return
848+
}
849+
850+
mu.Lock()
851+
requestBodies = append(requestBodies, body)
852+
isFirstRequest := len(requestBodies) == 1
853+
mu.Unlock()
854+
855+
if isFirstRequest {
856+
w.Header().Set("Location", "/v1/logs/final")
857+
w.WriteHeader(http.StatusTemporaryRedirect)
858+
return
859+
}
860+
861+
w.Header().Set("Content-Type", "application/x-protobuf")
862+
w.WriteHeader(http.StatusOK)
863+
})
864+
865+
server := httptest.NewServer(handler)
866+
defer server.Close()
867+
868+
opts := []Option{WithEndpoint(server.Listener.Addr().String()), WithInsecure()}
869+
cfg := newConfig(opts)
870+
client, err := newHTTPClient(t.Context(), cfg)
871+
require.NoError(t, err)
872+
873+
exporter, err := newExporter(client, cfg)
874+
require.NoError(t, err)
875+
ctx := t.Context()
876+
defer func() { _ = exporter.Shutdown(ctx) }()
877+
878+
err = exporter.Export(ctx, make([]log.Record, 1))
879+
require.NoError(t, err)
880+
881+
mu.Lock()
882+
defer mu.Unlock()
883+
884+
require.Len(t, requestBodies, 2, "expected 2 requests (original + redirect)")
885+
assert.NotEmpty(t, requestBodies[0], "original request body should not be empty")
886+
assert.Equal(t, requestBodies[0], requestBodies[1], "redirect body should match original")
887+
}
888+
836889
// SetExporterID sets the exporter ID counter to v and returns the previous
837890
// value.
838891
//

0 commit comments

Comments
 (0)