Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
39036c3
Skip over entire time range if paused
samanbarghi Apr 24, 2023
d8e6383
Refactor
samanbarghi Apr 24, 2023
5782384
Merge branch 'master' into feature/skip-over-entinre-time-range-while…
samanbarghi Apr 27, 2023
0bfb2f2
versioning using tweakables
samanbarghi Apr 27, 2023
13793a4
batch next time calculation in side effect
samanbarghi Apr 28, 2023
ee96872
Add version and a time generator to cache time results
samanbarghi Apr 28, 2023
4165936
Update service/worker/scheduler/workflow.go
samanbarghi Apr 28, 2023
584d190
Refactor based on PR comments
samanbarghi Apr 28, 2023
a861b9f
Update service/worker/scheduler/workflow.go
samanbarghi Apr 28, 2023
46b85b7
Update service/worker/scheduler/workflow.go
samanbarghi Apr 28, 2023
f5d4291
Update service/worker/scheduler/workflow.go
samanbarghi Apr 28, 2023
f9eae8f
Update service/worker/scheduler/workflow.go
samanbarghi Apr 28, 2023
230541a
Add enum for versioning
samanbarghi Apr 28, 2023
f268b46
Fix a bug with skipping schedules
samanbarghi May 1, 2023
b4051e8
Additional check for Equal
samanbarghi May 2, 2023
27b66cb
Test cache
samanbarghi May 2, 2023
b7ac461
Update service/worker/scheduler/workflow.go
samanbarghi May 2, 2023
6bb79ac
Use map and add backfill to test
samanbarghi May 3, 2023
732bf0f
Update service/worker/scheduler/workflow.go
samanbarghi May 3, 2023
199ba33
Update service/worker/scheduler/workflow.go
samanbarghi May 3, 2023
191d03a
Update service/worker/scheduler/workflow.go
samanbarghi May 5, 2023
9521321
Update name
samanbarghi May 8, 2023
6eb8d75
Improve backfill injection and add some randomness
samanbarghi May 8, 2023
26a76df
Clear the map before re-populating
samanbarghi May 9, 2023
790dc4b
set map to nil
samanbarghi May 9, 2023
3fd344a
update tests to refill cache
samanbarghi May 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 69 additions & 10 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ import (
"go.temporal.io/server/common/util"
)

// SchedulerWorkflowVersion identifies a revision of the scheduler workflow
// logic. Code compares against it to decide whether newer behavior applies.
type SchedulerWorkflowVersion int64

// Known scheduler workflow versions, in increasing order.
const (
	// InitialVersion represents the state before Version was introduced.
	InitialVersion SchedulerWorkflowVersion = 0
	// BatchAndCacheTimeQueries skips over the entire time range if paused,
	// and batches and caches getNextTime queries.
	BatchAndCacheTimeQueries SchedulerWorkflowVersion = 1
)

