WIP: Send samples to joining ingester during handover #788

Closed
wants to merge 7 commits into from
25 changes: 25 additions & 0 deletions pkg/ingester/ingester.go
@@ -20,6 +20,7 @@ import (
"github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk"

"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
cortex_chunk "github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/ingester/client"
"github.com/weaveworks/cortex/pkg/ring"
@@ -163,6 +164,9 @@ type Ingester struct {
startTime time.Time
ready bool

joiningSampleQueueLock sync.Mutex
joiningSampleQueue []userSamples

// One queue per flush thread. Fingerprint is used to
// pick a queue.
flushQueues []*util.PriorityQueue
@@ -185,6 +189,11 @@ type ChunkStore interface {
Put(ctx context.Context, chunks []cortex_chunk.Chunk) error
}

type userSamples struct {
userID string
samples []model.Sample
}

// New constructs a new Ingester.
func New(cfg Config, chunkStore ChunkStore) (*Ingester, error) {
if cfg.FlushCheckPeriod == 0 {
@@ -249,6 +258,8 @@ func New(cfg Config, chunkStore ChunkStore) (*Ingester, error) {
state: ring.PENDING,
startTime: time.Now(),

joiningSampleQueue: make([]userSamples, 0),

flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),

ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{
@@ -304,6 +315,20 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
var lastPartialErr error
samples := client.FromWriteRequest(req)

if i.state == ring.JOINING {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
i.joiningSampleQueueLock.Lock()
defer i.joiningSampleQueueLock.Unlock()
if i.state == ring.JOINING { // Confirm we are still joining, after we get the lock
i.joiningSampleQueue = append(
i.joiningSampleQueue,
userSamples{userID: userID, samples: samples})
return &client.WriteResponse{}, nil
}
}
samples:
for j := range samples {
if err := i.append(ctx, &samples[j]); err != nil {
26 changes: 24 additions & 2 deletions pkg/ingester/ingester_claim.go
@@ -74,6 +74,10 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
if fromIngesterID == "" {
fromIngesterID = wireSeries.FromIngesterId
level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)

if err := i.PreClaimTokensFor(fromIngesterID); err != nil {
return err
}
}
metric := client.FromLabelPairs(wireSeries.Labels)
userCtx := user.InjectOrgID(stream.Context(), wireSeries.UserId)
@@ -102,13 +106,31 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
return err
}

i.joiningSampleQueueLock.Lock()
i.userStatesMtx.Lock()
defer i.userStatesMtx.Unlock()
i.userStates = userStates
i.userStatesMtx.Unlock()

level.Info(util.Logger).Log("msg", "Importing sample queue")
for j := range i.joiningSampleQueue {
userSamples := &i.joiningSampleQueue[j]
userCtx := user.InjectOrgID(stream.Context(), userSamples.userID)
for k := range userSamples.samples {
sample := &userSamples.samples[k]
err := i.append(userCtx, sample)
if err != nil {
level.Error(util.Logger).Log("msg", "Error importing queued sample", "sample", sample, "err", err)
// Just continue, so we keep as many samples as possible
}
}
}
i.joiningSampleQueue = []userSamples{}

if err := i.ChangeState(ring.ACTIVE); err != nil {
i.joiningSampleQueueLock.Unlock()
return err
}
i.userStates = userStates
i.joiningSampleQueueLock.Unlock()

// Close the stream last, as this is what tells the "from" ingester that
// it's OK to shut down.
35 changes: 29 additions & 6 deletions pkg/ingester/ingester_lifecycle.go
@@ -81,8 +81,21 @@ func (i *Ingester) ChangeState(state ring.IngesterState) error {
return <-err
}

// PreClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
func (i *Ingester) PreClaimTokensFor(ingesterID string) error {
return i.updateTokensFor(ingesterID, func(ringDesc *ring.Desc) []uint32 {
return ringDesc.PreClaimTokens(ingesterID, i.id)
})
}

// ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
func (i *Ingester) ClaimTokensFor(ingesterID string) error {
return i.updateTokensFor(ingesterID, func(ringDesc *ring.Desc) []uint32 {
return ringDesc.ClaimTokens(ingesterID, i.id)
})
}

func (i *Ingester) updateTokensFor(ingesterID string, updater func(*ring.Desc) []uint32) error {
err := make(chan error)

i.actorChan <- func() {
@@ -94,7 +107,7 @@ func (i *Ingester) ClaimTokensFor(ingesterID string) error {
return nil, false, fmt.Errorf("Cannot claim tokens in an empty ring")
}

tokens = ringDesc.ClaimTokens(ingesterID, i.id)
tokens = updater(ringDesc)
return ringDesc, true, nil
}

@@ -185,9 +198,7 @@ loop:
break loop
}
}

// Mark ourselves as Leaving so no more samples are sent to us.
i.changeState(ring.LEAVING)
i.changeState(ring.PREPARING_TO_LEAVE)

// Do the transferring / flushing on a background goroutine so we can continue
// to heartbeat to consul.
@@ -305,14 +316,16 @@ func (i *Ingester) updateConsul() error {
}

// changeState updates consul with state transitions for us. NB this must be
// called from loop()! Use ChangeState for calls from outside of loop().
// called from loop()!
// Use ChangeState for calls from outside of loop() (unless the loop has shut down)
func (i *Ingester) changeState(state ring.IngesterState) error {
// Only the following state transitions can be triggered externally
if !((i.state == ring.PENDING && state == ring.JOINING) || // triggered by TransferChunks at the beginning
(i.state == ring.JOINING && state == ring.PENDING) || // triggered by TransferChunks on failure
(i.state == ring.JOINING && state == ring.ACTIVE) || // triggered by TransferChunks on success
(i.state == ring.PENDING && state == ring.ACTIVE) || // triggered by autoJoin
(i.state == ring.ACTIVE && state == ring.LEAVING)) { // triggered by shutdown
(i.state == ring.ACTIVE && state == ring.PREPARING_TO_LEAVE) || // triggered by shutdown
(i.state == ring.PREPARING_TO_LEAVE && state == ring.LEAVING)) { // triggered by shutdown
return fmt.Errorf("Changing ingester state from %v -> %v is disallowed", i.state, state)
}

@@ -330,6 +343,10 @@ func (i *Ingester) processShutdown() {
flushRequired = false
}
}
if i.state != ring.LEAVING {
// Mark ourselves as Leaving so no more samples are sent to us.
i.changeState(ring.LEAVING)
}

if flushRequired {
i.flushAllChunks()
@@ -398,6 +415,12 @@ func (i *Ingester) transferChunks() error {
}

sentChunks.Add(float64(len(chunks)))
if i.state != ring.LEAVING {
// Mark ourselves as Leaving so no more samples are sent to us.
// We wait until we have sent the first item through the stream, so that the remote
// side has a chance to mark all the tokens for transfer.
i.changeState(ring.LEAVING)
}
}
}

24 changes: 24 additions & 0 deletions pkg/ingester/ingester_lifecycle_test.go
@@ -2,11 +2,13 @@ package ingester

import (
"io"
"os"
"reflect"
"runtime"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -27,6 +29,10 @@ const (
aLongTime = 60 * time.Second
)

func init() {
util.Logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
}

func defaultIngesterTestConfig() Config {
consul := ring.NewInMemoryKVClient()
return Config{
@@ -124,6 +130,20 @@ func TestIngesterTransfer(t *testing.T) {
}, nil
}

// Add a sample to check that they are dequeued correctly
ing2.joiningSampleQueue = []userSamples{
{
userID: userID,
samples: []model.Sample{
{
Metric: m,
Value: model.SampleValue(798),
Timestamp: model.TimeFromUnix(124),
},
},
},
}

// Now stop the first ingester
ing1.Shutdown()

@@ -145,6 +165,10 @@
Value: 456,
TimestampMs: 123000,
},
{
Value: 798,
TimestampMs: 124000,
},
},
},
},
16 changes: 15 additions & 1 deletion pkg/ring/model.go
@@ -59,13 +59,27 @@ func (d *Desc) RemoveIngester(id string) {
d.Tokens = output
}

// PreClaimTokens marks all the tokens from one ingester in preparation for transferring them to
// another, returning the modified tokens.
func (d *Desc) PreClaimTokens(from, to string) []uint32 {
var result []uint32
for i := 0; i < len(d.Tokens); i++ {
if d.Tokens[i].Ingester == from {
d.Tokens[i].NextIngester = to
result = append(result, d.Tokens[i].Token)
}
}
return result
}

// ClaimTokens transfers all the tokens from one ingester to another,
// returning the claimed token.
// returning the claimed tokens.
func (d *Desc) ClaimTokens(from, to string) []uint32 {
var result []uint32
for i := 0; i < len(d.Tokens); i++ {
if d.Tokens[i].Ingester == from {
d.Tokens[i].Ingester = to
d.Tokens[i].NextIngester = ""
result = append(result, d.Tokens[i].Token)
}
}
2 changes: 1 addition & 1 deletion pkg/ring/replication_strategy.go
@@ -48,7 +48,7 @@ func (r *Ring) replicationStrategy(ingesters []*IngesterDesc, op Operation) (

// IsHealthy checks whether an ingester appears to be alive and heartbeating
func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool {
if op == Write && ingester.State != ACTIVE {
if op == Write && !(ingester.State == ACTIVE || ingester.State == PREPARING_TO_LEAVE || ingester.State == JOINING) {
return false
} else if op == Read && ingester.State == JOINING {
return false
23 changes: 15 additions & 8 deletions pkg/ring/ring.go
@@ -213,15 +213,22 @@ func (r *Ring) getInternal(key uint32, op Operation) (ReplicationSet, error) {
distinctHosts[token.Ingester] = struct{}{}
ingester := r.ringDesc.Ingesters[token.Ingester]

// We do not want to Write to Ingesters that are not ACTIVE, but we do want
// to write the extra replica somewhere. So we increase the size of the set
// of replicas for the key. This means we have to also increase the
// size of the replica set for read, but we can read from Leaving ingesters,
// so don't skip it in this case.
// We want to only Write to Ingesters that are ACTIVE (or PREPARING_TO_LEAVE), however
// we still write to the same number of replicas. So we increase the size of the set of
// replicas for the key.
// To match this, we have to also increase the size of the replica set for read, but we
// can still read from ingesters which are PREPARING_TO_LEAVE or LEAVING so don't skip it
// in this case.
// NB dead ingester will be filtered later (by replication_strategy.go).
if op == Write && ingester.State != ACTIVE {
n++
} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
if op == Write && !(ingester.State == ACTIVE || ingester.State == PREPARING_TO_LEAVE) {
// If a token is being transferred to a new ingester, we can send writes to that ingester instead
// where they will be queued until the transfer is complete
if nextIngester := r.ringDesc.Ingesters[token.NextIngester]; nextIngester != nil && nextIngester.State == JOINING {
ingester = nextIngester
} else {
n++
}
} else if op == Read && !(ingester.State == ACTIVE || ingester.State == PREPARING_TO_LEAVE || ingester.State == LEAVING) {
n++
}

24 changes: 24 additions & 0 deletions pkg/ring/ring.proto
@@ -18,12 +18,36 @@ message IngesterDesc {
message TokenDesc {
uint32 token = 1;
string ingester = 2;
string nextIngester = 3;
Contributor:

Is this extra state redundant? Could this be detected uniquely by a token being owned by a "PENDING" ingester?

Contributor Author:

While the transfer is underway the tokens are still owned by the leaving ingester. The ingester receiving the chunks is in JOINING state.

}

enum IngesterState {
// ACTIVE: The ingester under normal operation
// Will be sent read and write requests
ACTIVE = 0;

// LEAVING: End state for ingesters shutting down.
// The ingester is shutting down and may be either:
// - transferring chunks to another joining ingester, or
// - flushing all in-memory chunks to persistent storage
// Will only be sent read requests.
LEAVING = 1;

// PENDING: Start state for a new ingester.
// The ingester is waiting to join the ring, it will either:
// - receive an incoming chunk transfer request from a leaving ingester (-> JOINING)
// - after a timeout, create new tokens and join the ring (-> ACTIVE)
// Will not be used for reads or writes.
PENDING = 2;

// JOINING: The ingester was recently waiting to join the ring (PENDING ->) and is now
// handling an incoming chunk transfer request from a leaving ingester and will become active
// when transfer completes (-> ACTIVE).
// Will be sent write requests for the tokens it is receiving from the leaving ingester
JOINING = 3;

// PREPARING_TO_LEAVE: The ingester is about to shut down, and is looking for a replacement
// to receive its in-memory chunks, and will soon leave the ring (-> LEAVING).
// Will only be sent write requests
PREPARING_TO_LEAVE = 4;
}
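
To make the intended sequence concrete, here is a minimal, self-contained Go sketch of the handover flow described by the state comments above and the review discussion. The names (tokenDesc, preClaim, claim, writeTarget) and the string-based state map are simplified stand-ins for illustration only; the PR's real logic lives in pkg/ring and pkg/ingester.

package main

import "fmt"

// tokenDesc mirrors the shape of ring.TokenDesc after this PR: a token keeps its
// current owner, while nextIngester records which ingester it is being handed to.
type tokenDesc struct {
	token        uint32
	ingester     string
	nextIngester string
}

// preClaim marks every token owned by `from` as transferring to `to`.
// The leaving ingester still owns the tokens, so reads keep working.
func preClaim(tokens []tokenDesc, from, to string) {
	for i := range tokens {
		if tokens[i].ingester == from {
			tokens[i].nextIngester = to
		}
	}
}

// claim completes the transfer: ownership moves to `to` and the marker is cleared.
func claim(tokens []tokenDesc, from, to string) {
	for i := range tokens {
		if tokens[i].ingester == from {
			tokens[i].ingester = to
			tokens[i].nextIngester = ""
		}
	}
}

// writeTarget picks where a write for a token should go: if the owner is
// PREPARING_TO_LEAVE and the token has been pre-claimed by a JOINING ingester,
// the write is redirected there (and queued until the transfer completes).
func writeTarget(t tokenDesc, states map[string]string) string {
	if states[t.ingester] == "PREPARING_TO_LEAVE" && t.nextIngester != "" && states[t.nextIngester] == "JOINING" {
		return t.nextIngester
	}
	return t.ingester
}

func main() {
	tokens := []tokenDesc{{token: 1, ingester: "ing-1"}, {token: 2, ingester: "ing-1"}}
	states := map[string]string{"ing-1": "ACTIVE", "ing-2": "PENDING"}

	// ing-1 starts shutting down; ing-2 answers the transfer and pre-claims the tokens.
	states["ing-1"], states["ing-2"] = "PREPARING_TO_LEAVE", "JOINING"
	preClaim(tokens, "ing-1", "ing-2")
	fmt.Println(writeTarget(tokens[0], states)) // "ing-2": writes are queued on the joiner

	// Transfer complete: ing-2 owns the tokens and becomes ACTIVE; ing-1 moves to LEAVING.
	claim(tokens, "ing-1", "ing-2")
	states["ing-1"], states["ing-2"] = "LEAVING", "ACTIVE"
	fmt.Println(writeTarget(tokens[0], states)) // still "ing-2"
}

The point this sketch tries to capture, matching the joiningSampleQueue added in ingester.go, is that writes redirected to the JOINING ingester are only queued, and are appended to its in-memory series once the chunk transfer has completed.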
}