Skip to content

Commit 61382e4

Browse files
authored
fix memory leak when reloading the configuration (#4855)
When a path has a MPEG-TS, RTP or WebRTC source and the path configuration is reloaded, a routine was left open because the reload channel was not handled. This fixes the issue.
1 parent 6d4dfff commit 61382e4

File tree

10 files changed

+158
-93
lines changed

10 files changed

+158
-93
lines changed

internal/staticsources/hls/source_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,20 @@ func TestSource(t *testing.T) {
102102
ctx, ctxCancel := context.WithCancel(context.Background())
103103
defer ctxCancel()
104104

105+
reloadConf := make(chan *conf.Path)
106+
105107
go func() {
106108
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
107109
Context: ctx,
108110
ResolvedSource: "http://localhost:5780/stream.m3u8",
109111
Conf: &conf.Path{},
112+
ReloadConf: reloadConf,
110113
})
111114
close(done)
112115
}()
113116

114117
<-p.Unit
118+
119+
// the source must be listening on ReloadConf
120+
reloadConf <- nil
115121
}

internal/staticsources/mpegts/source.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,19 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
6666
readerErr <- s.runReader(nc)
6767
}()
6868

69-
select {
70-
case err = <-readerErr:
71-
nc.Close()
72-
return err
69+
for {
70+
select {
71+
case err = <-readerErr:
72+
nc.Close()
73+
return err
7374

74-
case <-params.Context.Done():
75-
nc.Close()
76-
<-readerErr
77-
return fmt.Errorf("terminated")
75+
case <-params.ReloadConf:
76+
77+
case <-params.Context.Done():
78+
nc.Close()
79+
<-readerErr
80+
return fmt.Errorf("terminated")
81+
}
7882
}
7983
}
8084

internal/staticsources/mpegts/source_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,14 @@ func TestSourceUDP(t *testing.T) {
6969
ctx, ctxCancel := context.WithCancel(context.Background())
7070
defer ctxCancel()
7171

72+
reloadConf := make(chan *conf.Path)
73+
7274
go func() {
7375
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
7476
Context: ctx,
7577
ResolvedSource: src,
7678
Conf: &conf.Path{},
79+
ReloadConf: reloadConf,
7780
})
7881
close(done)
7982
}()
@@ -128,6 +131,9 @@ func TestSourceUDP(t *testing.T) {
128131
require.NoError(t, err)
129132

130133
<-p.Unit
134+
135+
// the source must be listening on ReloadConf
136+
reloadConf <- nil
131137
})
132138
}
133139
}

internal/staticsources/rtmp/source_test.go

