Skip to content
Merged
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
39036c3
Skip over entier time range if paused
samanbarghi Apr 24, 2023
d8e6383
Refactor
samanbarghi Apr 24, 2023
5782384
Merge branch 'master' into feature/skip-over-entinre-time-range-while…
samanbarghi Apr 27, 2023
0bfb2f2
versioning using tweakables
samanbarghi Apr 27, 2023
13793a4
batch next time calculation in side effect
samanbarghi Apr 28, 2023
ee96872
Add version and a time generator to cache time results
samanbarghi Apr 28, 2023
4165936
Update service/worker/scheduler/workflow.go
samanbarghi Apr 28, 2023
584d190
Refactor based on PR comments
samanbarghi Apr 28, 2023
a861b9f
Update service/worker/scheduler/workflow.go
samanbarghi Apr 28, 2023
46b85b7
Update service/worker/scheduler/workflow.go
samanbarghi Apr 28, 2023
f5d4291
Update service/worker/scheduler/workflow.go
samanbarghi Apr 28, 2023
f9eae8f
Update service/worker/scheduler/workflow.go
samanbarghi Apr 28, 2023
230541a
Add enum for versioning
samanbarghi Apr 28, 2023
f268b46
Fix a bug with skipping schedules
samanbarghi May 1, 2023
b4051e8
Additional check for Equal
samanbarghi May 2, 2023
27b66cb
Test cache
samanbarghi May 2, 2023
b7ac461
Update service/worker/scheduler/workflow.go
samanbarghi May 2, 2023
6bb79ac
Use map and add backfill to test
samanbarghi May 3, 2023
732bf0f
Update service/worker/scheduler/workflow.go
samanbarghi May 3, 2023
199ba33
Update service/worker/scheduler/workflow.go
samanbarghi May 3, 2023
191d03a
Update service/worker/scheduler/workflow.go
samanbarghi May 5, 2023
9521321
Update name
samanbarghi May 8, 2023
6eb8d75
Improve backfill injection and add some randomness
samanbarghi May 8, 2023
26a76df
Clear the map before re-populating
samanbarghi May 9, 2023
790dc4b
set map to nil
samanbarghi May 9, 2023
3fd344a
update tests to refill cache
samanbarghi May 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 75 additions & 8 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ import (
"go.temporal.io/server/common/util"
)

// List of schedule versions:
// 0 this represents the code before Version is introduced
// 1 Skip over entire time range if paused and batch and cache getNextTime queries
//
// latestVersion is the highest version number in the list above. When a new
// version is added to the list, bump this constant to match.
const latestVersion = 1

