Skip to content

Add context parameter to DB interface functions #1394

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (am *MultitenantAlertmanager) updateConfigs(now time.Time) error {
// poll the configuration server. Not re-entrant.
func (am *MultitenantAlertmanager) poll() (map[string]configs.View, error) {
configID := am.latestConfig
cfgs, err := am.configsAPI.GetAlerts(configID)
cfgs, err := am.configsAPI.GetAlerts(context.Background(), configID)
if err != nil {
level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: configs server poll failed", "err", err)
return nil, err
Expand Down
12 changes: 6 additions & 6 deletions pkg/configs/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (a *API) getConfig(w http.ResponseWriter, r *http.Request) {
}
logger := util.WithContext(r.Context(), util.Logger)

cfg, err := a.db.GetConfig(userID)
cfg, err := a.db.GetConfig(r.Context(), userID)
if err == sql.ErrNoRows {
http.Error(w, "No configuration", http.StatusNotFound)
return
Expand Down Expand Up @@ -132,7 +132,7 @@ func (a *API) setConfig(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("Invalid templates: %v", err), http.StatusBadRequest)
return
}
if err := a.db.SetConfig(userID, cfg); err != nil {
if err := a.db.SetConfig(r.Context(), userID, cfg); err != nil {
// XXX: Untested
level.Error(logger).Log("msg", "error storing config", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -206,15 +206,15 @@ func (a *API) getConfigs(w http.ResponseWriter, r *http.Request) {
logger := util.WithContext(r.Context(), util.Logger)
rawSince := r.FormValue("since")
if rawSince == "" {
cfgs, cfgErr = a.db.GetAllConfigs()
cfgs, cfgErr = a.db.GetAllConfigs(r.Context())
} else {
since, err := strconv.ParseUint(rawSince, 10, 0)
if err != nil {
level.Info(logger).Log("msg", "invalid config ID", "err", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
cfgs, cfgErr = a.db.GetConfigs(configs.ID(since))
cfgs, cfgErr = a.db.GetConfigs(r.Context(), configs.ID(since))
}

if cfgErr != nil {
Expand All @@ -241,7 +241,7 @@ func (a *API) deactivateConfig(w http.ResponseWriter, r *http.Request) {
}
logger := util.WithContext(r.Context(), util.Logger)

if err := a.db.DeactivateConfig(userID); err != nil {
if err := a.db.DeactivateConfig(r.Context(), userID); err != nil {
if err == sql.ErrNoRows {
level.Info(logger).Log("msg", "deactivate config - no configuration", "userID", userID)
http.Error(w, "No configuration", http.StatusNotFound)
Expand All @@ -263,7 +263,7 @@ func (a *API) restoreConfig(w http.ResponseWriter, r *http.Request) {
}
logger := util.WithContext(r.Context(), util.Logger)

if err := a.db.RestoreConfig(userID); err != nil {
if err := a.db.RestoreConfig(r.Context(), userID); err != nil {
if err == sql.ErrNoRows {
level.Info(logger).Log("msg", "restore config - no configuration", "userID", userID)
http.Error(w, "No configuration", http.StatusNotFound)
Expand Down
21 changes: 11 additions & 10 deletions pkg/configs/client/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -17,10 +18,10 @@ import (
type Client interface {
// GetRules returns all Cortex configurations from a configs API server
// that have been updated after the given configs.ID was last updated.
GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error)
GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error)

// GetAlerts fetches all the alerts that have changes since since.
GetAlerts(since configs.ID) (*ConfigsResponse, error)
GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error)
}

// New creates a new ConfigClient.
Expand Down Expand Up @@ -55,7 +56,7 @@ type configsClient struct {
}

// GetRules implements ConfigClient.
func (c configsClient) GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
func (c configsClient) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
suffix := ""
if since != 0 {
suffix = fmt.Sprintf("?since=%d", since)
Expand All @@ -76,7 +77,7 @@ func (c configsClient) GetRules(since configs.ID) (map[string]configs.VersionedR
}

// GetAlerts implements ConfigClient.
func (c configsClient) GetAlerts(since configs.ID) (*ConfigsResponse, error) {
func (c configsClient) GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) {
suffix := ""
if since != 0 {
suffix = fmt.Sprintf("?since=%d", since)
Expand Down Expand Up @@ -117,22 +118,22 @@ type dbStore struct {
}

// GetRules implements ConfigClient.
func (d dbStore) GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
func (d dbStore) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
if since == 0 {
return d.db.GetAllRulesConfigs()
return d.db.GetAllRulesConfigs(ctx)
}
return d.db.GetRulesConfigs(since)
return d.db.GetRulesConfigs(ctx, since)
}

// GetAlerts implements ConfigClient.
func (d dbStore) GetAlerts(since configs.ID) (*ConfigsResponse, error) {
func (d dbStore) GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) {
var resp map[string]configs.View
var err error
if since == 0 {
resp, err = d.db.GetAllConfigs()
resp, err = d.db.GetAllConfigs(ctx)

}
resp, err = d.db.GetConfigs(since)
resp, err = d.db.GetConfigs(ctx, since)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/configs/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ type instrumented struct {
next Client
}

func (i instrumented) GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
func (i instrumented) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
var cfgs map[string]configs.VersionedRulesConfig
err := instrument.CollectedRequest(context.Background(), "Configs.GetConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
var err error
cfgs, err = i.next.GetRules(since) // Warning: this will produce an incorrect result if the configID ever overflows
cfgs, err = i.next.GetRules(ctx, since) // Warning: this will produce an incorrect result if the configID ever overflows
return err
})
return cfgs, err
}

func (i instrumented) GetAlerts(since configs.ID) (*ConfigsResponse, error) {
func (i instrumented) GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) {
var cfgs *ConfigsResponse
err := instrument.CollectedRequest(context.Background(), "Configs.GetConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
var err error
cfgs, err = i.next.GetAlerts(since) // Warning: this will produce an incorrect result if the configID ever overflows
cfgs, err = i.next.GetAlerts(ctx, since) // Warning: this will produce an incorrect result if the configID ever overflows
return err
})
return cfgs, err
Expand Down
21 changes: 11 additions & 10 deletions pkg/configs/db/db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package db

import (
"context"
"flag"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -31,28 +32,28 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// DB is the interface for the database.
type DB interface {
// GetRulesConfig gets the user's ruler config
GetRulesConfig(userID string) (configs.VersionedRulesConfig, error)
GetRulesConfig(ctx context.Context, userID string) (configs.VersionedRulesConfig, error)

// SetRulesConfig does a compare-and-swap (CAS) on the user's rules config.
// `oldConfig` must precisely match the current config in order to change the config to `newConfig`.
// Will return `true` if the config was updated, `false` otherwise.
SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConfig) (bool, error)
SetRulesConfig(ctx context.Context, userID string, oldConfig, newConfig configs.RulesConfig) (bool, error)

// GetAllRulesConfigs gets all of the ruler configs
GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error)
GetAllRulesConfigs(ctx context.Context) (map[string]configs.VersionedRulesConfig, error)

// GetRulesConfigs gets all of the configs that have been added or have
// changed since the provided config.
GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error)
GetRulesConfigs(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error)

GetConfig(userID string) (configs.View, error)
SetConfig(userID string, cfg configs.Config) error
GetConfig(ctx context.Context, userID string) (configs.View, error)
SetConfig(ctx context.Context, userID string, cfg configs.Config) error

GetAllConfigs() (map[string]configs.View, error)
GetConfigs(since configs.ID) (map[string]configs.View, error)
GetAllConfigs(ctx context.Context) (map[string]configs.View, error)
GetConfigs(ctx context.Context, since configs.ID) (map[string]configs.View, error)

DeactivateConfig(userID string) error
RestoreConfig(userID string) error
DeactivateConfig(ctx context.Context, userID string) error
RestoreConfig(ctx context.Context, userID string) error

Close() error
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/configs/db/memory/memory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package memory

import (
"context"
"database/sql"
"fmt"
"time"
Expand All @@ -23,7 +24,7 @@ func New(_, _ string) (*DB, error) {
}

// GetConfig gets the user's configuration.
func (d *DB) GetConfig(userID string) (configs.View, error) {
func (d *DB) GetConfig(ctx context.Context, userID string) (configs.View, error) {
c, ok := d.cfgs[userID]
if !ok {
return configs.View{}, sql.ErrNoRows
Expand All @@ -32,7 +33,7 @@ func (d *DB) GetConfig(userID string) (configs.View, error) {
}

// SetConfig sets configuration for a user.
func (d *DB) SetConfig(userID string, cfg configs.Config) error {
func (d *DB) SetConfig(ctx context.Context, userID string, cfg configs.Config) error {
if !cfg.RulesConfig.FormatVersion.IsValid() {
return fmt.Errorf("invalid rule format version %v", cfg.RulesConfig.FormatVersion)
}
Expand All @@ -42,12 +43,12 @@ func (d *DB) SetConfig(userID string, cfg configs.Config) error {
}

// GetAllConfigs gets all of the configs.
func (d *DB) GetAllConfigs() (map[string]configs.View, error) {
func (d *DB) GetAllConfigs(ctx context.Context) (map[string]configs.View, error) {
return d.cfgs, nil
}

// GetConfigs gets all of the configs that have changed recently.
func (d *DB) GetConfigs(since configs.ID) (map[string]configs.View, error) {
func (d *DB) GetConfigs(ctx context.Context, since configs.ID) (map[string]configs.View, error) {
cfgs := map[string]configs.View{}
for user, c := range d.cfgs {
if c.ID > since {
Expand All @@ -60,8 +61,8 @@ func (d *DB) GetConfigs(since configs.ID) (map[string]configs.View, error) {
// SetDeletedAtConfig sets a deletedAt for configuration
// by adding a single new row with deleted_at set
// the same as SetConfig is actually insert
func (d *DB) SetDeletedAtConfig(userID string, deletedAt time.Time) error {
cv, err := d.GetConfig(userID)
func (d *DB) SetDeletedAtConfig(ctx context.Context, userID string, deletedAt time.Time) error {
cv, err := d.GetConfig(ctx, userID)
if err != nil {
return err
}
Expand All @@ -73,13 +74,13 @@ func (d *DB) SetDeletedAtConfig(userID string, deletedAt time.Time) error {
}

// DeactivateConfig deactivates configuration for a user by creating new configuration with DeletedAt set to now
func (d *DB) DeactivateConfig(userID string) error {
return d.SetDeletedAtConfig(userID, time.Now())
func (d *DB) DeactivateConfig(ctx context.Context, userID string) error {
return d.SetDeletedAtConfig(ctx, userID, time.Now())
}

// RestoreConfig restores deactivated configuration for a user by creating new configuration with empty DeletedAt
func (d *DB) RestoreConfig(userID string) error {
return d.SetDeletedAtConfig(userID, time.Time{})
func (d *DB) RestoreConfig(ctx context.Context, userID string) error {
return d.SetDeletedAtConfig(ctx, userID, time.Time{})
}

// Close finishes using the db. Noop.
Expand All @@ -88,7 +89,7 @@ func (d *DB) Close() error {
}

// GetRulesConfig gets the rules config for a user.
func (d *DB) GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) {
func (d *DB) GetRulesConfig(ctx context.Context, userID string) (configs.VersionedRulesConfig, error) {
c, ok := d.cfgs[userID]
if !ok {
return configs.VersionedRulesConfig{}, sql.ErrNoRows
Expand All @@ -101,22 +102,22 @@ func (d *DB) GetRulesConfig(userID string) (configs.VersionedRulesConfig, error)
}

// SetRulesConfig sets the rules config for a user.
func (d *DB) SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConfig) (bool, error) {
func (d *DB) SetRulesConfig(ctx context.Context, userID string, oldConfig, newConfig configs.RulesConfig) (bool, error) {
c, ok := d.cfgs[userID]
if !ok {
return true, d.SetConfig(userID, configs.Config{RulesConfig: newConfig})
return true, d.SetConfig(ctx, userID, configs.Config{RulesConfig: newConfig})
}
if !oldConfig.Equal(c.Config.RulesConfig) {
return false, nil
}
return true, d.SetConfig(userID, configs.Config{
return true, d.SetConfig(ctx, userID, configs.Config{
AlertmanagerConfig: c.Config.AlertmanagerConfig,
RulesConfig: newConfig,
})
}

// GetAllRulesConfigs gets the rules configs for all users that have them.
func (d *DB) GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error) {
func (d *DB) GetAllRulesConfigs(ctx context.Context) (map[string]configs.VersionedRulesConfig, error) {
cfgs := map[string]configs.VersionedRulesConfig{}
for user, c := range d.cfgs {
cfg := c.GetVersionedRulesConfig()
Expand All @@ -129,7 +130,7 @@ func (d *DB) GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, erro

// GetRulesConfigs gets the rules configs that have changed
// since the given config version.
func (d *DB) GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
func (d *DB) GetRulesConfigs(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
cfgs := map[string]configs.VersionedRulesConfig{}
for user, c := range d.cfgs {
if c.ID <= since {
Expand Down
Loading