@@ -11,6 +11,7 @@ import (
11
11
12
12
"github.com/go-kit/kit/log"
13
13
"github.com/prometheus/alertmanager/api"
14
+ "github.com/prometheus/alertmanager/cluster"
14
15
"github.com/prometheus/alertmanager/config"
15
16
"github.com/prometheus/alertmanager/dispatch"
16
17
"github.com/prometheus/alertmanager/inhibit"
@@ -24,7 +25,6 @@ import (
24
25
"github.com/prometheus/client_golang/prometheus"
25
26
"github.com/prometheus/common/route"
26
27
"github.com/prometheus/prometheus/pkg/labels"
27
- "github.com/weaveworks/mesh"
28
28
)
29
29
30
30
const notificationLogMaintenancePeriod = 15 * time .Minute
@@ -35,31 +35,33 @@ type Config struct {
35
35
// Used to persist notification logs and silences on disk.
36
36
DataDir string
37
37
Logger log.Logger
38
- MeshRouter gossipRouter
39
38
Retention time.Duration
40
39
ExternalURL * url.URL
41
40
}
42
41
43
42
// An Alertmanager manages the alerts for one user.
// New builds the notification log and the silences under IDs derived from
// cfg.UserID and tags the logger with the same user ID.
44
43
type Alertmanager struct {
45
- cfg * Config
46
- api * api.API
47
- logger log.Logger
48
- nflog nflog.Log
49
- silences * silence.Silences
50
- marker types.Marker
51
- alerts * mem.Alerts
52
- dispatcher * dispatch.Dispatcher
53
- inhibitor * inhibit.Inhibitor
54
- stop chan struct {}
55
- wg sync.WaitGroup
56
- router * route.Router
44
+ cfg * Config
45
+ api * api.API
46
+ logger log.Logger
47
// nflog is assigned from nflog.New in New.
+ nflog * nflog.Log
48
// silences is assigned from silence.New in New; when the peer is non-nil it
// is also registered with the peer through AddState.
+ silences * silence.Silences
49
+ marker types.Marker
50
+ alerts * mem.Alerts
51
// dispatcher and inhibitor are (re)assigned by ApplyConfig.
+ dispatcher * dispatch.Dispatcher
52
+ inhibitor * inhibit.Inhibitor
53
// stop and wg.Done are handed to the notification log's maintenance option
// in New.
+ stop chan struct {}
54
+ wg sync.WaitGroup
55
+ router * route.Router
56
// peer is the cluster peer passed to New. New nil-checks it before calling
// AddState, so it may be nil.
+ peer * cluster.Peer
57
// peerTimeout is the base timeout ApplyConfig passes to clusterWait.
// NOTE(review): the struct literal in New sets peer but not peerTimeout;
// confirm it is assigned somewhere, otherwise it stays at its zero value.
+ peerTimeout time.Duration
57
58
}
58
59
59
60
// New creates a new Alertmanager.
60
- func New (cfg * Config ) (* Alertmanager , error ) {
61
+ func New (peer * cluster. Peer , peerTimeout time. Duration , cfg * Config ) (* Alertmanager , error ) {
61
62
am := & Alertmanager {
62
63
cfg : cfg ,
64
+ peer : peer ,
63
65
logger : log .With (cfg .Logger , "user" , cfg .UserID ),
64
66
stop : make (chan struct {}),
65
67
}
@@ -68,9 +70,6 @@ func New(cfg *Config) (*Alertmanager, error) {
68
70
nflogID := fmt .Sprintf ("nflog:%s" , cfg .UserID )
69
71
var err error
70
72
am .nflog , err = nflog .New (
71
- nflog .WithMesh (func (g mesh.Gossiper ) mesh.Gossip {
72
- return cfg .MeshRouter .newGossip (nflogID , g )
73
- }),
74
73
nflog .WithRetention (cfg .Retention ),
75
74
nflog .WithSnapshot (filepath .Join (cfg .DataDir , nflogID )),
76
75
nflog .WithMaintenance (notificationLogMaintenancePeriod , am .stop , am .wg .Done ),
@@ -86,22 +85,25 @@ func New(cfg *Config) (*Alertmanager, error) {
86
85
87
86
am .marker = types .NewMarker ()
88
87
88
+ // TODO(cortex): Build a registry that can merge metrics from multiple users.
89
+ // For now, these metrics are ignored, as we can't register the same
90
+ // metric twice with a single registry.
91
+ localRegistry := prometheus .NewRegistry ()
92
+
89
93
silencesID := fmt .Sprintf ("silences:%s" , cfg .UserID )
90
94
am .silences , err = silence .New (silence.Options {
91
95
SnapshotFile : filepath .Join (cfg .DataDir , silencesID ),
92
96
Retention : cfg .Retention ,
93
97
Logger : log .With (am .logger , "component" , "silences" ),
94
- // TODO(cortex): Build a registry that can merge metrics from multiple users.
95
- // For now, these metrics are ignored, as we can't register the same
96
- // metric twice with a single registry.
97
- Metrics : prometheus .NewRegistry (),
98
- Gossip : func (g mesh.Gossiper ) mesh.Gossip {
99
- return cfg .MeshRouter .newGossip (silencesID , g )
100
- },
98
+ Metrics : localRegistry ,
101
99
})
102
100
if err != nil {
103
101
return nil , fmt .Errorf ("failed to create silences: %v" , err )
104
102
}
103
+ if peer != nil {
104
+ c := peer .AddState ("sil:" + cfg .UserID , am .silences , localRegistry )
105
+ am .silences .SetBroadcast (c .Broadcast )
106
+ }
105
107
106
108
am .wg .Add (1 )
107
109
go func () {
@@ -122,7 +124,7 @@ func New(cfg *Config) (*Alertmanager, error) {
122
124
return am .dispatcher .Groups (matchers )
123
125
},
124
126
marker .Status ,
125
- nil , // Passing a nil mesh router since we don't show mesh router information in Cortex anyway.
127
+ peer ,
126
128
log .With (am .logger , "component" , "api" ),
127
129
)
128
130
@@ -148,6 +150,14 @@ func New(cfg *Config) (*Alertmanager, error) {
148
150
return am , nil
149
151
}
150
152
153
+ // clusterWait returns a function that inspects the current peer state and returns
154
+ // a duration of one base timeout for each peer with a higher ID than ourselves.
155
+ func clusterWait (p * cluster.Peer , timeout time.Duration ) func () time.Duration {
156
+ return func () time.Duration {
157
+ return time .Duration (p .Position ()) * timeout
158
+ }
159
+ }
160
+
151
161
// ApplyConfig applies a new configuration to an Alertmanager.
152
162
func (am * Alertmanager ) ApplyConfig (conf * config.Config ) error {
153
163
var (
@@ -176,7 +186,7 @@ func (am *Alertmanager) ApplyConfig(conf *config.Config) error {
176
186
177
187
am .inhibitor = inhibit .NewInhibitor (am .alerts , conf .InhibitRules , am .marker , log .With (am .logger , "component" , "inhibitor" ))
178
188
179
- waitFunc := meshWait (am .cfg . MeshRouter , 5 * time . Second )
189
+ waitFunc := clusterWait (am .peer , am . peerTimeout )
180
190
timeoutFunc := func (d time.Duration ) time.Duration {
181
191
if d < notify .MinTimeout {
182
192
d = notify .MinTimeout
@@ -192,6 +202,7 @@ func (am *Alertmanager) ApplyConfig(conf *config.Config) error {
192
202
am .silences ,
193
203
am .nflog ,
194
204
am .marker ,
205
+ am .peer ,
195
206
log .With (am .logger , "component" , "pipeline" ),
196
207
)
197
208
am .dispatcher = dispatch .NewDispatcher (
0 commit comments