diff --git a/ingester_client.go b/ingester_client.go index 2233ecdc9d3..06a12e7242f 100644 --- a/ingester_client.go +++ b/ingester_client.go @@ -77,8 +77,7 @@ func (c *IngesterClient) Append(ctx context.Context, samples []*model.Sample) er req.Timeseries = append(req.Timeseries, ts) } - var resp remote.WriteResponse - return c.doRequest(ctx, "/push", req, &resp, true) + return c.doRequest(ctx, "/push", req, nil, true) } // Query implements Querier. diff --git a/server.go b/server.go index 8cd9eee5262..d94500ad406 100644 --- a/server.go +++ b/server.go @@ -126,22 +126,6 @@ func AppenderHandler(appender SampleAppender) http.Handler { http.Error(w, err.Error(), http.StatusInternalServerError) return } - - respBuf, err := proto.Marshal(&remote.WriteResponse{}) - if err != nil { - log.Errorf("marshall err: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - if _, err := snappy.NewWriter(w).Write(respBuf); err != nil { - log.Errorf("write err: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - w.Header().Add("Content-Encoding", "snappy") - w.WriteHeader(http.StatusOK) }) } diff --git a/vendor/github.com/prometheus/prometheus/config/config.go b/vendor/github.com/prometheus/prometheus/config/config.go index 6a82ade0ec0..c83b7310982 100644 --- a/vendor/github.com/prometheus/prometheus/config/config.go +++ b/vendor/github.com/prometheus/prometheus/config/config.go @@ -192,7 +192,7 @@ type Config struct { RuleFiles []string `yaml:"rule_files,omitempty"` ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"` - RemoteWriteConfig []RemoteWriteConfig `yaml:"remote_write,omitempty"` + RemoteWriteConfig RemoteWriteConfig `yaml:"remote_write,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -1075,11 +1075,12 @@ func (re Regexp) MarshalYAML() (interface{}, error) { // RemoteWriteConfig is the configuration for remote storage. type RemoteWriteConfig struct { - URL URL `yaml:"url,omitempty"` - RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` - BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` - TLSConfig TLSConfig `yaml:"tls_config,omitempty"` - ProxyURL URL `yaml:"proxy_url,omitempty"` + URL *URL `yaml:"url,omitempty"` + RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` + BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` + TLSConfig TLSConfig `yaml:"tls_config,omitempty"` + ProxyURL URL `yaml:"proxy_url,omitempty"` + WriteRelabelConfigs []*RelabelConfig `yaml:"write_relabel_configs,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` diff --git a/vendor/github.com/prometheus/prometheus/relabel/LICENSE b/vendor/github.com/prometheus/prometheus/relabel/LICENSE new file mode 100644 index 00000000000..261eeb9e9f8 --- /dev/null +++ b/vendor/github.com/prometheus/prometheus/relabel/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/prometheus/prometheus/relabel/relabel.go b/vendor/github.com/prometheus/prometheus/relabel/relabel.go new file mode 100644 index 00000000000..0e5d616ca30 --- /dev/null +++ b/vendor/github.com/prometheus/prometheus/relabel/relabel.go @@ -0,0 +1,102 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package relabel + +import ( + "crypto/md5" + "fmt" + "strings" + + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/config" +) + +// Process returns a relabeled copy of the given label set. The relabel configurations +// are applied in order of input. +// If a label set is dropped, nil is returned. +func Process(labels model.LabelSet, cfgs ...*config.RelabelConfig) model.LabelSet { + out := model.LabelSet{} + for ln, lv := range labels { + out[ln] = lv + } + for _, cfg := range cfgs { + if out = relabel(out, cfg); out == nil { + return nil + } + } + return out +} + +func relabel(labels model.LabelSet, cfg *config.RelabelConfig) model.LabelSet { + values := make([]string, 0, len(cfg.SourceLabels)) + for _, ln := range cfg.SourceLabels { + values = append(values, string(labels[ln])) + } + val := strings.Join(values, cfg.Separator) + + switch cfg.Action { + case config.RelabelDrop: + if cfg.Regex.MatchString(val) { + return nil + } + case config.RelabelKeep: + if !cfg.Regex.MatchString(val) { + return nil + } + case config.RelabelReplace: + indexes := cfg.Regex.FindStringSubmatchIndex(val) + // If there is no match no replacement must take place. + if indexes == nil { + break + } + res := cfg.Regex.ExpandString([]byte{}, cfg.Replacement, val, indexes) + if len(res) == 0 { + delete(labels, cfg.TargetLabel) + } else { + labels[cfg.TargetLabel] = model.LabelValue(res) + } + case config.RelabelHashMod: + mod := sum64(md5.Sum([]byte(val))) % cfg.Modulus + labels[cfg.TargetLabel] = model.LabelValue(fmt.Sprintf("%d", mod)) + case config.RelabelLabelMap: + out := make(model.LabelSet, len(labels)) + // Take a copy to avoid infinite loops. + for ln, lv := range labels { + out[ln] = lv + } + for ln, lv := range labels { + if cfg.Regex.MatchString(string(ln)) { + res := cfg.Regex.ReplaceAllString(string(ln), cfg.Replacement) + out[model.LabelName(res)] = lv + } + } + labels = out + default: + panic(fmt.Errorf("retrieval.relabel: unknown relabel action type %q", cfg.Action)) + } + return labels +} + +// sum64 sums the md5 hash to an uint64. +func sum64(hash [md5.Size]byte) uint64 { + var s uint64 + + for i, b := range hash { + shift := uint64((md5.Size - i - 1) * 8) + + s |= uint64(b) << shift + } + return s +} diff --git a/vendor/github.com/prometheus/prometheus/storage/local/chunk/chunk.go b/vendor/github.com/prometheus/prometheus/storage/local/chunk/chunk.go index d26237394bf..738e678f153 100644 --- a/vendor/github.com/prometheus/prometheus/storage/local/chunk/chunk.go +++ b/vendor/github.com/prometheus/prometheus/storage/local/chunk/chunk.go @@ -273,6 +273,7 @@ type Chunk interface { Unmarshal(io.Reader) error UnmarshalFromBuf([]byte) error Encoding() Encoding + Utilization() float64 } // Iterator enables efficient access to the content of a chunk. It is diff --git a/vendor/github.com/prometheus/prometheus/storage/local/chunk/delta.go b/vendor/github.com/prometheus/prometheus/storage/local/chunk/delta.go index f9bbff4a8af..911ab63fad7 100644 --- a/vendor/github.com/prometheus/prometheus/storage/local/chunk/delta.go +++ b/vendor/github.com/prometheus/prometheus/storage/local/chunk/delta.go @@ -267,6 +267,11 @@ func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { // encoding implements chunk. func (c deltaEncodedChunk) Encoding() Encoding { return Delta } +// Utilization implements chunk. +func (c deltaEncodedChunk) Utilization() float64 { + return float64(len(c)) / float64(cap(c)) +} + func (c deltaEncodedChunk) timeBytes() deltaBytes { return deltaBytes(c[deltaHeaderTimeBytesOffset]) } diff --git a/vendor/github.com/prometheus/prometheus/storage/local/chunk/doubledelta.go b/vendor/github.com/prometheus/prometheus/storage/local/chunk/doubledelta.go index debf5232d91..51cc6034666 100644 --- a/vendor/github.com/prometheus/prometheus/storage/local/chunk/doubledelta.go +++ b/vendor/github.com/prometheus/prometheus/storage/local/chunk/doubledelta.go @@ -277,6 +277,11 @@ func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { // encoding implements chunk. func (c doubleDeltaEncodedChunk) Encoding() Encoding { return DoubleDelta } +// Utilization implements chunk. +func (c doubleDeltaEncodedChunk) Utilization() float64 { + return float64(len(c)) / float64(cap(c)) +} + func (c doubleDeltaEncodedChunk) baseTime() model.Time { return model.Time( binary.LittleEndian.Uint64( diff --git a/vendor/github.com/prometheus/prometheus/storage/local/chunk/varbit.go b/vendor/github.com/prometheus/prometheus/storage/local/chunk/varbit.go index f5590abd3d8..21f7cd759b4 100644 --- a/vendor/github.com/prometheus/prometheus/storage/local/chunk/varbit.go +++ b/vendor/github.com/prometheus/prometheus/storage/local/chunk/varbit.go @@ -322,6 +322,12 @@ func (c varbitChunk) UnmarshalFromBuf(buf []byte) error { // encoding implements chunk. func (c varbitChunk) Encoding() Encoding { return Varbit } +// Utilization implements chunk. +func (c varbitChunk) Utilization() float64 { + // 15 bytes is the length of the chunk footer. + return math.Min(float64(c.nextSampleOffset()/8+15)/float64(cap(c)), 1) +} + // FirstTime implements chunk. func (c varbitChunk) FirstTime() model.Time { return model.Time( diff --git a/vendor/github.com/prometheus/prometheus/storage/local/storage.go b/vendor/github.com/prometheus/prometheus/storage/local/storage.go index 38467a805b5..1288389e03b 100644 --- a/vendor/github.com/prometheus/prometheus/storage/local/storage.go +++ b/vendor/github.com/prometheus/prometheus/storage/local/storage.go @@ -410,19 +410,19 @@ func (s *MemorySeriesStorage) WaitForIndexing() { // LastSampleForLabelMatchers implements Storage. func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { - fps := map[model.Fingerprint]struct{}{} + mergedFPs := map[model.Fingerprint]struct{}{} for _, matchers := range matcherSets { - fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...) + fps, err := s.fpsForLabelMatchers(cutoff, model.Latest, matchers...) if err != nil { return nil, err } - for fp := range fpToMetric { - fps[fp] = struct{}{} + for fp := range fps { + mergedFPs[fp] = struct{}{} } } - res := make(model.Vector, 0, len(fps)) - for fp := range fps { + res := make(model.Vector, 0, len(mergedFPs)) + for fp := range mergedFPs { s.fpLocker.Lock(fp) series, ok := s.fpToSeries.get(fp) @@ -480,13 +480,13 @@ func (bit *boundedIterator) Close() { // QueryRange implements Storage. func (s *MemorySeriesStorage) QueryRange(_ context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { - fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) + fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...) if err != nil { return nil, err } - iterators := make([]SeriesIterator, 0, len(fpToMetric)) - for fp := range fpToMetric { - it := s.preloadChunksForRange(fp, from, through) + iterators := make([]SeriesIterator, 0, len(fpSeriesPairs)) + for _, pair := range fpSeriesPairs { + it := s.preloadChunksForRange(pair, from, through) iterators = append(iterators, it) } return iterators, nil @@ -497,13 +497,13 @@ func (s *MemorySeriesStorage) QueryInstant(_ context.Context, ts model.Time, sta from := ts.Add(-stalenessDelta) through := ts - fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) + fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...) if err != nil { return nil, err } - iterators := make([]SeriesIterator, 0, len(fpToMetric)) - for fp := range fpToMetric { - it := s.preloadChunksForInstant(fp, from, through) + iterators := make([]SeriesIterator, 0, len(fpSeriesPairs)) + for _, pair := range fpSeriesPairs { + it := s.preloadChunksForInstant(pair, from, through) iterators = append(iterators, it) } return iterators, nil @@ -558,43 +558,43 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers( return metrics, nil } -func (s *MemorySeriesStorage) metricsForLabelMatchers( - from, through model.Time, +// candidateFPsForLabelMatchers returns candidate FPs for given matchers and remaining matchers to be checked. +func (s *MemorySeriesStorage) candidateFPsForLabelMatchers( matchers ...*metric.LabelMatcher, -) (map[model.Fingerprint]metric.Metric, error) { +) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) { sort.Sort(metric.LabelMatchers(matchers)) if len(matchers) == 0 || matchers[0].MatchesEmptyString() { // No matchers at all or even the best matcher matches the empty string. - return nil, nil + return nil, nil, nil } var ( matcherIdx int - remainingFPs map[model.Fingerprint]struct{} + candidateFPs map[model.Fingerprint]struct{} ) // Equal matchers. - for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpEqualMatchThreshold); matcherIdx++ { + for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpEqualMatchThreshold); matcherIdx++ { m := matchers[matcherIdx] if m.Type != metric.Equal || m.MatchesEmptyString() { break } - remainingFPs = s.fingerprintsForLabelPair( + candidateFPs = s.fingerprintsForLabelPair( model.LabelPair{ Name: m.Name, Value: m.Value, }, nil, - remainingFPs, + candidateFPs, ) - if len(remainingFPs) == 0 { - return nil, nil + if len(candidateFPs) == 0 { + return nil, nil, nil } } // Other matchers. - for ; matcherIdx < len(matchers) && (remainingFPs == nil || len(remainingFPs) > fpOtherMatchThreshold); matcherIdx++ { + for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpOtherMatchThreshold); matcherIdx++ { m := matchers[matcherIdx] if m.MatchesEmptyString() { break @@ -602,11 +602,11 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name) if err != nil { - return nil, err + return nil, nil, err } lvs = m.Filter(lvs) if len(lvs) == 0 { - return nil, nil + return nil, nil, nil } fps := map[model.Fingerprint]struct{}{} for _, lv := range lvs { @@ -616,29 +616,104 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( Value: lv, }, fps, - remainingFPs, + candidateFPs, ) } - remainingFPs = fps - if len(remainingFPs) == 0 { - return nil, nil + candidateFPs = fps + if len(candidateFPs) == 0 { + return nil, nil, nil } } + return candidateFPs, matchers[matcherIdx:], nil +} - result := map[model.Fingerprint]metric.Metric{} - for fp := range remainingFPs { +func (s *MemorySeriesStorage) seriesForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) ([]fingerprintSeriesPair, error) { + candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...) + if err != nil { + return nil, err + } + + result := []fingerprintSeriesPair{} +FPLoop: + for fp := range candidateFPs { s.fpLocker.Lock(fp) - if met, _, ok := s.metricForRange(fp, from, through); ok { - result[fp] = metric.Metric{Metric: met} + series := s.seriesForRange(fp, from, through) + s.fpLocker.Unlock(fp) + + if series == nil { + continue FPLoop + } + + for _, m := range matchersToCheck { + if !m.Match(series.metric[m.Name]) { + continue FPLoop + } } + result = append(result, fingerprintSeriesPair{fp, series}) + } + return result, nil +} + +func (s *MemorySeriesStorage) fpsForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) (map[model.Fingerprint]struct{}, error) { + candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...) + if err != nil { + return nil, err + } + +FPLoop: + for fp := range candidateFPs { + s.fpLocker.Lock(fp) + met, _, ok := s.metricForRange(fp, from, through) s.fpLocker.Unlock(fp) + + if !ok { + delete(candidateFPs, fp) + continue FPLoop + } + + for _, m := range matchersToCheck { + if !m.Match(met[m.Name]) { + delete(candidateFPs, fp) + continue FPLoop + } + } } - for _, m := range matchers[matcherIdx:] { - for fp, met := range result { - if !m.Match(met.Metric[m.Name]) { - delete(result, fp) + return candidateFPs, nil +} + +func (s *MemorySeriesStorage) metricsForLabelMatchers( + from, through model.Time, + matchers ...*metric.LabelMatcher, +) (map[model.Fingerprint]metric.Metric, error) { + + candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...) + if err != nil { + return nil, err + } + + result := map[model.Fingerprint]metric.Metric{} +FPLoop: + for fp := range candidateFPs { + s.fpLocker.Lock(fp) + met, _, ok := s.metricForRange(fp, from, through) + s.fpLocker.Unlock(fp) + + if !ok { + continue FPLoop + } + + for _, m := range matchersToCheck { + if !m.Match(met[m.Name]) { + continue FPLoop } } + result[fp] = metric.Metric{Metric: met} } return result, nil } @@ -696,14 +771,14 @@ func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelNa // DropMetricsForLabelMatchers implements Storage. func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) { - fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...) + fps, err := s.fpsForLabelMatchers(model.Earliest, model.Latest, matchers...) if err != nil { return 0, err } - for fp := range fpToMetric { + for fp := range fps { s.purgeSeries(fp, nil, nil) } - return len(fpToMetric), nil + return len(fps), nil } var ( @@ -864,7 +939,7 @@ func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me return series, nil } -// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +// seriesForRange is a helper method for seriesForLabelMatchers. // // The caller must have locked the fp. func (s *MemorySeriesStorage) seriesForRange( @@ -883,16 +958,17 @@ func (s *MemorySeriesStorage) seriesForRange( } func (s *MemorySeriesStorage) preloadChunksForRange( - fp model.Fingerprint, + pair fingerprintSeriesPair, from model.Time, through model.Time, ) SeriesIterator { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series := s.seriesForRange(fp, from, through) + fp, series := pair.fp, pair.series if series == nil { return nopIter } + + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + iter, err := series.preloadChunksForRange(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) @@ -902,16 +978,17 @@ func (s *MemorySeriesStorage) preloadChunksForRange( } func (s *MemorySeriesStorage) preloadChunksForInstant( - fp model.Fingerprint, + pair fingerprintSeriesPair, from model.Time, through model.Time, ) SeriesIterator { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) - - series := s.seriesForRange(fp, from, through) + fp, series := pair.fp, pair.series if series == nil { return nopIter } + + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + iter, err := series.preloadChunksForInstant(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) diff --git a/vendor/github.com/prometheus/prometheus/storage/local/test_helpers.go b/vendor/github.com/prometheus/prometheus/storage/local/test_helpers.go index e28cf19ffd9..bdc26c555b0 100644 --- a/vendor/github.com/prometheus/prometheus/storage/local/test_helpers.go +++ b/vendor/github.com/prometheus/prometheus/storage/local/test_helpers.go @@ -66,3 +66,7 @@ func NewTestStorage(t testutil.T, encoding chunk.Encoding) (*MemorySeriesStorage return storage, closer } + +func makeFingerprintSeriesPair(s *MemorySeriesStorage, fp model.Fingerprint) fingerprintSeriesPair { + return fingerprintSeriesPair{fp, s.seriesForRange(fp, model.Earliest, model.Latest)} +} diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/client.go b/vendor/github.com/prometheus/prometheus/storage/remote/client.go index 5befc38bd09..771ecb696ca 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/client.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/client.go @@ -31,14 +31,13 @@ import ( // Client allows sending batches of Prometheus samples to an HTTP endpoint. type Client struct { - index int // Used to differentiate metrics url config.URL client *http.Client timeout time.Duration } // NewClient creates a new Client. -func NewClient(index int, conf config.RemoteWriteConfig) (*Client, error) { +func NewClient(conf config.RemoteWriteConfig) (*Client, error) { tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig) if err != nil { return nil, err @@ -56,8 +55,7 @@ func NewClient(index int, conf config.RemoteWriteConfig) (*Client, error) { } return &Client{ - index: index, - url: conf.URL, + url: *conf.URL, client: httputil.NewClient(rt), timeout: time.Duration(conf.RemoteTimeout), }, nil @@ -117,10 +115,6 @@ func (c *Client) Store(samples model.Samples) error { } // Name identifies the client as a generic client. -// -// TODO: This client is going to be the only one soon - then this method -// will simply be removed in the restructuring and the whole "generic" naming -// will be gone for good. func (c Client) Name() string { - return fmt.Sprintf("generic:%d:%s", c.index, c.url) + return "generic" } diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/remote.go b/vendor/github.com/prometheus/prometheus/storage/remote/remote.go index 784001c1b4b..4aedb794ffa 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/remote.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/remote.go @@ -24,6 +24,7 @@ import ( influx "github.com/influxdb/influxdb/client" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/relabel" "github.com/prometheus/prometheus/storage/remote/graphite" "github.com/prometheus/prometheus/storage/remote/influxdb" "github.com/prometheus/prometheus/storage/remote/opentsdb" @@ -33,6 +34,7 @@ import ( type Storage struct { queues []*StorageQueueManager externalLabels model.LabelSet + relabelConfigs []*config.RelabelConfig mtx sync.RWMutex } @@ -42,6 +44,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { defer s.mtx.Unlock() s.externalLabels = conf.GlobalConfig.ExternalLabels + s.relabelConfigs = conf.RemoteWriteConfig.WriteRelabelConfigs return nil } @@ -116,8 +119,14 @@ func (s *Storage) Append(smpl *model.Sample) error { snew.Metric[ln] = lv } } + snew.Metric = model.Metric( + relabel.Process(model.LabelSet(snew.Metric), s.relabelConfigs...)) s.mtx.RUnlock() + if snew.Metric == nil { + return nil + } + for _, q := range s.queues { q.Append(&snew) } diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/remote.pb.go b/vendor/github.com/prometheus/prometheus/storage/remote/remote.pb.go index afb55fdfac2..a5a5356bf7c 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/remote.pb.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/remote.pb.go @@ -13,7 +13,6 @@ It has these top-level messages: LabelPair TimeSeries WriteRequest - WriteResponse */ package remote @@ -93,40 +92,29 @@ func (m *WriteRequest) GetTimeseries() []*TimeSeries { return nil } -type WriteResponse struct { -} - -func (m *WriteResponse) Reset() { *m = WriteResponse{} } -func (m *WriteResponse) String() string { return proto.CompactTextString(m) } -func (*WriteResponse) ProtoMessage() {} -func (*WriteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } - func init() { proto.RegisterType((*Sample)(nil), "remote.Sample") proto.RegisterType((*LabelPair)(nil), "remote.LabelPair") proto.RegisterType((*TimeSeries)(nil), "remote.TimeSeries") proto.RegisterType((*WriteRequest)(nil), "remote.WriteRequest") - proto.RegisterType((*WriteResponse)(nil), "remote.WriteResponse") } func init() { proto.RegisterFile("remote.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 250 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x91, 0x4f, 0x4b, 0x03, 0x31, - 0x10, 0xc5, 0xdd, 0xd6, 0xae, 0x74, 0xba, 0x2a, 0x0e, 0x15, 0x8a, 0x27, 0xcd, 0x69, 0xbd, 0xf4, - 0xb0, 0xa2, 0x57, 0xd1, 0xb3, 0x82, 0xa4, 0x82, 0x47, 0x49, 0x61, 0x0e, 0x0b, 0x9b, 0x66, 0xcd, - 0xa4, 0x7e, 0x7e, 0xb3, 0xf9, 0xd3, 0x2e, 0xde, 0x66, 0xde, 0xbc, 0xf9, 0xcd, 0x0b, 0x81, 0xca, - 0x92, 0x36, 0x8e, 0xd6, 0xbd, 0x35, 0xce, 0x60, 0x19, 0x3b, 0xf1, 0x02, 0xe5, 0x46, 0xe9, 0xbe, - 0x23, 0x5c, 0xc2, 0xec, 0x57, 0x75, 0x7b, 0x5a, 0x15, 0xb7, 0x45, 0x5d, 0xc8, 0xd8, 0xe0, 0x1d, - 0x54, 0xae, 0xd5, 0xc4, 0xce, 0x9b, 0xbe, 0x35, 0xaf, 0x26, 0x7e, 0x38, 0x95, 0x8b, 0x83, 0xf6, - 0xce, 0xe2, 0x11, 0xe6, 0x6f, 0x6a, 0x4b, 0xdd, 0x87, 0x6a, 0x2d, 0x22, 0x9c, 0xee, 0x94, 0x8e, - 0x90, 0xb9, 0x0c, 0xf5, 0x91, 0x3c, 0x09, 0x62, 0x6c, 0x84, 0x02, 0xf8, 0xf4, 0x94, 0x0d, 0xd9, - 0x96, 0x18, 0xef, 0xa1, 0xec, 0x06, 0x08, 0xfb, 0xcd, 0x69, 0xbd, 0x68, 0xae, 0xd6, 0x29, 0xee, - 0x01, 0x2d, 0x93, 0x01, 0x6b, 0x38, 0xe3, 0x10, 0x79, 0x48, 0x33, 0x78, 0x2f, 0xb2, 0x37, 0xbe, - 0x44, 0xe6, 0xb1, 0x78, 0x85, 0xea, 0xcb, 0xb6, 0x8e, 0x24, 0xfd, 0xec, 0x7d, 0x5c, 0x6c, 0x00, - 0x42, 0xf0, 0x70, 0x32, 0x1d, 0xc2, 0xbc, 0x7c, 0x0c, 0x23, 0x47, 0x2e, 0x71, 0x09, 0xe7, 0x89, - 0xc1, 0xbd, 0xd9, 0x31, 0x35, 0xcf, 0x30, 0x0b, 0x02, 0x3e, 0xe5, 0x62, 0x99, 0x11, 0xe3, 0x63, - 0x37, 0xd7, 0xff, 0xd4, 0xb8, 0x2e, 0x4e, 0xb6, 0x65, 0xf8, 0x81, 0x87, 0xbf, 0x00, 0x00, 0x00, - 0xff, 0xff, 0xed, 0x75, 0xbc, 0xc4, 0x91, 0x01, 0x00, 0x00, + // 216 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x4c, 0x90, 0x3f, 0x4f, 0x80, 0x30, + 0x10, 0xc5, 0x03, 0x68, 0x0d, 0x07, 0x31, 0xf1, 0xe2, 0xc0, 0xa8, 0x9d, 0x70, 0x61, 0xc0, 0xf8, + 0x01, 0x74, 0xd6, 0xc4, 0x14, 0x13, 0x47, 0x53, 0x92, 0x1b, 0x9a, 0xb4, 0x82, 0x6d, 0xf1, 0xf3, + 0x5b, 0x5a, 0xfe, 0xb8, 0xf5, 0xdd, 0xbd, 0x7b, 0xf7, 0xeb, 0x41, 0x6d, 0xc9, 0x4c, 0x9e, 0xba, + 0xd9, 0x4e, 0x7e, 0x42, 0x96, 0x14, 0x7f, 0x06, 0x36, 0x48, 0x33, 0x6b, 0xc2, 0x5b, 0xb8, 0xfc, + 0x95, 0x7a, 0xa1, 0x26, 0xbb, 0xcb, 0xda, 0x4c, 0x24, 0x81, 0xf7, 0x50, 0x7b, 0x65, 0xc8, 0xf9, + 0x60, 0xfa, 0x32, 0xae, 0xc9, 0x43, 0xb3, 0x10, 0xd5, 0x51, 0x7b, 0x73, 0xfc, 0x09, 0xca, 0x57, + 0x39, 0x92, 0x7e, 0x97, 0xca, 0x22, 0xc2, 0xc5, 0xb7, 0x34, 0x29, 0xa4, 0x14, 0xf1, 0x7d, 0x26, + 0xe7, 0xb1, 0x98, 0x04, 0x97, 0x00, 0x1f, 0x21, 0x65, 0x20, 0xab, 0xc8, 0xe1, 0x03, 0x30, 0xbd, + 0x86, 0xb8, 0x30, 0x59, 0xb4, 0x55, 0x7f, 0xd3, 0x6d, 0xb8, 0x47, 0xb4, 0xd8, 0x0c, 0xd8, 0xc2, + 0x95, 0x8b, 0xc8, 0x2b, 0xcd, 0xea, 0xbd, 0xde, 0xbd, 0xe9, 0x27, 0x62, 0x6f, 0xf3, 0x17, 0xa8, + 0x3f, 0xad, 0xf2, 0x24, 0xe8, 0x67, 0x09, 0xb8, 0xd8, 0x03, 0x44, 0xf0, 0xb8, 0x72, 0x5b, 0x84, + 0xfb, 0xf0, 0x09, 0x23, 0xfe, 0xb9, 0x46, 0x16, 0xef, 0xf5, 0xf8, 0x17, 0x00, 0x00, 0xff, 0xff, + 0x73, 0xb4, 0xd1, 0xb6, 0x3f, 0x01, 0x00, 0x00, } diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/remote_reloadable.go b/vendor/github.com/prometheus/prometheus/storage/remote/remote_reloadable.go index 85fbed6d456..d42a187f325 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/remote_reloadable.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/remote_reloadable.go @@ -19,15 +19,16 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/relabel" ) // Storage collects multiple remote storage queues. type ReloadableStorage struct { mtx sync.RWMutex externalLabels model.LabelSet - conf []config.RemoteWriteConfig + conf config.RemoteWriteConfig - queues []*StorageQueueManager + queue *StorageQueueManager } // New returns a new remote Storage. @@ -42,31 +43,32 @@ func (s *ReloadableStorage) ApplyConfig(conf *config.Config) error { // TODO: we should only stop & recreate queues which have changes, // as this can be quite disruptive. - newQueues := []*StorageQueueManager{} - for i, c := range conf.RemoteWriteConfig { - c, err := NewClient(i, c) + var newQueue *StorageQueueManager + + if conf.RemoteWriteConfig.URL != nil { + c, err := NewClient(conf.RemoteWriteConfig) if err != nil { return err } - newQueues = append(newQueues, NewStorageQueueManager(c, nil)) + newQueue = NewStorageQueueManager(c, nil) } - for _, q := range s.queues { - q.Stop() + if s.queue != nil { + s.queue.Stop() } - s.queues = newQueues - s.externalLabels = conf.GlobalConfig.ExternalLabels + s.queue = newQueue s.conf = conf.RemoteWriteConfig - for _, q := range s.queues { - q.Start() + s.externalLabels = conf.GlobalConfig.ExternalLabels + if s.queue != nil { + s.queue.Start() } return nil } // Stop the background processing of the storage queues. func (s *ReloadableStorage) Stop() { - for _, q := range s.queues { - q.Stop() + if s.queue != nil { + s.queue.Stop() } } @@ -84,9 +86,14 @@ func (s *ReloadableStorage) Append(smpl *model.Sample) error { snew.Metric[ln] = lv } } + snew.Metric = model.Metric( + relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...)) - for _, q := range s.queues { - q.Append(&snew) + if snew.Metric == nil { + return nil + } + if s.queue != nil { + s.queue.Append(&snew) } return nil } diff --git a/vendor/manifest b/vendor/manifest index 86923b34a11..1f3115d5500 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -549,7 +549,7 @@ "importpath": "github.com/prometheus/prometheus/config", "repository": "https://github.com/prometheus/prometheus", "vcs": "git", - "revision": "c0889fd92eb903dc54783229ac4add758a0da510", + "revision": "2844a8c7b547b674994927bd25c4bfb75c6761a0", "branch": "master", "path": "config", "notests": true @@ -563,11 +563,20 @@ "path": "/promql", "notests": true }, + { + "importpath": "github.com/prometheus/prometheus/relabel", + "repository": "https://github.com/prometheus/prometheus", + "vcs": "git", + "revision": "2844a8c7b547b674994927bd25c4bfb75c6761a0", + "branch": "master", + "path": "/relabel", + "notests": true + }, { "importpath": "github.com/prometheus/prometheus/storage", "repository": "https://github.com/prometheus/prometheus", "vcs": "git", - "revision": "91727281252c935522f61fec2329c049331976b7", + "revision": "2844a8c7b547b674994927bd25c4bfb75c6761a0", "branch": "master", "path": "storage", "notests": true