Skip to content

Commit 24d680f

Browse files
committed
recorder: reset in case of time driftings (#4778)
1 parent 9c59304 commit 24d680f

File tree

11 files changed

+213
-147
lines changed

11 files changed

+213
-147
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/bluenviron/gohlslib/v2 v2.2.4
1414
github.com/bluenviron/gortmplib v0.1.1
1515
github.com/bluenviron/gortsplib/v5 v5.2.0
16-
github.com/bluenviron/mediacommon/v2 v2.5.1
16+
github.com/bluenviron/mediacommon/v2 v2.5.2-0.20251201152746-8d059e8616fb
1717
github.com/datarhei/gosrt v0.9.0
1818
github.com/fsnotify/fsnotify v1.9.0
1919
github.com/gin-contrib/pprof v1.5.3

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ github.com/bluenviron/gortsplib/v5 v5.2.0 h1:yk0H9Z1Z+H41/x5hDt84rKm6+MNA483NsRX
4141
github.com/bluenviron/gortsplib/v5 v5.2.0/go.mod h1:UYCbHEb0T49kBDgIlTJaZOchD2f5g1JigFmmxQfW7vY=
4242
github.com/bluenviron/mediacommon/v2 v2.5.1 h1:qB2fb5c0xyl5OB2gfSfulpEJn7Cdm3vI2n8wjiLMxKI=
4343
github.com/bluenviron/mediacommon/v2 v2.5.1/go.mod h1:zy1fODPuS/kBd93ftgJS1Jhvjq7LFWfAo32KP7By9AE=
44+
github.com/bluenviron/mediacommon/v2 v2.5.2-0.20251201152746-8d059e8616fb h1:42lRaSsrPXvwB9kLGIujU9yONrSPPp0j4Ohwg6zp/yw=
45+
github.com/bluenviron/mediacommon/v2 v2.5.2-0.20251201152746-8d059e8616fb/go.mod h1:5V15TiOfeaNVmZPVuOqAwqQSWyvMV86/dijDKu5q9Zs=
4446
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
4547
github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA=
4648
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=

internal/recorder/format_fmp4.go

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ func jpegExtractSize(image []byte) (int, int, error) {
9898
}
9999
}
100100

