Skip to content

Commit 4ef165a

Browse files
samanbarghidnr
authored and committed
Skip over entire time range if paused and batch and cache time queries (#4215)
* Skip over entire time range if paused * Refactor * versioning using tweakables * batch next time calculation in side effect * Add version and a time generator to cache time results * Update service/worker/scheduler/workflow.go Co-authored-by: David Reiss <[email protected]> * Refactor based on PR comments * Update service/worker/scheduler/workflow.go Co-authored-by: David Reiss <[email protected]> * Update service/worker/scheduler/workflow.go Co-authored-by: David Reiss <[email protected]> * Update service/worker/scheduler/workflow.go Co-authored-by: David Reiss <[email protected]> * Update service/worker/scheduler/workflow.go Co-authored-by: David Reiss <[email protected]> * Add enum for versioning * Fix a bug with skipping schedules * Additional check for Equal * Test cache * Update service/worker/scheduler/workflow.go Co-authored-by: David Reiss <[email protected]> * Use map and add backfill to test * Update service/worker/scheduler/workflow.go Co-authored-by: David Reiss <[email protected]> * Update service/worker/scheduler/workflow.go Co-authored-by: David Reiss <[email protected]> * Update service/worker/scheduler/workflow.go Co-authored-by: David Reiss <[email protected]> * Update name * Improve backfill injection and add some randomness * Clear the map before re-populating * set map to nil * update tests to refill cache --------- Co-authored-by: David Reiss <[email protected]>
1 parent f64bb0e commit 4ef165a

File tree

2 files changed

+141
-10
lines changed

2 files changed

+141
-10
lines changed

service/worker/scheduler/workflow.go

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ import (
5353
"go.temporal.io/server/common/util"
5454
)
5555

56+
// SchedulerWorkflowVersion identifies a revision of the scheduler workflow logic.
// It is carried in tweakablePolicies.Version and compared against the constants
// below to decide whether newer behavior is enabled.
type SchedulerWorkflowVersion int64
57+
58+
const (
59+
// InitialVersion represents the state before Version was introduced (value 0).
60+
InitialVersion SchedulerWorkflowVersion = iota
61+
// BatchAndCacheTimeQueries skips over the entire time range if the schedule is
// paused, and batches and caches getNextTime queries.
62+
BatchAndCacheTimeQueries
63+
)
64+
5665
const (
5766
// Schedules are implemented by a workflow whose ID is this string plus the schedule ID.
5867
WorkflowIDPrefix = "temporal-sys-scheduler:"
@@ -77,6 +86,8 @@ const (
7786
maxListMatchingTimesCount = 1000
7887

7988
rateLimitedErrorType = "RateLimited"
89+
90+
maxNextTimeResultCacheSize = 10
8091
)
8192

8293
type (
@@ -104,6 +115,10 @@ type (
104115
pendingUpdate *schedspb.FullUpdateRequest
105116

106117
uuidBatch []string
118+
119+
// This cache is used to store time results after batching getNextTime queries
120+
// in a single SideEffect
121+
nextTimeResultCache map[time.Time]getNextTimeResult
107122
}
108123

109124
tweakablePolicies struct {
@@ -120,8 +135,10 @@ type (
120135
// MaxBufferSize limits the number of buffered starts. This also limits the number of
121136
// workflows that can be backfilled at once (since they all have to fit in the buffer).
122137
MaxBufferSize int
123-
AllowZeroSleep bool // Whether to allow a zero-length timer. Used for workflow compatibility.
124-
ReuseTimer bool // Whether to reuse timer. Used for workflow compatibility.
138+
AllowZeroSleep bool // Whether to allow a zero-length timer. Used for workflow compatibility.
139+
ReuseTimer bool // Whether to reuse timer. Used for workflow compatibility.
140+
Version SchedulerWorkflowVersion // Used to keep track of schedules version to release new features and for backward compatibility
141+
// version 0 corresponds to the schedule version that comes before introducing the Version parameter
125142
}
126143
)
127144

@@ -155,6 +172,7 @@ var (
155172
MaxBufferSize: 1000,
156173
AllowZeroSleep: true,
157174
ReuseTimer: true,
175+
Version: BatchAndCacheTimeQueries,
158176
}
159177

160178
errUpdateConflict = errors.New("conflicting concurrent update")
@@ -202,6 +220,7 @@ func (s *scheduler) run() error {
202220
s.InitialPatch = nil
203221

204222
for iters := s.tweakables.IterationsBeforeContinueAsNew; iters > 0 || s.pendingUpdate != nil || s.pendingPatch != nil; iters-- {
223+
205224
t1 := timestamp.TimeValue(s.State.LastProcessedTime)
206225
t2 := s.now()
207226
if t2.Before(t1) {
@@ -271,6 +290,9 @@ func (s *scheduler) ensureFields() {
271290
}
272291

273292
func (s *scheduler) compileSpec() {
293+
// if spec changes invalidate current nextTimeResult cache
294+
s.nextTimeResultCache = nil
295+
274296
cspec, err := NewCompiledSpec(s.Schedule.Spec)
275297
if err != nil {
276298
if s.logger != nil {
@@ -334,6 +356,29 @@ func (s *scheduler) processPatch(patch *schedpb.SchedulePatch) {
334356
}
335357
}
336358

359+
// getNextTime returns the getNextTimeResult for the given time, using
// s.nextTimeResultCache to avoid running one SideEffect per query. On a cache
// miss it discards the cache and refills it inside a single SideEffect with up to
// maxNextTimeResultCacheSize consecutive results: the first keyed by after and
// each following one keyed by the previous result's Next.
func (s *scheduler) getNextTime(after time.Time) getNextTimeResult {
360+
361+
// The map is populated sequentially, so if after is not in the map we either exhausted
362+
// all entries or jumped through time (forward or backward); in either case, refresh the cache.
// NOTE(review): map lookup compares time.Time keys with ==, which also compares
// Location and monotonic reading; an equal instant in a different Location would
// miss. Presumably all callers pass normalized times - confirm.
363+
next, ok := s.nextTimeResultCache[after]
364+
if ok {
365+
return next
366+
}
367+
// Drop the stale entries before decoding the fresh batch into the field.
s.nextTimeResultCache = nil
368+
// Run this logic in a SideEffect so that we can fix bugs there without breaking
369+
// existing schedule workflows.
370+
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
371+
results := make(map[time.Time]getNextTimeResult)
372+
// Chain queries: each result's Next becomes the key of the following query.
// Stop when the spec reports a zero Next or the batch is full.
// NOTE(review): if after itself is the zero time the body never runs, the
// batch is empty, and the final lookup below yields a zero-value result.
for t := after; !t.IsZero() && len(results) < maxNextTimeResultCacheSize; {
373+
next := s.cspec.getNextTime(t)
374+
results[t] = next
375+
t = next.Next
376+
}
377+
return results
378+
}).Get(&s.nextTimeResultCache))
379+
// The refreshed batch was seeded with after, so this is expected to hit.
return s.nextTimeResultCache[after]
380+
}
381+
337382
func (s *scheduler) processTimeRange(
338383
t1, t2 time.Time,
339384
overlapPolicy enumspb.ScheduleOverlapPolicy,
@@ -347,20 +392,34 @@ func (s *scheduler) processTimeRange(
347392

348393
catchupWindow := s.getCatchupWindow()
349394

395+
// A previous version would record a marker for each time which could make a workflow
396+
// fail. With the new version, the entire time range is skipped if the workflow is paused
397+
// or we are not going to take an action now
398+
if s.tweakables.Version >= BatchAndCacheTimeQueries {
399+
// Peek at paused/remaining actions state and don't bother if we're not going to
400+
// take an action now. (Don't count as missed catchup window either.)
401+
// Skip over entire time range if paused or no actions can be taken
402+
if !s.canTakeScheduledAction(manual, false) {
403+
return s.getNextTime(t2).Next
404+
}
405+
}
406+
350407
for {
351-
// Run this logic in a SideEffect so that we can fix bugs there without breaking
352-
// existing schedule workflows.
353408
var next getNextTimeResult
354-
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
355-
return s.cspec.getNextTime(t1)
356-
}).Get(&next))
409+
if s.tweakables.Version < BatchAndCacheTimeQueries {
410+
// Run this logic in a SideEffect so that we can fix bugs there without breaking
411+
// existing schedule workflows.
412+
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
413+
return s.cspec.getNextTime(t1)
414+
}).Get(&next))
415+
} else {
416+
next = s.getNextTime(t1)
417+
}
357418
t1 = next.Next
358419
if t1.IsZero() || t1.After(t2) {
359420
return t1
360421
}
361-
// Peek at paused/remaining actions state and don't bother if we're not going to
362-
// take an action now. (Don't count as missed catchup window either.)
363-
if !s.canTakeScheduledAction(manual, false) {
422+
if s.tweakables.Version < BatchAndCacheTimeQueries && !s.canTakeScheduledAction(manual, false) {
364423
continue
365424
}
366425
if !manual && t2.Sub(t1) > catchupWindow {

service/worker/scheduler/workflow_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package scheduler
2727
import (
2828
"context"
2929
"errors"
30+
"math/rand"
3031
"testing"
3132
"time"
3233

@@ -1402,3 +1403,74 @@ func (s *workflowSuite) TestLimitedActions() {
14021403
s.True(s.env.IsWorkflowCompleted())
14031404
// doesn't end properly since it sleeps forever after pausing
14041405
}
1406+
1407+
// TestLotsOfIterations runs the scheduler across 30 regularly scheduled runs while
// 15 delayed callbacks each send a backfill patch covering a random number of runs,
// so that time queries repeatedly jump between the present and backfilled ranges.
func (s *workflowSuite) TestLotsOfIterations() {
1408+
// This is mostly testing getNextTime caching logic.
1409+
const runIterations = 30
1410+
const backfillIterations = 15
1411+
1412+
runs := make([]workflowRun, runIterations)
1413+
// One regular run per hour starting 2022-06-01T00:27Z; the minute alternates
// 27/28 to match the two calendar specs passed below. Hour values of 24 and
// above are normalized by time.Date into the following day.
for i := range runs {
1414+
t := time.Date(2022, 6, 1, i, 27+i%2, 0, 0, time.UTC)
1415+
runs[i] = workflowRun{
1416+
id: "myid-" + t.Format(time.RFC3339),
1417+
start: t,
1418+
end: t.Add(time.Duration(5+i%7) * time.Minute),
1419+
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
1420+
}
1421+
}
1422+
1423+
delayedCallbacks := make([]delayedCallback, backfillIterations)
1424+
1425+
// Running total of runs: the regular ones plus every backfilled one added below.
expected := runIterations
1426+
// Schedule a callback every hour to spray backfills among the scheduled runs.
1427+
// Each callback adds a random number of backfills in the
1428+
// [maxNextTimeResultCacheSize, 2*maxNextTimeResultCacheSize) range.
1429+
for i := range delayedCallbacks {
1430+
1431+
maxRuns := rand.Intn(maxNextTimeResultCacheSize) + maxNextTimeResultCacheSize
1432+
expected += maxRuns
1433+
// a point in time to send the callback request
1434+
callbackTime := time.Date(2022, 6, 1, i+15, 2, 0, 0, time.UTC)
1435+
// start of the time range covered by the backfill request
// (day i of May 2022; day 0 is normalized by time.Date to April 30)
1436+
callBackRangeStartTime := time.Date(2022, 5, i, 0, 0, 0, 0, time.UTC)
1437+
1438+
// add/process maxRuns schedules
1439+
for j := 0; j < maxRuns; j++ {
1440+
// The ID is derived from the backfilled time, while start/end are spaced one
// minute apart from callbackTime (presumably when the run happens in the test
// - confirm against workflowRun).
runStartTime := time.Date(2022, 5, i, j, 27+j%2, 0, 0, time.UTC)
1441+
runs = append(runs, workflowRun{
1442+
id: "myid-" + runStartTime.Format(time.RFC3339),
1443+
start: callbackTime.Add(time.Duration(j) * time.Minute),
1444+
end: callbackTime.Add(time.Duration(j+1) * time.Minute),
1445+
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
1446+
})
1447+
}
1448+
1449+
delayedCallbacks[i] = delayedCallback{
1450+
at: callbackTime,
1451+
f: func() {
1452+
// Backfill a window of maxRuns hours starting at callBackRangeStartTime.
s.env.SignalWorkflow(SignalNamePatch, &schedpb.SchedulePatch{
1453+
BackfillRequest: []*schedpb.BackfillRequest{{
1454+
StartTime: timestamp.TimePtr(callBackRangeStartTime),
1455+
EndTime: timestamp.TimePtr(callBackRangeStartTime.Add(time.Duration(maxRuns) * time.Hour)),
1456+
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ALL,
1457+
}},
1458+
})
1459+
},
1460+
}
1461+
}
1462+
1463+
s.runAcrossContinue(
1464+
runs,
1465+
delayedCallbacks,
1466+
&schedpb.Schedule{
1467+
Spec: &schedpb.ScheduleSpec{
1468+
Calendar: []*schedpb.CalendarSpec{
1469+
{Minute: "27", Hour: "0/2"},
1470+
{Minute: "28", Hour: "1/2"},
1471+
},
1472+
},
1473+
},
1474+
// NOTE(review): the meaning of the extra +1 is defined by runAcrossContinue,
// which is not visible here - confirm.
expected+1,
1475+
)
1476+
}

0 commit comments

Comments
 (0)