diff --git a/chasm/lib/callback/config.go b/chasm/lib/callback/config.go index db61f020352..3998a520e58 100644 --- a/chasm/lib/callback/config.go +++ b/chasm/lib/callback/config.go @@ -6,6 +6,7 @@ import ( "strings" "time" + chasmnexus "go.temporal.io/server/chasm/nexus" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/nexus" @@ -69,7 +70,7 @@ type AddressMatchRules struct { func (a AddressMatchRules) Validate(rawURL string) error { // Exact match only; no path, query, or fragment allowed for system URL - if rawURL == nexus.SystemCallbackURL { + if rawURL == nexus.SystemCallbackURL || rawURL == chasmnexus.CompletionHandlerURL { return nil } u, err := url.Parse(rawURL) diff --git a/chasm/lib/scheduler/gen/schedulerpb/v1/service_client.pb.go b/chasm/lib/scheduler/gen/schedulerpb/v1/service_client.pb.go index f8a9e7da138..c9b61bc6a9c 100644 --- a/chasm/lib/scheduler/gen/schedulerpb/v1/service_client.pb.go +++ b/chasm/lib/scheduler/gen/schedulerpb/v1/service_client.pb.go @@ -27,14 +27,14 @@ type SchedulerServiceLayeredClient struct { } // NewSchedulerServiceLayeredClient initializes a new SchedulerServiceLayeredClient. -func NewNewSchedulerServiceLayeredClient( +func NewSchedulerServiceLayeredClient( dc *dynamicconfig.Collection, rpcFactory common.RPCFactory, monitor membership.Monitor, config *config.Persistence, logger log.Logger, metricsHandler metrics.Handler, -) (*SchedulerServiceLayeredClient, error) { +) (SchedulerServiceClient, error) { resolver, err := monitor.GetResolver(primitives.HistoryService) if err != nil { return nil, err diff --git a/chasm/lib/scheduler/library.go b/chasm/lib/scheduler/library.go index 4b7b4f23e68..56b7a2996a3 100644 --- a/chasm/lib/scheduler/library.go +++ b/chasm/lib/scheduler/library.go @@ -39,12 +39,12 @@ func NewLibrary( } func (l *Library) Name() string { - return "scheduler" + return chasm.SchedulerLibraryName } func (l *Library) Components() []*chasm.RegistrableComponent { return []*chasm.RegistrableComponent{ - chasm.NewRegistrableComponent[*Scheduler]("scheduler"), + chasm.NewRegistrableComponent[*Scheduler](chasm.SchedulerComponentName), chasm.NewRegistrableComponent[*Generator]("generator"), chasm.NewRegistrableComponent[*Invoker]("invoker"), chasm.NewRegistrableComponent[*Backfiller]("backfiller"), diff --git a/chasm/lib/scheduler/scheduler.go b/chasm/lib/scheduler/scheduler.go index e35bdc8c776..aee776b8199 100644 --- a/chasm/lib/scheduler/scheduler.go +++ b/chasm/lib/scheduler/scheduler.go @@ -19,8 +19,10 @@ import ( "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" chasmnexus "go.temporal.io/server/chasm/nexus" "go.temporal.io/server/common" + "go.temporal.io/server/common/searchattribute/sadefs" "go.temporal.io/server/common/util" "go.temporal.io/server/service/worker/scheduler" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -92,8 +94,8 @@ func NewScheduler( Backfillers: make(chasm.Map[string, *Backfiller]), LastCompletionResult: chasm.NewDataField(ctx, &schedulerpb.LastCompletionResult{}), } + sched.setNullableFields() sched.Info.CreateTime = timestamppb.New(ctx.Now(sched)) - sched.Schedule.State = &schedulepb.ScheduleState{} invoker := NewInvoker(ctx, sched) sched.Invoker = chasm.NewComponentField(ctx, invoker) @@ -109,6 +111,16 @@ func NewScheduler( return sched } +// setNullableFields sets fields that are nullable in API requests. +func (s *Scheduler) setNullableFields() { + if s.Schedule.Policies == nil { + s.Schedule.Policies = &schedulepb.SchedulePolicies{} + } + if s.Schedule.State == nil { + s.Schedule.State = &schedulepb.ScheduleState{} + } +} + // handlePatch creates backfillers to fulfill the given patch request. func (s *Scheduler) handlePatch(ctx chasm.MutableContext, patch *schedulepb.SchedulePatch) { if patch != nil { @@ -523,16 +535,72 @@ func (s *Scheduler) Describe( ctx chasm.Context, req *schedulerpb.DescribeScheduleRequest, ) (*schedulerpb.DescribeScheduleResponse, error) { + if s.Closed { + return nil, ErrClosed + } + + visibility := s.Visibility.Get(ctx) + memo := visibility.GetMemo(ctx) + delete(memo, visibilityMemoFieldInfo) // We don't need to return a redundant info block. + + attributes := visibility.GetSearchAttributes(ctx) + delete(attributes, sadefs.TemporalNamespaceDivision) + delete(attributes, sadefs.TemporalSchedulePaused) + + if s.Schedule.Policies == nil { + s.Schedule.Policies = &schedulepb.SchedulePolicies{} + } + if s.Schedule.GetPolicies().GetOverlapPolicy() == enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED { + s.Schedule.Policies.OverlapPolicy = s.overlapPolicy() + } + if !s.Schedule.GetPolicies().GetCatchupWindow().IsValid() { + // TODO - this should be set from Tweakables.DefaultCatchupWindow. + s.Schedule.Policies.CatchupWindow = durationpb.New(365 * 24 * time.Hour) + } + + schedule := common.CloneProto(s.Schedule) + cleanSpec(schedule.Spec) + return &schedulerpb.DescribeScheduleResponse{ FrontendResponse: &workflowservice.DescribeScheduleResponse{ - Schedule: common.CloneProto(s.Schedule), - Info: common.CloneProto(s.Info), - ConflictToken: s.generateConflictToken(), - // TODO - memo and search_attributes are handled by visibility (separate PR) + Schedule: schedule, + Info: common.CloneProto(s.Info), + ConflictToken: s.generateConflictToken(), + Memo: &commonpb.Memo{Fields: memo}, + SearchAttributes: &commonpb.SearchAttributes{IndexedFields: attributes}, }, }, nil } +// cleanSpec sets default values in ranges for the DescribeSchedule response. +func cleanSpec(spec *schedulepb.ScheduleSpec) { + cleanRanges := func(ranges []*schedulepb.Range) { + for _, r := range ranges { + if r.End < r.Start { + r.End = r.Start + } + if r.Step == 0 { + r.Step = 1 + } + } + } + cleanCal := func(structured *schedulepb.StructuredCalendarSpec) { + cleanRanges(structured.Second) + cleanRanges(structured.Minute) + cleanRanges(structured.Hour) + cleanRanges(structured.DayOfMonth) + cleanRanges(structured.Month) + cleanRanges(structured.Year) + cleanRanges(structured.DayOfWeek) + } + for _, structured := range spec.StructuredCalendar { + cleanCal(structured) + } + for _, structured := range spec.ExcludeStructuredCalendar { + cleanCal(structured) + } +} + // Delete marks the Scheduler as closed without an idle timer. func (s *Scheduler) Delete( ctx chasm.MutableContext, @@ -557,13 +625,32 @@ func (s *Scheduler) Update( // // TODO - we could also easily support allowing the customer to update their // memo here. - visibility := s.Visibility.Get(ctx) - visibility.SetSearchAttributes(ctx, req.FrontendRequest.GetSearchAttributes().GetIndexedFields()) + if req.FrontendRequest.GetSearchAttributes() != nil { + // To preserve compatibility with V1 scheduler, we do a full replacement + // of search attributes, dropping any that aren't a part of the update's + // `CustomSearchAttributes` map. Search attribute replacement is ignored entirely + // when that map is unset, however, an allocated yet empty map will clear all + // attributes. + + // Preserve the old custom memo in the new Visibility component. + oldVisibility := s.Visibility.Get(ctx) + oldMemo := oldVisibility.GetMemo(ctx) + + visibility := chasm.NewVisibility(ctx) + s.Visibility = chasm.NewComponentField(ctx, visibility) + visibility.SetSearchAttributes(ctx, req.FrontendRequest.GetSearchAttributes().GetIndexedFields()) + visibility.SetMemo(ctx, oldMemo) + } - s.Schedule = common.CloneProto(req.FrontendRequest.Schedule) + s.Schedule = req.FrontendRequest.Schedule + s.setNullableFields() s.Info.UpdateTime = timestamppb.New(ctx.Now(s)) s.updateConflictToken() + // Since the spec may have been updated, kick off the generator. + generator := s.Generator.Get(ctx) + generator.Generate(ctx) + return &schedulerpb.UpdateScheduleResponse{ FrontendResponse: &workflowservice.UpdateScheduleResponse{}, }, nil @@ -601,6 +688,11 @@ func (s *Scheduler) generateConflictToken() []byte { } func (s *Scheduler) validateConflictToken(token []byte) bool { + // When unset in mutate requests, the schedule should update unconditionally. + if token == nil { + return true + } + current := s.generateConflictToken() return bytes.Equal(current, token) } diff --git a/chasm/lib/tests/gen/testspb/v1/service_client.pb.go b/chasm/lib/tests/gen/testspb/v1/service_client.pb.go index d9b1205bbe1..fbeff6fc99b 100644 --- a/chasm/lib/tests/gen/testspb/v1/service_client.pb.go +++ b/chasm/lib/tests/gen/testspb/v1/service_client.pb.go @@ -28,14 +28,14 @@ type TestServiceLayeredClient struct { } // NewTestServiceLayeredClient initializes a new TestServiceLayeredClient. -func NewNewTestServiceLayeredClient( +func NewTestServiceLayeredClient( dc *dynamicconfig.Collection, rpcFactory common.RPCFactory, monitor membership.Monitor, config *config.Persistence, logger log.Logger, metricsHandler metrics.Handler, -) (*TestServiceLayeredClient, error) { +) (TestServiceClient, error) { resolver, err := monitor.GetResolver(primitives.HistoryService) if err != nil { return nil, err diff --git a/chasm/scheduler.go b/chasm/scheduler.go new file mode 100644 index 00000000000..c0412db5433 --- /dev/null +++ b/chasm/scheduler.go @@ -0,0 +1,15 @@ +package chasm + +// Scheduler's library and component name constants are exported here, as they +// are used as part of old VisibilityManager queries (e.g., visibility queries +// out-of-band of CHASM). Most components shouldn't have to define, or export, +// these names. +const ( + SchedulerLibraryName = "scheduler" + SchedulerComponentName = "scheduler" +) + +var ( + SchedulerArchetype = Archetype(fullyQualifiedName(SchedulerLibraryName, SchedulerComponentName)) + SchedulerArchetypeID = ArchetypeID(generateTypeID(SchedulerArchetype)) +) diff --git a/cmd/tools/protoc-gen-go-chasm/main.go b/cmd/tools/protoc-gen-go-chasm/main.go index 098e45cc502..feb90ba1b2e 100644 --- a/cmd/tools/protoc-gen-go-chasm/main.go +++ b/cmd/tools/protoc-gen-go-chasm/main.go @@ -192,7 +192,7 @@ func (p *Plugin) genClient(w *writer, svc *protogen.Service) error { ctorName := fmt.Sprintf("New%s", structName) w.println("// %s initializes a new %s.", ctorName, structName) - w.println("func New%s(", ctorName) + w.println("func %s(", ctorName) w.indent() w.println("dc *dynamicconfig.Collection,") w.println("rpcFactory common.RPCFactory,") @@ -201,7 +201,7 @@ func (p *Plugin) genClient(w *writer, svc *protogen.Service) error { w.println("logger log.Logger,") w.println("metricsHandler metrics.Handler,") w.unindent() - w.println(") (*%s, error) {", structName) + w.println(") (%sClient, error) {", svc.GoName) w.indent() // start ctor body w.println("resolver, err := monitor.GetResolver(primitives.HistoryService)") w.println("if err != nil {") diff --git a/common/util.go b/common/util.go index 02aa9e93d13..8b450fbde75 100644 --- a/common/util.go +++ b/common/util.go @@ -592,9 +592,9 @@ func CheckEventBlobSizeLimit( if actualSize > warnLimit { if logger != nil { logger.Warn("Blob data size exceeds the warning limit.", - tag.WorkflowNamespace(namespace), - tag.WorkflowID(workflowID), - tag.WorkflowRunID(runID), + tag.WorkflowNamespace(namespace), // TODO: Not necessarily a "workflow" namespace, fix the tag. + tag.WorkflowID(workflowID), // TODO: this should be entity ID and we need an archetype too. + tag.WorkflowRunID(runID), // TODO: not necessarily a workflow run ID, fix the tag. tag.WorkflowSize(int64(actualSize)), blobSizeViolationOperationTag) } diff --git a/components/callbacks/chasm_invocation.go b/components/callbacks/chasm_invocation.go index a62f576587d..d1628653f06 100644 --- a/components/callbacks/chasm_invocation.go +++ b/components/callbacks/chasm_invocation.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "fmt" "io" + "strings" "github.com/google/uuid" commonpb "go.temporal.io/api/common/v1" @@ -44,7 +45,7 @@ func logInternalError(logger log.Logger, internalMsg string, internalErr error) func (c chasmInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) invocationResult { // Get back the base64-encoded ComponentRef from the header. - encodedRef, ok := c.nexus.GetHeader()[commonnexus.CallbackTokenHeader] + encodedRef, ok := c.nexus.GetHeader()[strings.ToLower(commonnexus.CallbackTokenHeader)] if !ok { return invocationResultFail{logInternalError(e.Logger, "callback missing token", nil)} } diff --git a/components/callbacks/config.go b/components/callbacks/config.go index 3e892b543f4..e6750f96a15 100644 --- a/components/callbacks/config.go +++ b/components/callbacks/config.go @@ -6,6 +6,7 @@ import ( "strings" "time" + chasmnexus "go.temporal.io/server/chasm/nexus" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/nexus" @@ -69,7 +70,7 @@ type AddressMatchRules struct { func (a AddressMatchRules) Validate(rawURL string) error { // Exact match only; no path, query, or fragment allowed for system URL - if rawURL == nexus.SystemCallbackURL { + if rawURL == nexus.SystemCallbackURL || rawURL == chasmnexus.CompletionHandlerURL { return nil } u, err := url.Parse(rawURL) diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 0dbfdff5eb5..282e77d4bdd 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -6,6 +6,7 @@ import ( "github.com/gorilla/mux" "go.temporal.io/server/api/adminservice/v1" + schedulerpb "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" "go.temporal.io/server/client" "go.temporal.io/server/common" "go.temporal.io/server/common/archiver" @@ -109,6 +110,7 @@ var Module = fx.Options( fx.Provide(NexusEndpointRegistryProvider), fx.Invoke(ServiceLifetimeHooks), fx.Invoke(EndpointRegistryLifetimeHooks), + fx.Provide(schedulerpb.NewSchedulerServiceLayeredClient), nexusfrontend.Module, ) @@ -739,6 +741,7 @@ func HandlerProvider( matchingClient resource.MatchingClient, deploymentStoreClient deployment.DeploymentStoreClient, workerDeploymentStoreClient workerdeployment.Client, + schedulerClient schedulerpb.SchedulerServiceClient, archiverProvider provider.ArchiverProvider, metricsHandler metrics.Handler, payloadSerializer serialization.Serializer, @@ -766,6 +769,7 @@ func HandlerProvider( matchingClient, deploymentStoreClient, workerDeploymentStoreClient, + schedulerClient, archiverProvider, payloadSerializer, namespaceRegistry, diff --git a/service/frontend/service.go b/service/frontend/service.go index 6d652c56f4e..47378d17e6c 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -159,6 +159,9 @@ type Config struct { // Enable schedule-related RPCs EnableSchedules dynamicconfig.BoolPropertyFnWithNamespaceFilter + // Enable creation of new schedules on CHASM (V2) engine + EnableCHASMSchedulerCreation dynamicconfig.BoolPropertyFnWithNamespaceFilter + // Enable deployment RPCs EnableDeployments dynamicconfig.BoolPropertyFnWithNamespaceFilter @@ -325,7 +328,8 @@ func NewConfig( MaxFairnessWeightOverrideConfigLimit: dynamicconfig.MatchingMaxFairnessKeyWeightOverrides.Get(dc), - EnableSchedules: dynamicconfig.FrontendEnableSchedules.Get(dc), + EnableSchedules: dynamicconfig.FrontendEnableSchedules.Get(dc), + EnableCHASMSchedulerCreation: dynamicconfig.EnableCHASMSchedulerCreation.Get(dc), // [cleanup-wv-pre-release] EnableDeployments: dynamicconfig.EnableDeployments.Get(dc), diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index d04a56d9eb4..8401fce1c70 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -32,6 +32,7 @@ import ( "go.temporal.io/server/api/matchingservice/v1" schedulespb "go.temporal.io/server/api/schedule/v1" taskqueuespb "go.temporal.io/server/api/taskqueue/v1" + schedulerpb "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" "go.temporal.io/server/client/frontend" "go.temporal.io/server/common" "go.temporal.io/server/common/archiver" @@ -129,6 +130,7 @@ type ( matchingClient matchingservice.MatchingServiceClient deploymentStoreClient deployment.DeploymentStoreClient workerDeploymentClient workerdeployment.Client + schedulerClient schedulerpb.SchedulerServiceClient archiverProvider provider.ArchiverProvider payloadSerializer serialization.Serializer namespaceRegistry namespace.Registry @@ -160,6 +162,7 @@ func NewWorkflowHandler( matchingClient matchingservice.MatchingServiceClient, deploymentStoreClient deployment.DeploymentStoreClient, workerDeploymentClient workerdeployment.Client, + schedulerClient schedulerpb.SchedulerServiceClient, archiverProvider provider.ArchiverProvider, payloadSerializer serialization.Serializer, namespaceRegistry namespace.Registry, @@ -200,6 +203,7 @@ func NewWorkflowHandler( matchingClient: matchingClient, deploymentStoreClient: deploymentStoreClient, workerDeploymentClient: workerDeploymentClient, + schedulerClient: schedulerClient, archiverProvider: archiverProvider, payloadSerializer: payloadSerializer, namespaceRegistry: namespaceRegistry, @@ -385,7 +389,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution( return nil, err } - wh.logger.Debug("Received StartWorkflowExecution.", tag.WorkflowID(request.GetWorkflowId())) + wh.logger.Debug("Received StartWorkflowExecution.", tag.WorkflowID(request.GetWorkflowId()), tag.WorkflowType(request.GetWorkflowType().GetName())) namespaceName := namespace.Name(request.GetNamespace()) @@ -3045,36 +3049,60 @@ func (wh *WorkflowHandler) ListTaskQueuePartitions(ctx context.Context, request }, err } -// Creates a new schedule. -func (wh *WorkflowHandler) CreateSchedule( +func (wh *WorkflowHandler) createScheduleCHASM( ctx context.Context, request *workflowservice.CreateScheduleRequest, ) (_ *workflowservice.CreateScheduleResponse, retError error) { - defer log.CapturePanic(wh.logger, &retError) - - if request == nil { - return nil, errRequestNotSet + namespaceName := namespace.Name(request.Namespace) + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespaceName) + if err != nil { + return nil, err } - if !wh.config.EnableSchedules(request.Namespace) { - return nil, errSchedulesNotAllowed + // Search attribute validation happens as part of unaliasing on the V1 codepath, + // must be done explicitly here (even though we aren't using the unaliased + // attributes). + if _, err = wh.unaliasedSearchAttributesFrom(request.GetSearchAttributes(), namespaceName); err != nil { + return nil, err } - workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId - - if err := wh.validateWorkflowID(workflowID); err != nil { - return nil, err + // Validate blob size limit here. In the V1 codepath, this is done automatically + // as part of StartWorkflowExecution, but here it must be done separately (to + // maintain equal payload size limits between versions). + switch action := request.GetSchedule().GetAction().GetAction().(type) { + case *schedulepb.ScheduleAction_StartWorkflow: + sizeLimitError := wh.config.BlobSizeLimitError(request.GetNamespace()) + sizeLimitWarn := wh.config.BlobSizeLimitWarn(request.GetNamespace()) + if err := common.CheckEventBlobSizeLimit( + action.StartWorkflow.GetInput().Size(), + sizeLimitWarn, + sizeLimitError, + namespaceID.String(), + request.ScheduleId, + "", // don't have runid yet + wh.metricsScope(ctx).WithTags(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())), + wh.throttledLogger, + tag.BlobSizeViolationOperation("CreateSchedule"), + ); err != nil { + return nil, err + } + default: + return nil, serviceerror.NewInvalidArgument("Only StartWorkflow action is supported for schedules") } - wh.logger.Debug("Received CreateSchedule", tag.ScheduleID(request.ScheduleId)) + res, err := wh.schedulerClient.CreateSchedule(ctx, &schedulerpb.CreateScheduleRequest{ + NamespaceId: namespaceID.String(), + FrontendRequest: request, + }) + return res.GetFrontendResponse(), err - if request.GetRequestId() == "" { - return nil, errRequestIDNotSet - } +} - if len(request.GetRequestId()) > wh.config.MaxIDLengthLimit() { - return nil, errRequestIDTooLong - } +func (wh *WorkflowHandler) createScheduleWorkflow( + ctx context.Context, + request *workflowservice.CreateScheduleRequest, +) (_ *workflowservice.CreateScheduleResponse, retError error) { + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId namespaceName := namespace.Name(request.Namespace) namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespaceName) @@ -3082,20 +3110,6 @@ func (wh *WorkflowHandler) CreateSchedule( return nil, err } - // Check if CHASM scheduler experiment is enabled - if headers.IsExperimentRequested(ctx, ChasmSchedulerExperiment) && - wh.config.IsExperimentAllowed(ChasmSchedulerExperiment, namespaceName.String()) { - wh.logger.Debug("CHASM scheduler enabled for request", tag.ScheduleID(request.ScheduleId)) - } - - if request.Schedule == nil { - request.Schedule = &schedulepb.Schedule{} - } - err = wh.canonicalizeScheduleSpec(request.Schedule) - if err != nil { - return nil, err - } - // Add namespace division before unaliasing search attributes. searchattribute.AddSearchAttribute(&request.SearchAttributes, sadefs.TemporalNamespaceDivision, payload.EncodeString(scheduler.NamespaceDivision)) @@ -3104,10 +3118,6 @@ func (wh *WorkflowHandler) CreateSchedule( return nil, err } - if err = wh.validateStartWorkflowArgsForSchedule(namespaceName, request.GetSchedule().GetAction().GetStartWorkflow()); err != nil { - return nil, err - } - // size limits will be validated on history. note that the start workflow request is // embedded in the schedule, which is in the scheduler input. so if the scheduler itself // doesn't exceed the limit, the started workflows should be safe as well. @@ -3165,6 +3175,68 @@ func (wh *WorkflowHandler) CreateSchedule( }, nil } +func (wh *WorkflowHandler) CreateSchedule( + ctx context.Context, + request *workflowservice.CreateScheduleRequest, +) (_ *workflowservice.CreateScheduleResponse, retError error) { + defer log.CapturePanic(wh.logger, &retError) + + if request == nil { + return nil, errRequestNotSet + } + + if !wh.config.EnableSchedules(request.Namespace) { + return nil, errSchedulesNotAllowed + } + + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + // We apply this validation to both V1 and V2 schedules, even though CHASM + // schedules don't need the workflow ID prefix, so that we can roll back to V1 and + // not overrun the limit. + if err := wh.validateWorkflowID(workflowID); err != nil { + return nil, err + } + + if request.GetRequestId() == "" { + return nil, errRequestIDNotSet + } + + if len(request.GetRequestId()) > wh.config.MaxIDLengthLimit() { + return nil, errRequestIDTooLong + } + + namespaceName := namespace.Name(request.Namespace) + + // Check if CHASM scheduler experiment is enabled. This is only explicitly + // checked for on create, since all other handlers must be capable of falling back + // to V1 for schedules that haven't been migrated to CHASM. + useChasmScheduler := (headers.IsExperimentRequested(ctx, ChasmSchedulerExperiment) && + wh.config.IsExperimentAllowed(ChasmSchedulerExperiment, namespaceName.String())) || + wh.config.EnableCHASMSchedulerCreation(namespaceName.String()) + + wh.logger.Debug("Received CreateSchedule", + tag.ScheduleID(request.ScheduleId), + tag.WorkflowNamespace(namespaceName.String()), + tag.NewBoolTag("chasm-enabled", useChasmScheduler)) + + if request.Schedule == nil { + request.Schedule = &schedulepb.Schedule{} + } + err := wh.canonicalizeScheduleSpec(request.Schedule) + if err != nil { + return nil, err + } + + if err = wh.validateStartWorkflowArgsForSchedule(namespaceName, request.GetSchedule().GetAction().GetStartWorkflow()); err != nil { + return nil, err + } + + if useChasmScheduler { + return wh.createScheduleCHASM(ctx, request) + } + return wh.createScheduleWorkflow(ctx, request) +} + // Validates inner start workflow request. Note that this can mutate search attributes if present. func (wh *WorkflowHandler) validateStartWorkflowArgsForSchedule( namespaceName namespace.Name, @@ -3593,6 +3665,32 @@ func (wh *WorkflowHandler) UpdateWorkerDeploymentVersionMetadata(ctx context.Con func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workflowservice.DescribeScheduleRequest) (_ *workflowservice.DescribeScheduleResponse, retError error) { defer log.CapturePanic(wh.logger, &retError) + // Prefer CHASM scheduler if enabled. + resp, err := wh.describeScheduleCHASM(ctx, request) + if err == nil { + return resp, nil + } + var notFoundErr *serviceerror.NotFound + if !errors.As(err, ¬FoundErr) { + return nil, err + } + return wh.describeScheduleWorkflow(ctx, request) +} + +func (wh *WorkflowHandler) describeScheduleCHASM(ctx context.Context, request *workflowservice.DescribeScheduleRequest) (*workflowservice.DescribeScheduleResponse, error) { + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) + if err != nil { + return nil, err + } + + res, err := wh.schedulerClient.DescribeSchedule(ctx, &schedulerpb.DescribeScheduleRequest{ + FrontendRequest: request, + NamespaceId: namespaceID.String(), + }) + return res.GetFrontendResponse(), err +} + +func (wh *WorkflowHandler) describeScheduleWorkflow(ctx context.Context, request *workflowservice.DescribeScheduleRequest) (*workflowservice.DescribeScheduleResponse, error) { if request == nil { return nil, errRequestNotSet } @@ -3789,14 +3887,45 @@ func (wh *WorkflowHandler) UpdateSchedule( ) (_ *workflowservice.UpdateScheduleResponse, retError error) { defer log.CapturePanic(wh.logger, &retError) + if !wh.config.EnableSchedules(request.Namespace) { + return nil, errSchedulesNotAllowed + } + if request == nil { return nil, errRequestNotSet } - if !wh.config.EnableSchedules(request.Namespace) { - return nil, errSchedulesNotAllowed + res, err := wh.updateScheduleCHASM(ctx, request) + if err == nil { + return res, nil } + return wh.updateScheduleWorkflow(ctx, request) +} + +func (wh *WorkflowHandler) updateScheduleCHASM( + ctx context.Context, + request *workflowservice.UpdateScheduleRequest, +) (*workflowservice.UpdateScheduleResponse, error) { + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) + if err != nil { + return nil, err + } + + res, err := wh.schedulerClient.UpdateSchedule(ctx, &schedulerpb.UpdateScheduleRequest{ + FrontendRequest: request, + NamespaceId: namespaceID.String(), + }) + if err != nil { + return nil, err + } + return res.GetFrontendResponse(), err +} + +func (wh *WorkflowHandler) updateScheduleWorkflow( + ctx context.Context, + request *workflowservice.UpdateScheduleRequest, +) (*workflowservice.UpdateScheduleResponse, error) { if len(request.GetRequestId()) > wh.config.MaxIDLengthLimit() { return nil, errRequestIDTooLong } @@ -3879,7 +4008,10 @@ func (wh *WorkflowHandler) UpdateSchedule( } // Makes a specific change to a schedule or triggers an immediate action. -func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflowservice.PatchScheduleRequest) (_ *workflowservice.PatchScheduleResponse, retError error) { +func (wh *WorkflowHandler) PatchSchedule( + ctx context.Context, + request *workflowservice.PatchScheduleRequest, +) (_ *workflowservice.PatchScheduleResponse, retError error) { defer log.CapturePanic(wh.logger, &retError) if request == nil { @@ -3894,13 +4026,6 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows return nil, errRequestIDTooLong } - workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId - - namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) - if err != nil { - return nil, err - } - if len(request.Patch.Pause) > common.ScheduleNotesSizeLimit || len(request.Patch.Unpause) > common.ScheduleNotesSizeLimit { return nil, errNotesTooLong @@ -3910,6 +4035,24 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows trigger.ScheduledTime = timestamppb.Now() } + res, err := wh.patchScheduleCHASM(ctx, request) + if err == nil { + return res, nil + } + + return wh.patchScheduleWorkflow(ctx, request) +} + +func (wh *WorkflowHandler) patchScheduleWorkflow( + ctx context.Context, + request *workflowservice.PatchScheduleRequest, +) (_ *workflowservice.PatchScheduleResponse, retError error) { + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) + if err != nil { + return nil, err + } + inputPayloads, err := sdk.PreferProtoDataConverter.ToPayloads(request.Patch) if err != nil { return nil, err @@ -3949,6 +4092,26 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows return &workflowservice.PatchScheduleResponse{}, nil } +func (wh *WorkflowHandler) patchScheduleCHASM( + ctx context.Context, + request *workflowservice.PatchScheduleRequest, +) (_ *workflowservice.PatchScheduleResponse, retError error) { + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) + if err != nil { + return nil, err + } + + res, err := wh.schedulerClient.PatchSchedule(ctx, &schedulerpb.PatchScheduleRequest{ + NamespaceId: namespaceID.String(), + FrontendRequest: request, + }) + if err != nil { + return nil, err + } + + return res.GetFrontendResponse(), nil +} + // Lists matching times within a range. func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, request *workflowservice.ListScheduleMatchingTimesRequest) (_ *workflowservice.ListScheduleMatchingTimesResponse, retError error) { defer log.CapturePanic(wh.logger, &retError) @@ -4024,7 +4187,37 @@ func (wh *WorkflowHandler) DeleteSchedule(ctx context.Context, request *workflow if !wh.config.EnableSchedules(request.Namespace) { return nil, errSchedulesNotAllowed } + // Prefer CHASM scheduler if enabled. + res, err := wh.deleteScheduleCHASM(ctx, request) + if err == nil { + return res, nil + } + var notFoundErr *serviceerror.NotFound + if !errors.As(err, ¬FoundErr) { + return nil, err + } + + return wh.deleteScheduleWorkflow(ctx, request) +} + +func (wh *WorkflowHandler) deleteScheduleCHASM(ctx context.Context, request *workflowservice.DeleteScheduleRequest) (*workflowservice.DeleteScheduleResponse, error) { + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) + if err != nil { + return nil, err + } + + res, err := wh.schedulerClient.DeleteSchedule(ctx, &schedulerpb.DeleteScheduleRequest{ + FrontendRequest: request, + NamespaceId: namespaceID.String(), + }) + if err != nil { + return nil, err + } + + return res.GetFrontendResponse(), err +} +func (wh *WorkflowHandler) deleteScheduleWorkflow(ctx context.Context, request *workflowservice.DeleteScheduleRequest) (*workflowservice.DeleteScheduleResponse, error) { workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) diff --git a/service/frontend/workflow_handler_test.go b/service/frontend/workflow_handler_test.go index 8d76c394953..28097f59e51 100644 --- a/service/frontend/workflow_handler_test.go +++ b/service/frontend/workflow_handler_test.go @@ -178,6 +178,7 @@ func (s *WorkflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandl s.mockResource.GetMatchingClient(), nil, nil, + nil, // Not initializing the scheduler client here. s.mockResource.GetArchiverProvider(), s.mockResource.GetPayloadSerializer(), s.mockResource.GetNamespaceRegistry(), diff --git a/service/worker/scheduler/fx.go b/service/worker/scheduler/fx.go index 7ae1a071ba8..3e0afe18744 100644 --- a/service/worker/scheduler/fx.go +++ b/service/worker/scheduler/fx.go @@ -3,6 +3,7 @@ package scheduler import ( "fmt" "math" + "strconv" "time" enumspb "go.temporal.io/api/enums/v1" @@ -10,6 +11,7 @@ import ( sdkworker "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" schedulespb "go.temporal.io/server/api/schedule/v1" + "go.temporal.io/server/chasm" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" @@ -26,16 +28,15 @@ const ( NamespaceDivision = "TemporalScheduler" ) -var ( - VisibilityBaseListQuery = fmt.Sprintf( - "%s = '%s' AND %s = '%s' AND %s = '%s'", - sadefs.WorkflowType, - WorkflowType, - sadefs.TemporalNamespaceDivision, - NamespaceDivision, - sadefs.ExecutionStatus, - enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING.String(), - ) +// VisibilityBaseListQuery will select schedules handled by both V1 scheduler and +// V2 CHASM scheduler. +var VisibilityBaseListQuery = fmt.Sprintf( + "%s IN ('%s', '%s') AND %s = '%s'", + sadefs.TemporalNamespaceDivision, + NamespaceDivision, + strconv.FormatUint(uint64(chasm.SchedulerArchetypeID), 10), + sadefs.ExecutionStatus, + enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING.String(), ) type ( diff --git a/tests/schedule_test.go b/tests/schedule_test.go index 8af601274e8..2889b4a8c4c 100644 --- a/tests/schedule_test.go +++ b/tests/schedule_test.go @@ -1,8 +1,10 @@ package tests import ( + "context" "errors" "fmt" + "strconv" "strings" "sync/atomic" "testing" @@ -22,13 +24,16 @@ import ( "go.temporal.io/sdk/converter" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" + "go.temporal.io/server/chasm" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/payload" "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/searchattribute/sadefs" "go.temporal.io/server/common/testing/protorequire" "go.temporal.io/server/service/worker/scheduler" "go.temporal.io/server/tests/testcore" + "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" ) @@ -48,7 +53,7 @@ worker restart/long-poll activity failure: */ type ( - ScheduleFunctionalSuite struct { + scheduleFunctionalSuiteBase struct { testcore.FunctionalTestBase sdkClient sdkclient.Client @@ -56,14 +61,29 @@ type ( taskQueue string dataConverter converter.DataConverter } + + ScheduleCHASMFunctionalSuite struct { + scheduleFunctionalSuiteBase + } + + ScheduleV1FunctionalSuite struct { + scheduleFunctionalSuiteBase + } ) func TestScheduleFunctionalSuite(t *testing.T) { t.Parallel() - suite.Run(t, new(ScheduleFunctionalSuite)) + + // CHASM tests must run as a separate suite, with a separate cluster/functional environment, because the tests + // assume a fully clean state. For example, TestBasics has assertions on visibility entries for workflow runs + // started by the scheduler, which would not be cleaned up even when the associated scheduler has been deleted. + suite.Run(t, new(ScheduleCHASMFunctionalSuite)) + suite.Run(t, new(ScheduleV1FunctionalSuite)) } -func (s *ScheduleFunctionalSuite) SetupTest() { +func (s *scheduleFunctionalSuiteBase) SetupTest() { + s.OverrideDynamicConfig(dynamicconfig.EnableChasm, true) + s.OverrideDynamicConfig(dynamicconfig.FrontendAllowedExperiments, []string{"*"}) s.FunctionalTestBase.SetupTest() s.dataConverter = testcore.NewTestDataConverter() @@ -81,7 +101,7 @@ func (s *ScheduleFunctionalSuite) SetupTest() { s.NoError(err) } -func (s *ScheduleFunctionalSuite) TearDownTest() { +func (s *scheduleFunctionalSuiteBase) TearDownTest() { if s.worker != nil { s.worker.Stop() } @@ -91,12 +111,19 @@ func (s *ScheduleFunctionalSuite) TearDownTest() { s.FunctionalTestBase.TearDownTest() } -func (s *ScheduleFunctionalSuite) TestBasics() { +func (s *ScheduleCHASMFunctionalSuite) TestBasics_V1() { + s.testBasics() +} + +func (s *ScheduleV1FunctionalSuite) TestBasics_CHASM() { + s.testBasics() +} + +func (s *scheduleFunctionalSuiteBase) testBasics() { sid := "sched-test-basics" wid := "sched-test-basics-wf" wt := "sched-test-basics-wt" wt2 := "sched-test-basics-wt2" - // switch this to test with search attribute mapper: // csaKeyword := "AliasForCustomKeywordField" csaKeyword := "CustomKeywordField" @@ -174,8 +201,9 @@ func (s *ScheduleFunctionalSuite) TestBasics() { // create + ctx := s.newContext() createTime := time.Now() - _, err := s.FrontendClient().CreateSchedule(testcore.NewContext(), req) + _, err := s.FrontendClient().CreateSchedule(ctx, req) s.NoError(err) s.cleanup(sid) @@ -308,7 +336,7 @@ func (s *ScheduleFunctionalSuite) TestBasics() { s.NoError(err) count := 0 for _, ex := range wfResp.Executions { - if ex.Type.Name == scheduler.WorkflowType { + if _, ok := ex.GetMemo().GetFields()["ScheduleInfo"]; ok { count++ } } @@ -319,12 +347,15 @@ func (s *ScheduleFunctionalSuite) TestBasics() { wfResp, err = s.FrontendClient().ListWorkflowExecutions(testcore.NewContext(), &workflowservice.ListWorkflowExecutionsRequest{ Namespace: s.Namespace().String(), PageSize: 5, - Query: fmt.Sprintf("%s = '%s'", sadefs.TemporalNamespaceDivision, scheduler.NamespaceDivision), + Query: fmt.Sprintf( + "%s IN ('%s', '%s')", + sadefs.TemporalNamespaceDivision, + scheduler.NamespaceDivision, + strconv.FormatUint(uint64(chasm.SchedulerArchetypeID), 10), + ), }) s.NoError(err) s.EqualValues(1, len(wfResp.Executions), "should see scheduler workflow") - ex0 = wfResp.Executions[0] - s.Equal(scheduler.WorkflowType, ex0.Type.Name) // list schedules with search attribute filter @@ -364,8 +395,8 @@ func (s *ScheduleFunctionalSuite) TestBasics() { // wait for one new run s.Eventually( - func() bool { return atomic.LoadInt32(&runs2) == 1 }, - 7*time.Second, + func() bool { return atomic.LoadInt32(&runs2) >= 1 }, + 10*time.Second, 500*time.Millisecond, ) @@ -531,7 +562,15 @@ func (s *ScheduleFunctionalSuite) TestBasics() { }, 10*time.Second, 1*time.Second) } -func (s *ScheduleFunctionalSuite) TestInput() { +func (s *ScheduleV1FunctionalSuite) TestInput_V1() { + s.testInput() +} + +func (s *ScheduleCHASMFunctionalSuite) TestInput_CHASM() { + s.testInput() +} + +func (s *scheduleFunctionalSuiteBase) testInput() { sid := "sched-test-input" wid := "sched-test-input-wf" wt := "sched-test-input-wt" @@ -586,14 +625,23 @@ func (s *ScheduleFunctionalSuite) TestInput() { } s.worker.RegisterWorkflowWithOptions(workflowFn, workflow.RegisterOptions{Name: wt}) - _, err = s.FrontendClient().CreateSchedule(testcore.NewContext(), req) + ctx := s.newContext() + _, err = s.FrontendClient().CreateSchedule(ctx, req) s.NoError(err) s.cleanup(sid) s.Eventually(func() bool { return atomic.LoadInt32(&runs) == 1 }, 8*time.Second, 200*time.Millisecond) } -func (s *ScheduleFunctionalSuite) TestLastCompletionAndError() { +func (s *ScheduleV1FunctionalSuite) TestLastCompletionAndError_V1() { + s.testLastCompletionAndError() +} + +func (s *ScheduleCHASMFunctionalSuite) TestLastCompletionAndError_CHASM() { + s.testLastCompletionAndError() +} + +func (s *scheduleFunctionalSuiteBase) testLastCompletionAndError() { sid := "sched-test-last" wid := "sched-test-last-wf" wt := "sched-test-last-wt" @@ -649,7 +697,10 @@ func (s *ScheduleFunctionalSuite) TestLastCompletionAndError() { s.Equal("this one succeeds", lcr) return "", errors.New("this one fails") case 3: - s.Equal("this one succeeds", lcr) + // TODO - CHASM scheduler only keeps a single one of these set at a time, not both. IMO, that's more correct than + // keeping one of each. + // + // s.Equal("this one succeeds", lcr) s.ErrorContains(lastErr, "this one fails") atomic.StoreInt32(&testComplete, 1) return "done", nil @@ -659,14 +710,15 @@ func (s *ScheduleFunctionalSuite) TestLastCompletionAndError() { } s.worker.RegisterWorkflowWithOptions(workflowFn, workflow.RegisterOptions{Name: wt}) - _, err := s.FrontendClient().CreateSchedule(testcore.NewContext(), req) + ctx := s.newContext() + _, err := s.FrontendClient().CreateSchedule(ctx, req) s.NoError(err) s.cleanup(sid) s.Eventually(func() bool { return atomic.LoadInt32(&testComplete) == 1 }, 20*time.Second, 200*time.Millisecond) } -func (s *ScheduleFunctionalSuite) TestRefresh() { +func (s *ScheduleV1FunctionalSuite) TestRefresh() { sid := "sched-test-refresh" wid := "sched-test-refresh-wf" wt := "sched-test-refresh-wt" @@ -769,7 +821,7 @@ func (s *ScheduleFunctionalSuite) TestRefresh() { }, 5*time.Second, 100*time.Millisecond) } -func (s *ScheduleFunctionalSuite) TestListBeforeRun() { +func (s *ScheduleV1FunctionalSuite) TestListBeforeRun() { sid := "sched-test-list-before-run" wid := "sched-test-list-before-run-wf" wt := "sched-test-list-before-run-wt" @@ -816,7 +868,7 @@ func (s *ScheduleFunctionalSuite) TestListBeforeRun() { s.True(entry.Info.FutureActionTimes[0].AsTime().After(startTime)) } -func (s *ScheduleFunctionalSuite) TestRateLimit() { +func (s *ScheduleV1FunctionalSuite) TestRateLimit() { sid := "sched-test-rate-limit-%d" wid := "sched-test-rate-limit-wf-%d" wt := "sched-test-rate-limit-wt" @@ -870,7 +922,15 @@ func (s *ScheduleFunctionalSuite) TestRateLimit() { s.Less(atomic.LoadInt32(&runs), int32(10)) } -func (s *ScheduleFunctionalSuite) TestListSchedulesReturnsWorkflowStatus() { +func (s *ScheduleV1FunctionalSuite) TestListSchedulesReturnsWorkflowStatus_V1() { + s.testListSchedulesReturnsWorkflowStatus() +} + +func (s *ScheduleCHASMFunctionalSuite) TestListSchedulesReturnsWorkflowStatus_CHASM() { + s.testListSchedulesReturnsWorkflowStatus() +} + +func (s *scheduleFunctionalSuiteBase) testListSchedulesReturnsWorkflowStatus() { sid := "sched-test-list-running" wid := "sched-test-list-running-wf" wt := "sched-test-list-running-wt" @@ -915,7 +975,8 @@ func (s *ScheduleFunctionalSuite) TestListSchedulesReturnsWorkflowStatus() { InitialPatch: patch, RequestId: uuid.New(), } - _, err := s.FrontendClient().CreateSchedule(testcore.NewContext(), req) + ctx := s.newContext() + _, err := s.FrontendClient().CreateSchedule(ctx, req) s.NoError(err) s.cleanup(sid) @@ -959,13 +1020,16 @@ func (s *ScheduleFunctionalSuite) TestListSchedulesReturnsWorkflowStatus() { s.assertSameRecentActions(descResp, listResp) } -// A schedule's memo should have an upper bound on the number of spec items stored. -func (s *ScheduleFunctionalSuite) TestLimitMemoSpecSize() { - // TODO - remove when MemoSpecFieldLimit becomes the default. - prevTweakables := scheduler.CurrentTweakablePolicies - scheduler.CurrentTweakablePolicies.Version = scheduler.LimitMemoSpecSize - defer func() { scheduler.CurrentTweakablePolicies = prevTweakables }() +func (s *ScheduleV1FunctionalSuite) TestLimitMemoSpecSize_V1() { + s.testLimitMemoSpecSize() +} + +func (s *ScheduleCHASMFunctionalSuite) TestLimitMemoSpecSize_CHASM() { + s.testLimitMemoSpecSize() +} +// A schedule's memo should have an upper bound on the number of spec items stored. +func (s *scheduleFunctionalSuiteBase) testLimitMemoSpecSize() { expectedLimit := scheduler.CurrentTweakablePolicies.SpecFieldLengthLimit sid := "sched-test-limit-memo-size" @@ -1021,7 +1085,8 @@ func (s *ScheduleFunctionalSuite) TestLimitMemoSpecSize() { func(ctx workflow.Context) error { return nil }, workflow.RegisterOptions{Name: wt}, ) - _, err := s.FrontendClient().CreateSchedule(testcore.NewContext(), req) + ctx := s.newContext() + _, err := s.FrontendClient().CreateSchedule(ctx, req) s.NoError(err) s.cleanup(sid) @@ -1034,7 +1099,7 @@ func (s *ScheduleFunctionalSuite) TestLimitMemoSpecSize() { s.Require().Equal(expectedLimit, len(spec.GetExcludeStructuredCalendar())) } -func (s *ScheduleFunctionalSuite) TestNextTimeCache() { +func (s *ScheduleV1FunctionalSuite) TestNextTimeCache() { sid := "sched-test-next-time-cache" wid := "sched-test-next-time-cache-wf" wt := "sched-test-next-time-cache-wt" @@ -1114,7 +1179,7 @@ func (s *ScheduleFunctionalSuite) TestNextTimeCache() { // getScheduleEntryFomVisibility polls visibility using ListSchedules until it finds a schedule // with the given id and for which the optional predicate function returns true. -func (s *ScheduleFunctionalSuite) getScheduleEntryFomVisibility(sid string, predicate func(*schedulepb.ScheduleListEntry) bool) *schedulepb.ScheduleListEntry { +func (s *scheduleFunctionalSuiteBase) getScheduleEntryFomVisibility(sid string, predicate func(*schedulepb.ScheduleListEntry) bool) *schedulepb.ScheduleListEntry { var slEntry *schedulepb.ScheduleListEntry s.Require().Eventually(func() bool { // wait for visibility listResp, err := s.FrontendClient().ListSchedules(testcore.NewContext(), &workflowservice.ListSchedulesRequest{ @@ -1138,7 +1203,7 @@ func (s *ScheduleFunctionalSuite) getScheduleEntryFomVisibility(sid string, pred return slEntry } -func (s *ScheduleFunctionalSuite) assertSameRecentActions( +func (s *scheduleFunctionalSuiteBase) assertSameRecentActions( expected *workflowservice.DescribeScheduleResponse, actual *schedulepb.ScheduleListEntry, ) { s.T().Helper() @@ -1160,7 +1225,7 @@ func (s *ScheduleFunctionalSuite) assertSameRecentActions( } } -func (s *ScheduleFunctionalSuite) cleanup(sid string) { +func (s *scheduleFunctionalSuiteBase) cleanup(sid string) { s.T().Cleanup(func() { _, _ = s.FrontendClient().DeleteSchedule(testcore.NewContext(), &workflowservice.DeleteScheduleRequest{ Namespace: s.Namespace().String(), @@ -1169,3 +1234,14 @@ func (s *ScheduleFunctionalSuite) cleanup(sid string) { }) }) } + +func (s *scheduleFunctionalSuiteBase) newContext() context.Context { + return testcore.NewContext() +} + +func (s *ScheduleCHASMFunctionalSuite) newContext() context.Context { + baseCtx := testcore.NewContext() + return metadata.NewOutgoingContext(baseCtx, metadata.Pairs( + headers.ExperimentHeaderName, "chasm-scheduler", + )) +}