Skip to content

Commit e6a4a35

Browse files
lina-temporalychebotarev
authored andcommitted
Scheduler Bugfix: getFutureActionTimes should ignore actions prior to update time, respect RemainingActions counter (#6122)
## What changed/why? In #6028/#5381, our scheduler's run loop accounted for update requests that occur between nominal and actual/jittered times. The run loop will correctly skip over action times who are scheduled prior to the schedule's update time (such as when a schedule is recalculated to a new spec). This PR updates `getFutureActionTimes` (used as part of the scheduler's describe and list operations) to account for action times preceding the update time, as well as to properly reflect the number of remaining actions in a schedule. ## How did you test it? - Updated unit tests for both limited actions (`TestLimitedActions`) as well as filtered update times (`TestUpdateNotRetroactive`). `TestUpdateNotRetroactive` is the test that goes red when the [parallel condition in the scheduler](https://github.com/temporalio/temporal/blob/44a6fe777e1dc950dd278cb1b7f6ffc8b3ba4328/service/worker/scheduler/workflow.go#L655-L660) is commented out, so it seemed the appropriate place to also test `getFutureActionTimes` lines up with the fix in the run loop. - Verified both updated tests only pass when new version (`AccurateFutureActionTimes`) is set; verified no other tests break (`make test-unit`) ## Potential risks - We break a use case that relied on speculative future action times/a constant number of action times returned ## Documentation N/A ## Is hotfix candidate? No
1 parent 1006877 commit e6a4a35

File tree

2 files changed

+46
-3
lines changed

2 files changed

+46
-3
lines changed

service/worker/scheduler/workflow.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ const (
8484
CANAfterSignals = 7
8585
// set LastProcessedTime to last action instead of now
8686
UseLastAction = 8
87+
// getFutureActionTimes accounts for UpdateTime and RemainingActions
88+
AccurateFutureActionTimes = 9
8789
)
8890

8991
const (
@@ -931,6 +933,11 @@ func (s *scheduler) processSignals() bool {
931933
return scheduleChanged
932934
}
933935

936+
// Returns up to `n` future action times.
937+
//
938+
// After workflow version `AccurateFutureActionTimes`, No more than the
939+
// schedule's `RemainingActions` will be returned. Future action times that
940+
// precede the schedule's UpdateTime are not included.
934941
func (s *scheduler) getFutureActionTimes(inWorkflowContext bool, n int) []*timestamppb.Timestamp {
935942
// Note that `s` may be a `scheduler` created outside of a workflow context, used to
936943
// compute list info at creation time or in a query. In that case inWorkflowContext will
@@ -950,6 +957,10 @@ func (s *scheduler) getFutureActionTimes(inWorkflowContext bool, n int) []*times
950957
}
951958
}
952959

960+
if s.hasMinVersion(AccurateFutureActionTimes) && s.Schedule.State.LimitedActions {
961+
n = min(int(s.Schedule.State.RemainingActions), n)
962+
}
963+
953964
if s.cspec == nil {
954965
return nil
955966
}
@@ -960,6 +971,12 @@ func (s *scheduler) getFutureActionTimes(inWorkflowContext bool, n int) []*times
960971
if t1.IsZero() {
961972
break
962973
}
974+
975+
if s.hasMinVersion(AccurateFutureActionTimes) && s.Info.UpdateTime.AsTime().After(t1) {
976+
// Skip action times whose nominal times are prior to the schedule's update time
977+
continue
978+
}
979+
963980
out = append(out, timestamppb.New(t1))
964981
}
965982
return out

service/worker/scheduler/workflow_test.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1619,6 +1619,11 @@ func (s *workflowSuite) TestUpdate() {
16191619
}
16201620

16211621
func (s *workflowSuite) TestUpdateNotRetroactive() {
1622+
// TODO - remove when AccurateFutureActionTimes becomes the active version
1623+
prevTweakables := currentTweakablePolicies
1624+
currentTweakablePolicies.Version = AccurateFutureActionTimes
1625+
defer func() { currentTweakablePolicies = prevTweakables }()
1626+
16221627
s.runAcrossContinue(
16231628
[]workflowRun{
16241629
{
@@ -1656,6 +1661,16 @@ func (s *workflowSuite) TestUpdateNotRetroactive() {
16561661
})
16571662
},
16581663
},
1664+
// After the update above modifies the schedule, we should discard any newly
1665+
// scheduled times that are scheduled prior to the update time.
1666+
{
1667+
at: time.Date(2022, 6, 1, 1, 7, 12, 0, time.UTC),
1668+
f: func() {
1669+
desc := s.describe()
1670+
times := desc.Info.FutureActionTimes
1671+
s.True(times[0].AsTime().After(desc.Info.UpdateTime.AsTime()), "getFutureActionTimes returned an action preceding the update time after a schedule change")
1672+
},
1673+
},
16591674
{
16601675
at: time.Date(2022, 6, 1, 1, 7, 55, 0, time.UTC),
16611676
finishTest: true,
@@ -1844,6 +1859,11 @@ func (s *workflowSuite) TestPauseUnpauseBetweenNominalAndJittered() {
18441859
}
18451860

18461861
func (s *workflowSuite) TestLimitedActions() {
1862+
// TODO - remove when AccurateFutureActionTimes becomes the active version
1863+
prevTweakables := currentTweakablePolicies
1864+
currentTweakablePolicies.Version = AccurateFutureActionTimes
1865+
defer func() { currentTweakablePolicies = prevTweakables }()
1866+
18471867
// written using low-level mocks so we can sleep forever
18481868

18491869
// limited to 2
@@ -1870,13 +1890,19 @@ func (s *workflowSuite) TestLimitedActions() {
18701890
})
18711891

18721892
s.env.RegisterDelayedCallback(func() {
1873-
s.Equal(int64(2), s.describe().Schedule.State.RemainingActions)
1893+
desc := s.describe()
1894+
s.Equal(int64(2), desc.Schedule.State.RemainingActions)
1895+
s.Equal(2, len(desc.Info.FutureActionTimes))
18741896
}, 1*time.Minute)
18751897
s.env.RegisterDelayedCallback(func() {
1876-
s.Equal(int64(1), s.describe().Schedule.State.RemainingActions)
1898+
desc := s.describe()
1899+
s.Equal(int64(1), desc.Schedule.State.RemainingActions)
1900+
s.Equal(1, len(desc.Info.FutureActionTimes))
18771901
}, 5*time.Minute)
18781902
s.env.RegisterDelayedCallback(func() {
1879-
s.Equal(int64(0), s.describe().Schedule.State.RemainingActions)
1903+
desc := s.describe()
1904+
s.Equal(int64(0), desc.Schedule.State.RemainingActions)
1905+
s.Equal(0, len(desc.Info.FutureActionTimes))
18801906
s.Equal(1, len(s.runningWorkflows()))
18811907
}, 7*time.Minute)
18821908
s.env.RegisterDelayedCallback(func() {

0 commit comments

Comments
 (0)