Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion chasm/lib/callback/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"time"

chasmnexus "go.temporal.io/server/chasm/nexus"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/nexus"
Expand Down Expand Up @@ -69,7 +70,7 @@ type AddressMatchRules struct {

func (a AddressMatchRules) Validate(rawURL string) error {
// Exact match only; no path, query, or fragment allowed for system URL
if rawURL == nexus.SystemCallbackURL {
if rawURL == nexus.SystemCallbackURL || rawURL == chasmnexus.CompletionHandlerURL {
return nil
}
u, err := url.Parse(rawURL)
Expand Down
4 changes: 2 additions & 2 deletions chasm/lib/scheduler/gen/schedulerpb/v1/service_client.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions chasm/lib/scheduler/library.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ func NewLibrary(
}

func (l *Library) Name() string {
return "scheduler"
return chasm.SchedulerLibraryName
}

func (l *Library) Components() []*chasm.RegistrableComponent {
return []*chasm.RegistrableComponent{
chasm.NewRegistrableComponent[*Scheduler]("scheduler"),
chasm.NewRegistrableComponent[*Scheduler](chasm.SchedulerComponentName),
chasm.NewRegistrableComponent[*Generator]("generator"),
chasm.NewRegistrableComponent[*Invoker]("invoker"),
chasm.NewRegistrableComponent[*Backfiller]("backfiller"),
Expand Down
108 changes: 100 additions & 8 deletions chasm/lib/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
chasmnexus "go.temporal.io/server/chasm/nexus"
"go.temporal.io/server/common"
"go.temporal.io/server/common/searchattribute/sadefs"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/worker/scheduler"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -92,8 +94,8 @@ func NewScheduler(
Backfillers: make(chasm.Map[string, *Backfiller]),
LastCompletionResult: chasm.NewDataField(ctx, &schedulerpb.LastCompletionResult{}),
}
sched.setNullableFields()
sched.Info.CreateTime = timestamppb.New(ctx.Now(sched))
sched.Schedule.State = &schedulepb.ScheduleState{}

invoker := NewInvoker(ctx, sched)
sched.Invoker = chasm.NewComponentField(ctx, invoker)
Expand All @@ -109,6 +111,16 @@ func NewScheduler(
return sched
}

// setNullableFields sets fields that are nullable in API requests.
func (s *Scheduler) setNullableFields() {
if s.Schedule.Policies == nil {
s.Schedule.Policies = &schedulepb.SchedulePolicies{}
}
if s.Schedule.State == nil {
s.Schedule.State = &schedulepb.ScheduleState{}
}
}

// handlePatch creates backfillers to fulfill the given patch request.
func (s *Scheduler) handlePatch(ctx chasm.MutableContext, patch *schedulepb.SchedulePatch) {
if patch != nil {
Expand Down Expand Up @@ -523,16 +535,72 @@ func (s *Scheduler) Describe(
ctx chasm.Context,
req *schedulerpb.DescribeScheduleRequest,
) (*schedulerpb.DescribeScheduleResponse, error) {
if s.Closed {
return nil, ErrClosed
}

visibility := s.Visibility.Get(ctx)
memo := visibility.GetMemo(ctx)
delete(memo, visibilityMemoFieldInfo) // We don't need to return a redundant info block.

attributes := visibility.GetSearchAttributes(ctx)
delete(attributes, sadefs.TemporalNamespaceDivision)
delete(attributes, sadefs.TemporalSchedulePaused)
Comment on lines +547 to +548
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These shouldn't come back from visibility... they are system fields, not user fields.
Are you seeing those come back?


if s.Schedule.Policies == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not call setNullableFields again here?

s.Schedule.Policies = &schedulepb.SchedulePolicies{}
}
if s.Schedule.GetPolicies().GetOverlapPolicy() == enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED {
s.Schedule.Policies.OverlapPolicy = s.overlapPolicy()
}
if !s.Schedule.GetPolicies().GetCatchupWindow().IsValid() {
// TODO - this should be set from Tweakables.DefaultCatchupWindow.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not pass this into the function?

s.Schedule.Policies.CatchupWindow = durationpb.New(365 * 24 * time.Hour)
}

schedule := common.CloneProto(s.Schedule)
cleanSpec(schedule.Spec)

return &schedulerpb.DescribeScheduleResponse{
FrontendResponse: &workflowservice.DescribeScheduleResponse{
Schedule: common.CloneProto(s.Schedule),
Info: common.CloneProto(s.Info),
ConflictToken: s.generateConflictToken(),
// TODO - memo and search_attributes are handled by visibility (separate PR)
Schedule: schedule,
Info: common.CloneProto(s.Info),
ConflictToken: s.generateConflictToken(),
Memo: &commonpb.Memo{Fields: memo},
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: attributes},
},
}, nil
}

// cleanSpec sets default values in ranges for the DescribeSchedule response.
func cleanSpec(spec *schedulepb.ScheduleSpec) {
cleanRanges := func(ranges []*schedulepb.Range) {
for _, r := range ranges {
if r.End < r.Start {
r.End = r.Start
}
if r.Step == 0 {
r.Step = 1
}
}
}
cleanCal := func(structured *schedulepb.StructuredCalendarSpec) {
cleanRanges(structured.Second)
cleanRanges(structured.Minute)
cleanRanges(structured.Hour)
cleanRanges(structured.DayOfMonth)
cleanRanges(structured.Month)
cleanRanges(structured.Year)
cleanRanges(structured.DayOfWeek)
}
for _, structured := range spec.StructuredCalendar {
cleanCal(structured)
}
for _, structured := range spec.ExcludeStructuredCalendar {
cleanCal(structured)
}
}

// Delete marks the Scheduler as closed without an idle timer.
func (s *Scheduler) Delete(
ctx chasm.MutableContext,
Expand All @@ -557,13 +625,32 @@ func (s *Scheduler) Update(
//
// TODO - we could also easily support allowing the customer to update their
// memo here.
visibility := s.Visibility.Get(ctx)
visibility.SetSearchAttributes(ctx, req.FrontendRequest.GetSearchAttributes().GetIndexedFields())
if req.FrontendRequest.GetSearchAttributes() != nil {
// To preserve compatibility with V1 scheduler, we do a full replacement
// of search attributes, dropping any that aren't a part of the update's
// `CustomSearchAttributes` map. Search attribute replacement is ignored entirely
// when that map is unset, however, an allocated yet empty map will clear all
// attributes.

// Preserve the old custom memo in the new Visibility component.
oldVisibility := s.Visibility.Get(ctx)
oldMemo := oldVisibility.GetMemo(ctx)

visibility := chasm.NewVisibility(ctx)
s.Visibility = chasm.NewComponentField(ctx, visibility)
visibility.SetSearchAttributes(ctx, req.FrontendRequest.GetSearchAttributes().GetIndexedFields())
visibility.SetMemo(ctx, oldMemo)
Comment on lines +639 to +642
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
visibility := chasm.NewVisibility(ctx)
s.Visibility = chasm.NewComponentField(ctx, visibility)
visibility.SetSearchAttributes(ctx, req.FrontendRequest.GetSearchAttributes().GetIndexedFields())
visibility.SetMemo(ctx, oldMemo)
visibility := chasm.NewVisibilityWithData(ctx, req.FrontendRequest.GetSearchAttributes().GetIndexedFields(), oldMemo)
s.Visibility = chasm.NewComponentField(ctx, visibility)

}

s.Schedule = common.CloneProto(req.FrontendRequest.Schedule)
s.Schedule = req.FrontendRequest.Schedule
s.setNullableFields()
s.Info.UpdateTime = timestamppb.New(ctx.Now(s))
s.updateConflictToken()

// Since the spec may have been updated, kick off the generator.
generator := s.Generator.Get(ctx)
generator.Generate(ctx)

return &schedulerpb.UpdateScheduleResponse{
FrontendResponse: &workflowservice.UpdateScheduleResponse{},
}, nil
Expand Down Expand Up @@ -601,6 +688,11 @@ func (s *Scheduler) generateConflictToken() []byte {
}

func (s *Scheduler) validateConflictToken(token []byte) bool {
// When unset in mutate requests, the schedule should update unconditionally.
if token == nil {
return true
}

current := s.generateConflictToken()
return bytes.Equal(current, token)
}
Expand Down
4 changes: 2 additions & 2 deletions chasm/lib/tests/gen/testspb/v1/service_client.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions chasm/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package chasm
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kinda hate that we have to put this here but this in not blocking the PR.


// Scheduler's library and component name constants are exported here, as they
// are used as part of old VisibilityManager queries (e.g., visibility queries
// out-of-band of CHASM). Most components shouldn't have to define, or export,
// these names.
const (
SchedulerLibraryName = "scheduler"
SchedulerComponentName = "scheduler"
)

var (
SchedulerArchetype = Archetype(fullyQualifiedName(SchedulerLibraryName, SchedulerComponentName))
SchedulerArchetypeID = ArchetypeID(generateTypeID(SchedulerArchetype))
)
4 changes: 2 additions & 2 deletions cmd/tools/protoc-gen-go-chasm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (p *Plugin) genClient(w *writer, svc *protogen.Service) error {

ctorName := fmt.Sprintf("New%s", structName)
w.println("// %s initializes a new %s.", ctorName, structName)
w.println("func New%s(", ctorName)
w.println("func %s(", ctorName)
w.indent()
w.println("dc *dynamicconfig.Collection,")
w.println("rpcFactory common.RPCFactory,")
Expand All @@ -201,7 +201,7 @@ func (p *Plugin) genClient(w *writer, svc *protogen.Service) error {
w.println("logger log.Logger,")
w.println("metricsHandler metrics.Handler,")
w.unindent()
w.println(") (*%s, error) {", structName)
w.println(") (%sClient, error) {", svc.GoName)
w.indent() // start ctor body
w.println("resolver, err := monitor.GetResolver(primitives.HistoryService)")
w.println("if err != nil {")
Expand Down
6 changes: 3 additions & 3 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,9 +592,9 @@ func CheckEventBlobSizeLimit(
if actualSize > warnLimit {
if logger != nil {
logger.Warn("Blob data size exceeds the warning limit.",
tag.WorkflowNamespace(namespace),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.WorkflowNamespace(namespace), // TODO: Not necessarily a "workflow" namespace, fix the tag.
tag.WorkflowID(workflowID), // TODO: this should be entity ID and we need an archetype too.
tag.WorkflowRunID(runID), // TODO: not necessarily a workflow run ID, fix the tag.
tag.WorkflowSize(int64(actualSize)),
blobSizeViolationOperationTag)
}
Expand Down
3 changes: 2 additions & 1 deletion components/callbacks/chasm_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"fmt"
"io"
"strings"

"github.com/google/uuid"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -44,7 +45,7 @@ func logInternalError(logger log.Logger, internalMsg string, internalErr error)

func (c chasmInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) invocationResult {
// Get back the base64-encoded ComponentRef from the header.
encodedRef, ok := c.nexus.GetHeader()[commonnexus.CallbackTokenHeader]
encodedRef, ok := c.nexus.GetHeader()[strings.ToLower(commonnexus.CallbackTokenHeader)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fixing this in #8692 FYI.

if !ok {
return invocationResultFail{logInternalError(e.Logger, "callback missing token", nil)}
}
Expand Down
3 changes: 2 additions & 1 deletion components/callbacks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"time"

chasmnexus "go.temporal.io/server/chasm/nexus"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/nexus"
Expand Down Expand Up @@ -69,7 +70,7 @@ type AddressMatchRules struct {

func (a AddressMatchRules) Validate(rawURL string) error {
// Exact match only; no path, query, or fragment allowed for system URL
if rawURL == nexus.SystemCallbackURL {
if rawURL == nexus.SystemCallbackURL || rawURL == chasmnexus.CompletionHandlerURL {
return nil
}
u, err := url.Parse(rawURL)
Expand Down
4 changes: 4 additions & 0 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/gorilla/mux"
"go.temporal.io/server/api/adminservice/v1"
schedulerpb "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
Expand Down Expand Up @@ -109,6 +110,7 @@ var Module = fx.Options(
fx.Provide(NexusEndpointRegistryProvider),
fx.Invoke(ServiceLifetimeHooks),
fx.Invoke(EndpointRegistryLifetimeHooks),
fx.Provide(schedulerpb.NewSchedulerServiceLayeredClient),
nexusfrontend.Module,
)

Expand Down Expand Up @@ -739,6 +741,7 @@ func HandlerProvider(
matchingClient resource.MatchingClient,
deploymentStoreClient deployment.DeploymentStoreClient,
workerDeploymentStoreClient workerdeployment.Client,
schedulerClient schedulerpb.SchedulerServiceClient,
archiverProvider provider.ArchiverProvider,
metricsHandler metrics.Handler,
payloadSerializer serialization.Serializer,
Expand Down Expand Up @@ -766,6 +769,7 @@ func HandlerProvider(
matchingClient,
deploymentStoreClient,
workerDeploymentStoreClient,
schedulerClient,
archiverProvider,
payloadSerializer,
namespaceRegistry,
Expand Down
6 changes: 5 additions & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ type Config struct {
// Enable schedule-related RPCs
EnableSchedules dynamicconfig.BoolPropertyFnWithNamespaceFilter

// Enable creation of new schedules on CHASM (V2) engine
EnableCHASMSchedulerCreation dynamicconfig.BoolPropertyFnWithNamespaceFilter

// Enable deployment RPCs
EnableDeployments dynamicconfig.BoolPropertyFnWithNamespaceFilter

Expand Down Expand Up @@ -325,7 +328,8 @@ func NewConfig(

MaxFairnessWeightOverrideConfigLimit: dynamicconfig.MatchingMaxFairnessKeyWeightOverrides.Get(dc),

EnableSchedules: dynamicconfig.FrontendEnableSchedules.Get(dc),
EnableSchedules: dynamicconfig.FrontendEnableSchedules.Get(dc),
EnableCHASMSchedulerCreation: dynamicconfig.EnableCHASMSchedulerCreation.Get(dc),

// [cleanup-wv-pre-release]
EnableDeployments: dynamicconfig.EnableDeployments.Get(dc),
Expand Down
Loading
Loading