Skip to content

Commit cec8be7

Browse files
committed
better implementation of the response rate tracking
1 parent 845bc73 commit cec8be7

File tree

8 files changed

+64
-90
lines changed

8 files changed

+64
-90
lines changed

cmd/resolve/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,8 @@ func (p *params) SetupResolverPool(list []string, rpath string, timeout int, det
168168
p.Pool.SetDetectionResolver(p.QPS, detector)
169169
p.Detection = true
170170
}
171+
172+
p.Pool.SetRateTracker(resolve.NewRateTracker())
171173
return nil
172174
}
173175

conn.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,13 +168,14 @@ func (r *connections) responses(c *connection) {
168168
default:
169169
}
170170
if n, addr, err := c.conn.ReadFrom(b); err == nil && n >= headerSize {
171+
at := time.Now()
171172
m := new(dns.Msg)
172173

173174
if err := m.Unpack(b[:n]); err == nil && len(m.Question) > 0 {
174175
r.resps.Append(&resp{
175176
Msg: m,
176177
Addr: addr,
177-
At: time.Now(),
178+
At: at,
178179
})
179180
}
180181
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
go.uber.org/ratelimit v0.3.1
1010
golang.org/x/net v0.37.0
1111
golang.org/x/sys v0.31.0
12+
golang.org/x/time v0.11.0
1213
)
1314

