Skip to content

Commit 916f2a4

Browse files
committed
ai/live: Gateway-native WHEP server.
1 parent f2bc929 commit 916f2a4

File tree

7 files changed

+531
-3
lines changed

7 files changed

+531
-3
lines changed

core/livepeernode.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
"github.com/golang/glog"
22+
"github.com/livepeer/go-livepeer/media"
2223
"github.com/livepeer/go-livepeer/pm"
2324
"github.com/livepeer/go-livepeer/trickle"
2425

@@ -174,6 +175,9 @@ type LivePipeline struct {
174175
Pipeline string
175176
ControlPub *trickle.TricklePublisher
176177
StopControl func()
178+
OutCond *sync.Cond
179+
OutWriter *media.RingBuffer
180+
Closed bool
177181
}
178182

179183
// NewLivepeerNode creates a new Livepeer Node. Eth can be nil.

media/rtp_timestamper.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package media
2+
3+
const (
4+
// MPEG‑TS PTS is 33 bits wide; it wraps at 2^33.
5+
mpegtsMaxPts = 1 << 33
6+
halfPts = mpegtsMaxPts / 2
7+
)
8+
9+
// RTPTimestamper keeps track of wrap‑arounds in a 33‑bit PTS stream.
10+
type RTPTimestamper struct {
11+
lastRaw int64 // last raw PTS seen
12+
wrapCount int64 // how many times we've wrapped
13+
initialized bool // false until first Unwrap call
14+
}
15+
16+
// NewRTPTimestamper creates a fresh unwrapper.
17+
func NewRTPTimestamper() *RTPTimestamper {
18+
return &RTPTimestamper{}
19+
}
20+
21+
// Unwrap returns a monotonic 64‑bit timeline by accounting for wrap‑arounds.
22+
func (u *RTPTimestamper) ToTS(rawPts int64) uint32 {
23+
if u.initialized {
24+
delta := rawPts - u.lastRaw
25+
if delta < -halfPts {
26+
// wrapped forward
27+
u.wrapCount++
28+
} else if delta > halfPts {
29+
// wrapped backward (unlikely in normal streams)
30+
u.wrapCount--
31+
}
32+
} else {
33+
u.initialized = true
34+
}
35+
u.lastRaw = rawPts
36+
return uint32(rawPts + u.wrapCount*mpegtsMaxPts)
37+
}

media/rtp_timestamper_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package media
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
const (
10+
mod32 = 1 << 32 // 4 294 967 296
11+
)
12+
13+
func TestRTPTimestamper(t *testing.T) {
14+
tests := []struct {
15+
name string
16+
inputs []int64
17+
wantRtp []uint32
18+
}{
19+
{
20+
name: "Initialization",
21+
inputs: []int64{0},
22+
wantRtp: []uint32{0},
23+
},
24+
{
25+
name: "Monotonic increase",
26+
inputs: []int64{0, 1, 2, 1000},
27+
wantRtp: []uint32{0, 1, 2, 1000},
28+
},
29+
{
30+
name: "Small backwards jump (no wrap)",
31+
inputs: []int64{1000, 990},
32+
wantRtp: []uint32{1000, 990},
33+
},
34+
{
35+
name: "Forward 33-bit PTS wrap",
36+
inputs: []int64{mpegtsMaxPts - 2, 2},
37+
wantRtp: []uint32{
38+
uint32((mpegtsMaxPts - 2) % mod32),
39+
uint32((2 + mpegtsMaxPts) % mod32),
40+
},
41+
},
42+
{
43+
name: "Backward 33-bit PTS wrap",
44+
inputs: []int64{5, mpegtsMaxPts - 3},
45+
wantRtp: []uint32{
46+
5,
47+
uint32(mod32 - 3),
48+
},
49+
},
50+
{
51+
name: "Crossing 32-bit boundary (wrap in RTP)",
52+
inputs: []int64{halfPts - 1, halfPts, halfPts + 1},
53+
wantRtp: []uint32{uint32(halfPts - 1), 0, 1},
54+
},
55+
{
56+
name: "Random jitter near wrap threshold",
57+
inputs: []int64{mpegtsMaxPts - halfPts/2, 2},
58+
wantRtp: []uint32{
59+
uint32((mpegtsMaxPts - halfPts/2) % mod32),
60+
2,
61+
},
62+
},
63+
{
64+
name: "Large monotonic advance (< halfPts)",
65+
inputs: []int64{100, halfPts - 1},
66+
wantRtp: []uint32{100, uint32(halfPts - 1)},
67+
},
68+
{
69+
name: "Multiple successive wraps",
70+
inputs: []int64{mpegtsMaxPts - 1, 2, mpegtsMaxPts - 1, 2},
71+
wantRtp: []uint32{
72+
uint32((mpegtsMaxPts - 1) % mod32),
73+
uint32((2 + mpegtsMaxPts) % mod32),
74+
uint32((mpegtsMaxPts - 1) % mod32),
75+
uint32((2 + mpegtsMaxPts) % mod32),
76+
},
77+
},
78+
{
79+
name: "Idempotent Unwrap (same raw twice)",
80+
inputs: []int64{500, 500},
81+
wantRtp: []uint32{500, 500},
82+
},
83+
}
84+
85+
for _, tc := range tests {
86+
t.Run(tc.name, func(t *testing.T) {
87+
u := NewRTPTimestamper()
88+
gotRtp := make([]uint32, len(tc.inputs))
89+
for i, raw := range tc.inputs {
90+
ext := u.ToTS(raw)
91+
gotRtp[i] = ext
92+
}
93+
require.Equal(t, tc.wantRtp, gotRtp)
94+
})
95+
}
96+
}

0 commit comments

Comments
 (0)