Skip to content

Commit 425fd24

Browse files
committed
For the series() call don't fetch all the data too
Workaround for upstream issue prometheus/prometheus#4057
1 parent 4eb82d5 commit 425fd24

File tree

4 files changed

+165
-3
lines changed

4 files changed

+165
-3
lines changed

promclient/client.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,26 @@ func GetData(ctx context.Context, url string, client *http.Client, labelset mode
4848
}
4949
}
5050

51-
func GetSeries(ctx context.Context, url string, client *http.Client) ([]model.LabelSet, error) {
51+
func GetSeries(ctx context.Context, url string, client *http.Client, labelset model.LabelSet) (model.Value, error) {
5252
promResp := &SeriesResult{}
5353
if err := DoRequest(ctx, url, client, promResp); err == nil {
5454
if promResp.Status != promhttputil.StatusSuccess {
5555
return nil, fmt.Errorf(promResp.Error)
5656
}
57-
return promResp.Data, nil
57+
58+
// convert to vector (there aren't points, but this way we don't have to make more merging functions)
59+
retVector := make(model.Vector, len(promResp.Data))
60+
for j, labelset := range promResp.Data {
61+
retVector[j] = &model.Sample{
62+
Metric: model.Metric(labelset),
63+
}
64+
}
65+
66+
if err := promhttputil.ValueAddLabelSet(retVector, labelset); err != nil {
67+
return nil, err
68+
}
69+
70+
return retVector, nil
5871
} else {
5972
return nil, err
6073
}

proxyquerier/querier.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/jacksontj/promxy/promclient"
99
"github.com/jacksontj/promxy/servergroup"
1010
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/prometheus/common/model"
1112
"github.com/prometheus/prometheus/pkg/labels"
1213
"github.com/prometheus/prometheus/pkg/timestamp"
1314
"github.com/prometheus/prometheus/storage"
@@ -45,7 +46,18 @@ func (h *ProxyQuerier) Select(selectParams *storage.SelectParams, matchers ...*l
4546
}).Debug("Select")
4647
}()
4748

48-
result, err := h.ServerGroups.GetValue(h.Ctx, timestamp.Time(selectParams.Start), timestamp.Time(selectParams.End), matchers)
49+
var result model.Value
50+
var err error
51+
// Select() is a combined API call for query/query_range/series.
52+
// as of right now there is no great way of differentiating between a
53+
// data call (query/query_range) and a metadata call (series). For now
54+
// the working workaround is to switch based on the selectParams.
55+
// https://github.com/prometheus/prometheus/issues/4057
56+
if selectParams == nil {
57+
result, err = h.ServerGroups.GetSeries(h.Ctx, h.Start, h.End, matchers)
58+
} else {
59+
result, err = h.ServerGroups.GetValue(h.Ctx, timestamp.Time(selectParams.Start), timestamp.Time(selectParams.End), matchers)
60+
}
4961
if err != nil {
5062
return nil, err
5163
}

