Skip to content

API endpoints for the series deletion API using block storage #4370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f5deb2e
Add endpoints for blocks series deletion API
ilangofman Jul 13, 2021
220a47e
Add unit tests and address PR comments
ilangofman Jul 14, 2021
6f7d749
change 1 function name
ilangofman Jul 14, 2021
1f89173
Remove the deletion during get requests
ilangofman Jul 15, 2021
2feb3ce
make the hashing consistent no matter the selector order
ilangofman Jul 15, 2021
342d673
fix edge case
ilangofman Jul 18, 2021
361d166
Fix feature flag
ilangofman Jul 18, 2021
f1fc186
refactor minor
ilangofman Jul 19, 2021
f9c16f6
minor refactor
ilangofman Jul 19, 2021
8f0338f
Fix lint errors
ilangofman Jul 19, 2021
a5e62f0
fix lint errors and add changelog
ilangofman Jul 19, 2021
6836cfb
Merge branch 'master' into block_deletion_endpoints
ilangofman Jul 19, 2021
9d090a3
Latest changes from master branch
ilangofman Aug 6, 2021
16729d3
update changelog
ilangofman Aug 6, 2021
4bafb4e
fix Changelog
ilangofman Aug 6, 2021
1dfaf28
Remove extra print statement
ilangofman Aug 6, 2021
08b4c1e
Minor refactor
ilangofman Aug 16, 2021
b811d2e
Add a tombstone manager to simplify the creation/getting tombstones
ilangofman Aug 16, 2021
4aa7dd3
Merge branch 'master' into block_deletion_endpoints
ilangofman Aug 17, 2021
a0524cd
remove unneeded variable
ilangofman Aug 17, 2021
06aec3d
Merge branch 'block_deletion_endpoints' of https://github.com/ilangof…
ilangofman Aug 17, 2021
3b0ec49
fix comment
ilangofman Aug 17, 2021
212f876
Address PR comments
ilangofman Aug 19, 2021
7d6963d
reverse change of combining blocks purger and tenant deletion
ilangofman Sep 20, 2021
fd11ec1
Undo change of combining tenant deletion and series deletion API files.
ilangofman Sep 20, 2021
d9c5b92
Merge branch 'master' into block_deletion_endpoints
Jul 11, 2022
9263ca6
Make changes based on PR comments
Jul 13, 2022
06cb921
fix return content type of get delete requests
Jul 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- `-memberlist.dead-node-reclaim-time`
* [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317
* [FEATURE] Query Frontend: Add `cortex_query_fetched_series_total` and `cortex_query_fetched_chunks_bytes_total` per-user counters to expose the number of series and bytes fetched as part of queries. These metrics can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #4343
* [FEATURE] Block Storage: Added Prometheus style API endpoints for series deletion. Needs to be enabled first by setting `--purger.enable` to `true`. This only handles the creating, getting and cancelling requests. Actual deletion and query time filtering will be part of future PRs. #4370
* [CHANGE] Update Go version to 1.16.6. #4362
* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` has been removed due to values no longer being stored in-memory as encoded bytes. #4345
Expand Down
18 changes: 15 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,21 @@ func (a *API) RegisterChunksPurger(store *purger.DeleteStore, deleteRequestCance
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/admin/tsdb/cancel_delete_request"), http.HandlerFunc(deleteRequestHandler.CancelDeleteRequestHandler), true, "PUT", "POST")
}

func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {
a.RegisterRoute("/purger/delete_tenant", http.HandlerFunc(api.DeleteTenant), true, "POST")
a.RegisterRoute("/purger/delete_tenant_status", http.HandlerFunc(api.DeleteTenantStatus), true, "GET")
func (a *API) RegisterBlocksPurger(blocksPurger *purger.BlocksPurgerAPI, seriesDeletionEnabled bool) {

if seriesDeletionEnabled {
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.AddDeleteRequestHandler), true, "PUT", "POST")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.GetAllDeleteRequestsHandler), true, "GET")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/admin/tsdb/cancel_delete_request"), http.HandlerFunc(blocksPurger.CancelDeleteRequestHandler), true, "PUT", "POST")

// Legacy Routes
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.AddDeleteRequestHandler), true, "PUT", "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.AddDeleteRequestHandler), true, "GET")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/admin/tsdb/cancel_delete_request"), http.HandlerFunc(blocksPurger.CancelDeleteRequestHandler), true, "PUT", "POST")
}
// Tenant Deletion
a.RegisterRoute("/purger/delete_tenant", http.HandlerFunc(blocksPurger.DeleteTenant), true, "POST")
a.RegisterRoute("/purger/delete_tenant_status", http.HandlerFunc(blocksPurger.DeleteTenantStatus), true, "GET")
}

// RegisterRuler registers routes associated with the Ruler service.
Expand Down
325 changes: 325 additions & 0 deletions pkg/chunk/purger/blocks_purger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
package purger

import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
fmt "fmt"
"net/http"
"sort"
"strconv"
strings "strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

type BlocksPurgerAPI struct {
bucketClient objstore.Bucket
logger log.Logger
cfgProvider bucket.TenantConfigProvider
deleteRequestCancelPeriod time.Duration
}

func NewBlocksPurgerAPI(storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer, cancellationPeriod time.Duration) (*BlocksPurgerAPI, error) {
bucketClient, err := createBucketClient(storageCfg, logger, reg)
if err != nil {
return nil, err
}

return newBlocksPurgerAPI(bucketClient, cfgProvider, logger, cancellationPeriod), nil
}

func newBlocksPurgerAPI(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger, cancellationPeriod time.Duration) *BlocksPurgerAPI {
return &BlocksPurgerAPI{
bucketClient: bkt,
cfgProvider: cfgProvider,
logger: logger,
deleteRequestCancelPeriod: cancellationPeriod,
}
}

func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "purger", logger, reg)
if err != nil {
return nil, errors.Wrap(err, "create bucket client")
}

return bucketClient, nil
}

func (api *BlocksPurgerAPI) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

params := r.URL.Query()
match := params["match[]"]
if len(match) == 0 {
http.Error(w, "selectors not set", http.StatusBadRequest)
return
}

for i := range match {
_, err := parser.ParseMetricSelector(match[i])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}

startParam := params.Get("start")
startTime := int64(0)
if startParam != "" {
startTime, err = util.ParseTime(startParam)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}

endParam := params.Get("end")
endTime := int64(model.Now())

if endParam != "" {
endTime, err = util.ParseTime(endParam)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if endTime > int64(model.Now()) {
http.Error(w, "deletes in future not allowed", http.StatusBadRequest)
return
}
}

if startTime > endTime {
http.Error(w, "start time can't be greater than end time", http.StatusBadRequest)
return
}

requestID := getTombstoneRequestID(startTime, endTime, match)

// Since the request id is based on a hash of the parameters, there is a possibility that a tombstone could already exist for it
// if the request was previously cancelled, we need to remove the cancelled tombstone before adding the pending one
if err := cortex_tsdb.RemoveCancelledStateIfExists(ctx, api.bucketClient, userID, api.cfgProvider, requestID); err != nil {
level.Error(util_log.Logger).Log("msg", "removing cancelled tombstone state if it exists", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will need to be careful to just propagate error back to caller; we might inadvertently expose some detail that should not be exposed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to a message instead of propagating the error.

return
}

prevT, err := cortex_tsdb.GetDeleteRequestByIDForUser(ctx, api.bucketClient, api.cfgProvider, userID, requestID)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error getting delete request by id", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if prevT != nil {
http.Error(w, "delete request tombstone with same information already exists", http.StatusBadRequest)
return
}

curTime := time.Now().Unix() * 1000
t := cortex_tsdb.NewTombstone(userID, curTime, curTime, startTime, endTime, match, requestID, cortex_tsdb.StatePending)

if err = cortex_tsdb.WriteTombstoneFile(ctx, api.bucketClient, userID, api.cfgProvider, t); err != nil {
level.Error(util_log.Logger).Log("msg", "error adding delete request to the object store", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusNoContent)
}

func (api *BlocksPurgerAPI) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
Comment on lines +140 to +144
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty common code, I think existing Cortex code uses middleware.AuthenticateUser, any reason why we can't use the same function for delete series APIs?


deleteRequests, err := cortex_tsdb.GetAllDeleteRequestsForUser(ctx, api.bucketClient, api.cfgProvider, userID)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error getting delete requests from the block store", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if err := json.NewEncoder(w).Encode(deleteRequests); err != nil {
level.Error(util_log.Logger).Log("msg", "error marshalling response", "err", err)
http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError)
}

w.WriteHeader(http.StatusOK)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to https://pkg.go.dev/net/http#ResponseWriter since you called json.NewEncoder(w), which I assume calls ResponseWriter.Write(), then status code of 200 is automatically written for you together with "guessed" content-type. So you don't really need to set the status at this line.

However, I think we should set the content type and the status header ourselves for clarity.

Also, I think the if block on line 155 is missing a return statement right? I would expect http.Error() writes that the status header to 500 already, and this w.WriteHeader(http.StatusOk) may have undefined behaviour?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, should be fixed now.


}

func (api *BlocksPurgerAPI) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

params := r.URL.Query()
requestID := params.Get("request_id")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we do input validation here? What if request_id is not given?


deleteRequest, err := cortex_tsdb.GetDeleteRequestByIDForUser(ctx, api.bucketClient, api.cfgProvider, userID, requestID)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error getting delete request from the object store", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if deleteRequest == nil {
http.Error(w, "could not find delete request with given id", http.StatusBadRequest)
return
}

if deleteRequest.State == cortex_tsdb.StateCancelled {
http.Error(w, "the request has already been previously deleted", http.StatusBadRequest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, shouldn't we return 202? Because it's already in cancelled state, which is what this API should do right?

By the way, in our error messages to the user, we gotta use "cancel" and "delete" carefully. I think Here you actually mean "the series deletion request was cancelled previously" not "the series has already been deleted" right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, thanks.

return
}

if deleteRequest.State == cortex_tsdb.StateProcessed {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a "processing" state? Or we can rely on the deleteRequestCancelPeriod to infer the "processing" state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can use the deleteRequestCancelPeriod to infer the processing state. Whenever the tombstone with statePending is older than deleteRequestCancelPeriod, then it is in processing state.

http.Error(w, "deletion of request which is already processed is not allowed", http.StatusBadRequest)
return
}

currentTime := int64(time.Now().Unix() * 1000)
timeElapsed := float64(currentTime - deleteRequest.RequestCreatedAt)

if timeElapsed > float64(api.deleteRequestCancelPeriod.Milliseconds()) {
http.Error(w, fmt.Sprintf("deletion of request past the deadline of %s since its creation is not allowed", api.deleteRequestCancelPeriod.String()), http.StatusBadRequest)
return
}

// create file with the cancelled state
_, err = cortex_tsdb.UpdateTombstoneState(ctx, api.bucketClient, api.cfgProvider, deleteRequest, cortex_tsdb.StateCancelled)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error cancelling the delete request", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusNoContent)
}

// Calculates the tombstone file name based on a hash of the start time, end time and selectors
func getTombstoneRequestID(startTime int64, endTime int64, selectors []string) string {
// First make a string of the tombstone info
var b strings.Builder
b.WriteString(strconv.FormatInt(startTime, 10))
b.WriteString(",")
b.WriteString(strconv.FormatInt(endTime, 10))

sort.Strings(selectors)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is hashing matchers the right way to create a "deletion ID"? For example I can have matcher like match[]=process_start_time_seconds{job="prometheus"} and match[]=process_start_time_seconds{job= "prometheus"} (there are some whitespaces in the second query). They are logically the same matcher, but will end up with different hash value.

Is this intended that create deletion request de-duplication is a best effort?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated it so it parses the matchers and then create the hash based on the parsed matchers. So any logically equivalent deletion requests should now result in the same hash. The example you provided should be the same hash. Also split up matchers like match[]=process_start_time_seconds&match[]={job= "prometheus"} would result in the same hash as the example you provided (Since a deletion request is the "AND" of all matchers provided).

for _, s := range selectors {
b.WriteString(",")
b.WriteString(s)
}

return getTombstoneHash(b.String())
}

func getTombstoneHash(s string) string {
data := []byte(s)
md5Bytes := md5.Sum(data)
return hex.EncodeToString(md5Bytes[:])
}

func (api *BlocksPurgerAPI) DeleteTenant(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
// When Cortex is running, it uses Auth Middleware for checking X-Scope-OrgID and injecting tenant into context.
// Auth Middleware sends http.StatusUnauthorized if X-Scope-OrgID is missing, so we do too here, for consistency.
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

err = cortex_tsdb.WriteTenantDeletionMark(r.Context(), api.bucketClient, userID, api.cfgProvider, cortex_tsdb.NewTenantDeletionMark(time.Now()))
if err != nil {
level.Error(api.logger).Log("msg", "failed to write tenant deletion mark", "user", userID, "err", err)

http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

level.Info(api.logger).Log("msg", "tenant deletion mark in blocks storage created", "user", userID)

w.WriteHeader(http.StatusOK)
}

type DeleteTenantStatusResponse struct {
TenantID string `json:"tenant_id"`
BlocksDeleted bool `json:"blocks_deleted"`
}

func (api *BlocksPurgerAPI) DeleteTenantStatus(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

result := DeleteTenantStatusResponse{}
result.TenantID = userID
result.BlocksDeleted, err = api.isBlocksForUserDeleted(ctx, userID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

util.WriteJSONResponse(w, result)
}

func (api *BlocksPurgerAPI) isBlocksForUserDeleted(ctx context.Context, userID string) (bool, error) {
var errBlockFound = errors.New("block found")

userBucket := bucket.NewUserBucketClient(userID, api.bucketClient, api.cfgProvider)
err := userBucket.Iter(ctx, "", func(s string) error {
s = strings.TrimSuffix(s, "/")

_, err := ulid.Parse(s)
if err != nil {
// not block, keep looking
return nil
}

// Used as shortcut to stop iteration.
return errBlockFound
})

if errors.Is(err, errBlockFound) {
return false, nil
}

if err != nil {
return false, err
}

// No blocks found, all good.
return true, nil
}
Loading