Lines changed: 44 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -51,46 +51,6 @@ func TestSource(t *testing.T) {
5151

5252
defer ln.Close()
5353

54-
go func() {
55-
for {
56-
nconn, err := ln.Accept()
57-
require.NoError(t, err)
58-
defer nconn.Close()
59-
60-
conn := &rtmp.ServerConn{
61-
RW: nconn,
62-
}
63-
err = conn.Initialize()
64-
require.NoError(t, err)
65-
66-
if auth == "auth" {
67-
err = conn.CheckCredentials("myuser", "mypass")
68-
if err != nil {
69-
continue
70-
}
71-
}
72-
73-
err = conn.Accept()
74-
require.NoError(t, err)
75-
76-
w := &rtmp.Writer{
77-
Conn: conn,
78-
VideoTrack: test.FormatH264,
79-
AudioTrack: test.FormatMPEG4Audio,
80-
}
81-
err = w.Initialize()
82-
require.NoError(t, err)
83-
84-
err = w.WriteH264(2*time.Second, 2*time.Second, [][]byte{{5, 2, 3, 4}})
85-
require.NoError(t, err)
86-
87-
err = w.WriteH264(3*time.Second, 3*time.Second, [][]byte{{5, 2, 3, 4}})
88-
require.NoError(t, err)
89-
90-
break
91-
}
92-
}()
93-
9454
var source string
9555

9656
if encryption == "plain" {
@@ -121,18 +81,62 @@ func TestSource(t *testing.T) {
12181
ctx, ctxCancel := context.WithCancel(context.Background())
12282
defer ctxCancel()
12383

84+
reloadConf := make(chan *conf.Path)
85+
12486
go func() {
12587
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
12688
Context: ctx,
12789
ResolvedSource: source,
12890
Conf: &conf.Path{
12991
SourceFingerprint: "33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739",
13092
},
93+
ReloadConf: reloadConf,
13194
})
13295
close(done)
13396
}()
13497

98+
for {
99+
nconn, err := ln.Accept()
100+
require.NoError(t, err)
101+
defer nconn.Close()
102+
103+
conn := &rtmp.ServerConn{
104+
RW: nconn,
105+
}
106+
err = conn.Initialize()
107+
require.NoError(t, err)
108+
109+
if auth == "auth" {
110+
err = conn.CheckCredentials("myuser", "mypass")
111+
if err != nil {
112+
continue
113+
}
114+
}
115+
116+
err = conn.Accept()
117+
require.NoError(t, err)
118+
119+
w := &rtmp.Writer{
120+
Conn: conn,
121+
VideoTrack: test.FormatH264,
122+
AudioTrack: test.FormatMPEG4Audio,
123+
}
124+
err = w.Initialize()
125+
require.NoError(t, err)
126+
127+
err = w.WriteH264(2*time.Second, 2*time.Second, [][]byte{{5, 2, 3, 4}})
128+
require.NoError(t, err)
129+
130+
err = w.WriteH264(3*time.Second, 3*time.Second, [][]byte{{5, 2, 3, 4}})
131+
require.NoError(t, err)
132+
133+
break
134+
}
135+
135136
<-p.Unit
137+
138+
// the source must be listening on ReloadConf
139+
reloadConf <- nil
136140
})
137141
}
138142
}

internal/staticsources/rtp/source.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,19 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
8080
readerErr <- s.runReader(&desc, nc)
8181
}()
8282

83-
select {
84-
case err = <-readerErr:
85-
nc.Close()
86-
return err
83+
for {
84+
select {
85+
case err = <-readerErr:
86+
nc.Close()
87+
return err
8788

88-
case <-params.Context.Done():
89-
nc.Close()
90-
<-readerErr
91-
return fmt.Errorf("terminated")
89+
case <-params.ReloadConf:
90+
91+
case <-params.Context.Done():
92+
nc.Close()
93+
<-readerErr
94+
return fmt.Errorf("terminated")
95+
}
9296
}
9397
}
9498

internal/staticsources/rtp/source_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ func TestSourceUDP(t *testing.T) {
6969
ctx, ctxCancel := context.WithCancel(context.Background())
7070
defer ctxCancel()
7171

72+
reloadConf := make(chan *conf.Path)
73+
7274
go func() {
7375
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
7476
Context: ctx,
@@ -83,6 +85,7 @@ func TestSourceUDP(t *testing.T) {
8385
"a=rtpmap:96 H264/90000\n" +
8486
"a=fmtp:96 profile-level-id=42e01e;packetization-mode=1\n",
8587
},
88+
ReloadConf: reloadConf,
8689
})
8790
close(done)
8891
}()
@@ -139,6 +142,9 @@ func TestSourceUDP(t *testing.T) {
139142
}
140143

141144
<-p.Unit
145+
146+
// the source must be listening on ReloadConf
147+
reloadConf <- nil
142148
})
143149
}
144150
}

internal/staticsources/rtsp/source_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,16 +169,22 @@ func TestSource(t *testing.T) {
169169
ctx, ctxCancel := context.WithCancel(context.Background())
170170
defer ctxCancel()
171171

172+
reloadConf := make(chan *conf.Path)
173+
172174
go func() {
173175
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
174176
Context: ctx,
175177
ResolvedSource: ur,
176178
Conf: cnf,
179+
ReloadConf: reloadConf,
177180
})
178181
close(done)
179182
}()
180183

