@@ -2,6 +2,7 @@ package client
22
33import (
44 "context"
5+ "fmt"
56 "net/http/httptest"
67 "strconv"
78 "testing"
@@ -11,6 +12,7 @@ import (
1112 "github.com/stretchr/testify/assert"
1213 "github.com/stretchr/testify/mock"
1314 "github.com/stretchr/testify/require"
15+ "github.com/weaveworks/common/user"
1416 "google.golang.org/grpc"
1517
1618 "github.com/cortexproject/cortex/pkg/cortexpb"
@@ -155,10 +157,8 @@ func TestClosableHealthAndIngesterClient_Close_WithActiveStream(t *testing.T) {
155157 ctx , cancel := context .WithCancel (context .Background ())
156158 streamChan := make (chan * streamWriteJob , 1 )
157159
158- jobCtx , jobCancel := context .WithCancel (context .Background ())
159160 job := & streamWriteJob {
160- ctx : jobCtx ,
161- cancel : jobCancel ,
161+ sendDone : make (chan struct {}),
162162 }
163163 streamChan <- job
164164
@@ -178,6 +178,14 @@ func TestClosableHealthAndIngesterClient_Close_WithActiveStream(t *testing.T) {
178178 _ , ok := <- client .streamPushChan
179179 assert .False (t , ok , "stream channel should be closed" )
180180
181+ // Verify job.sendDone was closed by Close()
182+ select {
183+ case <- job .sendDone :
184+ // Success - sendDone was closed
185+ case <- time .After (100 * time .Millisecond ):
186+ t .Error ("job.sendDone was not closed" )
187+ }
188+
181189 // Verify context is cancelled
182190 select {
183191 case <- client .streamCtx .Done ():
@@ -191,21 +199,11 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) {
191199 ctx , cancel := context .WithCancel (context .Background ())
192200 streamChan := make (chan * streamWriteJob , 2 )
193201
194- job1Cancelled := false
195- job2Cancelled := false
202+ job1Done := make ( chan struct {})
203+ job2Done := make ( chan struct {})
196204
197- job1 := & streamWriteJob {
198- ctx : context .Background (),
199- cancel : func () {
200- job1Cancelled = true
201- },
202- }
203- job2 := & streamWriteJob {
204- ctx : context .Background (),
205- cancel : func () {
206- job2Cancelled = true
207- },
208- }
205+ job1 := & streamWriteJob {sendDone : job1Done }
206+ job2 := & streamWriteJob {sendDone : job2Done }
209207 streamChan <- job1
210208 streamChan <- job2
211209
@@ -230,9 +228,17 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) {
230228 t .Error ("stream context was not cancelled" )
231229 }
232230
233- // Verify jobs were cancelled
234- assert .True (t , job1Cancelled , "job1 should have been cancelled" )
235- assert .True (t , job2Cancelled , "job2 should have been cancelled" )
231+ // Verify jobs were closed (sendDone channels closed)
232+ select {
233+ case <- job1Done :
234+ case <- time .After (500 * time .Millisecond ):
235+ t .Error ("job1.sendDone was not closed" )
236+ }
237+ select {
238+ case <- job2Done :
239+ case <- time .After (500 * time .Millisecond ):
240+ t .Error ("job2.sendDone was not closed" )
241+ }
236242}
237243
238244type mockClientStream struct {
@@ -249,6 +255,122 @@ func (m *mockClientStream) Recv() (*cortexpb.WriteResponse, error) {
249255 return & cortexpb.WriteResponse {}, nil
250256}
251257
258+ // slowSendStream simulates a slow gRPC stream.
259+ // Send() pre-computes the buffer size (mirroring gRPC codec step 1),
260+ // sleeps for sendDelay so the caller's context deadline can fire first,
261+ // then calls MarshalToSizedBuffer (gRPC codec step 2).
262+ type slowSendStream struct {
263+ grpc.ClientStream
264+ sendDelay time.Duration
265+ panicCh chan any
266+ }
267+
268+ func (s * slowSendStream ) Send (req * cortexpb.StreamWriteRequest ) (retErr error ) {
269+ defer func () {
270+ if r := recover (); r != nil {
271+ s .panicCh <- r // forward the panic value to the test
272+ } else {
273+ s .panicCh <- nil
274+ }
275+ }()
276+
277+ // gRPC codec pre-computes buffer size.
278+ size := req .Size ()
279+ buf := make ([]byte , size )
280+
281+ // Sleep so the caller's ctx deadline fires and PushStreamConnection returns.
282+ // After the sleep the caller may have grown the timeseries.
283+ time .Sleep (s .sendDelay )
284+
285+ // marshal into the pre-allocated buffer.
286+ // Panics when actual data > size (the bug).
287+ _ , err := req .MarshalToSizedBuffer (buf )
288+ return err
289+ }
290+
291+ func (s * slowSendStream ) Recv () (* cortexpb.WriteResponse , error ) {
292+ return & cortexpb.WriteResponse {}, nil
293+ }
294+
295+ // TestPushStreamConnection_PanicWhenCtxExpiresAndTimeseriesGrows is an
296+ // end-to-end regression test for the distributor panic.
297+ func TestPushStreamConnection_PanicWhenCtxExpiresAndTimeseriesGrows (t * testing.T ) {
298+ const (
299+ // ctxDeadline mirrors distributor.remote-timeout.
300+ // Kept short here so the test completes quickly.
301+ ctxDeadline = 20 * time .Millisecond
302+ // sendDelay must exceed ctxDeadline so the deadline fires while Send()
303+ // is still sleeping between Size() and MarshalToSizedBuffer().
304+ sendDelay = 200 * time .Millisecond
305+ )
306+
307+ ts := cortexpb .TimeseriesFromPool ()
308+ ts .Labels = append (ts .Labels ,
309+ cortexpb.LabelAdapter {Name : "__name__" , Value : "test_metric" },
310+ cortexpb.LabelAdapter {Name : "job" , Value : "test" },
311+ )
312+ ts .Samples = append (ts .Samples , cortexpb.Sample {Value : 1.0 , TimestampMs : 1000 })
313+
314+ timeseries := cortexpb .PreallocTimeseriesSliceFromPool ()
315+ timeseries = append (timeseries , cortexpb.PreallocTimeseries {TimeSeries : ts })
316+
317+ writeReq := & cortexpb.WriteRequest {Timeseries : timeseries }
318+
319+ panicCh := make (chan any , 1 )
320+ stream := & slowSendStream {
321+ sendDelay : sendDelay ,
322+ panicCh : panicCh ,
323+ }
324+
325+ mockIng := & mockIngester {}
326+ mockIng .On ("PushStream" , mock .Anything , mock .Anything ).Return (stream , nil )
327+
328+ streamCtx , streamCancel := context .WithCancel (context .Background ())
329+ defer streamCancel ()
330+
331+ client := & closableHealthAndIngesterClient {
332+ IngesterClient : mockIng ,
333+ conn : & mockClientConn {},
334+ addr : "test-addr" ,
335+ inflightPushRequests : prometheus .NewGaugeVec (prometheus.GaugeOpts {}, []string {"ingester" }),
336+ streamCtx : streamCtx ,
337+ streamCancel : streamCancel ,
338+ streamPushChan : make (chan * streamWriteJob , 1 ),
339+ }
340+
341+ workerCtx := user .InjectOrgID (streamCtx , "test-worker" )
342+ require .NoError (t , client .worker (workerCtx ))
343+
344+ // Call PushStreamConnection with a context that expires before Send() finishes.
345+ pushCtx , pushCancel := context .WithTimeout (
346+ user .InjectOrgID (context .Background (), "test-tenant" ),
347+ ctxDeadline ,
348+ )
349+ defer pushCancel ()
350+
351+ // PushStreamConnection blocks until Send()+Recv() complete.
352+ _ , pushErr := client .PushStreamConnection (pushCtx , writeReq )
353+ require .ErrorIs (t , pushErr , context .DeadlineExceeded ,
354+ "caller should observe its own context deadline" )
355+
356+ for i := range 100 {
357+ ts .Labels = append (ts .Labels , cortexpb.LabelAdapter {
358+ Name : fmt .Sprintf ("extra_label_%d" , i ),
359+ Value : fmt .Sprintf ("extra_value_%d" , i ),
360+ })
361+ }
362+
363+ // No panic expected: Send() already completed before labels were appended.
364+ select {
365+ case panicVal := <- panicCh :
366+ require .Nil (t , panicVal ,
367+ "unexpected panic in MarshalToSizedBuffer: the fix should prevent " +
368+ "timeseries from being reused while Send() is still marshalling" )
369+ case <- time .After (sendDelay + time .Second ):
370+ t .Fatal ("timed out waiting for Send() to complete" )
371+ }
372+ }
373+
252374func TestClosableHealthAndIngesterClient_ShouldNotPanicWhenClose (t * testing.T ) {
253375 ctx , cancel := context .WithCancel (context .Background ())
254376 streamChan := make (chan * streamWriteJob )
0 commit comments