@@ -76,8 +76,6 @@ const (
7676 // query so it can be changed without breaking history.)
7777 maxListMatchingTimesCount = 1000
7878
79- invalidDuration time.Duration = - 1
80-
8179 rateLimitedErrorType = "RateLimited"
8280)
8381
@@ -118,7 +116,8 @@ type (
118116 SleepWhilePaused bool // If true, don't set timers while paused/out of actions
119117 // MaxBufferSize limits the number of buffered starts. This also limits the number of
120118 // workflows that can be backfilled at once (since they all have to fit in the buffer).
121- MaxBufferSize int
119+ MaxBufferSize int
120+ AllowZeroSleep bool // Whether to allow a zero-length timer. Used for workflow compatibility.
122121 }
123122)
124123
@@ -150,6 +149,7 @@ var (
150149 IterationsBeforeContinueAsNew : 500 ,
151150 SleepWhilePaused : true ,
152151 MaxBufferSize : 1000 ,
152+ AllowZeroSleep : true ,
153153 }
154154
155155 errUpdateConflict = errors .New ("conflicting concurrent update" )
@@ -204,7 +204,7 @@ func (s *scheduler) run() error {
204204 s .logger .Warn ("Time went backwards" , "from" , t1 , "to" , t2 )
205205 t2 = t1
206206 }
207- nextSleep := s .processTimeRange (
207+ nextWakeup := s .processTimeRange (
208208 t1 , t2 ,
209209 // resolve this to the schedule's policy as late as possible
210210 enumspb .SCHEDULE_OVERLAP_POLICY_UNSPECIFIED ,
@@ -215,7 +215,7 @@ func (s *scheduler) run() error {
215215 scheduleChanged := s .processSignals ()
216216 if scheduleChanged {
217217 // need to calculate sleep again
218- nextSleep = s .processTimeRange (t2 , t2 , enumspb .SCHEDULE_OVERLAP_POLICY_UNSPECIFIED , false )
218+ nextWakeup = s .processTimeRange (t2 , t2 , enumspb .SCHEDULE_OVERLAP_POLICY_UNSPECIFIED , false )
219219 }
220220 // try starting workflows in the buffer
221221 //nolint:revive
@@ -226,7 +226,7 @@ func (s *scheduler) run() error {
226226 // 1. requested time elapsed
227227 // 2. we got a signal (update, request, refresh)
228228 // 3. a workflow that we were watching finished
229- s .sleep (nextSleep )
229+ s .sleep (nextWakeup )
230230 s .updateTweakables ()
231231 }
232232
@@ -333,11 +333,11 @@ func (s *scheduler) processTimeRange(
333333 t1 , t2 time.Time ,
334334 overlapPolicy enumspb.ScheduleOverlapPolicy ,
335335 manual bool ,
336- ) time.Duration {
336+ ) time.Time {
337337 s .logger .Debug ("processTimeRange" , "t1" , t1 , "t2" , t2 , "overlap-policy" , overlapPolicy , "manual" , manual )
338338
339339 if s .cspec == nil {
340- return invalidDuration
340+ return time. Time {}
341341 }
342342
343343 catchupWindow := s .getCatchupWindow ()
@@ -350,10 +350,8 @@ func (s *scheduler) processTimeRange(
350350 return s .cspec .getNextTime (t1 )
351351 }).Get (& next ))
352352 t1 = next .Next
353- if t1 .IsZero () {
354- return invalidDuration
355- } else if t1 .After (t2 ) {
356- return t1 .Sub (t2 )
353+ if t1 .IsZero () || t1 .After (t2 ) {
354+ return t1
357355 }
358356 if ! manual && t2 .Sub (t1 ) > catchupWindow {
359357 s .logger .Warn ("Schedule missed catchup window" , "now" , t2 , "time" , t1 )
@@ -395,7 +393,7 @@ func (s *scheduler) canTakeScheduledAction(manual, decrement bool) bool {
395393 return false
396394}
397395
398- func (s * scheduler ) sleep (nextSleep time.Duration ) {
396+ func (s * scheduler ) sleep (nextWakeup time.Time ) {
399397 sel := workflow .NewSelector (s .ctx )
400398
401399 upCh := workflow .GetSignalChannel (s .ctx , SignalNameUpdate )
@@ -413,19 +411,27 @@ func (s *scheduler) sleep(nextSleep time.Duration) {
413411
414412 // if we're paused or out of actions, we don't need to wake up until we get an update
415413 if s .tweakables .SleepWhilePaused && ! s .canTakeScheduledAction (false , false ) {
416- nextSleep = invalidDuration
414+ nextWakeup = time. Time {}
417415 }
418416
419- if nextSleep != invalidDuration {
420- tmr := workflow .NewTimer (s .ctx , nextSleep )
417+ if ! nextWakeup .IsZero () {
418+ sleepTime := nextWakeup .Sub (s .now ())
419+ // A previous version of this workflow passed around sleep duration instead of wakeup time,
420+ // which means it always set a timer even in cases where sleepTime comes out negative. For
421+ // compatibility, we have to continue setting a positive timer in those cases. The value
422+ // doesn't have to match, though.
423+ if ! s .tweakables .AllowZeroSleep && sleepTime <= 0 {
424+ sleepTime = time .Second
425+ }
426+ tmr := workflow .NewTimer (s .ctx , sleepTime )
421427 sel .AddFuture (tmr , func (_ workflow.Future ) {})
422428 }
423429
424430 if s .watchingFuture != nil {
425431 sel .AddFuture (s .watchingFuture , s .wfWatcherReturned )
426432 }
427433
428- s .logger .Debug ("sleeping" , "next-sleep " , nextSleep , "watching" , s .watchingFuture != nil )
434+ s .logger .Debug ("sleeping" , "next-wakeup " , nextWakeup , "watching" , s .watchingFuture != nil )
429435 sel .Select (s .ctx )
430436 for sel .HasPending () {
431437 sel .Select (s .ctx )
0 commit comments