1415
require (

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
2626
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
2727
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
2828
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
29+
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
30+
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
2931
golang.org/x/tools v0.31.0 h1:0EedkvKDbh+qistFTd0Bcwe/YLh4vHwWEkiI0toFIBU=
3032
golang.org/x/tools v0.31.0/go.mod h1:naFTU+Cev749tSJRXJlna0T3WxKvb1kWEx15xA4SdmQ=
3133
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

rate.go

Lines changed: 37 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,30 @@
55
package resolve
66

77
import (
8+
"context"
9+
"math"
810
"strings"
911
"sync"
1012
"time"
1113

1214
"github.com/miekg/dns"
13-
"go.uber.org/ratelimit"
1415
"golang.org/x/net/publicsuffix"
16+
"golang.org/x/time/rate"
1517
)
1618

1719
const (
18-
startQPSPerNameserver = 10
19-
maxQPSPerNameserver = 100
20-
numIntervalSeconds = 2
21-
rateUpdateInterval = numIntervalSeconds * time.Second
22-
maxTimeoutPercentage = 0.5
20+
maxQPSPerNameserver = 100
21+
numIntervalSeconds = 5
22+
rateUpdateInterval = numIntervalSeconds * time.Second
23+
minUpdateSampleSize = 10
2324
)
2425

2526
type rateTrack struct {
2627
sync.Mutex
27-
qps int
28-
rate ratelimit.Limiter
29-
success int
30-
timeout int
28+
rate *rate.Limiter
29+
avg time.Duration
30+
count int
31+
first bool
3132
}
3233

3334
type RateTracker struct {
@@ -52,9 +53,11 @@ func NewRateTracker() *RateTracker {
5253
}
5354

5455
func newRateTrack() *rateTrack {
56+
limit := rate.Every(100 * time.Millisecond)
57+
5558
return &rateTrack{
56-
qps: startQPSPerNameserver,
57-
rate: ratelimit.New(startQPSPerNameserver),
59+
rate: rate.NewLimiter(limit, 1),
60+
first: true,
5861
}
5962
}
6063

@@ -75,24 +78,24 @@ func (r *RateTracker) Take(sub string) {
7578
rate := tracker.rate
7679
tracker.Unlock()
7780

78-
rate.Take()
79-
}
80-
81-
// Success signals to the RateTracker that a request for the provided subdomain name was successful.
82-
func (r *RateTracker) Success(sub string) {
83-
tracker := r.getDomainRateTracker(sub)
84-
85-
tracker.Lock()
86-
tracker.success++
87-
tracker.Unlock()
81+
_ = rate.Wait(context.TODO())
8882
}
8983

90-
// Timeout signals to the RateTracker that a request for the provided subdomain name timed out.
91-
func (r *RateTracker) Timeout(sub string) {
84+
// ReportResponseTime provides the response time for a request for the domain name provided in the sub parameter.
85+
func (r *RateTracker) ReportResponseTime(sub string, delta time.Duration) {
86+
var average, count float64
9287
tracker := r.getDomainRateTracker(sub)
9388

9489
tracker.Lock()
95-
tracker.timeout++
90+
tracker.count++
91+
count = float64(tracker.count)
92+
average = float64(tracker.avg.Milliseconds())
93+
average = ((average * (count - 1)) + float64(delta.Milliseconds())) / count
94+
tracker.avg = time.Duration(math.Round(average)) * time.Millisecond
95+
96+
if tracker.first {
97+
defer tracker.update()
98+
}
9699
tracker.Unlock()
97100
}
98101

@@ -124,26 +127,18 @@ func (r *RateTracker) updateAllRateLimiters() {
124127
func (rt *rateTrack) update() {
125128
rt.Lock()
126129
defer rt.Unlock()
127-
// check if this rate tracker has already been updated
128-
if rt.success == 0 && rt.timeout == 0 {
130+
131+
if rt.first {
132+
rt.first = false
133+
} else if rt.count < minUpdateSampleSize {
129134
return
130135
}
131-
// timeouts in excess of maxTimeoutPercentage indicate a need to slow down
132-
if float64(rt.timeout)/float64(rt.success+rt.timeout) > maxTimeoutPercentage {
133-
rt.qps -= 1
134-
if rt.qps <= 0 {
135-
rt.qps = 1
136-
}
137-
} else {
138-
rt.qps += 1
139-
if rt.qps > maxQPSPerNameserver {
140-
rt.qps = maxQPSPerNameserver
141-
}
142-
}
136+
137+
limit := rate.Every(rt.avg)
143138
// update the QPS rate limiter and reset counters
144-
rt.rate = ratelimit.New(rt.qps)
145-
rt.success = 0
146-
rt.timeout = 0
139+
rt.rate = rate.NewLimiter(limit, 1)
140+
rt.avg = 0
141+
rt.count = 0
147142
}
148143

149144
func (r *RateTracker) getDomainRateTracker(sub string) *rateTrack {

rate_test.go

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,52 +18,27 @@ func TestUpdateRateLimiters(t *testing.T) {
1818
rt.Take(domain)
1919
tracker := rt.getDomainRateTracker(domain)
2020

21-
tracker.Lock()
22-
qps := tracker.qps
23-
tracker.Unlock()
24-
num := qps / 3
25-
// set a large number of timeouts
26-
for i := 0; i < num; i++ {
27-
rt.Success(domain)
28-
}
29-
30-
max := tracker.qps - num
31-
for i := 0; i < max; i++ {
32-
rt.Timeout(domain)
33-
}
34-
time.Sleep(rateUpdateInterval + (rateUpdateInterval / 2))
21+
rt.ReportResponseTime(domain, 500*time.Millisecond)
3522

3623
tracker.Lock()
37-
qps2 := tracker.qps
24+
limit := tracker.rate.Limit()
3825
tracker.Unlock()
3926
// the QPS should now be lower
40-
if qps2 >= qps {
41-
t.Errorf("Unexpected QPS, expected QPS lower than %d, got %d", qps, qps2)
27+
if limit > 3 {
28+
t.Errorf("Unexpected QPS, expected QPS lower than %d, got %f", 3, limit)
4229
}
4330

4431
tracker.Lock()
45-
succ := tracker.success
46-
tout := tracker.timeout
32+
tracker.avg = 50 * time.Millisecond
33+
tracker.count = minUpdateSampleSize
4734
tracker.Unlock()
48-
// check that the counters have been cleared
49-
if succ != 0 || tout != 0 {
50-
t.Errorf("Unexpected counter values, Success Counter %d, Timeout Counter %d", succ, tout)
51-
}
52-
53-
tracker.Lock()
54-
qps = tracker.qps
55-
tracker.Unlock()
56-
// set a large number of successes
57-
for i := 0; i < qps; i++ {
58-
rt.Success(domain)
59-
}
60-
time.Sleep(rateUpdateInterval + (rateUpdateInterval / 2))
35+
tracker.update()
6136

6237
tracker.Lock()
63-
qps2 = tracker.qps
38+
limit = tracker.rate.Limit()
6439
tracker.Unlock()
6540
// the QPS should now be higher
66-
if qps2 <= qps {
67-
t.Errorf("Unexpected QPS, expected QPS higher than %d, got %d", qps, qps2)
41+
if limit < 20 || limit > 21 {
42+
t.Errorf("Unexpected QPS, expected QPS of %d, got %f", 20, limit)
6843
}
6944
}

resolvers.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,8 @@ func (r *Resolvers) processSingleResp(response *resp) {
357357
req.Result <- req.Resp
358358
req.Res.collectStats(req.Resp)
359359
if r.servRates != nil {
360-
r.servRates.Success(name)
360+
delta := req.RecvAt.Sub(req.SentAt)
361+
r.servRates.ReportResponseTime(name, delta)
361362
}
362363
req.release()
363364
}
@@ -392,9 +393,6 @@ func (r *Resolvers) timeouts() {
392393
for _, req := range res.xchgs.removeExpired() {
393394
req.errNoResponse()
394395
res.collectStats(req.Msg)
395-
if r.servRates != nil {
396-
r.servRates.Timeout(req.Msg.Question[0].Name)
397-
}
398396
req.release()
399397
}
400398
}

xchg_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,18 @@ func TestXchgUpdateTimestamp(t *testing.T) {
4444
msg := QueryMsg(name, dns.TypeA)
4545
req := &request{Msg: msg}
4646

47-
if !req.Timestamp.IsZero() {
47+
if !req.SentAt.IsZero() {
4848
t.Errorf("Expected the new request to have a zero value timestamp")
4949
}
5050
if err := xchg.add(req); err != nil {
5151
t.Errorf("Failed to add the request")
5252
}
53-
xchg.updateTimestamp(msg.Id, name)
53+
xchg.updateSentAt(msg.Id, name)
5454
// For complete coverage
55-
xchg.updateTimestamp(msg.Id, "Bad Name")
55+
xchg.updateSentAt(msg.Id, "Bad Name")
5656

5757
req = xchg.remove(msg.Id, msg.Question[0].Name)
58-
if req == nil || req.Timestamp.IsZero() {
58+
if req == nil || req.SentAt.IsZero() {
5959
t.Errorf("Expected the updated request to not have a zero value timestamp")
6060
}
6161
}
@@ -67,8 +67,8 @@ func TestXchgRemoveExpired(t *testing.T) {
6767
for _, name := range names {
6868
msg := QueryMsg(name, dns.TypeA)
6969
if err := xchg.add(&request{
70-
Msg: msg,
71-
Timestamp: time.Now(),
70+
Msg: msg,
71+
SentAt: time.Now(),
7272
}); err != nil {
7373
t.Errorf("Failed to add the request")
7474
}
@@ -77,8 +77,8 @@ func TestXchgRemoveExpired(t *testing.T) {
7777
name := "vpn.caffix.net"
7878
msg := QueryMsg(name, dns.TypeA)
7979
if err := xchg.add(&request{
80-
Msg: msg,
81-
Timestamp: time.Now().Add(3 * time.Second),
80+
Msg: msg,
81+
SentAt: time.Now().Add(3 * time.Second),
8282
}); err != nil {
8383
t.Errorf("Failed to add the request")
8484
}

0 commit comments

Comments
 (0)