Skip to content

Commit 8a4bf5d

Browse files
committed
add tcp-pinger for measuring rtt of mailservers
Signed-off-by: Jakub Sokołowski <[email protected]>
1 parent 89659f8 commit 8a4bf5d

File tree

88 files changed

+3367
-146
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+3367
-146
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ require (
2727
github.com/mutecomm/go-sqlcipher v0.0.0-20190227152316-55dbde17881f
2828
github.com/okzk/sdnotify v0.0.0-20180710141335-d9becc38acbd
2929
github.com/pborman/uuid v1.2.0
30+
github.com/pkg/errors v0.8.1
3031
github.com/prometheus/client_golang v1.2.1
3132
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
3233
github.com/status-im/migrate/v4 v4.6.2-status.2
3334
github.com/status-im/rendezvous v1.3.0
3435
github.com/status-im/status-protocol-go v0.4.5-0.20191107122821-775d17008edf
36+
github.com/status-im/tcp-shaker v0.0.0-20191113234341-a6c5fb59f6ad
3537
github.com/status-im/whisper v1.5.2
3638
github.com/stretchr/testify v1.4.0
3739
github.com/syndtr/goleveldb v1.0.0

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,8 @@ github.com/status-im/rendezvous v1.3.0 h1:7RK/MXXW+tlm0asKm1u7Qp7Yni6AO29a7j8+E4
578578
github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcwjvNYt+Gh1W1s=
579579
github.com/status-im/status-protocol-go v0.4.5-0.20191107122821-775d17008edf h1:1boOd5yMePhXxYei97Rm/hFF45alUpMl87ZAWvlSKtg=
580580
github.com/status-im/status-protocol-go v0.4.5-0.20191107122821-775d17008edf/go.mod h1:r8TgqNOpY+fGKkBfR9PldxSSaBN0EsEEY4a3WsIh9LY=
581+
github.com/status-im/tcp-shaker v0.0.0-20191113234341-a6c5fb59f6ad h1:L0O2TPUJvbiByREFlaP4VobT/msU2/32QuUc2cnKAQU=
582+
github.com/status-im/tcp-shaker v0.0.0-20191113234341-a6c5fb59f6ad/go.mod h1:5bLqb2K4TvKZrrAY/c/d8Znmz11HnBRv2b03vkWGzzQ=
581583
github.com/status-im/whisper v1.5.2 h1:26NgiKusmPic38eQdtXnaY+iaQ/LuQ3Dh0kCGYT/Uxs=
582584
github.com/status-im/whisper v1.5.2/go.mod h1:emrOxzJme0k66QtbbQ2bdd3P8RCdLZ8sTD7SkwH1s2s=
583585
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE=
@@ -716,6 +718,8 @@ golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7w
716718
golang.org/x/sys v0.0.0-20190927073244-c990c680b611/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
717719
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
718720
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
721+
golang.org/x/sys v0.0.0-20191110163157-d32e6e3b99c4 h1:Hynbrlo6LbYI3H1IqXpkVDOcX/3HiPdhVEuyj5a59RM=
722+
golang.org/x/sys v0.0.0-20191110163157-d32e6e3b99c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
719723
golang.org/x/text v0.0.0-20171227012246-e19ae1496984/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
720724
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
721725
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

rtt/rtt.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package rtt
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
errors "github.com/pkg/errors"
9+
tcp "github.com/status-im/tcp-shaker"
10+
)
11+
12+
type Result struct {
13+
Addr string
14+
RTTMs int
15+
Err error
16+
}
17+
18+
// timeoutError indicates an error due to TCP connection timeout.
19+
// tcp-shaker returns an error implementing this interface in such a case.
20+
type timeoutError interface {
21+
Timeout() bool
22+
}
23+
24+
func runCheck(c *tcp.Checker, address string, timeout time.Duration) Result {
25+
// mesaure RTT
26+
start := time.Now()
27+
// TCP Ping
28+
err := c.CheckAddr(address, timeout)
29+
// measure RTT
30+
elapsed := time.Since(start)
31+
latency := int(elapsed.Milliseconds())
32+
33+
if err != nil { // don't confuse users with valid latency values on error
34+
latency = -1
35+
switch err.(type) {
36+
case timeoutError:
37+
err = errors.Wrap(err, "tcp check timeout")
38+
case tcp.ErrConnect:
39+
err = errors.Wrap(err, "unable to connect")
40+
default:
41+
err = err
42+
}
43+
}
44+
45+
return Result{
46+
Addr: address,
47+
RTTMs: latency,
48+
Err: err,
49+
}
50+
}
51+
52+
func waitForResults(errCh <-chan error, resCh <-chan Result) (results []Result, err error) {
53+
for {
54+
select {
55+
case err = <-errCh:
56+
return nil, err
57+
case res, ok := <-resCh:
58+
if !ok {
59+
return
60+
}
61+
results = append(results, res)
62+
}
63+
}
64+
}
65+
66+
func CheckHosts(addresses []string, timeout time.Duration) ([]Result, error) {
67+
c := tcp.NewChecker()
68+
69+
// channel for receiving possible checking loop failure
70+
errCh := make(chan error, 1)
71+
72+
// stop the checking loop when function exists
73+
ctx, stopChecker := context.WithCancel(context.Background())
74+
defer stopChecker()
75+
76+
// loop that queries Epoll and pipes events to CheckAddr() calls
77+
go func() {
78+
errCh <- c.CheckingLoop(ctx)
79+
}()
80+
// wait for CheckingLoop to prepare the epoll/kqueue
81+
<-c.WaitReady()
82+
83+
// channel for returning results from concurrent checks
84+
resCh := make(chan Result, len(addresses))
85+
86+
var wg sync.WaitGroup
87+
for i := 0; i < len(addresses); i++ {
88+
wg.Add(1)
89+
go func(address string, resCh chan<- Result) {
90+
defer wg.Done()
91+
resCh <- runCheck(c, address, timeout)
92+
}(addresses[i], resCh)
93+
}
94+
// wait for all the routines to finish before closing results channel
95+
wg.Wait()
96+
close(resCh)
97+
98+
// wait for the results for all addresses or a checking loop error
99+
return waitForResults(errCh, resCh)
100+
}

