@@ -198,9 +198,7 @@ loop:
198
198
break loop
199
199
}
200
200
}
201
-
202
- // Mark ourselved as Leaving so no more samples are send to us.
203
- i .changeState (ring .LEAVING )
201
+ i .changeState (ring .PREPARING_TO_LEAVE )
204
202
205
203
// Do the transferring / flushing on a background goroutine so we can continue
206
204
// to heartbeat to consul.
@@ -318,14 +316,16 @@ func (i *Ingester) updateConsul() error {
318
316
}
319
317
320
318
// changeState updates consul with state transitions for us. NB this must be
321
- // called from loop()! Use ChangeState for calls from outside of loop().
319
+ // called from loop()!
320
+ // Use ChangeState for calls from outside of loop() (unless the loop has shut down)
322
321
func (i * Ingester ) changeState (state ring.IngesterState ) error {
323
322
// Only the following state transitions can be triggered externally
324
323
if ! ((i .state == ring .PENDING && state == ring .JOINING ) || // triggered by TransferChunks at the beginning
325
324
(i .state == ring .JOINING && state == ring .PENDING ) || // triggered by TransferChunks on failure
326
325
(i .state == ring .JOINING && state == ring .ACTIVE ) || // triggered by TransferChunks on success
327
326
(i .state == ring .PENDING && state == ring .ACTIVE ) || // triggered by autoJoin
328
- (i .state == ring .ACTIVE && state == ring .LEAVING )) { // triggered by shutdown
327
+ (i .state == ring .ACTIVE && state == ring .PREPARING_TO_LEAVE ) || // triggered by shutdown
328
+ (i .state == ring .PREPARING_TO_LEAVE && state == ring .LEAVING )) { // triggered by shutdown
329
329
return fmt .Errorf ("Changing ingester state from %v -> %v is disallowed" , i .state , state )
330
330
}
331
331
@@ -343,6 +343,10 @@ func (i *Ingester) processShutdown() {
343
343
flushRequired = false
344
344
}
345
345
}
346
+ if i .state != ring .LEAVING {
347
+ // Mark ourselved as Leaving so no more samples are send to us.
348
+ i .changeState (ring .LEAVING )
349
+ }
346
350
347
351
if flushRequired {
348
352
i .flushAllChunks ()
@@ -411,6 +415,12 @@ func (i *Ingester) transferChunks() error {
411
415
}
412
416
413
417
sentChunks .Add (float64 (len (chunks )))
418
+ if i .state != ring .LEAVING {
419
+ // Mark ourselved as Leaving so no more samples are send to us.
420
+ // We wait until we have sent the first item through the stream, so that the remote
421
+ // side has a chance to mark all the tokens for transfer.
422
+ i .changeState (ring .LEAVING )
423
+ }
414
424
}
415
425
}
416
426
0 commit comments