
Unify the replication calculation between query and push. #681


Merged: 12 commits, Mar 27, 2018
129 changes: 53 additions & 76 deletions pkg/distributor/distributor.go
@@ -72,7 +72,6 @@ type Config struct {
BillingConfig billing.Config
IngesterClientConfig ingester_client.Config

ReplicationFactor int
RemoteTimeout time.Duration
ClientCleanupPeriod time.Duration
IngestionRateLimit float64
@@ -88,7 +87,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flag.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
cfg.BillingConfig.RegisterFlags(f)
cfg.IngesterClientConfig.RegisterFlags(f)
flag.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
@@ -98,9 +96,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

// New constructs a new Distributor
func New(cfg Config, ring ring.ReadRing) (*Distributor, error) {
if 0 > cfg.ReplicationFactor {
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
}
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = ingester_client.MakeIngesterClient
}
@@ -189,7 +184,13 @@ func (d *Distributor) Stop() {

func (d *Distributor) removeStaleIngesterClients() {
ingesters := map[string]struct{}{}
for _, ing := range d.ring.GetAll() {
replicationSet, err := d.ring.GetAll()
if err != nil {
level.Error(util.Logger).Log("msg", "error removing stale ingester clients", "err", err)
return
}

for _, ing := range replicationSet.Ingesters {
ingesters[ing.Addr] = struct{}{}
}

@@ -270,45 +271,17 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
return nil, errIngestionRateLimitExceeded
}

var ingesters [][]*ring.IngesterDesc
if err := instrument.TimeRequestHistogram(ctx, "Distributor.Push[ring-lookup]", d.sendDuration, func(context.Context) error {
var err error
ingesters, err = d.ring.BatchGet(keys, d.cfg.ReplicationFactor, ring.Write)
if err != nil {
return err
}
return nil
}); err != nil {
replicationSets, err := d.ring.BatchGet(keys, ring.Write)
if err != nil {
return nil, err
}

samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{}
for i := range samples {
// We need a response from a quorum of ingesters, which is n/2 + 1.
minSuccess := (len(ingesters[i]) / 2) + 1
samples[i].minSuccess = minSuccess
samples[i].maxFailures = len(ingesters[i]) - minSuccess

// Skip those that have not heartbeated in a while. NB these are still
// included in the calculation of minSuccess, so too many failed ingesters
// will cause the whole write to fail.
liveIngesters := make([]*ring.IngesterDesc, 0, len(ingesters[i]))
for _, ingester := range ingesters[i] {
if d.ring.IsHealthy(ingester) {
liveIngesters = append(liveIngesters, ingester)
}
}

// This is just a shortcut - if there are fewer than minSuccess live
// ingesters after filtering out the dead ones, don't even bother trying.
if len(liveIngesters) < minSuccess {
return nil, fmt.Errorf("wanted at least %d live ingesters to process write, had %d",
minSuccess, len(liveIngesters))
}

for _, liveIngester := range liveIngesters {
sampleForIngester := samplesByIngester[liveIngester]
samplesByIngester[liveIngester] = append(sampleForIngester, &samples[i])
for i, replicationSet := range replicationSets {
samples[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
samples[i].maxFailures = replicationSet.MaxErrors
for _, ingester := range replicationSet.Ingesters {
samplesByIngester[ingester] = append(samplesByIngester[ingester], &samples[i])
}
}
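
A note on what this hunk unifies: Push previously computed the quorum itself (minSuccess = n/2 + 1) and filtered out dead ingesters inline; now the ring returns a ReplicationSet per key with MaxErrors already decided, and Push only records the thresholds and groups samples by ingester. A minimal standalone sketch of that bookkeeping, with strings standing in for *ring.IngesterDesc and sample indices for *sampleTracker (all names here are illustrative):

```go
package main

import "fmt"

// replicationSet mirrors the shape the ring hands back after this PR:
// the ingesters chosen for a key plus how many of them may fail.
type replicationSet struct {
	ingesters []string // stand-in for []*ring.IngesterDesc
	maxErrors int
}

func main() {
	// One replication set per sample key, as BatchGet returns them.
	sets := []replicationSet{
		{ingesters: []string{"ing-a", "ing-b", "ing-c"}, maxErrors: 1},
		{ingesters: []string{"ing-b", "ing-c", "ing-d"}, maxErrors: 1},
	}

	// Group sample indices by ingester so each ingester receives one
	// request carrying every sample routed to it, as samplesByIngester
	// is built above.
	samplesByIngester := map[string][]int{}
	for i, rs := range sets {
		minSuccess := len(rs.ingesters) - rs.maxErrors
		fmt.Printf("sample %d: need %d of %d, tolerate %d failures\n",
			i, minSuccess, len(rs.ingesters), rs.maxErrors)
		for _, ing := range rs.ingesters {
			samplesByIngester[ing] = append(samplesByIngester[ing], i)
		}
	}
	fmt.Println(samplesByIngester) // ing-b and ing-c carry both samples
}
```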

@@ -425,58 +398,53 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
}

// Get ingesters by metricName if one exists, otherwise get all ingesters
var ingesters []*ring.IngesterDesc
var replicationSet ring.ReplicationSet
if ok && metricNameMatcher.Type == labels.MatchEqual {
ingesters, err = d.ring.Get(tokenFor(userID, []byte(metricNameMatcher.Value)), d.cfg.ReplicationFactor, ring.Read)
if err != nil {
return promql.ErrStorage(err)
}
replicationSet, err = d.ring.Get(tokenFor(userID, []byte(metricNameMatcher.Value)), ring.Read)
} else {
ingesters = d.ring.GetAll()
replicationSet, err = d.ring.GetAll()
}
if err != nil {
return promql.ErrStorage(err)
}

matrix, err = d.queryIngesters(ctx, ingesters, d.cfg.ReplicationFactor, req)
matrix, err = d.queryIngesters(ctx, replicationSet, req)
return promql.ErrStorage(err)
})
return matrix, err
}

// Query implements Querier.
func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.IngesterDesc, replicationFactor int, req *client.QueryRequest) (model.Matrix, error) {
// We need a response from a quorum of ingesters; maxErrs is n/2, where n is the replicationFactor
maxErrs := replicationFactor / 2
minSuccess := len(ingesters) - maxErrs
if len(ingesters) < minSuccess {
return nil, fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), minSuccess)
}

func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *client.QueryRequest) (model.Matrix, error) {
// Fetch samples from multiple ingesters
var numErrs int32
errReceived := make(chan error)
results := make(chan model.Matrix, len(ingesters))

for _, ing := range ingesters {
errs := make(chan error, len(replicationSet.Ingesters))
results := make(chan model.Matrix, len(replicationSet.Ingesters))
for _, ing := range replicationSet.Ingesters {
go func(ing *ring.IngesterDesc) {
result, err := d.queryIngester(ctx, ing, req)
if err != nil {
if atomic.AddInt32(&numErrs, 1) == int32(maxErrs+1) {
errReceived <- err
}
errs <- err
} else {
results <- result
}
}(ing)
}

// Only wait for minSuccess ingesters (or an error), and accumulate the samples
// Only wait for minSuccess responses (or more than maxErrors errors), and accumulate the samples
// by fingerprint, merging them into any existing samples.
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
for i := 0; i < minSuccess; i++ {
minSuccess := len(replicationSet.Ingesters) - replicationSet.MaxErrors
var numErrs, numSuccess int
for numSuccess < minSuccess {
select {
case err := <-errReceived:
return nil, err
case err := <-errs:
numErrs++
if numErrs > replicationSet.MaxErrors {
return nil, err
}

case result := <-results:
numSuccess++
for _, ss := range result {
fp := ss.Metric.Fingerprint()
mss, ok := fpToSampleStream[fp]
@@ -516,9 +484,13 @@ func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc, req *client.QueryRequest) (model.Matrix, error) {

// forAllIngesters runs f, in parallel, for all ingesters
func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
replicationSet, err := d.ring.GetAll()
if err != nil {
return nil, err
}

resps, errs := make(chan interface{}), make(chan error)
ingesters := d.ring.GetAll()
for _, ingester := range ingesters {
for _, ingester := range replicationSet.Ingesters {
go func(ingester *ring.IngesterDesc) {
client, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
@@ -537,17 +509,19 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}, error)) ([]interface{}, error) {

var lastErr error
result, numErrs := []interface{}{}, 0
for range ingesters {
for range replicationSet.Ingesters {
select {
case resp := <-resps:
result = append(result, resp)
case lastErr = <-errs:
numErrs++
}
}
if numErrs > d.cfg.ReplicationFactor/2 {

if numErrs > replicationSet.MaxErrors {
return nil, lastErr
}

return result, nil
}

@@ -624,8 +598,8 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
totalStats.NumSeries += resp.(*client.UserStatsResponse).NumSeries
}

totalStats.IngestionRate /= float64(d.cfg.ReplicationFactor)
totalStats.NumSeries /= uint64(d.cfg.ReplicationFactor)
totalStats.IngestionRate /= float64(d.ring.ReplicationFactor())
totalStats.NumSeries /= uint64(d.ring.ReplicationFactor())

return totalStats, nil
}
@@ -645,8 +619,11 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
req := &client.UserStatsRequest{}
ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
// Not using d.forAllIngesters(), so we can fail after first error.
ingesters := d.ring.GetAll()
for _, ingester := range ingesters {
replicationSet, err := d.ring.GetAll()
if err != nil {
return nil, err
}
for _, ingester := range replicationSet.Ingesters {
client, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return nil, err
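
One behavioural detail worth calling out at the end of this file: UserStats now divides by the ring's ReplicationFactor() instead of the deleted config field. The arithmetic is unchanged in spirit: every series is written to ReplicationFactor ingesters, so summing per-ingester stats overcounts by that factor. For example, with RF = 3, ingesters that together report 300,000 series for a user hold roughly 100,000 distinct series.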
38 changes: 21 additions & 17 deletions pkg/distributor/distributor_test.go
@@ -20,28 +20,36 @@ import (
// mockRing doesn't do any consistent hashing; it just returns the same ingesters for every query.
type mockRing struct {
prometheus.Counter
ingesters []*ring.IngesterDesc
heartbeatTimeout time.Duration
ingesters []*ring.IngesterDesc
}

func (r mockRing) Get(key uint32, n int, op ring.Operation) ([]*ring.IngesterDesc, error) {
return r.ingesters[:n], nil
func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters[:3],
MaxErrors: 1,
}, nil
}

func (r mockRing) BatchGet(keys []uint32, n int, op ring.Operation) ([][]*ring.IngesterDesc, error) {
result := [][]*ring.IngesterDesc{}
func (r mockRing) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) {
result := []ring.ReplicationSet{}
for i := 0; i < len(keys); i++ {
result = append(result, r.ingesters[:n])
result = append(result, ring.ReplicationSet{
Ingesters: r.ingesters[:3],
MaxErrors: 1,
})
}
return result, nil
}

func (r mockRing) GetAll() []*ring.IngesterDesc {
return r.ingesters
func (r mockRing) GetAll() (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters,
MaxErrors: 1,
}, nil
}

func (r mockRing) IsHealthy(ingester *ring.IngesterDesc) bool {
return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.heartbeatTimeout
func (r mockRing) ReplicationFactor() int {
return 3
}

type mockIngester struct {
@@ -159,12 +167,10 @@ func TestDistributorPush(t *testing.T) {
Counter: prometheus.NewCounter(prometheus.CounterOpts{
Name: "foo",
}),
ingesters: ingesterDescs,
heartbeatTimeout: 1 * time.Minute,
ingesters: ingesterDescs,
}

d, err := New(Config{
ReplicationFactor: 3,
RemoteTimeout: 1 * time.Minute,
ClientCleanupPeriod: 1 * time.Minute,
IngestionRateLimit: 10000,
@@ -299,12 +305,10 @@ func TestDistributorQuery(t *testing.T) {
Counter: prometheus.NewCounter(prometheus.CounterOpts{
Name: "foo",
}),
ingesters: ingesterDescs,
heartbeatTimeout: 1 * time.Minute,
ingesters: ingesterDescs,
}

d, err := New(Config{
ReplicationFactor: 3,
RemoteTimeout: 1 * time.Minute,
ClientCleanupPeriod: 1 * time.Minute,
IngestionRateLimit: 10000,
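
Taken together, the mock's methods show the new shape of the ring.ReadRing interface: every lookup now returns a ReplicationSet plus an error, health filtering is no longer the caller's job, and the replication factor is owned by the ring. A sketch of that interface as reconstructed from these call sites (the stub types and the prometheus.Collector embedding are inferred from the mock's embedded Counter; the real definitions in pkg/ring may differ in detail):

```go
package ring

import "github.com/prometheus/client_golang/prometheus"

// Stub types shaped after their use in this diff; the real ones live
// elsewhere in pkg/ring.
type Operation int

type IngesterDesc struct {
	Addr      string
	Timestamp int64
}

type ReplicationSet struct {
	Ingesters []*IngesterDesc
	MaxErrors int
}

// ReadRing as inferred from the distributor and the mock above.
type ReadRing interface {
	prometheus.Collector

	Get(key uint32, op Operation) (ReplicationSet, error)
	BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error)
	GetAll() (ReplicationSet, error)
	ReplicationFactor() int
}
```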
2 changes: 1 addition & 1 deletion pkg/ring/http.go
@@ -102,7 +102,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ing := r.ringDesc.Ingesters[id]
timestamp := time.Unix(ing.Timestamp, 0)
state := ing.State.String()
if !r.IsHealthy(ing) {
if !r.IsHealthy(ing, Reporting) {
state = unhealthy
}

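
Note the second argument: the status page now passes a distinct Reporting operation to IsHealthy. Per the definition below, Write demands an ACTIVE ingester and Read rejects JOINING ones, while any other operation, Reporting included, falls through to the heartbeat check alone, so the page reports an ingester as healthy purely on liveness.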
62 changes: 62 additions & 0 deletions pkg/ring/replication_strategy.go
@@ -0,0 +1,62 @@
package ring

import (
"fmt"
"time"
)

// replicationStrategy decides, given the set of ingesters eligible for a key,
// which ingesters to try to write to and how many failures to tolerate.
// - Filters out dead ingesters, so we don't even try to write to them.
// - Checks that there are enough ingesters for the operation to succeed.
func (r *Ring) replicationStrategy(ingesters []*IngesterDesc, op Operation) (
liveIngesters []*IngesterDesc, maxFailure int, err error,
) {
// We need a response from a quorum of ingesters, which is n/2 + 1. In the
// case of a node joining/leaving, the actual replica set might be bigger
// than the replication factor, so use the bigger of the two.
replicationFactor := r.cfg.ReplicationFactor
if len(ingesters) > replicationFactor {
replicationFactor = len(ingesters)
}
minSuccess := (replicationFactor / 2) + 1
maxFailure = replicationFactor - minSuccess

// Skip those that have not heartbeated in a while. NB these are still
// included in the calculation of minSuccess, so too many failed ingesters
// will cause the whole write to fail.
liveIngesters = make([]*IngesterDesc, 0, len(ingesters))
for _, ingester := range ingesters {
if r.IsHealthy(ingester, op) {
liveIngesters = append(liveIngesters, ingester)
} else {
maxFailure--
}
}

// This is just a shortcut - if there are fewer than minSuccess live
// ingesters after filtering out the dead ones, don't even bother trying.
if maxFailure < 0 || len(liveIngesters) < minSuccess {
err = fmt.Errorf("at least %d live ingesters required, could only find %d",
minSuccess, len(liveIngesters))
return
}

return
}

// IsHealthy checks whether an ingester appears to be alive and heartbeating
func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool {
if op == Write && ingester.State != ACTIVE {
return false
} else if op == Read && ingester.State == JOINING {
return false
}
return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.cfg.HeartbeatTimeout
}

// ReplicationFactor of the ring.
func (r *Ring) ReplicationFactor() int {
return r.cfg.ReplicationFactor
}
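
A worked example of the arithmetic above: with ReplicationFactor = 3 and all three ingesters healthy, minSuccess = 3/2 + 1 = 2 and maxFailure = 1. One dead ingester is filtered out but still counts against the quorum, so maxFailure drops to 0; with two dead, maxFailure goes negative and the operation fails up front. A standalone sketch of just that logic, with health reduced to a boolean per ingester (illustrative, not the production code path):

```go
package main

import "fmt"

// strategy mirrors replicationStrategy's arithmetic, with the health
// check stubbed out as a boolean per ingester.
func strategy(healthy []bool, replicationFactor int) (live, maxFailure int, err error) {
	// A joining/leaving node can make the replica set bigger than the
	// replication factor; use the bigger of the two.
	if len(healthy) > replicationFactor {
		replicationFactor = len(healthy)
	}
	minSuccess := replicationFactor/2 + 1
	maxFailure = replicationFactor - minSuccess

	// Dead ingesters are skipped but still count towards the quorum.
	for _, h := range healthy {
		if h {
			live++
		} else {
			maxFailure--
		}
	}
	if maxFailure < 0 || live < minSuccess {
		return 0, 0, fmt.Errorf("at least %d live ingesters required, could only find %d",
			minSuccess, live)
	}
	return live, maxFailure, nil
}

func main() {
	fmt.Println(strategy([]bool{true, true, true}, 3))   // 3 1 <nil>
	fmt.Println(strategy([]bool{true, true, false}, 3))  // 2 0 <nil>
	fmt.Println(strategy([]bool{true, false, false}, 3)) // quorum unreachable: error
}
```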