Skip to content

Commit 1132a52

Browse files
authored
Merge pull request #1039 from grafana/trace-test-exporter
Instrument test exporter for OpenTracing.
2 parents ed37310 + 15820f5 commit 1132a52

File tree

9 files changed

+118
-63
lines changed

9 files changed

+118
-63
lines changed

Gopkg.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/test-exporter/main.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@ package main
33
import (
44
"flag"
55
"math"
6+
"os"
67
"time"
78

9+
"github.com/go-kit/kit/log/level"
810
"github.com/prometheus/client_golang/prometheus"
9-
log "github.com/sirupsen/logrus"
10-
"github.com/weaveworks/common/server"
1111

1212
"github.com/cortexproject/cortex/pkg/querier/correctness"
13+
"github.com/cortexproject/cortex/pkg/util"
14+
"github.com/weaveworks/common/server"
15+
"github.com/weaveworks/common/tracing"
1316
)
1417

1518
var (
@@ -25,15 +28,23 @@ func main() {
2528
runnerConfig.RegisterFlags(flag.CommandLine)
2629
flag.Parse()
2730

31+
// Setting the environment variable JAEGER_AGENT_HOST enables tracing
32+
trace := tracing.NewFromEnv("test-exporter")
33+
defer trace.Close()
34+
35+
util.InitLogger(&serverConfig)
36+
2837
server, err := server.New(serverConfig)
2938
if err != nil {
30-
log.Fatal(err)
39+
level.Error(util.Logger).Log("msg", "error initializing server", "err", err)
40+
os.Exit(1)
3141
}
3242
defer server.Shutdown()
3343

3444
runner, err := correctness.NewRunner(runnerConfig)
3545
if err != nil {
36-
log.Fatal(err)
46+
level.Error(util.Logger).Log("msg", "error initializing runner", "err", err)
47+
os.Exit(1)
3748
}
3849
defer runner.Stop()
3950

pkg/chunk/chunk_store.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cortexproject/cortex/pkg/chunk/cache"
1818
"github.com/cortexproject/cortex/pkg/util"
1919
"github.com/cortexproject/cortex/pkg/util/extract"
20+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
2021
"github.com/weaveworks/common/user"
2122
)
2223

@@ -165,7 +166,7 @@ func (c *store) calculateIndexEntries(userID string, from, through model.Time, c
165166

166167
// Get implements Store
167168
func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) {
168-
log, ctx := newSpanLogger(ctx, "ChunkStore.Get")
169+
log, ctx := spanlogger.New(ctx, "ChunkStore.Get")
169170
defer log.Span.Finish()
170171
level.Debug(log).Log("from", from, "through", through, "matchers", len(allMatchers))
171172

@@ -188,7 +189,7 @@ func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers .
188189
}
189190

190191
func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time) (shortcut bool, err error) {
191-
log, ctx := newSpanLogger(ctx, "store.validateQuery")
192+
log, ctx := spanlogger.New(ctx, "store.validateQuery")
192193
defer log.Span.Finish()
193194

194195
now := model.Now()
@@ -221,7 +222,7 @@ func (c *store) validateQuery(ctx context.Context, from model.Time, through *mod
221222
}
222223

223224
func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) {
224-
log, ctx := newSpanLogger(ctx, "ChunkStore.getMetricNameChunks")
225+
log, ctx := spanlogger.New(ctx, "ChunkStore.getMetricNameChunks")
225226
defer log.Finish()
226227
level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "matchers", len(allMatchers))
227228

@@ -254,7 +255,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, from, through model.Tim
254255
}
255256

