diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 3a4cdc7c1f..c128c77cbc 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -20,6 +20,7 @@ import (
 	"github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk"
 
 	"github.com/weaveworks/common/httpgrpc"
+	"github.com/weaveworks/common/user"
 	cortex_chunk "github.com/weaveworks/cortex/pkg/chunk"
 	"github.com/weaveworks/cortex/pkg/ingester/client"
 	"github.com/weaveworks/cortex/pkg/ring"
@@ -163,6 +164,9 @@ type Ingester struct {
 	startTime time.Time
 	ready     bool
 
+	joiningSampleQueueLock sync.Mutex
+	joiningSampleQueue     []userSamples
+
 	// One queue per flush thread. Fingerprint is used to
 	// pick a queue.
 	flushQueues []*util.PriorityQueue
@@ -185,6 +189,11 @@ type ChunkStore interface {
 	Put(ctx context.Context, chunks []cortex_chunk.Chunk) error
 }
 
+type userSamples struct {
+	userID  string
+	samples []model.Sample
+}
+
 // New constructs a new Ingester.
 func New(cfg Config, chunkStore ChunkStore) (*Ingester, error) {
 	if cfg.FlushCheckPeriod == 0 {
@@ -249,6 +258,8 @@ func New(cfg Config, chunkStore ChunkStore) (*Ingester, error) {
 		state:     ring.PENDING,
 		startTime: time.Now(),
 
+		joiningSampleQueue: make([]userSamples, 0),
+
 		flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
 
 		ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{
@@ -304,6 +315,20 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
 	var lastPartialErr error
 	samples := client.FromWriteRequest(req)
 
+	if i.state == ring.JOINING {
+		userID, err := user.ExtractOrgID(ctx)
+		if err != nil {
+			return nil, err
+		}
+		i.joiningSampleQueueLock.Lock()
+		defer i.joiningSampleQueueLock.Unlock()
+		if i.state == ring.JOINING { // Confirm we are still joining now that we hold the lock
+			i.joiningSampleQueue = append(
+				i.joiningSampleQueue,
+				userSamples{userID: userID, samples: samples})
+			return &client.WriteResponse{}, nil
+		}
+	}
 samples:
 	for j := range samples {
 		if err := i.append(ctx, &samples[j]); err != nil {
diff --git a/pkg/ingester/ingester_claim.go b/pkg/ingester/ingester_claim.go
index 2b74db14af..cedf12117c 100644
--- a/pkg/ingester/ingester_claim.go
+++ b/pkg/ingester/ingester_claim.go
@@ -74,6 +74,10 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
 		if fromIngesterID == "" {
 			fromIngesterID = wireSeries.FromIngesterId
 			level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
+
+			if err := i.PreClaimTokensFor(fromIngesterID); err != nil {
+				return err
+			}
 		}
 		metric := client.FromLabelPairs(wireSeries.Labels)
 		userCtx := user.InjectOrgID(stream.Context(), wireSeries.UserId)
@@ -102,13 +106,31 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
 		return err
 	}
 
+	i.joiningSampleQueueLock.Lock()
 	i.userStatesMtx.Lock()
-	defer i.userStatesMtx.Unlock()
+	i.userStates = userStates
+	i.userStatesMtx.Unlock()
+
+	level.Info(util.Logger).Log("msg", "Importing sample queue")
+	for j := range i.joiningSampleQueue {
+		userSamples := &i.joiningSampleQueue[j]
+		userCtx := user.InjectOrgID(stream.Context(), userSamples.userID)
+		for k := range userSamples.samples {
+			sample := &userSamples.samples[k]
+			err := i.append(userCtx, sample)
+			if err != nil {
+				level.Error(util.Logger).Log("msg", "Error importing queued sample", "sample", sample, "err", err)
+				// Just continue, so we keep as many samples as possible
+			}
+		}
+	}
+	i.joiningSampleQueue = []userSamples{}
 
 	if err := i.ChangeState(ring.ACTIVE); err != nil {
+		i.joiningSampleQueueLock.Unlock()
 		return err
 	}
-	i.userStates = userStates
+	i.joiningSampleQueueLock.Unlock()
 
 	// Close the stream last, as this is what tells the "from" ingester that
 	// it's OK to shut down.
diff --git a/pkg/ingester/ingester_lifecycle.go b/pkg/ingester/ingester_lifecycle.go
index 91de2822d0..d62eccbc20 100644
--- a/pkg/ingester/ingester_lifecycle.go
+++ b/pkg/ingester/ingester_lifecycle.go
@@ -81,8 +81,21 @@ func (i *Ingester) ChangeState(state ring.IngesterState) error {
 	return <-err
 }
 
+// PreClaimTokensFor marks all the tokens of the supplied ingester for transfer to this ingester.
+func (i *Ingester) PreClaimTokensFor(ingesterID string) error {
+	return i.updateTokensFor(ingesterID, func(ringDesc *ring.Desc) []uint32 {
+		return ringDesc.PreClaimTokens(ingesterID, i.id)
+	})
+}
+
 // ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
 func (i *Ingester) ClaimTokensFor(ingesterID string) error {
+	return i.updateTokensFor(ingesterID, func(ringDesc *ring.Desc) []uint32 {
+		return ringDesc.ClaimTokens(ingesterID, i.id)
+	})
+}
+
+func (i *Ingester) updateTokensFor(ingesterID string, updater func(*ring.Desc) []uint32) error {
 	err := make(chan error)
 
 	i.actorChan <- func() {
@@ -94,7 +107,7 @@ func (i *Ingester) ClaimTokensFor(ingesterID string) error {
 				return nil, false, fmt.Errorf("Cannot claim tokens in an empty ring")
 			}
 
-			tokens = ringDesc.ClaimTokens(ingesterID, i.id)
+			tokens = updater(ringDesc)
 			return ringDesc, true, nil
 		}
 
@@ -185,9 +198,7 @@ loop:
 			break loop
 		}
 	}
-
-	// Mark ourselved as Leaving so no more samples are send to us.
-	i.changeState(ring.LEAVING)
+	i.changeState(ring.PREPARING_TO_LEAVE)
 
 	// Do the transferring / flushing on a background goroutine so we can continue
 	// to heartbeat to consul.
@@ -305,14 +316,16 @@ func (i *Ingester) updateConsul() error {
 }
 
 // changeState updates consul with state transitions for us. NB this must be
-// called from loop()! Use ChangeState for calls from outside of loop().
+// called from loop()!
+// Use ChangeState for calls from outside of loop() (unless the loop has shut down).
 func (i *Ingester) changeState(state ring.IngesterState) error {
 	// Only the following state transitions can be triggered externally
 	if !((i.state == ring.PENDING && state == ring.JOINING) || // triggered by TransferChunks at the beginning
 		(i.state == ring.JOINING && state == ring.PENDING) || // triggered by TransferChunks on failure
 		(i.state == ring.JOINING && state == ring.ACTIVE) || // triggered by TransferChunks on success
 		(i.state == ring.PENDING && state == ring.ACTIVE) || // triggered by autoJoin
-		(i.state == ring.ACTIVE && state == ring.LEAVING)) { // triggered by shutdown
+		(i.state == ring.ACTIVE && state == ring.PREPARING_TO_LEAVE) || // triggered by shutdown
+		(i.state == ring.PREPARING_TO_LEAVE && state == ring.LEAVING)) { // triggered by shutdown
 		return fmt.Errorf("Changing ingester state from %v -> %v is disallowed", i.state, state)
 	}
 
@@ -330,6 +343,10 @@ func (i *Ingester) processShutdown() {
 			flushRequired = false
 		}
 	}
+	if i.state != ring.LEAVING {
+		// Mark ourselves as LEAVING so no more samples are sent to us.
+		i.changeState(ring.LEAVING)
+	}
 
 	if flushRequired {
 		i.flushAllChunks()
@@ -398,6 +415,12 @@ func (i *Ingester) transferChunks() error {
 		}
 
 		sentChunks.Add(float64(len(chunks)))
+		if i.state != ring.LEAVING {
+			// Mark ourselves as LEAVING so no more samples are sent to us.
+			// We wait until we have sent the first item through the stream, so that the remote
+			// side has a chance to mark all the tokens for transfer.
+			i.changeState(ring.LEAVING)
+		}
 	}
 }
 
diff --git a/pkg/ingester/ingester_lifecycle_test.go b/pkg/ingester/ingester_lifecycle_test.go
index 6248f1b323..0e5ba458d1 100644
--- a/pkg/ingester/ingester_lifecycle_test.go
+++ b/pkg/ingester/ingester_lifecycle_test.go
@@ -2,11 +2,13 @@ package ingester
 
 import (
 	"io"
+	"os"
 	"reflect"
 	"runtime"
 	"testing"
 	"time"
 
+	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -27,6 +29,10 @@ const (
 	aLongTime = 60 * time.Second
 )
 
+func init() {
+	util.Logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
+}
+
 func defaultIngesterTestConfig() Config {
 	consul := ring.NewInMemoryKVClient()
 	return Config{
@@ -124,6 +130,20 @@ func TestIngesterTransfer(t *testing.T) {
 		}, nil
 	}
 
+	// Add a sample to check that it is dequeued correctly
+	ing2.joiningSampleQueue = []userSamples{
+		{
+			userID: userID,
+			samples: []model.Sample{
+				{
+					Metric:    m,
+					Value:     model.SampleValue(798),
+					Timestamp: model.TimeFromUnix(124),
+				},
+			},
+		},
+	}
+
 	// Now stop the first ingester
 	ing1.Shutdown()
 
@@ -145,6 +165,10 @@ func TestIngesterTransfer(t *testing.T) {
 					Value:       456,
 					TimestampMs: 123000,
 				},
+				{
+					Value:       798,
+					TimestampMs: 124000,
+				},
 			},
 		},
 	},
diff --git a/pkg/ring/model.go b/pkg/ring/model.go
index 9d6c971691..08bf9b512e 100644
--- a/pkg/ring/model.go
+++ b/pkg/ring/model.go
@@ -59,13 +59,27 @@ func (d *Desc) RemoveIngester(id string) {
 	d.Tokens = output
 }
 
+// PreClaimTokens marks all the tokens from one ingester in preparation for transferring them to
+// another, returning the modified tokens.
+func (d *Desc) PreClaimTokens(from, to string) []uint32 {
+	var result []uint32
+	for i := 0; i < len(d.Tokens); i++ {
+		if d.Tokens[i].Ingester == from {
+			d.Tokens[i].NextIngester = to
+			result = append(result, d.Tokens[i].Token)
+		}
+	}
+	return result
+}
+
 // ClaimTokens transfers all the tokens from one ingester to another,
-// returning the claimed token.
+// returning the claimed tokens.
 func (d *Desc) ClaimTokens(from, to string) []uint32 {
 	var result []uint32
 	for i := 0; i < len(d.Tokens); i++ {
 		if d.Tokens[i].Ingester == from {
 			d.Tokens[i].Ingester = to
+			d.Tokens[i].NextIngester = ""
 			result = append(result, d.Tokens[i].Token)
 		}
 	}
diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go
index 2e0c8d5f4c..442da225a9 100644
--- a/pkg/ring/replication_strategy.go
+++ b/pkg/ring/replication_strategy.go
@@ -48,7 +48,7 @@ func (r *Ring) replicationStrategy(ingesters []*IngesterDesc, op Operation) (
 
 // IsHealthy checks whether an ingester appears to be alive and heartbeating
 func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool {
-	if op == Write && ingester.State != ACTIVE {
+	if op == Write && !(ingester.State == ACTIVE || ingester.State == PREPARING_TO_LEAVE || ingester.State == JOINING) {
 		return false
 	} else if op == Read && ingester.State == JOINING {
 		return false
diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go
index bd9c08f22b..04fddff836 100644
--- a/pkg/ring/ring.go
+++ b/pkg/ring/ring.go
@@ -213,15 +213,22 @@ func (r *Ring) getInternal(key uint32, op Operation) (ReplicationSet, error) {
 		}
 		distinctHosts[token.Ingester] = struct{}{}
 		ingester := r.ringDesc.Ingesters[token.Ingester]
 
-		// We do not want to Write to Ingesters that are not ACTIVE, but we do want
-		// to write the extra replica somewhere. So we increase the size of the set
-		// of replicas for the key. This means we have to also increase the
-		// size of the replica set for read, but we can read from Leaving ingesters,
-		// so don't skip it in this case.
+		// We only want to Write to Ingesters that are ACTIVE (or PREPARING_TO_LEAVE); however,
+		// we still write to the same number of replicas. So we increase the size of the set of
+		// replicas for the key.
+		// To match this, we have to also increase the size of the replica set for read, but we
+		// can still read from ingesters which are PREPARING_TO_LEAVE or LEAVING, so don't skip it
+		// in this case.
 		// NB dead ingester will be filtered later (by replication_strategy.go).
-		if op == Write && ingester.State != ACTIVE {
-			n++
-		} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
+		if op == Write && !(ingester.State == ACTIVE || ingester.State == PREPARING_TO_LEAVE) {
+			// If a token is being transferred to a new ingester, we can send writes to that ingester
+			// instead, where they will be queued until the transfer is complete.
+			if nextIngester := r.ringDesc.Ingesters[token.NextIngester]; nextIngester != nil && nextIngester.State == JOINING {
+				ingester = nextIngester
+			} else {
+				n++
+			}
+		} else if op == Read && !(ingester.State == ACTIVE || ingester.State == PREPARING_TO_LEAVE || ingester.State == LEAVING) {
 			n++
 		}
diff --git a/pkg/ring/ring.proto b/pkg/ring/ring.proto
index a54b98b79b..2ea8260ccd 100644
--- a/pkg/ring/ring.proto
+++ b/pkg/ring/ring.proto
@@ -18,12 +18,36 @@ message IngesterDesc {
 message TokenDesc {
   uint32 token = 1;
   string ingester = 2;
+  string nextIngester = 3;
 }
 
 enum IngesterState {
+  // ACTIVE: The ingester is in normal operation.
+  // Will be sent both read and write requests.
  ACTIVE = 0;
+
+  // LEAVING: End state for an ingester that is shutting down.
+  // The ingester may be either:
+  //  - transferring its chunks to another, joining ingester, or
+  //  - flushing all of its in-memory chunks to persistent storage
+  // Will only be sent read requests.
  LEAVING = 1;
+  // PENDING: Start state for a new ingester.
+  // The ingester is waiting to join the ring; it will either:
+  //  - receive an incoming chunk transfer request from a leaving ingester (-> JOINING), or
+  //  - after a timeout, create new tokens and join the ring itself (-> ACTIVE)
+  // Will not be used for reads or writes.
  PENDING = 2;
+
+  // JOINING: The ingester was recently waiting to join the ring (PENDING ->) and is now
+  // handling an incoming chunk transfer request from a leaving ingester; it becomes active
+  // when the transfer completes (-> ACTIVE).
+  // Will be sent write requests for the tokens it is receiving from the leaving ingester.
  JOINING = 3;
+
+  // PREPARING_TO_LEAVE: The ingester is about to shut down; it is looking for a replacement
+  // to receive its in-memory chunks and will soon leave the ring (-> LEAVING).
+  // Will be sent both read and write requests.
+  PREPARING_TO_LEAVE = 4;
 }
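
For readers following the pkg/ring/ring.go hunk above, here is a minimal, self-contained sketch of the write-path redirection it introduces: writes go to a token's owner while it is ACTIVE or PREPARING_TO_LEAVE, and otherwise fall back to the JOINING ingester that has pre-claimed the token. The types and the pickWriteTarget helper below are simplified stand-ins invented for illustration (they are not the actual Cortex types or API), and the "false" case stands in for the real patch's behaviour of growing the replica set (n++).

package main

import "fmt"

// Illustrative stand-ins for ring.IngesterState, ring.IngesterDesc and ring.TokenDesc.
type state int

const (
	active state = iota
	leaving
	pending
	joining
	preparingToLeave
)

type ingesterDesc struct{ state state }

type tokenDesc struct {
	ingester     string
	nextIngester string
}

// pickWriteTarget mirrors the intent of the getInternal change: prefer the token's
// owner while it still accepts writes; otherwise, if the token has been pre-claimed
// by a JOINING ingester, send the write there (it will be queued until the transfer
// completes); otherwise report failure so the caller can extend the replica set.
func pickWriteTarget(tok tokenDesc, ingesters map[string]*ingesterDesc) (string, bool) {
	if owner := ingesters[tok.ingester]; owner != nil &&
		(owner.state == active || owner.state == preparingToLeave) {
		return tok.ingester, true
	}
	if next := ingesters[tok.nextIngester]; next != nil && next.state == joining {
		return tok.nextIngester, true
	}
	return "", false
}

func main() {
	ingesters := map[string]*ingesterDesc{
		"ingester-1": {state: preparingToLeave},
		"ingester-2": {state: joining},
	}
	tok := tokenDesc{ingester: "ingester-1", nextIngester: "ingester-2"}

	// Owner is PREPARING_TO_LEAVE, so it still takes the write.
	fmt.Println(pickWriteTarget(tok, ingesters)) // ingester-1 true

	// Once the owner moves to LEAVING, the write is redirected to the joining ingester.
	ingesters["ingester-1"].state = leaving
	fmt.Println(pickWriteTarget(tok, ingesters)) // ingester-2 true
}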