Skip to content

Commit ceaefc8

Browse files
authored
Implement quorum reads and merging for /v2/alerts and /v2/alerts/groups. (#4127)
* Implement quorum reads and merging for /v2/alerts and /v2/alerts/groups. Reading alert listings via /v2/alerts and /v2/alerts/groups will now read from a quorum of replicas and return a merged response. Alerts are merged by returning the union of alerts over all responses. When multiple responses contain the same alert, the alert with the most recent "updatedAt" timestamp is chosen. Groups are merged by returning the union of groups over all responses. When the same group is present in multiple responses (same receiver and labels fingerprint), the groups are merged into one by merging the sets of alerts in each. Signed-off-by: Steve Simpson <[email protected]> * Explicitly require some modules used in alertmanager. Signed-off-by: Steve Simpson <[email protected]>
1 parent 050ddda commit ceaefc8

File tree

10 files changed

+558
-18
lines changed

10 files changed

+558
-18
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ require (
2020
github.com/felixge/fgprof v0.9.1
2121
github.com/fsouza/fake-gcs-server v1.7.0
2222
github.com/go-kit/kit v0.10.0
23+
github.com/go-openapi/strfmt v0.20.0
24+
github.com/go-openapi/swag v0.19.14
2325
github.com/go-redis/redis/v8 v8.2.3
2426
github.com/gocql/gocql v0.0.0-20200526081602-cd04bd7f22a7
2527
github.com/gogo/protobuf v1.3.2

integration/alertmanager_test.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -505,21 +505,23 @@ func TestAlertmanagerSharding(t *testing.T) {
505505
// Therefore, the alerts we posted should always be visible.
506506

507507
for _, c := range clients {
508-
list, err := c.GetAlerts(context.Background())
508+
list, err := c.GetAlertsV1(context.Background())
509509
require.NoError(t, err)
510510
assert.ElementsMatch(t, []string{"alert_1", "alert_2", "alert_3"}, alertNames(list))
511511
}
512512
}
513513

514-
// Endpoint: GET /alerts/groups
514+
// Endpoint: GET /v2/alerts
515515
{
516-
// Writes do not block for the write slowest replica, and reads do not
517-
// currently merge results from multiple replicas, so we have to wait.
518-
require.NoError(t, alertmanagers.WaitSumMetricsWithOptions(
519-
e2e.Equals(float64(3*testCfg.replicationFactor)),
520-
[]string{"cortex_alertmanager_alerts_received_total"},
521-
e2e.SkipMissingMetrics))
516+
for _, c := range clients {
517+
list, err := c.GetAlertsV2(context.Background())
518+
require.NoError(t, err)
519+
assert.ElementsMatch(t, []string{"alert_1", "alert_2", "alert_3"}, alertNames(list))
520+
}
521+
}
522522

523+
// Endpoint: GET /v2/alerts/groups
524+
{
523525
for _, c := range clients {
524526
list, err := c.GetAlertGroups(context.Background())
525527
require.NoError(t, err)
@@ -535,7 +537,15 @@ func TestAlertmanagerSharding(t *testing.T) {
535537
require.Contains(t, groups, "group_2")
536538
assert.ElementsMatch(t, []string{"alert_3"}, alertNames(groups["group_2"]))
537539
}
540+
541+
// Note: /v1/alerts/groups does not exist.
538542
}
543+
544+
// Check the alerts were eventually written to every replica.
545+
require.NoError(t, alertmanagers.WaitSumMetricsWithOptions(
546+
e2e.Equals(float64(3*testCfg.replicationFactor)),
547+
[]string{"cortex_alertmanager_alerts_received_total"},
548+
e2e.SkipMissingMetrics))
539549
})
540550
}
541551
}

integration/e2ecortex/client.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,7 @@ func (c *Client) SendAlertToAlermanager(ctx context.Context, alert *model.Alert)
509509
return nil
510510
}
511511

