Skip to content

Commit af27c6f

Browse files
authored
Merge pull request #955 from grafana/954-transfer-zero-tokens
Don't succeed a transfer if no series are received.
2 parents 0eab06a + d9c8377 commit af27c6f

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

pkg/ingester/lifecycle_test.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,38 @@ func TestIngesterTransfer(t *testing.T) {
150150
}, response)
151151
}
152152

153+
func TestIngesterBadTransfer(t *testing.T) {
154+
// Start ingester in PENDING.
155+
cfg := defaultIngesterTestConfig()
156+
cfg.LifecyclerConfig.ID = "ingester1"
157+
cfg.LifecyclerConfig.Addr = "ingester1"
158+
cfg.LifecyclerConfig.ClaimOnRollout = true
159+
cfg.LifecyclerConfig.JoinAfter = 100 * time.Second
160+
cfg.SearchPendingFor = 1 * time.Second
161+
ing, err := New(cfg, nil)
162+
require.NoError(t, err)
163+
164+
poll(t, 100*time.Millisecond, ring.PENDING, func() interface{} {
165+
return ing.lifecycler.GetState()
166+
})
167+
168+
// Now transfer 0 series to this ingester, ensure it errors.
169+
client := ingesterClientAdapater{ingester: ing}
170+
stream, err := client.TransferChunks(context.Background())
171+
require.NoError(t, err)
172+
_, err = stream.CloseAndRecv()
173+
require.Error(t, err)
174+
175+
// Check the ingester is still waiting.
176+
require.Equal(t, ring.PENDING, ing.lifecycler.GetState())
177+
}
178+
153179
func numTokens(c ring.KVClient, name string) int {
154180
ringDesc, err := c.Get(context.Background(), ring.ConsulKey)
155181
if err != nil {
156182
level.Error(util.Logger).Log("msg", "error reading consul", "err", err)
157183
return 0
158184
}
159-
160185
count := 0
161186
for _, token := range ringDesc.(*ring.Desc).Tokens {
162187
if token.Ingester == name {

pkg/ingester/transfer.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
6666

6767
userStates := newUserStates(i.limits, i.cfg)
6868
fromIngesterID := ""
69+
seriesReceived := 0
6970

7071
for {
7172
wireSeries, err := stream.Recv()
@@ -102,10 +103,21 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
102103
return err
103104
}
104105

106+
seriesReceived++
105107
memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
106108
receivedChunks.Add(float64(len(descs)))
107109
}
108110

111+
if fromIngesterID == "" {
112+
level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester")
113+
return fmt.Errorf("no ingester id")
114+
}
115+
116+
if seriesReceived == 0 {
117+
level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
118+
return fmt.Errorf("no series")
119+
}
120+
109121
if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil {
110122
return err
111123
}

0 commit comments

Comments
 (0)