const (
// Schedules are implemented by a workflow whose ID is this string plus the schedule ID.
WorkflowIDPrefix = "temporal-sys-scheduler:"
Expand All @@ -77,6 +86,8 @@ const (
maxListMatchingTimesCount = 1000

rateLimitedErrorType = "RateLimited"

maxNextTimeResultCacheSize = 10
)

type (
Expand Down Expand Up @@ -104,6 +115,10 @@ type (
pendingUpdate *schedspb.FullUpdateRequest

uuidBatch []string

// This cache is used to store time results after batching getNextTime queries
// in a single SideEffect
nextTimeResultCache map[time.Time]getNextTimeResult
}

tweakablePolicies struct {
Expand All @@ -120,8 +135,10 @@ type (
// MaxBufferSize limits the number of buffered starts. This also limits the number of
// workflows that can be backfilled at once (since they all have to fit in the buffer).
MaxBufferSize int
AllowZeroSleep bool // Whether to allow a zero-length timer. Used for workflow compatibility.
ReuseTimer bool // Whether to reuse timer. Used for workflow compatibility.
AllowZeroSleep bool // Whether to allow a zero-length timer. Used for workflow compatibility.
ReuseTimer bool // Whether to reuse timer. Used for workflow compatibility.
Version SchedulerWorkflowVersion // Used to keep track of schedules version to release new features and for backward compatibility
// version 0 corresponds to the schedule version that comes before introducing the Version parameter
}
)

Expand Down Expand Up @@ -155,6 +172,7 @@ var (
MaxBufferSize: 1000,
AllowZeroSleep: true,
ReuseTimer: true,
Version: BatchAndCacheTimeQueries,
}

errUpdateConflict = errors.New("conflicting concurrent update")
Expand Down Expand Up @@ -202,6 +220,7 @@ func (s *scheduler) run() error {
s.InitialPatch = nil

for iters := s.tweakables.IterationsBeforeContinueAsNew; iters > 0 || s.pendingUpdate != nil || s.pendingPatch != nil; iters-- {

t1 := timestamp.TimeValue(s.State.LastProcessedTime)
t2 := s.now()
if t2.Before(t1) {
Expand Down Expand Up @@ -271,6 +290,9 @@ func (s *scheduler) ensureFields() {
}

func (s *scheduler) compileSpec() {
// if spec changes invalidate current nextTimeResult cache
s.nextTimeResultCache = nil

cspec, err := NewCompiledSpec(s.Schedule.Spec)
if err != nil {
if s.logger != nil {
Expand Down Expand Up @@ -334,6 +356,29 @@ func (s *scheduler) processPatch(patch *schedpb.SchedulePatch) {
}
}

// getNextTime returns the getNextTimeResult for the query time after. It is served from
// s.nextTimeResultCache when an entry for after exists; otherwise the cache is discarded
// and rebuilt inside a single SideEffect before the lookup is repeated.
//
// On a rebuild, the SideEffect starts at after and repeatedly calls s.cspec.getNextTime,
// following each result's Next, storing at most maxNextTimeResultCacheSize results keyed
// by the time that was queried.
func (s *scheduler) getNextTime(after time.Time) getNextTimeResult {

// We populate the map sequentially. If after is not in the map, we either exhausted all
// cached items or jumped through time (forward or backward); in either case, refresh the cache.
next, ok := s.nextTimeResultCache[after]
if ok {
return next
}
// Drop the old map before Get decodes into it, so the refreshed cache holds only the new
// batch. (Per the review discussion, decoding into an existing map may add entries
// rather than replace it, which would let the map keep growing.)
s.nextTimeResultCache = nil
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
results := make(map[time.Time]getNextTimeResult)
// Stop once there is no further time (zero Next) or the batch is full.
for t := after; !t.IsZero() && len(results) < maxNextTimeResultCacheSize; {
next := s.cspec.getNextTime(t)
results[t] = next
t = next.Next
}
return results
}).Get(&s.nextTimeResultCache))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more thing: I think that Get into a map may actually add entries instead of resetting the map (it ends up in the data converter which does json.Unmarshal), so the map might keep growing. I think we should explicitly clear the map before Get just in case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, will do.

// If after is the zero time the batch above is empty, so this lookup yields the zero
// getNextTimeResult.
// NOTE(review): this also assumes the decoded map key for after compares equal to after
// itself after the SideEffect round trip — confirm.
return s.nextTimeResultCache[after]
}

func (s *scheduler) processTimeRange(
t1, t2 time.Time,
overlapPolicy enumspb.ScheduleOverlapPolicy,
Expand All @@ -347,20 +392,34 @@ func (s *scheduler) processTimeRange(

catchupWindow := s.getCatchupWindow()

// A previous version would record a marker for each time which could make a workflow
// fail. With the new version, the entire time range is skipped if the workflow is paused
// or we are not going to take an action now
if s.tweakables.Version >= BatchAndCacheTimeQueries {
// Peek at paused/remaining actions state and don't bother if we're not going to
// take an action now. (Don't count as missed catchup window either.)
// Skip over entire time range if paused or no actions can be taken
if !s.canTakeScheduledAction(manual, false) {
return s.getNextTime(t2).Next
}
}

for {
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
var next getNextTimeResult
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(t1)
}).Get(&next))
if s.tweakables.Version < BatchAndCacheTimeQueries {
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(t1)
}).Get(&next))
} else {
next = s.getNextTime(t1)
}
t1 = next.Next
if t1.IsZero() || t1.After(t2) {
return t1
}
// Peek at paused/remaining actions state and don't bother if we're not going to
// take an action now. (Don't count as missed catchup window either.)
if !s.canTakeScheduledAction(manual, false) {
if s.tweakables.Version < BatchAndCacheTimeQueries && !s.canTakeScheduledAction(manual, false) {
continue
}
if !manual && t2.Sub(t1) > catchupWindow {
Expand Down
72 changes: 72 additions & 0 deletions service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package scheduler
import (
"context"
"errors"
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -1402,3 +1403,74 @@ func (s *workflowSuite) TestLimitedActions() {
s.True(s.env.IsWorkflowCompleted())
// doesn't end properly since it sleeps forever after pausing
}

// TestLotsOfIterations drives the scheduler through runIterations regularly scheduled runs
// while backfillIterations delayed callbacks inject backfill requests, so that the
// getNextTime result cache is both exhausted and invalidated along the way.
func (s *workflowSuite) TestLotsOfIterations() {
// This is mostly testing getNextTime caching logic.
const runIterations = 30
const backfillIterations = 15

// Regular scheduled runs: one per hour starting at 2022-06-01 00:27 UTC, at minute 27 on
// even hours and minute 28 on odd hours (the two calendar specs passed below).
runs := make([]workflowRun, runIterations)
for i := range runs {
t := time.Date(2022, 6, 1, i, 27+i%2, 0, 0, time.UTC)
runs[i] = workflowRun{
id: "myid-" + t.Format(time.RFC3339),
start: t,
end: t.Add(time.Duration(5+i%7) * time.Minute),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
}
}

delayedCallbacks := make([]delayedCallback, backfillIterations)

// Running total of expected runs: the scheduled ones plus every backfilled one added below.
expected := runIterations
// Schedule a callback every hour, starting at 15:02 on 2022-06-01, to spray backfills
// among the scheduled runs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one nitpick: can you make the backfill only start after 2022-06-01 15:00:00? i.e. I want at least one long stretch where the cache runs out and has to refill naturally. (or even better, use the value of maxNextTimeResultCacheSize.) I think it may not actually be covering that branch anymore

// Each callback adds a random number of backfills in the
// [maxNextTimeResultCacheSize, 2*maxNextTimeResultCacheSize) range.
for i := range delayedCallbacks {

maxRuns := rand.Intn(maxNextTimeResultCacheSize) + maxNextTimeResultCacheSize
expected += maxRuns
// The point in time at which the callback request is sent.
callbackTime := time.Date(2022, 6, 1, i+15, 2, 0, 0, time.UTC)
// Start of the backfilled range. time.Date normalizes out-of-range values, so day 0
// (i == 0) becomes 2022-04-30.
callBackRangeStartTime := time.Date(2022, 5, i, 0, 0, 0, 0, time.UTC)

// Expect maxRuns backfilled runs: each ID carries the nominal (backfilled) schedule time,
// while start/end are consecutive one-minute slots beginning at the callback time.
for j := 0; j < maxRuns; j++ {
runStartTime := time.Date(2022, 5, i, j, 27+j%2, 0, 0, time.UTC)
runs = append(runs, workflowRun{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm glad these don't have to be in order, that'd be even more annoying

id: "myid-" + runStartTime.Format(time.RFC3339),
start: callbackTime.Add(time.Duration(j) * time.Minute),
end: callbackTime.Add(time.Duration(j+1) * time.Minute),
result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
})
}

// The callback signals a patch with one backfill request covering maxRuns hours from
// callBackRangeStartTime, using the BUFFER_ALL overlap policy.
delayedCallbacks[i] = delayedCallback{
at: callbackTime,
f: func() {
s.env.SignalWorkflow(SignalNamePatch, &schedpb.SchedulePatch{
BackfillRequest: []*schedpb.BackfillRequest{{
StartTime: timestamp.TimePtr(callBackRangeStartTime),
EndTime: timestamp.TimePtr(callBackRangeStartTime.Add(time.Duration(maxRuns) * time.Hour)),
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ALL,
}},
})
},
}
}

// NOTE(review): the meaning of the final expected+1 argument is defined by
// runAcrossContinue, which is not visible here — confirm the +1 is intentional.
s.runAcrossContinue(
runs,
delayedCallbacks,
&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Calendar: []*schedpb.CalendarSpec{
{Minute: "27", Hour: "0/2"},
{Minute: "28", Hour: "1/2"},
},
},
},
expected+1,
)
}