Skip to content

Commit 500d18b

Browse files
authored
rtmp: fix timeout when publishing with GLive T80 (bluenviron#4002) (bluenviron#4583)
1 parent 3c70305 commit 500d18b

File tree

10 files changed

+286
-78
lines changed

10 files changed

+286
-78
lines changed

internal/core/core.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import (
3939
//go:embed VERSION
4040
var version []byte
4141

42+
var timeNow = time.Now
43+
4244
var defaultConfPaths = []string{
4345
"rtsp-simple-server.yml",
4446
"mediamtx.yml",
@@ -462,6 +464,7 @@ func (p *Core) createResources(initial bool) error {
462464
RunOnConnectRestart: p.conf.RunOnConnectRestart,
463465
RunOnDisconnect: p.conf.RunOnDisconnect,
464466
ExternalCmdPool: p.externalCmdPool,
467+
TimeNow: timeNow,
465468
Metrics: p.metrics,
466469
PathManager: p.pathManager,
467470
Parent: p,
@@ -489,6 +492,7 @@ func (p *Core) createResources(initial bool) error {
489492
RunOnConnectRestart: p.conf.RunOnConnectRestart,
490493
RunOnDisconnect: p.conf.RunOnDisconnect,
491494
ExternalCmdPool: p.externalCmdPool,
495+
TimeNow: timeNow,
492496
Metrics: p.metrics,
493497
PathManager: p.pathManager,
494498
Parent: p,

internal/core/metrics_test.go

Lines changed: 154 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -42,35 +42,45 @@ func httpPullFile(t *testing.T, hc *http.Client, u string) []byte {
4242
}
4343

4444
func TestMetrics(t *testing.T) {
45-
serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
46-
require.NoError(t, err)
47-
defer os.Remove(serverCertFpath)
45+
t.Run("initial", func(t *testing.T) {
46+
serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
47+
require.NoError(t, err)
48+
defer os.Remove(serverCertFpath)
49+
50+
serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
51+
require.NoError(t, err)
52+
defer os.Remove(serverKeyFpath)
53+
54+
n := 0
55+
timeNow = func() time.Time {
56+
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
57+
n++
58+
return d
59+
}
60+
defer func() {
61+
timeNow = time.Now
62+
}()
4863

49-
serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
50-
require.NoError(t, err)
51-
defer os.Remove(serverKeyFpath)
52-
53-
p, ok := newInstance("api: yes\n" +
54-
"hlsAlwaysRemux: yes\n" +
55-
"metrics: yes\n" +
56-
"webrtcServerCert: " + serverCertFpath + "\n" +
57-
"webrtcServerKey: " + serverKeyFpath + "\n" +
58-
"rtspEncryption: optional\n" +
59-
"rtspServerCert: " + serverCertFpath + "\n" +
60-
"rtspServerKey: " + serverKeyFpath + "\n" +
61-
"rtmpEncryption: optional\n" +
62-
"rtmpServerCert: " + serverCertFpath + "\n" +
63-
"rtmpServerKey: " + serverKeyFpath + "\n" +
64-
"paths:\n" +
65-
" all_others:\n")
66-
require.Equal(t, true, ok)
67-
defer p.Close()
68-
69-
tr := &http.Transport{}
70-
defer tr.CloseIdleConnections()
71-
hc := &http.Client{Transport: tr}
64+
p, ok := newInstance("api: yes\n" +
65+
"hlsAlwaysRemux: yes\n" +
66+
"metrics: yes\n" +
67+
"webrtcServerCert: " + serverCertFpath + "\n" +
68+
"webrtcServerKey: " + serverKeyFpath + "\n" +
69+
"rtspEncryption: optional\n" +
70+
"rtspServerCert: " + serverCertFpath + "\n" +
71+
"rtspServerKey: " + serverKeyFpath + "\n" +
72+
"rtmpEncryption: optional\n" +
73+
"rtmpServerCert: " + serverCertFpath + "\n" +
74+
"rtmpServerKey: " + serverKeyFpath + "\n" +
75+
"paths:\n" +
76+
" all_others:\n")
77+
require.Equal(t, true, ok)
78+
defer p.Close()
79+
80+
tr := &http.Transport{}
81+
defer tr.CloseIdleConnections()
82+
hc := &http.Client{Transport: tr}
7283

73-
t.Run("initial", func(t *testing.T) {
7484
bo := httpPullFile(t, hc, "http://localhost:9998/metrics")
7585

7686
require.Equal(t, `paths 0
@@ -169,6 +179,44 @@ webrtc_sessions_bytes_sent 0
169179
})
170180

171181
t.Run("with data", func(t *testing.T) {
182+
serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
183+
require.NoError(t, err)
184+
defer os.Remove(serverCertFpath)
185+
186+
serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
187+
require.NoError(t, err)
188+
defer os.Remove(serverKeyFpath)
189+
190+
n := 0
191+
timeNow = func() time.Time {
192+
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
193+
n++
194+
return d
195+
}
196+
defer func() {
197+
timeNow = time.Now
198+
}()
199+
200+
p, ok := newInstance("api: yes\n" +
201+
"hlsAlwaysRemux: yes\n" +
202+
"metrics: yes\n" +
203+
"webrtcServerCert: " + serverCertFpath + "\n" +
204+
"webrtcServerKey: " + serverKeyFpath + "\n" +
205+
"rtspEncryption: optional\n" +
206+
"rtspServerCert: " + serverCertFpath + "\n" +
207+
"rtspServerKey: " + serverKeyFpath + "\n" +
208+
"rtmpEncryption: optional\n" +
209+
"rtmpServerCert: " + serverCertFpath + "\n" +
210+
"rtmpServerKey: " + serverKeyFpath + "\n" +
211+
"paths:\n" +
212+
" all_others:\n")
213+
require.Equal(t, true, ok)
214+
defer p.Close()
215+
216+
tr := &http.Transport{}
217+
defer tr.CloseIdleConnections()
218+
hc := &http.Client{Transport: tr}
219+
172220
terminate := make(chan struct{})
173221
var wg sync.WaitGroup
174222
wg.Add(6)
@@ -193,6 +241,8 @@ webrtc_sessions_bytes_sent 0
193241
<-terminate
194242
}()
195243

244+
rtmpDone := make(chan struct{})
245+
196246
go func() {
197247
defer wg.Done()
198248

@@ -217,12 +267,16 @@ webrtc_sessions_bytes_sent 0
217267
err = w.WriteH264(2*time.Second, 2*time.Second, [][]byte{{5, 2, 3, 4}})
218268
require.NoError(t, err)
219269

270+
close(rtmpDone)
271+
220272
<-terminate
221273
}()
222274

223275
go func() {
224276
defer wg.Done()
225277

278+
<-rtmpDone
279+
226280
u, err := url.Parse("rtmps://localhost:1936/rtmps_path")
227281
require.NoError(t, err)
228282

@@ -248,6 +302,8 @@ webrtc_sessions_bytes_sent 0
248302
<-terminate
249303
}()
250304

305+
webrtcReady := make(chan struct{})
306+
251307
go func() {
252308
defer wg.Done()
253309

@@ -258,38 +314,52 @@ webrtc_sessions_bytes_sent 0
258314
defer tr.CloseIdleConnections()
259315
hc2 := &http.Client{Transport: tr}
260316

261-
track := &webrtc.OutgoingTrack{
317+
track1 := &webrtc.OutgoingTrack{
262318
Caps: pwebrtc.RTPCodecCapability{
263319
MimeType: pwebrtc.MimeTypeH264,
264320
ClockRate: 90000,
265321
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
266322
},
267323
}
268324

325+
track2 := &webrtc.OutgoingTrack{
326+
Caps: pwebrtc.RTPCodecCapability{
327+
MimeType: pwebrtc.MimeTypeOpus,
328+
ClockRate: 48000,
329+
Channels: 2,
330+
SDPFmtpLine: "minptime=10;useinbandfec=1;stereo=1;sprop-stereo=1",
331+
},
332+
}
333+
269334
s := &whip.Client{
270335
HTTPClient: hc2,
271336
URL: su,
272337
Log: test.NilLogger,
273338
Publish: true,
274-
OutgoingTracks: []*webrtc.OutgoingTrack{track},
339+
OutgoingTracks: []*webrtc.OutgoingTrack{track1, track2},
275340
}
276341

277342
err = s.Initialize(context.Background())
278343
require.NoError(t, err)
279344
defer checkClose(t, s.Close)
280345

281-
err = track.WriteRTP(&rtp.Packet{
282-
Header: rtp.Header{
283-
Version: 2,
284-
Marker: true,
285-
PayloadType: 96,
286-
SequenceNumber: 123,
287-
Timestamp: 45343,
288-
SSRC: 563423,
289-
},
290-
Payload: []byte{1},
291-
})
292-
require.NoError(t, err)
346+
for _, track := range s.OutgoingTracks {
347+
err = track.WriteRTP(&rtp.Packet{
348+
Header: rtp.Header{
349+
Version: 2,
350+
Marker: true,
351+
PayloadType: 96,
352+
SequenceNumber: 123,
353+
Timestamp: 45343,
354+
SSRC: 563423,
355+
},
356+
Payload: []byte{1},
357+
})
358+
require.NoError(t, err)
359+
}
360+
361+
close(webrtcReady)
362+
293363
<-terminate
294364
}()
295365

@@ -328,7 +398,8 @@ webrtc_sessions_bytes_sent 0
328398
<-terminate
329399
}()
330400

331-
time.Sleep(500*time.Millisecond + 2*time.Second)
401+
<-webrtcReady
402+
time.Sleep(1 * time.Second)
332403

333404
bo := httpPullFile(t, hc, "http://localhost:9998/metrics")
334405

@@ -426,12 +497,12 @@ webrtc_sessions_bytes_sent 0
426497
`srt_conns_bytes_send_drop\{id=".*?",state="publish"\} [0-9]+`+"\n"+
427498
`srt_conns_bytes_received_drop\{id=".*?",state="publish"\} [0-9]+`+"\n"+
428499
`srt_conns_bytes_received_undecrypt\{id=".*?",state="publish"\} [0-9]+`+"\n"+
429-
`srt_conns_us_packets_send_period\{id=".*?",state="publish"\} \d+\.\d+`+"\n"+
500+
`srt_conns_us_packets_send_period\{id=".*?",state="publish"\} \d+(\.\d+)?`+"\n"+
430501
`srt_conns_packets_flow_window\{id=".*?",state="publish"\} [0-9]+`+"\n"+
431502
`srt_conns_packets_flight_size\{id=".*?",state="publish"\} [0-9]+`+"\n"+
432-
`srt_conns_ms_rtt\{id=".*?",state="publish"\} \d+\.\d+`+"\n"+
503+
`srt_conns_ms_rtt\{id=".*?",state="publish"\} \d+(\.\d+)?`+"\n"+
433504
`srt_conns_mbps_send_rate\{id=".*?",state="publish"\} [0-9]+`+"\n"+
434-
`srt_conns_mbps_receive_rate\{id=".*?",state="publish"\} [0-9]+`+"\n"+
505+
`srt_conns_mbps_receive_rate\{id=".*?",state="publish"\} \d+(\.\d+)?`+"\n"+
435506
`srt_conns_mbps_link_capacity\{id=".*?",state="publish"\} [0-9]+`+"\n"+
436507
`srt_conns_bytes_avail_send_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
437508
`srt_conns_bytes_avail_receive_buf\{id=".*?",state="publish"\} [0-9]+`+"\n"+
@@ -460,6 +531,44 @@ webrtc_sessions_bytes_sent 0
460531
})
461532

462533
t.Run("servers disabled", func(t *testing.T) {
534+
serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
535+
require.NoError(t, err)
536+
defer os.Remove(serverCertFpath)
537+
538+
serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
539+
require.NoError(t, err)
540+
defer os.Remove(serverKeyFpath)
541+
542+
n := 0
543+
timeNow = func() time.Time {
544+
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
545+
n++
546+
return d
547+
}
548+
defer func() {
549+
timeNow = time.Now
550+
}()
551+
552+
p, ok := newInstance("api: yes\n" +
553+
"hlsAlwaysRemux: yes\n" +
554+
"metrics: yes\n" +
555+
"webrtcServerCert: " + serverCertFpath + "\n" +
556+
"webrtcServerKey: " + serverKeyFpath + "\n" +
557+
"rtspEncryption: optional\n" +
558+
"rtspServerCert: " + serverCertFpath + "\n" +
559+
"rtspServerKey: " + serverKeyFpath + "\n" +
560+
"rtmpEncryption: optional\n" +
561+
"rtmpServerCert: " + serverCertFpath + "\n" +
562+
"rtmpServerKey: " + serverKeyFpath + "\n" +
563+
"paths:\n" +
564+
" all_others:\n")
565+
require.Equal(t, true, ok)
566+
defer p.Close()
567+
568+
tr := &http.Transport{}
569+
defer tr.CloseIdleConnections()
570+
hc := &http.Client{Transport: tr}
571+
463572
httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/global/patch", map[string]interface{}{
464573
"rtsp": false,
465574
"rtmp": false,

internal/core/path_test.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,16 @@ func TestPathRunOnRead(t *testing.T) {
447447
require.NoError(t, err)
448448
defer conn.Close()
449449

450+
n := 0
451+
timeNow := func() time.Time {
452+
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
453+
n++
454+
return d
455+
}
456+
450457
r := &rtmp.Reader{
451-
Conn: conn,
458+
Conn: conn,
459+
TimeNow: timeNow,
452460
}
453461
err = r.Initialize()
454462
require.NoError(t, err)
@@ -483,8 +491,16 @@ func TestPathRunOnRead(t *testing.T) {
483491
}
484492
}()
485493

494+
n := 0
495+
timeNow := func() time.Time {
496+
d := time.Date(2009, 5, 20, 22, 15, 25, 427000, time.Local).Add(time.Duration(n) * 2 * time.Second)
497+
n++
498+
return d
499+
}
500+
486501
r := &rtmp.Reader{
487-
Conn: conn,
502+
Conn: conn,
503+
TimeNow: timeNow,
488504
}
489505
err = r.Initialize()
490506
require.NoError(t, err)

0 commit comments

Comments
 (0)