
Commit f308855

integration test for deletion of series (#2946)
* integration test for deletion of series
  Signed-off-by: Sandeep Sukhani <[email protected]>
* update changelog
  Signed-off-by: Sandeep Sukhani <[email protected]>
* Reuse existing timeout
  Signed-off-by: Marco Pracucci <[email protected]>
* Removed Println()
  Signed-off-by: Marco Pracucci <[email protected]>
* Assert on Purger metrics too
  Signed-off-by: Marco Pracucci <[email protected]>
* Removed time.Sleep() from delete series integration test
  Signed-off-by: Marco Pracucci <[email protected]>
* Moved CHANGELOG entry
  Signed-off-by: Marco Pracucci <[email protected]>

Co-authored-by: Marco Pracucci <[email protected]>
1 parent e36e2ae commit f308855

File tree

7 files changed (+268, -2 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -7,6 +7,7 @@
   * `cortex_ingester_received_files`
   * `cortex_ingester_received_bytes_total`
   * `cortex_ingester_sent_bytes_total`
+* [CHANGE] Experimental Delete Series: `/api/v1/admin/tsdb/delete_series` and `/api/v1/admin/tsdb/cancel_delete_request` purger APIs to return status code `204` instead of `200` for success. #2946
 * [ENHANCEMENT] Add support for azure storage in China, German and US Government environments. #2988
 * [ENHANCEMENT] Query-tee: added a small tolerance to floating point sample values comparison. #2994
 * [BUGFIX] Query-frontend: Fixed rounding for incoming query timestamps, to be 100% Prometheus compatible. #2990
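
For context, a minimal standalone sketch (not part of this commit) of how a caller might exercise the changed purger API and check for the new status code. The host, port, matcher, time range, and tenant ID below are all assumptions:

// delete_series_example.go - hypothetical client for the purger delete API.
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// Example matcher and time range; the purger address is an assumption.
	params := url.Values{}
	params.Set("match[]", `delete_series{common="label"}`)
	params.Set("start", "1598000000")
	params.Set("end", "1598172800")

	req, err := http.NewRequest("POST", "http://localhost:9007/prometheus/api/v1/admin/tsdb/delete_series?"+params.Encode(), nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("X-Scope-OrgID", "user-1") // Cortex tenant header

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// After this commit, success is signalled with 204 (No Content) instead of 200.
	fmt.Println("status:", resp.StatusCode)
}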

integration/asserts.go

Lines changed: 2 additions & 0 deletions

@@ -23,6 +23,7 @@ const (
 	AlertManager
 	Ruler
 	StoreGateway
+	Purger
 )

 var (

@@ -36,6 +37,7 @@ var (
 	AlertManager: {"cortex_alertmanager"},
 	Ruler:        {},
 	StoreGateway: {"!cortex_storegateway_client", "cortex_storegateway"}, // The metrics prefix cortex_storegateway_client may be used by other components so we ignore it.
+	Purger:       {"cortex_purger"},
 }

 // Blacklisted metrics prefixes across any Cortex service.
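
The new map entry whitelists the `cortex_purger` metrics prefix for the Purger service. As a rough sketch of the kind of check this table drives (the real assertServiceMetricsPrefixes helper lives elsewhere in the integration package and may well differ), a `!`-prefixed entry marks a prefix that is ignored because other components may also use it:

package main

import (
	"fmt"
	"strings"
)

// metricAllowed is a simplified sketch: a metric name passes if it matches one
// of the service's registered prefixes; "!prefix" entries are treated as ignored.
func metricAllowed(metric string, prefixes []string) bool {
	for _, p := range prefixes {
		if strings.HasPrefix(p, "!") {
			if strings.HasPrefix(metric, p[1:]) {
				return true // prefix is shared with other components, so skip the check
			}
			continue
		}
		if strings.HasPrefix(metric, p) {
			return true
		}
	}
	return false
}

func main() {
	purgerPrefixes := []string{"cortex_purger"}
	fmt.Println(metricAllowed("cortex_purger_delete_requests_received_total", purgerPrefixes)) // true
	fmt.Println(metricAllowed("cortex_ingester_memory_chunks", purgerPrefixes))                // false
}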
Lines changed: 221 additions & 0 deletions

@@ -0,0 +1,221 @@
// +build requires_docker

package main

import (
	"fmt"
	"testing"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/prompb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/integration/e2e"
	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
	"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestDeleteSeriesAllIndexBackends(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Start dependencies.
	dynamo := e2edb.NewDynamoDB()
	bigtable := e2edb.NewBigtable()
	cassandra := e2edb.NewCassandra()

	stores := []string{"aws-dynamo", "bigtable", "cassandra"}
	perStoreDuration := 7 * 24 * time.Hour

	consul := e2edb.NewConsul()
	require.NoError(t, s.StartAndWaitReady(cassandra, dynamo, bigtable, consul))

	// Build a config for each type of index store.
	now := time.Now()
	oldestStoreStartTime := now.Add(time.Duration(-len(stores)) * perStoreDuration)

	storeConfigs := make([]storeConfig, len(stores))
	for i, store := range stores {
		storeConfigs[i] = storeConfig{From: oldestStoreStartTime.Add(time.Duration(i) * perStoreDuration).UTC().Format("2006-01-02"), IndexStore: store}
	}

	flags := mergeFlags(ChunksStorageFlags, map[string]string{
		"-cassandra.addresses": cassandra.NetworkHTTPEndpoint(),
		"-cassandra.keyspace":  "tests", // keyspace gets created on startup if it does not exist
		"-purger.enable":       "true",
		"-deletes.store":       "bigtable",
	})

	// The bigtable client needs an environment variable set when connecting to an emulator.
	bigtableFlag := map[string]string{"BIGTABLE_EMULATOR_HOST": bigtable.NetworkHTTPEndpoint()}

	// Here we start and stop a table manager for each index store. This is a
	// workaround to make the table manager create tables for each config, since
	// it considers only the latest schema config when creating tables.
	for i := range storeConfigs {
		require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(buildSchemaConfigWith(storeConfigs[i:i+1]))))

		tableManager := e2ecortex.NewTableManager("table-manager", mergeFlags(flags, map[string]string{
			"-table-manager.retention-period": "2520h", // setting retention high enough
			"-log.level":                      "warn",
		}), "")
		tableManager.HTTPService.SetEnvVars(bigtableFlag)
		require.NoError(t, s.StartAndWaitReady(tableManager))

		// Wait until the first table-manager sync has completed, so that we're
		// sure the tables have been created.
		require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds"))
		require.NoError(t, s.Stop(tableManager))
	}

	// Start the rest of the Cortex components.
	require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(buildSchemaConfigWith(storeConfigs))))

	ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
		"-ingester.retain-period": "0s", // make the ingester retain no chunks in memory after they are flushed, so queries get data only from the store
		"-log.level":              "warn",
	}), "")
	ingester.HTTPService.SetEnvVars(bigtableFlag)

	distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
	querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
	querier.HTTPService.SetEnvVars(bigtableFlag)

	require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

	// Wait until both the distributor and querier have updated the ring.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
	require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	seriesToPush := []struct {
		name          string
		lables        []prompb.Label
		pushedVectors []model.Vector
	}{
		{
			name:   "series_1",
			lables: []prompb.Label{{Name: "common", Value: "label"}, {Name: "distinct", Value: "label1"}},
		},
		{
			name:   "series_2",
			lables: []prompb.Label{{Name: "common", Value: "label"}, {Name: "distinct", Value: "label2"}},
		},
		{
			name:   "delete_series",
			lables: []prompb.Label{{Name: "common", Value: "label"}, {Name: "distinct", Value: "label3"}},
		},
	}

	// Push some series for each day, starting from the oldest start time from the configs until now, so that we test all the index stores.
	for ts := oldestStoreStartTime; ts.Before(now); ts = ts.Add(24 * time.Hour) {
		for i, s := range seriesToPush {
			series, expectedVector := generateSeries(s.name, ts, s.lables...)

			res, err := client.Push(series)
			require.NoError(t, err)
			require.Equal(t, 200, res.StatusCode)

			seriesToPush[i].pushedVectors = append(seriesToPush[i].pushedVectors, expectedVector)
		}

		// Make the ingester flush the chunks to the store immediately.
		res, err := e2e.GetRequest("http://" + ingester.HTTPEndpoint() + "/flush")
		require.NoError(t, err)
		require.Equal(t, 204, res.StatusCode)

		// Wait until all chunks are flushed.
		require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(0), "cortex_ingester_flush_queue_length"))
		require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(0), "cortex_ingester_flush_series_in_progress"))
	}

	// Call flush again because chunks are sometimes still retained in memory: the chunk flush operation is async, while chunk cleanup from memory is not.
	res, err := e2e.GetRequest("http://" + ingester.HTTPEndpoint() + "/flush")
	require.NoError(t, err)
	require.Equal(t, 204, res.StatusCode)

	require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(0), "cortex_ingester_memory_chunks"))

	// Start a purger.
	purger := e2ecortex.NewPurger("purger", flags, "")
	purger.HTTPService.SetEnvVars(bigtableFlag)

	require.NoError(t, s.StartAndWaitReady(purger))

	// Perform a deletion over a 2-day interval for each week, covering all the index stores.
	deletionDuration := 2 * 24 * time.Hour
	deletedIntervals := intervals{}
	endpoint := "http://" + purger.HTTPEndpoint() + "/prometheus/api/v1/admin/tsdb/delete_series?match[]=delete_series{common=\"label\"}&start=%f&end=%f"
	for ts := now; ts.After(oldestStoreStartTime); ts = ts.Add(-perStoreDuration) {
		// Expand the interval by a second on both ends, since the requests get aligned with pushed samples at ms precision,
		// which sometimes does not match exactly in the purger when parsing the time from a string.
		deletedInterval := interval{e2e.TimeToMilliseconds(ts.Add(-(deletionDuration + time.Second))), e2e.TimeToMilliseconds(ts.Add(time.Second))}
		deletedIntervals = append(deletedIntervals, deletedInterval)

		res, err := client.PostRequest(fmt.Sprintf(endpoint, float64(deletedInterval.start)/float64(1000), float64(deletedInterval.end)/float64(1000)), nil)
		require.NoError(t, err)
		require.Equal(t, 204, res.StatusCode)
	}

	// Check whether the purger has received the expected number of delete requests.
	require.NoError(t, purger.WaitSumMetrics(e2e.Equals(float64(len(deletedIntervals))), "cortex_purger_delete_requests_received_total"))

	// Stop the purger and recreate it, since it loads requests for deletion only every hour.
	require.NoError(t, s.Stop(purger))

	purger = e2ecortex.NewPurger("purger", mergeFlags(flags, map[string]string{
		"-purger.delete-request-cancel-period": "-1m", // delete requests are retained for a day (default) + 1m; a negative value makes them get picked up immediately
	}), "")
	purger.HTTPService.SetEnvVars(bigtableFlag)
	require.NoError(t, s.StartAndWaitReady(purger))

	require.NoError(t, purger.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_purger_load_pending_requests_attempts_total"},
		e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "status", "success")),
		e2e.WaitMissingMetrics))

	require.NoError(t, purger.WaitSumMetricsWithOptions(e2e.Equals(float64(len(deletedIntervals))), []string{"cortex_purger_delete_requests_processed_total"},
		e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1")),
		e2e.WaitMissingMetrics))

	// Query and verify that only samples of the deleted series within the deleted intervals are gone.
	for _, s := range seriesToPush {
		for _, expectedVector := range s.pushedVectors {
			result, err := client.Query(s.name, expectedVector[0].Timestamp.Time())
			require.NoError(t, err)

			require.Equal(t, model.ValVector, result.Type())
			if s.name == "delete_series" && deletedIntervals.includes(e2e.TimeToMilliseconds(expectedVector[0].Timestamp.Time())) {
				require.Len(t, result.(model.Vector), 0)
			} else {
				assert.Equal(t, expectedVector, result.(model.Vector))
			}
		}
	}

	// Ensure no service-specific metrics prefix is used by the wrong service.
	assertServiceMetricsPrefixes(t, Distributor, distributor)
	assertServiceMetricsPrefixes(t, Ingester, ingester)
	assertServiceMetricsPrefixes(t, Querier, querier)
	assertServiceMetricsPrefixes(t, Purger, purger)
}

type interval struct {
	start, end int64
}

type intervals []interval

func (i intervals) includes(ts int64) bool {
	for _, interval := range i {
		if ts < interval.start || ts > interval.end {
			continue
		}
		return true
	}
	return false
}
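
Note that includes treats both interval bounds as inclusive. A quick standalone check of the helper above (the types are repeated so the example runs on its own; the timestamps are made-up values):

package main

import "fmt"

// Same types as in the test file above, duplicated here for a runnable example.
type interval struct{ start, end int64 }
type intervals []interval

func (i intervals) includes(ts int64) bool {
	for _, in := range i {
		if ts < in.start || ts > in.end {
			continue
		}
		return true
	}
	return false
}

func main() {
	ivs := intervals{{start: 1000, end: 2000}}
	fmt.Println(ivs.includes(1000)) // true: start is inclusive
	fmt.Println(ivs.includes(2000)) // true: end is inclusive
	fmt.Println(ivs.includes(2001)) // false: outside every interval
}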

integration/e2ecortex/client.go

Lines changed: 13 additions & 0 deletions

@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"net/http"
 	"net/url"

@@ -368,3 +369,15 @@ func (c *Client) DeleteAlertmanagerConfig(ctx context.Context) error {

 	return nil
 }
+
+// PostRequest sends a POST request to the given URL with the tenant header set.
+func (c *Client) PostRequest(url string, body io.Reader) (*http.Response, error) {
+	req, err := http.NewRequest("POST", url, body)
+	if err != nil {
+		return nil, err
+	}
+
+	req.Header.Set("X-Scope-OrgID", c.orgID)
+
+	client := &http.Client{Timeout: c.timeout}
+	return client.Do(req)
+}
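
A hypothetical call site for the new helper, in the style of the integration test above (client, purger, and t come from that test context; the timestamps are placeholders):

// Hypothetical usage inside a test such as TestDeleteSeriesAllIndexBackends.
res, err := client.PostRequest(
	fmt.Sprintf("http://%s/prometheus/api/v1/admin/tsdb/delete_series?match[]=%s&start=%f&end=%f",
		purger.HTTPEndpoint(), `delete_series{common="label"}`, 1598000000.0, 1598172800.0),
	nil, // no request body
)
require.NoError(t, err)
require.Equal(t, 204, res.StatusCode) // the purger now replies 204 on success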

integration/e2ecortex/services.go

Lines changed: 28 additions & 0 deletions

@@ -302,3 +302,31 @@ func NewRuler(name string, flags map[string]string, image string) *CortexService
 		grpcPort,
 	)
 }
+
+// NewPurger starts a Cortex purger service configured via flags only.
+func NewPurger(name string, flags map[string]string, image string) *CortexService {
+	return NewPurgerWithConfigFile(name, "", flags, image)
+}
+
+func NewPurgerWithConfigFile(name, configFile string, flags map[string]string, image string) *CortexService {
+	if configFile != "" {
+		flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile)
+	}
+
+	if image == "" {
+		image = GetDefaultImage()
+	}
+
+	return NewCortexService(
+		name,
+		image,
+		e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
+			"-target":                   "purger",
+			"-log.level":                "warn",
+			"-purger.object-store-type": "filesystem",
+			"-local.chunk-directory":    e2e.ContainerSharedDir,
+		}, flags))...),
+		e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
+		httpPort,
+		grpcPort,
+	)
+}
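
The config-file variant can be useful when flags alone get unwieldy. A hypothetical usage (the file name config.yaml and the flag override are assumptions; s, require, and the shared directory come from a test context like the one above):

// Hypothetical: start a purger from a YAML config file previously written to
// the shared directory, with a flag override layered on top.
purger := e2ecortex.NewPurgerWithConfigFile("purger", "config.yaml", map[string]string{
	"-deletes.store": "bigtable",
}, "")
require.NoError(t, s.StartAndWaitReady(purger))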

pkg/chunk/purger/request_handler.go

Lines changed: 2 additions & 1 deletion

@@ -113,6 +113,7 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
 	}

 	dm.metrics.deleteRequestsReceivedTotal.WithLabelValues(userID).Inc()
+	w.WriteHeader(http.StatusNoContent)
 }

 // GetAllDeleteRequestsHandler handles get all delete requests

@@ -177,5 +178,5 @@ func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter
 		return
 	}

-	w.WriteHeader(http.StatusOK)
+	w.WriteHeader(http.StatusNoContent)
 }
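
204 (No Content) is the conventional status for a success whose response intentionally carries no body, which is what both handlers return. A minimal sketch of the pattern in plain net/http, outside Cortex (the path and port are assumptions):

package main

import "net/http"

func main() {
	http.HandleFunc("/delete", func(w http.ResponseWriter, r *http.Request) {
		// ... record the delete request ...
		// Success with an empty body: 204 is more precise than 200.
		w.WriteHeader(http.StatusNoContent)
	})
	_ = http.ListenAndServe(":8080", nil)
}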

pkg/testexporter/correctness/delete_series.go

Lines changed: 1 addition & 1 deletion

@@ -239,7 +239,7 @@ func (d *DeleteSeriesTest) sendDeleteRequest() (err error) {
 		return
 	}

-	if resp.StatusCode != 200 {
+	if resp.StatusCode != 204 {
 		return fmt.Errorf("unexpected status code %d", resp.StatusCode)
 	}
