Skip to content

Commit 02e2b9d

Browse files
authored
recorder: write additional infos inside segments (#5083)
write stream ID, segment number, DTS, NTP in a dedicated box. This allows to improve the merge algorithm in the playback server.
1 parent ccaccc5 commit 02e2b9d

File tree

11 files changed

+166
-80
lines changed

11 files changed

+166
-80
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
github.com/bluenviron/gohlslib/v2 v2.2.3
1313
github.com/bluenviron/gortmplib v0.1.0
1414
github.com/bluenviron/gortsplib/v5 v5.0.1
15-
github.com/bluenviron/mediacommon/v2 v2.4.3
15+
github.com/bluenviron/mediacommon/v2 v2.4.4-0.20251012155238-8c0b4c88a199
1616
github.com/datarhei/gosrt v0.9.0
1717
github.com/fsnotify/fsnotify v1.9.0
1818
github.com/gin-contrib/pprof v1.5.3

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ github.com/bluenviron/gortmplib v0.1.0 h1:VtcmOrSsNNmjF5/hFVk/bvNFdairX2+ejhRIZ9
3737
github.com/bluenviron/gortmplib v0.1.0/go.mod h1:FtII41guzpc9wMhdmZFsIKuC2hXN3yCMRsFlHYp1qQA=
3838
github.com/bluenviron/gortsplib/v5 v5.0.1 h1:mM3zK1T0WojnAwQTQ/IK3mDnOpy8Yvm11QK7UmB8grk=
3939
github.com/bluenviron/gortsplib/v5 v5.0.1/go.mod h1:jc0WG8LUa6Kb8lkDbLCJzAFdc/Ek7vM2rlO4VCHhbEQ=
40-
github.com/bluenviron/mediacommon/v2 v2.4.3 h1:GFXKaMFgnQqbKv+uaAEfmgBZXBBRTtwabVepDowVDtM=
41-
github.com/bluenviron/mediacommon/v2 v2.4.3/go.mod h1:zy1fODPuS/kBd93ftgJS1Jhvjq7LFWfAo32KP7By9AE=
40+
github.com/bluenviron/mediacommon/v2 v2.4.4-0.20251012155238-8c0b4c88a199 h1:Ib+Azwbjy02RKurl+QHeKn+ZkLBdOzOFrxkUs425KHU=
41+
github.com/bluenviron/mediacommon/v2 v2.4.4-0.20251012155238-8c0b4c88a199/go.mod h1:zy1fODPuS/kBd93ftgJS1Jhvjq7LFWfAo32KP7By9AE=
4242
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
4343
github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA=
4444
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=

internal/recorder/format_fmp4.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,10 @@ func jpegExtractSize(image []byte) (int, int, error) {
101101
type formatFMP4 struct {
102102
ri *recorderInstance
103103

104-
tracks []*formatFMP4Track
105-
hasVideo bool
106-
currentSegment *formatFMP4Segment
104+
tracks []*formatFMP4Track
105+
hasVideo bool
106+
currentSegment *formatFMP4Segment
107+
nextSegmentNumber uint64
107108
}
108109

109110
func (f *formatFMP4) initialize() bool {

internal/recorder/format_fmp4_part.go

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,11 @@ package recorder
33
import (
44
"fmt"
55
"io"
6-
"os"
7-
"path/filepath"
86
"time"
97

108
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4"
119
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4/seekablebuffer"
12-
13-
"github.com/bluenviron/mediamtx/internal/logger"
14-
"github.com/bluenviron/mediamtx/internal/recordstore"
10+
"github.com/bluenviron/mediamtx/internal/conf"
1511
)
1612

1713
func writePart(
@@ -42,9 +38,10 @@ func writePart(
4238
}
4339

4440
type formatFMP4Part struct {
45-
s *formatFMP4Segment
46-
sequenceNumber uint32
47-
startDTS time.Duration
41+
maxPartSize conf.StringSize
42+
segmentStartDTS time.Duration
43+
number uint32
44+
startDTS time.Duration
4845

4946
partTracks map[*formatFMP4Track]*fmp4.PartTrack
5047
size uint64
@@ -55,38 +52,13 @@ func (p *formatFMP4Part) initialize() {
5552
p.partTracks = make(map[*formatFMP4Track]*fmp4.PartTrack)
5653
}
5754

58-
func (p *formatFMP4Part) close() error {
59-
if p.s.fi == nil {
60-
p.s.path = recordstore.Path{Start: p.s.startNTP}.Encode(p.s.f.ri.pathFormat2)
61-
p.s.f.ri.Log(logger.Debug, "creating segment %s", p.s.path)
62-
63-
err := os.MkdirAll(filepath.Dir(p.s.path), 0o755)
64-
if err != nil {
65-
return err
66-
}
67-
68-
fi, err := os.Create(p.s.path)
69-
if err != nil {
70-
return err
71-
}
72-
73-
p.s.f.ri.onSegmentCreate(p.s.path)
74-
75-
err = writeInit(fi, p.s.f.tracks)
76-
if err != nil {
77-
fi.Close()
78-
return err
79-
}
80-
81-
p.s.fi = fi
82-
}
83-
84-
return writePart(p.s.fi, p.sequenceNumber, p.partTracks)
55+
func (p *formatFMP4Part) close(w io.Writer) error {
56+
return writePart(w, p.number, p.partTracks)
8557
}
8658

8759
func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample, dts time.Duration) error {
8860
size := uint64(len(sample.Payload))
89-
if (p.size + size) > uint64(p.s.f.ri.maxPartSize) {
61+
if (p.size + size) > uint64(p.maxPartSize) {
9062
return fmt.Errorf("reached maximum part size")
9163
}
9264
p.size += size
@@ -95,7 +67,7 @@ func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample, dts time.
9567
if !ok {
9668
partTrack = &fmp4.PartTrack{
9769
ID: track.initTrack.ID,
98-
BaseTime: uint64(multiplyAndDivide(int64(dts-p.s.startDTS),
70+
BaseTime: uint64(multiplyAndDivide(int64(dts-p.segmentStartDTS),
9971
int64(track.initTrack.TimeScale), int64(time.Second))),
10072
}
10173
p.partTracks[track] = partTrack

internal/recorder/format_fmp4_segment.go

Lines changed: 80 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,44 @@ import (
55
"fmt"
66
"io"
77
"os"
8+
"path/filepath"
89
"time"
910

10-
"github.com/abema/go-mp4"
11+
amp4 "github.com/abema/go-mp4"
12+
"github.com/google/uuid"
13+
1114
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4"
1215
"github.com/bluenviron/mediacommon/v2/pkg/formats/fmp4/seekablebuffer"
13-
1416
"github.com/bluenviron/mediamtx/internal/logger"
17+
"github.com/bluenviron/mediamtx/internal/recordstore"
1518
)
1619

17-
func writeInit(f io.Writer, tracks []*formatFMP4Track) error {
20+
func writeInit(
21+
f io.Writer,
22+
streamID uuid.UUID,
23+
segmentNumber uint64,
24+
dts time.Duration,
25+
ntp time.Time,
26+
tracks []*formatFMP4Track,
27+
) error {
1828
fmp4Tracks := make([]*fmp4.InitTrack, len(tracks))
1929
for i, track := range tracks {
2030
fmp4Tracks[i] = track.initTrack
2131
}
2232

2333
init := fmp4.Init{
2434
Tracks: fmp4Tracks,
35+
UserData: []amp4.IBox{
36+
&recordstore.Mtxi{
37+
FullBox: amp4.FullBox{
38+
Version: 0,
39+
},
40+
StreamID: streamID,
41+
SegmentNumber: segmentNumber,
42+
DTS: int64(dts),
43+
NTP: ntp.UnixNano(),
44+
},
45+
},
2546
}
2647

2748
var buf seekablebuffer.Buffer
@@ -77,8 +98,8 @@ func writeDuration(f io.ReadWriteSeeker, d time.Duration) error {
7798
return err
7899
}
79100

80-
var mvhd mp4.Mvhd
81-
_, err = mp4.Unmarshal(f, uint64(moovSize-8), &mvhd, mp4.Context{})
101+
var mvhd amp4.Mvhd
102+
_, err = amp4.Unmarshal(f, uint64(moovSize-8), &mvhd, amp4.Context{})
82103
if err != nil {
83104
return err
84105
}
@@ -90,7 +111,7 @@ func writeDuration(f io.ReadWriteSeeker, d time.Duration) error {
90111
return err
91112
}
92113

93-
_, err = mp4.Marshal(f, &mvhd, mp4.Context{})
114+
_, err = amp4.Marshal(f, &mvhd, amp4.Context{})
94115
if err != nil {
95116
return err
96117
}
@@ -102,12 +123,13 @@ type formatFMP4Segment struct {
102123
f *formatFMP4
103124
startDTS time.Duration
104125
startNTP time.Time
126+
number uint64
105127

106-
path string
107-
fi *os.File
108-
curPart *formatFMP4Part
109-
endDTS time.Duration
110-
nextSequenceNumber uint32
128+
path string
129+
fi *os.File
130+
curPart *formatFMP4Part
131+
endDTS time.Duration
132+
nextPartNumber uint32
111133
}
112134

113135
func (s *formatFMP4Segment) initialize() {
@@ -118,7 +140,7 @@ func (s *formatFMP4Segment) close() error {
118140
var err error
119141

120142
if s.curPart != nil {
121-
err = s.curPart.close()
143+
err = s.closeCurPart()
122144
}
123145

124146
if s.fi != nil {
@@ -144,6 +166,41 @@ func (s *formatFMP4Segment) close() error {
144166
return err
145167
}
146168

169+
func (s *formatFMP4Segment) closeCurPart() error {
170+
if s.fi == nil {
171+
s.path = recordstore.Path{Start: s.startNTP}.Encode(s.f.ri.pathFormat2)
172+
s.f.ri.Log(logger.Debug, "creating segment %s", s.path)
173+
174+
err := os.MkdirAll(filepath.Dir(s.path), 0o755)
175+
if err != nil {
176+
return err
177+
}
178+
179+
fi, err := os.Create(s.path)
180+
if err != nil {
181+
return err
182+
}
183+
184+
s.f.ri.onSegmentCreate(s.path)
185+
186+
err = writeInit(
187+
fi,
188+
s.f.ri.streamID,
189+
s.number,
190+
s.startDTS,
191+
s.startNTP,
192+
s.f.tracks)
193+
if err != nil {
194+
fi.Close()
195+
return err
196+
}
197+
198+
s.fi = fi
199+
}
200+
201+
return s.curPart.close(s.fi)
202+
}
203+
147204
func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts time.Duration) error {
148205
endDTS := dts + timestampToDuration(int64(sample.Duration), int(track.initTrack.TimeScale))
149206
if endDTS > s.endDTS {
@@ -152,27 +209,29 @@ func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dts ti
152209

153210
if s.curPart == nil {
154211
s.curPart = &formatFMP4Part{
155-
s: s,
156-
sequenceNumber: s.nextSequenceNumber,
157-
startDTS: dts,
212+
maxPartSize: s.f.ri.maxPartSize,
213+
segmentStartDTS: s.startDTS,
214+
number: s.nextPartNumber,
215+
startDTS: dts,
158216
}
159217
s.curPart.initialize()
160-
s.nextSequenceNumber++
218+
s.nextPartNumber++
161219
} else if s.curPart.duration() >= s.f.ri.partDuration {
162-
err := s.curPart.close()
220+
err := s.closeCurPart()
163221
s.curPart = nil
164222

165223
if err != nil {
166224
return err
167225
}
168226

169227
s.curPart = &formatFMP4Part{
170-
s: s,
171-
sequenceNumber: s.nextSequenceNumber,
172-
startDTS: dts,
228+
maxPartSize: s.f.ri.maxPartSize,
229+
segmentStartDTS: s.startDTS,
230+
number: s.nextPartNumber,
231+
startDTS: dts,
173232
}
174233
s.curPart.initialize()
175-
s.nextSequenceNumber++
234+
s.nextPartNumber++
176235
}
177236

178237
return s.curPart.write(track, sample, dts)

internal/recorder/format_fmp4_track.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ func (t *formatFMP4Track) write(sample *sample) error {
6767
f: t.f,
6868
startDTS: dts,
6969
startNTP: sample.ntp,
70+
number: t.f.nextSegmentNumber,
7071
}
7172
t.f.currentSegment.initialize()
73+
t.f.nextSegmentNumber++
7274
} else if (dts - t.f.currentSegment.startDTS) < 0 { // BaseTime is negative, this is not supported by fMP4
7375
t.f.ri.Log(logger.Warn, "sample of track %d received too late, discarding", t.initTrack.ID)
7476
return nil
@@ -95,8 +97,10 @@ func (t *formatFMP4Track) write(sample *sample) error {
9597
f: t.f,
9698
startDTS: oldestDTS,
9799
startNTP: oldestNTP,
100+
number: t.f.nextSegmentNumber,
98101
}
99102
t.f.currentSegment.initialize()
103+
t.f.nextSegmentNumber++
100104
}
101105

102106
return nil

internal/recorder/format_mpegts.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -468,11 +468,16 @@ func (f *formatMPEGTS) write(
468468
switch {
469469
case f.currentSegment == nil:
470470
f.currentSegment = &formatMPEGTSSegment{
471-
f: f,
472-
startDTS: dts,
473-
startNTP: ntp,
471+
pathFormat2: f.ri.pathFormat2,
472+
flush: f.bw.Flush,
473+
onSegmentCreate: f.ri.onSegmentCreate,
474+
onSegmentComplete: f.ri.onSegmentComplete,
475+
startDTS: dts,
476+
startNTP: ntp,
477+
log: f.ri,
474478
}
475479
f.currentSegment.initialize()
480+
f.dw.setTarget(f.currentSegment)
476481
case (!f.hasVideo || isVideo) &&
477482
randomAccess &&
478483
(dts-f.currentSegment.startDTS) >= f.ri.segmentDuration:
@@ -483,11 +488,16 @@ func (f *formatMPEGTS) write(
483488
}
484489

485490
f.currentSegment = &formatMPEGTSSegment{
486-
f: f,
487-
startDTS: dts,
488-
startNTP: ntp,
491+
pathFormat2: f.ri.pathFormat2,
492+
flush: f.bw.Flush,
493+
onSegmentCreate: f.ri.onSegmentCreate,
494+
onSegmentComplete: f.ri.onSegmentComplete,
495+
startDTS: dts,
496+
startNTP: ntp,
497+
log: f.ri,
489498
}
490499
f.currentSegment.initialize()
500+
f.dw.setTarget(f.currentSegment)
491501

492502
case (dts - f.currentSegment.lastFlush) >= f.ri.partDuration:
493503
err := f.bw.Flush()

0 commit comments

Comments
 (0)