Skip to content

Commit 263478c

Browse files
dnrsamanbarghi
authored andcommitted
Fix schedule catchup window metric calculation while paused (temporalio#4152)
1 parent d6a4a24 commit 263478c

File tree

2 files changed

+51
-5
lines changed

2 files changed

+51
-5
lines changed

service/worker/scheduler/workflow.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,17 +358,17 @@ func (s *scheduler) processTimeRange(
358358
if t1.IsZero() || t1.After(t2) {
359359
return t1
360360
}
361+
// Peek at paused/remaining actions state and don't bother if we're not going to
362+
// take an action now. (Don't count as missed catchup window either.)
363+
if !s.canTakeScheduledAction(manual, false) {
364+
continue
365+
}
361366
if !manual && t2.Sub(t1) > catchupWindow {
362367
s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
363368
s.metrics.Counter(metrics.ScheduleMissedCatchupWindow.GetMetricName()).Inc(1)
364369
s.Info.MissedCatchupWindow++
365370
continue
366371
}
367-
// Peek at paused/remaining actions state and don't even bother adding
368-
// to buffer if we're not going to take an action now.
369-
if !s.canTakeScheduledAction(manual, false) {
370-
continue
371-
}
372372
s.addStart(next.Nominal, next.Next, overlapPolicy, manual)
373373
}
374374
}

service/worker/scheduler/workflow_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,52 @@ func (s *workflowSuite) TestCatchupWindow() {
434434
s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError()))
435435
}
436436

437+
func (s *workflowSuite) TestCatchupWindowWhilePaused() {
438+
// written using low-level mocks so we can set initial state
439+
440+
s.env.RegisterDelayedCallback(func() {
441+
// should not count any "misses" since we were paused
442+
s.Equal(int64(0), s.describe().Info.MissedCatchupWindow)
443+
// unpause just to make the test end cleanly
444+
s.env.SignalWorkflow(SignalNamePatch, &schedpb.SchedulePatch{Unpause: "go ahead"})
445+
}, 3*time.Minute)
446+
s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
447+
s.True(time.Date(2022, 6, 1, 0, 17, 0, 0, time.UTC).Equal(s.now()))
448+
s.Equal("myid-2022-06-01T00:17:00Z", req.Request.WorkflowId)
449+
return nil, nil
450+
})
451+
452+
currentTweakablePolicies.IterationsBeforeContinueAsNew = 3
453+
s.env.SetStartTime(baseStartTime)
454+
s.env.ExecuteWorkflow(SchedulerWorkflow, &schedspb.StartScheduleArgs{
455+
Schedule: &schedpb.Schedule{
456+
Spec: &schedpb.ScheduleSpec{
457+
Calendar: []*schedpb.CalendarSpec{{
458+
Minute: "17",
459+
Hour: "*",
460+
}},
461+
},
462+
Action: s.defaultAction("myid"),
463+
Policies: &schedpb.SchedulePolicies{
464+
CatchupWindow: timestamp.DurationPtr(1 * time.Hour),
465+
},
466+
State: &schedpb.ScheduleState{
467+
Paused: true,
468+
},
469+
},
470+
State: &schedspb.InternalState{
471+
Namespace: "myns",
472+
NamespaceId: "mynsid",
473+
ScheduleId: "myschedule",
474+
ConflictToken: InitialConflictToken,
475+
// workflow "woke up" after 6 hours
476+
LastProcessedTime: timestamp.TimePtr(time.Date(2022, 5, 31, 18, 0, 0, 0, time.UTC)),
477+
},
478+
})
479+
s.True(s.env.IsWorkflowCompleted())
480+
s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError()), s.env.GetWorkflowError())
481+
}
482+
437483
func (s *workflowSuite) TestOverlapSkip() {
438484
s.runAcrossContinue(
439485
[]workflowRun{

0 commit comments

Comments
 (0)