Skip to content

Commit 0b67ed8

Browse files
feat:support receive stream via UDP (#326)
Co-authored-by: yjx <yjx>
1 parent 2311931 commit 0b67ed8

File tree

5 files changed

+316
-55
lines changed

5 files changed

+316
-55
lines changed

plugin/gb28181/dialog.go

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -136,18 +136,34 @@ func (d *Dialog) Start() (err error) {
136136
d.pullCtx.GoToStepConst(StepSIPPrepare)
137137

138138
//defer d.gb.dialogs.Remove(d)
139-
if d.gb.tcpPort > 0 {
140-
d.MediaPort = d.gb.tcpPort
141-
} else {
142-
if d.gb.MediaPort.Valid() {
143-
select {
144-
case d.MediaPort = <-d.gb.tcpPorts:
145-
default:
146-
d.pullCtx.Fail("no available tcp port")
147-
return fmt.Errorf("no available tcp port")
139+
if d.StreamMode == mrtp.StreamModeTCPPassive {
140+
if d.gb.tcpPort > 0 {
141+
d.MediaPort = d.gb.tcpPort
142+
} else {
143+
if d.gb.MediaPort.Valid() {
144+
select {
145+
case d.MediaPort = <-d.gb.tcpPorts:
146+
default:
147+
d.pullCtx.Fail("no available tcp port")
148+
return fmt.Errorf("no available tcp port")
149+
}
150+
} else {
151+
d.MediaPort = d.gb.MediaPort[0]
148152
}
153+
}
154+
} else if d.StreamMode == mrtp.StreamModeUDP {
155+
if d.gb.udpPort > 0 {
156+
d.MediaPort = d.gb.udpPort
149157
} else {
150-
d.MediaPort = d.gb.MediaPort[0]
158+
if d.gb.MediaPort.Valid() {
159+
select {
160+
case d.MediaPort = <-d.gb.udpPorts:
161+
default:
162+
return fmt.Errorf("no available udp port")
163+
}
164+
} else {
165+
d.MediaPort = d.gb.MediaPort[0]
166+
}
151167
}
152168
}
153169

@@ -214,7 +230,9 @@ func (d *Dialog) Start() (err error) {
214230
"a=connection:new",
215231
)
216232
case mrtp.StreamModeUDP:
233+
/* 支持udp收流 yjx
217234
return errors.New("do not support udp mode")
235+
*/
218236
default:
219237
sdpInfo = append(sdpInfo,
220238
"a=setup:passive",
@@ -356,7 +374,7 @@ func (d *Dialog) Run() (err error) {
356374
pub.Publisher = d.pullCtx.Publisher
357375
if d.StreamMode == mrtp.StreamModeTCPActive {
358376
pub.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort)
359-
} else {
377+
} else if d.StreamMode == mrtp.StreamModeTCPPassive {
360378
if d.gb.tcpPort > 0 {
361379
d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort)
362380
if d.gb.netListener != nil {
@@ -370,6 +388,20 @@ func (d *Dialog) Run() (err error) {
370388
pub.SSRC = d.SSRC
371389
}
372390
pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
391+
} else if d.StreamMode == mrtp.StreamModeUDP {
392+
if d.gb.udpPort > 0 {
393+
d.Info("into single port mode, use gb.udpPort", d.gb.udpPort)
394+
if d.gb.netUDPListener != nil {
395+
d.Info("use gb.netUDPListener", d.gb.netUDPListener.LocalAddr())
396+
pub.ListenerUdp = d.gb.netUDPListener
397+
} else {
398+
d.Info("listen udp4", fmt.Sprintf(":%d", d.gb.udpPort))
399+
pub.ListenerUdp, _ = util.ListenUDP(fmt.Sprintf(":%d", d.gb.udpPort), 1024*1024*4)
400+
d.gb.netUDPListener = pub.ListenerUdp
401+
}
402+
}
403+
pub.SSRC = d.SSRC
404+
pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
373405
}
374406
pub.StreamMode = d.StreamMode
375407
return d.RunTask(&pub)
@@ -380,9 +412,16 @@ func (d *Dialog) GetKey() string {
380412
}
381413

382414
func (d *Dialog) Dispose() {
383-
if d.gb.tcpPort == 0 {
384-
// 如果没有设置tcp端口,则将MediaPort设置为0,表示不再使用
385-
d.gb.tcpPorts <- d.MediaPort
415+
if d.StreamMode == mrtp.StreamModeUDP {
416+
if d.gb.udpPort == 0 { //多端口
417+
// 如果没有设置udp端口,则将MediaPort设置为0,表示不再使用
418+
d.gb.udpPorts <- d.MediaPort
419+
}
420+
} else {
421+
if d.gb.tcpPort == 0 {
422+
// 如果没有设置tcp端口,则将MediaPort设置为0,表示不再使用
423+
d.gb.tcpPorts <- d.MediaPort
424+
}
386425
}
387426
d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceId, "channelId", d.Channel.ChannelId)
388427
if d.session != nil {

plugin/gb28181/index.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
"github.com/emiago/sipgo"
1919
"github.com/emiago/sipgo/sip"
20+
"github.com/pion/rtp"
2021
"github.com/rs/zerolog"
2122
m7s "m7s.live/v5"
2223
"m7s.live/v5/pkg/config"
@@ -65,6 +66,9 @@ type GB28181Plugin struct {
6566
Platforms []*gb28181.PlatformModel
6667
channels util.Collection[string, *Channel]
6768
netListener net.Listener
69+
udpPorts chan uint16
70+
udpPort uint16
71+
netUDPListener *net.UDPConn
6872
}
6973

7074
var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
@@ -89,6 +93,14 @@ func (gb *GB28181Plugin) Dispose() {
8993
gb.Info("netListener closed")
9094
}
9195
}
96+
if gb.netUDPListener != nil {
97+
err := gb.netUDPListener.Close()
98+
if err != nil {
99+
gb.Error("Close netUDPListener error", "error", err)
100+
} else {
101+
gb.Info("netUDPListener closed")
102+
}
103+
}
92104
}
93105

94106
func init() {
@@ -173,15 +185,40 @@ func (gb *GB28181Plugin) Start() (err error) {
173185
if gb.MediaPort.Valid() {
174186
gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1]))
175187
gb.tcpPorts = make(chan uint16, gb.MediaPort.Size())
188+
gb.udpPorts = make(chan uint16, gb.MediaPort.Size())
176189
if gb.MediaPort.Size() == 0 {
177190
gb.tcpPort = gb.MediaPort[0]
178191
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
192+
//support udp
193+
{
194+
gb.udpPort = gb.MediaPort[0]
195+
gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4)
196+
197+
if err != nil {
198+
gb.Error("start listen", "err", err)
199+
return errors.New("start udp listen, err" + err.Error())
200+
}
201+
go gb.ReadUdpInsinglePort()
202+
}
179203
} else if gb.MediaPort.Size() == 1 {
180204
gb.tcpPort = gb.MediaPort[0] + 1
181205
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
206+
//support udp
207+
{
208+
gb.udpPort = gb.MediaPort[0] + 1
209+
gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4)
210+
211+
if err != nil {
212+
gb.Error("start listen", "err", err)
213+
return errors.New("start udp listen, err" + err.Error())
214+
}
215+
216+
go gb.ReadUdpInsinglePort()
217+
}
182218
} else {
183219
for i := range gb.MediaPort.Size() {
184220
gb.tcpPorts <- gb.MediaPort[0] + i
221+
gb.udpPorts <- gb.MediaPort[0] + i
185222
}
186223
}
187224
} else {
@@ -1009,3 +1046,19 @@ func (gb *GB28181Plugin) OnAck(req *sip.Request, tx sip.ServerTransaction) {
10091046
return
10101047
}
10111048
}
1049+
1050+
func (gb *GB28181Plugin) ReadUdpInsinglePort() (err error) {
1051+
buffer := make(util.Buffer, 1024*1024)
1052+
var rtpPacket rtp.Packet
1053+
for {
1054+
n, _, err := gb.netUDPListener.ReadFromUDP(buffer)
1055+
if err != nil {
1056+
return err
1057+
}
1058+
1059+
ps := buffer[:n]
1060+
if err := rtpPacket.Unmarshal(ps); err != nil {
1061+
continue
1062+
}
1063+
}
1064+
}

plugin/rtp/pkg/reader.go

Lines changed: 99 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,11 @@ func (r *RTPTCPReader) Read(packet *rtp.Packet) (err error) {
6767
type RTPPayloadReader struct {
6868
IRTPReader
6969
rtp.Packet
70-
SSRC uint32 // RTP SSRC
71-
buffer util.MemoryReader
70+
SSRC uint32 // RTP SSRC
71+
buffer util.MemoryReader
72+
isUdp bool
73+
udpCache *PriorityQueueRtp
74+
UdpCacheSize int
7275
}
7376

7477
// func NewTCPRTPPayloadReaderForFeed() *RTPPayloadReader {
@@ -80,23 +83,76 @@ type RTPPayloadReader struct {
8083
// return r
8184
// }
8285

83-
func NewRTPPayloadReader(t IRTPReader) *RTPPayloadReader {
86+
func NewRTPPayloadReader(t IRTPReader, SSRC uint32, isUdp bool) *RTPPayloadReader {
8487
r := &RTPPayloadReader{}
88+
r.SSRC = SSRC
89+
r.isUdp = isUdp
8590
r.IRTPReader = t
8691
r.buffer.Memory = &util.Memory{}
92+
if isUdp {
93+
r.UdpCacheSize = 30
94+
}
95+
8796
return r
8897
}
8998

9099
func (r *RTPPayloadReader) Read(buf []byte) (n int, err error) {
91-
// 如果缓冲区中有数据,先读取缓冲区中的数据
92-
if r.buffer.Length > 0 {
93-
n, _ = r.buffer.Read(buf)
94-
return n, nil
100+
if r.isUdp {
101+
return r.ReadUDP(buf)
102+
} else {
103+
// 如果缓冲区中有数据,先读取缓冲区中的数据
104+
if r.buffer.Length > 0 {
105+
n, _ = r.buffer.Read(buf)
106+
return n, nil
107+
}
108+
for {
109+
lastSeq := r.SequenceNumber
110+
err = r.IRTPReader.Read(&r.Packet)
111+
if err != nil {
112+
err = errors.Join(err, fmt.Errorf("failed to read RTP packet"))
113+
return
114+
}
115+
116+
// 检查SSRC是否匹配
117+
if r.SSRC != 0 && r.SSRC != r.Packet.SSRC {
118+
// SSRC不匹配,继续读取下一个包
119+
continue
120+
}
121+
122+
// 检查序列号是否连续
123+
if lastSeq == 0 || r.SequenceNumber == lastSeq+1 {
124+
// 序列号连续,处理当前包的数据
125+
if lbuf, lpayload := len(buf), len(r.Payload); lbuf >= lpayload {
126+
// 缓冲区足够大,可以容纳整个负载
127+
copy(buf, r.Payload)
128+
n += lpayload
129+
130+
// 如果缓冲区还有剩余空间,继续读取下一个包
131+
if lbuf > lpayload {
132+
var nextn int
133+
nextn, err = r.Read(buf[lpayload:])
134+
if err != nil && err != io.EOF {
135+
return n, err
136+
}
137+
n += nextn
138+
}
139+
return
140+
} else {
141+
// 缓冲区不够大,只复制部分数据,将剩余数据放入缓冲区
142+
n += lbuf
143+
copy(buf, r.Payload[:lbuf])
144+
r.buffer.PushOne(r.Payload[lbuf:])
145+
r.buffer.Length = lpayload - lbuf
146+
return
147+
}
148+
}
149+
}
95150
}
151+
}
96152

153+
func (r *RTPPayloadReader) ReadUDP(buf []byte) (n int, err error) {
97154
// 读取新的RTP包
98155
for {
99-
lastSeq := r.SequenceNumber
100156
err = r.IRTPReader.Read(&r.Packet)
101157
if err != nil {
102158
err = errors.Join(err, fmt.Errorf("failed to read RTP packet"))
@@ -109,32 +165,41 @@ func (r *RTPPayloadReader) Read(buf []byte) (n int, err error) {
109165
continue
110166
}
111167

112-
// 检查序列号是否连续
113-
if lastSeq == 0 || r.SequenceNumber == lastSeq+1 {
114-
// 序列号连续,处理当前包的数据
115-
if lbuf, lpayload := len(buf), len(r.Payload); lbuf >= lpayload {
116-
// 缓冲区足够大,可以容纳整个负载
117-
copy(buf, r.Payload)
118-
n += lpayload
119-
120-
// 如果缓冲区还有剩余空间,继续读取下一个包
121-
if lbuf > lpayload {
122-
var nextn int
123-
nextn, err = r.Read(buf[lpayload:])
124-
if err != nil && err != io.EOF {
125-
return n, err
126-
}
127-
n += nextn
128-
}
129-
return
130-
} else {
131-
// 缓冲区不够大,只复制部分数据,将剩余数据放入缓冲区
132-
n += lbuf
133-
copy(buf, r.Payload[:lbuf])
134-
r.buffer.PushOne(r.Payload[lbuf:])
135-
r.buffer.Length = lpayload - lbuf
136-
return
137-
}
168+
if r.UdpCacheSize > 0 && r.udpCache == nil {
169+
r.udpCache = NewPqRtp()
170+
}
171+
172+
//加入缓存,自动排序
173+
rtpTmpCache := r.Packet
174+
rtpTmpCache.Payload = make([]byte, len(r.Payload))
175+
copy(rtpTmpCache.Payload, r.Payload)
176+
r.udpCache.Push(rtpTmpCache)
177+
178+
rtpTmp := r.Packet
179+
180+
if r.udpCache.Len() < r.UdpCacheSize-1 {
181+
continue
182+
} else {
183+
rtpTmp, _ = r.udpCache.Pop()
138184
}
185+
186+
copy(buf, rtpTmp.Payload)
187+
n += len(rtpTmp.Payload)
188+
return
189+
}
190+
}
191+
192+
func (r *RTPPayloadReader) PushRTP(rtp *rtp.Packet) {
193+
if r.SSRC != 0 && r.SSRC != r.Packet.SSRC {
194+
return
195+
}
196+
197+
if r.UdpCacheSize > 0 && r.udpCache == nil {
198+
r.udpCache = NewPqRtp()
139199
}
200+
201+
rtpTmpCache := *rtp
202+
rtpTmpCache.Payload = make([]byte, len(r.Payload))
203+
copy(rtpTmpCache.Payload, r.Payload)
204+
r.udpCache.Push(rtpTmpCache)
140205
}

0 commit comments

Comments
 (0)