Skip to content

Commit 2d35d0d

Browse files
arschlesasw101
andauthored
fixing concurrent map access issue (#415)
Signed-off-by: Aaron Schlesinger <[email protected]> Co-authored-by: Aaron Wislang <[email protected]>
1 parent b15bc3d commit 2d35d0d

File tree

5 files changed

+118
-81
lines changed

5 files changed

+118
-81
lines changed

scaler/handlers.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@ func (e *impl) IsActive(
7373
)
7474
if !ok {
7575
err := fmt.Errorf("host '%s' not found in counts", host)
76-
allCounts := mergeCountsWithRoutingTable(e.pinger.counts(), e.routingTable)
76+
allCounts := e.pinger.mergeCountsWithRoutingTable(
77+
e.routingTable,
78+
)
7779
lggr.Error(err, "Given host was not found in queue count map", "host", host, "allCounts", allCounts)
7880
return nil, err
7981
}
@@ -173,7 +175,7 @@ func (e *impl) GetMetrics(
173175
hostCount = e.pinger.aggregate()
174176
} else {
175177
err := fmt.Errorf("host '%s' not found in counts", host)
176-
allCounts := mergeCountsWithRoutingTable(e.pinger.counts(), e.routingTable)
178+
allCounts := e.pinger.mergeCountsWithRoutingTable(e.routingTable)
177179
lggr.Error(err, "allCounts", allCounts)
178180
return nil, err
179181
}

scaler/host_counts.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,6 @@ import (
44
"github.com/kedacore/http-add-on/pkg/routing"
55
)
66

7-
// mergeCountsWithRoutingTable ensures that all hosts in routing table
8-
// are present in combined counts, if count is not present value is set to 0
9-
func mergeCountsWithRoutingTable(
10-
counts map[string]int,
11-
table routing.TableReader,
12-
) map[string]int {
13-
mergedCounts := make(map[string]int)
14-
for _, host := range table.Hosts() {
15-
mergedCounts[host] = 0
16-
}
17-
for key, value := range counts {
18-
mergedCounts[key] = value
19-
}
20-
return mergedCounts
21-
}
22-
237
// getHostCount gets proper count for given host regardless whether
248
// host is in counts or only in routerTable
259
func getHostCount(

scaler/host_counts_test.go

Lines changed: 50 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -14,79 +14,66 @@ type testCase struct {
1414
retCounts map[string]int
1515
}
1616

17-
var cases = []testCase{
18-
{
19-
name: "empty queue",
20-
table: newRoutingTable([]hostAndTarget{
21-
{
22-
host: "www.example.com",
23-
target: routing.Target{},
17+
func cases() []testCase {
18+
return []testCase{
19+
{
20+
name: "empty queue",
21+
table: newRoutingTable([]hostAndTarget{
22+
{
23+
host: "www.example.com",
24+
target: routing.Target{},
25+
},
26+
{
27+
host: "www.example2.com",
28+
target: routing.Target{},
29+
},
30+
}),
31+
counts: make(map[string]int),
32+
retCounts: map[string]int{
33+
"www.example.com": 0,
34+
"www.example2.com": 0,
2435
},
25-
{
26-
host: "www.example2.com",
27-
target: routing.Target{},
28-
},
29-
}),
30-
counts: make(map[string]int),
31-
retCounts: map[string]int{
32-
"www.example.com": 0,
33-
"www.example2.com": 0,
3436
},
35-
},
36-
{
37-
name: "one entry in queue, same entry in routing table",
38-
table: newRoutingTable([]hostAndTarget{
39-
{
40-
host: "example.com",
41-
target: routing.Target{},
37+
{
38+
name: "one entry in queue, same entry in routing table",
39+
table: newRoutingTable([]hostAndTarget{
40+
{
41+
host: "example.com",
42+
target: routing.Target{},
43+
},
44+
}),
45+
counts: map[string]int{
46+
"example.com": 1,
47+
},
48+
retCounts: map[string]int{
49+
"example.com": 1,
4250
},
43-
}),
44-
counts: map[string]int{
45-
"example.com": 1,
46-
},
47-
retCounts: map[string]int{
48-
"example.com": 1,
4951
},
50-
},
51-
{
52-
name: "one entry in queue, two in routing table",
53-
table: newRoutingTable([]hostAndTarget{
54-
{
55-
host: "example.com",
56-
target: routing.Target{},
52+
{
53+
name: "one entry in queue, two in routing table",
54+
table: newRoutingTable([]hostAndTarget{
55+
{
56+
host: "example.com",
57+
target: routing.Target{},
58+
},
59+
{
60+
host: "example2.com",
61+
target: routing.Target{},
62+
},
63+
}),
64+
counts: map[string]int{
65+
"example.com": 1,
5766
},
58-
{
59-
host: "example2.com",
60-
target: routing.Target{},
67+
retCounts: map[string]int{
68+
"example.com": 1,
69+
"example2.com": 0,
6170
},
62-
}),
63-
counts: map[string]int{
64-
"example.com": 1,
6571
},
66-
retCounts: map[string]int{
67-
"example.com": 1,
68-
"example2.com": 0,
69-
},
70-
},
71-
}
72-
73-
func TestMergeCountsWithRoutingTable(t *testing.T) {
74-
75-
for _, tc := range cases {
76-
t.Run(tc.name, func(t *testing.T) {
77-
r := require.New(t)
78-
ret := mergeCountsWithRoutingTable(
79-
tc.counts,
80-
tc.table,
81-
)
82-
r.Equal(tc.retCounts, ret)
83-
})
8472
}
85-
}
8673

74+
}
8775
func TestGetHostCount(t *testing.T) {
88-
89-
for _, tc := range cases {
76+
for _, tc := range cases() {
9077
for host, retCount := range tc.retCounts {
9178
t.Run(tc.name, func(t *testing.T) {
9279
r := require.New(t)

scaler/queue_pinger.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/go-logr/logr"
1212
"github.com/kedacore/http-add-on/pkg/k8s"
1313
"github.com/kedacore/http-add-on/pkg/queue"
14+
"github.com/kedacore/http-add-on/pkg/routing"
1415
"github.com/pkg/errors"
1516
"golang.org/x/sync/errgroup"
1617
)
@@ -124,6 +125,23 @@ func (q *queuePinger) counts() map[string]int {
124125
return q.allCounts
125126
}
126127

128+
// mergeCountsWithRoutingTable ensures that all hosts in routing table
129+
// are present in combined counts, if count is not present value is set to 0
130+
func (q *queuePinger) mergeCountsWithRoutingTable(
131+
table routing.TableReader,
132+
) map[string]int {
133+
q.pingMut.RLock()
134+
defer q.pingMut.RUnlock()
135+
mergedCounts := make(map[string]int)
136+
for _, host := range table.Hosts() {
137+
mergedCounts[host] = 0
138+
}
139+
for key, value := range q.allCounts {
140+
mergedCounts[key] = value
141+
}
142+
return mergedCounts
143+
}
144+
127145
func (q *queuePinger) aggregate() int {
128146
q.pingMut.RLock()
129147
defer q.pingMut.RUnlock()

scaler/queue_pinger_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/kedacore/http-add-on/pkg/k8s"
1010
"github.com/kedacore/http-add-on/pkg/queue"
1111
"github.com/stretchr/testify/require"
12+
"golang.org/x/sync/errgroup"
1213
v1 "k8s.io/api/core/v1"
1314
)
1415

@@ -215,3 +216,48 @@ func TestFetchCounts(t *testing.T) {
215216
}
216217
r.Equal(expectedCounts, cts)
217218
}
219+
220+
func TestMergeCountsWithRoutingTable(t *testing.T) {
221+
for _, tc := range cases() {
222+
t.Run(tc.name, func(t *testing.T) {
223+
ctx := context.Background()
224+
grp, ctx := errgroup.WithContext(ctx)
225+
r := require.New(t)
226+
const C = 100
227+
tickr, q, err := newFakeQueuePinger(
228+
ctx,
229+
logr.Discard(),
230+
)
231+
r.NoError(err)
232+
defer tickr.Stop()
233+
q.allCounts = tc.counts
234+
235+
retCh := make(chan map[string]int)
236+
for i := 0; i < C; i++ {
237+
grp.Go(func() error {
238+
retCh <- q.mergeCountsWithRoutingTable(tc.table)
239+
return nil
240+
})
241+
}
242+
243+
// ensure we receive from retCh C times
244+
allRets := map[int]map[string]int{}
245+
for i := 0; i < C; i++ {
246+
allRets[i] = <-retCh
247+
}
248+
249+
r.NoError(grp.Wait())
250+
251+
// ensure that all returned maps are the
252+
// same
253+
prev := allRets[0]
254+
for i := 1; i < C; i++ {
255+
r.Equal(prev, allRets[i])
256+
prev = allRets[i]
257+
}
258+
// ensure that all the returned maps are
259+
// equal to what we expected
260+
r.Equal(tc.retCounts, prev)
261+
})
262+
}
263+
}

0 commit comments

Comments
 (0)