services/mailservers/tcp_ping.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package mailservers
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/ethereum/go-ethereum/p2p/enode"
10+
"github.com/ethereum/go-ethereum/p2p/enr"
11+
12+
"github.com/status-im/status-go/rtt"
13+
)
14+
15+
type PingQuery struct {
16+
Addresses []string `json:"addresses"`
17+
TimeoutMs int `json:"timeoutMs"`
18+
}
19+
20+
type PingResult struct {
21+
ENode string `json:"address"`
22+
RTTMs *int `json:"rtt_ms"`
23+
Err *string `json:"error"`
24+
}
25+
26+
func (pr *PingResult) Update(rttMs int, err error) {
27+
if err != nil {
28+
errStr := err.Error()
29+
pr.Err = &errStr
30+
}
31+
pr.RTTMs = &rttMs
32+
}
33+
34+
func toPingResult(r rtt.Result, addrMap map[string]string) PingResult {
35+
var err *string
36+
if r.Err != nil {
37+
tmpErr := r.Err.Error()
38+
err = &tmpErr
39+
}
40+
return PingResult{
41+
ENode: addrMap[r.Addr],
42+
RTTMs: &r.RTTMs,
43+
Err: err,
44+
}
45+
}
46+
47+
func parseEnodes(enodes []string) (map[string]*PingResult, []string) {
48+
// parse enode addreses into normal host + port addresses
49+
results := make(map[string]*PingResult, len(enodes))
50+
toPing := make([]string, 0)
51+
52+
var ip4 enr.IPv4
53+
var tcp enr.TCP
54+
for i := range enodes {
55+
node, err := enode.ParseV4(enodes[i])
56+
if err != nil {
57+
// using enode since it's irrelevant but needs to be unique
58+
errStr := err.Error()
59+
results[enodes[i]] = &PingResult{ENode: enodes[i], Err: &errStr}
60+
}
61+
// helpers for extracting values from enode.Node
62+
node.Load(&ip4)
63+
node.Load(&tcp)
64+
addr := fmt.Sprintf("%s:%d", net.IP(ip4).String(), tcp)
65+
results[addr] = &PingResult{ENode: enodes[i]}
66+
toPing = append(toPing, addr)
67+
}
68+
return results, toPing
69+
}
70+
71+
func mapValues(m map[string]*PingResult) []PingResult {
72+
rval := make([]PingResult, len(m))
73+
var i int
74+
for _, value := range m {
75+
rval[i] = *value
76+
i++
77+
}
78+
return rval
79+
}
80+
81+
func (a *API) Ping(ctx context.Context, pq PingQuery) ([]PingResult, error) {
82+
timeout := time.Duration(pq.TimeoutMs) * time.Millisecond
83+
84+
// parse enodes into pingable addresses
85+
resultsMap, toPing := parseEnodes(pq.Addresses)
86+
87+
// run the checks concurrently
88+
results, err := rtt.CheckHosts(toPing, timeout)
89+
if err != nil {
90+
return nil, err
91+
}
92+
93+
// set ping results
94+
for i := range results {
95+
r := results[i]
96+
resultsMap[r.Addr].Update(r.RTTMs, r.Err)
97+
}
98+
99+
return mapValues(resultsMap), nil
100+
}

vendor/github.com/status-im/tcp-shaker/.gitignore

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/status-im/tcp-shaker/.travis.yml

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/status-im/tcp-shaker/CONTRIBUTING.md

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/status-im/tcp-shaker/LICENSE

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)