diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go
index 77bc1c8ebe1..e1f799802d4 100644
--- a/nomad/deploymentwatcher/deployment_watcher.go
+++ b/nomad/deploymentwatcher/deployment_watcher.go
@@ -156,7 +156,7 @@ func (w *deploymentWatcher) getDeployment() *structs.Deployment {
 	return w.d
 }
 
-func (w *deploymentWatcher) SetAllocHealth(
+func (w *deploymentWatcher) setAllocHealth(
 	req *structs.DeploymentAllocHealthRequest,
 	resp *structs.DeploymentUpdateResponse) error {
 
diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go
index 6c909111c91..d68c8a73275 100644
--- a/nomad/deploymentwatcher/deployments_watcher.go
+++ b/nomad/deploymentwatcher/deployments_watcher.go
@@ -374,7 +374,7 @@ func (w *Watcher) SetAllocHealth(req *structs.DeploymentAllocHealthRequest, resp
 		return err
 	}
 
-	return watcher.SetAllocHealth(req, resp)
+	return watcher.setAllocHealth(req, resp)
 }
 
 // PromoteDeployment is used to promote a deployment. If promote is false,
diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go
index a6b880441f7..da0df26b3fe 100644
--- a/nomad/deploymentwatcher/deployments_watcher_test.go
+++ b/nomad/deploymentwatcher/deployments_watcher_test.go
@@ -5,22 +5,20 @@ package deploymentwatcher
 
 import (
 	"fmt"
+	"strings"
 	"testing"
 	"time"
 
-	memdb "github.com/hashicorp/go-memdb"
+	"github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/testutil"
 	"github.com/shoenig/test/must"
 	"github.com/shoenig/test/wait"
-	"github.com/stretchr/testify/assert"
-	mocker "github.com/stretchr/testify/mock"
-	"github.com/stretchr/testify/require"
 )
 
 func testDeploymentWatcher(t *testing.T, qps float64, batchDur time.Duration) (*Watcher, *mockBackend) {
@@ -36,18 +34,13 @@ func defaultTestDeploymentWatcher(t *testing.T) (*Watcher, *mockBackend) {
 // Tests that the watcher properly watches for deployments and reconciles them
 func TestWatcher_WatchDeployments(t *testing.T) {
 	ci.Parallel(t)
-	require := require.New(t)
 	w, m := defaultTestDeploymentWatcher(t)
 
-	m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool {
-		return true
-	})).Return(nil).Maybe()
-
 	// Create three jobs
 	j1, j2, j3 := mock.Job(), mock.Job(), mock.Job()
-	require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, j1))
-	require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, j2))
-	require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, 102, nil, j3))
+	must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, j1))
+	must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, j2))
+	must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, 102, nil, j3))
 
 	// Create three deployments all running
 	d1, d2, d3 := mock.Deployment(), mock.Deployment(), mock.Deployment()