256257
func (c *store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) {
257-
log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksByMetricName")
258+
log, ctx := spanlogger.New(ctx, "ChunkStore.lookupChunksByMetricName")
258259
defer log.Finish()
259260

260261
userID, err := user.ExtractOrgID(ctx)

pkg/chunk/chunk_store_utils.go

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,14 @@ import (
44
"context"
55
"sync"
66

7-
"github.com/go-kit/kit/log"
87
"github.com/go-kit/kit/log/level"
9-
ot "github.com/opentracing/opentracing-go"
10-
otlog "github.com/opentracing/opentracing-go/log"
118
"github.com/prometheus/common/model"
129
"github.com/prometheus/prometheus/pkg/labels"
1310
"github.com/prometheus/prometheus/promql"
1411

1512
"github.com/cortexproject/cortex/pkg/chunk/cache"
1613
"github.com/cortexproject/cortex/pkg/util"
14+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
1715
)
1816

1917
const chunkDecodeParallelism = 16
@@ -45,34 +43,6 @@ outer:
4543
return filteredChunks
4644
}
4745

48-
// spanLogger unifies tracing and logging, to reduce repetition.
49-
type spanLogger struct {
50-
log.Logger
51-
ot.Span
52-
}
53-
54-
func newSpanLogger(ctx context.Context, method string, kvps ...interface{}) (*spanLogger, context.Context) {
55-
span, ctx := ot.StartSpanFromContext(ctx, method)
56-
logger := &spanLogger{
57-
Logger: log.With(util.WithContext(ctx, util.Logger), "method", method),
58-
Span: span,
59-
}
60-
if len(kvps) > 0 {
61-
logger.Log(kvps...)
62-
}
63-
return logger, ctx
64-
}
65-
66-
func (s *spanLogger) Log(kvps ...interface{}) error {
67-
s.Logger.Log(kvps...)
68-
fields, err := otlog.InterleavedKVToFields(kvps...)
69-
if err != nil {
70-
return err
71-
}
72-
s.Span.LogFields(fields...)
73-
return nil
74-
}
75-
7646
// Fetcher deals with fetching chunk contents from the cache/store,
7747
// and writing back any misses to the cache. Also responsible for decoding
7848
// chunks from the cache, in parallel.
@@ -139,7 +109,7 @@ func (c *Fetcher) worker() {
139109

140110
// FetchChunks fetchers a set of chunks from cache and store.
141111
func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) {
142-
log, ctx := newSpanLogger(ctx, "ChunkStore.fetchChunks")
112+
log, ctx := spanlogger.New(ctx, "ChunkStore.fetchChunks")
143113
defer log.Span.Finish()
144114

145115
// Now fetch the actual chunk data from Memcache / S3

pkg/chunk/series_store.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/cortexproject/cortex/pkg/chunk/cache"
1515
"github.com/cortexproject/cortex/pkg/util"
1616
"github.com/cortexproject/cortex/pkg/util/extract"
17+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
1718
"github.com/weaveworks/common/user"
1819
)
1920

@@ -74,7 +75,7 @@ func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Stor
7475

7576
// Get implements Store
7677
func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) {
77-
log, ctx := newSpanLogger(ctx, "SeriesStore.Get")
78+
log, ctx := spanlogger.New(ctx, "SeriesStore.Get")
7879
defer log.Span.Finish()
7980
level.Debug(log).Log("from", from, "through", through, "matchers", len(allMatchers))
8081

@@ -138,7 +139,7 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc
138139
}
139140

140141
func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, metricName string, matchers []*labels.Matcher) ([]string, error) {
141-
log, ctx := newSpanLogger(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers))
142+
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers))
142143
defer log.Span.Finish()
143144

144145
// Just get series for metric if there are no matchers
@@ -202,7 +203,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
202203
}
203204

204205
func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, through model.Time, metricName string, matcher *labels.Matcher) ([]string, error) {
205-
log, ctx := newSpanLogger(ctx, "SeriesStore.lookupSeriesByMetricNameMatcher", "metricName", metricName, "matcher", matcher)
206+
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatcher", "metricName", metricName, "matcher", matcher)
206207
defer log.Span.Finish()
207208

208209
userID, err := user.ExtractOrgID(ctx)
@@ -263,7 +264,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
263264
}
264265

