Skip to content

Commit a99bf41

Browse files
authored
Merge pull request #92 from weaveworks/readiness
Add readiness probe
2 parents 75255a0 + 086dd4c commit a99bf41

File tree

4 files changed

+49
-6
lines changed

4 files changed

+49
-6
lines changed

cmd/cortex/main.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,17 +107,18 @@ func main() {
107107
log.Fatalf("Error initializing Consul client: %v", err)
108108
}
109109
consul = ring.PrefixClient(consul, cfg.consulPrefix)
110+
r := ring.New(consul, cfg.distributorConfig.HeartbeatTimeout)
111+
defer r.Stop()
110112

111113
switch cfg.mode {
112114
case modeDistributor:
113-
ring := ring.New(consul, cfg.distributorConfig.HeartbeatTimeout)
114-
cfg.distributorConfig.Ring = ring
115+
cfg.distributorConfig.Ring = r
115116
cfg.distributorConfig.ClientFactory = func(address string) (*cortex.IngesterClient, error) {
116117
return cortex.NewIngesterClient(address, cfg.remoteTimeout)
117118
}
118-
defer ring.Stop()
119119
setupDistributor(cfg.distributorConfig, chunkStore, cfg.logSuccess)
120120
case modeIngester:
121+
cfg.ingesterConfig.Ring = r
121122
registration, err := ring.RegisterIngester(consul, cfg.listenPort, cfg.numTokens)
122123
if err != nil {
123124
// This only happens for errors in configuration & set-up, not for
@@ -267,6 +268,7 @@ func setupIngester(
267268
http.Handle("/query", instrument(logSuccess, cortex.QueryHandler(ingester)))
268269
http.Handle("/label_values", instrument(logSuccess, cortex.LabelValuesHandler(ingester)))
269270
http.Handle("/user_stats", instrument(logSuccess, cortex.IngesterUserStatsHandler(ingester.UserStats)))
271+
http.Handle("/ready", instrument(logSuccess, cortex.IngesterReadinessHandler(ingester)))
270272
return ingester
271273
}
272274

ingester/ingester.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ import (
1010
"github.com/prometheus/common/log"
1111
"github.com/prometheus/common/model"
1212
prom_chunk "github.com/prometheus/prometheus/storage/local/chunk"
13-
cortex "github.com/weaveworks/cortex/chunk"
14-
"github.com/weaveworks/cortex/user"
13+
"github.com/prometheus/prometheus/storage/metric"
1514
"golang.org/x/net/context"
1615

17-
"github.com/prometheus/prometheus/storage/metric"
16+
cortex "github.com/weaveworks/cortex/chunk"
17+
"github.com/weaveworks/cortex/ring"
18+
"github.com/weaveworks/cortex/user"
1819
)
1920

2021
const (
@@ -78,6 +79,7 @@ type Config struct {
7879
FlushCheckPeriod time.Duration
7980
MaxChunkAge time.Duration
8081
RateUpdatePeriod time.Duration
82+
Ring *ring.Ring
8183
}
8284

8385
// UserStats models ingestion statistics for one user.
@@ -154,6 +156,12 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
154156
return i, nil
155157
}
156158

159+
// Ready is used to indicate to k8s when the ingesters are ready for
160+
// the addition / removal of another ingester.
161+
func (i *Ingester) Ready() bool {
162+
return i.cfg.Ring.Ready()
163+
}
164+
157165
func (i *Ingester) getStateFor(ctx context.Context) (*userState, error) {
158166
userID, err := user.GetID(ctx)
159167
if err != nil {

ring/ring.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,26 @@ func (r *Ring) GetAll() []IngesterDesc {
184184
return ingesters
185185
}
186186

187+
// Ready is true when all ingesters are active and healthy.
188+
func (r *Ring) Ready() bool {
189+
r.mtx.RLock()
190+
defer r.mtx.RUnlock()
191+
192+
for _, ingester := range r.ringDesc.Ingesters {
193+
if time.Now().Sub(ingester.Timestamp) > r.heartbeatTimeout {
194+
return false
195+
}
196+
}
197+
198+
for _, token := range r.ringDesc.Tokens {
199+
if token.State != Active {
200+
return false
201+
}
202+
}
203+
204+
return len(r.ringDesc.Tokens) > 0
205+
}
206+
187207
func (r *Ring) search(key uint32) int {
188208
i := sort.Search(len(r.ringDesc.Tokens), func(x int) bool {
189209
return r.ringDesc.Tokens[x].Token > key

server.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,19 @@ func IngesterUserStatsHandler(statsFn func(context.Context) (*ingester.UserStats
260260
})
261261
}
262262

263+
// IngesterReadinessHandler returns 204 when the ingester is ready,
264+
// 500 otherwise. Its use by kubernetes to indicate if the ingester
265+
// pool is ready to have ingesters added / removed.
266+
func IngesterReadinessHandler(i *ingester.Ingester) http.Handler {
267+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
268+
if i.Ready() {
269+
w.WriteHeader(http.StatusNoContent)
270+
} else {
271+
w.WriteHeader(http.StatusInternalServerError)
272+
}
273+
})
274+
}
275+
263276
// DistributorUserStatsHandler handles user stats to the Distributor.
264277
func DistributorUserStatsHandler(statsFn func(context.Context) (*ingester.UserStats, error)) http.Handler {
265278
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

0 commit comments

Comments
 (0)