Skip to content

Commit 873df34

Browse files
dnr, mindaugasrukas
authored and committed
Improve build id scavenger (#4568)
1 parent ed7f94c commit 873df34

File tree

5 files changed

+30
-22
lines changed

5 files changed

+30
-22
lines changed

common/dynamicconfig/constants.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -214,6 +214,8 @@ const (
214214
// 2. There are delays in the visibility task processor (which is asynchronous).
215215
// 3. There's propagation delay of the versioning data between matching nodes.
216216
RemovableBuildIdDurationSinceDefault = "worker.removableBuildIdDurationSinceDefault"
217+
// BuildIdScavengerVisibilityRPS is the rate limit for visibility calls from the build id scavenger
218+
BuildIdScavenengerVisibilityRPS = "worker.buildIdScavengerVisibilityRPS"
217219

218220
// keys for frontend
219221

service/worker/scanner/build_ids/scavenger.go

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,6 @@ package build_ids
2626

2727
import (
2828
"context"
29-
"math"
3029
"time"
3130

3231
enumspb "go.temporal.io/api/enums/v1"
@@ -67,7 +66,6 @@ var (
6766

6867
type (
6968
BuildIdScavangerInput struct {
70-
VisibilityRPS float64
7169
NamespaceListPageSize int
7270
TaskQueueListPageSize int
7371
}
@@ -86,6 +84,7 @@ type (
8684
// 2. workflows with that identifier that have yet to be indexed in visibility
8785
// The scavenger should allow enough time to pass before cleaning these build ids.
8886
removableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn
87+
buildIdScavengerVisibilityRPS dynamicconfig.FloatPropertyFn
8988
}
9089

9190
heartbeatDetails struct {
@@ -105,6 +104,7 @@ func NewActivities(
105104
matchingClient matchingservice.MatchingServiceClient,
106105
currentClusterName string,
107106
removableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn,
107+
buildIdScavengerVisibilityRPS dynamicconfig.FloatPropertyFn,
108108
) *Activities {
109109
return &Activities{
110110
logger: logger,
@@ -115,6 +115,7 @@ func NewActivities(
115115
matchingClient: matchingClient,
116116
currentClusterName: currentClusterName,
117117
removableBuildIdDurationSinceDefault: removableBuildIdDurationSinceDefault,
118+
buildIdScavengerVisibilityRPS: buildIdScavengerVisibilityRPS,
118119
}
119120
}
120121

@@ -136,9 +137,6 @@ func (a *Activities) setDefaults(input *BuildIdScavangerInput) {
136137
if input.TaskQueueListPageSize == 0 {
137138
input.TaskQueueListPageSize = 100
138139
}
139-
if input.VisibilityRPS == 0 {
140-
input.VisibilityRPS = 1
141-
}
142140
}
143141

144142
func (a *Activities) recordHeartbeat(ctx context.Context, heartbeat heartbeatDetails) {
@@ -155,7 +153,7 @@ func (a *Activities) ScavengeBuildIds(ctx context.Context, input BuildIdScavange
155153
return temporal.NewNonRetryableApplicationError("failed to load previous heartbeat details", "TypeError", err)
156154
}
157155
}
158-
rateLimiter := quotas.NewRateLimiter(input.VisibilityRPS, int(math.Ceil(input.VisibilityRPS)))
156+
rateLimiter := quotas.NewDefaultOutgoingRateLimiter(quotas.RateFn(a.buildIdScavengerVisibilityRPS))
159157
for {
160158
nsResponse, err := a.metadataManager.ListNamespaces(ctx, &persistence.ListNamespacesRequest{
161159
PageSize: input.NamespaceListPageSize,
@@ -208,14 +206,16 @@ func (a *Activities) processNamespaceEntry(
208206
return err
209207
}
210208
for heartbeat.TaskQueueIdx < len(tqResponse.Entries) {
209+
if ctx.Err() != nil {
210+
return ctx.Err()
211+
}
211212
entry := tqResponse.Entries[heartbeat.TaskQueueIdx]
212213
if err := a.processUserDataEntry(ctx, rateLimiter, *heartbeat, ns, entry); err != nil {
213214
// Intentionally don't fail the activity on single entry.
214215
a.logger.Error("Failed to update task queue user data",
215216
tag.WorkflowNamespace(ns.Name().String()),
216217
tag.WorkflowTaskQueueName(entry.TaskQueue),
217218
tag.Error(err))
218-
continue
219219
}
220220
heartbeat.TaskQueueIdx++
221221
a.recordHeartbeat(ctx, *heartbeat)

service/worker/scanner/build_ids/scavenger_test.go

Lines changed: 14 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ import (
4444
"go.temporal.io/server/api/matchingservicemock/v1"
4545
persistencespb "go.temporal.io/server/api/persistence/v1"
4646
hlc "go.temporal.io/server/common/clock/hybrid_logical_clock"
47+
"go.temporal.io/server/common/dynamicconfig"
4748
"go.temporal.io/server/common/log"
4849
"go.temporal.io/server/common/namespace"
4950
"go.temporal.io/server/common/persistence"
@@ -84,11 +85,10 @@ func Test_findBuildIdsToRemove_FindsAllBuildIdsToRemove(t *testing.T) {
8485
rateLimiter := quotas.NewMockRateLimiter(ctrl)
8586

8687
a := &Activities{
87-
logger: log.NewCLILogger(),
88-
visibilityManager: visiblityManager,
89-
removableBuildIdDurationSinceDefault: func() time.Duration {
90-
return time.Hour
91-
},
88+
logger: log.NewCLILogger(),
89+
visibilityManager: visiblityManager,
90+
removableBuildIdDurationSinceDefault: dynamicconfig.GetDurationPropertyFn(time.Hour),
91+
buildIdScavengerVisibilityRPS: dynamicconfig.GetFloatPropertyFn(1.0),
9292
}
9393

9494
visiblityManager.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Times(4).DoAndReturn(
@@ -233,16 +233,15 @@ func Test_ScavengeBuildIds_Heartbeats(t *testing.T) {
233233
matchingClient := matchingservicemock.NewMockMatchingServiceClient(ctrl)
234234

235235
a := &Activities{
236-
logger: log.NewCLILogger(),
237-
visibilityManager: visiblityManager,
238-
metadataManager: metadataManager,
239-
taskManager: taskManager,
240-
namespaceRegistry: namespaceRegistry,
241-
matchingClient: matchingClient,
242-
removableBuildIdDurationSinceDefault: func() time.Duration {
243-
return time.Hour
244-
},
245-
currentClusterName: "test-cluster",
236+
logger: log.NewCLILogger(),
237+
visibilityManager: visiblityManager,
238+
metadataManager: metadataManager,
239+
taskManager: taskManager,
240+
namespaceRegistry: namespaceRegistry,
241+
matchingClient: matchingClient,
242+
removableBuildIdDurationSinceDefault: dynamicconfig.GetDurationPropertyFn(time.Hour),
243+
buildIdScavengerVisibilityRPS: dynamicconfig.GetFloatPropertyFn(1.0),
244+
currentClusterName: "test-cluster",
246245
}
247246

248247
rateLimiter.EXPECT().Wait(gomock.Any()).AnyTimes()

service/worker/scanner/scanner.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -93,6 +93,8 @@ type (
9393
// RemovableBuildIdDurationSinceDefault is the minimum duration since a build id was last default in its
9494
// containing set for it to be considered for removal.
9595
RemovableBuildIdDurationSinceDefault dynamicconfig.DurationPropertyFn
96+
// BuildIdScavengerVisibilityRPS is the rate limit for visibility calls from the build id scavenger
97+
BuildIdScavengerVisibilityRPS dynamicconfig.FloatPropertyFn
9698
}
9799

98100
// scannerContext is the context object that gets
@@ -209,6 +211,7 @@ func (s *Scanner) Start() error {
209211
s.context.matchingClient,
210212
s.context.currentClusterName,
211213
s.context.cfg.RemovableBuildIdDurationSinceDefault,
214+
s.context.cfg.BuildIdScavengerVisibilityRPS,
212215
)
213216

214217
work := s.context.sdkClientFactory.NewWorker(s.context.sdkClientFactory.GetSystemClient(), build_ids.BuildIdScavengerTaskQueueName, workerOpts)

service/worker/service.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -324,6 +324,10 @@ func NewConfig(
324324
dynamicconfig.RemovableBuildIdDurationSinceDefault,
325325
time.Hour,
326326
),
327+
BuildIdScavengerVisibilityRPS: dc.GetFloat64Property(
328+
dynamicconfig.BuildIdScavenengerVisibilityRPS,
329+
1.0,
330+
),
327331
},
328332
EnableBatcher: dc.GetBoolProperty(dynamicconfig.EnableBatcher, true),
329333
BatcherRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BatcherRPS, batcher.DefaultRPS),

0 commit comments

Comments
 (0)