181184
<-p.Unit
185+
186+
// the source must be listening on ReloadConf
187+
reloadConf <- nil
182188
})
183189
}
184190
}
Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package srt
22

33
import (
4-
"bufio"
54
"context"
65
"testing"
76
"time"
@@ -20,42 +19,8 @@ func TestSource(t *testing.T) {
2019
require.NoError(t, err)
2120
defer ln.Close()
2221

23-
go func() {
24-
req, err2 := ln.Accept2()
25-
require.NoError(t, err2)
26-
27-
require.Equal(t, "sidname", req.StreamId())
28-
err2 = req.SetPassphrase("ttest1234567")
29-
require.NoError(t, err2)
30-
31-
conn, err2 := req.Accept()
32-
require.NoError(t, err2)
33-
defer conn.Close()
34-
35-
track := &mpegts.Track{
36-
Codec: &mpegts.CodecH264{},
37-
}
38-
39-
bw := bufio.NewWriter(conn)
40-
w := &mpegts.Writer{W: bw, Tracks: []*mpegts.Track{track}}
41-
err2 = w.Initialize()
42-
require.NoError(t, err2)
43-
44-
err2 = w.WriteH264(track, 0, 0, [][]byte{{ // IDR
45-
5, 1,
46-
}})
47-
require.NoError(t, err2)
48-
49-
err2 = bw.Flush()
50-
require.NoError(t, err2)
51-
52-
// wait for internal SRT queue to be written
53-
time.Sleep(500 * time.Millisecond)
54-
}()
55-
5622
p := &test.StaticSourceParent{}
5723
p.Initialize()
58-
defer p.Close()
5924

6025
so := &Source{
6126
ReadTimeout: conf.Duration(10 * time.Second),
@@ -68,14 +33,50 @@ func TestSource(t *testing.T) {
6833
ctx, ctxCancel := context.WithCancel(context.Background())
6934
defer ctxCancel()
7035

36+
reloadConf := make(chan *conf.Path)
37+
7138
go func() {
7239
so.Run(defs.StaticSourceRunParams{ //nolint:errcheck
7340
Context: ctx,
7441
ResolvedSource: "srt://127.0.0.1:9002?streamid=sidname&passphrase=ttest1234567",
7542
Conf: &conf.Path{},
43+
ReloadConf: reloadConf,
7644
})
7745
close(done)
7846
}()
7947

48+
req, err2 := ln.Accept2()
49+
require.NoError(t, err2)
50+
51+
require.Equal(t, "sidname", req.StreamId())
52+
err2 = req.SetPassphrase("ttest1234567")
53+
require.NoError(t, err2)
54+
55+
conn, err2 := req.Accept()
56+
require.NoError(t, err2)
57+
defer conn.Close()
58+
59+
track := &mpegts.Track{Codec: &mpegts.CodecH264{}}
60+
61+
w := &mpegts.Writer{W: conn, Tracks: []*mpegts.Track{track}}
62+
err2 = w.Initialize()
63+
require.NoError(t, err2)
64+
65+
err2 = w.WriteH264(track, 0, 0, [][]byte{{ // IDR
66+
5, 1,
67+
}})
68+
require.NoError(t, err2)
69+
70+
err = w.WriteH264(track, 0, 0, [][]byte{{ // non-IDR
71+
5, 2,
72+
}})
73+
require.NoError(t, err)
74+
8075
<-p.Unit
76+
77+
// the source must be listening on ReloadConf
78+
reloadConf <- nil
79+
80+
// stop test reader before 2nd H264 packet is received to avoid a crash
81+
p.Close()
8182
}

0 commit comments

Comments
 (0)