|
21 | 21 | package integration
|
22 | 22 |
|
23 | 23 | import (
|
| 24 | + "encoding/json" |
24 | 25 | "errors"
|
25 | 26 | "fmt"
|
| 27 | + "io/ioutil" |
26 | 28 | "net/http"
|
27 | 29 | "sort"
|
28 | 30 | "sync"
|
29 | 31 | "testing"
|
30 |
| - "time" |
31 | 32 |
|
32 | 33 | "github.com/m3db/m3/src/aggregator/aggregator"
|
33 | 34 | "github.com/m3db/m3/src/aggregator/aggregator/handler"
|
@@ -246,14 +247,41 @@ func (ts *testServerSetup) newClient() *client {
|
246 | 247 | return newClient(ts.rawTCPAddr, ts.opts.ClientBatchSize(), connectTimeout)
|
247 | 248 | }
|
248 | 249 |
|
| 250 | +func (ts *testServerSetup) getStatusResponse(path string, response interface{}) error { |
| 251 | + resp, err := http.Get("http://" + ts.httpAddr + path) //nolint |
| 252 | + if err != nil { |
| 253 | + return err |
| 254 | + } |
| 255 | + |
| 256 | + defer resp.Body.Close() //nolint:errcheck |
| 257 | + b, err := ioutil.ReadAll(resp.Body) |
| 258 | + if err != nil { |
| 259 | + return err |
| 260 | + } |
| 261 | + if resp.StatusCode != http.StatusOK { |
| 262 | + return fmt.Errorf("got a non-200 status code: %v", resp.StatusCode) |
| 263 | + } |
| 264 | + return json.Unmarshal(b, response) |
| 265 | +} |
| 266 | + |
249 | 267 | func (ts *testServerSetup) waitUntilServerIsUp() error {
|
250 |
| - c := ts.newClient() |
251 |
| - defer c.close() |
| 268 | + isUp := func() bool { |
| 269 | + var resp httpserver.Response |
| 270 | + if err := ts.getStatusResponse(httpserver.HealthPath, &resp); err != nil { |
| 271 | + return false |
| 272 | + } |
252 | 273 |
|
253 |
| - serverIsUp := func() bool { return c.testConnection() } |
254 |
| - if waitUntil(serverIsUp, ts.opts.ServerStateChangeTimeout()) { |
| 274 | + if resp.State == "OK" { |
| 275 | + return true |
| 276 | + } |
| 277 | + |
| 278 | + return false |
| 279 | + } |
| 280 | + |
| 281 | + if waitUntil(isUp, ts.opts.ServerStateChangeTimeout()) { |
255 | 282 | return nil
|
256 | 283 | }
|
| 284 | + |
257 | 285 | return errServerStartTimedOut
|
258 | 286 | }
|
259 | 287 |
|
@@ -302,20 +330,22 @@ func (ts *testServerSetup) startServer() error {
|
302 | 330 |
|
303 | 331 | func (ts *testServerSetup) waitUntilLeader() error {
|
304 | 332 | isLeader := func() bool {
|
305 |
| - leader, err := ts.leaderService.Leader(ts.electionKey) |
306 |
| - if err != nil { |
| 333 | + var resp httpserver.StatusResponse |
| 334 | + if err := ts.getStatusResponse(httpserver.StatusPath, &resp); err != nil { |
307 | 335 | return false
|
308 | 336 | }
|
309 |
| - return leader == ts.leaderValue |
| 337 | + |
| 338 | + if resp.Status.FlushStatus.ElectionState == aggregator.LeaderState { |
| 339 | + return true |
| 340 | + } |
| 341 | + return false |
310 | 342 | }
|
311 |
| - if !waitUntil(isLeader, ts.opts.ElectionStateChangeTimeout()) { |
312 |
| - return errLeaderElectionTimeout |
| 343 | + |
| 344 | + if waitUntil(isLeader, ts.opts.ElectionStateChangeTimeout()) { |
| 345 | + return nil |
313 | 346 | }
|
314 |
| - // TODO(xichen): replace the sleep here by using HTTP client to explicit |
315 |
| - // curl the server for election status. |
316 |
| - // Give the server some time to transition into leader state if needed. |
317 |
| - time.Sleep(time.Second) |
318 |
| - return nil |
| 347 | + |
| 348 | + return errLeaderElectionTimeout |
319 | 349 | }
|
320 | 350 |
|
321 | 351 | func (ts *testServerSetup) sortedResults() []aggregated.MetricWithStoragePolicy {
|
|
0 commit comments