servergroup/servergroup.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,3 +460,90 @@ func (s *ServerGroup) GetValuesForLabelName(ctx context.Context, path string) ([
460460

461461
return result, nil
462462
}
463+
464+
func (s *ServerGroup) GetSeries(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, error) {
465+
filteredMatchers, ok := s.FilterMatchers(matchers)
466+
if !ok {
467+
return nil, nil
468+
}
469+
470+
// Create the query params
471+
values := url.Values{}
472+
473+
urlBase := "/api/v1/series"
474+
475+
// Add matchers
476+
// http://localhost:8080/api/v1/query?query=scrape_duration_seconds%7Bjob%3D%22prometheus%22%7D&time=1507412244.663&_=1507412096887
477+
pql, err := promhttputil.MatcherToString(filteredMatchers)
478+
if err != nil {
479+
return nil, err
480+
}
481+
values.Add("match[]", pql)
482+
483+
values.Add("start", model.Time(timestamp.FromTime(start)).String())
484+
values.Add("end", model.Time(timestamp.FromTime(end)).String())
485+
486+
targets := s.Targets()
487+
488+
childContext, childContextCancel := context.WithCancel(ctx)
489+
defer childContextCancel()
490+
resultChans := make([]chan interface{}, len(targets))
491+
492+
for i, target := range targets {
493+
resultChans[i] = make(chan interface{}, 1)
494+
parsedUrl, err := url.Parse(target + urlBase)
495+
if err != nil {
496+
return nil, err
497+
}
498+
parsedUrl.RawQuery = values.Encode()
499+
go func(retChan chan interface{}, stringUrl string) {
500+
start := time.Now()
501+
result, err := promclient.GetSeries(childContext, stringUrl, s.Client, s.Cfg.Labels)
502+
took := time.Now().Sub(start)
503+
if err != nil {
504+
serverGroupSummary.WithLabelValues(parsedUrl.Host, "getdata", "error").Observe(float64(took))
505+
retChan <- err
506+
} else {
507+
serverGroupSummary.WithLabelValues(parsedUrl.Host, "getdata", "success").Observe(float64(took))
508+
retChan <- result
509+
}
510+
}(resultChans[i], parsedUrl.String())
511+
}
512+
513+
// Wait for results as we get them
514+
var result model.Value
515+
var lastError error
516+
errCount := 0
517+
for i := 0; i < len(targets); i++ {
518+
select {
519+
case <-ctx.Done():
520+
return nil, ctx.Err()
521+
522+
case ret := <-resultChans[i]:
523+
switch retTyped := ret.(type) {
524+
case error:
525+
lastError = retTyped
526+
errCount++
527+
case model.Value:
528+
// TODO: check qData.ResultType
529+
if result == nil {
530+
result = retTyped
531+
} else {
532+
var err error
533+
result, err = promhttputil.MergeValues(s.Cfg.GetAntiAffinity(), result, retTyped)
534+
if err != nil {
535+
return nil, err
536+
}
537+
}
538+
default:
539+
return nil, fmt.Errorf("Unknown return type")
540+
}
541+
}
542+
}
543+
544+
if errCount != 0 && errCount == len(targets) {
545+
return nil, fmt.Errorf("Unable to fetch from downstream servers, lastError: %s", lastError.Error())
546+
}
547+
548+
return result, nil
549+
}

servergroup/servergroups.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,3 +163,53 @@ func (s ServerGroups) GetValuesForLabelName(ctx context.Context, path string) ([
163163

164164
return result, nil
165165
}
166+
167+
func (s ServerGroups) GetSeries(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, error) {
168+
childContext, childContextCancel := context.WithCancel(ctx)
169+
defer childContextCancel()
170+
resultChans := make([]chan interface{}, len(s))
171+
172+
// Scatter out all the queries
173+
for i, serverGroup := range s {
174+
resultChans[i] = make(chan interface{}, 1)
175+
go func(retChan chan interface{}, serverGroup *ServerGroup) {
176+
result, err := serverGroup.GetSeries(childContext, start, end, matchers)
177+
if err != nil {
178+
retChan <- err
179+
} else {
180+
retChan <- result
181+
}
182+
}(resultChans[i], serverGroup)
183+
}
184+
185+
// Wait for results as we get them
186+
var result model.Value
187+
var lastError error
188+
errCount := 0
189+
for i := 0; i < len(s); i++ {
190+
select {
191+
case <-ctx.Done():
192+
return nil, ctx.Err()
193+
case ret := <-resultChans[i]:
194+
switch retTyped := ret.(type) {
195+
case error:
196+
lastError = retTyped
197+
errCount++
198+
case model.Value:
199+
var err error
200+
result, err = promhttputil.MergeValues(model.TimeFromUnix(0), result, retTyped)
201+
if err != nil {
202+
return nil, err
203+
}
204+
}
205+
}
206+
207+
}
208+
209+
// If we got only errors, lets return that
210+
if errCount == len(s) {
211+
return nil, fmt.Errorf("Unable to fetch from downstream servers, lastError: %s", lastError.Error())
212+
}
213+
214+
return result, nil
215+
}

0 commit comments

Comments
 (0)