diff --git a/pkg/ingester/ingester_claim.go b/pkg/ingester/ingester_claim.go index 66019a03ba4..55dc996b97a 100644 --- a/pkg/ingester/ingester_claim.go +++ b/pkg/ingester/ingester_claim.go @@ -98,10 +98,6 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e sentChunks.Add(float64(len(descs))) } - if err := stream.SendAndClose(&client.TransferChunksResponse{}); err != nil { - return err - } - if err := i.ClaimTokensFor(fromIngesterID); err != nil { return err } @@ -114,6 +110,13 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e } i.userStates = userStates + // Close the stream last, as this is what tells the "from" ingester that + // it's OK to shut down. + if err := stream.SendAndClose(&client.TransferChunksResponse{}); err != nil { + level.Error(util.Logger).Log("msg", "Error closing TransferChunks stream", "ingester", fromIngesterID, "err", err) + return err + } + level.Info(util.Logger).Log("msg", "Successfully transferred chunks to ingester", "ingester", fromIngesterID) return nil } diff --git a/pkg/ingester/ingester_lifecycle.go b/pkg/ingester/ingester_lifecycle.go index 03096ff061e..87aa7a2e047 100644 --- a/pkg/ingester/ingester_lifecycle.go +++ b/pkg/ingester/ingester_lifecycle.go @@ -425,11 +425,12 @@ func (i *Ingester) findTargetIngester() (*ring.IngesterDesc, error) { for { ingester, err := findIngester() if err != nil { - level.Error(util.Logger).Log("msg", "Error looking for pending ingester: %v", err) + level.Debug(util.Logger).Log("msg", "Error looking for pending ingester", "err", err) if time.Now().Before(deadline) { time.Sleep(i.cfg.SearchPendingFor / pendingSearchIterations) continue } else { + level.Warn(util.Logger).Log("msg", "Could not find pending ingester before deadline", "err", err) return nil, err } } diff --git a/pkg/ring/consul_client.go b/pkg/ring/consul_client.go index 3f81685fe41..bebdf2d1d5b 100644 --- a/pkg/ring/consul_client.go +++ b/pkg/ring/consul_client.go @@ -178,11 +178,11 @@ func (c *consulClient) CAS(key string, f CASCallback) error { ModifyIndex: index, }, writeOptions) if err != nil { - level.Error(util.Logger).Log("msg", "error CASing", "ley", key, "err", err) + level.Error(util.Logger).Log("msg", "error CASing", "key", key, "err", err) continue } if !ok { - level.Error(util.Logger).Log("msg", "error CASing, trying again", "key", key, "index", index) + level.Debug(util.Logger).Log("msg", "error CASing, trying again", "key", key, "index", index) continue } return nil