265266
func (c *seriesStore) lookupChunksBySeries(ctx context.Context, from, through model.Time, seriesIDs []string) ([]string, error) {
266-
log, ctx := newSpanLogger(ctx, "SeriesStore.lookupChunksBySeries")
267+
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupChunksBySeries")
267268
defer log.Span.Finish()
268269

269270
userID, err := user.ExtractOrgID(ctx)

pkg/querier/correctness/case.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
type Case interface {
1414
prometheus.Collector
1515

16+
Name() string
1617
Query(ctx context.Context, client v1.API, selectors string, start time.Time, duration time.Duration) ([]model.SamplePair, error)
1718
ExpectedValueAt(time.Time) float64
1819
Quantized(time.Duration) time.Duration

pkg/querier/correctness/runner.go

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,24 @@ package correctness
33
import (
44
"context"
55
"flag"
6+
"fmt"
67
"math"
78
"math/rand"
89
"net/http"
910
"sync"
1011
"time"
1112

13+
"github.com/go-kit/kit/log/level"
14+
"github.com/opentracing-contrib/go-stdlib/nethttp"
15+
opentracing "github.com/opentracing/opentracing-go"
16+
otlog "github.com/opentracing/opentracing-go/log"
1217
"github.com/prometheus/client_golang/api"
1318
"github.com/prometheus/client_golang/api/prometheus/v1"
1419
"github.com/prometheus/client_golang/prometheus"
1520
"github.com/prometheus/client_golang/prometheus/promhttp"
1621
"github.com/prometheus/common/model"
17-
log "github.com/sirupsen/logrus"
18-
"github.com/weaveworks/common/instrument"
22+
23+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
1924
"github.com/weaveworks/common/user"
2025
)
2126

@@ -105,11 +110,14 @@ func NewRunner(cfg RunnerConfig) (*Runner, error) {
105110
Address: cfg.prometheusAddr,
106111
}
107112
if cfg.userID != "" {
108-
apiCfg.RoundTripper = promhttp.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
109-
user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cfg.userID), req)
110-
return http.DefaultTransport.RoundTrip(req)
111-
})
113+
apiCfg.RoundTripper = &nethttp.Transport{
114+
RoundTripper: promhttp.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
115+
user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cfg.userID), req)
116+
return api.DefaultRoundTripper.RoundTrip(req)
117+
}),
118+
}
112119
}
120+
113121
client, err := api.NewClient(apiCfg)
114122
if err != nil {
115123
return nil, err
@@ -118,13 +126,23 @@ func NewRunner(cfg RunnerConfig) (*Runner, error) {
118126
tc := &Runner{
119127
cfg: cfg,
120128
quit: make(chan struct{}),
121-
client: v1.NewAPI(client),
129+
client: v1.NewAPI(tracingClient{client}),
122130
}
123131
tc.wg.Add(1)
124132
go tc.verifyLoop()
125133
return tc, nil
126134
}
127135

136+
type tracingClient struct {
137+
api.Client
138+
}
139+
140+
func (t tracingClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
141+
req, tr := nethttp.TraceRequest(opentracing.GlobalTracer(), req)
142+
defer tr.Finish()
143+
return t.Client.Do(ctx, req)
144+
}
145+
128146
// Stop the checking goroutine.
129147
func (r *Runner) Stop() {
130148
close(r.quit)
@@ -177,6 +195,11 @@ func (r *Runner) runRandomTest() {
177195
tc := r.cases[rand.Intn(len(r.cases))]
178196
r.mtx.Unlock()
179197

198+
ctx := context.Background()
199+
log, ctx := spanlogger.New(ctx, "runRandomTest")
200+
level.Info(log).Log("name", tc.Name())
201+
defer log.Finish()
202+
180203
// pick a random time to start testStart and now
181204
// pick a random length between minDuration and maxDuration
182205
now := time.Now()
@@ -189,17 +212,14 @@ func (r *Runner) runRandomTest() {
189212
if duration < r.cfg.testQueryMinSize {
190213
return
191214
}
215+
level.Info(log).Log("start", start, "duration", duration)
192216

193-
var pairs []model.SamplePair
194-
err := instrument.TimeRequestHistogram(context.Background(), "Prometheus.Query", prometheusRequestDuration, func(ctx context.Context) error {
195-
var err error
196-
pairs, err = tc.Query(ctx, r.client, r.cfg.extraSelectors, start, duration)
197-
return err
198-
})
217+
pairs, err := tc.Query(ctx, r.client, r.cfg.extraSelectors, start, duration)
199218
if err != nil {
200-
log.Errorf("Error running test: %v", err)
219+
level.Info(log).Log("err", err)
201220
return
202221
}
222+
203223
failures := false
204224
for _, pair := range pairs {
205225
correct := r.timeEpsilonCorrect(tc.ExpectedValueAt, pair) || r.valueEpsilonCorrect(tc.ExpectedValueAt, pair)
@@ -208,13 +228,15 @@ func (r *Runner) runRandomTest() {
208228
} else {
209229
failures = true
210230
sampleResult.WithLabelValues(fail).Inc()
211-
log.Errorf("Wrong value: %f !~ %f", tc.ExpectedValueAt(pair.Timestamp.Time()), pair.Value)
231+
level.Error(log).Log("msg", "wrong value", "expected", tc.ExpectedValueAt(pair.Timestamp.Time()), "actual", pair.Value)
232+
log.LogFields(otlog.Error(fmt.Errorf("wrong value")))
212233
}
213234
}
214235

215236
expectedNumSamples := int(tc.Quantized(duration) / r.cfg.ScrapeInterval)
216237
if !epsilonCorrect(float64(len(pairs)), float64(expectedNumSamples), r.cfg.samplesEpsilon) {
217-
log.Errorf("Expected %d samples, got %d", expectedNumSamples, len(pairs))
238+
level.Error(log).Log("msg", "wrong number of samples", "expected", expectedNumSamples, "actual", len(pairs))
239+
log.LogFields(otlog.Error(fmt.Errorf("wrong number of samples")))
218240
failures = true
219241
}
220242

pkg/querier/correctness/simple.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ import (
55
"fmt"
66
"time"
77

8+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
9+
"github.com/go-kit/kit/log/level"
810
"github.com/prometheus/client_golang/api/prometheus/v1"
911
"github.com/prometheus/client_golang/prometheus"
1012
"github.com/prometheus/common/model"
11-
log "github.com/sirupsen/logrus"
1213
)
1314

1415
const (
@@ -41,14 +42,21 @@ func NewSimpleTestCase(name string, f func(time.Time) float64) Case {
4142
}
4243
}
4344

45+
func (tc *simpleTestCase) Name() string {
46+
return tc.name
47+
}
48+
4449
func (tc *simpleTestCase) ExpectedValueAt(t time.Time) float64 {
4550
return tc.expectedValueAt(t)
4651
}
4752

4853
func (tc *simpleTestCase) Query(ctx context.Context, client v1.API, selectors string, start time.Time, duration time.Duration) ([]model.SamplePair, error) {
54+
log, ctx := spanlogger.New(ctx, "simpleTestCase.Query")
55+
defer log.Finish()
56+
4957
metricName := prometheus.BuildFQName(namespace, subsystem, tc.name)
5058
query := fmt.Sprintf("%s{%s}[%dm]", metricName, selectors, duration/time.Minute)
51-
log.Println(query, "@", start)
59+
level.Info(log).Log("query", query)
5260

5361
value, err := client.Query(ctx, query, start)
5462
if err != nil {

0 commit comments

Comments
 (0)