Skip to content

Commit e67a1ce

Browse files
authored
Account for local activity run time in schedule sleep (#4079)
1 parent 832f4e6 commit e67a1ce

File tree

1 file changed

+23
-17
lines changed

1 file changed

+23
-17
lines changed

service/worker/scheduler/workflow.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ const (
7676
// query so it can be changed without breaking history.)
7777
maxListMatchingTimesCount = 1000
7878

79-
invalidDuration time.Duration = -1
80-
8179
rateLimitedErrorType = "RateLimited"
8280
)
8381

@@ -118,7 +116,8 @@ type (
118116
SleepWhilePaused bool // If true, don't set timers while paused/out of actions
119117
// MaxBufferSize limits the number of buffered starts. This also limits the number of
120118
// workflows that can be backfilled at once (since they all have to fit in the buffer).
121-
MaxBufferSize int
119+
MaxBufferSize int
120+
AllowZeroSleep bool // Whether to allow a zero-length timer. Used for workflow compatibility.
122121
}
123122
)
124123

@@ -150,6 +149,7 @@ var (
150149
IterationsBeforeContinueAsNew: 500,
151150
SleepWhilePaused: true,
152151
MaxBufferSize: 1000,
152+
AllowZeroSleep: true,
153153
}
154154

155155
errUpdateConflict = errors.New("conflicting concurrent update")
@@ -204,7 +204,7 @@ func (s *scheduler) run() error {
204204
s.logger.Warn("Time went backwards", "from", t1, "to", t2)
205205
t2 = t1
206206
}
207-
nextSleep := s.processTimeRange(
207+
nextWakeup := s.processTimeRange(
208208
t1, t2,
209209
// resolve this to the schedule's policy as late as possible
210210
enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED,
@@ -215,7 +215,7 @@ func (s *scheduler) run() error {
215215
scheduleChanged := s.processSignals()
216216
if scheduleChanged {
217217
// the schedule changed, so the next wakeup time has to be recalculated
218-
nextSleep = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
218+
nextWakeup = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
219219
}
220220
// try starting workflows in the buffer
221221
//nolint:revive
@@ -226,7 +226,7 @@ func (s *scheduler) run() error {
226226
// 1. requested time elapsed
227227
// 2. we got a signal (update, request, refresh)
228228
// 3. a workflow that we were watching finished
229-
s.sleep(nextSleep)
229+
s.sleep(nextWakeup)
230230
s.updateTweakables()
231231
}
232232

@@ -333,11 +333,11 @@ func (s *scheduler) processTimeRange(
333333
t1, t2 time.Time,
334334
overlapPolicy enumspb.ScheduleOverlapPolicy,
335335
manual bool,
336-
) time.Duration {
336+
) time.Time {
337337
s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlap-policy", overlapPolicy, "manual", manual)
338338

339339
if s.cspec == nil {
340-
return invalidDuration
340+
return time.Time{}
341341
}
342342

343343
catchupWindow := s.getCatchupWindow()
@@ -350,10 +350,8 @@ func (s *scheduler) processTimeRange(
350350
return s.cspec.getNextTime(t1)
351351
}).Get(&next))
352352
t1 = next.Next
353-
if t1.IsZero() {
354-
return invalidDuration
355-
} else if t1.After(t2) {
356-
return t1.Sub(t2)
353+
if t1.IsZero() || t1.After(t2) {
354+
return t1
357355
}
358356
if !manual && t2.Sub(t1) > catchupWindow {
359357
s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
@@ -395,7 +393,7 @@ func (s *scheduler) canTakeScheduledAction(manual, decrement bool) bool {
395393
return false
396394
}
397395

398-
func (s *scheduler) sleep(nextSleep time.Duration) {
396+
func (s *scheduler) sleep(nextWakeup time.Time) {
399397
sel := workflow.NewSelector(s.ctx)
400398

401399
upCh := workflow.GetSignalChannel(s.ctx, SignalNameUpdate)
@@ -413,19 +411,27 @@ func (s *scheduler) sleep(nextSleep time.Duration) {
413411

414412
// if we're paused or out of actions, we don't need to wake up until we get an update
415413
if s.tweakables.SleepWhilePaused && !s.canTakeScheduledAction(false, false) {
416-
nextSleep = invalidDuration
414+
nextWakeup = time.Time{}
417415
}
418416

419-
if nextSleep != invalidDuration {
420-
tmr := workflow.NewTimer(s.ctx, nextSleep)
417+
if !nextWakeup.IsZero() {
418+
sleepTime := nextWakeup.Sub(s.now())
419+
// A previous version of this workflow passed around sleep duration instead of wakeup time,
420+
// which means it always set a timer even in cases where sleepTime comes out negative. For
421+
// compatibility, we have to continue setting a positive timer in those cases. The value
422+
// doesn't have to match, though.
423+
if !s.tweakables.AllowZeroSleep && sleepTime <= 0 {
424+
sleepTime = time.Second
425+
}
426+
tmr := workflow.NewTimer(s.ctx, sleepTime)
421427
sel.AddFuture(tmr, func(_ workflow.Future) {})
422428
}
423429

424430
if s.watchingFuture != nil {
425431
sel.AddFuture(s.watchingFuture, s.wfWatcherReturned)
426432
}
427433

428-
s.logger.Debug("sleeping", "next-sleep", nextSleep, "watching", s.watchingFuture != nil)
434+
s.logger.Debug("sleeping", "next-wakeup", nextWakeup, "watching", s.watchingFuture != nil)
429435
sel.Select(s.ctx)
430436
for sel.HasPending() {
431437
sel.Select(s.ctx)

0 commit comments

Comments
 (0)