
Commit 37e5ec0

Merge branch 'main' into ss/rate_limiter_tqptm
2 parents 9a73256 + c7ee12d

File tree: 6 files changed, +122 -31 lines

service/worker/migration/activities.go

Lines changed: 1 addition & 0 deletions
@@ -424,6 +424,7 @@ func (a *activities) UpdateActiveCluster(ctx context.Context, req updateActiveCl
 
 func (a *activities) ListWorkflows(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*listWorkflowsResponse, error) {
 	ctx = headers.SetCallerInfo(ctx, headers.NewCallerInfo(request.Namespace, headers.CallerTypePreemptable, ""))
+	ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs(interceptor.DCRedirectionContextHeaderName, "false"))
 
 	resp, err := a.frontendClient.ListWorkflowExecutions(ctx, request)
 	if err != nil {
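The added line asks the frontend not to forward this ListWorkflowExecutions call to another cluster: it attaches a DC-redirection opt-out header to the outgoing gRPC context. A minimal sketch of the underlying mechanism, using the standard google.golang.org/grpc/metadata package; the header name below is illustrative, the real one comes from the interceptor package:

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

func main() {
	// Attach a request header to the outgoing gRPC context. A server-side
	// interceptor can read it and skip cross-cluster redirection.
	ctx := metadata.NewOutgoingContext(context.Background(),
		metadata.Pairs("xdc-redirection", "false")) // illustrative header name

	// Any RPC issued with ctx now carries the header; here we only show
	// that it round-trips out of the context.
	md, _ := metadata.FromOutgoingContext(ctx)
	fmt.Println(md.Get("xdc-redirection")) // [false]
}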

service/worker/replicator/namespace_replication_message_processor.go renamed to service/worker/replicator/replication_message_processor.go

Lines changed: 18 additions & 16 deletions
@@ -33,7 +33,7 @@ const (
 	taskProcessorErrorRetryMaxAttampts = 5
 )
 
-func newNamespaceReplicationMessageProcessor(
+func newReplicationMessageProcessor(
 	currentCluster string,
 	sourceCluster string,
 	logger log.Logger,
@@ -45,12 +45,12 @@ func newNamespaceReplicationMessageProcessor(
 	namespaceReplicationQueue persistence.NamespaceReplicationQueue,
 	matchingClient matchingservice.MatchingServiceClient,
 	namespaceRegistry namespace.Registry,
-) *namespaceReplicationMessageProcessor {
+) *replicationMessageProcessor {
 	retryPolicy := backoff.NewExponentialRetryPolicy(taskProcessorErrorRetryWait).
 		WithBackoffCoefficient(taskProcessorErrorRetryBackoffCoefficient).
 		WithMaximumAttempts(taskProcessorErrorRetryMaxAttampts)
 
-	return &namespaceReplicationMessageProcessor{
+	return &replicationMessageProcessor{
 		hostInfo:        hostInfo,
 		serviceResolver: serviceResolver,
 		status:          common.DaemonStatusInitialized,
@@ -71,7 +71,7 @@ func newNamespaceReplicationMessageProcessor(
 	}
 
 type (
-	namespaceReplicationMessageProcessor struct {
+	replicationMessageProcessor struct {
 		hostInfo        membership.HostInfo
 		serviceResolver membership.ServiceResolver
 		status          int32
@@ -91,21 +91,21 @@ type (
 	}
 )
 
-func (p *namespaceReplicationMessageProcessor) Start() {
+func (p *replicationMessageProcessor) Start() {
 	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
 		return
 	}
 
 	go p.processorLoop()
 }
 
-func (p *namespaceReplicationMessageProcessor) processorLoop() {
+func (p *replicationMessageProcessor) processorLoop() {
 	timer := time.NewTimer(getWaitDuration())
 
 	for {
 		select {
 		case <-timer.C:
-			p.getAndHandleNamespaceReplicationTasks()
+			p.handleReplicationTasks()
 			timer.Reset(getWaitDuration())
 		case <-p.done:
 			timer.Stop()
@@ -114,7 +114,7 @@ func (p *namespaceReplicationMessageProcessor) processorLoop() {
 	}
 }
 
-func (p *namespaceReplicationMessageProcessor) getAndHandleNamespaceReplicationTasks() {
+func (p *replicationMessageProcessor) handleReplicationTasks() {
 	// The following is a best effort to make sure only one worker is processing tasks for a
 	// particular source cluster. When the ring is under reconfiguration, it is possible that
 	// for a small period of time two or more workers think they are the owner and try to execute
@@ -153,12 +153,12 @@ func (p *namespaceReplicationMessageProcessor) getAndHandleNamespaceReplicationT
 	for taskIndex := range response.Messages.ReplicationTasks {
 		task := response.Messages.ReplicationTasks[taskIndex]
 		err := backoff.ThrottleRetry(func() error {
-			return p.handleNamespaceReplicationTask(taskCtx, task)
+			return p.handleReplicationTask(taskCtx, task)
 		}, p.retryPolicy, isTransientRetryableError)
 
 		if err != nil {
 			metrics.ReplicatorFailures.With(p.metricsHandler).Record(1)
-			p.logger.Error("Failed to apply namespace replication tasks", tag.Error(err))
+			p.logger.Error("Failed to apply replication tasks", tag.Error(err))
 
 			dlqErr := backoff.ThrottleRetry(func() error {
 
@@ -176,7 +176,7 @@ func (p *namespaceReplicationMessageProcessor) getAndHandleNamespaceReplicationT
 	p.lastRetrievedMessageID = response.Messages.GetLastRetrievedMessageId()
 }
 
-func (p *namespaceReplicationMessageProcessor) putNamespaceReplicationTaskToDLQ(
+func (p *replicationMessageProcessor) putNamespaceReplicationTaskToDLQ(
 	ctx context.Context,
 	task *replicationspb.ReplicationTask,
 ) error {
@@ -205,7 +205,7 @@ func (p *namespaceReplicationMessageProcessor) putNamespaceReplicationTaskToDLQ(
 	return p.namespaceReplicationQueue.PublishToDLQ(ctx, task)
 }
 
-func (p *namespaceReplicationMessageProcessor) handleNamespaceReplicationTask(
+func (p *replicationMessageProcessor) handleReplicationTask(
 	ctx context.Context,
 	task *replicationspb.ReplicationTask,
 ) error {
@@ -222,23 +222,25 @@ func (p *namespaceReplicationMessageProcessor) handleNamespaceReplicationTask(
 		err := p.namespaceTaskExecutor.Execute(ctx, attr)
 		if err != nil {
 			p.logger.Error("unable to process namespace replication task",
-				tag.WorkflowNamespaceID(attr.Id))
+				tag.WorkflowNamespaceID(attr.Id),
+				tag.Error(err))
 		}
 		return err
 	case enumsspb.REPLICATION_TASK_TYPE_TASK_QUEUE_USER_DATA:
 		attr := task.GetTaskQueueUserDataAttributes()
 		err := p.handleTaskQueueUserDataReplicationTask(ctx, attr)
 		if err != nil {
 			p.logger.Error(fmt.Sprintf("unable to process task queue metadata replication task, %v", attr.TaskQueueName),
-				tag.WorkflowNamespaceID(attr.NamespaceId))
+				tag.WorkflowNamespaceID(attr.NamespaceId),
+				tag.Error(err))
 		}
 		return err
 	default:
 		return fmt.Errorf("cannot handle replication task of type %v", task.TaskType)
 	}
 }
 
-func (p *namespaceReplicationMessageProcessor) handleTaskQueueUserDataReplicationTask(
+func (p *replicationMessageProcessor) handleTaskQueueUserDataReplicationTask(
 	ctx context.Context,
 	attrs *replicationspb.TaskQueueUserDataAttributes,
 ) error {
@@ -266,7 +268,7 @@ func (p *namespaceReplicationMessageProcessor) handleTaskQueueUserDataReplicatio
 	return err
 }
 
-func (p *namespaceReplicationMessageProcessor) Stop() {
+func (p *replicationMessageProcessor) Stop() {
 	close(p.done)
 }
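Aside from the rename, the processor's retry behavior is unchanged: each task is retried under an exponential backoff policy (initial wait, backoff coefficient, capped attempts), and tasks that still fail go to the DLQ. A self-contained sketch of that retry shape, with throttleRetry standing in for the server's backoff.ThrottleRetry and all constants chosen for illustration:

package main

import (
	"errors"
	"fmt"
	"time"
)

// throttleRetry retries op with exponentially growing waits until it
// succeeds, the error is not retryable, or maxAttempts is exhausted.
func throttleRetry(op func() error, isRetryable func(error) bool,
	initial time.Duration, coefficient float64, maxAttempts int) error {
	wait := initial
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = op(); err == nil || !isRetryable(err) {
			return err
		}
		time.Sleep(wait)
		wait = time.Duration(float64(wait) * coefficient)
	}
	return err
}

func main() {
	calls := 0
	err := throttleRetry(
		func() error {
			calls++
			if calls < 3 {
				return errors.New("transient")
			}
			return nil
		},
		func(error) bool { return true }, // treat every error as transient
		10*time.Millisecond, 2.0, 5, // waits 10ms, 20ms, ...; at most 5 tries
	)
	fmt.Println(calls, err) // 3 <nil>
}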

service/worker/replicator/replicator.go

Lines changed: 3 additions & 3 deletions
@@ -40,7 +40,7 @@ type (
 		replicationCleanupGroup goro.Group
 
 		namespaceProcessorsLock sync.Mutex
-		namespaceProcessors     map[string]*namespaceReplicationMessageProcessor
+		namespaceProcessors     map[string]*replicationMessageProcessor
 		matchingClient          matchingservice.MatchingServiceClient
 		namespaceRegistry       namespace.Registry
 	}
@@ -69,7 +69,7 @@ func NewReplicator(
 		serviceResolver:                  serviceResolver,
 		clusterMetadata:                  clusterMetadata,
 		namespaceReplicationTaskExecutor: namespaceReplicationTaskExecutor,
-		namespaceProcessors:              make(map[string]*namespaceReplicationMessageProcessor),
+		namespaceProcessors:              make(map[string]*replicationMessageProcessor),
 		clientBean:                       clientBean,
 		logger:                           log.With(logger, tag.ComponentReplicator),
 		metricsHandler:                   metricsHandler,
@@ -139,7 +139,7 @@ func (r *Replicator) listenToClusterMetadataChange() {
 		// This should never happen as cluster metadata should have the up-to-date data.
 		panic(fmt.Sprintf("Bug found in cluster metadata with error %v", err))
 	}
-	processor := newNamespaceReplicationMessageProcessor(
+	processor := newReplicationMessageProcessor(
 		currentClusterName,
 		clusterName,
 		log.With(r.logger, tag.ComponentReplicationTaskProcessor, tag.SourceCluster(clusterName)),
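These are the rename's call sites: the Replicator keeps one processor per remote cluster in a mutex-guarded map and creates or tears them down as cluster metadata changes. A minimal sketch of that lifecycle under those assumptions; every name here is illustrative, not the server's actual types:

package main

import (
	"fmt"
	"sync"
)

// processor stands in for replicationMessageProcessor.
type processor struct{ source string }

func (p *processor) Start() { fmt.Println("started", p.source) }
func (p *processor) Stop()  { fmt.Println("stopped", p.source) }

// registry mirrors the mutex-guarded namespaceProcessors map.
type registry struct {
	mu         sync.Mutex
	processors map[string]*processor
}

func (r *registry) onClusterChange(cluster string, enabled bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if enabled {
		if _, ok := r.processors[cluster]; !ok {
			p := &processor{source: cluster}
			r.processors[cluster] = p
			p.Start()
		}
		return
	}
	if p, ok := r.processors[cluster]; ok {
		delete(r.processors, cluster)
		p.Stop()
	}
}

func main() {
	r := &registry{processors: map[string]*processor{}}
	r.onClusterChange("cluster-b", true)  // remote cluster added
	r.onClusterChange("cluster-b", false) // remote cluster removed
}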

service/worker/scheduler/spec.go

Lines changed: 1 addition & 1 deletion
@@ -276,7 +276,7 @@ func (cs *CompiledSpec) GetNextTime(jitterSeed string, after time.Time) GetNextT
 	after = util.MaxTime(after, cs.spec.StartTime.AsTime().Add(-time.Second))
 
 	pastEndTime := func(t time.Time) bool {
-		return cs.spec.EndTime != nil && t.After(cs.spec.EndTime.AsTime())
+		return cs.spec.EndTime != nil && t.After(cs.spec.EndTime.AsTime()) || t.Year() > maxCalendarYear
 	}
 	var nominal time.Time
 	for nominal.IsZero() || cs.excluded(nominal) {
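Because && binds tighter than || in Go, the new guard parses as (cs.spec.EndTime != nil && t.After(end)) || t.Year() > maxCalendarYear: the year cap fires even when no end time is configured. That is what lets the search loop above terminate for a schedule whose exclusions reject every nominal time, which the new TestExcludeAll in the next file exercises. A small sketch of the same guard, with maxCalendarYear as an illustrative constant:

package main

import (
	"fmt"
	"time"
)

const maxCalendarYear = 2100 // illustrative cap, not the package's value

func main() {
	var end *time.Time // nil means no end time configured

	pastEndTime := func(t time.Time) bool {
		// Parses as (end != nil && t.After(*end)) || t.Year() > maxCalendarYear.
		return end != nil && t.After(*end) || t.Year() > maxCalendarYear
	}

	fmt.Println(pastEndTime(time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC))) // false
	fmt.Println(pastEndTime(time.Date(2101, 1, 1, 0, 0, 0, 0, time.UTC))) // true: year cap, no end time
}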

service/worker/scheduler/spec_test.go

Lines changed: 13 additions & 0 deletions
@@ -366,6 +366,19 @@ func (s *specSuite) TestSpecExclude() {
 	)
 }
 
+func (s *specSuite) TestExcludeAll() {
+	cs, err := s.specBuilder.NewCompiledSpec(&schedulepb.ScheduleSpec{
+		Interval: []*schedulepb.IntervalSpec{
+			{Interval: durationpb.New(7 * 24 * time.Hour)},
+		},
+		ExcludeCalendar: []*schedulepb.CalendarSpec{
+			&schedulepb.CalendarSpec{Second: "*", Minute: "*", Hour: "*"},
+		},
+	})
+	s.NoError(err)
+	s.Zero(cs.GetNextTime("", time.Date(2022, 3, 23, 12, 53, 2, 9, time.UTC)))
+}
+
 func (s *specSuite) TestSpecStartTime() {
 	s.checkSequenceFull(
 		"",

service/worker/workerdeployment/workflow.go

Lines changed: 86 additions & 11 deletions
@@ -127,13 +127,30 @@ func (d *WorkflowRunner) updateVersionSummary(summary *deploymentspb.WorkerDeplo
 }
 
 func (d *WorkflowRunner) run(ctx workflow.Context) error {
-	if d.GetState().GetCreateTime() == nil {
+	// TODO(carlydf): remove verbose logging
+	d.logger.Info("Raw workflow state at start",
+		"state_nil", d.State == nil,
+		"create_time_nil", d.GetState().GetCreateTime() == nil,
+		"routing_config_nil", d.GetState().GetRoutingConfig() == nil,
+		"raw_state", d.State,
+		"workflow_id", workflow.GetInfo(ctx).WorkflowExecution.ID,
+		"run_id", workflow.GetInfo(ctx).WorkflowExecution.RunID)
+
+	if d.GetState().GetCreateTime() == nil ||
+		d.GetState().GetRoutingConfig() == nil ||
+		d.GetState().GetConflictToken() == nil {
 		if d.State == nil {
 			d.State = &deploymentspb.WorkerDeploymentLocalState{}
 		}
-		d.State.CreateTime = timestamppb.New(workflow.Now(ctx))
-		d.State.RoutingConfig = &deploymentpb.RoutingConfig{CurrentVersion: worker_versioning.UnversionedVersionId}
-		d.State.ConflictToken, _ = workflow.Now(ctx).MarshalBinary()
+		if d.State.CreateTime == nil {
+			d.State.CreateTime = timestamppb.New(workflow.Now(ctx))
+		}
+		if d.State.RoutingConfig == nil {
+			d.State.RoutingConfig = &deploymentpb.RoutingConfig{CurrentVersion: worker_versioning.UnversionedVersionId}
+		}
+		if d.State.ConflictToken == nil {
+			d.State.ConflictToken, _ = workflow.Now(ctx).MarshalBinary()
+		}
 
 		// updating the memo since the RoutingConfig is updated
 		if err := d.updateMemo(ctx); err != nil {
@@ -144,6 +161,15 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
 		d.State.Versions = make(map[string]*deploymentspb.WorkerDeploymentVersionSummary)
 	}
 
+	// TODO(carlydf): remove verbose logging
+	d.logger.Info("Starting workflow run",
+		"create_time", d.State.GetCreateTime(),
+		"routing_config", d.State.GetRoutingConfig(),
+		//nolint:staticcheck // SA1019: worker versioning v0.31
+		"current_version", d.State.GetRoutingConfig().GetCurrentVersion(),
+		//nolint:staticcheck // SA1019: worker versioning v0.31
+		"ramping_version", d.State.GetRoutingConfig().GetRampingVersion())
+
 	err := workflow.SetQueryHandler(ctx, QueryDescribeDeployment, func() (*deploymentspb.QueryDescribeWorkerDeploymentResponse, error) {
 		return &deploymentspb.QueryDescribeWorkerDeploymentResponse{
 			State: d.State,
@@ -224,10 +250,25 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
 	// Wait until we can continue as new or are cancelled. The workflow will continue-as-new iff
 	// there are no pending updates/signals and the state has changed.
 	err = workflow.Await(ctx, func() bool {
-		return d.deleteDeployment || // deployment is deleted -> it's ok to drop all signals and updates.
+		canContinue := d.deleteDeployment || // deployment is deleted -> it's ok to drop all signals and updates.
 			// There is no pending signal or update, but the state is dirty or forceCaN is requested:
 			(!d.signalHandler.signalSelector.HasPending() && d.signalHandler.processingSignals == 0 && workflow.AllHandlersFinished(ctx) &&
 				(d.forceCAN || d.stateChanged))
+
+		// TODO(carlydf): remove verbose logging
+		if canContinue {
+			d.logger.Info("Workflow can continue as new",
+				"workflow_id", workflow.GetInfo(ctx).WorkflowExecution.ID,
+				"run_id", workflow.GetInfo(ctx).WorkflowExecution.RunID,
+				"delete_deployment", d.deleteDeployment,
+				"has_pending_signals", d.signalHandler.signalSelector.HasPending(),
+				"processing_signals", d.signalHandler.processingSignals,
+				"all_handlers_finished", workflow.AllHandlersFinished(ctx),
+				"force_can", d.forceCAN,
+				"state_changed", d.stateChanged,
+				"routing_config", d.State.GetRoutingConfig())
+		}
+		return canContinue
 	})
 	if err != nil {
 		return err
@@ -237,6 +278,19 @@ func (d *WorkflowRunner) run(ctx workflow.Context) error {
 		return nil
 	}
 
+	// TODO(carlydf): remove verbose logging
+	d.logger.Info("Continuing workflow as new",
+		"create_time", d.State.GetCreateTime(),
+		"routing_config", d.State.GetRoutingConfig(),
+		//nolint:staticcheck // SA1019: worker versioning v0.31
+		"current_version", d.State.GetRoutingConfig().GetCurrentVersion(),
+		//nolint:staticcheck // SA1019: worker versioning v0.31
+		"ramping_version", d.State.GetRoutingConfig().GetRampingVersion(),
+		"state_changed", d.stateChanged,
+		"force_can", d.forceCAN,
+		"workflow_id", workflow.GetInfo(ctx).WorkflowExecution.ID,
+		"run_id", workflow.GetInfo(ctx).WorkflowExecution.RunID)
+
 	// We perform a continue-as-new after each update and signal is handled to ensure compatibility
 	// even if the server rolls back to a previous minor version. By continuing-as-new,
 	// we pass the current state as input to the next workflow execution, resulting in a new
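The Await predicate now computes canContinue once, logs the inputs to the decision when it flips to true, and returns it; the run then continues-as-new, carrying its state to the next execution. A minimal sketch of that loop shape using the public Go SDK test harness; the workflow body and durations are illustrative, not the server's code:

package main

import (
	"fmt"
	"time"

	"go.temporal.io/sdk/testsuite"
	"go.temporal.io/sdk/workflow"
)

// loopWorkflow waits until the state is dirty and all handlers are done,
// then continues-as-new so the next run starts from the accumulated state.
func loopWorkflow(ctx workflow.Context, state string) error {
	stateChanged := false

	// Simulate a handler mutating state partway through the run.
	workflow.Go(ctx, func(ctx workflow.Context) {
		_ = workflow.Sleep(ctx, time.Minute)
		stateChanged = true
	})

	err := workflow.Await(ctx, func() bool {
		return stateChanged && workflow.AllHandlersFinished(ctx)
	})
	if err != nil {
		return err
	}
	return workflow.NewContinueAsNewError(ctx, loopWorkflow, state)
}

func main() {
	var ts testsuite.WorkflowTestSuite
	env := ts.NewTestWorkflowEnvironment()
	env.RegisterWorkflow(loopWorkflow)
	env.ExecuteWorkflow(loopWorkflow, "seed-state")

	// The test environment surfaces the continue-as-new as a non-nil error.
	fmt.Println(env.GetWorkflowError())
}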
@@ -338,19 +392,23 @@ func (d *WorkflowRunner) handleDeleteDeployment(ctx workflow.Context) error {
 }
 
 func (d *WorkflowRunner) validateStateBeforeAcceptingRampingUpdate(args *deploymentspb.SetRampingVersionArgs) error {
-	if args.Version == d.State.RoutingConfig.RampingVersion && args.Percentage == d.State.RoutingConfig.RampingVersionPercentage && args.Identity == d.State.LastModifierIdentity {
-		return temporal.NewApplicationError("version already ramping, no change", errNoChangeType, d.State.ConflictToken)
+	//nolint:staticcheck // SA1019: worker versioning v0.31
+	if args.Version == d.State.GetRoutingConfig().GetRampingVersion() &&
+		args.Percentage == d.State.GetRoutingConfig().GetRampingVersionPercentage() &&
+		args.Identity == d.State.GetLastModifierIdentity() {
+		return temporal.NewApplicationError("version already ramping, no change", errNoChangeType, d.State.GetConflictToken())
 	}
 
-	if args.ConflictToken != nil && !bytes.Equal(args.ConflictToken, d.State.ConflictToken) {
+	if args.ConflictToken != nil && !bytes.Equal(args.ConflictToken, d.State.GetConflictToken()) {
 		return temporal.NewApplicationError("conflict token mismatch", errFailedPrecondition)
 	}
-	if args.Version == d.State.RoutingConfig.CurrentVersion {
+	//nolint:staticcheck // SA1019: worker versioning v0.31
+	if args.Version == d.State.GetRoutingConfig().GetCurrentVersion() {
 		d.logger.Info("version can't be set to ramping since it is already current")
 		return temporal.NewApplicationError(fmt.Sprintf("requested ramping version %s is already current", args.Version), errFailedPrecondition)
 	}
 
-	if _, ok := d.State.Versions[args.Version]; !ok && args.Version != "" && args.Version != worker_versioning.UnversionedVersionId {
+	if _, ok := d.State.GetVersions()[args.Version]; !ok && args.Version != "" && args.Version != worker_versioning.UnversionedVersionId {
 		d.logger.Info("version not found in deployment")
 		return temporal.NewApplicationError(fmt.Sprintf("requested ramping version %s not found in deployment", args.Version), errFailedPrecondition)
 	}
@@ -579,7 +637,8 @@ func (d *WorkflowRunner) handleDeleteVersion(ctx workflow.Context, args *deploym
 }
 
 func (d *WorkflowRunner) validateStateBeforeAcceptingSetCurrent(args *deploymentspb.SetCurrentVersionArgs) error {
-	if d.State.RoutingConfig.CurrentVersion == args.Version && d.State.LastModifierIdentity == args.Identity {
+	//nolint:staticcheck // SA1019: worker versioning v0.31
+	if d.State.GetRoutingConfig().GetCurrentVersion() == args.Version && d.State.GetLastModifierIdentity() == args.Identity {
 		return temporal.NewApplicationError("no change", errNoChangeType, d.State.ConflictToken)
 	}
 	if args.ConflictToken != nil && !bytes.Equal(args.ConflictToken, d.State.ConflictToken) {
@@ -609,6 +668,14 @@ func (d *WorkflowRunner) handleSetCurrent(ctx workflow.Context, args *deployment
 		d.lock.Unlock()
 	}()
 
+	// Log state before update
+	// TODO(carlydf): remove verbose logging
+	d.logger.Info("Starting SetCurrent update",
+		//nolint:staticcheck // SA1019: worker versioning v0.31
+		"current_version", d.State.GetRoutingConfig().GetCurrentVersion(),
+		"new_version", args.Version,
+		"routing_config", d.State.GetRoutingConfig())
+
 	// Validating the state before starting the SetCurrent operation. This is required due to the following reason:
 	// The validator accepts/rejects updates based on the state of the deployment workflow. Theoretically, two concurrent update requests
 	// might be accepted by the validator since the state of the workflow, at that point in time, is valid for the updates to take place. Since this update handler
@@ -955,6 +1022,14 @@ func (d *WorkflowRunner) newUUID(ctx workflow.Context) string {
 }
 
 func (d *WorkflowRunner) updateMemo(ctx workflow.Context) error {
+	// TODO(carlydf): remove verbose logging
+	d.logger.Info("Updating workflow memo",
+		"routing_config", d.State.GetRoutingConfig(),
+		//nolint:staticcheck // SA1019: worker versioning v0.31
+		"current_version", d.State.GetRoutingConfig().GetCurrentVersion(),
+		//nolint:staticcheck // SA1019: worker versioning v0.31
+		"ramping_version", d.State.GetRoutingConfig().GetRampingVersion())
+
 	return workflow.UpsertMemo(ctx, map[string]any{
 		WorkerDeploymentMemoField: &deploymentspb.WorkerDeploymentWorkflowMemo{
 			DeploymentName: d.DeploymentName,
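A pattern running through this file is the swap from direct field access (d.State.RoutingConfig.CurrentVersion) to getter chains (d.State.GetRoutingConfig().GetCurrentVersion()). Generated Go protobuf getters are nil-receiver-safe, so a chain degrades to a zero value instead of panicking when an intermediate message is missing, which is exactly the partially initialized state the new backfill logic in run() guards against. A sketch with hand-written stand-ins for the generated types:

package main

import "fmt"

// Hand-written stand-ins for protoc-generated messages: generated Go
// getters check for a nil receiver and return the zero value.
type RoutingConfig struct{ CurrentVersion string }

func (rc *RoutingConfig) GetCurrentVersion() string {
	if rc == nil {
		return ""
	}
	return rc.CurrentVersion
}

type LocalState struct{ RoutingConfig *RoutingConfig }

func (s *LocalState) GetRoutingConfig() *RoutingConfig {
	if s == nil {
		return nil
	}
	return s.RoutingConfig
}

func main() {
	var state *LocalState // e.g. a workflow resumed with missing state

	// Direct access state.RoutingConfig.CurrentVersion would panic here;
	// the getter chain degrades to "" instead.
	fmt.Printf("current=%q\n", state.GetRoutingConfig().GetCurrentVersion())
}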
