Skip to content

Commit 5f2edd2

Browse files
committed
ai/live: Gateway-native WHEP server.
Adds a native WHEP server to the gateway for playback. This offers: * Immediate connectivity. WHEP playback can begin as soon as the WHIP response is received, rather than having to poll the WHEP endpoint repeatedly until the output is available. The WHEP response will simply hang until the output is ready. * Faster startup times, especially in no-audio cases. We use the mpegts PMT to know immediately which tracks are available. This avoids having to probe several seconds for an audio track that may never come. * Removes the requirement to have a MediaMTX server in the mix. This can substantially simplify the infrastructure requirements for many deployments. However, MediaMTX would still be needed for RTMP ingest or non-WHEP playback. * The promise of much finer control over tracks and stats reports. /// Detailed Changes * Adds a WHEP server. Supports video (H.264) and audio (Opus). * Adds a mpegts-to-RTP timestamp converter to handle roll-overs between 33-bit mpegts (ugh) and 32-bit RTP timestamps. * Adds the output RingBuffer to the LivePipelines struct so it can be accessible via the WHEP handler. If the output is not yet ready, the WHEP handler waits until it is (via condvar). This allows WHEP sessions to be set up immediately as soon as ingest begins, regardless of whether the output is ready yet. * Updates the output RingBuffer to follow the io.Closer interface. Needed so we close the output buffer instead of writing to it indefinitely after a WHEP peerconnection goes away. * Off by default. The `LIVE_AI_WHEP_ADDR` environment variable needs to be set with at least a port number. This should be different from the WHIP port at `LIVE_AI_WHIP_ADDR`. * MediaMTX playback is still available; nothing changes there. * The swapping behavior does not change for now. If we swap orchestrators, the WHEP peerconnection will close and the client will have to re-connect WHEP (swaps do not affect ingest). /// Rollout Plan * Enable on staging via configuring a port via the `LIVE_AI_WHEP_ADDR` env var. Update `LIVE_AI_WHEP_URL` to point to the gateway instead of MediaMTX. * Leave things on staging for a couple weeks * When we are satisfied with staging, enable on production. Note that we can always enable WHEP on the gateway for manual testing without making it the playback default as long as the `LIVE_AI_WHEP_URL` env var points to MediaMTX.
1 parent d095d96 commit 5f2edd2

File tree

10 files changed

+617
-8
lines changed

10 files changed

+617
-8
lines changed

core/livepeernode.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
"github.com/golang/glog"
22+
"github.com/livepeer/go-livepeer/media"
2223
"github.com/livepeer/go-livepeer/pm"
2324
"github.com/livepeer/go-livepeer/trickle"
2425

@@ -183,6 +184,9 @@ type LivePipeline struct {
183184
ControlPub *trickle.TricklePublisher
184185
StopControl func()
185186
ReportUpdate func([]byte)
187+
OutCond *sync.Cond
188+
OutWriter *media.RingBuffer
189+
Closed bool
186190
}
187191

188192
// NewLivepeerNode creates a new Livepeer Node. Eth can be nil.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/livepeer/go-livepeer
22

3-
go 1.23.2
3+
go 1.25.0
44

55
require (
66
contrib.go.opencensus.io/exporter/prometheus v0.4.2

media/ring.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"io"
66
"sync"
7+
"sync/atomic"
78
)
89

