Skip to content

Commit 0bcbced

Browse files
authored
Fix read reader multiple times in QFE (#5202)
* fix read reader multiple times in QFE Signed-off-by: Ben Ye <[email protected]> * write to error Signed-off-by: Ben Ye <[email protected]> --------- Signed-off-by: Ben Ye <[email protected]>
1 parent 64b6c2b commit 0bcbced

File tree

3 files changed

+49
-2
lines changed

3 files changed

+49
-2
lines changed

integration/e2ecortex/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
122122
return value, err
123123
}
124124

125-
// Query runs a query range.
125+
// QueryRange runs a query range.
126126
func (c *Client) QueryRange(query string, start, end time.Time, step time.Duration) (model.Value, error) {
127127
value, _, err := c.querierClient.QueryRange(context.Background(), query, promv1.Range{
128128
Start: start,

integration/query_frontend_test.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,46 @@ func TestQueryFrontendTLSWithBlocksStorageViaFlags(t *testing.T) {
154154
})
155155
}
156156

157+
func TestQueryFrontendWithVerticalSharding(t *testing.T) {
158+
runQueryFrontendTest(t, queryFrontendTestConfig{
159+
testMissingMetricName: false,
160+
querySchedulerEnabled: false,
161+
queryStatsEnabled: true,
162+
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
163+
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
164+
165+
minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"])
166+
require.NoError(t, s.StartAndWaitReady(minio))
167+
168+
// Enable vertical sharding.
169+
flags = mergeFlags(e2e.EmptyFlags(), map[string]string{
170+
"-frontend.query-vertical-shard-size": "2",
171+
})
172+
return cortexConfigFile, flags
173+
},
174+
})
175+
}
176+
177+
func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) {
178+
runQueryFrontendTest(t, queryFrontendTestConfig{
179+
testMissingMetricName: false,
180+
querySchedulerEnabled: true,
181+
queryStatsEnabled: true,
182+
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
183+
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
184+
185+
minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"])
186+
require.NoError(t, s.StartAndWaitReady(minio))
187+
188+
// Enable vertical sharding.
189+
flags = mergeFlags(e2e.EmptyFlags(), map[string]string{
190+
"-frontend.query-vertical-shard-size": "2",
191+
})
192+
return cortexConfigFile, flags
193+
},
194+
})
195+
}
196+
157197
func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
158198
const numUsers = 10
159199
const numQueriesPerUser = 10
@@ -304,7 +344,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
304344

305345
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total"))
306346

307-
// The number of received request is greater then the query requests because include
347+
// The number of received request is greater than the query requests because include
308348
// requests to /metrics and /ready.
309349
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
310350
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))

pkg/frontend/transport/handler.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,13 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
137137
var buf bytes.Buffer
138138
r.Body = http.MaxBytesReader(w, r.Body, f.cfg.MaxBodySize)
139139
r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))
140+
// We parse form here so that we can use buf as body, in order to
141+
// prevent https://github.com/cortexproject/cortex/issues/5201.
142+
if err := r.ParseForm(); err != nil {
143+
writeError(w, err)
144+
return
145+
}
146+
r.Body = io.NopCloser(&buf)
140147

141148
startTime := time.Now()
142149
resp, err := f.roundTripper.RoundTrip(r)

0 commit comments

Comments
 (0)