Skip to content

Commit 5457866

Browse files
committed
estimate absolute timestamp more precisely
When the absolute timestamp of incoming frames was not available, it was replaced with the current timestamp, which is influenced by latency over time. This mechanism is replaced by an algorithm that stores the current timestamp when latency is the lowest and uses it as reference throughout the entire stream.
1 parent 5b09da8 commit 5457866

File tree

31 files changed

+295
-177
lines changed

31 files changed

+295
-177
lines changed

internal/core/path.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ func (pa *path) doReloadConf(newConf *conf.Path) {
377377
}
378378

379379
func (pa *path) doSourceStaticSetReady(req defs.PathSourceStaticSetReadyReq) {
380-
err := pa.setReady(req.Desc, req.GenerateRTPPackets)
380+
err := pa.setReady(req.Desc, req.GenerateRTPPackets, req.FillNTP)
381381
if err != nil {
382382
req.Res <- defs.PathSourceStaticSetReadyRes{Err: err}
383383
return
@@ -474,7 +474,7 @@ func (pa *path) doAddPublisher(req defs.PathAddPublisherReq) {
474474
pa.source = req.Author
475475
pa.publisherQuery = req.AccessRequest.Query
476476

477-
err := pa.setReady(req.Desc, req.GenerateRTPPackets)
477+
err := pa.setReady(req.Desc, req.GenerateRTPPackets, req.FillNTP)
478478
if err != nil {
479479
pa.source = nil
480480
req.Res <- defs.PathAddPublisherRes{Err: err}
@@ -684,12 +684,13 @@ func (pa *path) onDemandPublisherStop(reason string) {
684684
pa.onDemandPublisherState = pathOnDemandStateInitial
685685
}
686686

687-
func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error {
687+
func (pa *path) setReady(desc *description.Session, generateRTPPackets bool, fillNTP bool) error {
688688
pa.stream = &stream.Stream{
689689
WriteQueueSize: pa.writeQueueSize,
690690
RTPMaxPayloadSize: pa.rtpMaxPayloadSize,
691691
Desc: desc,
692-
GenerateRTPPackets: allocateEncoder,
692+
GenerateRTPPackets: generateRTPPackets,
693+
FillNTP: fillNTP,
693694
Parent: pa.source,
694695
}
695696
err := pa.stream.Initialize()

internal/defs/path.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ type PathAddPublisherReq struct {
6767
Author Publisher
6868
Desc *description.Session
6969
GenerateRTPPackets bool
70+
FillNTP bool
7071
ConfToCompare *conf.Path
7172
AccessRequest PathAccessRequest
7273
Res chan PathAddPublisherRes
@@ -108,6 +109,7 @@ type PathSourceStaticSetReadyRes struct {
108109
type PathSourceStaticSetReadyReq struct {
109110
Desc *description.Session
110111
GenerateRTPPackets bool
112+
FillNTP bool
111113
Res chan PathSourceStaticSetReadyRes
112114
}
113115

internal/ntpestimator/estimator.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Package ntpestimator contains a NTP estimator.
2+
package ntpestimator
3+
4+
import (
5+
"time"
6+
)
7+
8+
var timeNow = time.Now
9+
10+
func multiplyAndDivide(v, m, d time.Duration) time.Duration {
11+
secs := v / d
12+
dec := v % d
13+
return (secs*m + dec*m/d)
14+
}
15+
16+
// Estimator is a NTP estimator.
17+
type Estimator struct {
18+
ClockRate int
19+
20+
refNTP time.Time
21+
refPTS int64
22+
}
23+
24+
var zero = time.Time{}
25+
26+
// Estimate returns estimated NTP.
27+
func (e *Estimator) Estimate(pts int64) time.Time {
28+
now := timeNow()
29+
30+
if e.refNTP.Equal(zero) {
31+
e.refNTP = now
32+
e.refPTS = pts
33+
return now
34+
}
35+
36+
computed := e.refNTP.Add((multiplyAndDivide(time.Duration(pts-e.refPTS), time.Second, time.Duration(e.ClockRate))))
37+
38+
if computed.After(now) {
39+
e.refNTP = now
40+
e.refPTS = pts
41+
return now
42+
}
43+
44+
return computed
45+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package ntpestimator
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestEstimator(t *testing.T) {
11+
e := &Estimator{ClockRate: 90000}
12+
13+
timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 7, 0, time.UTC) }
14+
ntp := e.Estimate(90000)
15+
require.Equal(t, time.Date(2003, 11, 4, 23, 15, 7, 0, time.UTC), ntp)
16+
17+
timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 8, 0, time.UTC) }
18+
ntp = e.Estimate(2 * 90000)
19+
require.Equal(t, time.Date(2003, 11, 4, 23, 15, 8, 0, time.UTC), ntp)
20+
21+
timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 10, 0, time.UTC) }
22+
ntp = e.Estimate(3 * 90000)
23+
require.Equal(t, time.Date(2003, 11, 4, 23, 15, 9, 0, time.UTC), ntp)
24+
25+
timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 9, 0, time.UTC) }
26+
ntp = e.Estimate(4 * 90000)
27+
require.Equal(t, time.Date(2003, 11, 4, 23, 15, 9, 0, time.UTC), ntp)
28+
29+
timeNow = func() time.Time { return time.Date(2003, 11, 4, 23, 15, 15, 0, time.UTC) }
30+
ntp = e.Estimate(5 * 90000)
31+
require.Equal(t, time.Date(2003, 11, 4, 23, 15, 10, 0, time.UTC), ntp)
32+
}

internal/protocols/hls/to_stream.go

Lines changed: 63 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/bluenviron/gortsplib/v5/pkg/description"
1010
"github.com/bluenviron/gortsplib/v5/pkg/format"
1111
"github.com/bluenviron/mediamtx/internal/logger"
12+
"github.com/bluenviron/mediamtx/internal/ntpestimator"
1213
"github.com/bluenviron/mediamtx/internal/stream"
1314
"github.com/bluenviron/mediamtx/internal/unit"
1415
)
@@ -32,55 +33,54 @@ func multiplyAndDivide(v, m, d int64) int64 {
3233
func ToStream(
3334
c *gohlslib.Client,
3435
tracks []*gohlslib.Track,
35-
stream **stream.Stream,
36+
strm **stream.Stream,
3637
) ([]*description.Media, error) {
3738
var ntpStat ntpState
3839
var ntpStatMutex sync.Mutex
3940

40-
handleNTP := func(track *gohlslib.Track) time.Time {
41-
ntpStatMutex.Lock()
42-
defer ntpStatMutex.Unlock()
43-
44-
switch ntpStat {
45-
case ntpStateInitial:
46-
ntp, avail := c.AbsoluteTime(track)
47-
if !avail {
48-
ntpStat = ntpStateUnavailable
49-
return time.Now()
50-
}
51-
52-
ntpStat = ntpStateAvailable
53-
return ntp
54-
55-
case ntpStateAvailable:
56-
ntp, avail := c.AbsoluteTime(track)
57-
if !avail {
58-
panic("should not happen")
59-
}
60-
61-
return ntp
41+
var medias []*description.Media //nolint:prealloc
6242

63-
case ntpStateUnavailable:
64-
_, avail := c.AbsoluteTime(track)
65-
if avail {
66-
(*stream).Parent.Log(logger.Warn, "absolute timestamp appeared after stream started, we are not using it")
67-
ntpStat = ntpStateDegraded
43+
for _, track := range tracks {
44+
ctrack := track
45+
ntpEstimator := &ntpestimator.Estimator{ClockRate: track.ClockRate}
46+
47+
handleNTP := func(pts int64) time.Time {
48+
ntpStatMutex.Lock()
49+
defer ntpStatMutex.Unlock()
50+
51+
switch ntpStat {
52+
case ntpStateInitial:
53+
ntp, avail := c.AbsoluteTime(ctrack)
54+
if !avail {
55+
ntpStat = ntpStateUnavailable
56+
return ntpEstimator.Estimate(pts)
57+
}
58+
ntpStat = ntpStateAvailable
59+
return ntp
60+
61+
case ntpStateAvailable:
62+
ntp, avail := c.AbsoluteTime(ctrack)
63+
if !avail {
64+
panic("should not happen")
65+
}
66+
return ntp
67+
68+
case ntpStateUnavailable:
69+
_, avail := c.AbsoluteTime(ctrack)
70+
if avail {
71+
(*strm).Parent.Log(logger.Warn, "absolute timestamp appeared after stream started, we are not using it")
72+
ntpStat = ntpStateDegraded
73+
}
74+
return ntpEstimator.Estimate(pts)
75+
76+
default: // ntpStateDegraded
77+
return ntpEstimator.Estimate(pts)
6878
}
69-
70-
return time.Now()
71-
72-
default: // ntpStateDegraded
73-
return time.Now()
7479
}
75-
}
7680

77-
var medias []*description.Media //nolint:prealloc
78-
79-
for _, track := range tracks {
8081
var medi *description.Media
81-
clockRate := track.ClockRate
8282

83-
switch tcodec := track.Codec.(type) {
83+
switch tcodec := ctrack.Codec.(type) {
8484
case *codecs.AV1:
8585
medi = &description.Media{
8686
Type: description.MediaTypeVideo,
@@ -90,10 +90,10 @@ func ToStream(
9090
}
9191
newClockRate := medi.Formats[0].ClockRate()
9292

93-
c.OnDataAV1(track, func(pts int64, tu [][]byte) {
94-
(*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{
95-
NTP: handleNTP(track),
96-
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)),
93+
c.OnDataAV1(ctrack, func(pts int64, tu [][]byte) {
94+
(*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{
95+
NTP: handleNTP(pts),
96+
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)),
9797
Payload: unit.PayloadAV1(tu),
9898
})
9999
})
@@ -107,10 +107,10 @@ func ToStream(
107107
}
108108
newClockRate := medi.Formats[0].ClockRate()
109109

110-
c.OnDataVP9(track, func(pts int64, frame []byte) {
111-
(*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{
112-
NTP: handleNTP(track),
113-
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)),
110+
c.OnDataVP9(ctrack, func(pts int64, frame []byte) {
111+
(*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{
112+
NTP: handleNTP(pts),
113+
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)),
114114
Payload: unit.PayloadVP9(frame),
115115
})
116116
})
@@ -127,10 +127,10 @@ func ToStream(
127127
}
128128
newClockRate := medi.Formats[0].ClockRate()
129129

130-
c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) {
131-
(*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{
132-
NTP: handleNTP(track),
133-
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)),
130+
c.OnDataH26x(ctrack, func(pts int64, _ int64, au [][]byte) {
131+
(*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{
132+
NTP: handleNTP(pts),
133+
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)),
134134
Payload: unit.PayloadH265(au),
135135
})
136136
})
@@ -147,10 +147,10 @@ func ToStream(
147147
}
148148
newClockRate := medi.Formats[0].ClockRate()
149149

150-
c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) {
151-
(*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{
152-
NTP: handleNTP(track),
153-
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)),
150+
c.OnDataH26x(ctrack, func(pts int64, _ int64, au [][]byte) {
151+
(*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{
152+
NTP: handleNTP(pts),
153+
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)),
154154
Payload: unit.PayloadH264(au),
155155
})
156156
})
@@ -165,10 +165,10 @@ func ToStream(
165165
}
166166
newClockRate := medi.Formats[0].ClockRate()
167167

168-
c.OnDataOpus(track, func(pts int64, packets [][]byte) {
169-
(*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{
170-
NTP: handleNTP(track),
171-
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)),
168+
c.OnDataOpus(ctrack, func(pts int64, packets [][]byte) {
169+
(*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{
170+
NTP: handleNTP(pts),
171+
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)),
172172
Payload: unit.PayloadOpus(packets),
173173
})
174174
})
@@ -186,10 +186,10 @@ func ToStream(
186186
}
187187
newClockRate := medi.Formats[0].ClockRate()
188188

189-
c.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) {
190-
(*stream).WriteUnit(medi, medi.Formats[0], &unit.Unit{
191-
NTP: handleNTP(track),
192-
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(clockRate)),
189+
c.OnDataMPEG4Audio(ctrack, func(pts int64, aus [][]byte) {
190+
(*strm).WriteUnit(medi, medi.Formats[0], &unit.Unit{
191+
NTP: handleNTP(pts),
192+
PTS: multiplyAndDivide(pts, int64(newClockRate), int64(ctrack.ClockRate)),
193193
Payload: unit.PayloadMPEG4Audio(aus),
194194
})
195195
})

0 commit comments

Comments
 (0)