910
/*
@@ -213,12 +214,24 @@ func (rb *RingBuffer) MakeReader() *RingBufferReader {
213214
}
214215

215216
type RingBufferReader struct {
216-
rb *RingBuffer
217-
nb int64
217+
rb *RingBuffer
218+
nb int64
219+
closed atomic.Bool
218220
}
219221

220222
func (rbr *RingBufferReader) Read(p []byte) (int, error) {
223+
if closed := rbr.closed.Load(); closed {
224+
return 0, io.EOF
225+
}
221226
n, err := rbr.rb.readFrom(p, rbr.nb)
222227
rbr.nb += int64(n)
223228
return n, err
224229
}
230+
231+
func (rbr *RingBufferReader) Close() error {
232+
// NB: If already blocking in Read(),
233+
// EOF occurs on the *next* Read() call
234+
// This does not wake up blocked readers.
235+
rbr.closed.Store(true)
236+
return nil
237+
}

media/ring_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"sync"
99
"testing"
10+
"testing/synctest"
1011
"time"
1112

1213
"github.com/stretchr/testify/assert"
@@ -415,3 +416,60 @@ func TestRingbuffer_WraparoundReads(t *testing.T) {
415416
assert.Nil(err)
416417
assert.Equal("67890", string(buf5))
417418
}
419+
420+
func sync_TestRingbuffer_ReadClose(t *testing.T) {
421+
assert := assert.New(t)
422+
423+
rbc := &RingBufferConfig{BufferLen: 8}
424+
rb, err := NewRingBuffer(rbc)
425+
assert.Nil(err)
426+
427+
reader := rb.MakeReader()
428+
reader2 := rb.MakeReader()
429+
var wg sync.WaitGroup
430+
431+
wg.Go(func() {
432+
buf := make([]byte, 5)
433+
n, err := reader.Read(buf)
434+
assert.Equal(4, n)
435+
assert.Nil(err)
436+
assert.Equal([]byte{1, 2, 3, 4, 0}, buf)
437+
n, err = reader.Read(buf)
438+
assert.Equal(0, n)
439+
assert.Equal(io.EOF, err)
440+
})
441+
442+
wg.Go(func() {
443+
// reader should be drained
444+
buf := make([]byte, 5)
445+
n, err := reader2.Read(buf)
446+
assert.Equal(4, n)
447+
assert.Nil(err)
448+
assert.Equal([]byte{1, 2, 3, 4, 0}, buf)
449+
n, err = reader2.Read(buf)
450+
assert.Equal([]byte{5, 6, 7, 8, 0}, buf)
451+
assert.Equal(4, n)
452+
n, err = reader2.Read(buf)
453+
assert.Equal(0, n)
454+
assert.Equal(io.EOF, err)
455+
})
456+
457+
// give the reader goroutine a moment to block waiting for data
458+
time.Sleep(5 * time.Millisecond)
459+
460+
// close the reader concurrently
461+
err = reader.Close()
462+
assert.Nil(err)
463+
464+
rb.Write([]byte{1, 2, 3, 4})
465+
time.Sleep(1 * time.Millisecond)
466+
rb.Write([]byte{5, 6, 7, 8})
467+
468+
rb.Close()
469+
470+
wgWait(&wg)
471+
}
472+
473+
func TestRingbuffer_ReadClose(t *testing.T) {
474+
synctest.Test(t, sync_TestRingbuffer_ReadClose)
475+
}

media/rtp_timestamper.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package media
2+
3+
const (
4+
// MPEG‑TS PTS is 33 bits wide; it wraps at 2^33.
5+
mpegtsMaxPts = 1 << 33
6+
halfPts = mpegtsMaxPts / 2
7+
)
8+
9+
// RTPTimestamper keeps track of wrap‑arounds in a 33‑bit PTS stream.
10+
type RTPTimestamper struct {
11+
lastRaw int64 // last raw PTS seen
12+
wrapCount int64 // how many times we've wrapped
13+
initialized bool // false until first Unwrap call
14+
}
15+
16+
// NewRTPTimestamper creates a fresh unwrapper.
17+
func NewRTPTimestamper() *RTPTimestamper {
18+
return &RTPTimestamper{}
19+
}
20+
21+
// returns a monotonic 32‑bit timeline by accounting for 33-bit wrap‑arounds.
22+
func (u *RTPTimestamper) ToTS(rawPts int64) uint32 {
23+
if u.initialized {
24+
delta := rawPts - u.lastRaw
25+
if delta < -halfPts {
26+
// wrapped forward
27+
u.wrapCount++
28+
} else if delta > halfPts {
29+
// wrapped backward (unlikely in normal streams)
30+
u.wrapCount--
31+
}
32+
} else {
33+
u.initialized = true
34+
}
35+
u.lastRaw = rawPts
36+
return uint32(rawPts + u.wrapCount*mpegtsMaxPts)
37+
}

media/rtp_timestamper_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package media
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
const (
10+
mod32 = 1 << 32 // 4 294 967 296
11+
)
12+
13+
func TestRTPTimestamper(t *testing.T) {
14+
tests := []struct {
15+
name string
16+
inputs []int64
17+
wantRtp []uint32
18+
}{
19+
{
20+
name: "Initialization",
21+
inputs: []int64{0},
22+
wantRtp: []uint32{0},
23+
},
24+
{
25+
name: "Monotonic increase",
26+
inputs: []int64{0, 1, 2, 1000},
27+
wantRtp: []uint32{0, 1, 2, 1000},
28+
},
29+
{
30+
name: "Small backwards jump (no wrap)",
31+
inputs: []int64{1000, 990},
32+
wantRtp: []uint32{1000, 990},
33+
},
34+
{
35+
name: "Forward 33-bit PTS wrap",
36+
inputs: []int64{mpegtsMaxPts - 2, 2},
37+
wantRtp: []uint32{
38+
uint32((mpegtsMaxPts - 2) % mod32),
39+
uint32((2 + mpegtsMaxPts) % mod32),
40+
},
41+
},
42+
{
43+
name: "Backward 33-bit PTS wrap",
44+
inputs: []int64{5, mpegtsMaxPts - 3},
45+
wantRtp: []uint32{
46+
5,
47+
uint32(mod32 - 3),
48+
},
49+
},
50+
{
51+
name: "Crossing 32-bit boundary (wrap in RTP)",
52+
inputs: []int64{halfPts - 1, halfPts, halfPts + 1},
53+
wantRtp: []uint32{uint32(halfPts - 1), 0, 1},
54+
},
55+
{
56+
name: "Random jitter near wrap threshold",
57+
inputs: []int64{mpegtsMaxPts - halfPts/2, 2},
58+
wantRtp: []uint32{
59+
uint32((mpegtsMaxPts - halfPts/2) % mod32),
60+
2,
61+
},
62+
},
63+
{
64+
name: "Large monotonic advance (< halfPts)",
65+
inputs: []int64{100, halfPts - 1},
66+
wantRtp: []uint32{100, uint32(halfPts - 1)},
67+
},
68+
{
69+
name: "Multiple successive wraps",
70+
inputs: []int64{mpegtsMaxPts - 1, 2, mpegtsMaxPts - 1, 2},
71+
wantRtp: []uint32{
72+
uint32((mpegtsMaxPts - 1) % mod32),
73+
uint32((2 + mpegtsMaxPts) % mod32),
74+
uint32((mpegtsMaxPts - 1) % mod32),
75+
uint32((2 + mpegtsMaxPts) % mod32),
76+
},
77+
},
78+
{
79+
name: "Idempotent Unwrap (same raw twice)",
80+
inputs: []int64{500, 500},
81+
wantRtp: []uint32{500, 500},
82+
},
83+
}
84+
85+
for _, tc := range tests {
86+
t.Run(tc.name, func(t *testing.T) {
87+
u := NewRTPTimestamper()
88+
gotRtp := make([]uint32, len(tc.inputs))
89+
for i, raw := range tc.inputs {
90+
ext := u.ToTS(raw)
91+
gotRtp[i] = ext
92+
}
93+
require.Equal(t, tc.wantRtp, gotRtp)
94+
})
95+
}
96+
}

0 commit comments

Comments
 (0)