Skip to content

Commit 212332b

Browse files
yux0 authored and alexshtin committed
Add flag to enable execution scanner event Id validator (#4114)
* Add flag to enable execution scanner event id validator
1 parent 7c0a0e0 commit 212332b

File tree

6 files changed

+52
-34
lines changed

6 files changed

+52
-34
lines changed

common/dynamicconfig/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,8 @@ const (
707707
ExecutionDataDurationBuffer = "worker.executionDataDurationBuffer"
708708
// ExecutionScannerWorkerCount is the execution scavenger worker count
709709
ExecutionScannerWorkerCount = "worker.executionScannerWorkerCount"
710+
// ExecutionScannerHistoryEventIdValidator is the flag to enable the history event ID validator in the execution scanner
711+
ExecutionScannerHistoryEventIdValidator = "worker.executionEnableHistoryEventIdValidator"
710712
// TaskQueueScannerEnabled indicates if task queue scanner should be started as part of worker.Scanner
711713
TaskQueueScannerEnabled = "worker.taskQueueScannerEnabled"
712714
// HistoryScannerEnabled indicates if history scanner should be started as part of worker.Scanner

service/worker/scanner/executions/scavenger.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,17 @@ type (
5656
numHistoryShards int32
5757
activityContext context.Context
5858

59-
executionManager persistence.ExecutionManager
60-
registry namespace.Registry
61-
historyClient historyservice.HistoryServiceClient
62-
adminClient adminservice.AdminServiceClient
63-
executor executor.Executor
64-
rateLimiter quotas.RateLimiter
65-
perShardQPS dynamicconfig.IntPropertyFn
66-
executionDataDurationBuffer dynamicconfig.DurationPropertyFn
67-
metricsHandler metrics.Handler
68-
logger log.Logger
59+
executionManager persistence.ExecutionManager
60+
registry namespace.Registry
61+
historyClient historyservice.HistoryServiceClient
62+
adminClient adminservice.AdminServiceClient
63+
executor executor.Executor
64+
rateLimiter quotas.RateLimiter
65+
perShardQPS dynamicconfig.IntPropertyFn
66+
executionDataDurationBuffer dynamicconfig.DurationPropertyFn
67+
enableHistoryEventIDValidator dynamicconfig.BoolPropertyFn
68+
metricsHandler metrics.Handler
69+
logger log.Logger
6970

7071
stopC chan struct{}
7172
stopWG sync.WaitGroup
@@ -89,6 +90,7 @@ func NewScavenger(
8990
perShardQPS dynamicconfig.IntPropertyFn,
9091
executionDataDurationBuffer dynamicconfig.DurationPropertyFn,
9192
executionTaskWorker dynamicconfig.IntPropertyFn,
93+
enableHistoryEventIDValidator dynamicconfig.BoolPropertyFn,
9294
executionManager persistence.ExecutionManager,
9395
registry namespace.Registry,
9496
historyClient historyservice.HistoryServiceClient,
@@ -112,10 +114,11 @@ func NewScavenger(
112114
rateLimiter: quotas.NewDefaultOutgoingRateLimiter(
113115
func() float64 { return float64(perHostQPS()) },
114116
),
115-
perShardQPS: perShardQPS,
116-
executionDataDurationBuffer: executionDataDurationBuffer,
117-
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ExecutionsScavengerScope)),
118-
logger: logger,
117+
perShardQPS: perShardQPS,
118+
executionDataDurationBuffer: executionDataDurationBuffer,
119+
enableHistoryEventIDValidator: enableHistoryEventIDValidator,
120+
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ExecutionsScavengerScope)),
121+
logger: logger,
119122

120123
stopC: make(chan struct{}),
121124
}
@@ -185,6 +188,7 @@ func (s *Scavenger) run() {
185188
s.rateLimiter,
186189
}),
187190
s.executionDataDurationBuffer,
191+
s.enableHistoryEventIDValidator,
188192
))
189193
if !submitted {
190194
s.logger.Error("unable to submit task to executor", tag.ShardID(shardID))

service/worker/scanner/executions/task.go

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,11 @@ type (
6666
logger log.Logger
6767
scavenger *Scavenger
6868

69-
ctx context.Context
70-
rateLimiter quotas.RateLimiter
71-
executionDataDurationBuffer dynamicconfig.DurationPropertyFn
72-
paginationToken []byte
69+
ctx context.Context
70+
rateLimiter quotas.RateLimiter
71+
executionDataDurationBuffer dynamicconfig.DurationPropertyFn
72+
enableHistoryEventIDValidator dynamicconfig.BoolPropertyFn
73+
paginationToken []byte
7374
}
7475
)
7576

