Skip to content

Commit 0441ebe

Browse files
CHASM Visibility support minor fixes (#8690)
## What changed? - Addresses a few follow-ups from the initial PR after merge: - Makes `Memo` provider use deterministic proto marshalling - Fixes a few style nitpicks - Removes some unneeded TODOs
1 parent 86f4799 commit 0441ebe

File tree

7 files changed

+48
-58
lines changed

7 files changed

+48
-58
lines changed

chasm/lib/scheduler/generator_tasks.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (g *GeneratorTaskExecutor) Execute(
101101

102102
// Write the new high water mark and future action times.
103103
generator.LastProcessedTime = timestamppb.New(result.LastActionTime)
104-
if err := g.updateFutureActionTimes(ctx, generator, scheduler); err != nil {
104+
if err := g.updateFutureActionTimes(generator, scheduler); err != nil {
105105
return queueerrors.NewUnprocessableTaskError(
106106
fmt.Sprintf("failed to update future action times: %s", err.Error()))
107107
}
@@ -143,12 +143,11 @@ func (g *GeneratorTaskExecutor) logSchedule(logger log.Logger, msg string, sched
143143
}
144144

145145
func (g *GeneratorTaskExecutor) updateFutureActionTimes(
146-
ctx chasm.MutableContext,
147146
generator *Generator,
148147
scheduler *Scheduler,
149148
) error {
150149
nextTime := func(t time.Time) (time.Time, error) {
151-
res, err := g.SpecProcessor.GetNextTime(scheduler, t)
150+
res, err := g.SpecProcessor.NextTime(scheduler, t)
152151
return res.Next, err
153152
}
154153

chasm/lib/scheduler/invoker_tasks.go

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ import (
2020
"go.temporal.io/server/common/log"
2121
"go.temporal.io/server/common/log/tag"
2222
"go.temporal.io/server/common/metrics"
23-
"go.temporal.io/server/common/payload"
2423
"go.temporal.io/server/common/resource"
25-
"go.temporal.io/server/common/searchattribute/sadefs"
2624
"go.temporal.io/server/common/util"
2725
queueerrors "go.temporal.io/server/service/history/queues/errors"
2826
legacyscheduler "go.temporal.io/server/service/worker/scheduler"
@@ -516,26 +514,6 @@ func (e *InvokerProcessBufferTaskExecutor) startWorkflowDeadline(
516514
return start.ActualTime.AsTime().Add(timeout)
517515
}
518516

519-
// startWorkflowSearchAttributes returns the search attributes to be applied to
520-
// workflows kicked off. Includes custom search attributes and Temporal-managed.
521-
func startWorkflowSearchAttributes(
522-
scheduler *Scheduler,
523-
nominal time.Time,
524-
) *commonpb.SearchAttributes {
525-
attributes := scheduler.Schedule.GetAction().GetStartWorkflow().GetSearchAttributes()
526-
527-
fields := util.CloneMapNonNil(attributes.GetIndexedFields())
528-
if p, err := payload.Encode(nominal); err == nil {
529-
fields[sadefs.TemporalScheduledStartTime] = p
530-
}
531-
if p, err := payload.Encode(scheduler.ScheduleId); err == nil {
532-
fields[sadefs.TemporalScheduledById] = p
533-
}
534-
return &commonpb.SearchAttributes{
535-
IndexedFields: fields,
536-
}
537-
}
538-
539517
func (e *InvokerExecuteTaskExecutor) startWorkflow(
540518
ctx context.Context,
541519
scheduler *Scheduler,
@@ -574,7 +552,7 @@ func (e *InvokerExecuteTaskExecutor) startWorkflow(
574552
Namespace: scheduler.Namespace,
575553
RequestId: start.RequestId,
576554
RetryPolicy: requestSpec.RetryPolicy,
577-
SearchAttributes: startWorkflowSearchAttributes(scheduler, start.NominalTime.AsTime()),
555+
SearchAttributes: scheduler.startWorkflowSearchAttributes(start.NominalTime.AsTime()),
578556
TaskQueue: requestSpec.TaskQueue,
579557
UserMetadata: requestSpec.UserMetadata,
580558
WorkflowExecutionTimeout: requestSpec.WorkflowExecutionTimeout,

chasm/lib/scheduler/scheduler.go

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ import (
1818
"go.temporal.io/server/chasm"
1919
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
2020
"go.temporal.io/server/common"
21+
"go.temporal.io/server/common/payload"
22+
"go.temporal.io/server/common/searchattribute/sadefs"
2123
"go.temporal.io/server/common/util"
2224
"go.temporal.io/server/service/worker/scheduler"
25+
"google.golang.org/protobuf/proto"
2326
"google.golang.org/protobuf/types/known/timestamppb"
2427
)
2528

@@ -63,7 +66,7 @@ const (
6366

6467
var (
6568
ErrConflictTokenMismatch = serviceerror.NewFailedPrecondition("mismatched conflict token")
66-
ErrClosed = serviceerror.NewInvalidArgument("schedule closed")
69+
ErrClosed = serviceerror.NewFailedPrecondition("schedule closed")
6770
ErrUnprocessable = serviceerror.NewInternal("unprocessable schedule")
6871
)
6972

@@ -615,33 +618,25 @@ func (s *Scheduler) SearchAttributes(chasm.Context) []chasm.SearchAttributeKeyVa
615618
func (s *Scheduler) Memo(
616619
ctx chasm.Context,
617620
) map[string]chasm.VisibilityValue {
618-
newInfo, err := s.ListInfo(ctx)
619-
if err != nil {
620-
// Unable to retrieve list info. Return nil to skip memo update.
621-
// Error will be logged once loggers are available in CHASM.
622-
return nil
623-
}
621+
newInfo := s.ListInfo(ctx)
624622

625-
payload, err := newInfo.Marshal()
623+
infoPayload, err := proto.MarshalOptions{
624+
Deterministic: true,
625+
}.Marshal(newInfo)
626626
if err != nil {
627-
// Unable to marshal list info. Return nil to skip memo update.
628-
// Error will be logged once loggers are available in CHASM.
629627
return nil
630628
}
631629

632-
// TODO - Memo provider is being updated to support protobufs, byte slices
633-
// may cause oscillating writes to visibility given that proto serialization is
634-
// non-deterministic.
635630
return map[string]chasm.VisibilityValue{
636-
visibilityMemoFieldInfo: chasm.VisibilityValueByteSlice(payload),
631+
visibilityMemoFieldInfo: chasm.VisibilityValueByteSlice(infoPayload),
637632
}
638633
}
639634

640635
// ListInfo returns the ScheduleListInfo, used as the visibility memo, and to
641636
// answer List queries.
642637
func (s *Scheduler) ListInfo(
643638
ctx chasm.Context,
644-
) (*schedulepb.ScheduleListInfo, error) {
639+
) *schedulepb.ScheduleListInfo {
645640
spec := common.CloneProto(s.Schedule.Spec)
646641

647642
// Clear fields that are too large/not useful for the list view.
@@ -661,5 +656,24 @@ func (s *Scheduler) ListInfo(
661656
Paused: s.Schedule.State.Paused,
662657
RecentActions: util.SliceTail(s.Info.RecentActions, recentActionCount),
663658
FutureActionTimes: generator.FutureActionTimes,
664-
}, nil
659+
}
660+
}
661+
662+
// startWorkflowSearchAttributes returns the search attributes to be applied to
663+
// workflows kicked off. Includes custom search attributes and Temporal-managed.
664+
func (s *Scheduler) startWorkflowSearchAttributes(
665+
nominal time.Time,
666+
) *commonpb.SearchAttributes {
667+
attributes := s.Schedule.GetAction().GetStartWorkflow().GetSearchAttributes()
668+
669+
fields := util.CloneMapNonNil(attributes.GetIndexedFields())
670+
if p, err := payload.Encode(nominal); err == nil {
671+
fields[sadefs.TemporalScheduledStartTime] = p
672+
}
673+
if p, err := payload.Encode(s.ScheduleId); err == nil {
674+
fields[sadefs.TemporalScheduledById] = p
675+
}
676+
return &commonpb.SearchAttributes{
677+
IndexedFields: fields,
678+
}
665679
}

chasm/lib/scheduler/scheduler_suite_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,11 @@ func (s *schedulerSuite) SetupTest() {
9393

9494
// Set up future action times.
9595
futureTime := now.Add(time.Hour)
96-
s.specProcessor.EXPECT().GetNextTime(s.scheduler, gomock.Any()).Return(legacyscheduler.GetNextTimeResult{
96+
s.specProcessor.EXPECT().NextTime(s.scheduler, gomock.Any()).Return(legacyscheduler.GetNextTimeResult{
9797
Next: futureTime,
9898
Nominal: futureTime,
9999
}, nil).MaxTimes(1)
100-
s.specProcessor.EXPECT().GetNextTime(s.scheduler, gomock.Any()).Return(legacyscheduler.GetNextTimeResult{}, nil).AnyTimes()
100+
s.specProcessor.EXPECT().NextTime(s.scheduler, gomock.Any()).Return(legacyscheduler.GetNextTimeResult{}, nil).AnyTimes()
101101
}
102102

103103
// hasTask returns true if the given task type was added at the end of the

chasm/lib/scheduler/scheduler_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ func TestListInfo(t *testing.T) {
1616
expectedFutureTimes := []*timestamppb.Timestamp{timestamppb.Now(), timestamppb.Now()}
1717
generator.FutureActionTimes = expectedFutureTimes
1818

19-
listInfo, err := scheduler.ListInfo(ctx)
20-
require.NoError(t, err)
19+
listInfo := scheduler.ListInfo(ctx)
2120

2221
// Should return a populated info block.
2322
require.NotNil(t, listInfo)

chasm/lib/scheduler/spec_processor.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ type (
3737
limit *int,
3838
) (*ProcessedTimeRange, error)
3939

40-
// GetNextTime provides a peek at the next time in the spec following 'after'.
41-
GetNextTime(scheduler *Scheduler, after time.Time) (legacyscheduler.GetNextTimeResult, error)
40+
// NextTime provides a peek at the next time in the spec following 'after'.
41+
NextTime(scheduler *Scheduler, after time.Time) (legacyscheduler.GetNextTimeResult, error)
4242
}
4343

4444
SpecProcessorImpl struct {
@@ -94,7 +94,7 @@ func (s *SpecProcessorImpl) ProcessTimeRange(
9494
// Manual (backfill/patch) runs are always buffered here.
9595
if !scheduler.useScheduledAction(false) && !manual {
9696
// Use end as last action time so that we don't reprocess time spent paused.
97-
next, err := s.GetNextTime(scheduler, end)
97+
next, err := s.NextTime(scheduler, end)
9898
if err != nil {
9999
return nil, err
100100
}
@@ -117,7 +117,7 @@ func (s *SpecProcessorImpl) ProcessTimeRange(
117117
var next legacyscheduler.GetNextTimeResult
118118
var err error
119119
var bufferedStarts []*schedulespb.BufferedStart
120-
for next, err = s.GetNextTime(scheduler, start); err == nil && (!next.Next.IsZero() && !next.Next.After(end)); next, err = s.GetNextTime(scheduler, next.Next) {
120+
for next, err = s.NextTime(scheduler, start); err == nil && (!next.Next.IsZero() && !next.Next.After(end)); next, err = s.NextTime(scheduler, next.Next) {
121121
lastAction = next.Next
122122

123123
if scheduler.Info.UpdateTime.AsTime().After(next.Next) {
@@ -172,8 +172,8 @@ func catchupWindow(s *Scheduler, tweakables Tweakables) time.Duration {
172172
return max(cw.AsDuration(), tweakables.MinCatchupWindow)
173173
}
174174

175-
// GetNextTime returns the next time result, or an error if the schedule cannot be compiled.
176-
func (s *SpecProcessorImpl) GetNextTime(scheduler *Scheduler, after time.Time) (legacyscheduler.GetNextTimeResult, error) {
175+
// NextTime returns the next time result, or an error if the schedule cannot be compiled.
176+
func (s *SpecProcessorImpl) NextTime(scheduler *Scheduler, after time.Time) (legacyscheduler.GetNextTimeResult, error) {
177177
spec, err := scheduler.getCompiledSpec(s.specBuilder)
178178
if err != nil {
179179
s.logger.Error("Invalid schedule", tag.Error(err))

chasm/lib/scheduler/spec_processor_mock.go

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)