Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/golang/glog"
"github.com/livepeer/go-livepeer/media"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-livepeer/trickle"

Expand Down Expand Up @@ -183,6 +184,9 @@ type LivePipeline struct {
ControlPub *trickle.TricklePublisher
StopControl func()
ReportUpdate func([]byte)
OutCond *sync.Cond
OutWriter *media.RingBuffer
Closed bool
}

// NewLivepeerNode creates a new Livepeer Node. Eth can be nil.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/livepeer/go-livepeer

go 1.23.2
go 1.25.0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess this will be already in main


require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
Expand Down
17 changes: 15 additions & 2 deletions media/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"io"
"sync"
"sync/atomic"
)

/*
Expand Down Expand Up @@ -213,12 +214,24 @@ func (rb *RingBuffer) MakeReader() *RingBufferReader {
}

type RingBufferReader struct {
rb *RingBuffer
nb int64
rb *RingBuffer
nb int64
closed atomic.Bool
}

func (rbr *RingBufferReader) Read(p []byte) (int, error) {
if closed := rbr.closed.Load(); closed {
return 0, io.EOF
}
n, err := rbr.rb.readFrom(p, rbr.nb)
rbr.nb += int64(n)
return n, err
}

func (rbr *RingBufferReader) Close() error {
// NB: If already blocking in Read(),
// EOF occurs on the *next* Read() call
// This does not wake up blocked readers.
rbr.closed.Store(true)
return nil
}
58 changes: 58 additions & 0 deletions media/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"sync"
"testing"
"testing/synctest"
"time"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -415,3 +416,60 @@ func TestRingbuffer_WraparoundReads(t *testing.T) {
assert.Nil(err)
assert.Equal("67890", string(buf5))
}

func sync_TestRingbuffer_ReadClose(t *testing.T) {
assert := assert.New(t)

rbc := &RingBufferConfig{BufferLen: 8}
rb, err := NewRingBuffer(rbc)
assert.Nil(err)

reader := rb.MakeReader()
reader2 := rb.MakeReader()
var wg sync.WaitGroup

wg.Go(func() {
buf := make([]byte, 5)
n, err := reader.Read(buf)
assert.Equal(4, n)
assert.Nil(err)
assert.Equal([]byte{1, 2, 3, 4, 0}, buf)
n, err = reader.Read(buf)
assert.Equal(0, n)
assert.Equal(io.EOF, err)
})

wg.Go(func() {
// reader should be drained
buf := make([]byte, 5)
n, err := reader2.Read(buf)
assert.Equal(4, n)
assert.Nil(err)
assert.Equal([]byte{1, 2, 3, 4, 0}, buf)
n, err = reader2.Read(buf)
assert.Equal([]byte{5, 6, 7, 8, 0}, buf)
assert.Equal(4, n)
n, err = reader2.Read(buf)
assert.Equal(0, n)
assert.Equal(io.EOF, err)
})

// give the reader goroutine a moment to block waiting for data
time.Sleep(5 * time.Millisecond)

// close the reader concurrently
err = reader.Close()
assert.Nil(err)

rb.Write([]byte{1, 2, 3, 4})
time.Sleep(1 * time.Millisecond)
rb.Write([]byte{5, 6, 7, 8})

rb.Close()

wgWait(&wg)
}

func TestRingbuffer_ReadClose(t *testing.T) {
synctest.Test(t, sync_TestRingbuffer_ReadClose)
}
37 changes: 37 additions & 0 deletions media/rtp_timestamper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package media

const (
// MPEG‑TS PTS is 33 bits wide; it wraps at 2^33.
mpegtsMaxPts = 1 << 33
halfPts = mpegtsMaxPts / 2
)

// RTPTimestamper keeps track of wrap‑arounds in a 33‑bit PTS stream.
type RTPTimestamper struct {
lastRaw int64 // last raw PTS seen
wrapCount int64 // how many times we've wrapped
initialized bool // false until first Unwrap call
}

// NewRTPTimestamper creates a fresh unwrapper.
func NewRTPTimestamper() *RTPTimestamper {
return &RTPTimestamper{}
}

// returns a monotonic 32‑bit timeline by accounting for 33-bit wrap‑arounds.
func (u *RTPTimestamper) ToTS(rawPts int64) uint32 {
if u.initialized {
delta := rawPts - u.lastRaw
if delta < -halfPts {
// wrapped forward
u.wrapCount++
} else if delta > halfPts {
// wrapped backward (unlikely in normal streams)
u.wrapCount--
}
} else {
u.initialized = true
}
u.lastRaw = rawPts
return uint32(rawPts + u.wrapCount*mpegtsMaxPts)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this overflow as soon as u.wrapCount > 0? Am I missing sth or maybe you need a % with sth?

https://go.dev/play/p/dVuB5eJTbbQ

./prog.go:8:21: constant 8589963326 overflows uint32

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it would overflow a uint32 by wrapping back to zero. The main problem is there would be two discontinuous overflows: first when it overflows 32 bits during the cast to RTP timestamp, then again when the mpegts timestamp overflows 33 bits. So this bit of plumbing is to make those timestamps consistent.

The unit tests aren't the best at describing this intended behavior right now, so I'll add some comments here and rework the tests a bit so things are clearer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I assume we want it to overflow and "mod" to a value that fits into a uint32, instead of a panic. How come this doesn't panic like the playground snippet I shared?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, probably some compile-time constant propagation + bounds checking. With real variables this doesn't happen, maybe because all the intermediate types are int64 until the last minute. The uint32 conversion does mod the value during overflows.

Here is the conversion code with a simplified set of test cases that shows the overflow behavior: https://go.dev/play/p/_6NNdC7LbSN

BTW had GPT math this out and seems like the explicit wraparound tracking isn't actually necessary since 2^33 - 2^32 = 2^32 ... so maybe we can just do a straight cast from the int64 timestamp to uint32 unless we want to also track the actual timestamps for some reason, idk

}
96 changes: 96 additions & 0 deletions media/rtp_timestamper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package media

import (
"testing"

"github.com/stretchr/testify/require"
)

const (
mod32 = 1 << 32 // 4 294 967 296
)

func TestRTPTimestamper(t *testing.T) {
tests := []struct {
name string
inputs []int64
wantRtp []uint32
}{
{
name: "Initialization",
inputs: []int64{0},
wantRtp: []uint32{0},
},
{
name: "Monotonic increase",
inputs: []int64{0, 1, 2, 1000},
wantRtp: []uint32{0, 1, 2, 1000},
},
{
name: "Small backwards jump (no wrap)",
inputs: []int64{1000, 990},
wantRtp: []uint32{1000, 990},
},
{
name: "Forward 33-bit PTS wrap",
inputs: []int64{mpegtsMaxPts - 2, 2},
wantRtp: []uint32{
uint32((mpegtsMaxPts - 2) % mod32),
uint32((2 + mpegtsMaxPts) % mod32),
},
},
{
name: "Backward 33-bit PTS wrap",
inputs: []int64{5, mpegtsMaxPts - 3},
wantRtp: []uint32{
5,
uint32(mod32 - 3),
},
},
{
name: "Crossing 32-bit boundary (wrap in RTP)",
inputs: []int64{halfPts - 1, halfPts, halfPts + 1},
wantRtp: []uint32{uint32(halfPts - 1), 0, 1},
},
{
name: "Random jitter near wrap threshold",
inputs: []int64{mpegtsMaxPts - halfPts/2, 2},
wantRtp: []uint32{
uint32((mpegtsMaxPts - halfPts/2) % mod32),
2,
},
},
{
name: "Large monotonic advance (< halfPts)",
inputs: []int64{100, halfPts - 1},
wantRtp: []uint32{100, uint32(halfPts - 1)},
},
{
name: "Multiple successive wraps",
inputs: []int64{mpegtsMaxPts - 1, 2, mpegtsMaxPts - 1, 2},
wantRtp: []uint32{
uint32((mpegtsMaxPts - 1) % mod32),
uint32((2 + mpegtsMaxPts) % mod32),
uint32((mpegtsMaxPts - 1) % mod32),
uint32((2 + mpegtsMaxPts) % mod32),
},
},
{
name: "Idempotent Unwrap (same raw twice)",
inputs: []int64{500, 500},
wantRtp: []uint32{500, 500},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
u := NewRTPTimestamper()
gotRtp := make([]uint32, len(tc.inputs))
for i, raw := range tc.inputs {
ext := u.ToTS(raw)
gotRtp[i] = ext
}
require.Equal(t, tc.wantRtp, gotRtp)
})
}
}
Loading
Loading