const (
// Schedules are implemented by a workflow whose ID is this string plus the schedule ID.
WorkflowIDPrefix = "temporal-sys-scheduler:"
Expand All @@ -77,6 +82,8 @@ const (
maxListMatchingTimesCount = 1000

rateLimitedErrorType = "RateLimited"

maxNextTimeResultCacheSize = 10
)

type (
Expand Down Expand Up @@ -104,6 +111,10 @@ type (
pendingUpdate *schedspb.FullUpdateRequest

uuidBatch []string

// This cache is used to store time results after batching getNextTime queries
// in a single SideEffect
nextTimeResultCache []getNextTimeResult
}

tweakablePolicies struct {
Expand All @@ -122,6 +133,8 @@ type (
MaxBufferSize int
AllowZeroSleep bool // Whether to allow a zero-length timer. Used for workflow compatibility.
ReuseTimer bool // Whether to reuse timer. Used for workflow compatibility.
Version int // Used to keep track of schedules version to release new features and for backward compatibility
// version 0 corresponds to the schedule version that comes before introducing the Version parameter
}
)

Expand Down Expand Up @@ -155,6 +168,7 @@ var (
MaxBufferSize: 1000,
AllowZeroSleep: true,
ReuseTimer: true,
Version: latestVersion,
}

errUpdateConflict = errors.New("conflicting concurrent update")
Expand Down Expand Up @@ -201,7 +215,13 @@ func (s *scheduler) run() error {
s.pendingPatch = s.InitialPatch
s.InitialPatch = nil

currentVersion := s.tweakables.Version

for iters := s.tweakables.IterationsBeforeContinueAsNew; iters > 0 || s.pendingUpdate != nil || s.pendingPatch != nil; iters-- {
// in case of downgrades, break to continue-as-new
if currentVersion > s.tweakables.Version {
break
}
t1 := timestamp.TimeValue(s.State.LastProcessedTime)
t2 := s.now()
if t2.Before(t1) {
Expand Down Expand Up @@ -271,6 +291,9 @@ func (s *scheduler) ensureFields() {
}

func (s *scheduler) compileSpec() {
// if spec changes invalidate current nextTimeResult cache
s.nextTimeResultCache = make([]getNextTimeResult, 0)

cspec, err := NewCompiledSpec(s.Schedule.Spec)
if err != nil {
if s.logger != nil {
Expand Down Expand Up @@ -334,6 +357,36 @@ func (s *scheduler) processPatch(patch *schedpb.SchedulePatch) {
}
}

// getNextTime returns the next getNextTimeResult for the given 'after' time, serving it
// from s.nextTimeResultCache when a usable entry is available.
//
// Cached entries whose Next is not later than 'after' are discarded first. If the cache
// is then empty, up to maxNextTimeResultCacheSize consecutive results are computed from
// the compiled spec inside a single SideEffect and stored in the cache. The entry that
// is returned is removed from the cache.
//
// NOTE(review): the cache is assumed to be consumed with non-decreasing 'after' values;
// if a caller asks for an earlier 'after' than the one the cache was filled for, the
// head of the cache may not be the true next time — confirm against callers.
func (s *scheduler) getNextTime(after time.Time) getNextTimeResult {
	// Ignore cached schedules that are not strictly later than 'after'. An entry equal
	// to 'after' has to be dropped too; otherwise a caller asking for the time that
	// follows 'after' would be handed 'after' itself.
	for len(s.nextTimeResultCache) != 0 && !s.nextTimeResultCache[0].Next.After(after) {
		s.nextTimeResultCache = s.nextTimeResultCache[1:]
	}

	// If we exhausted all cache elements, fetch more schedules.
	if len(s.nextTimeResultCache) == 0 {
		// Run this logic in a SideEffect so that we can fix bugs there without breaking
		// existing schedule workflows.
		panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
			results := make([]getNextTimeResult, 0, maxNextTimeResultCacheSize)
			// Walk forward with a local cursor instead of mutating the captured
			// 'after' parameter.
			cursor := after
			for len(results) < maxNextTimeResultCacheSize {
				next := s.cspec.getNextTime(cursor)
				results = append(results, next)
				// If there is no matching time, stop fetching. The zero result is
				// still recorded so the caller can observe it.
				if next.Next.IsZero() {
					break
				}
				cursor = next.Next
			}
			return results
		}).Get(&s.nextTimeResultCache))
	}

	// The fill loop above always records at least one result, so index 0 is valid here.
	next := s.nextTimeResultCache[0]
	s.nextTimeResultCache = s.nextTimeResultCache[1:]
	return next
}

func (s *scheduler) processTimeRange(
t1, t2 time.Time,
overlapPolicy enumspb.ScheduleOverlapPolicy,
Expand All @@ -347,20 +400,34 @@ func (s *scheduler) processTimeRange(

catchupWindow := s.getCatchupWindow()

// A previous version would record a marker for each time which could make a workflow
// fail. With the new version, the entire time range is skipped if the workflow is paused
// or we are not going to take an action now
if s.tweakables.Version >= 1 {
// Peek at paused/remaining actions state and don't bother if we're not going to
// take an action now. (Don't count as missed catchup window either.)
// Skip over entire time range if paused or no actions can be taken
if !s.canTakeScheduledAction(manual, false) {
return s.getNextTime(t2).Next
}
}

for {
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
var next getNextTimeResult
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(t1)
}).Get(&next))
if s.tweakables.Version < 1 {
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
panicIfErr(workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
return s.cspec.getNextTime(t1)
}).Get(&next))
} else {
next = s.getNextTime(t1)
}
t1 = next.Next
if t1.IsZero() || t1.After(t2) {
return t1
}
// Peek at paused/remaining actions state and don't bother if we're not going to
// take an action now. (Don't count as missed catchup window either.)
if !s.canTakeScheduledAction(manual, false) {
if s.tweakables.Version < 1 && !s.canTakeScheduledAction(manual, false) {
continue
}
if !manual && t2.Sub(t1) > catchupWindow {
Expand Down