Skip to content

Commit 2f41fdc

Browse files
Turns on the history scavenger for NoSQL backends
1 parent a86d455 commit 2f41fdc

File tree

3 files changed

+243
-8
lines changed

3 files changed

+243
-8
lines changed

service/worker/scanner/scanner.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ package scanner
2626

2727
import (
2828
"context"
29+
"sync"
2930
"time"
3031

3132
"go.temporal.io/api/serviceerror"
@@ -40,13 +41,14 @@ import (
4041
"go.temporal.io/server/common/log"
4142
"go.temporal.io/server/common/metrics"
4243
"go.temporal.io/server/common/persistence"
44+
"go.temporal.io/server/common/sdk"
4345

4446
"go.temporal.io/server/common/backoff"
4547
"go.temporal.io/server/common/dynamicconfig"
4648
"go.temporal.io/server/common/log/tag"
4749
)
4850

49-
const (
51+
var (
5052
// scannerStartUpDelay is to let services warm up
5153
scannerStartUpDelay = time.Second * 4
5254
)
@@ -84,13 +86,15 @@ type (
8486
executionManager persistence.ExecutionManager
8587
taskManager persistence.TaskManager
8688
historyClient historyservice.HistoryServiceClient
89+
workerFactory sdk.WorkerFactory
8790
}
8891

8992
// Scanner is the background sub-system that does full scans
9093
// of database tables to cleanup resources, monitor anamolies
9194
// and emit stats for analytics
9295
Scanner struct {
9396
context scannerContext
97+
wg sync.WaitGroup
9498
}
9599
)
96100

@@ -107,6 +111,7 @@ func New(
107111
executionManager persistence.ExecutionManager,
108112
taskManager persistence.TaskManager,
109113
historyClient historyservice.HistoryServiceClient,
114+
workerFactory sdk.WorkerFactory,
110115
) *Scanner {
111116
return &Scanner{
112117
context: scannerContext{
@@ -117,6 +122,7 @@ func New(
117122
executionManager: executionManager,
118123
taskManager: taskManager,
119124
historyClient: historyClient,
125+
workerFactory: workerFactory,
120126
},
121127
}
122128
}
@@ -137,20 +143,23 @@ func (s *Scanner) Start() error {
137143

138144
var workerTaskQueueNames []string
139145
if s.context.cfg.ExecutionsScannerEnabled() {
146+
s.wg.Add(1)
140147
go s.startWorkflowWithRetry(executionsScannerWFStartOptions, executionsScannerWFTypeName)
141148
workerTaskQueueNames = append(workerTaskQueueNames, executionsScannerTaskQueueName)
142149
}
143150

144151
if s.context.cfg.Persistence.DefaultStoreType() == config.StoreTypeSQL && s.context.cfg.TaskQueueScannerEnabled() {
152+
s.wg.Add(1)
145153
go s.startWorkflowWithRetry(tlScannerWFStartOptions, tqScannerWFTypeName)
146154
workerTaskQueueNames = append(workerTaskQueueNames, tqScannerTaskQueueName)
147-
} else if s.context.cfg.Persistence.DefaultStoreType() == config.StoreTypeNoSQL && s.context.cfg.HistoryScannerEnabled() {
155+
} else if s.context.cfg.HistoryScannerEnabled() {
156+
s.wg.Add(1)
148157
go s.startWorkflowWithRetry(historyScannerWFStartOptions, historyScannerWFTypeName)
149158
workerTaskQueueNames = append(workerTaskQueueNames, historyScannerTaskQueueName)
150159
}
151160

152161
for _, tl := range workerTaskQueueNames {
153-
work := worker.New(s.context.sdkSystemClient, tl, workerOpts)
162+
work := s.context.workerFactory.New(s.context.sdkSystemClient, tl, workerOpts)
154163

155164
work.RegisterWorkflowWithOptions(TaskQueueScannerWorkflow, workflow.RegisterOptions{Name: tqScannerWFTypeName})
156165
work.RegisterWorkflowWithOptions(HistoryScannerWorkflow, workflow.RegisterOptions{Name: historyScannerWFTypeName})
@@ -167,11 +176,16 @@ func (s *Scanner) Start() error {
167176
return nil
168177
}
169178

179+
func (s *Scanner) Stop() {
180+
s.wg.Wait()
181+
}
182+
170183
func (s *Scanner) startWorkflowWithRetry(
171184
options sdkclient.StartWorkflowOptions,
172185
workflowType string,
173186
workflowArgs ...interface{},
174187
) {
188+
defer s.wg.Done()
175189

176190
// let history / matching service warm up
177191
time.Sleep(scannerStartUpDelay)
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package scanner
26+
27+
import (
28+
"testing"
29+
"time"
30+
31+
"github.com/golang/mock/gomock"
32+
"github.com/stretchr/testify/suite"
33+
34+
"go.temporal.io/server/api/historyservicemock/v1"
35+
"go.temporal.io/server/common/config"
36+
"go.temporal.io/server/common/log"
37+
"go.temporal.io/server/common/metrics"
38+
p "go.temporal.io/server/common/persistence"
39+
"go.temporal.io/server/common/sdk"
40+
"go.temporal.io/server/common/testing/mocksdk"
41+
)
42+
43+
type scannerTestSuite struct {
44+
suite.Suite
45+
}
46+
47+
func TestScanner(t *testing.T) {
48+
suite.Run(t, new(scannerTestSuite))
49+
}
50+
51+
func (s *scannerTestSuite) TestScannerEnabled() {
52+
defer func(originalDelay time.Duration) {
53+
scannerStartUpDelay = originalDelay
54+
}(scannerStartUpDelay)
55+
scannerStartUpDelay = 0
56+
57+
type expectedScanner struct {
58+
WFTypeName string
59+
TaskQueueName string
60+
}
61+
executionScanner := expectedScanner{
62+
WFTypeName: executionsScannerWFTypeName,
63+
TaskQueueName: executionsScannerTaskQueueName,
64+
}
65+
_ = executionScanner
66+
taskQueueScanner := expectedScanner{
67+
WFTypeName: tqScannerWFTypeName,
68+
TaskQueueName: tqScannerTaskQueueName,
69+
}
70+
historyScanner := expectedScanner{
71+
WFTypeName: historyScannerWFTypeName,
72+
TaskQueueName: historyScannerTaskQueueName,
73+
}
74+
75+
type testCase struct {
76+
Name string
77+
ExecutionsScannerEnabled bool
78+
TaskQueueScannerEnabled bool
79+
HistoryScannerEnabled bool
80+
DefaultStore string
81+
ExpectedScanners []expectedScanner
82+
}
83+
84+
for _, c := range []testCase{
85+
{
86+
Name: "NothingEnabledNoSQL",
87+
ExecutionsScannerEnabled: false,
88+
TaskQueueScannerEnabled: false,
89+
HistoryScannerEnabled: false,
90+
DefaultStore: config.StoreTypeNoSQL,
91+
ExpectedScanners: []expectedScanner{},
92+
},
93+
{
94+
Name: "NothingEnabledSQL",
95+
ExecutionsScannerEnabled: false,
96+
TaskQueueScannerEnabled: false,
97+
HistoryScannerEnabled: false,
98+
DefaultStore: config.StoreTypeSQL,
99+
ExpectedScanners: []expectedScanner{},
100+
},
101+
{
102+
Name: "HistoryScannerNoSQL",
103+
ExecutionsScannerEnabled: false,
104+
TaskQueueScannerEnabled: false,
105+
HistoryScannerEnabled: true,
106+
DefaultStore: config.StoreTypeNoSQL,
107+
ExpectedScanners: []expectedScanner{historyScanner},
108+
},
109+
{
110+
Name: "HistoryScannerSQL",
111+
ExecutionsScannerEnabled: false,
112+
TaskQueueScannerEnabled: false,
113+
HistoryScannerEnabled: true,
114+
DefaultStore: config.StoreTypeSQL,
115+
ExpectedScanners: []expectedScanner{historyScanner},
116+
},
117+
{
118+
Name: "TaskQueueScannerNoSQL",
119+
ExecutionsScannerEnabled: false,
120+
TaskQueueScannerEnabled: true,
121+
HistoryScannerEnabled: false,
122+
DefaultStore: config.StoreTypeNoSQL,
123+
ExpectedScanners: []expectedScanner{}, // TODO: enable task queue scanner for NoSQL?
124+
},
125+
{
126+
Name: "TaskQueueScannerSQL",
127+
ExecutionsScannerEnabled: false,
128+
TaskQueueScannerEnabled: true,
129+
HistoryScannerEnabled: false,
130+
DefaultStore: config.StoreTypeSQL,
131+
ExpectedScanners: []expectedScanner{taskQueueScanner},
132+
},
133+
{
134+
Name: "ExecutionsScannerNoSQL",
135+
ExecutionsScannerEnabled: true,
136+
TaskQueueScannerEnabled: false,
137+
HistoryScannerEnabled: false,
138+
DefaultStore: config.StoreTypeNoSQL,
139+
ExpectedScanners: []expectedScanner{executionScanner},
140+
},
141+
{
142+
Name: "ExecutionsScannerSQL",
143+
ExecutionsScannerEnabled: true,
144+
TaskQueueScannerEnabled: false,
145+
HistoryScannerEnabled: false,
146+
DefaultStore: config.StoreTypeSQL,
147+
ExpectedScanners: []expectedScanner{executionScanner},
148+
},
149+
} {
150+
s.Run(c.Name, func() {
151+
ctrl := gomock.NewController(s.T())
152+
mockWorkerFactory := sdk.NewMockWorkerFactory(ctrl)
153+
mockSdkClient := mocksdk.NewMockClient(ctrl)
154+
scanner := New(
155+
log.NewNoopLogger(),
156+
&Config{
157+
MaxConcurrentActivityExecutionSize: func() int {
158+
return 1
159+
},
160+
MaxConcurrentWorkflowTaskExecutionSize: func() int {
161+
return 1
162+
},
163+
MaxConcurrentActivityTaskPollers: func() int {
164+
return 1
165+
},
166+
MaxConcurrentWorkflowTaskPollers: func() int {
167+
return 1
168+
},
169+
ExecutionsScannerEnabled: func() bool {
170+
return c.ExecutionsScannerEnabled
171+
},
172+
HistoryScannerEnabled: func() bool {
173+
return c.HistoryScannerEnabled
174+
},
175+
TaskQueueScannerEnabled: func() bool {
176+
return c.TaskQueueScannerEnabled
177+
},
178+
Persistence: &config.Persistence{
179+
DefaultStore: c.DefaultStore,
180+
DataStores: map[string]config.DataStore{
181+
config.StoreTypeNoSQL: {},
182+
config.StoreTypeSQL: {
183+
SQL: &config.SQL{},
184+
},
185+
},
186+
},
187+
},
188+
mockSdkClient,
189+
metrics.NoopClient,
190+
p.NewMockExecutionManager(ctrl),
191+
p.NewMockTaskManager(ctrl),
192+
historyservicemock.NewMockHistoryServiceClient(ctrl),
193+
mockWorkerFactory,
194+
)
195+
for _, sc := range c.ExpectedScanners {
196+
worker := mocksdk.NewMockWorker(ctrl)
197+
worker.EXPECT().RegisterActivityWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
198+
worker.EXPECT().RegisterWorkflowWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
199+
worker.EXPECT().Start()
200+
mockWorkerFactory.EXPECT().New(gomock.Any(), sc.TaskQueueName, gomock.Any()).Return(worker)
201+
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), sc.WFTypeName, gomock.Any())
202+
}
203+
err := scanner.Start()
204+
s.NoError(err)
205+
scanner.Stop()
206+
})
207+
}
208+
}

service/worker/service.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"sync/atomic"
3131
"time"
3232

33+
"github.com/benbjohnson/clock"
3334
"go.temporal.io/api/serviceerror"
3435

3536
"go.temporal.io/server/api/historyservice/v1"
@@ -97,6 +98,9 @@ type (
9798

9899
workerManager *workerManager
99100
perNamespaceWorkerManager *perNamespaceWorkerManager
101+
scanner *scanner.Scanner
102+
workerFactory sdk.WorkerFactory
103+
clock clock.Clock
100104
}
101105

102106
// Config contains all the service config for worker
@@ -148,13 +152,14 @@ func NewService(
148152
workerManager *workerManager,
149153
perNamespaceWorkerManager *perNamespaceWorkerManager,
150154
visibilityManager manager.VisibilityManager,
155+
workerFactory sdk.WorkerFactory,
151156
) (*Service, error) {
152157
workerServiceResolver, err := membershipMonitor.GetResolver(common.WorkerServiceName)
153158
if err != nil {
154159
return nil, err
155160
}
156161

157-
return &Service{
162+
s := &Service{
158163
status: common.DaemonStatusInitialized,
159164
config: serviceConfig,
160165
sdkClientFactory: sdkClientFactory,
@@ -181,7 +186,10 @@ func NewService(
181186

182187
workerManager: workerManager,
183188
perNamespaceWorkerManager: perNamespaceWorkerManager,
184-
}, nil
189+
workerFactory: workerFactory,
190+
}
191+
s.initScanner()
192+
return s, nil
185193
}
186194

187195
// NewConfig builds the new Config for worker service
@@ -401,6 +409,7 @@ func (s *Service) Stop() {
401409

402410
close(s.stopC)
403411

412+
s.scanner.Stop()
404413
s.perNamespaceWorkerManager.Stop()
405414
s.workerManager.Stop()
406415
s.namespaceRegistry.Stop()
@@ -449,17 +458,21 @@ func (s *Service) startBatcher() {
449458
}
450459
}
451460

452-
func (s *Service) startScanner() {
453-
sc := scanner.New(
461+
func (s *Service) initScanner() {
462+
s.scanner = scanner.New(
454463
s.logger,
455464
s.config.ScannerCfg,
456465
s.sdkClientFactory.GetSystemClient(),
457466
s.metricsClient,
458467
s.executionManager,
459468
s.taskManager,
460469
s.historyClient,
470+
s.workerFactory,
461471
)
462-
if err := sc.Start(); err != nil {
472+
}
473+
474+
func (s *Service) startScanner() {
475+
if err := s.scanner.Start(); err != nil {
463476
s.logger.Fatal(
464477
"error starting scanner",
465478
tag.Error(err),

0 commit comments

Comments
 (0)