From e2f5554d804b9a974c52e0af9a88d84f14b03531 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 15 Jul 2025 12:11:23 -0400 Subject: [PATCH] deployment watcher: refactoring testing While investigating whether the deploymentwatcher would need updates to implement system deployments, I discovered that some of the tests are racy and make assertions about called functions without waiting. Update these tests to wait where needed, and generally clean them up while we're in here. In particular I've removed the heavyweight mocking in favor of checking the call counts and then asserting the expected state store changes. Ref: https://hashicorp.atlassian.net/browse/NMD-892 --- nomad/deploymentwatcher/deployment_watcher.go | 2 +- .../deploymentwatcher/deployments_watcher.go | 2 +- .../deployments_watcher_test.go | 1368 +++++------------ nomad/deploymentwatcher/testutil_test.go | 233 +-- 4 files changed, 441 insertions(+), 1164 deletions(-) diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index 77bc1c8ebe1..e1f799802d4 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -156,7 +156,7 @@ func (w *deploymentWatcher) getDeployment() *structs.Deployment { return w.d } -func (w *deploymentWatcher) SetAllocHealth( +func (w *deploymentWatcher) setAllocHealth( req *structs.DeploymentAllocHealthRequest, resp *structs.DeploymentUpdateResponse) error { diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index 6c909111c91..d68c8a73275 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -374,7 +374,7 @@ func (w *Watcher) SetAllocHealth(req *structs.DeploymentAllocHealthRequest, resp return err } - return watcher.SetAllocHealth(req, resp) + return watcher.setAllocHealth(req, resp) } // PromoteDeployment is used to promote a deployment. 
If promote is false, diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go index a6b880441f7..da0df26b3fe 100644 --- a/nomad/deploymentwatcher/deployments_watcher_test.go +++ b/nomad/deploymentwatcher/deployments_watcher_test.go @@ -5,22 +5,20 @@ package deploymentwatcher import ( "fmt" + "strings" "testing" "time" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/testutil" "github.com/shoenig/test/must" "github.com/shoenig/test/wait" - "github.com/stretchr/testify/assert" - mocker "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) func testDeploymentWatcher(t *testing.T, qps float64, batchDur time.Duration) (*Watcher, *mockBackend) { @@ -36,18 +34,13 @@ func defaultTestDeploymentWatcher(t *testing.T) (*Watcher, *mockBackend) { // Tests that the watcher properly watches for deployments and reconciles them func TestWatcher_WatchDeployments(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create three jobs j1, j2, j3 := mock.Job(), mock.Job(), mock.Job() - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, j1)) - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, j2)) - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, 102, nil, j3)) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, j1)) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, j2)) + must.NoError(t, 
m.state.UpsertJob(structs.MsgTypeTestSetup, 102, nil, j3)) // Create three deployments all running d1, d2, d3 := mock.Deployment(), mock.Deployment(), mock.Deployment() @@ -56,14 +49,14 @@ func TestWatcher_WatchDeployments(t *testing.T) { d3.JobID = j3.ID // Upsert the first deployment - require.Nil(m.state.UpsertDeployment(103, d1)) + must.NoError(t, m.state.UpsertDeployment(103, d1)) // Next list 3 block1 := make(chan time.Time) go func() { <-block1 - require.Nil(m.state.UpsertDeployment(104, d2)) - require.Nil(m.state.UpsertDeployment(105, d3)) + must.NoError(t, m.state.UpsertDeployment(104, d2)) + must.NoError(t, m.state.UpsertDeployment(105, d3)) }() //// Next list 3 but have one be terminal @@ -72,37 +65,28 @@ func TestWatcher_WatchDeployments(t *testing.T) { d3terminal.Status = structs.DeploymentStatusFailed go func() { <-block2 - require.Nil(m.state.UpsertDeployment(106, d3terminal)) + must.NoError(t, m.state.UpsertDeployment(106, d3terminal)) }() w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "1 deployment returned") }) + waitForWatchers(t, w, 1) close(block1) - testutil.WaitForResult(func() (bool, error) { return 3 == watchersCount(w), nil }, - func(err error) { require.Equal(3, watchersCount(w), "3 deployment returned") }) + waitForWatchers(t, w, 3) close(block2) - testutil.WaitForResult(func() (bool, error) { return 2 == watchersCount(w), nil }, - func(err error) { require.Equal(3, watchersCount(w), "3 deployment returned - 1 terminal") }) + waitForWatchers(t, w, 2) } // Tests that calls against an unknown deployment fail func TestWatcher_UnknownDeployment(t *testing.T) { ci.Parallel(t) - assert := assert.New(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) w.SetEnabled(true, m.state) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - 
})).Return(nil).Maybe() - // The expected error is that it should be an unknown deployment dID := uuid.Generate() - expected := fmt.Sprintf("unknown deployment %q", dID) + expectedErr := fmt.Sprintf("unknown deployment %q", dID) // Request setting the health against an unknown deployment req := &structs.DeploymentAllocHealthRequest{ @@ -111,9 +95,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - if assert.NotNil(err, "should have error for unknown deployment") { - require.Contains(err.Error(), expected) - } + must.ErrorContains(t, err, expectedErr) // Request promoting against an unknown deployment req2 := &structs.DeploymentPromoteRequest{ @@ -121,9 +103,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { All: true, } err = w.PromoteDeployment(req2, &resp) - if assert.NotNil(err, "should have error for unknown deployment") { - require.Contains(err.Error(), expected) - } + must.ErrorContains(t, err, expectedErr) // Request pausing against an unknown deployment req3 := &structs.DeploymentPauseRequest{ @@ -131,172 +111,117 @@ func TestWatcher_UnknownDeployment(t *testing.T) { Pause: true, } err = w.PauseDeployment(req3, &resp) - if assert.NotNil(err, "should have error for unknown deployment") { - require.Contains(err.Error(), expected) - } + must.ErrorContains(t, err, expectedErr) // Request failing against an unknown deployment req4 := &structs.DeploymentFailRequest{ DeploymentID: dID, } err = w.FailDeployment(req4, &resp) - if assert.NotNil(err, "should have error for unknown deployment") { - require.Contains(err.Error(), expected) - } + must.ErrorContains(t, err, expectedErr) } // Test setting an unknown allocation's health func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { ci.Parallel(t) - assert := assert.New(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args 
*structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) // require that we get a call to UpsertDeploymentAllocHealth a := mock.Alloc() - matchConfig := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Healthy: []string{a.ID}, - Eval: true, - } - matcher := matchDeploymentAllocHealthRequest(matchConfig) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)).Return(nil) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call SetAllocHealth + // manually set an unknown alloc healthy req := &structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, HealthyAllocationIDs: []string{a.ID}, } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - if assert.NotNil(err, "Set health of unknown allocation") { - require.Contains(err.Error(), "unknown") - } - require.Equal(1, watchersCount(w), "Deployment should still be active") + must.ErrorContains(t, err, "unknown alloc") + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) } // Test setting allocation health func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := 
mock.Job() d := mock.Deployment() d.JobID = j.ID a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we get a call to UpsertDeploymentAllocHealth - matchConfig := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Healthy: []string{a.ID}, - Eval: true, - } - matcher := matchDeploymentAllocHealthRequest(matchConfig) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call SetAllocHealth + // manually set the alloc healthy req := &structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, HealthyAllocationIDs: []string{a.ID}, } var resp structs.DeploymentUpdateResponse - err := w.SetAllocHealth(req, &resp) - require.Nil(err, "SetAllocHealth") - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)) + must.NoError(t, w.SetAllocHealth(req, &resp)) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, 1, d.TaskGroups["web"].HealthyAllocs) + must.Eq(t, 0, d.TaskGroups["web"].UnhealthyAllocs) } // Test setting allocation unhealthy func 
TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we get a call to UpsertDeploymentAllocHealth - matchConfig := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Unhealthy: []string{a.ID}, - Eval: true, - DeploymentUpdate: &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionFailedAllocations, - }, - } - matcher := matchDeploymentAllocHealthRequest(matchConfig) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call SetAllocHealth + // manually set the alloc unhealthy req := &structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, UnhealthyAllocationIDs: []string{a.ID}, } var resp structs.DeploymentUpdateResponse - err := w.SetAllocHealth(req, &resp) - require.Nil(err, 
"SetAllocHealth") + must.NoError(t, w.SetAllocHealth(req, &resp)) + + waitForWatchers(t, w, 0) - testutil.WaitForResult(func() (bool, error) { return 0 == watchersCount(w), nil }, - func(err error) { require.Equal(0, watchersCount(w), "Should have no deployment") }) - m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, 0, d.TaskGroups["web"].HealthyAllocs) + must.Eq(t, 1, d.TaskGroups["web"].UnhealthyAllocs) } // Test setting allocation unhealthy and that there should be a rollback func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -309,9 +234,9 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) // Upsert the job again to get a new version j2 := j.Copy() @@ -319,51 +244,37 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { // Modify the job to make its specification different j2.Meta["foo"] = "bar" - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, 
m.nextIndex(), nil, j2), "UpsertJob2") - - // require that we get a call to UpsertDeploymentAllocHealth - matchConfig := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Unhealthy: []string{a.ID}, - Eval: true, - DeploymentUpdate: &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionFailedAllocations, - }, - JobVersion: pointer.Of(uint64(0)), - } - matcher := matchDeploymentAllocHealthRequest(matchConfig) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call SetAllocHealth + // manually set the alloc unhealthy req := &structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, UnhealthyAllocationIDs: []string{a.ID}, } var resp structs.DeploymentUpdateResponse - err := w.SetAllocHealth(req, &resp) - require.Nil(err, "SetAllocHealth") + must.NoError(t, w.SetAllocHealth(req, &resp)) + + waitForWatchers(t, w, 0) - testutil.WaitForResult(func() (bool, error) { return 0 == watchersCount(w), nil }, - func(err error) { require.Equal(0, watchersCount(w), "Should have no deployment") }) - m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, 0, d.TaskGroups["web"].HealthyAllocs) + must.Eq(t, 1, d.TaskGroups["web"].UnhealthyAllocs) + must.Eq(t, structs.DeploymentStatusFailed, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRollback( + structs.DeploymentStatusDescriptionFailedAllocations, 0), d.StatusDescription) + + m.assertCalls(t, "UpdateDeploymentAllocHealth", 1) } // Test setting allocation unhealthy on job with identical 
spec and there should be no rollback func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -376,34 +287,18 @@ func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) // Upsert the job again to get a new version j2 := j.Copy() j2.Stable = false - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2), "UpsertJob2") - - // require that we get a call to UpsertDeploymentAllocHealth - matchConfig := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Unhealthy: []string{a.ID}, - Eval: true, - DeploymentUpdate: &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionFailedAllocations, - }, - JobVersion: nil, - } - matcher := matchDeploymentAllocHealthRequest(matchConfig) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2)) w.SetEnabled(true, 
m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -411,24 +306,20 @@ func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, } var resp structs.DeploymentUpdateResponse - err := w.SetAllocHealth(req, &resp) - require.Nil(err, "SetAllocHealth") + must.NoError(t, w.SetAllocHealth(req, &resp)) - testutil.WaitForResult(func() (bool, error) { return 0 == watchersCount(w), nil }, - func(err error) { require.Equal(0, watchersCount(w), "Should have no deployment") }) - m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) + waitForWatchers(t, w, 0) + must.Eq(t, structs.DeploymentStatusRunning, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRunning, d.StatusDescription) + + m.assertCalls(t, "UpdateDeploymentAllocHealth", 1) } // Test promoting a deployment func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, canary alloc, and a deployment j := mock.Job() j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -444,51 +335,32 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { Healthy: pointer.Of(true), } a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we get a call to UpsertDeploymentPromotion - matchConfig := 
&matchDeploymentPromoteRequestConfig{ - Promotion: &structs.DeploymentPromoteRequest{ - DeploymentID: d.ID, - All: true, - }, - Eval: true, - } - matcher := matchDeploymentPromoteRequest(matchConfig) - m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher)).Return(nil) - - // We may get an update for the desired transition. - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PromoteDeployment + // manually promote req := &structs.DeploymentPromoteRequest{ DeploymentID: d.ID, All: true, } var resp structs.DeploymentUpdateResponse - err := w.PromoteDeployment(req, &resp) - require.Nil(err, "PromoteDeployment") - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher)) + must.NoError(t, w.PromoteDeployment(req, &resp)) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.True(t, d.TaskGroups["web"].Promoted) } // Test promoting a deployment with unhealthy canaries func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, canary alloc, and a 
deployment j := mock.Job() j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -501,39 +373,30 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { d.TaskGroups[a.TaskGroup].PlacedCanaries = []string{a.ID} d.TaskGroups[a.TaskGroup].DesiredCanaries = 2 a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we get a call to UpsertDeploymentPromotion - matchConfig := &matchDeploymentPromoteRequestConfig{ - Promotion: &structs.DeploymentPromoteRequest{ - DeploymentID: d.ID, - All: true, - }, - Eval: true, - } - matcher := matchDeploymentPromoteRequest(matchConfig) - m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call SetAllocHealth + // manually promote req := &structs.DeploymentPromoteRequest{ DeploymentID: d.ID, All: true, } var resp structs.DeploymentUpdateResponse err := w.PromoteDeployment(req, &resp) - if assert.NotNil(t, err, "PromoteDeployment") { - // 0/2 because the old version has been stopped but the canary isn't marked healthy yet - require.Contains(err.Error(), `Task group "web" has 0/2 healthy allocations`, "Should error because canary isn't marked healthy") - } + // 0/2 because the old version has been stopped but the canary isn't marked healthy 
yet + must.ErrorContains(t, err, `Task group "web" has 0/2 healthy allocations`, + must.Sprint("Should error because canary isn't marked healthy")) + + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher)) + d, err = m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusRunning, d.Status) + must.False(t, d.TaskGroups["web"].Promoted) } func TestWatcher_AutoPromoteDeployment(t *testing.T) { @@ -612,57 +475,13 @@ func TestWatcher_AutoPromoteDeployment(t *testing.T) { d.TaskGroups[ca1.TaskGroup].PlacedCanaries = []string{ca1.ID, ca2.ID} d.TaskGroups[ca1.TaskGroup].DesiredCanaries = 2 d.TaskGroups[ra1.TaskGroup].PlacedAllocs = 2 - require.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{ca1, ca2, ra1, ra2}), "UpsertAllocs") - - // ============================================================= - // Support method calls - - // clear UpdateDeploymentStatus default expectation - m.Mock.ExpectedCalls = nil - - matchConfig0 := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionProgressDeadline, - Eval: true, - } - matcher0 := matchDeploymentStatusUpdateRequest(matchConfig0) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher0)).Return(nil) - - matchConfig1 := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Healthy: []string{ca1.ID, ca2.ID, ra1.ID, ra2.ID}, - Eval: true, - } - matcher1 := matchDeploymentAllocHealthRequest(matchConfig1) - m.On("UpdateDeploymentAllocHealth", 
mocker.MatchedBy(matcher1)).Return(nil) - - matchConfig2 := &matchDeploymentPromoteRequestConfig{ - Promotion: &structs.DeploymentPromoteRequest{ - DeploymentID: d.ID, - All: true, - }, - Eval: true, - } - matcher2 := matchDeploymentPromoteRequest(matchConfig2) - m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher2)).Return(nil) - // ============================================================= + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{ca1, ca2, ra1, ra2})) // Start the deployment w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { - w.l.RLock() - defer w.l.RUnlock() - return 1 == len(w.watchers), nil - }, - func(err error) { - w.l.RLock() - defer w.l.RUnlock() - require.Equal(t, 1, len(w.watchers), "Should have 1 deployment") - }, - ) + waitForWatchers(t, w, 1) // Mark the canaries healthy req := &structs.DeploymentAllocHealthRequest{ @@ -672,33 +491,40 @@ func TestWatcher_AutoPromoteDeployment(t *testing.T) { var resp structs.DeploymentUpdateResponse // Calls w.raft.UpdateDeploymentAllocHealth, which is implemented by StateStore in // state.UpdateDeploymentAllocHealth via a raft shim? 
- err := w.SetAllocHealth(req, &resp) - require.NoError(t, err) + must.NoError(t, w.SetAllocHealth(req, &resp)) - ws := memdb.NewWatchSet() - - testutil.WaitForResult( - func() (bool, error) { - ds, _ := m.state.DeploymentsByJobID(ws, j.Namespace, j.ID, true) - d = ds[0] - return 2 == d.TaskGroups["web"].HealthyAllocs, nil - }, - func(err error) { require.NoError(t, err) }, - ) + // Wait for the promotion + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + ws := memdb.NewWatchSet() + ds, err := m.state.DeploymentsByJobID(ws, j.Namespace, j.ID, true) + if err != nil { + return err + } + d = ds[0] + if 2 != d.TaskGroups["web"].HealthyAllocs { + return fmt.Errorf("expected 2 healthy allocs") + } + if !d.TaskGroups["web"].Promoted { + return fmt.Errorf("expected task group to be promoted") + } + if d.Status != structs.DeploymentStatusRunning { + return fmt.Errorf("expected deployment to be running") + } + return nil - require.Equal(t, 1, len(w.watchers), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher2)) + }), + wait.Gap(10*time.Millisecond), wait.Timeout(time.Second)), + must.Sprint("expected promotion request submitted")) - require.Equal(t, "running", d.Status) - require.True(t, d.TaskGroups["web"].Promoted) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) - a1, _ := m.state.AllocByID(ws, ca1.ID) - require.False(t, a1.DeploymentStatus.Canary) - require.Equal(t, "pending", a1.ClientStatus) - require.Equal(t, "run", a1.DesiredStatus) + a1, _ := m.state.AllocByID(nil, ca1.ID) + must.False(t, a1.DeploymentStatus.Canary) + must.Eq(t, "pending", a1.ClientStatus) + must.Eq(t, "run", a1.DesiredStatus) - b1, _ := m.state.AllocByID(ws, ca2.ID) - require.False(t, b1.DeploymentStatus.Canary) + b1, _ := m.state.AllocByID(nil, ca2.ID) + must.False(t, b1.DeploymentStatus.Canary) } func TestWatcher_AutoPromoteDeployment_UnhealthyCanaries(t *testing.T) { @@ -747,57 +573,13 @@ 
func TestWatcher_AutoPromoteDeployment_UnhealthyCanaries(t *testing.T) { d.TaskGroups[ca1.TaskGroup].PlacedCanaries = []string{ca1.ID, ca2.ID, ca3.ID} d.TaskGroups[ca1.TaskGroup].DesiredCanaries = 2 - require.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{ca1, ca2, ca3}), "UpsertAllocs") - - // ============================================================= - // Support method calls - - // clear UpdateDeploymentStatus default expectation - m.Mock.ExpectedCalls = nil - - matchConfig0 := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionProgressDeadline, - Eval: true, - } - matcher0 := matchDeploymentStatusUpdateRequest(matchConfig0) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher0)).Return(nil) - - matchConfig1 := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Healthy: []string{ca1.ID, ca2.ID}, - Eval: true, - } - matcher1 := matchDeploymentAllocHealthRequest(matchConfig1) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher1)).Return(nil) - - matchConfig2 := &matchDeploymentPromoteRequestConfig{ - Promotion: &structs.DeploymentPromoteRequest{ - DeploymentID: d.ID, - All: true, - }, - Eval: true, - } - matcher2 := matchDeploymentPromoteRequest(matchConfig2) - m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher2)).Return(nil) - // ============================================================= + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{ca1, ca2, ca3})) // Start the deployment 
w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { - w.l.RLock() - defer w.l.RUnlock() - return 1 == len(w.watchers), nil - }, - func(err error) { - w.l.RLock() - defer w.l.RUnlock() - require.Equal(t, 1, len(w.watchers), "Should have 1 deployment") - }, - ) + waitForWatchers(t, w, 1) // Mark only 2 canaries as healthy req := &structs.DeploymentAllocHealthRequest{ @@ -807,125 +589,105 @@ func TestWatcher_AutoPromoteDeployment_UnhealthyCanaries(t *testing.T) { var resp structs.DeploymentUpdateResponse // Calls w.raft.UpdateDeploymentAllocHealth, which is implemented by StateStore in // state.UpdateDeploymentAllocHealth via a raft shim? - err := w.SetAllocHealth(req, &resp) - require.NoError(t, err) - - ws := memdb.NewWatchSet() + must.NoError(t, w.SetAllocHealth(req, &resp)) - testutil.WaitForResult( - func() (bool, error) { - ds, _ := m.state.DeploymentsByJobID(ws, j.Namespace, j.ID, true) - d = ds[0] - return 2 == d.TaskGroups["web"].HealthyAllocs, nil - }, - func(err error) { require.NoError(t, err) }, - ) + // Wait for the promotion + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + ws := memdb.NewWatchSet() + ds, _ := m.state.DeploymentsByJobID(ws, j.Namespace, j.ID, true) + d = ds[0] + if 2 != d.TaskGroups["web"].HealthyAllocs { + return fmt.Errorf("expected 2 healthy allocs") + } + if !d.TaskGroups["web"].Promoted { + return fmt.Errorf("expected task group to be promoted") + } + if d.Status != structs.DeploymentStatusRunning { + return fmt.Errorf("expected deployment to be running") + } + return nil - // Verify that a promotion request was submitted. 
- require.Equal(t, 1, len(w.watchers), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher2)) + }), + wait.Gap(10*time.Millisecond), wait.Timeout(time.Second)), + must.Sprint("expected promotion request submitted")) - require.Equal(t, "running", d.Status) - require.True(t, d.TaskGroups["web"].Promoted) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) - a1, _ := m.state.AllocByID(ws, ca1.ID) - require.False(t, a1.DeploymentStatus.Canary) - require.Equal(t, "pending", a1.ClientStatus) - require.Equal(t, "run", a1.DesiredStatus) + a1, _ := m.state.AllocByID(nil, ca1.ID) + must.False(t, a1.DeploymentStatus.Canary) + must.Eq(t, "pending", a1.ClientStatus) + must.Eq(t, "run", a1.DesiredStatus) - b1, _ := m.state.AllocByID(ws, ca2.ID) - require.False(t, b1.DeploymentStatus.Canary) + b1, _ := m.state.AllocByID(nil, ca2.ID) + must.False(t, b1.DeploymentStatus.Canary) } // Test pausing a deployment that is running func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - // clear UpdateDeploymentStatus default expectation - m.Mock.ExpectedCalls = nil - // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusPaused, - StatusDescription: structs.DeploymentStatusDescriptionPaused, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, 
m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PauseDeployment + // manually pause req := &structs.DeploymentPauseRequest{ DeploymentID: d.ID, Pause: true, } var resp structs.DeploymentUpdateResponse - err := w.PauseDeployment(req, &resp) - require.Nil(err, "PauseDeployment") + must.NoError(t, w.PauseDeployment(req, &resp)) + + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusPaused, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionPaused, d.StatusDescription) } // Test pausing a deployment that is paused func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - // clear UpdateDeploymentStatus default expectation - m.Mock.ExpectedCalls = nil - // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID d.Status = structs.DeploymentStatusPaused - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusPaused, - StatusDescription: structs.DeploymentStatusDescriptionPaused, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, 
m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PauseDeployment + // manually pause req := &structs.DeploymentPauseRequest{ DeploymentID: d.ID, Pause: true, } var resp structs.DeploymentUpdateResponse - err := w.PauseDeployment(req, &resp) - require.Nil(err, "PauseDeployment") + must.NoError(t, w.PauseDeployment(req, &resp)) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusPaused, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionPaused, d.StatusDescription) } // Test unpausing a deployment that is paused func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment @@ -933,120 +695,93 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { d := mock.Deployment() d.JobID = j.ID d.Status = structs.DeploymentStatusPaused - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusRunning, - StatusDescription: structs.DeploymentStatusDescriptionRunning, - Eval: true, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - 
m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PauseDeployment + // manually unpause req := &structs.DeploymentPauseRequest{ DeploymentID: d.ID, Pause: false, } var resp structs.DeploymentUpdateResponse - err := w.PauseDeployment(req, &resp) - require.Nil(err, "PauseDeployment") + must.NoError(t, w.PauseDeployment(req, &resp)) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusRunning, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRunning, d.StatusDescription) } // Test unpausing a deployment that is running func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusRunning, - StatusDescription: structs.DeploymentStatusDescriptionRunning, - Eval: true, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - m.On("UpdateDeploymentStatus", 
mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PauseDeployment + // manually unpause the deployment req := &structs.DeploymentPauseRequest{ DeploymentID: d.ID, Pause: false, } var resp structs.DeploymentUpdateResponse - err := w.PauseDeployment(req, &resp) - require.Nil(err, "PauseDeployment") + must.NoError(t, w.PauseDeployment(req, &resp)) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusRunning, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRunning, d.StatusDescription) } // Test failing a deployment that is running func TestWatcher_FailDeployment_Running(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionFailedByUser, - Eval: true, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - m.On("UpdateDeploymentStatus", 
mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PauseDeployment + // manually fail the deployment req := &structs.DeploymentFailRequest{ DeploymentID: d.ID, } var resp structs.DeploymentUpdateResponse - err := w.FailDeployment(req, &resp) - require.Nil(err, "FailDeployment") + must.NoError(t, w.FailDeployment(req, &resp)) + + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusFailed, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionFailedByUser, d.StatusDescription) } // Tests that the watcher properly watches for allocation changes and takes the // proper actions func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -1061,64 +796,32 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + 
must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) // Upsert the job again to get a new version j2 := j.Copy() // Modify the job to make its specification different j2.Meta["foo"] = "bar" j2.Stable = false - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2), "UpsertJob2") - - // require that we will get a update allocation call only once. This will - // verify that the watcher is batching allocation changes - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() - - // require that we get a call to UpsertDeploymentStatusUpdate - c := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionRollback(structs.DeploymentStatusDescriptionFailedAllocations, 0), - JobVersion: pointer.Of(uint64(0)), - Eval: true, - } - m2 := matchDeploymentStatusUpdateRequest(c) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(m2)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Update the allocs health to healthy which should create an evaluation - for i := 0; i < 5; i++ { + for range 5 { req := &structs.ApplyDeploymentAllocHealthRequest{ DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, HealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req), "UpsertDeploymentAllocHealth") + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req)) 
} - // Wait for there to be one eval - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 1 { - return false, fmt.Errorf("Got %d evals; want 1", l) - } - - return true, nil - }, func(err error) { - t.Fatal(err) - }) + waitForEvals(t, m.state, j, 1) // Update the allocs health to unhealthy which should create a job rollback, // status update and eval @@ -1128,45 +831,37 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2)) + waitForEvals(t, m.state, j, 1) - // Wait for there to be one eval - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err + // Wait for the deployment to be failed + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + d, _ := m.state.DeploymentByID(nil, d.ID) + if d.Status != structs.DeploymentStatusFailed { + return fmt.Errorf("bad status %q", d.Status) } - - if l := len(evals); l != 2 { - return false, fmt.Errorf("Got %d evals; want 1", l) + if !strings.Contains(d.StatusDescription, + structs.DeploymentStatusDescriptionFailedAllocations) { + return fmt.Errorf("bad status description %q", d.StatusDescription) } + return nil + }), + wait.Gap(10*time.Millisecond), wait.Timeout(time.Second)), + must.Sprint("expected deployment to be failed")) - return true, nil - }, func(err error) { - t.Fatal(err) - }) - - m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1)) - - // After we upsert the job version will go to 2. So use this to require the - // original call happened. 
- c2 := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionRollback(structs.DeploymentStatusDescriptionFailedAllocations, 0), - JobVersion: pointer.Of(uint64(2)), - Eval: true, - } - m3 := matchDeploymentStatusUpdateRequest(c2) - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(m3)) - testutil.WaitForResult(func() (bool, error) { return 0 == watchersCount(w), nil }, - func(err error) { require.Equal(0, watchersCount(w), "Should have no deployment") }) + waitForWatchers(t, w, 0) + // verify that the watcher is batching allocation changes + m.assertCalls(t, "UpdateAllocDesiredTransition", 1) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusFailed, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRollback( + structs.DeploymentStatusDescriptionFailedAllocations, 0), d.StatusDescription) } func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -1183,23 +878,12 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { a.CreateTime = now.UnixNano() a.ModifyTime = now.UnixNano() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we get a call to UpsertDeploymentStatusUpdate - c := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionProgressDeadline, - Eval: true, - } - m2 := matchDeploymentStatusUpdateRequest(c) - m.On("UpdateDeploymentStatus", 
mocker.MatchedBy(m2)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Update the alloc to be unhealthy and require that nothing happens. a2 := a.Copy() @@ -1207,48 +891,25 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { Healthy: pointer.Of(false), Timestamp: now, } - require.Nil(m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 100, []*structs.Allocation{a2})) + must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 100, []*structs.Allocation{a2})) // Wait for the deployment to be failed - testutil.WaitForResult(func() (bool, error) { - d, err := m.state.DeploymentByID(nil, d.ID) - if err != nil { - return false, err - } - - return d.Status == structs.DeploymentStatusFailed, fmt.Errorf("bad status %q", d.Status) - }, func(err error) { - t.Fatal(err) - }) - - // require there are is only one evaluation - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 1 { - return false, fmt.Errorf("Got %d evals; want 1", l) - } - - return true, nil - }, func(err error) { - t.Fatal(err) - }) + must.Wait(t, wait.InitialSuccess(wait.BoolFunc(func() bool { + d, _ := m.state.DeploymentByID(nil, d.ID) + return d.Status == structs.DeploymentStatusFailed && + d.StatusDescription == structs.DeploymentStatusDescriptionProgressDeadline + }), + wait.Gap(10*time.Millisecond), wait.Timeout(time.Second)), + must.Sprint("expected deployment to be failed")) + + 
waitForEvals(t, m.state, j, 1) } // Test that progress deadline handling works when there are multiple groups func TestDeploymentWatcher_ProgressCutoff(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() j.TaskGroups[0].Count = 1 @@ -1278,83 +939,73 @@ func TestDeploymentWatcher_ProgressCutoff(t *testing.T) { a2.ModifyTime = now.UnixNano() a2.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a, a2}), "UpsertAllocs") - - // We may get an update for the desired transition. - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a, a2})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) watcher, err := w.getOrCreateWatcher(d.ID) - require.NoError(err) - require.NotNil(watcher) + must.NoError(t, err) + must.NotNil(t, watcher) d1, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) + must.NoError(t, err) done := watcher.doneGroups(d1) - require.Contains(done, "web") - require.False(done["web"]) - require.Contains(done, "foo") - require.False(done["foo"]) 
+ must.MapContainsKey(t, done, "web") + must.False(t, done["web"]) + must.MapContainsKey(t, done, "foo") + must.False(t, done["foo"]) cutoff1 := watcher.getDeploymentProgressCutoff(d1) - require.False(cutoff1.IsZero()) + must.False(t, cutoff1.IsZero()) // Update the first allocation to be healthy a3 := a.Copy() a3.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: pointer.Of(true)} - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a3}), "UpsertAllocs") + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a3})) // Get the updated deployment d2, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) + must.NoError(t, err) done = watcher.doneGroups(d2) - require.Contains(done, "web") - require.True(done["web"]) - require.Contains(done, "foo") - require.False(done["foo"]) + must.MapContainsKey(t, done, "web") + must.True(t, done["web"]) + must.MapContainsKey(t, done, "foo") + must.False(t, done["foo"]) cutoff2 := watcher.getDeploymentProgressCutoff(d2) - require.False(cutoff2.IsZero()) - require.True(cutoff1.UnixNano() < cutoff2.UnixNano()) + must.False(t, cutoff2.IsZero()) + must.True(t, cutoff1.UnixNano() < cutoff2.UnixNano()) // Update the second allocation to be healthy a4 := a2.Copy() a4.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: pointer.Of(true)} - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a4}), "UpsertAllocs") + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a4})) // Get the updated deployment d3, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) + must.NoError(t, err) done = watcher.doneGroups(d3) - require.Contains(done, "web") - require.True(done["web"]) - require.Contains(done, "foo") - require.True(done["foo"]) + must.MapContainsKey(t, done, "web") + must.True(t, done["web"]) + must.MapContainsKey(t, done, 
"foo") + must.True(t, done["foo"]) cutoff3 := watcher.getDeploymentProgressCutoff(d2) - require.True(cutoff3.IsZero()) + must.True(t, cutoff3.IsZero()) } // Test that we will allow the progress deadline to be reached when the canaries // are healthy but we haven't promoted func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -1372,18 +1023,12 @@ func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) { a.CreateTime = now.UnixNano() a.ModifyTime = now.UnixNano() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we will get a createEvaluation call only once. 
This will - // verify that the watcher is batching allocation changes - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Update the alloc to be unhealthy and require that nothing happens. a2 := a.Copy() @@ -1391,52 +1036,32 @@ func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) { Healthy: pointer.Of(true), Timestamp: now, } - require.Nil(m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) + must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) // Wait for the deployment to cross the deadline dout, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) - require.NotNil(dout) + must.NoError(t, err) + must.NotNil(t, dout) state := dout.TaskGroups["web"] - require.NotNil(state) + must.NotNil(t, state) time.Sleep(state.RequireProgressBy.Add(time.Second).Sub(now)) // Require the deployment is still running dout, err = m.state.DeploymentByID(nil, d.ID) - require.NoError(err) - require.NotNil(dout) - require.Equal(structs.DeploymentStatusRunning, dout.Status) - require.Equal(structs.DeploymentStatusDescriptionRunningNeedsPromotion, dout.StatusDescription) - - // require there are is only one evaluation - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } 
- - if l := len(evals); l != 1 { - return false, fmt.Errorf("Got %d evals; want 1", l) - } + must.NoError(t, err) + must.NotNil(t, dout) + must.Eq(t, structs.DeploymentStatusRunning, dout.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRunningNeedsPromotion, dout.StatusDescription) - return true, nil - }, func(err error) { - t.Fatal(err) - }) + waitForEvals(t, m.state, j, 1) } // Test that a promoted deployment with alloc healthy updates create // evals to move the deployment forward func TestDeploymentWatcher_PromotedCanary_UpdatedAllocs(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() j.TaskGroups[0].Count = 2 @@ -1464,16 +1089,12 @@ func TestDeploymentWatcher_PromotedCanary_UpdatedAllocs(t *testing.T) { Healthy: pointer.Of(true), Timestamp: now, } - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) - - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Twice() + waitForWatchers(t, w, 1) // Create another alloc a2 := a.Copy() 
@@ -1486,49 +1107,29 @@ func TestDeploymentWatcher_PromotedCanary_UpdatedAllocs(t *testing.T) { Timestamp: now, } d.TaskGroups["web"].RequireProgressBy = time.Now().Add(2 * time.Second) - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) // Wait until batch eval period passes before updating another alloc time.Sleep(1 * time.Second) - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2}), "UpsertAllocs") + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) // Wait for the deployment to cross the deadline dout, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) - require.NotNil(dout) + must.NoError(t, err) + must.NotNil(t, dout) state := dout.TaskGroups["web"] - require.NotNil(state) + must.NotNil(t, state) time.Sleep(state.RequireProgressBy.Add(time.Second).Sub(now)) - // There should be two evals - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 2 { - return false, fmt.Errorf("Got %d evals; want 2", l) - } - - return true, nil - }, func(err error) { - t.Fatal(err) - }) + waitForEvals(t, m.state, j, 2) } func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { ci.Parallel(t) - require := require.New(t) mtype := structs.MsgTypeTestSetup w, m := defaultTestDeploymentWatcher(t) w.SetEnabled(true, m.state) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - progressTimeout := time.Millisecond * 1000 j := mock.Job() j.TaskGroups[0].Name = "group1" @@ -1564,21 +1165,8 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { }, } - require.NoError(m.state.UpsertJob(mtype, m.nextIndex(), nil, 
j)) - require.NoError(m.state.UpsertDeployment(m.nextIndex(), d)) - - // require that we get a call to UpsertDeploymentPromotion - matchConfig := &matchDeploymentPromoteRequestConfig{ - Promotion: &structs.DeploymentPromoteRequest{ - DeploymentID: d.ID, - All: true, - }, - Eval: true, - } - matcher := matchDeploymentPromoteRequest(matchConfig) - m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher)).Return(nil) - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil) + must.NoError(t, m.state.UpsertJob(mtype, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) // create canaries @@ -1599,8 +1187,7 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { canary2.ModifyTime = now.UnixNano() allocs := []*structs.Allocation{canary1, canary2} - err := m.state.UpsertAllocs(mtype, m.nextIndex(), allocs) - require.NoError(err) + must.NoError(t, m.state.UpsertAllocs(mtype, m.nextIndex(), allocs)) // 2nd group's canary becomes healthy @@ -1615,8 +1202,8 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { } allocs = []*structs.Allocation{canary2} - err = m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), allocs) - require.NoError(err) + err := m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), allocs) + must.NoError(t, err) // wait for long enough to ensure we read deployment update channel // this sleep creates the race condition associated with #7058 @@ -1635,7 +1222,7 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { allocs = []*structs.Allocation{canary1} err = m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), allocs) - require.NoError(err) + must.NoError(t, err) // ensure progress_deadline has definitely expired time.Sleep(progressTimeout) @@ -1647,7 +1234,7 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { All: true, } err = w.PromoteDeployment(req, 
&structs.DeploymentUpdateResponse{}) - require.NoError(err) + must.NoError(t, err) // wait for long enough to ensure we read deployment update channel time.Sleep(50 * time.Millisecond) @@ -1674,7 +1261,7 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { allocs = []*structs.Allocation{alloc1a, alloc1b} err = m.state.UpsertAllocs(mtype, m.nextIndex(), allocs) - require.NoError(err) + must.NoError(t, err) // allocs become healthy @@ -1700,7 +1287,7 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { allocs = []*structs.Allocation{alloc1a, alloc1b} err = m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), allocs) - require.NoError(err) + must.NoError(t, err) // ensure any progress deadline has expired time.Sleep(progressTimeout) @@ -1708,32 +1295,27 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { // without a scheduler running we'll never mark the deployment as // successful, so test that healthy == desired and that we haven't failed deployment, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) - require.Equal(structs.DeploymentStatusRunning, deployment.Status) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusRunning, deployment.Status) group1 := deployment.TaskGroups["group1"] - - require.Equal(group1.DesiredTotal, group1.HealthyAllocs, "not enough healthy") - require.Equal(group1.DesiredTotal, group1.PlacedAllocs, "not enough placed") - require.Equal(0, group1.UnhealthyAllocs) + must.Eq(t, group1.DesiredTotal, group1.HealthyAllocs, must.Sprint("not enough healthy")) + must.Eq(t, group1.DesiredTotal, group1.PlacedAllocs, must.Sprint("not enough placed")) + must.Eq(t, 0, group1.UnhealthyAllocs) + must.True(t, group1.Promoted) group2 := deployment.TaskGroups["group2"] - require.Equal(group2.DesiredTotal, group2.HealthyAllocs, "not enough healthy") - require.Equal(group2.DesiredTotal, group2.PlacedAllocs, "not enough placed") - require.Equal(0, group2.UnhealthyAllocs) + 
must.Eq(t, group2.DesiredTotal, group2.HealthyAllocs, must.Sprint("not enough healthy")) + must.Eq(t, group2.DesiredTotal, group2.PlacedAllocs, must.Sprint("not enough placed")) + must.Eq(t, 0, group2.UnhealthyAllocs) } // Test scenario where deployment initially has no progress deadline // After the deployment is updated, a failed alloc's DesiredTransition should be set func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, and a deployment j := mock.Job() j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -1743,26 +1325,21 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { d := mock.Deployment() d.JobID = j.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) a := mock.Alloc() a.CreateTime = time.Now().UnixNano() a.DeploymentID = d.ID - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) d.TaskGroups["web"].ProgressDeadline = 500 * time.Millisecond // Update the deployment with a progress deadline - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - - // Match on DesiredTransition set to Reschedule for the failed alloc - m1 := matchUpdateAllocDesiredTransitionReschedule([]string{a.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + 
must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Update the alloc to be unhealthy a2 := a.Copy() @@ -1770,20 +1347,23 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { Healthy: pointer.Of(false), Timestamp: time.Now(), } - require.Nil(m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) + must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) // Wait for the alloc's DesiredState to set reschedule - testutil.WaitForResult(func() (bool, error) { + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { a, err := m.state.AllocByID(nil, a.ID) if err != nil { - return false, err + return err } dt := a.DesiredTransition - shouldReschedule := dt.Reschedule != nil && *dt.Reschedule - return shouldReschedule, fmt.Errorf("Desired Transition Reschedule should be set but got %v", shouldReschedule) - }, func(err error) { - t.Fatal(err) - }) + if dt.Reschedule == nil || !*dt.Reschedule { + return fmt.Errorf("Desired Transition Reschedule should be set: %+v", dt) + } + return nil + }), + wait.Gap(10*time.Millisecond), + wait.Timeout(3*time.Second))) + } // Test that we exit before hitting the Progress Deadline when we run out of reschedule attempts @@ -1814,19 +1394,8 @@ func TestDeploymentWatcher_Watch_FailEarly(t *testing.T) { must.Nil(t, m.state.UpsertDeployment(m.nextIndex(), d), must.Sprint("UpsertDeployment")) must.Nil(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), must.Sprint("UpsertAllocs")) - // require that we get a call to UpsertDeploymentStatusUpdate - c := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: 
structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionFailedAllocations, - Eval: true, - } - m2 := matchDeploymentStatusUpdateRequest(c) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(m2)).Return(nil) - w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { must.Eq(t, 1, watchersCount(w), must.Sprint("Should have 1 deployment")) }) + waitForWatchers(t, w, 1) // Update the alloc to be unhealthy a2 := a.Copy() @@ -1837,43 +1406,25 @@ func TestDeploymentWatcher_Watch_FailEarly(t *testing.T) { must.Nil(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) // Wait for the deployment to be failed - testutil.WaitForResult(func() (bool, error) { - d, err := m.state.DeploymentByID(nil, d.ID) - if err != nil { - return false, err - } - + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + d, _ := m.state.DeploymentByID(nil, d.ID) if d.Status != structs.DeploymentStatusFailed { - return false, fmt.Errorf("bad status %q", d.Status) + return fmt.Errorf("bad status %q", d.Status) } - - return d.StatusDescription == structs.DeploymentStatusDescriptionFailedAllocations, fmt.Errorf("bad status description %q", d.StatusDescription) - }, func(err error) { - t.Fatal(err) - }) - - // require there are is only one evaluation - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 1 { - return false, fmt.Errorf("Got %d evals; want 1", l) + if d.StatusDescription != structs.DeploymentStatusDescriptionFailedAllocations { + return fmt.Errorf("bad status description %q", d.StatusDescription) } + return nil + }), + wait.Gap(10*time.Millisecond), wait.Timeout(time.Second)), + must.Sprint("expected deployment to be failed")) - return true, nil - }, func(err error) { - 
t.Fatal(err) - }) + waitForEvals(t, m.state, j, 1) } // Tests that the watcher fails rollback when the spec hasn't changed func TestDeploymentWatcher_RollbackFailed(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -1888,63 +1439,31 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) // Upsert the job again to get a new version j2 := j.Copy() // Modify the job to make its specification different j2.Stable = false - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2), "UpsertJob2") - - // require that we will get a createEvaluation call only once. 
This will - // verify that the watcher is batching allocation changes - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() - - // require that we get a call to UpsertDeploymentStatusUpdate with roll back failed as the status - c := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionRollbackNoop(structs.DeploymentStatusDescriptionFailedAllocations, 0), - JobVersion: nil, - Eval: true, - } - m2 := matchDeploymentStatusUpdateRequest(c) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(m2)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Update the allocs health to healthy which should create an evaluation - for i := 0; i < 5; i++ { + for range 5 { req := &structs.ApplyDeploymentAllocHealthRequest{ DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, HealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req), "UpsertDeploymentAllocHealth") + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req)) } - // Wait for there to be one eval - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 1 { - return false, fmt.Errorf("Got %d evals; want 1", l) - } - - return true, nil - }, func(err error) { - t.Fatal(err) - }) + waitForEvals(t, m.state, j, 1) // Update the allocs health to unhealthy which will cause 
attempting a rollback, // fail in that step, do status update and eval @@ -1954,42 +1473,29 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2), "UpsertDeploymentAllocHealth") - - // Wait for there to be one eval - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 2 { - return false, fmt.Errorf("Got %d evals; want 1", l) - } + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2)) - return true, nil - }, func(err error) { - t.Fatal(err) - }) + waitForEvals(t, m.state, j, 2) - m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1)) + // verify that the watcher is batching allocation changes + m.assertCalls(t, "UpdateAllocDesiredTransition", 1) // verify that the job version hasn't changed after upsert m.state.JobByID(nil, structs.DefaultNamespace, j.ID) - require.Equal(uint64(0), j.Version, "Expected job version 0 but got ", j.Version) + must.Eq(t, uint64(0), j.Version) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusFailed, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRollbackNoop( + structs.DeploymentStatusDescriptionFailedAllocations, 0), d.StatusDescription) } // Test allocation updates and evaluation creation is batched between watchers func TestWatcher_BatchAllocUpdates(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Second) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, for two deployments j1 := mock.Job() j1.TaskGroups[0].Update = 
structs.DefaultUpdateStrategy.Copy() @@ -2011,22 +1517,15 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { a2.JobID = j2.ID a2.DeploymentID = d2.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j1), "UpsertJob") - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d1), "UpsertDeployment") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d2), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a1}), "UpsertAllocs") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2}), "UpsertAllocs") - - // require that we will get a createEvaluation call only once and it contains - // both deployments. This will verify that the watcher is batching - // allocation changes - m1 := matchUpdateAllocDesiredTransitions([]string{d1.ID, d2.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j1)) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d1)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d2)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a1})) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 2 == watchersCount(w), nil }, - func(err error) { require.Equal(2, watchersCount(w), "Should have 2 deployment") }) + waitForWatchers(t, w, 2) // Update the allocs health to healthy which should create an evaluation req := &structs.ApplyDeploymentAllocHealthRequest{ @@ -2035,7 +1534,7 @@ func TestWatcher_BatchAllocUpdates(t 
*testing.T) { HealthyAllocationIDs: []string{a1.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req), "UpsertDeploymentAllocHealth") + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req)) req2 := &structs.ApplyDeploymentAllocHealthRequest{ DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ @@ -2043,43 +1542,19 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { HealthyAllocationIDs: []string{a2.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2)) - // Wait for there to be one eval for each job - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals1, err := m.state.EvalsByJob(ws, j1.Namespace, j1.ID) - if err != nil { - return false, err - } - - evals2, err := m.state.EvalsByJob(ws, j2.Namespace, j2.ID) - if err != nil { - return false, err - } - - if l := len(evals1); l != 1 { - return false, fmt.Errorf("Got %d evals for job %v; want 1", l, j1.ID) - } + waitForEvals(t, m.state, j1, 1) + waitForEvals(t, m.state, j2, 1) + waitForWatchers(t, w, 2) - if l := len(evals2); l != 1 { - return false, fmt.Errorf("Got %d evals for job 2; want 1", l) - } - - return true, nil - }, func(err error) { - t.Fatal(err) - }) - - m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1)) - testutil.WaitForResult(func() (bool, error) { return 2 == watchersCount(w), nil }, - func(err error) { require.Equal(2, watchersCount(w), "Should have 2 deployment") }) + // verify that the watcher is batching allocation changes + m.assertCalls(t, "UpdateAllocDesiredTransition", 1) } func watchersCount(w *Watcher) int { - w.l.Lock() - defer w.l.Unlock() - + w.l.RLock() + defer w.l.RUnlock() return len(w.watchers) } @@ -2088,9 +1563,6 @@ func 
TestWatcher_PurgeDeployment(t *testing.T) { ci.Parallel(t) w, m := defaultTestDeploymentWatcher(t) - // clear UpdateDeploymentStatus default expectation - m.Mock.ExpectedCalls = nil - // Create a job and a deployment j := mock.Job() d := mock.Deployment() @@ -2098,37 +1570,33 @@ func TestWatcher_PurgeDeployment(t *testing.T) { must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusPaused, - StatusDescription: structs.DeploymentStatusDescriptionPaused, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) - w.SetEnabled(true, m.state) - must.Wait(t, wait.InitialSuccess( - wait.ErrorFunc(func() error { - if watchersCount(w) != 1 { - return fmt.Errorf("expected 1 deployment") - } - return nil - }), - wait.Attempts(100), - wait.Gap(10*time.Millisecond), - )) + waitForWatchers(t, w, 1) must.NoError(t, m.state.DeleteJob(m.nextIndex(), j.Namespace, j.ID)) + waitForWatchers(t, w, 0) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Nil(t, d) +} +func waitForWatchers(t *testing.T, w *Watcher, expect int) { + t.Helper() must.Wait(t, wait.InitialSuccess( - wait.ErrorFunc(func() error { - if watchersCount(w) != 0 { - return fmt.Errorf("expected deployment watcher to be stopped") - } - return nil - }), - wait.Attempts(500), + wait.BoolFunc(func() bool { return expect == watchersCount(w) }), + wait.Gap(10*time.Millisecond), + wait.Timeout(time.Second)), must.Sprintf("expected %d deployments", expect)) +} + +func waitForEvals(t *testing.T, store *state.StateStore, job *structs.Job, expect int) { + t.Helper() + must.Wait(t, wait.InitialSuccess(wait.BoolFunc(func() bool { + ws := memdb.NewWatchSet() + evals, _ := 
store.EvalsByJob(ws, job.Namespace, job.ID) + return len(evals) == expect + }), wait.Gap(10*time.Millisecond), - )) + wait.Timeout(5*time.Second), // some of these need to wait quite a while + ), must.Sprintf("expected %d evals before timeout", expect)) } diff --git a/nomad/deploymentwatcher/testutil_test.go b/nomad/deploymentwatcher/testutil_test.go index 7af7df9cafd..922a14c96c6 100644 --- a/nomad/deploymentwatcher/testutil_test.go +++ b/nomad/deploymentwatcher/testutil_test.go @@ -4,29 +4,27 @@ package deploymentwatcher import ( - "reflect" - "strings" "sync" "testing" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" - mocker "github.com/stretchr/testify/mock" + "github.com/shoenig/test/must" ) type mockBackend struct { - mocker.Mock index uint64 state *state.StateStore l sync.Mutex + calls map[string]int } func newMockBackend(t *testing.T) *mockBackend { m := &mockBackend{ index: 10000, state: state.TestStateStore(t), + calls: map[string]int{}, } - m.Test(t) return m } @@ -38,235 +36,46 @@ func (m *mockBackend) nextIndex() uint64 { return i } -func (m *mockBackend) UpdateAllocDesiredTransition(u *structs.AllocUpdateDesiredTransitionRequest) (uint64, error) { - m.Called(u) - i := m.nextIndex() - return i, m.state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, i, u.Allocs, u.Evals) +func (m *mockBackend) trackCall(method string) { + m.l.Lock() + defer m.l.Unlock() + m.calls[method]++ } -// matchUpdateAllocDesiredTransitions is used to match an upsert request -func matchUpdateAllocDesiredTransitions(deploymentIDs []string) func(update *structs.AllocUpdateDesiredTransitionRequest) bool { - return func(update *structs.AllocUpdateDesiredTransitionRequest) bool { - if len(update.Evals) != len(deploymentIDs) { - return false - } - - dmap := make(map[string]struct{}, len(deploymentIDs)) - for _, d := range deploymentIDs { - dmap[d] = struct{}{} - } - - for _, e := range update.Evals { - if _, ok := dmap[e.DeploymentID]; !ok { 
- return false - } - - delete(dmap, e.DeploymentID) - } - - return true - } +func (m *mockBackend) assertCalls(t *testing.T, method string, expect int) { + t.Helper() + m.l.Lock() + defer m.l.Unlock() + must.Eq(t, expect, m.calls[method], + must.Sprintf("expected %d calls for method=%s. got=%+v", expect, method, m.calls)) } -// matchUpdateAllocDesiredTransitionReschedule is used to match allocs that have their DesiredTransition set to Reschedule -func matchUpdateAllocDesiredTransitionReschedule(allocIDs []string) func(update *structs.AllocUpdateDesiredTransitionRequest) bool { - return func(update *structs.AllocUpdateDesiredTransitionRequest) bool { - amap := make(map[string]struct{}, len(allocIDs)) - for _, d := range allocIDs { - amap[d] = struct{}{} - } - - for allocID, dt := range update.Allocs { - if _, ok := amap[allocID]; !ok { - return false - } - if !*dt.Reschedule { - return false - } - } - - return true - } +func (m *mockBackend) UpdateAllocDesiredTransition(u *structs.AllocUpdateDesiredTransitionRequest) (uint64, error) { + m.trackCall("UpdateAllocDesiredTransition") + i := m.nextIndex() + return i, m.state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, i, u.Allocs, u.Evals) } func (m *mockBackend) UpsertJob(job *structs.Job) (uint64, error) { - m.Called(job) + m.trackCall("UpsertJob") i := m.nextIndex() return i, m.state.UpsertJob(structs.MsgTypeTestSetup, i, nil, job) } func (m *mockBackend) UpdateDeploymentStatus(u *structs.DeploymentStatusUpdateRequest) (uint64, error) { - m.Called(u) + m.trackCall("UpdateDeploymentStatus") i := m.nextIndex() return i, m.state.UpdateDeploymentStatus(structs.MsgTypeTestSetup, i, u) } -// matchDeploymentStatusUpdateConfig is used to configure the matching -// function -type matchDeploymentStatusUpdateConfig struct { - // DeploymentID is the expected ID - DeploymentID string - - // Status is the desired status - Status string - - // StatusDescription is the desired status description - StatusDescription 
string - - // JobVersion marks whether we expect a roll back job at the given version - JobVersion *uint64 - - // Eval marks whether we expect an evaluation. - Eval bool -} - -// matchDeploymentStatusUpdateRequest is used to match an update request -func matchDeploymentStatusUpdateRequest(c *matchDeploymentStatusUpdateConfig) func(args *structs.DeploymentStatusUpdateRequest) bool { - return func(args *structs.DeploymentStatusUpdateRequest) bool { - if args.DeploymentUpdate.DeploymentID != c.DeploymentID { - return false - } - - if args.DeploymentUpdate.Status != c.Status && args.DeploymentUpdate.StatusDescription != c.StatusDescription { - return false - } - - if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { - return false - } - - if c.JobVersion != nil { - if args.Job == nil { - return false - } else if args.Job.Version != *c.JobVersion { - return false - } - } else if c.JobVersion == nil && args.Job != nil { - return false - } - - return true - } -} - func (m *mockBackend) UpdateDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error) { - m.Called(req) + m.trackCall("UpdateDeploymentPromotion") i := m.nextIndex() return i, m.state.UpdateDeploymentPromotion(structs.MsgTypeTestSetup, i, req) } -// matchDeploymentPromoteRequestConfig is used to configure the matching -// function -type matchDeploymentPromoteRequestConfig struct { - // Promotion holds the expected promote request - Promotion *structs.DeploymentPromoteRequest - - // Eval marks whether we expect an evaluation. 
- Eval bool -} - -// matchDeploymentPromoteRequest is used to match a promote request -func matchDeploymentPromoteRequest(c *matchDeploymentPromoteRequestConfig) func(args *structs.ApplyDeploymentPromoteRequest) bool { - return func(args *structs.ApplyDeploymentPromoteRequest) bool { - if !reflect.DeepEqual(*c.Promotion, args.DeploymentPromoteRequest) { - return false - } - - if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { - return false - } - - return true - } -} func (m *mockBackend) UpdateDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) { - m.Called(req) + m.trackCall("UpdateDeploymentAllocHealth") i := m.nextIndex() return i, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, i, req) } - -// matchDeploymentAllocHealthRequestConfig is used to configure the matching -// function -type matchDeploymentAllocHealthRequestConfig struct { - // DeploymentID is the expected ID - DeploymentID string - - // Healthy and Unhealthy contain the expected allocation IDs that are having - // their health set - Healthy, Unhealthy []string - - // DeploymentUpdate holds the expected values of status and description. We - // don't check for exact match but string contains - DeploymentUpdate *structs.DeploymentStatusUpdate - - // JobVersion marks whether we expect a roll back job at the given version - JobVersion *uint64 - - // Eval marks whether we expect an evaluation. 
- Eval bool -} - -// matchDeploymentAllocHealthRequest is used to match an update request -func matchDeploymentAllocHealthRequest(c *matchDeploymentAllocHealthRequestConfig) func(args *structs.ApplyDeploymentAllocHealthRequest) bool { - return func(args *structs.ApplyDeploymentAllocHealthRequest) bool { - if args.DeploymentID != c.DeploymentID { - return false - } - - // Require a timestamp - if args.Timestamp.IsZero() { - return false - } - - if len(c.Healthy) != len(args.HealthyAllocationIDs) { - return false - } - if len(c.Unhealthy) != len(args.UnhealthyAllocationIDs) { - return false - } - - hmap, umap := make(map[string]struct{}, len(c.Healthy)), make(map[string]struct{}, len(c.Unhealthy)) - for _, h := range c.Healthy { - hmap[h] = struct{}{} - } - for _, u := range c.Unhealthy { - umap[u] = struct{}{} - } - - for _, h := range args.HealthyAllocationIDs { - if _, ok := hmap[h]; !ok { - return false - } - } - for _, u := range args.UnhealthyAllocationIDs { - if _, ok := umap[u]; !ok { - return false - } - } - - if c.DeploymentUpdate != nil { - if args.DeploymentUpdate == nil { - return false - } - - if !strings.Contains(args.DeploymentUpdate.Status, c.DeploymentUpdate.Status) { - return false - } - if !strings.Contains(args.DeploymentUpdate.StatusDescription, c.DeploymentUpdate.StatusDescription) { - return false - } - } else if args.DeploymentUpdate != nil { - return false - } - - if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { - return false - } - - if (c.JobVersion != nil && (args.Job == nil || args.Job.Version != *c.JobVersion)) || c.JobVersion == nil && args.Job != nil { - return false - } - - return true - } -}