@@ -56,14 +49,14 @@ func TestWatcher_WatchDeployments(t *testing.T) {
d3.JobID = j3.ID // Upsert the first deployment - require.Nil(m.state.UpsertDeployment(103, d1)) + must.NoError(t, m.state.UpsertDeployment(103, d1)) // Next list 3 block1 := make(chan time.Time) go func() { <-block1 - require.Nil(m.state.UpsertDeployment(104, d2)) - require.Nil(m.state.UpsertDeployment(105, d3)) + must.NoError(t, m.state.UpsertDeployment(104, d2)) + must.NoError(t, m.state.UpsertDeployment(105, d3)) }() //// Next list 3 but have one be terminal @@ -72,37 +65,28 @@ func TestWatcher_WatchDeployments(t *testing.T) { d3terminal.Status = structs.DeploymentStatusFailed go func() { <-block2 - require.Nil(m.state.UpsertDeployment(106, d3terminal)) + must.NoError(t, m.state.UpsertDeployment(106, d3terminal)) }() w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "1 deployment returned") }) + waitForWatchers(t, w, 1) close(block1) - testutil.WaitForResult(func() (bool, error) { return 3 == watchersCount(w), nil }, - func(err error) { require.Equal(3, watchersCount(w), "3 deployment returned") }) + waitForWatchers(t, w, 3) close(block2) - testutil.WaitForResult(func() (bool, error) { return 2 == watchersCount(w), nil }, - func(err error) { require.Equal(3, watchersCount(w), "3 deployment returned - 1 terminal") }) + waitForWatchers(t, w, 2) } // Tests that calls against an unknown deployment fail func TestWatcher_UnknownDeployment(t *testing.T) { ci.Parallel(t) - assert := assert.New(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) w.SetEnabled(true, m.state) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // The expected error is that it should be an unknown deployment dID := uuid.Generate() - expected := fmt.Sprintf("unknown deployment %q", dID) + expectedErr := fmt.Sprintf("unknown deployment %q", dID) // Request setting the health against an unknown deployment req := &structs.DeploymentAllocHealthRequest{ @@ -111,9 +95,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - if assert.NotNil(err, "should have error for unknown deployment") { - require.Contains(err.Error(), expected) - } + must.ErrorContains(t, err, expectedErr) // Request promoting against an unknown deployment req2 := &structs.DeploymentPromoteRequest{ @@ -121,9 +103,7 @@ func TestWatcher_UnknownDeployment(t *testing.T) { All: true, } err = w.PromoteDeployment(req2, &resp) - if assert.NotNil(err, "should have error for unknown deployment") { - require.Contains(err.Error(), expected) - } + must.ErrorContains(t, err, expectedErr) // Request pausing against an unknown deployment req3 := &structs.DeploymentPauseRequest{ @@ -131,172 +111,117 @@ func TestWatcher_UnknownDeployment(t *testing.T) { Pause: true, } err = w.PauseDeployment(req3, &resp) - if assert.NotNil(err, "should have error for unknown deployment") { - require.Contains(err.Error(), expected) - } + must.ErrorContains(t, err, expectedErr) // Request failing against an unknown deployment req4 := &structs.DeploymentFailRequest{ DeploymentID: dID, } err = w.FailDeployment(req4, &resp) - if assert.NotNil(err, "should have error for unknown deployment") { - require.Contains(err.Error(), expected) - } + must.ErrorContains(t, err, expectedErr) } // Test setting an unknown allocation's health func TestWatcher_SetAllocHealth_Unknown(t *testing.T) { 
ci.Parallel(t) - assert := assert.New(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) // require that we get a call to UpsertDeploymentAllocHealth a := mock.Alloc() - matchConfig := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Healthy: []string{a.ID}, - Eval: true, - } - matcher := matchDeploymentAllocHealthRequest(matchConfig) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)).Return(nil) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call SetAllocHealth + // manually set an unknown alloc healthy req := &structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, HealthyAllocationIDs: []string{a.ID}, } var resp structs.DeploymentUpdateResponse err := w.SetAllocHealth(req, &resp) - if assert.NotNil(err, "Set health of unknown allocation") { - require.Contains(err.Error(), "unknown") - } - require.Equal(1, watchersCount(w), "Deployment should still be active") + must.ErrorContains(t, err, "unknown alloc") + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) } // Test setting allocation health func TestWatcher_SetAllocHealth_Healthy(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we get a call to UpsertDeploymentAllocHealth - matchConfig := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Healthy: []string{a.ID}, - Eval: true, - } - matcher := matchDeploymentAllocHealthRequest(matchConfig) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call SetAllocHealth + // manually set the alloc healthy req := &structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, HealthyAllocationIDs: []string{a.ID}, } var resp 
structs.DeploymentUpdateResponse - err := w.SetAllocHealth(req, &resp) - require.Nil(err, "SetAllocHealth") - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)) + must.NoError(t, w.SetAllocHealth(req, &resp)) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, 1, d.TaskGroups["web"].HealthyAllocs) + must.Eq(t, 0, d.TaskGroups["web"].UnhealthyAllocs) } // Test setting allocation unhealthy func TestWatcher_SetAllocHealth_Unhealthy(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we get a call to UpsertDeploymentAllocHealth - matchConfig := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Unhealthy: []string{a.ID}, - Eval: true, - DeploymentUpdate: &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionFailedAllocations, - }, - } - matcher := matchDeploymentAllocHealthRequest(matchConfig) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call SetAllocHealth + // manually set the alloc unhealthy req := &structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, UnhealthyAllocationIDs: []string{a.ID}, } var resp structs.DeploymentUpdateResponse - err := w.SetAllocHealth(req, &resp) - require.Nil(err, "SetAllocHealth") + must.NoError(t, w.SetAllocHealth(req, &resp)) + + waitForWatchers(t, w, 0) - testutil.WaitForResult(func() (bool, error) { return 0 == watchersCount(w), nil }, - func(err error) { require.Equal(0, watchersCount(w), "Should have no deployment") }) - m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, 0, d.TaskGroups["web"].HealthyAllocs) + must.Eq(t, 1, d.TaskGroups["web"].UnhealthyAllocs) } // Test setting allocation unhealthy and that there should be a rollback func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() j.TaskGroups[0].Update = 
structs.DefaultUpdateStrategy.Copy() @@ -309,9 +234,9 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) // Upsert the job again to get a new version j2 := j.Copy() @@ -319,51 +244,37 @@ func TestWatcher_SetAllocHealth_Unhealthy_Rollback(t *testing.T) { // Modify the job to make its specification different j2.Meta["foo"] = "bar" - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2), "UpsertJob2") - - // require that we get a call to UpsertDeploymentAllocHealth - matchConfig := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Unhealthy: []string{a.ID}, - Eval: true, - DeploymentUpdate: &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionFailedAllocations, - }, - JobVersion: pointer.Of(uint64(0)), - } - matcher := matchDeploymentAllocHealthRequest(matchConfig) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call SetAllocHealth + // manually set the alloc unhealthy req := &structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, UnhealthyAllocationIDs: []string{a.ID}, } var resp structs.DeploymentUpdateResponse - err := w.SetAllocHealth(req, &resp) - require.Nil(err, "SetAllocHealth") + must.NoError(t, w.SetAllocHealth(req, &resp)) + + waitForWatchers(t, w, 0) - testutil.WaitForResult(func() (bool, error) { return 0 == watchersCount(w), nil }, - func(err error) { require.Equal(0, watchersCount(w), "Should have no deployment") }) - m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, 0, d.TaskGroups["web"].HealthyAllocs) + must.Eq(t, 1, d.TaskGroups["web"].UnhealthyAllocs) + must.Eq(t, structs.DeploymentStatusFailed, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRollback( + structs.DeploymentStatusDescriptionFailedAllocations, 0), d.StatusDescription) + + m.assertCalls(t, "UpdateDeploymentAllocHealth", 1) } // Test setting allocation unhealthy on job with identical spec and there should be no rollback func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -376,34 +287,18 @@ func 
TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) // Upsert the job again to get a new version j2 := j.Copy() j2.Stable = false - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2), "UpsertJob2") - - // require that we get a call to UpsertDeploymentAllocHealth - matchConfig := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Unhealthy: []string{a.ID}, - Eval: true, - DeploymentUpdate: &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionFailedAllocations, - }, - JobVersion: nil, - } - matcher := matchDeploymentAllocHealthRequest(matchConfig) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Call SetAllocHealth req := &structs.DeploymentAllocHealthRequest{ @@ -411,24 +306,20 @@ func TestWatcher_SetAllocHealth_Unhealthy_NoRollback(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, } var resp structs.DeploymentUpdateResponse - err := w.SetAllocHealth(req, &resp) - require.Nil(err, "SetAllocHealth") + must.NoError(t, w.SetAllocHealth(req, &resp)) - testutil.WaitForResult(func() (bool, error) { return 0 == watchersCount(w), nil }, - func(err error) { require.Equal(0, watchersCount(w), "Should have no deployment") }) - m.AssertNumberOfCalls(t, "UpdateDeploymentAllocHealth", 1) + waitForWatchers(t, w, 0) + must.Eq(t, structs.DeploymentStatusRunning, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRunning, d.StatusDescription) + + m.assertCalls(t, "UpdateDeploymentAllocHealth", 1) } // Test promoting a deployment func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, canary alloc, and a deployment j := mock.Job() j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -444,51 +335,32 @@ func TestWatcher_PromoteDeployment_HealthyCanaries(t *testing.T) { Healthy: pointer.Of(true), } a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we get a call to UpsertDeploymentPromotion - matchConfig := &matchDeploymentPromoteRequestConfig{ - 
Promotion: &structs.DeploymentPromoteRequest{ - DeploymentID: d.ID, - All: true, - }, - Eval: true, - } - matcher := matchDeploymentPromoteRequest(matchConfig) - m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher)).Return(nil) - - // We may get an update for the desired transition. - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PromoteDeployment + // manually promote req := &structs.DeploymentPromoteRequest{ DeploymentID: d.ID, All: true, } var resp structs.DeploymentUpdateResponse - err := w.PromoteDeployment(req, &resp) - require.Nil(err, "PromoteDeployment") - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher)) + must.NoError(t, w.PromoteDeployment(req, &resp)) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.True(t, d.TaskGroups["web"].Promoted) } // Test promoting a deployment with unhealthy canaries func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, canary alloc, and a deployment j := mock.Job() j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -501,39 +373,30 @@ func TestWatcher_PromoteDeployment_UnhealthyCanaries(t *testing.T) { d.TaskGroups[a.TaskGroup].PlacedCanaries = []string{a.ID} d.TaskGroups[a.TaskGroup].DesiredCanaries = 2 a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we get a call to UpsertDeploymentPromotion - matchConfig := &matchDeploymentPromoteRequestConfig{ - Promotion: &structs.DeploymentPromoteRequest{ - DeploymentID: d.ID, - All: true, - }, - Eval: true, - } - matcher := matchDeploymentPromoteRequest(matchConfig) - m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call SetAllocHealth + // manually promote req := &structs.DeploymentPromoteRequest{ DeploymentID: d.ID, All: true, } var resp structs.DeploymentUpdateResponse err 
:= w.PromoteDeployment(req, &resp) - if assert.NotNil(t, err, "PromoteDeployment") { - // 0/2 because the old version has been stopped but the canary isn't marked healthy yet - require.Contains(err.Error(), `Task group "web" has 0/2 healthy allocations`, "Should error because canary isn't marked healthy") - } + // 0/2 because the old version has been stopped but the canary isn't marked healthy yet + must.ErrorContains(t, err, `Task group "web" has 0/2 healthy allocations`, + must.Sprint("Should error because canary isn't marked healthy")) + + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher)) + d, err = m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusRunning, d.Status) + must.False(t, d.TaskGroups["web"].Promoted) } func TestWatcher_AutoPromoteDeployment(t *testing.T) { @@ -612,57 +475,13 @@ func TestWatcher_AutoPromoteDeployment(t *testing.T) { d.TaskGroups[ca1.TaskGroup].PlacedCanaries = []string{ca1.ID, ca2.ID} d.TaskGroups[ca1.TaskGroup].DesiredCanaries = 2 d.TaskGroups[ra1.TaskGroup].PlacedAllocs = 2 - require.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{ca1, ca2, ra1, ra2}), "UpsertAllocs") - - // ============================================================= - // Support method calls - - // clear UpdateDeploymentStatus default expectation - m.Mock.ExpectedCalls = nil - - matchConfig0 := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionProgressDeadline, - Eval: true, - } - matcher0 := matchDeploymentStatusUpdateRequest(matchConfig0) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher0)).Return(nil) - - matchConfig1 := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Healthy: []string{ca1.ID, ca2.ID, ra1.ID, ra2.ID}, - Eval: true, - } - matcher1 := matchDeploymentAllocHealthRequest(matchConfig1) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher1)).Return(nil) - - matchConfig2 := &matchDeploymentPromoteRequestConfig{ - Promotion: &structs.DeploymentPromoteRequest{ - DeploymentID: d.ID, - All: true, - }, - Eval: true, - } - matcher2 := matchDeploymentPromoteRequest(matchConfig2) - m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher2)).Return(nil) - // ============================================================= + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{ca1, ca2, ra1, ra2})) // Start the deployment w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { - w.l.RLock() - defer w.l.RUnlock() - return 1 == len(w.watchers), nil - }, - func(err error) { - w.l.RLock() - defer w.l.RUnlock() - require.Equal(t, 1, len(w.watchers), "Should have 1 deployment") - }, - ) + waitForWatchers(t, w, 1) // Mark the canaries healthy req := &structs.DeploymentAllocHealthRequest{ @@ -672,33 +491,40 @@ func TestWatcher_AutoPromoteDeployment(t *testing.T) { var resp 
structs.DeploymentUpdateResponse // Calls w.raft.UpdateDeploymentAllocHealth, which is implemented by StateStore in // state.UpdateDeploymentAllocHealth via a raft shim? - err := w.SetAllocHealth(req, &resp) - require.NoError(t, err) + must.NoError(t, w.SetAllocHealth(req, &resp)) - ws := memdb.NewWatchSet() - - testutil.WaitForResult( - func() (bool, error) { - ds, _ := m.state.DeploymentsByJobID(ws, j.Namespace, j.ID, true) - d = ds[0] - return 2 == d.TaskGroups["web"].HealthyAllocs, nil - }, - func(err error) { require.NoError(t, err) }, - ) + // Wait for the promotion + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + ws := memdb.NewWatchSet() + ds, err := m.state.DeploymentsByJobID(ws, j.Namespace, j.ID, true) + if err != nil { + return err + } + d = ds[0] + if 2 != d.TaskGroups["web"].HealthyAllocs { + return fmt.Errorf("expected 2 healthy allocs") + } + if !d.TaskGroups["web"].Promoted { + return fmt.Errorf("expected task group to be promoted") + } + if d.Status != structs.DeploymentStatusRunning { + return fmt.Errorf("expected deployment to be running") + } + return nil - require.Equal(t, 1, len(w.watchers), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher2)) + }), + wait.Gap(10*time.Millisecond), wait.Timeout(time.Second)), + must.Sprint("expected promotion request submitted")) - require.Equal(t, "running", d.Status) - require.True(t, d.TaskGroups["web"].Promoted) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) - a1, _ := m.state.AllocByID(ws, ca1.ID) - require.False(t, a1.DeploymentStatus.Canary) - require.Equal(t, "pending", a1.ClientStatus) - require.Equal(t, "run", a1.DesiredStatus) + a1, _ := m.state.AllocByID(nil, ca1.ID) + must.False(t, a1.DeploymentStatus.Canary) + must.Eq(t, "pending", a1.ClientStatus) + must.Eq(t, "run", a1.DesiredStatus) - b1, _ := m.state.AllocByID(ws, ca2.ID) - require.False(t, b1.DeploymentStatus.Canary) + b1, _ := m.state.AllocByID(nil, ca2.ID) + must.False(t, b1.DeploymentStatus.Canary) } func TestWatcher_AutoPromoteDeployment_UnhealthyCanaries(t *testing.T) { @@ -747,57 +573,13 @@ func TestWatcher_AutoPromoteDeployment_UnhealthyCanaries(t *testing.T) { d.TaskGroups[ca1.TaskGroup].PlacedCanaries = []string{ca1.ID, ca2.ID, ca3.ID} d.TaskGroups[ca1.TaskGroup].DesiredCanaries = 2 - require.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{ca1, ca2, ca3}), "UpsertAllocs") - - // ============================================================= - // Support method calls - - // clear UpdateDeploymentStatus default expectation - m.Mock.ExpectedCalls = nil - - matchConfig0 := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionProgressDeadline, - Eval: true, - } - matcher0 := matchDeploymentStatusUpdateRequest(matchConfig0) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher0)).Return(nil) - - matchConfig1 := &matchDeploymentAllocHealthRequestConfig{ - DeploymentID: d.ID, - Healthy: []string{ca1.ID, ca2.ID}, - Eval: true, - } - matcher1 := matchDeploymentAllocHealthRequest(matchConfig1) - m.On("UpdateDeploymentAllocHealth", mocker.MatchedBy(matcher1)).Return(nil) - - matchConfig2 := 
&matchDeploymentPromoteRequestConfig{ - Promotion: &structs.DeploymentPromoteRequest{ - DeploymentID: d.ID, - All: true, - }, - Eval: true, - } - matcher2 := matchDeploymentPromoteRequest(matchConfig2) - m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher2)).Return(nil) - // ============================================================= + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{ca1, ca2, ca3})) // Start the deployment w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { - w.l.RLock() - defer w.l.RUnlock() - return 1 == len(w.watchers), nil - }, - func(err error) { - w.l.RLock() - defer w.l.RUnlock() - require.Equal(t, 1, len(w.watchers), "Should have 1 deployment") - }, - ) + waitForWatchers(t, w, 1) // Mark only 2 canaries as healthy req := &structs.DeploymentAllocHealthRequest{ @@ -807,125 +589,105 @@ func TestWatcher_AutoPromoteDeployment_UnhealthyCanaries(t *testing.T) { var resp structs.DeploymentUpdateResponse // Calls w.raft.UpdateDeploymentAllocHealth, which is implemented by StateStore in // state.UpdateDeploymentAllocHealth via a raft shim? - err := w.SetAllocHealth(req, &resp) - require.NoError(t, err) - - ws := memdb.NewWatchSet() + must.NoError(t, w.SetAllocHealth(req, &resp)) - testutil.WaitForResult( - func() (bool, error) { - ds, _ := m.state.DeploymentsByJobID(ws, j.Namespace, j.ID, true) - d = ds[0] - return 2 == d.TaskGroups["web"].HealthyAllocs, nil - }, - func(err error) { require.NoError(t, err) }, - ) + // Wait for the promotion + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + ws := memdb.NewWatchSet() + ds, _ := m.state.DeploymentsByJobID(ws, j.Namespace, j.ID, true) + d = ds[0] + if 2 != d.TaskGroups["web"].HealthyAllocs { + return fmt.Errorf("expected 2 healthy allocs") + } + if !d.TaskGroups["web"].Promoted { + return fmt.Errorf("expected task group to be promoted") + } + if d.Status != structs.DeploymentStatusRunning { + return fmt.Errorf("expected deployment to be running") + } + return nil - // Verify that a promotion request was submitted. 
- require.Equal(t, 1, len(w.watchers), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentPromotion", mocker.MatchedBy(matcher2)) + }), + wait.Gap(10*time.Millisecond), wait.Timeout(time.Second)), + must.Sprint("expected promotion request submitted")) - require.Equal(t, "running", d.Status) - require.True(t, d.TaskGroups["web"].Promoted) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) - a1, _ := m.state.AllocByID(ws, ca1.ID) - require.False(t, a1.DeploymentStatus.Canary) - require.Equal(t, "pending", a1.ClientStatus) - require.Equal(t, "run", a1.DesiredStatus) + a1, _ := m.state.AllocByID(nil, ca1.ID) + must.False(t, a1.DeploymentStatus.Canary) + must.Eq(t, "pending", a1.ClientStatus) + must.Eq(t, "run", a1.DesiredStatus) - b1, _ := m.state.AllocByID(ws, ca2.ID) - require.False(t, b1.DeploymentStatus.Canary) + b1, _ := m.state.AllocByID(nil, ca2.ID) + must.False(t, b1.DeploymentStatus.Canary) } // Test pausing a deployment that is running func TestWatcher_PauseDeployment_Pause_Running(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - // clear UpdateDeploymentStatus default expectation - m.Mock.ExpectedCalls = nil - // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusPaused, - StatusDescription: structs.DeploymentStatusDescriptionPaused, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PauseDeployment + // manually pause req := &structs.DeploymentPauseRequest{ DeploymentID: d.ID, Pause: true, } var resp structs.DeploymentUpdateResponse - err := w.PauseDeployment(req, &resp) - require.Nil(err, "PauseDeployment") + must.NoError(t, w.PauseDeployment(req, &resp)) + + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusPaused, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionPaused, d.StatusDescription) } // Test pausing a deployment that is paused func TestWatcher_PauseDeployment_Pause_Paused(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) - // clear UpdateDeploymentStatus default expectation - m.Mock.ExpectedCalls = nil - // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID d.Status = structs.DeploymentStatusPaused - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), 
"UpsertDeployment") - - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusPaused, - StatusDescription: structs.DeploymentStatusDescriptionPaused, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PauseDeployment + // manually pause req := &structs.DeploymentPauseRequest{ DeploymentID: d.ID, Pause: true, } var resp structs.DeploymentUpdateResponse - err := w.PauseDeployment(req, &resp) - require.Nil(err, "PauseDeployment") + must.NoError(t, w.PauseDeployment(req, &resp)) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusPaused, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionPaused, d.StatusDescription) } // Test unpausing a deployment that is paused func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment @@ -933,120 +695,93 @@ func TestWatcher_PauseDeployment_Unpause_Paused(t *testing.T) { d := mock.Deployment() d.JobID = j.ID d.Status = structs.DeploymentStatusPaused - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusRunning, - StatusDescription: structs.DeploymentStatusDescriptionRunning, - Eval: true, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PauseDeployment + // manually unpause req := &structs.DeploymentPauseRequest{ DeploymentID: d.ID, Pause: false, } var resp structs.DeploymentUpdateResponse - err := w.PauseDeployment(req, &resp) - require.Nil(err, "PauseDeployment") + must.NoError(t, w.PauseDeployment(req, &resp)) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusRunning, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRunning, 
d.StatusDescription) } // Test unpausing a deployment that is running func TestWatcher_PauseDeployment_Unpause_Running(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusRunning, - StatusDescription: structs.DeploymentStatusDescriptionRunning, - Eval: true, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PauseDeployment + // manually unpause the deployment req := &structs.DeploymentPauseRequest{ DeploymentID: d.ID, Pause: false, } var resp structs.DeploymentUpdateResponse - err := w.PauseDeployment(req, &resp) - require.Nil(err, "PauseDeployment") + must.NoError(t, w.PauseDeployment(req, &resp)) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) + must.Eq(t, 1, watchersCount(w), must.Sprint("watcher should still be active")) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusRunning, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRunning, d.StatusDescription) } // Test failing a deployment that is running func TestWatcher_FailDeployment_Running(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := defaultTestDeploymentWatcher(t) // Create a job and a deployment j := mock.Job() d := mock.Deployment() d.JobID = j.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionFailedByUser, - Eval: true, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) - // Call PauseDeployment + // manually fail the deployment req := &structs.DeploymentFailRequest{ DeploymentID: d.ID, } var resp structs.DeploymentUpdateResponse - err := w.FailDeployment(req, &resp) - require.Nil(err, "FailDeployment") + must.NoError(t, w.FailDeployment(req, &resp)) + + must.Eq(t, 1, watchersCount(w), 
must.Sprint("watcher should still be active")) - require.Equal(1, watchersCount(w), "Deployment should still be active") - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(matcher)) + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusFailed, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionFailedByUser, d.StatusDescription) } // Tests that the watcher properly watches for allocation changes and takes the // proper actions func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -1061,64 +796,32 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) // Upsert the job again to get a new version j2 := j.Copy() // Modify the job to make its specification different j2.Meta["foo"] = "bar" j2.Stable = false - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2), "UpsertJob2") - - // require that we will get a update allocation call only once. This will - // verify that the watcher is batching allocation changes - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() - - // require that we get a call to UpsertDeploymentStatusUpdate - c := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionRollback(structs.DeploymentStatusDescriptionFailedAllocations, 0), - JobVersion: pointer.Of(uint64(0)), - Eval: true, - } - m2 := matchDeploymentStatusUpdateRequest(c) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(m2)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Update the allocs health to healthy which should create an evaluation - for i := 0; i < 5; i++ { + for range 5 { req := &structs.ApplyDeploymentAllocHealthRequest{ DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, HealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req), "UpsertDeploymentAllocHealth") + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req)) } - // Wait for there to be one eval - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 1 { - return false, fmt.Errorf("Got %d evals; 
want 1", l) - } - - return true, nil - }, func(err error) { - t.Fatal(err) - }) + waitForEvals(t, m.state, j, 1) // Update the allocs health to unhealthy which should create a job rollback, // status update and eval @@ -1128,45 +831,37 @@ func TestDeploymentWatcher_Watch_NoProgressDeadline(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2)) + waitForEvals(t, m.state, j, 1) - // Wait for there to be one eval - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err + // Wait for the deployment to be failed + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + d, _ := m.state.DeploymentByID(nil, d.ID) + if d.Status != structs.DeploymentStatusFailed { + return fmt.Errorf("bad status %q", d.Status) } - - if l := len(evals); l != 2 { - return false, fmt.Errorf("Got %d evals; want 1", l) + if !strings.Contains(d.StatusDescription, + structs.DeploymentStatusDescriptionFailedAllocations) { + return fmt.Errorf("bad status description %q", d.StatusDescription) } + return nil + }), + wait.Gap(10*time.Millisecond), wait.Timeout(time.Second)), + must.Sprint("expected deployment to be failed")) - return true, nil - }, func(err error) { - t.Fatal(err) - }) - - m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1)) - - // After we upsert the job version will go to 2. So use this to require the - // original call happened. - c2 := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionRollback(structs.DeploymentStatusDescriptionFailedAllocations, 0), - JobVersion: pointer.Of(uint64(2)), - Eval: true, - } - m3 := matchDeploymentStatusUpdateRequest(c2) - m.AssertCalled(t, "UpdateDeploymentStatus", mocker.MatchedBy(m3)) - testutil.WaitForResult(func() (bool, error) { return 0 == watchersCount(w), nil }, - func(err error) { require.Equal(0, watchersCount(w), "Should have no deployment") }) + waitForWatchers(t, w, 0) + // verify that the watcher is batching allocation changes + m.assertCalls(t, "UpdateAllocDesiredTransition", 1) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusFailed, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRollback( + structs.DeploymentStatusDescriptionFailedAllocations, 0), d.StatusDescription) } func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -1183,23 +878,12 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { a.CreateTime = now.UnixNano() a.ModifyTime = now.UnixNano() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we get a call to UpsertDeploymentStatusUpdate - c := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - 
StatusDescription: structs.DeploymentStatusDescriptionProgressDeadline, - Eval: true, - } - m2 := matchDeploymentStatusUpdateRequest(c) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(m2)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Update the alloc to be unhealthy and require that nothing happens. a2 := a.Copy() @@ -1207,48 +891,25 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) { Healthy: pointer.Of(false), Timestamp: now, } - require.Nil(m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 100, []*structs.Allocation{a2})) + must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 100, []*structs.Allocation{a2})) // Wait for the deployment to be failed - testutil.WaitForResult(func() (bool, error) { - d, err := m.state.DeploymentByID(nil, d.ID) - if err != nil { - return false, err - } - - return d.Status == structs.DeploymentStatusFailed, fmt.Errorf("bad status %q", d.Status) - }, func(err error) { - t.Fatal(err) - }) - - // require there are is only one evaluation - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 1 { - return false, fmt.Errorf("Got %d evals; want 1", l) - } - - return true, nil - }, func(err error) { - t.Fatal(err) - }) + must.Wait(t, wait.InitialSuccess(wait.BoolFunc(func() bool { + d, _ := m.state.DeploymentByID(nil, d.ID) + return d.Status == structs.DeploymentStatusFailed && + d.StatusDescription == structs.DeploymentStatusDescriptionProgressDeadline + }), + wait.Gap(10*time.Millisecond), wait.Timeout(time.Second)), + must.Sprint("expected deployment to be failed")) + + waitForEvals(t, m.state, j, 1) } // Test that progress deadline handling works when there are multiple groups func TestDeploymentWatcher_ProgressCutoff(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() j.TaskGroups[0].Count = 1 @@ -1278,83 +939,73 @@ func TestDeploymentWatcher_ProgressCutoff(t *testing.T) { a2.ModifyTime = now.UnixNano() a2.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a, a2}), "UpsertAllocs") - - // We may get an update for the desired transition. 
- m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a, a2})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) watcher, err := w.getOrCreateWatcher(d.ID) - require.NoError(err) - require.NotNil(watcher) + must.NoError(t, err) + must.NotNil(t, watcher) d1, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) + must.NoError(t, err) done := watcher.doneGroups(d1) - require.Contains(done, "web") - require.False(done["web"]) - require.Contains(done, "foo") - require.False(done["foo"]) + must.MapContainsKey(t, done, "web") + must.False(t, done["web"]) + must.MapContainsKey(t, done, "foo") + must.False(t, done["foo"]) cutoff1 := watcher.getDeploymentProgressCutoff(d1) - require.False(cutoff1.IsZero()) + must.False(t, cutoff1.IsZero()) // Update the first allocation to be healthy a3 := a.Copy() a3.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: pointer.Of(true)} - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a3}), "UpsertAllocs") + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a3})) // Get the updated deployment d2, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) + must.NoError(t, err) done = watcher.doneGroups(d2) - require.Contains(done, "web") - require.True(done["web"]) - require.Contains(done, "foo") - require.False(done["foo"]) + must.MapContainsKey(t, done, "web") + must.True(t, done["web"]) + must.MapContainsKey(t, done, "foo") + must.False(t, done["foo"]) cutoff2 := watcher.getDeploymentProgressCutoff(d2) - require.False(cutoff2.IsZero()) - require.True(cutoff1.UnixNano() < cutoff2.UnixNano()) + must.False(t, cutoff2.IsZero()) + must.True(t, cutoff1.UnixNano() < cutoff2.UnixNano()) // Update the second allocation to be healthy a4 := a2.Copy() a4.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: pointer.Of(true)} - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a4}), "UpsertAllocs") + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a4})) // Get the updated deployment d3, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) + must.NoError(t, err) done = watcher.doneGroups(d3) - require.Contains(done, "web") - require.True(done["web"]) - require.Contains(done, "foo") - require.True(done["foo"]) + must.MapContainsKey(t, done, "web") + must.True(t, done["web"]) + must.MapContainsKey(t, done, "foo") + must.True(t, done["foo"]) cutoff3 := watcher.getDeploymentProgressCutoff(d2) - require.True(cutoff3.IsZero()) + must.True(t, cutoff3.IsZero()) } // Test that we will allow the progress deadline to be reached when the canaries // are healthy but we haven't promoted func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args 
*structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -1372,18 +1023,12 @@ func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) { a.CreateTime = now.UnixNano() a.ModifyTime = now.UnixNano() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") - - // require that we will get a createEvaluation call only once. This will - // verify that the watcher is batching allocation changes - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Update the alloc to be unhealthy and require that nothing happens. a2 := a.Copy() @@ -1391,52 +1036,32 @@ func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) { Healthy: pointer.Of(true), Timestamp: now, } - require.Nil(m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) + must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) // Wait for the deployment to cross the deadline dout, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) - require.NotNil(dout) + must.NoError(t, err) + must.NotNil(t, dout) state := dout.TaskGroups["web"] - require.NotNil(state) + must.NotNil(t, state) time.Sleep(state.RequireProgressBy.Add(time.Second).Sub(now)) // Require the deployment is still running dout, err = m.state.DeploymentByID(nil, d.ID) - require.NoError(err) - require.NotNil(dout) - require.Equal(structs.DeploymentStatusRunning, dout.Status) - require.Equal(structs.DeploymentStatusDescriptionRunningNeedsPromotion, dout.StatusDescription) - - // require there are is only one evaluation - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 1 { - return false, fmt.Errorf("Got %d evals; want 1", l) - } + must.NoError(t, err) + must.NotNil(t, dout) + must.Eq(t, structs.DeploymentStatusRunning, dout.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRunningNeedsPromotion, dout.StatusDescription) - return true, nil - }, func(err error) { - t.Fatal(err) - }) + waitForEvals(t, m.state, j, 1) } // Test that a promoted deployment with alloc healthy updates create // evals to move the deployment forward func TestDeploymentWatcher_PromotedCanary_UpdatedAllocs(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - 
})).Return(nil).Maybe() - // Create a job, alloc, and a deployment j := mock.Job() j.TaskGroups[0].Count = 2 @@ -1464,16 +1089,12 @@ func TestDeploymentWatcher_PromotedCanary_UpdatedAllocs(t *testing.T) { Healthy: pointer.Of(true), Timestamp: now, } - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) - - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Twice() + waitForWatchers(t, w, 1) // Create another alloc a2 := a.Copy() @@ -1486,49 +1107,29 @@ func TestDeploymentWatcher_PromotedCanary_UpdatedAllocs(t *testing.T) { Timestamp: now, } d.TaskGroups["web"].RequireProgressBy = time.Now().Add(2 * time.Second) - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) // Wait until batch eval period passes before updating another alloc time.Sleep(1 * time.Second) - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2}), "UpsertAllocs") + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) // Wait for the deployment to cross the deadline dout, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) - require.NotNil(dout) + must.NoError(t, err) + must.NotNil(t, dout) state := dout.TaskGroups["web"] - require.NotNil(state) + must.NotNil(t, state) time.Sleep(state.RequireProgressBy.Add(time.Second).Sub(now)) - // There should be two evals - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 2 { - return false, fmt.Errorf("Got %d evals; want 2", l) - } - - return true, nil - }, func(err error) { - t.Fatal(err) - }) + waitForEvals(t, m.state, j, 2) } func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { ci.Parallel(t) - require := require.New(t) mtype := structs.MsgTypeTestSetup w, m := defaultTestDeploymentWatcher(t) w.SetEnabled(true, m.state) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - progressTimeout := time.Millisecond * 1000 j := mock.Job() j.TaskGroups[0].Name = "group1" @@ -1564,21 +1165,8 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { }, } - require.NoError(m.state.UpsertJob(mtype, m.nextIndex(), nil, j)) - require.NoError(m.state.UpsertDeployment(m.nextIndex(), d)) - - // require that we get a call to UpsertDeploymentPromotion - matchConfig := &matchDeploymentPromoteRequestConfig{ - Promotion: &structs.DeploymentPromoteRequest{ - DeploymentID: d.ID, - All: true, - }, - Eval: true, - } - matcher := matchDeploymentPromoteRequest(matchConfig) - 
m.On("UpdateDeploymentPromotion", mocker.MatchedBy(matcher)).Return(nil) - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil) + must.NoError(t, m.state.UpsertJob(mtype, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) // create canaries @@ -1599,8 +1187,7 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { canary2.ModifyTime = now.UnixNano() allocs := []*structs.Allocation{canary1, canary2} - err := m.state.UpsertAllocs(mtype, m.nextIndex(), allocs) - require.NoError(err) + must.NoError(t, m.state.UpsertAllocs(mtype, m.nextIndex(), allocs)) // 2nd group's canary becomes healthy @@ -1615,8 +1202,8 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { } allocs = []*structs.Allocation{canary2} - err = m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), allocs) - require.NoError(err) + err := m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), allocs) + must.NoError(t, err) // wait for long enough to ensure we read deployment update channel // this sleep creates the race condition associated with #7058 @@ -1635,7 +1222,7 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { allocs = []*structs.Allocation{canary1} err = m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), allocs) - require.NoError(err) + must.NoError(t, err) // ensure progress_deadline has definitely expired time.Sleep(progressTimeout) @@ -1647,7 +1234,7 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { All: true, } err = w.PromoteDeployment(req, &structs.DeploymentUpdateResponse{}) - require.NoError(err) + must.NoError(t, err) // wait for long enough to ensure we read deployment update channel time.Sleep(50 * time.Millisecond) @@ -1674,7 +1261,7 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { allocs = []*structs.Allocation{alloc1a, alloc1b} err = m.state.UpsertAllocs(mtype, m.nextIndex(), allocs) - require.NoError(err) + must.NoError(t, err) // allocs become healthy @@ -1700,7 +1287,7 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { allocs = []*structs.Allocation{alloc1a, alloc1b} err = m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), allocs) - require.NoError(err) + must.NoError(t, err) // ensure any progress deadline has expired time.Sleep(progressTimeout) @@ -1708,32 +1295,27 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) { // without a scheduler running we'll never mark the deployment as // successful, so test that healthy == desired and that we haven't failed deployment, err := m.state.DeploymentByID(nil, d.ID) - require.NoError(err) - require.Equal(structs.DeploymentStatusRunning, deployment.Status) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusRunning, deployment.Status) group1 := deployment.TaskGroups["group1"] - - require.Equal(group1.DesiredTotal, group1.HealthyAllocs, "not enough healthy") - require.Equal(group1.DesiredTotal, group1.PlacedAllocs, "not enough placed") - require.Equal(0, group1.UnhealthyAllocs) + must.Eq(t, group1.DesiredTotal, group1.HealthyAllocs, must.Sprint("not enough healthy")) + must.Eq(t, group1.DesiredTotal, group1.PlacedAllocs, must.Sprint("not enough placed")) + must.Eq(t, 0, group1.UnhealthyAllocs) + must.True(t, group1.Promoted) group2 := deployment.TaskGroups["group2"] - require.Equal(group2.DesiredTotal, group2.HealthyAllocs, "not enough healthy") - require.Equal(group2.DesiredTotal, 
group2.PlacedAllocs, "not enough placed") - require.Equal(0, group2.UnhealthyAllocs) + must.Eq(t, group2.DesiredTotal, group2.HealthyAllocs, must.Sprint("not enough healthy")) + must.Eq(t, group2.DesiredTotal, group2.PlacedAllocs, must.Sprint("not enough placed")) + must.Eq(t, 0, group2.UnhealthyAllocs) } // Test scenario where deployment initially has no progress deadline // After the deployment is updated, a failed alloc's DesiredTransition should be set func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, and a deployment j := mock.Job() j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -1743,26 +1325,21 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { d := mock.Deployment() d.JobID = j.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) a := mock.Alloc() a.CreateTime = time.Now().UnixNano() a.DeploymentID = d.ID - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) d.TaskGroups["web"].ProgressDeadline = 500 * time.Millisecond // Update the deployment with a progress deadline - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - - // Match on DesiredTransition set to Reschedule for the failed alloc - m1 := matchUpdateAllocDesiredTransitionReschedule([]string{a.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Update the alloc to be unhealthy a2 := a.Copy() @@ -1770,20 +1347,23 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) { Healthy: pointer.Of(false), Timestamp: time.Now(), } - require.Nil(m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) + must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) // Wait for the alloc's DesiredState to set reschedule - testutil.WaitForResult(func() (bool, error) { + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { a, err := m.state.AllocByID(nil, a.ID) if err != nil { - return false, err + return err } dt := a.DesiredTransition - shouldReschedule := dt.Reschedule != nil && *dt.Reschedule - return shouldReschedule, fmt.Errorf("Desired Transition Reschedule should be set but got %v", shouldReschedule) - }, func(err error) { - t.Fatal(err) - }) + if dt.Reschedule == nil || !*dt.Reschedule { + return fmt.Errorf("Desired Transition Reschedule should be set: %+v", dt) + } + return nil + }), + wait.Gap(10*time.Millisecond), + wait.Timeout(3*time.Second))) + } // Test that we exit before 
hitting the Progress Deadline when we run out of reschedule attempts @@ -1814,19 +1394,8 @@ func TestDeploymentWatcher_Watch_FailEarly(t *testing.T) { must.Nil(t, m.state.UpsertDeployment(m.nextIndex(), d), must.Sprint("UpsertDeployment")) must.Nil(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), must.Sprint("UpsertAllocs")) - // require that we get a call to UpsertDeploymentStatusUpdate - c := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionFailedAllocations, - Eval: true, - } - m2 := matchDeploymentStatusUpdateRequest(c) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(m2)).Return(nil) - w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { must.Eq(t, 1, watchersCount(w), must.Sprint("Should have 1 deployment")) }) + waitForWatchers(t, w, 1) // Update the alloc to be unhealthy a2 := a.Copy() @@ -1837,43 +1406,25 @@ func TestDeploymentWatcher_Watch_FailEarly(t *testing.T) { must.Nil(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) // Wait for the deployment to be failed - testutil.WaitForResult(func() (bool, error) { - d, err := m.state.DeploymentByID(nil, d.ID) - if err != nil { - return false, err - } - + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + d, _ := m.state.DeploymentByID(nil, d.ID) if d.Status != structs.DeploymentStatusFailed { - return false, fmt.Errorf("bad status %q", d.Status) + return fmt.Errorf("bad status %q", d.Status) } - - return d.StatusDescription == structs.DeploymentStatusDescriptionFailedAllocations, fmt.Errorf("bad status description %q", d.StatusDescription) - }, func(err error) { - t.Fatal(err) - }) - - // require there are is only one evaluation - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 1 { - return false, fmt.Errorf("Got %d evals; want 1", l) + if d.StatusDescription != structs.DeploymentStatusDescriptionFailedAllocations { + return fmt.Errorf("bad status description %q", d.StatusDescription) } + return nil + }), + wait.Gap(10*time.Millisecond), wait.Timeout(time.Second)), + must.Sprint("expected deployment to be failed")) - return true, nil - }, func(err error) { - t.Fatal(err) - }) + waitForEvals(t, m.state, j, 1) } // Tests that the watcher fails rollback when the spec hasn't changed func TestDeploymentWatcher_RollbackFailed(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond) // Create a job, alloc, and a deployment @@ -1888,63 +1439,31 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { d.TaskGroups["web"].AutoRevert = true a := mock.Alloc() a.DeploymentID = d.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), "UpsertAllocs") + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a})) // Upsert the job 
again to get a new version j2 := j.Copy() // Modify the job to make its specification different j2.Stable = false - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2), "UpsertJob2") - - // require that we will get a createEvaluation call only once. This will - // verify that the watcher is batching allocation changes - m1 := matchUpdateAllocDesiredTransitions([]string{d.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() - - // require that we get a call to UpsertDeploymentStatusUpdate with roll back failed as the status - c := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusFailed, - StatusDescription: structs.DeploymentStatusDescriptionRollbackNoop(structs.DeploymentStatusDescriptionFailedAllocations, 0), - JobVersion: nil, - Eval: true, - } - m2 := matchDeploymentStatusUpdateRequest(c) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(m2)).Return(nil) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2)) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil }, - func(err error) { require.Equal(1, watchersCount(w), "Should have 1 deployment") }) + waitForWatchers(t, w, 1) // Update the allocs health to healthy which should create an evaluation - for i := 0; i < 5; i++ { + for range 5 { req := &structs.ApplyDeploymentAllocHealthRequest{ DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ DeploymentID: d.ID, HealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req), "UpsertDeploymentAllocHealth") + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req)) } - // Wait for there to be one eval - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 1 { - return false, fmt.Errorf("Got %d evals; want 1", l) - } - - return true, nil - }, func(err error) { - t.Fatal(err) - }) + waitForEvals(t, m.state, j, 1) // Update the allocs health to unhealthy which will cause attempting a rollback, // fail in that step, do status update and eval @@ -1954,42 +1473,29 @@ func TestDeploymentWatcher_RollbackFailed(t *testing.T) { UnhealthyAllocationIDs: []string{a.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2), "UpsertDeploymentAllocHealth") - - // Wait for there to be one eval - testutil.WaitForResult(func() (bool, error) { - ws := memdb.NewWatchSet() - evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID) - if err != nil { - return false, err - } - - if l := len(evals); l != 2 { - return false, fmt.Errorf("Got %d evals; want 1", l) - } + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2)) - return true, nil - }, func(err error) { - t.Fatal(err) - }) + waitForEvals(t, m.state, j, 2) - m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1)) + // verify that the watcher is batching allocation changes + m.assertCalls(t, "UpdateAllocDesiredTransition", 1) // verify that the job version hasn't changed after upsert m.state.JobByID(nil, structs.DefaultNamespace, j.ID) - require.Equal(uint64(0), j.Version, "Expected job version 0 but got ", j.Version) + must.Eq(t, uint64(0), j.Version) + + d, err := 
m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Eq(t, structs.DeploymentStatusFailed, d.Status) + must.Eq(t, structs.DeploymentStatusDescriptionRollbackNoop( + structs.DeploymentStatusDescriptionFailedAllocations, 0), d.StatusDescription) } // Test allocation updates and evaluation creation is batched between watchers func TestWatcher_BatchAllocUpdates(t *testing.T) { ci.Parallel(t) - require := require.New(t) w, m := testDeploymentWatcher(t, 1000.0, 1*time.Second) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(func(args *structs.DeploymentStatusUpdateRequest) bool { - return true - })).Return(nil).Maybe() - // Create a job, alloc, for two deployments j1 := mock.Job() j1.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy() @@ -2011,22 +1517,15 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { a2.JobID = j2.ID a2.DeploymentID = d2.ID - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j1), "UpsertJob") - require.Nil(m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2), "UpsertJob") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d1), "UpsertDeployment") - require.Nil(m.state.UpsertDeployment(m.nextIndex(), d2), "UpsertDeployment") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a1}), "UpsertAllocs") - require.Nil(m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2}), "UpsertAllocs") - - // require that we will get a createEvaluation call only once and it contains - // both deployments. This will verify that the watcher is batching - // allocation changes - m1 := matchUpdateAllocDesiredTransitions([]string{d1.ID, d2.ID}) - m.On("UpdateAllocDesiredTransition", mocker.MatchedBy(m1)).Return(nil).Once() + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j1)) + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j2)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d1)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d2)) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a1})) + must.NoError(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2})) w.SetEnabled(true, m.state) - testutil.WaitForResult(func() (bool, error) { return 2 == watchersCount(w), nil }, - func(err error) { require.Equal(2, watchersCount(w), "Should have 2 deployment") }) + waitForWatchers(t, w, 2) // Update the allocs health to healthy which should create an evaluation req := &structs.ApplyDeploymentAllocHealthRequest{ @@ -2035,7 +1534,7 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { HealthyAllocationIDs: []string{a1.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req), "UpsertDeploymentAllocHealth") + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req)) req2 := &structs.ApplyDeploymentAllocHealthRequest{ DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ @@ -2043,43 +1542,19 @@ func TestWatcher_BatchAllocUpdates(t *testing.T) { HealthyAllocationIDs: []string{a2.ID}, }, } - require.Nil(m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2), "UpsertDeploymentAllocHealth") + must.NoError(t, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, m.nextIndex(), req2)) - // Wait for there to be one eval for each job - testutil.WaitForResult(func() 
(bool, error) { - ws := memdb.NewWatchSet() - evals1, err := m.state.EvalsByJob(ws, j1.Namespace, j1.ID) - if err != nil { - return false, err - } - - evals2, err := m.state.EvalsByJob(ws, j2.Namespace, j2.ID) - if err != nil { - return false, err - } - - if l := len(evals1); l != 1 { - return false, fmt.Errorf("Got %d evals for job %v; want 1", l, j1.ID) - } + waitForEvals(t, m.state, j1, 1) + waitForEvals(t, m.state, j2, 1) + waitForWatchers(t, w, 2) - if l := len(evals2); l != 1 { - return false, fmt.Errorf("Got %d evals for job 2; want 1", l) - } - - return true, nil - }, func(err error) { - t.Fatal(err) - }) - - m.AssertCalled(t, "UpdateAllocDesiredTransition", mocker.MatchedBy(m1)) - testutil.WaitForResult(func() (bool, error) { return 2 == watchersCount(w), nil }, - func(err error) { require.Equal(2, watchersCount(w), "Should have 2 deployment") }) + // verify that the watcher is batching allocation changes + m.assertCalls(t, "UpdateAllocDesiredTransition", 1) } func watchersCount(w *Watcher) int { - w.l.Lock() - defer w.l.Unlock() - + w.l.RLock() + defer w.l.RUnlock() return len(w.watchers) } @@ -2088,9 +1563,6 @@ func TestWatcher_PurgeDeployment(t *testing.T) { ci.Parallel(t) w, m := defaultTestDeploymentWatcher(t) - // clear UpdateDeploymentStatus default expectation - m.Mock.ExpectedCalls = nil - // Create a job and a deployment j := mock.Job() d := mock.Deployment() @@ -2098,37 +1570,33 @@ func TestWatcher_PurgeDeployment(t *testing.T) { must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) - // require that we get a call to UpsertDeploymentStatusUpdate - matchConfig := &matchDeploymentStatusUpdateConfig{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusPaused, - StatusDescription: structs.DeploymentStatusDescriptionPaused, - } - matcher := matchDeploymentStatusUpdateRequest(matchConfig) - m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) - w.SetEnabled(true, m.state) - must.Wait(t, wait.InitialSuccess( - wait.ErrorFunc(func() error { - if watchersCount(w) != 1 { - return fmt.Errorf("expected 1 deployment") - } - return nil - }), - wait.Attempts(100), - wait.Gap(10*time.Millisecond), - )) + waitForWatchers(t, w, 1) must.NoError(t, m.state.DeleteJob(m.nextIndex(), j.Namespace, j.ID)) + waitForWatchers(t, w, 0) + + d, err := m.state.DeploymentByID(nil, d.ID) + must.NoError(t, err) + must.Nil(t, d) +} +func waitForWatchers(t *testing.T, w *Watcher, expect int) { + t.Helper() must.Wait(t, wait.InitialSuccess( - wait.ErrorFunc(func() error { - if watchersCount(w) != 0 { - return fmt.Errorf("expected deployment watcher to be stopped") - } - return nil - }), - wait.Attempts(500), + wait.BoolFunc(func() bool { return expect == watchersCount(w) }), + wait.Gap(10*time.Millisecond), + wait.Timeout(time.Second)), must.Sprintf("expected %d deployments", expect)) +} + +func waitForEvals(t *testing.T, store *state.StateStore, job *structs.Job, expect int) { + t.Helper() + must.Wait(t, wait.InitialSuccess(wait.BoolFunc(func() bool { + ws := memdb.NewWatchSet() + evals, _ := store.EvalsByJob(ws, job.Namespace, job.ID) + return len(evals) == expect + }), wait.Gap(10*time.Millisecond), - )) + wait.Timeout(5*time.Second), // some of these need to wait quite a while + ), must.Sprintf("expected %d evals before timeout", expect)) } diff --git a/nomad/deploymentwatcher/testutil_test.go b/nomad/deploymentwatcher/testutil_test.go index 7af7df9cafd..922a14c96c6 100644 --- 
a/nomad/deploymentwatcher/testutil_test.go +++ b/nomad/deploymentwatcher/testutil_test.go @@ -4,29 +4,27 @@ package deploymentwatcher import ( - "reflect" - "strings" "sync" "testing" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" - mocker "github.com/stretchr/testify/mock" + "github.com/shoenig/test/must" ) type mockBackend struct { - mocker.Mock index uint64 state *state.StateStore l sync.Mutex + calls map[string]int } func newMockBackend(t *testing.T) *mockBackend { m := &mockBackend{ index: 10000, state: state.TestStateStore(t), + calls: map[string]int{}, } - m.Test(t) return m } @@ -38,235 +36,46 @@ func (m *mockBackend) nextIndex() uint64 { return i } -func (m *mockBackend) UpdateAllocDesiredTransition(u *structs.AllocUpdateDesiredTransitionRequest) (uint64, error) { - m.Called(u) - i := m.nextIndex() - return i, m.state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, i, u.Allocs, u.Evals) +func (m *mockBackend) trackCall(method string) { + m.l.Lock() + defer m.l.Unlock() + m.calls[method]++ } -// matchUpdateAllocDesiredTransitions is used to match an upsert request -func matchUpdateAllocDesiredTransitions(deploymentIDs []string) func(update *structs.AllocUpdateDesiredTransitionRequest) bool { - return func(update *structs.AllocUpdateDesiredTransitionRequest) bool { - if len(update.Evals) != len(deploymentIDs) { - return false - } - - dmap := make(map[string]struct{}, len(deploymentIDs)) - for _, d := range deploymentIDs { - dmap[d] = struct{}{} - } - - for _, e := range update.Evals { - if _, ok := dmap[e.DeploymentID]; !ok { - return false - } - - delete(dmap, e.DeploymentID) - } - - return true - } +func (m *mockBackend) assertCalls(t *testing.T, method string, expect int) { + t.Helper() + m.l.Lock() + defer m.l.Unlock() + must.Eq(t, expect, m.calls[method], + must.Sprintf("expected %d calls for method=%s. 
got=%+v", expect, method, m.calls)) } -// matchUpdateAllocDesiredTransitionReschedule is used to match allocs that have their DesiredTransition set to Reschedule -func matchUpdateAllocDesiredTransitionReschedule(allocIDs []string) func(update *structs.AllocUpdateDesiredTransitionRequest) bool { - return func(update *structs.AllocUpdateDesiredTransitionRequest) bool { - amap := make(map[string]struct{}, len(allocIDs)) - for _, d := range allocIDs { - amap[d] = struct{}{} - } - - for allocID, dt := range update.Allocs { - if _, ok := amap[allocID]; !ok { - return false - } - if !*dt.Reschedule { - return false - } - } - - return true - } +func (m *mockBackend) UpdateAllocDesiredTransition(u *structs.AllocUpdateDesiredTransitionRequest) (uint64, error) { + m.trackCall("UpdateAllocDesiredTransition") + i := m.nextIndex() + return i, m.state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, i, u.Allocs, u.Evals) } func (m *mockBackend) UpsertJob(job *structs.Job) (uint64, error) { - m.Called(job) + m.trackCall("UpsertJob") i := m.nextIndex() return i, m.state.UpsertJob(structs.MsgTypeTestSetup, i, nil, job) } func (m *mockBackend) UpdateDeploymentStatus(u *structs.DeploymentStatusUpdateRequest) (uint64, error) { - m.Called(u) + m.trackCall("UpdateDeploymentStatus") i := m.nextIndex() return i, m.state.UpdateDeploymentStatus(structs.MsgTypeTestSetup, i, u) } -// matchDeploymentStatusUpdateConfig is used to configure the matching -// function -type matchDeploymentStatusUpdateConfig struct { - // DeploymentID is the expected ID - DeploymentID string - - // Status is the desired status - Status string - - // StatusDescription is the desired status description - StatusDescription string - - // JobVersion marks whether we expect a roll back job at the given version - JobVersion *uint64 - - // Eval marks whether we expect an evaluation. - Eval bool -} - -// matchDeploymentStatusUpdateRequest is used to match an update request -func matchDeploymentStatusUpdateRequest(c *matchDeploymentStatusUpdateConfig) func(args *structs.DeploymentStatusUpdateRequest) bool { - return func(args *structs.DeploymentStatusUpdateRequest) bool { - if args.DeploymentUpdate.DeploymentID != c.DeploymentID { - return false - } - - if args.DeploymentUpdate.Status != c.Status && args.DeploymentUpdate.StatusDescription != c.StatusDescription { - return false - } - - if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { - return false - } - - if c.JobVersion != nil { - if args.Job == nil { - return false - } else if args.Job.Version != *c.JobVersion { - return false - } - } else if c.JobVersion == nil && args.Job != nil { - return false - } - - return true - } -} - func (m *mockBackend) UpdateDeploymentPromotion(req *structs.ApplyDeploymentPromoteRequest) (uint64, error) { - m.Called(req) + m.trackCall("UpdateDeploymentPromotion") i := m.nextIndex() return i, m.state.UpdateDeploymentPromotion(structs.MsgTypeTestSetup, i, req) } -// matchDeploymentPromoteRequestConfig is used to configure the matching -// function -type matchDeploymentPromoteRequestConfig struct { - // Promotion holds the expected promote request - Promotion *structs.DeploymentPromoteRequest - - // Eval marks whether we expect an evaluation. 
- Eval bool -} - -// matchDeploymentPromoteRequest is used to match a promote request -func matchDeploymentPromoteRequest(c *matchDeploymentPromoteRequestConfig) func(args *structs.ApplyDeploymentPromoteRequest) bool { - return func(args *structs.ApplyDeploymentPromoteRequest) bool { - if !reflect.DeepEqual(*c.Promotion, args.DeploymentPromoteRequest) { - return false - } - - if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { - return false - } - - return true - } -} func (m *mockBackend) UpdateDeploymentAllocHealth(req *structs.ApplyDeploymentAllocHealthRequest) (uint64, error) { - m.Called(req) + m.trackCall("UpdateDeploymentAllocHealth") i := m.nextIndex() return i, m.state.UpdateDeploymentAllocHealth(structs.MsgTypeTestSetup, i, req) } - -// matchDeploymentAllocHealthRequestConfig is used to configure the matching -// function -type matchDeploymentAllocHealthRequestConfig struct { - // DeploymentID is the expected ID - DeploymentID string - - // Healthy and Unhealthy contain the expected allocation IDs that are having - // their health set - Healthy, Unhealthy []string - - // DeploymentUpdate holds the expected values of status and description. We - // don't check for exact match but string contains - DeploymentUpdate *structs.DeploymentStatusUpdate - - // JobVersion marks whether we expect a roll back job at the given version - JobVersion *uint64 - - // Eval marks whether we expect an evaluation. - Eval bool -} - -// matchDeploymentAllocHealthRequest is used to match an update request -func matchDeploymentAllocHealthRequest(c *matchDeploymentAllocHealthRequestConfig) func(args *structs.ApplyDeploymentAllocHealthRequest) bool { - return func(args *structs.ApplyDeploymentAllocHealthRequest) bool { - if args.DeploymentID != c.DeploymentID { - return false - } - - // Require a timestamp - if args.Timestamp.IsZero() { - return false - } - - if len(c.Healthy) != len(args.HealthyAllocationIDs) { - return false - } - if len(c.Unhealthy) != len(args.UnhealthyAllocationIDs) { - return false - } - - hmap, umap := make(map[string]struct{}, len(c.Healthy)), make(map[string]struct{}, len(c.Unhealthy)) - for _, h := range c.Healthy { - hmap[h] = struct{}{} - } - for _, u := range c.Unhealthy { - umap[u] = struct{}{} - } - - for _, h := range args.HealthyAllocationIDs { - if _, ok := hmap[h]; !ok { - return false - } - } - for _, u := range args.UnhealthyAllocationIDs { - if _, ok := umap[u]; !ok { - return false - } - } - - if c.DeploymentUpdate != nil { - if args.DeploymentUpdate == nil { - return false - } - - if !strings.Contains(args.DeploymentUpdate.Status, c.DeploymentUpdate.Status) { - return false - } - if !strings.Contains(args.DeploymentUpdate.StatusDescription, c.DeploymentUpdate.StatusDescription) { - return false - } - } else if args.DeploymentUpdate != nil { - return false - } - - if c.Eval && args.Eval == nil || !c.Eval && args.Eval != nil { - return false - } - - if (c.JobVersion != nil && (args.Job == nil || args.Job.Version != *c.JobVersion)) || c.JobVersion == nil && args.Job != nil { - return false - } - - return true - } -}