Skip to content

Commit f9ce572

Browse files
jereroblesaler9
andauthored
support assigning paths to different configurations without closing stream (#4576)
Co-authored-by: aler9 <[email protected]>
1 parent c21c969 commit f9ce572

File tree

2 files changed

+135
-18
lines changed

2 files changed

+135
-18
lines changed

internal/core/path_manager.go

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ import (
1919
func pathConfCanBeUpdated(oldPathConf *conf.Path, newPathConf *conf.Path) bool {
2020
clone := oldPathConf.Clone()
2121

22+
clone.Name = newPathConf.Name
23+
clone.Regexp = newPathConf.Regexp
24+
2225
clone.Record = newPathConf.Record
2326

2427
clone.RPICameraBrightness = newPathConf.RPICameraBrightness
@@ -51,8 +54,9 @@ type pathSetHLSServerReq struct {
5154
}
5255

5356
type pathData struct {
54-
path *path
55-
ready bool
57+
path *path
58+
ready bool
59+
confName string
5660
}
5761

5862
type pathManagerParent interface {
@@ -209,34 +213,37 @@ func (pm *pathManager) doReloadConf(newPaths map[string]*conf.Path) {
209213
// process existing paths
210214
for pathName, pathData := range pm.paths {
211215
path := pathData.path
212-
pathConf, _, err := conf.FindPathConf(newPaths, pathName)
216+
newPathConf, _, err := conf.FindPathConf(newPaths, pathName)
213217
// path does not have a config anymore: delete it
214218
if err != nil {
215-
pm.removePath(path)
216-
path.close()
217-
path.wait() // avoid conflicts between sources
219+
pm.removeAndClosePath(path)
218220
continue
219221
}
220222

221-
// path now belongs to a different config: delete it
222-
if pathConf.Name != path.conf.Name {
223-
pm.removePath(path)
224-
path.close()
225-
path.wait() // avoid conflicts between sources
223+
// path now belongs to a different config
224+
if newPathConf.Name != pathData.confName {
225+
// path config can be hot reloaded
226+
oldPathConf := pm.pathConfs[pathData.confName]
227+
if pathConfCanBeUpdated(oldPathConf, newPathConf) {
228+
pm.paths[path.name].confName = newPathConf.Name
229+
go path.reloadConf(newPathConf)
230+
continue
231+
}
232+
233+
// Configuration cannot be hot reloaded: delete the path
234+
pm.removeAndClosePath(path)
226235
continue
227236
}
228237

229238
// path configuration has changed and cannot be hot reloaded: delete path
230-
if _, ok := confsToRecreate[pathConf.Name]; ok {
231-
pm.removePath(path)
232-
path.close()
233-
path.wait() // avoid conflicts between sources
239+
if _, ok := confsToRecreate[newPathConf.Name]; ok {
240+
pm.removeAndClosePath(path)
234241
continue
235242
}
236243

237244
// path configuration has changed but can be hot reloaded: reload it
238-
if _, ok := confsToReload[pathConf.Name]; ok {
239-
go path.reloadConf(pathConf)
245+
if _, ok := confsToReload[newPathConf.Name]; ok {
246+
go path.reloadConf(newPathConf)
240247
}
241248
}
242249

@@ -252,6 +259,12 @@ func (pm *pathManager) doReloadConf(newPaths map[string]*conf.Path) {
252259
}
253260
}
254261

262+
func (pm *pathManager) removeAndClosePath(path *path) {
263+
pm.removePath(path)
264+
path.close()
265+
path.wait() // avoid conflicts between sources
266+
}
267+
255268
func (pm *pathManager) doSetHLSServer(m *hls.Server) []defs.Path {
256269
pm.hlsServer = m
257270

@@ -425,7 +438,10 @@ func (pm *pathManager) createPath(
425438
}
426439
pa.initialize()
427440

428-
pm.paths[name] = &pathData{path: pa}
441+
pm.paths[name] = &pathData{
442+
path: pa,
443+
confName: pathConf.Name,
444+
}
429445
}
430446

431447
func (pm *pathManager) removePath(pa *path) {

internal/core/path_manager_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,18 @@ package core
33
import (
44
"bufio"
55
"net"
6+
"net/http"
67
"testing"
8+
"time"
79

10+
"github.com/bluenviron/gortsplib/v4"
811
"github.com/bluenviron/gortsplib/v4/pkg/base"
12+
"github.com/bluenviron/gortsplib/v4/pkg/description"
913
"github.com/bluenviron/gortsplib/v4/pkg/headers"
14+
"github.com/pion/rtp"
1015
"github.com/stretchr/testify/require"
16+
17+
"github.com/bluenviron/mediamtx/internal/test"
1118
)
1219

1320
func TestPathAutoDeletion(t *testing.T) {
@@ -82,3 +89,97 @@ func TestPathAutoDeletion(t *testing.T) {
8289
})
8390
}
8491
}
92+
93+
func TestPathConfigurationHotReload(t *testing.T) {
94+
// Start MediaMTX with basic configuration
95+
p, ok := newInstance("api: yes\n" +
96+
"paths:\n" +
97+
" all:\n" +
98+
" record: no\n")
99+
require.Equal(t, true, ok)
100+
defer p.Close()
101+
102+
// Set up HTTP client for API calls
103+
tr := &http.Transport{}
104+
defer tr.CloseIdleConnections()
105+
hc := &http.Client{Transport: tr}
106+
107+
// Create a publisher that will use the "all" configuration
108+
media0 := test.UniqueMediaH264()
109+
source := gortsplib.Client{}
110+
err := source.StartRecording(
111+
"rtsp://localhost:8554/undefined_stream",
112+
&description.Session{Medias: []*description.Media{media0}})
113+
require.NoError(t, err)
114+
defer source.Close()
115+
116+
// Send some data to establish the stream
117+
err = source.WritePacketRTP(media0, &rtp.Packet{
118+
Header: rtp.Header{
119+
Version: 2,
120+
PayloadType: 96,
121+
},
122+
Payload: []byte{5, 1, 2, 3, 4},
123+
})
124+
require.NoError(t, err)
125+
126+
time.Sleep(100 * time.Millisecond)
127+
128+
// Verify the path exists and is using the "all" configuration
129+
pathData, err := p.pathManager.APIPathsGet("undefined_stream")
130+
require.NoError(t, err)
131+
require.Equal(t, "undefined_stream", pathData.Name)
132+
require.Equal(t, "all", pathData.ConfName)
133+
134+
// Check the current configuration via API
135+
var allConfig map[string]interface{}
136+
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/all", nil, &allConfig)
137+
require.Equal(t, false, allConfig["record"]) // Should be false from "all" config
138+
139+
// Add a new specific configuration for "undefined_stream" with record enabled
140+
httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/undefined_stream",
141+
map[string]interface{}{
142+
"record": true,
143+
}, nil)
144+
145+
// Give the system time to process the configuration change
146+
time.Sleep(200 * time.Millisecond)
147+
148+
// Verify the path now uses the new specific configuration
149+
pathData, err = p.pathManager.APIPathsGet("undefined_stream")
150+
require.NoError(t, err)
151+
require.Equal(t, "undefined_stream", pathData.Name)
152+
require.Equal(t, "undefined_stream", pathData.ConfName) // Should now use the specific config
153+
154+
// Check the new configuration via API
155+
var newConfig map[string]interface{}
156+
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/undefined_stream", nil, &newConfig)
157+
require.Equal(t, true, newConfig["record"]) // Should be true from new config
158+
159+
// Verify the stream is still active and working
160+
err = source.WritePacketRTP(media0, &rtp.Packet{
161+
Header: rtp.Header{
162+
Version: 2,
163+
PayloadType: 96,
164+
SequenceNumber: 2,
165+
},
166+
Payload: []byte{5, 1, 2, 3, 4},
167+
})
168+
require.NoError(t, err)
169+
170+
// Verify the path is still ready and functional
171+
require.Equal(t, true, pathData.Ready)
172+
173+
// revert configuration
174+
httpRequest(t, hc, http.MethodDelete, "http://localhost:9997/v3/config/paths/delete/undefined_stream",
175+
nil, nil)
176+
177+
// Give the system time to process the configuration change
178+
time.Sleep(200 * time.Millisecond)
179+
180+
// Verify the path now uses the old configuration
181+
pathData, err = p.pathManager.APIPathsGet("undefined_stream")
182+
require.NoError(t, err)
183+
require.Equal(t, "undefined_stream", pathData.Name)
184+
require.Equal(t, "all", pathData.ConfName)
185+
}

0 commit comments

Comments
 (0)