Merged
129 changes: 53 additions & 76 deletions pkg/distributor/distributor.go
@@ -72,7 +72,6 @@ type Config struct {
BillingConfig billing.Config
IngesterClientConfig ingester_client.Config

ReplicationFactor int
RemoteTimeout time.Duration
ClientCleanupPeriod time.Duration
IngestionRateLimit float64
@@ -88,7 +87,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flag.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
cfg.BillingConfig.RegisterFlags(f)
cfg.IngesterClientConfig.RegisterFlags(f)
flag.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
@@ -98,9 +96,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

// New constructs a new Distributor
func New(cfg Config, ring ring.ReadRing) (*Distributor, error) {
if 0 > cfg.ReplicationFactor {
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
}
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = ingester_client.MakeIngesterClient
}
@@ -189,7 +184,13 @@ func (d *Distributor) Stop() {

func (d *Distributor) removeStaleIngesterClients() {
ingesters := map[string]struct{}{}
for _, ing := range d.ring.GetAll() {
replicationSet, err := d.ring.GetAll()
if err != nil {
level.Error(util.Logger).Log("msg", "error removing stale ingester clients", "err", err)
return
}

for _, ing := range replicationSet.Ingesters {
ingesters[ing.Addr] = struct{}{}
}

Expand Down Expand Up @@ -270,45 +271,17 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
return nil, errIngestionRateLimitExceeded
}

var ingesters [][]*ring.IngesterDesc
if err := instrument.TimeRequestHistogram(ctx, "Distributor.Push[ring-lookup]", d.sendDuration, func(context.Context) error {
var err error
ingesters, err = d.ring.BatchGet(keys, d.cfg.ReplicationFactor, ring.Write)
if err != nil {
return err
}
return nil
}); err != nil {
replicationSets, err := d.ring.BatchGet(keys, ring.Write)
if err != nil {
return nil, err
}

samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{}
for i := range samples {
// We need a response from a quorum of ingesters, which is n/2 + 1.
minSuccess := (len(ingesters[i]) / 2) + 1
samples[i].minSuccess = minSuccess
samples[i].maxFailures = len(ingesters[i]) - minSuccess

// Skip those that have not heartbeated in a while. NB these are still
// included in the calculation of minSuccess, so if too many failed ingesters
// will cause the whole write to fail.
liveIngesters := make([]*ring.IngesterDesc, 0, len(ingesters[i]))
for _, ingester := range ingesters[i] {
if d.ring.IsHealthy(ingester) {
liveIngesters = append(liveIngesters, ingester)
}
}

// This is just a shortcut - if there are not minSuccess available ingesters,
// after filtering out dead ones, don't even bother trying.
if len(liveIngesters) < minSuccess {
return nil, fmt.Errorf("wanted at least %d live ingesters to process write, had %d",
minSuccess, len(liveIngesters))
}

for _, liveIngester := range liveIngesters {
sampleForIngester := samplesByIngester[liveIngester]
samplesByIngester[liveIngester] = append(sampleForIngester, &samples[i])
for i, replicationSet := range replicationSets {
samples[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
samples[i].maxFailures = replicationSet.MaxErrors
for _, ingester := range replicationSet.Ingesters {
samplesByIngester[ingester] = append(samplesByIngester[ingester], &samples[i])
}
}

@@ -425,58 +398,53 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
}

// Get ingesters by metricName if one exists, otherwise get all ingesters
var ingesters []*ring.IngesterDesc
var replicationSet ring.ReplicationSet
if ok && metricNameMatcher.Type == labels.MatchEqual {
ingesters, err = d.ring.Get(tokenFor(userID, []byte(metricNameMatcher.Value)), d.cfg.ReplicationFactor, ring.Read)
if err != nil {
return promql.ErrStorage(err)
}
replicationSet, err = d.ring.Get(tokenFor(userID, []byte(metricNameMatcher.Value)), ring.Read)
} else {
ingesters = d.ring.GetAll()
replicationSet, err = d.ring.GetAll()
}
if err != nil {
return promql.ErrStorage(err)
}

matrix, err = d.queryIngesters(ctx, ingesters, d.cfg.ReplicationFactor, req)
matrix, err = d.queryIngesters(ctx, replicationSet, req)
return promql.ErrStorage(err)
})
return matrix, err
}

// Query implements Querier.
func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.IngesterDesc, replicationFactor int, req *client.QueryRequest) (model.Matrix, error) {
// We need a response from a quorum of ingesters, where maxErrs is n/2, where n is the replicationFactor
maxErrs := replicationFactor / 2
minSuccess := len(ingesters) - maxErrs
if len(ingesters) < minSuccess {
return nil, fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), minSuccess)
}

func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *client.QueryRequest) (model.Matrix, error) {
// Fetch samples from multiple ingesters
var numErrs int32
errReceived := make(chan error)
results := make(chan model.Matrix, len(ingesters))

for _, ing := range ingesters {
errs := make(chan error, len(replicationSet.Ingesters))
results := make(chan model.Matrix, len(replicationSet.Ingesters))
for _, ing := range replicationSet.Ingesters {
go func(ing *ring.IngesterDesc) {
result, err := d.queryIngester(ctx, ing, req)
if err != nil {
if atomic.AddInt32(&numErrs, 1) == int32(maxErrs+1) {
errReceived <- err
}
errs <- err
} else {
results <- result
}
}(ing)
}

// Only wait for minSuccess ingesters (or an error), and accumulate the samples
// Only wait for minSuccess responses (or until MaxErrors is exceeded), and accumulate the samples
// by fingerprint, merging them into any existing samples.
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
for i := 0; i < minSuccess; i++ {
minSuccess := len(replicationSet.Ingesters) - replicationSet.MaxErrors
var numErrs, numSuccess int
for numSuccess < minSuccess {
select {
case err := <-errReceived:
return nil, err
case err := <-errs:
numErrs++
if numErrs > replicationSet.MaxErrors {
return nil, err
}

case result := <-results:
numSuccess++
for _, ss := range result {
fp := ss.Metric.Fingerprint()
mss, ok := fpToSampleStream[fp]
Expand Down Expand Up @@ -516,9 +484,13 @@ func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc,

// forAllIngesters runs f, in parallel, for all ingesters
func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
replicationSet, err := d.ring.GetAll()
if err != nil {
return nil, err
}

resps, errs := make(chan interface{}), make(chan error)
ingesters := d.ring.GetAll()
for _, ingester := range ingesters {
for _, ingester := range replicationSet.Ingesters {
go func(ingester *ring.IngesterDesc) {
client, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
@@ -537,17 +509,19 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}

var lastErr error
result, numErrs := []interface{}{}, 0
for range ingesters {
for range replicationSet.Ingesters {
select {
case resp := <-resps:
result = append(result, resp)
case lastErr = <-errs:
numErrs++
}
}
if numErrs > d.cfg.ReplicationFactor/2 {

if numErrs > replicationSet.MaxErrors {
return nil, lastErr
}

return result, nil
}

Expand Down Expand Up @@ -624,8 +598,8 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
totalStats.NumSeries += resp.(*client.UserStatsResponse).NumSeries
}

totalStats.IngestionRate /= float64(d.cfg.ReplicationFactor)
totalStats.NumSeries /= uint64(d.cfg.ReplicationFactor)
totalStats.IngestionRate /= float64(d.ring.ReplicationFactor())
totalStats.NumSeries /= uint64(d.ring.ReplicationFactor())

return totalStats, nil
}
@@ -645,8 +619,11 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
req := &client.UserStatsRequest{}
ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
// Not using d.forAllIngesters(), so we can fail after first error.
ingesters := d.ring.GetAll()
for _, ingester := range ingesters {
replicationSet, err := d.ring.GetAll()
if err != nil {
return nil, err
}
for _, ingester := range replicationSet.Ingesters {
client, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return nil, err
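The write path above now takes its quorum from the ReplicationSet returned by the ring rather than from a distributor-local ReplicationFactor. A minimal, self-contained sketch of that accounting, using illustrative stand-in types (IngesterDesc and ReplicationSet here only mirror the shapes used in the diff, they are not the real pkg/ring definitions):

package main

import "fmt"

// Stand-ins mirroring the shapes used in the diff above; illustrative only.
type IngesterDesc struct{ Addr string }

type ReplicationSet struct {
	Ingesters []*IngesterDesc
	MaxErrors int
}

func main() {
	rs := ReplicationSet{
		Ingesters: []*IngesterDesc{{Addr: "a"}, {Addr: "b"}, {Addr: "c"}},
		MaxErrors: 1,
	}
	// Same arithmetic as Push: a write needs len(Ingesters)-MaxErrors
	// successes and tolerates up to MaxErrors failures.
	minSuccess := len(rs.Ingesters) - rs.MaxErrors
	maxFailures := rs.MaxErrors
	fmt.Println(minSuccess, maxFailures) // prints: 2 1
}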
38 changes: 21 additions & 17 deletions pkg/distributor/distributor_test.go
@@ -20,28 +20,36 @@ import (
// mockRing doesn't do any consistent hashing, just returns same ingesters for every query.
type mockRing struct {
prometheus.Counter
ingesters []*ring.IngesterDesc
heartbeatTimeout time.Duration
ingesters []*ring.IngesterDesc
}

func (r mockRing) Get(key uint32, n int, op ring.Operation) ([]*ring.IngesterDesc, error) {
return r.ingesters[:n], nil
func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters[:3],
MaxErrors: 1,
}, nil
}

func (r mockRing) BatchGet(keys []uint32, n int, op ring.Operation) ([][]*ring.IngesterDesc, error) {
result := [][]*ring.IngesterDesc{}
func (r mockRing) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) {
result := []ring.ReplicationSet{}
for i := 0; i < len(keys); i++ {
result = append(result, r.ingesters[:n])
result = append(result, ring.ReplicationSet{
Ingesters: r.ingesters[:3],
MaxErrors: 1,
})
}
return result, nil
}

func (r mockRing) GetAll() []*ring.IngesterDesc {
return r.ingesters
func (r mockRing) GetAll() (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters,
MaxErrors: 1,
}, nil
}

func (r mockRing) IsHealthy(ingester *ring.IngesterDesc) bool {
return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.heartbeatTimeout
func (r mockRing) ReplicationFactor() int {
return 3
}

type mockIngester struct {
@@ -159,12 +167,10 @@ func TestDistributorPush(t *testing.T) {
Counter: prometheus.NewCounter(prometheus.CounterOpts{
Name: "foo",
}),
ingesters: ingesterDescs,
heartbeatTimeout: 1 * time.Minute,
ingesters: ingesterDescs,
}

d, err := New(Config{
ReplicationFactor: 3,
RemoteTimeout: 1 * time.Minute,
ClientCleanupPeriod: 1 * time.Minute,
IngestionRateLimit: 10000,
@@ -299,12 +305,10 @@ func TestDistributorQuery(t *testing.T) {
Counter: prometheus.NewCounter(prometheus.CounterOpts{
Name: "foo",
}),
ingesters: ingesterDescs,
heartbeatTimeout: 1 * time.Minute,
ingesters: ingesterDescs,
}

d, err := New(Config{
ReplicationFactor: 3,
RemoteTimeout: 1 * time.Minute,
ClientCleanupPeriod: 1 * time.Minute,
IngestionRateLimit: 10000,
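The mockRing above also shows the read-side contract the distributor now depends on. A sketch of that interface, inferred only from the mock's method set (the real ReadRing in pkg/ring may differ, for example by embedding a Prometheus collector):

package ring

// Inferred from mockRing's methods; illustrative, not copied from pkg/ring.
type Operation int

type IngesterDesc struct{ Addr string }

type ReplicationSet struct {
	Ingesters []*IngesterDesc
	MaxErrors int
}

type ReadRing interface {
	Get(key uint32, op Operation) (ReplicationSet, error)
	BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error)
	GetAll() (ReplicationSet, error)
	ReplicationFactor() int
}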
2 changes: 1 addition & 1 deletion pkg/ring/http.go
@@ -102,7 +102,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ing := r.ringDesc.Ingesters[id]
timestamp := time.Unix(ing.Timestamp, 0)
state := ing.State.String()
if !r.IsHealthy(ing) {
if !r.IsHealthy(ing, Reporting) {
state = unhealthy
}

62 changes: 62 additions & 0 deletions pkg/ring/replication_strategy.go
@@ -0,0 +1,62 @@
package ring

import (
"fmt"
"time"
)

// replicationStrategy decides, given the set of ingesters eligible for a key,
// which ingesters you will try and write to and how many failures you will
// tolerate.
// - Filters out dead ingesters so we don't even try to write to them.
// - Checks there are enough ingesters for the operation to succeed.
func (r *Ring) replicationStrategy(ingesters []*IngesterDesc, op Operation) (
liveIngesters []*IngesterDesc, maxFailure int, err error,
) {
// We need a response from a quorum of ingesters, which is n/2 + 1. In the
// case of a node joining/leaving, the actual replica set might be bigger
// than the replication factor, so use the bigger of the two.
replicationFactor := r.cfg.ReplicationFactor
if len(ingesters) > replicationFactor {
replicationFactor = len(ingesters)
}
minSuccess := (replicationFactor / 2) + 1
maxFailure = replicationFactor - minSuccess

// Skip those that have not heartbeated in a while. NB these are still
// included in the calculation of minSuccess, so too many failed ingesters
// will cause the whole write to fail.
liveIngesters = make([]*IngesterDesc, 0, len(ingesters))
for _, ingester := range ingesters {
if r.IsHealthy(ingester, op) {
liveIngesters = append(liveIngesters, ingester)
} else {
maxFailure--
}
}

// This is just a shortcut - if there are not minSuccess available ingesters,
// after filtering out dead ones, don't even bother trying.
if maxFailure < 0 || len(liveIngesters) < minSuccess {
err = fmt.Errorf("at least %d live ingesters required, could only find %d",
minSuccess, len(liveIngesters))
return
}

return
}

// IsHealthy checks whether an ingester appears to be alive and heartbeating
func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool {
if op == Write && ingester.State != ACTIVE {
return false
} else if op == Read && ingester.State == JOINING {
return false
}
return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.cfg.HeartbeatTimeout
}

// ReplicationFactor of the ring.
func (r *Ring) ReplicationFactor() int {
return r.cfg.ReplicationFactor
}
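To make the quorum arithmetic in replicationStrategy concrete: with a replication factor of 3, minSuccess is 2 and one failure is tolerated, and each unhealthy ingester consumes one of those tolerated failures. A small sketch of the same calculation (the quorum helper below is illustrative, not part of the package):

package main

import "fmt"

// quorum mirrors the arithmetic in replicationStrategy above: given the
// effective replication factor and the number of live ingesters, it reports
// how many failures can still be tolerated and whether quorum is reachable.
func quorum(replicationFactor, live int) (maxFailure int, ok bool) {
	minSuccess := (replicationFactor / 2) + 1
	maxFailure = replicationFactor - minSuccess
	// Each dead ingester consumes one tolerated failure, mirroring the
	// maxFailure-- in the loop above.
	maxFailure -= replicationFactor - live
	return maxFailure, maxFailure >= 0 && live >= minSuccess
}

func main() {
	fmt.Println(quorum(3, 3)) // 1 true: all healthy, one failure tolerated
	fmt.Println(quorum(3, 2)) // 0 true: one dead, remaining writes must all succeed
	fmt.Println(quorum(3, 1)) // -1 false: quorum cannot be reached
}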