Skip to content

Commit 15e31d4

Browse files
authored
Merge pull request #1247 from apernet/wip-dumpstream
feat: add /dump/streams as a traffic stats API
2 parents c34f237 + 3e8c205 commit 15e31d4

File tree

7 files changed

+351
-6
lines changed

7 files changed

+351
-6
lines changed

core/internal/integration_tests/mocks/mock_TrafficLogger.go

Lines changed: 73 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/internal/integration_tests/trafficlogger_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func TestClientServerTrafficLoggerTCP(t *testing.T) {
6262
return nil
6363
})
6464
serverOb.EXPECT().TCP(addr).Return(sobConn, nil).Once()
65+
trafficLogger.EXPECT().TraceStream(mock.Anything, mock.Anything).Return().Once()
6566

6667
conn, err := c.TCP(addr)
6768
assert.NoError(t, err)
@@ -84,6 +85,7 @@ func TestClientServerTrafficLoggerTCP(t *testing.T) {
8485
time.Sleep(1 * time.Second) // Need some time for the server to receive the data
8586

8687
// Client reads from server again but blocked
88+
trafficLogger.EXPECT().UntraceStream(mock.Anything).Return().Once()
8789
trafficLogger.EXPECT().LogTraffic("nobody", uint64(0), uint64(4)).Return(false).Once()
8890
trafficLogger.EXPECT().LogOnlineState("nobody", false).Return().Once()
8991
sobConnCh <- []byte("nope")

core/internal/utils/atomic.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,33 @@ func (t *AtomicTime) Set(new time.Time) {
2222
func (t *AtomicTime) Get() time.Time {
2323
return t.v.Load().(time.Time)
2424
}
25+
26+
type Atomic[T any] struct {
27+
v atomic.Value
28+
}
29+
30+
func (a *Atomic[T]) Load() T {
31+
value := a.v.Load()
32+
if value == nil {
33+
var zero T
34+
return zero
35+
}
36+
return value.(T)
37+
}
38+
39+
func (a *Atomic[T]) Store(value T) {
40+
a.v.Store(value)
41+
}
42+
43+
func (a *Atomic[T]) Swap(new T) T {
44+
old := a.v.Swap(new)
45+
if old == nil {
46+
var zero T
47+
return zero
48+
}
49+
return old.(T)
50+
}
51+
52+
func (a *Atomic[T]) CompareAndSwap(old, new T) bool {
53+
return a.v.CompareAndSwap(old, new)
54+
}

core/server/config.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import (
44
"crypto/tls"
55
"net"
66
"net/http"
7+
"sync/atomic"
78
"time"
89

910
"github.com/apernet/hysteria/core/v2/errors"
1011
"github.com/apernet/hysteria/core/v2/internal/pmtud"
12+
"github.com/apernet/hysteria/core/v2/internal/utils"
1113
"github.com/apernet/quic-go"
1214
)
1315

@@ -212,4 +214,66 @@ type EventLogger interface {
212214
type TrafficLogger interface {
213215
LogTraffic(id string, tx, rx uint64) (ok bool)
214216
LogOnlineState(id string, online bool)
217+
TraceStream(stream quic.Stream, stats *StreamStats)
218+
UntraceStream(stream quic.Stream)
219+
}
220+
221+
type StreamState int
222+
223+
const (
224+
// StreamStateInitial indicates the initial state of a stream.
225+
// Client has opened the stream, but we have not received the proxy request yet.
226+
StreamStateInitial StreamState = iota
227+
228+
// StreamStateHooking indicates that the hook (usually sniff) is processing.
229+
// Client has sent the proxy request, but sniff requires more data to complete.
230+
StreamStateHooking
231+
232+
// StreamStateConnecting indicates that we are connecting to the proxy target.
233+
StreamStateConnecting
234+
235+
// StreamStateEstablished indicates the proxy is established.
236+
StreamStateEstablished
237+
238+
// StreamStateClosed indicates the stream is closed.
239+
StreamStateClosed
240+
)
241+
242+
func (s StreamState) String() string {
243+
switch s {
244+
case StreamStateInitial:
245+
return "init"
246+
case StreamStateHooking:
247+
return "hook"
248+
case StreamStateConnecting:
249+
return "connect"
250+
case StreamStateEstablished:
251+
return "estab"
252+
case StreamStateClosed:
253+
return "closed"
254+
default:
255+
return "unknown"
256+
}
257+
}
258+
259+
type StreamStats struct {
260+
State utils.Atomic[StreamState]
261+
262+
AuthID string
263+
ConnID uint32
264+
InitialTime time.Time
265+
266+
ReqAddr utils.Atomic[string]
267+
HookedReqAddr utils.Atomic[string]
268+
269+
Tx atomic.Uint64
270+
Rx atomic.Uint64
271+
272+
LastActiveTime utils.Atomic[time.Time]
273+
}
274+
275+
func (s *StreamStats) setHookedReqAddr(addr string) {
276+
if addr != s.ReqAddr.Load() {
277+
s.HookedReqAddr.Store(addr)
278+
}
215279
}

core/server/copy.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package server
33
import (
44
"errors"
55
"io"
6+
"time"
67
)
78

89
var errDisconnect = errors.New("traffic logger requested disconnect")
@@ -31,23 +32,27 @@ func copyBufferLog(dst io.Writer, src io.Reader, log func(n uint64) bool) error
3132
}
3233
}
3334

34-
func copyTwoWayWithLogger(id string, serverRw, remoteRw io.ReadWriter, l TrafficLogger) error {
35+
func copyTwoWayEx(id string, serverRw, remoteRw io.ReadWriter, l TrafficLogger, stats *StreamStats) error {
3536
errChan := make(chan error, 2)
3637
go func() {
3738
errChan <- copyBufferLog(serverRw, remoteRw, func(n uint64) bool {
39+
stats.LastActiveTime.Store(time.Now())
40+
stats.Rx.Add(n)
3841
return l.LogTraffic(id, 0, n)
3942
})
4043
}()
4144
go func() {
4245
errChan <- copyBufferLog(remoteRw, serverRw, func(n uint64) bool {
46+
stats.LastActiveTime.Store(time.Now())
47+
stats.Tx.Add(n)
4348
return l.LogTraffic(id, n, 0)
4449
})
4550
}()
4651
// Block until one of the two goroutines returns
4752
return <-errChan
4853
}
4954

50-
// copyTwoWay is the "fast-path" version of copyTwoWayWithLogger that does not log traffic.
55+
// copyTwoWay is the "fast-path" version of copyTwoWayEx that does not log traffic or update stream stats.
5156
// It uses the built-in io.Copy instead of our own copyBufferLog.
5257
func copyTwoWay(serverRw, remoteRw io.ReadWriter) error {
5358
errChan := make(chan error, 2)

core/server/server.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package server
33
import (
44
"context"
55
"crypto/tls"
6+
"math/rand"
67
"net/http"
78
"sync"
9+
"time"
810

911
"github.com/apernet/quic-go"
1012
"github.com/apernet/quic-go/http3"
@@ -100,6 +102,7 @@ type h3sHandler struct {
100102
authenticated bool
101103
authMutex sync.Mutex
102104
authID string
105+
connID uint32 // a random id for dump streams
103106

104107
udpSM *udpSessionManager // Only set after authentication
105108
}
@@ -108,6 +111,7 @@ func newH3sHandler(config *Config, conn quic.Connection) *h3sHandler {
108111
return &h3sHandler{
109112
config: config,
110113
conn: conn,
114+
connID: rand.Uint32(),
111115
}
112116
}
113117

@@ -205,12 +209,29 @@ func (h *h3sHandler) ProxyStreamHijacker(ft http3.FrameType, id quic.ConnectionT
205209
}
206210

207211
func (h *h3sHandler) handleTCPRequest(stream quic.Stream) {
212+
trafficLogger := h.config.TrafficLogger
213+
streamStats := &StreamStats{
214+
AuthID: h.authID,
215+
ConnID: h.connID,
216+
InitialTime: time.Now(),
217+
}
218+
streamStats.State.Store(StreamStateInitial)
219+
streamStats.LastActiveTime.Store(time.Now())
220+
defer func() {
221+
streamStats.State.Store(StreamStateClosed)
222+
}()
223+
if trafficLogger != nil {
224+
trafficLogger.TraceStream(stream, streamStats)
225+
defer trafficLogger.UntraceStream(stream)
226+
}
227+
208228
// Read request
209229
reqAddr, err := protocol.ReadTCPRequest(stream)
210230
if err != nil {
211231
_ = stream.Close()
212232
return
213233
}
234+
streamStats.ReqAddr.Store(reqAddr)
214235
// Call the hook if set
215236
var putback []byte
216237
var hooked bool
@@ -220,19 +241,22 @@ func (h *h3sHandler) handleTCPRequest(stream quic.Stream) {
220241
// so that the client will send whatever request the hook wants to see.
221242
// This is essentially a server-side fast-open.
222243
if hooked {
244+
streamStats.State.Store(StreamStateHooking)
223245
_ = protocol.WriteTCPResponse(stream, true, "RequestHook enabled")
224246
putback, err = h.config.RequestHook.TCP(stream, &reqAddr)
225247
if err != nil {
226248
_ = stream.Close()
227249
return
228250
}
251+
streamStats.setHookedReqAddr(reqAddr)
229252
}
230253
}
231254
// Log the event
232255
if h.config.EventLogger != nil {
233256
h.config.EventLogger.TCPRequest(h.conn.RemoteAddr(), h.authID, reqAddr)
234257
}
235258
// Dial target
259+
streamStats.State.Store(StreamStateConnecting)
236260
tConn, err := h.config.Outbound.TCP(reqAddr)
237261
if err != nil {
238262
if !hooked {
@@ -248,13 +272,15 @@ func (h *h3sHandler) handleTCPRequest(stream quic.Stream) {
248272
if !hooked {
249273
_ = protocol.WriteTCPResponse(stream, true, "Connected")
250274
}
275+
streamStats.State.Store(StreamStateEstablished)
251276
// Put back the data if the hook requested
252277
if len(putback) > 0 {
253-
_, _ = tConn.Write(putback)
278+
n, _ := tConn.Write(putback)
279+
streamStats.Tx.Add(uint64(n))
254280
}
255281
// Start proxying
256-
if h.config.TrafficLogger != nil {
257-
err = copyTwoWayWithLogger(h.authID, stream, tConn, h.config.TrafficLogger)
282+
if trafficLogger != nil {
283+
err = copyTwoWayEx(h.authID, stream, tConn, trafficLogger, streamStats)
258284
} else {
259285
// Use the fast path if no traffic logger is set
260286
err = copyTwoWay(stream, tConn)

0 commit comments

Comments
 (0)