@@ -86,6 +87,7 @@ func newTask(
8687
scavenger *Scavenger,
8788
rateLimiter quotas.RateLimiter,
8889
executionDataDurationBuffer dynamicconfig.DurationPropertyFn,
90+
enableHistoryEventIDValidator dynamicconfig.BoolPropertyFn,
8991
) executor.Task {
9092
return &task{
9193
shardID: shardID,
@@ -98,9 +100,10 @@ func newTask(
98100
logger: logger,
99101
scavenger: scavenger,
100102

101-
ctx: ctx,
102-
rateLimiter: rateLimiter,
103-
executionDataDurationBuffer: executionDataDurationBuffer,
103+
ctx: ctx,
104+
rateLimiter: rateLimiter,
105+
executionDataDurationBuffer: executionDataDurationBuffer,
106+
enableHistoryEventIDValidator: enableHistoryEventIDValidator,
104107
}
105108
}
106109

@@ -186,19 +189,21 @@ func (t *task) validate(
186189
return results
187190
}
188191

189-
if validationResults, err := NewHistoryEventIDValidator(
190-
t.shardID,
191-
t.executionManager,
192-
).Validate(t.ctx, mutableState); err != nil {
193-
t.logger.Error("unable to validate history event ID being contiguous",
194-
tag.ShardID(t.shardID),
195-
tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().GetNamespaceId()),
196-
tag.WorkflowID(mutableState.GetExecutionInfo().GetWorkflowId()),
197-
tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()),
198-
tag.Error(err),
199-
)
200-
} else {
201-
results = append(results, validationResults...)
192+
if t.enableHistoryEventIDValidator() {
193+
if validationResults, err := NewHistoryEventIDValidator(
194+
t.shardID,
195+
t.executionManager,
196+
).Validate(t.ctx, mutableState); err != nil {
197+
t.logger.Error("unable to validate history event ID being contiguous",
198+
tag.ShardID(t.shardID),
199+
tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().GetNamespaceId()),
200+
tag.WorkflowID(mutableState.GetExecutionInfo().GetWorkflowId()),
201+
tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()),
202+
tag.Error(err),
203+
)
204+
} else {
205+
results = append(results, validationResults...)
206+
}
202207
}
203208

204209
return results

service/worker/scanner/scanner.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ type (
8282
ExecutionDataDurationBuffer dynamicconfig.DurationPropertyFn
8383
// ExecutionScannerWorkerCount is the execution scavenger task worker number
8484
ExecutionScannerWorkerCount dynamicconfig.IntPropertyFn
85+
// ExecutionScannerHistoryEventIdValidator indicates whether the execution scavenger should validate history event IDs.
86+
ExecutionScannerHistoryEventIdValidator dynamicconfig.BoolPropertyFn
8587
}
8688

8789
// scannerContext is the context object that gets

service/worker/scanner/workflow.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ func ExecutionsScavengerActivity(
196196
ctx.cfg.ExecutionScannerPerShardQPS,
197197
ctx.cfg.ExecutionDataDurationBuffer,
198198
ctx.cfg.ExecutionScannerWorkerCount,
199+
ctx.cfg.ExecutionScannerHistoryEventIdValidator,
199200
ctx.executionManager,
200201
ctx.namespaceRegistry,
201202
ctx.historyClient,

service/worker/service.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,10 @@ func NewConfig(dc *dynamicconfig.Collection, persistenceConfig *config.Persisten
302302
dynamicconfig.ExecutionScannerWorkerCount,
303303
8,
304304
),
305+
ExecutionScannerHistoryEventIdValidator: dc.GetBoolProperty(
306+
dynamicconfig.ExecutionScannerHistoryEventIdValidator,
307+
true,
308+
),
305309
},
306310
EnableBatcher: dc.GetBoolProperty(dynamicconfig.EnableBatcher, true),
307311
BatcherRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BatcherRPS, batcher.DefaultRPS),

0 commit comments

Comments
 (0)