Skip to content

Commit 094681c

Browse files
authored
Merge pull request #46 from weaveworks/36-improve-logging
Fix up logging and use metrics instead.
2 parents 31bb002 + 44ba361 commit 094681c

File tree

4 files changed

+56
-21
lines changed

4 files changed

+56
-21
lines changed

cmd/prism/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ func main() {
124124
}
125125
case modeIngester:
126126
registration, err := ring.RegisterIngester(consul, cfg.listenPort, cfg.numTokens)
127+
prometheus.MustRegister(registration)
127128
if err != nil {
128129
// This only happens for errors in configuration & set-up, not for
129130
// network errors.

distributor.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ type Distributor struct {
4545
clients map[string]*IngesterClient
4646

4747
queryDuration *prometheus.HistogramVec
48-
consulUpdates prometheus.Counter
4948
receivedSamples prometheus.Counter
5049
sendDuration *prometheus.HistogramVec
5150
}
@@ -80,11 +79,6 @@ func NewDistributor(cfg DistributorConfig) *Distributor {
8079
Help: "Time spent executing expression queries.",
8180
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30},
8281
}, []string{"method", "status_code"}),
83-
consulUpdates: prometheus.NewCounter(prometheus.CounterOpts{
84-
Namespace: "prometheus",
85-
Name: "distributor_consul_updates_total",
86-
Help: "The total number of received Consul updates.",
87-
}),
8882
receivedSamples: prometheus.NewCounter(prometheus.CounterOpts{
8983
Namespace: "prometheus",
9084
Name: "distributor_received_samples_total",
@@ -256,7 +250,6 @@ func (*Distributor) NeedsThrottling(_ context.Context) bool {
256250
// Describe implements prometheus.Collector.
257251
func (d *Distributor) Describe(ch chan<- *prometheus.Desc) {
258252
d.queryDuration.Describe(ch)
259-
ch <- d.consulUpdates.Desc()
260253
ch <- d.receivedSamples.Desc()
261254
d.sendDuration.Describe(ch)
262255
d.ring.Describe(ch)
@@ -266,7 +259,6 @@ func (d *Distributor) Describe(ch chan<- *prometheus.Desc) {
266259
// Collect implements prometheus.Collector.
267260
func (d *Distributor) Collect(ch chan<- prometheus.Metric) {
268261
d.queryDuration.Collect(ch)
269-
ch <- d.consulUpdates
270262
ch <- d.receivedSamples
271263
d.sendDuration.Collect(ch)
272264
d.ring.Collect(ch)

ring/ingester_lifecycle.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"sync"
2525
"time"
2626

27+
"github.com/prometheus/client_golang/prometheus"
2728
"github.com/prometheus/common/log"
2829
)
2930

@@ -42,6 +43,8 @@ type IngesterRegistration struct {
4243
hostname string
4344
quit chan struct{}
4445
wait sync.WaitGroup
46+
47+
consulHeartbeats prometheus.Counter
4548
}
4649

4750
// RegisterIngester registers an ingester with Consul.
@@ -65,6 +68,11 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In
6568
// the distributors know where to connect.
6669
hostname: fmt.Sprintf("%s:%d", addr, listenPort),
6770
quit: make(chan struct{}),
71+
72+
consulHeartbeats: prometheus.NewCounter(prometheus.CounterOpts{
73+
Name: "prism_ingester_consul_heartbeats_total",
74+
Help: "The total number of heartbeats sent to consul.",
75+
}),
6876
}
6977

7078
r.wait.Add(1)
@@ -78,9 +86,10 @@ func (r *IngesterRegistration) Unregister() {
7886
log.Info("Removing ingester from consul")
7987

8088
// closing r.quit triggers loop() to exit, which in turn will trigger
81-
// the removal of out tokens.
89+
// the removal of our tokens.
8290
close(r.quit)
8391
r.wait.Wait()
92+
log.Infof("Ingester removed from consul")
8493
}
8594

8695
func (r *IngesterRegistration) loop() {
@@ -147,7 +156,7 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) {
147156
for {
148157
select {
149158
case <-ticker.C:
150-
log.Infof("Heartbeating to consul...")
159+
r.consulHeartbeats.Inc()
151160
if err := r.consul.CAS(consulKey, descFactory, heartbeat); err != nil {
152161
log.Errorf("Failed to write to consul, sleeping: %v", err)
153162
}
@@ -224,3 +233,13 @@ func getFirstAddressOf(name string) (string, error) {
224233

225234
return "", fmt.Errorf("No address found for %s", name)
226235
}
236+
237+
// Describe implements prometheus.Collector.
238+
func (r *IngesterRegistration) Describe(ch chan<- *prometheus.Desc) {
239+
ch <- r.consulHeartbeats.Desc()
240+
}
241+
242+
// Collect implements prometheus.Collector.
243+
func (r *IngesterRegistration) Collect(ch chan<- prometheus.Metric) {
244+
ch <- r.consulHeartbeats
245+
}

ring/ring.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,6 @@ func (x uint32s) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
3434
// ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash.
3535
var ErrEmptyRing = errors.New("empty circle")
3636

37-
var ingestorOwnershipDesc = prometheus.NewDesc(
38-
"prometheus_distributor_ingester_ownership_percent",
39-
"The percent ownership of the ring by ingestor",
40-
[]string{"ingester"}, nil,
41-
)
42-
4337
// CoordinationStateClient is an interface to getting changes to the coordination
4438
// state. Should allow us to swap out Consul for something else (mesh?) later.
4539
type CoordinationStateClient interface {
@@ -53,6 +47,10 @@ type Ring struct {
5347

5448
mtx sync.RWMutex
5549
ringDesc Desc
50+
51+
ingesterOwnershipDesc *prometheus.Desc
52+
ingesterTotalDesc *prometheus.Desc
53+
tokensTotalDesc *prometheus.Desc
5654
}
5755

5856
// New creates a new Ring
@@ -61,6 +59,21 @@ func New(client CoordinationStateClient) *Ring {
6159
client: client,
6260
quit: make(chan struct{}),
6361
done: make(chan struct{}),
62+
ingesterOwnershipDesc: prometheus.NewDesc(
63+
"prometheus_distributor_ingester_ownership_percent",
64+
"The percent ownership of the ring by ingester",
65+
[]string{"ingester"}, nil,
66+
),
67+
ingesterTotalDesc: prometheus.NewDesc(
68+
"prometheus_distributor_ingesters_total",
69+
"Number of ingesters in the ring",
70+
nil, nil,
71+
),
72+
tokensTotalDesc: prometheus.NewDesc(
73+
"prometheus_distributor_tokens_total",
74+
"Number of tokens in the ring",
75+
nil, nil,
76+
),
6477
}
6578
go r.loop()
6679
return r
@@ -81,9 +94,6 @@ func (r *Ring) loop() {
8194
}
8295

8396
ringDesc := value.(*Desc)
84-
log.Infof("Got update to ring - %d ingesters, %d tokens",
85-
len(ringDesc.Ingesters), len(ringDesc.Tokens))
86-
8797
r.mtx.Lock()
8898
defer r.mtx.Unlock()
8999
r.ringDesc = *ringDesc
@@ -126,7 +136,9 @@ func (r *Ring) search(key uint32) int {
126136

127137
// Describe implements prometheus.Collector.
128138
func (r *Ring) Describe(ch chan<- *prometheus.Desc) {
129-
ch <- ingestorOwnershipDesc
139+
ch <- r.ingesterOwnershipDesc
140+
ch <- r.ingesterTotalDesc
141+
ch <- r.tokensTotalDesc
130142
}
131143

132144
// Collect implements prometheus.Collector.
@@ -147,10 +159,21 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) {
147159

148160
for id, totalOwned := range owned {
149161
ch <- prometheus.MustNewConstMetric(
150-
ingestorOwnershipDesc,
162+
r.ingesterOwnershipDesc,
151163
prometheus.GaugeValue,
152164
float64(totalOwned)/float64(math.MaxUint32),
153165
id,
154166
)
155167
}
168+
169+
ch <- prometheus.MustNewConstMetric(
170+
r.ingesterTotalDesc,
171+
prometheus.GaugeValue,
172+
float64(len(r.ringDesc.Ingesters)),
173+
)
174+
ch <- prometheus.MustNewConstMetric(
175+
r.tokensTotalDesc,
176+
prometheus.GaugeValue,
177+
float64(len(r.ringDesc.Tokens)),
178+
)
156179
}

0 commit comments

Comments
 (0)