101+
type formatFMP4Sample struct {
102+
*fmp4.Sample
103+
dts int64
104+
ntp time.Time
105+
}
106+
101107
type formatFMP4 struct {
102108
ri *recorderInstance
103109

@@ -111,18 +117,15 @@ func (f *formatFMP4) initialize() bool {
111117
nextID := 1
112118

113119
addTrack := func(format rtspformat.Format, codec mp4.Codec) *formatFMP4Track {
114-
initTrack := &fmp4.InitTrack{
115-
TimeScale: uint32(format.ClockRate()),
116-
Codec: codec,
117-
}
118-
initTrack.ID = nextID
119-
nextID++
120-
121120
track := &formatFMP4Track{
122121
f: f,
123-
initTrack: initTrack,
122+
id: nextID,
123+
clockRate: uint32(format.ClockRate()),
124+
codec: codec,
124125
}
126+
track.initialize()
125127

128+
nextID++
126129
f.tracks = append(f.tracks, track)
127130
return track
128131
}
@@ -180,7 +183,7 @@ func (f *formatFMP4) initialize() bool {
180183
return err
181184
}
182185

183-
return track.write(&sample{
186+
return track.write(&formatFMP4Sample{
184187
Sample: &sampl,
185188
dts: u.PTS,
186189
ntp: u.NTP,
@@ -257,7 +260,7 @@ func (f *formatFMP4) initialize() bool {
257260
firstReceived = true
258261
}
259262

260-
return track.write(&sample{
263+
return track.write(&formatFMP4Sample{
261264
Sample: &fmp4.Sample{
262265
IsNonSyncSample: !randomAccess,
263266
Payload: u.Payload.(unit.PayloadVP9),
@@ -348,7 +351,7 @@ func (f *formatFMP4) initialize() bool {
348351
return err
349352
}
350353

351-
return track.write(&sample{
354+
return track.write(&formatFMP4Sample{
352355
Sample: &sampl,
353356
dts: dts,
354357
ntp: u.NTP,
@@ -424,7 +427,7 @@ func (f *formatFMP4) initialize() bool {
424427
return err
425428
}
426429

427-
return track.write(&sample{
430+
return track.write(&formatFMP4Sample{
428431
Sample: &sampl,
429432
dts: dts,
430433
ntp: u.NTP,
@@ -481,7 +484,7 @@ func (f *formatFMP4) initialize() bool {
481484
}
482485
lastPTS = u.PTS
483486

484-
return track.write(&sample{
487+
return track.write(&formatFMP4Sample{
485488
Sample: &fmp4.Sample{
486489
Payload: u.Payload.(unit.PayloadMPEG4Video),
487490
IsNonSyncSample: !randomAccess,
@@ -532,7 +535,7 @@ func (f *formatFMP4) initialize() bool {
532535
}
533536
lastPTS = u.PTS
534537

535-
return track.write(&sample{
538+
return track.write(&formatFMP4Sample{
536539
Sample: &fmp4.Sample{
537540
Payload: u.Payload.(unit.PayloadMPEG1Video),
538541
IsNonSyncSample: !randomAccess,
@@ -570,7 +573,7 @@ func (f *formatFMP4) initialize() bool {
570573
f.updateCodecParams()
571574
}
572575

573-
return track.write(&sample{
576+
return track.write(&formatFMP4Sample{
574577
Sample: &fmp4.Sample{
575578
Payload: u.Payload.(unit.PayloadMJPEG),
576579
},
@@ -596,7 +599,7 @@ func (f *formatFMP4) initialize() bool {
596599
pts := u.PTS
597600

598601
for _, packet := range u.Payload.(unit.PayloadOpus) {
599-
err := track.write(&sample{
602+
err := track.write(&formatFMP4Sample{
600603
Sample: &fmp4.Sample{
601604
Payload: packet,
602605
},
@@ -630,7 +633,7 @@ func (f *formatFMP4) initialize() bool {
630633
for i, au := range u.Payload.(unit.PayloadMPEG4Audio) {
631634
pts := u.PTS + int64(i)*mpeg4audio.SamplesPerAccessUnit
632635

633-
err := track.write(&sample{
636+
err := track.write(&formatFMP4Sample{
634637
Sample: &fmp4.Sample{
635638
Payload: au,
636639
},
@@ -667,7 +670,7 @@ func (f *formatFMP4) initialize() bool {
667670
return err
668671
}
669672

670-
return track.write(&sample{
673+
return track.write(&formatFMP4Sample{
671674
Sample: &fmp4.Sample{
672675
Payload: ame.Payloads[0][0][0],
673676
},
@@ -710,7 +713,7 @@ func (f *formatFMP4) initialize() bool {
710713
f.updateCodecParams()
711714
}
712715

713-
err = track.write(&sample{
716+
err = track.write(&formatFMP4Sample{
714717
Sample: &fmp4.Sample{
715718
Payload: frame,
716719
},
@@ -779,7 +782,7 @@ func (f *formatFMP4) initialize() bool {
779782

780783
pts := u.PTS + int64(i)*ac3.SamplesPerFrame
781784

782-
err = track.write(&sample{
785+
err = track.write(&formatFMP4Sample{
783786
Sample: &fmp4.Sample{
784787
Payload: frame,
785788
},
@@ -825,7 +828,7 @@ func (f *formatFMP4) initialize() bool {
825828
lpcm = al
826829
}
827830

828-
return track.write(&sample{
831+
return track.write(&formatFMP4Sample{
829832
Sample: &fmp4.Sample{
830833
Payload: lpcm,
831834
},
@@ -851,7 +854,7 @@ func (f *formatFMP4) initialize() bool {
851854
return nil
852855
}
853856

854-
return track.write(&sample{
857+
return track.write(&formatFMP4Sample{
855858
Sample: &fmp4.Sample{
856859
Payload: u.Payload.(unit.PayloadLPCM),
857860
},

internal/recorder/format_fmp4_part.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (p *formatFMP4Part) close(w io.Writer) error {
5656
return writePart(w, p.number, p.partTracks)
5757
}
5858

59-
func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample, dts time.Duration) error {
59+
func (p *formatFMP4Part) write(track *formatFMP4Track, sample *formatFMP4Sample, dts time.Duration) error {
6060
size := uint64(len(sample.Payload))
6161
if (p.size + size) > uint64(p.maxPartSize) {
6262
return fmt.Errorf("reached maximum part size")

internal/recorder/format_fmp4_segment.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func (s *formatFMP4Segment) closeCurPart() error {
201201
return s.curPart.close(s.fi)
202202
}
203203

204-
func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts time.Duration) error {
204+
func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *formatFMP4Sample, dts time.Duration) error {
205205
endDTS := dts + timestampToDuration(int64(sample.Duration), int(track.initTrack.TimeScale))
206206
if endDTS > s.endDTS {
207207
s.endDTS = endDTS

internal/recorder/format_fmp4_track.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package recorder
22

33
import (
4+
"fmt"
45
"time"
56

67
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4"
8+
"github.com/bluenviron/mediacommon/v2/pkg/formats/mp4"
79
"github.com/bluenviron/mediamtx/internal/logger"
810
)
911

@@ -43,12 +45,26 @@ func nextSegmentStartingPos(tracks []*formatFMP4Track) (time.Time, time.Duration
4345

4446
type formatFMP4Track struct {
4547
f *formatFMP4
46-
initTrack *fmp4.InitTrack
48+
id int
49+
clockRate uint32
50+
codec mp4.Codec
51+
52+
initTrack *fmp4.InitTrack
53+
nextSample *formatFMP4Sample
54+
startInitialized bool
55+
startDTS time.Duration
56+
startNTP time.Time
57+
}
4758

48-
nextSample *sample
59+
func (t *formatFMP4Track) initialize() {
60+
t.initTrack = &fmp4.InitTrack{
61+
ID: t.id,
62+
TimeScale: t.clockRate,
63+
Codec: t.codec,
64+
}
4965
}
5066

51-
func (t *formatFMP4Track) write(sample *sample) error {
67+
func (t *formatFMP4Track) write(sample *formatFMP4Sample) error {
5268
// wait the first video sample before setting hasVideo
5369
if t.initTrack.Codec.IsVideo() {
5470
t.f.hasVideo = true
@@ -69,6 +85,17 @@ func (t *formatFMP4Track) write(sample *sample) error {
6985

7086
dts := timestampToDuration(sample.dts, int(t.initTrack.TimeScale))
7187

88+
if !t.startInitialized {
89+
t.startDTS = dts
90+
t.startNTP = sample.ntp
91+
t.startInitialized = true
92+
} else {
93+
drift := sample.ntp.Sub(t.startNTP) - (dts - t.startDTS)
94+
if drift < -ntpDriftTolerance || drift > ntpDriftTolerance {
95+
return fmt.Errorf("detected drift between recording duration and absolute time, resetting")
96+
}
97+
}
98+
7299
if t.f.currentSegment == nil {
73100
t.f.currentSegment = &formatFMP4Segment{
74101
f: t.f,
@@ -78,9 +105,11 @@ func (t *formatFMP4Track) write(sample *sample) error {
78105
}
79106
t.f.currentSegment.initialize()
80107
t.f.nextSegmentNumber++
81-
} else if (dts - t.f.currentSegment.startDTS) < 0 { // BaseTime is negative, this is not supported by fMP4
82-
t.f.ri.Log(logger.Warn, "sample of track %d received too late, discarding", t.initTrack.ID)
83-
return nil
108+
} else {
109+
if (dts - t.f.currentSegment.startDTS) < 0 { // BaseTime is negative, this is not supported by fMP4
110+
t.f.ri.Log(logger.Warn, "sample of track %d received too late, discarding", t.initTrack.ID)
111+
return nil
112+
}
84113
}
85114

86115
err := t.f.currentSegment.write(t, sample, dts)

0 commit comments

Comments
 (0)