512-
func (c *Client) GetAlerts(ctx context.Context) ([]model.Alert, error) {
512+
func (c *Client) GetAlertsV1(ctx context.Context) ([]model.Alert, error) {
513513
u := c.alertmanagerClient.URL("api/prom/api/v1/alerts", nil)
514514

515515
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
@@ -547,6 +547,35 @@ func (c *Client) GetAlerts(ctx context.Context) ([]model.Alert, error) {
547547
return decoded.Data, nil
548548
}
549549

550+
func (c *Client) GetAlertsV2(ctx context.Context) ([]model.Alert, error) {
551+
u := c.alertmanagerClient.URL("api/prom/api/v2/alerts", nil)
552+
553+
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
554+
if err != nil {
555+
return nil, fmt.Errorf("error creating request: %v", err)
556+
}
557+
558+
resp, body, err := c.alertmanagerClient.Do(ctx, req)
559+
if err != nil {
560+
return nil, err
561+
}
562+
563+
if resp.StatusCode == http.StatusNotFound {
564+
return nil, ErrNotFound
565+
}
566+
567+
if resp.StatusCode/100 != 2 {
568+
return nil, fmt.Errorf("getting alerts failed with status %d and error %v", resp.StatusCode, string(body))
569+
}
570+
571+
decoded := []model.Alert{}
572+
if err := json.Unmarshal(body, &decoded); err != nil {
573+
return nil, err
574+
}
575+
576+
return decoded, nil
577+
}
578+
550579
type AlertGroup struct {
551580
Labels model.LabelSet `json:"labels"`
552581
Alerts []model.Alert `json:"alerts"`

pkg/alertmanager/distributor.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,17 @@ func (d *Distributor) isQuorumReadPath(p string) (bool, merger.Merger) {
8888
if strings.HasSuffix(p, "/v1/alerts") {
8989
return true, merger.V1Alerts{}
9090
}
91+
if strings.HasSuffix(p, "/v2/alerts") {
92+
return true, merger.V2Alerts{}
93+
}
94+
if strings.HasSuffix(p, "/v2/alerts/groups") {
95+
return true, merger.V2AlertGroups{}
96+
}
9197
return false, nil
9298
}
9399

94100
func (d *Distributor) isUnaryReadPath(p string) bool {
95-
return strings.HasSuffix(p, "/v2/alerts") ||
96-
strings.HasSuffix(p, "/alerts/groups") ||
97-
strings.HasSuffix(p, "/silences") ||
101+
return strings.HasSuffix(p, "/silences") ||
98102
strings.HasSuffix(path.Dir(p), "/silence") ||
99103
strings.HasSuffix(p, "/status") ||
100104
strings.HasSuffix(p, "/receivers")

pkg/alertmanager/distributor_test.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,23 +81,34 @@ func TestDistributor_DistributeRequest(t *testing.T) {
8181
route: "/v1/alerts",
8282
responseBody: []byte(`{"status":"success","data":[]}`),
8383
}, {
84-
name: "Read /v2/alerts is sent to only 1 AM",
84+
name: "Read /v2/alerts is sent to 3 AMs",
8585
numAM: 5,
8686
numHappyAM: 5,
8787
replicationFactor: 3,
8888
isRead: true,
8989
expStatusCode: http.StatusOK,
90-
expectedTotalCalls: 1,
90+
expectedTotalCalls: 3,
9191
route: "/v2/alerts",
92+
responseBody: []byte(`[]`),
9293
}, {
93-
name: "Read /alerts/groups is sent to only 1 AM",
94+
name: "Read /v2/alerts/groups is sent to 3 AMs",
9495
numAM: 5,
9596
numHappyAM: 5,
9697
replicationFactor: 3,
9798
isRead: true,
9899
expStatusCode: http.StatusOK,
99-
expectedTotalCalls: 1,
100-
route: "/alerts/groups",
100+
expectedTotalCalls: 3,
101+
route: "/v2/alerts/groups",
102+
responseBody: []byte(`[]`),
103+
}, {
104+
name: "Read /v1/alerts/groups not supported",
105+
numAM: 5,
106+
numHappyAM: 5,
107+
replicationFactor: 3,
108+
expStatusCode: http.StatusNotFound,
109+
expectedTotalCalls: 0,
110+
headersNotPreserved: true,
111+
route: "/v1/alerts/groups",
101112
}, {
102113
name: "Write /alerts/groups not supported",
103114
numAM: 5,
@@ -256,7 +267,7 @@ func TestDistributor_DistributeRequest(t *testing.T) {
256267
func TestDistributor_IsPathSupported(t *testing.T) {
257268
supported := map[string]bool{
258269
"/alertmanager/api/v1/alerts": true,
259-
"/alertmanager/api/v1/alerts/groups": true,
270+
"/alertmanager/api/v1/alerts/groups": false,
260271
"/alertmanager/api/v1/silences": true,
261272
"/alertmanager/api/v1/silence/id": true,
262273
"/alertmanager/api/v1/silence/anything": true,
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package merger
2+
3+
import (
4+
"errors"
5+
"sort"
6+
7+
"github.com/go-openapi/swag"
8+
v2 "github.com/prometheus/alertmanager/api/v2"
9+
v2_models "github.com/prometheus/alertmanager/api/v2/models"
10+
prom_model "github.com/prometheus/common/model"
11+
)
12+
13+
// V2AlertGroups implements the Merger interface for GET /v2/alerts/groups. It returns
14+
// the union of alert groups over all the responses. When the same alert exists in the same
15+
// group for multiple responses, the instance of that alert with the most recent UpdatedAt
16+
// timestamp is returned in that group within the response.
17+
type V2AlertGroups struct{}
18+
19+
func (V2AlertGroups) MergeResponses(in [][]byte) ([]byte, error) {
20+
groups := make(v2_models.AlertGroups, 0)
21+
for _, body := range in {
22+
parsed := make(v2_models.AlertGroups, 0)
23+
if err := swag.ReadJSON(body, &parsed); err != nil {
24+
return nil, err
25+
}
26+
groups = append(groups, parsed...)
27+
}
28+
29+
merged, err := mergeV2AlertGroups(groups)
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
return swag.WriteJSON(merged)
35+
}
36+
37+
func mergeV2AlertGroups(in v2_models.AlertGroups) (v2_models.AlertGroups, error) {
38+
// Gather lists of all alerts for each distinct group.
39+
groups := make(map[groupKey]*v2_models.AlertGroup)
40+
for _, group := range in {
41+
if group.Receiver == nil {
42+
return nil, errors.New("unexpected nil receiver")
43+
}
44+
if group.Receiver.Name == nil {
45+
return nil, errors.New("unexpected nil receiver name")
46+
}
47+
48+
key := getGroupKey(group)
49+
if current, ok := groups[key]; ok {
50+
current.Alerts = append(current.Alerts, group.Alerts...)
51+
} else {
52+
groups[key] = group
53+
}
54+
}
55+
56+
// Merge duplicates of the same alert within each group.
57+
for _, group := range groups {
58+
var err error
59+
group.Alerts, err = mergeV2Alerts(group.Alerts)
60+
if err != nil {
61+
return nil, err
62+
}
63+
}
64+
65+
result := make(v2_models.AlertGroups, 0, len(groups))
66+
for _, group := range groups {
67+
result = append(result, group)
68+
}
69+
70+
// Mimic Alertmanager which returns groups ordered by labels and receiver.
71+
sort.Sort(byGroup(result))
72+
73+
return result, nil
74+
}
75+
76+
// getGroupKey returns an identity for a group which can be used to match it against other groups.
77+
// Only the receiver name is necessary to ensure grouping by receiver, and for the labels, we again
78+
// use the same method for matching the group labels as used internally, generating the fingerprint.
79+
func getGroupKey(group *v2_models.AlertGroup) groupKey {
80+
return groupKey{
81+
fingerprint: prom_model.LabelsToSignature(group.Labels),
82+
receiver: *group.Receiver.Name,
83+
}
84+
}
85+
86+
type groupKey struct {
87+
fingerprint uint64
88+
receiver string
89+
}
90+
91+
// byGroup implements the ordering of Alertmanager dispatch.AlertGroups on the OpenAPI type.
92+
type byGroup v2_models.AlertGroups
93+
94+
func (ag byGroup) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] }
95+
func (ag byGroup) Less(i, j int) bool {
96+
iLabels := v2.APILabelSetToModelLabelSet(ag[i].Labels)
97+
jLabels := v2.APILabelSetToModelLabelSet(ag[j].Labels)
98+
99+
if iLabels.Equal(jLabels) {
100+
return *ag[i].Receiver.Name < *ag[j].Receiver.Name
101+
}
102+
return iLabels.Before(jLabels)
103+
}
104+
func (ag byGroup) Len() int { return len(ag) }

0 commit comments

Comments
 (0)