Skip to content

Commit f006180

Browse files
committed
Support user-configured heartbeat intervals
There are two changes: * The heartbeat interval period is now a user specified option, where the default is 30s. * A heartbeat is only transmitted if there was no outbound traffic. This is an optimization that enables us to avoid transmitting a heartbeat if a normal data packet would have served the same purpose.
1 parent 8f73111 commit f006180

File tree

3 files changed

+76
-28
lines changed

3 files changed

+76
-28
lines changed

logger.go

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sort"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415

1516
"github.com/dsnet/golib/unitconv"
@@ -20,7 +21,7 @@ type packetLogger struct {
2021
last time.Time // Last time we printed statistics
2122
c chan packetLog
2223
m map[packetLog]packetCount
23-
rx, tx struct{ okay, drop packetCount }
24+
rx, tx struct{ okay, drop packetCount } // Atomically updated
2425
}
2526

2627
type packetLog struct {
@@ -74,19 +75,53 @@ func (pl *packetLogger) Log(b []byte, d direction, dropped bool) {
7475
pl.c <- p
7576
}
7677

78+
// Stats returns statistics on the total number and sizes of packets
79+
// transmitted or received.
80+
func (pl *packetLogger) Stats() (s struct {
81+
Rx, Tx struct{ Okay, Drop struct{ Count, Sizes uint64 } }
82+
}) {
83+
s.Tx.Okay.Count = atomic.LoadUint64(&pl.tx.okay.count)
84+
s.Tx.Okay.Sizes = atomic.LoadUint64(&pl.tx.okay.sizes)
85+
s.Rx.Okay.Count = atomic.LoadUint64(&pl.rx.okay.count)
86+
s.Rx.Okay.Sizes = atomic.LoadUint64(&pl.rx.okay.sizes)
87+
s.Tx.Drop.Count = atomic.LoadUint64(&pl.tx.drop.count)
88+
s.Tx.Drop.Sizes = atomic.LoadUint64(&pl.tx.drop.sizes)
89+
s.Rx.Drop.Count = atomic.LoadUint64(&pl.rx.drop.count)
90+
s.Rx.Drop.Sizes = atomic.LoadUint64(&pl.rx.drop.sizes)
91+
return
92+
}
93+
7794
func (pl *packetLogger) monitor(ctx context.Context) {
7895
t := time.NewTicker(30 * time.Second)
7996
defer t.Stop()
8097

8198
for {
8299
select {
83100
case p := <-pl.c:
84-
size := p.length
101+
size := uint64(p.length)
102+
103+
// Update fine-granularity statistics.
85104
p.length = 0
86105
pc := pl.m[p]
87106
pc.count++
88-
pc.sizes += uint64(size)
107+
pc.sizes += size
89108
pl.m[p] = pc
109+
110+
// Update total packet statistics.
111+
switch {
112+
case !p.dropped && p.direction == outbound:
113+
atomic.AddUint64(&pl.tx.okay.count, 1)
114+
atomic.AddUint64(&pl.tx.okay.sizes, size)
115+
case !p.dropped && p.direction == inbound:
116+
atomic.AddUint64(&pl.rx.okay.count, 1)
117+
atomic.AddUint64(&pl.rx.okay.sizes, size)
118+
case p.dropped && p.direction == outbound:
119+
atomic.AddUint64(&pl.tx.drop.count, 1)
120+
atomic.AddUint64(&pl.tx.drop.sizes, size)
121+
case p.dropped && p.direction == inbound:
122+
atomic.AddUint64(&pl.rx.drop.count, 1)
123+
atomic.AddUint64(&pl.rx.drop.sizes, size)
124+
}
90125
case <-t.C:
91126
var count, sizes uint64
92127
for _, v := range pl.m {
@@ -113,19 +148,6 @@ func (pl *packetLogger) print() {
113148
stats := make([]string, 2) // First 2 lines for total stats
114149
protoNames := map[int]string{icmp: "ICMP", udp: "UDP", tcp: "TCP"}
115150
for k, v := range pl.m {
116-
var pc *packetCount
117-
switch {
118-
case !k.dropped && k.direction == outbound:
119-
pc = &pl.tx.okay
120-
case !k.dropped && k.direction == inbound:
121-
pc = &pl.rx.okay
122-
case k.dropped && k.direction == outbound:
123-
pc = &pl.tx.drop
124-
case k.dropped && k.direction == inbound:
125-
pc = &pl.rx.drop
126-
}
127-
pc.count += v.count
128-
pc.sizes += v.sizes
129151

130152
proto := protoNames[k.ipProtocol]
131153
if proto == "" {
@@ -142,10 +164,12 @@ func (pl *packetLogger) print() {
142164
stats = append(stats, fmt.Sprintf("\tIPv%d/%s %s - %cx %d %spackets (%sB)",
143165
k.ipVersion, proto, link, k.direction, v.count, drop, formatIEC(v.sizes)))
144166
}
167+
168+
s := pl.Stats()
145169
stats[0] = fmt.Sprintf("\tRx %d total packets (%sB), dropped %d total packets (%sB)",
146-
pl.rx.okay.count, formatIEC(pl.rx.okay.sizes), pl.rx.drop.count, formatIEC(pl.rx.drop.sizes))
170+
s.Rx.Okay.Count, formatIEC(s.Rx.Okay.Sizes), s.Rx.Drop.Count, formatIEC(s.Rx.Drop.Sizes))
147171
stats[1] = fmt.Sprintf("\tTx %d total packets (%sB), dropped %d total packets (%sB)",
148-
pl.tx.okay.count, formatIEC(pl.tx.okay.sizes), pl.tx.drop.count, formatIEC(pl.tx.drop.sizes))
172+
s.Tx.Okay.Count, formatIEC(s.Tx.Okay.Sizes), s.Tx.Drop.Count, formatIEC(s.Tx.Drop.Sizes))
149173
sort.Strings(stats[2:])
150174
period := time.Now().Round(time.Second).Sub(pl.last)
151175
pl.logger.Printf("Packet statistics (%v):\n%s", period, strings.Join(stats, "\n"))

main.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
"path"
5353
"runtime"
5454
"syscall"
55+
"time"
5556

5657
"github.com/dsnet/golib/jsonfmt"
5758
)
@@ -102,6 +103,15 @@ type TunnelConfig struct {
102103
// The set of allowed ports must match on both the client and server.
103104
AllowedPorts []uint16
104105

106+
// HeartbeatInterval is the amount of time in seconds without any
107+
// outbound traffic to wait before the tunnel client will send a heartbeat
108+
// message to the server. In the event that the client's address changed,
109+
// this informs the server of the new client address.
110+
//
111+
// This field only applies to the client.
112+
// The default value is 30.
113+
HeartbeatInterval *uint
114+
105115
// PacketMagic is used to generate a sequence of bytes that is prepended to
106116
// every TUN packet sent over UDP. Only inbound messages carrying the
107117
// magic sequence will be accepted. This mechanism is used as a trivial way
@@ -148,6 +158,10 @@ func loadConfig(conf string) (tunn tunnel, logger *log.Logger, closer func() err
148158
if config.TunnelAddress == config.TunnelPeerAddress {
149159
logger.Fatal("TunnelAddress and TunnelPeerAddress must not conflict")
150160
}
161+
if config.HeartbeatInterval == nil {
162+
config.HeartbeatInterval = new(uint)
163+
*config.HeartbeatInterval = 30
164+
}
151165
host, _, _ := net.SplitHostPort(config.NetworkAddress)
152166
serverMode := host == ""
153167

@@ -195,6 +209,7 @@ func loadConfig(conf string) (tunn tunnel, logger *log.Logger, closer func() err
195209
netAddr: config.NetworkAddress,
196210
ports: config.AllowedPorts,
197211
magic: config.PacketMagic,
212+
beatInterval: time.Second * time.Duration(*config.HeartbeatInterval),
198213
log: logger,
199214
}
200215
return tunn, logger, closer

tunnel.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type tunnel struct {
4141
netAddr string
4242
ports []uint16
4343
magic string
44+
beatInterval time.Duration
4445

4546
log logger
4647

@@ -113,6 +114,12 @@ func (t tunnel) run(ctx context.Context) {
113114
// TUN device and UDP socket have been set up. However, there is no good
114115
// support for doing so currently: https://golang.org/issue/1435
115116

117+
if t.testReady != nil {
118+
close(t.testReady)
119+
}
120+
pf := newPortFilter(t.ports)
121+
pl := newPacketLogger(ctx, &wg, t.log)
122+
116123
// On the client, start some goroutines to accommodate for the dynamically
117124
// changing environment that the client may be in.
118125
magic := md5.Sum([]byte(t.magic))
@@ -141,27 +148,29 @@ func (t tunnel) run(ctx context.Context) {
141148
// periodically ping the server to inform it of our new UDP address.
142149
// Sending a packet with only the magic header is sufficient.
143150
go func() {
144-
ticker := time.NewTicker(30 * time.Second)
151+
if t.beatInterval == 0 {
152+
return
153+
}
154+
ticker := time.NewTicker(t.beatInterval)
145155
defer ticker.Stop()
156+
var prevTxn uint64
146157
for range ticker.C {
147-
if isDone(ctx) {
158+
if isDone(ctx) { // Stop if done.
148159
return
149160
}
150161
raddr := t.loadRemoteAddr()
151-
if raddr == nil {
162+
if raddr == nil { // Skip if no remote endpoint.
152163
continue
153164
}
154-
sock.WriteToUDP(magic[:], raddr)
165+
txn := pl.Stats().Tx.Okay.Count
166+
if prevTxn == txn { // Only send if there is no outbound traffic
167+
sock.WriteToUDP(magic[:], raddr)
168+
}
169+
prevTxn = txn
155170
}
156171
}()
157172
}
158173

159-
if t.testReady != nil {
160-
close(t.testReady)
161-
}
162-
pf := newPortFilter(t.ports)
163-
pl := newPacketLogger(ctx, &wg, t.log)
164-
165174
// Handle outbound traffic.
166175
wg.Add(1)
167176
go func() {

0 commit comments

Comments
 (0)