Skip to content

Commit 65da49f

Browse files
authored
optimize code (#878)
* remove unused code * initialize UDP listeners and SRTP before initializing medias * make rtpSender and rtpReceiver available before PLAY / RECORD * use writerMutex to protect writer only
1 parent 1bc8966 commit 65da49f

9 files changed

+400
-456
lines changed

client.go

Lines changed: 134 additions & 82 deletions
Large diffs are not rendered by default.

client_format.go

Lines changed: 11 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package gortsplib
22

33
import (
4-
"slices"
54
"sync/atomic"
65
"time"
76

@@ -14,37 +13,12 @@ import (
1413
"github.com/bluenviron/gortsplib/v4/pkg/rtpsender"
1514
)
1615

17-
func clientPickLocalSSRC(cf *clientFormat) (uint32, error) {
18-
var takenSSRCs []uint32 //nolint:prealloc
19-
20-
for _, cm := range cf.cm.c.setuppedMedias {
21-
for _, cf := range cm.formats {
22-
takenSSRCs = append(takenSSRCs, cf.localSSRC)
23-
}
24-
}
25-
26-
for _, cf := range cf.cm.formats {
27-
takenSSRCs = append(takenSSRCs, cf.localSSRC)
28-
}
29-
30-
for {
31-
ssrc, err := randUint32()
32-
if err != nil {
33-
return 0, err
34-
}
35-
36-
if ssrc != 0 && !slices.Contains(takenSSRCs, ssrc) {
37-
return ssrc, nil
38-
}
39-
}
40-
}
41-
4216
type clientFormat struct {
4317
cm *clientMedia
4418
format format.Format
19+
localSSRC uint32
4520
onPacketRTP OnPacketRTPFunc
4621

47-
localSSRC uint32
4822
rtpReceiver *rtpreceiver.Receiver // play
4923
rtpSender *rtpsender.Sender // record or back channel
5024
writePacketRTPInQueue func([]byte) error
@@ -53,32 +27,18 @@ type clientFormat struct {
5327
rtpPacketsLost *uint64
5428
}
5529

56-
func (cf *clientFormat) initialize() error {
57-
if cf.cm.c.state == clientStatePreRecord {
58-
cf.localSSRC = cf.cm.c.announceData[cf.cm.media].formats[cf.format.PayloadType()].localSSRC
59-
} else {
60-
var err error
61-
cf.localSSRC, err = clientPickLocalSSRC(cf)
62-
if err != nil {
63-
return err
64-
}
65-
}
66-
30+
func (cf *clientFormat) initialize() {
6731
cf.rtpPacketsReceived = new(uint64)
6832
cf.rtpPacketsSent = new(uint64)
6933
cf.rtpPacketsLost = new(uint64)
7034

71-
return nil
72-
}
73-
74-
func (cf *clientFormat) start() {
7535
if cf.cm.udpRTPListener != nil {
7636
cf.writePacketRTPInQueue = cf.writePacketRTPInQueueUDP
7737
} else {
7838
cf.writePacketRTPInQueue = cf.writePacketRTPInQueueTCP
7939
}
8040

81-
if cf.cm.c.state == clientStateRecord || cf.cm.media.IsBackChannel {
41+
if cf.cm.c.state == clientStatePreRecord || cf.cm.media.IsBackChannel {
8242
cf.rtpSender = &rtpsender.Sender{
8343
ClockRate: cf.format.ClockRate(),
8444
Period: cf.cm.c.senderReportPeriod,
@@ -110,7 +70,7 @@ func (cf *clientFormat) start() {
11070
}
11171
}
11272

113-
func (cf *clientFormat) stop() {
73+
func (cf *clientFormat) close() {
11474
if cf.rtpReceiver != nil {
11575
cf.rtpReceiver.Close()
11676
cf.rtpReceiver = nil
@@ -178,6 +138,13 @@ func (cf *clientFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error {
178138
buf = encr
179139
}
180140

141+
cf.cm.c.writerMutex.RLock()
142+
defer cf.cm.c.writerMutex.RUnlock()
143+
144+
if cf.cm.c.writer == nil {
145+
return nil
146+
}
147+
181148
ok := cf.cm.c.writer.push(func() error {
182149
return cf.writePacketRTPInQueue(buf)
183150
})

client_media.go

Lines changed: 77 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package gortsplib
22

33
import (
4-
"crypto/rand"
54
"fmt"
65
"net"
76
"strconv"
@@ -15,159 +14,124 @@ import (
1514
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
1615
)
1716

18-
type clientMedia struct {
19-
c *Client
20-
media *description.Media
21-
secure bool
22-
23-
srtpOutCtx *wrappedSRTPContext
24-
srtpInCtx *wrappedSRTPContext
25-
onPacketRTCP OnPacketRTCPFunc
26-
formats map[uint8]*clientFormat
27-
tcpChannel int
28-
udpRTPListener *clientUDPListener
29-
udpRTCPListener *clientUDPListener
30-
writePacketRTCPInQueue func([]byte) error
31-
bytesReceived *uint64
32-
bytesSent *uint64
33-
rtpPacketsInError *uint64
34-
rtcpPacketsReceived *uint64
35-
rtcpPacketsSent *uint64
36-
rtcpPacketsInError *uint64
37-
}
38-
39-
func (cm *clientMedia) initialize() error {
40-
cm.onPacketRTCP = func(rtcp.Packet) {}
41-
cm.bytesReceived = new(uint64)
42-
cm.bytesSent = new(uint64)
43-
cm.rtpPacketsInError = new(uint64)
44-
cm.rtcpPacketsReceived = new(uint64)
45-
cm.rtcpPacketsSent = new(uint64)
46-
cm.rtcpPacketsInError = new(uint64)
47-
48-
cm.formats = make(map[uint8]*clientFormat)
49-
50-
for _, forma := range cm.media.Formats {
51-
f := &clientFormat{
52-
cm: cm,
53-
format: forma,
54-
onPacketRTP: func(*rtp.Packet) {},
55-
}
56-
err := f.initialize()
57-
if err != nil {
58-
return err
59-
}
60-
61-
cm.formats[forma.PayloadType()] = f
62-
}
63-
64-
if cm.secure {
65-
var srtpOutKey []byte
66-
if cm.c.state == clientStatePreRecord {
67-
srtpOutKey = cm.c.announceData[cm.media].srtpOutKey
68-
} else {
69-
srtpOutKey = make([]byte, srtpKeyLength)
70-
_, err := rand.Read(srtpOutKey)
71-
if err != nil {
72-
return err
73-
}
74-
}
75-
76-
ssrcs := make([]uint32, len(cm.formats))
77-
n := 0
78-
for _, cf := range cm.formats {
79-
ssrcs[n] = cf.localSSRC
80-
n++
81-
}
82-
83-
cm.srtpOutCtx = &wrappedSRTPContext{
84-
key: srtpOutKey,
85-
ssrcs: ssrcs,
86-
}
87-
err := cm.srtpOutCtx.initialize()
88-
if err != nil {
89-
return err
90-
}
91-
}
92-
93-
return nil
94-
}
95-
96-
func (cm *clientMedia) close() {
97-
if cm.udpRTPListener != nil {
98-
cm.udpRTPListener.close()
99-
cm.udpRTCPListener.close()
100-
}
101-
}
102-
103-
func (cm *clientMedia) createUDPListeners(
17+
func createUDPListenerPair(
18+
c *Client,
10419
multicast bool,
10520
multicastInterface *net.Interface,
10621
rtpAddress string,
10722
rtcpAddress string,
108-
) error {
23+
) (*clientUDPListener, *clientUDPListener, error) {
10924
if rtpAddress != ":0" {
11025
l1 := &clientUDPListener{
111-
c: cm.c,
26+
c: c,
11227
multicast: multicast,
11328
multicastInterface: multicastInterface,
11429
address: rtpAddress,
11530
}
11631
err := l1.initialize()
11732
if err != nil {
118-
return err
33+
return nil, nil, err
11934
}
12035

12136
l2 := &clientUDPListener{
122-
c: cm.c,
37+
c: c,
12338
multicast: multicast,
12439
multicastInterface: multicastInterface,
12540
address: rtcpAddress,
12641
}
12742
err = l2.initialize()
12843
if err != nil {
12944
l1.close()
130-
return err
45+
return nil, nil, err
13146
}
13247

133-
cm.udpRTPListener, cm.udpRTCPListener = l1, l2
134-
return nil
48+
return l1, l2, nil
13549
}
13650

13751
// pick two consecutive ports in range 65535-10000
13852
// RTP port must be even and RTCP port odd
13953
for {
14054
v, err := randInRange((65535 - 10000) / 2)
14155
if err != nil {
142-
return err
56+
return nil, nil, err
14357
}
14458

14559
rtpPort := v*2 + 10000
14660
rtcpPort := rtpPort + 1
14761

148-
cm.udpRTPListener = &clientUDPListener{
149-
c: cm.c,
62+
l1 := &clientUDPListener{
63+
c: c,
15064
address: net.JoinHostPort("", strconv.FormatInt(int64(rtpPort), 10)),
15165
}
152-
err = cm.udpRTPListener.initialize()
66+
err = l1.initialize()
15367
if err != nil {
154-
cm.udpRTPListener = nil
15568
continue
15669
}
15770

158-
cm.udpRTCPListener = &clientUDPListener{
159-
c: cm.c,
71+
l2 := &clientUDPListener{
72+
c: c,
16073
address: net.JoinHostPort("", strconv.FormatInt(int64(rtcpPort), 10)),
16174
}
162-
err = cm.udpRTCPListener.initialize()
75+
err = l2.initialize()
16376
if err != nil {
164-
cm.udpRTPListener.close()
165-
cm.udpRTPListener = nil
166-
cm.udpRTCPListener = nil
77+
l2.close()
16778
continue
16879
}
16980

170-
return nil
81+
return l1, l2, nil
82+
}
83+
}
84+
85+
type clientMedia struct {
86+
c *Client
87+
media *description.Media
88+
secure bool
89+
udpRTPListener *clientUDPListener
90+
udpRTCPListener *clientUDPListener
91+
tcpChannel int
92+
localSSRCs map[uint8]uint32
93+
srtpInCtx *wrappedSRTPContext
94+
srtpOutCtx *wrappedSRTPContext
95+
96+
onPacketRTCP OnPacketRTCPFunc
97+
formats map[uint8]*clientFormat
98+
writePacketRTCPInQueue func([]byte) error
99+
bytesReceived *uint64
100+
bytesSent *uint64
101+
rtpPacketsInError *uint64
102+
rtcpPacketsReceived *uint64
103+
rtcpPacketsSent *uint64
104+
rtcpPacketsInError *uint64
105+
}
106+
107+
func (cm *clientMedia) initialize() {
108+
cm.onPacketRTCP = func(rtcp.Packet) {}
109+
cm.bytesReceived = new(uint64)
110+
cm.bytesSent = new(uint64)
111+
cm.rtpPacketsInError = new(uint64)
112+
cm.rtcpPacketsReceived = new(uint64)
113+
cm.rtcpPacketsSent = new(uint64)
114+
cm.rtcpPacketsInError = new(uint64)
115+
116+
cm.formats = make(map[uint8]*clientFormat)
117+
118+
for _, forma := range cm.media.Formats {
119+
f := &clientFormat{
120+
cm: cm,
121+
format: forma,
122+
localSSRC: cm.localSSRCs[forma.PayloadType()],
123+
onPacketRTP: func(*rtp.Packet) {},
124+
}
125+
f.initialize()
126+
cm.formats[forma.PayloadType()] = f
127+
}
128+
}
129+
130+
func (cm *clientMedia) close() {
131+
cm.stop()
132+
133+
for _, ct := range cm.formats {
134+
ct.close()
171135
}
172136
}
173137

@@ -198,10 +162,6 @@ func (cm *clientMedia) start() {
198162
}
199163
}
200164

201-
for _, ct := range cm.formats {
202-
ct.start()
203-
}
204-
205165
if cm.udpRTPListener != nil {
206166
cm.udpRTPListener.start()
207167
cm.udpRTCPListener.start()
@@ -213,10 +173,6 @@ func (cm *clientMedia) stop() {
213173
cm.udpRTPListener.stop()
214174
cm.udpRTCPListener.stop()
215175
}
216-
217-
for _, ct := range cm.formats {
218-
ct.stop()
219-
}
220176
}
221177

222178
func (cm *clientMedia) findFormatByRemoteSSRC(ssrc uint32) *clientFormat {
@@ -460,6 +416,13 @@ func (cm *clientMedia) writePacketRTCP(pkt rtcp.Packet) error {
460416
buf = encr
461417
}
462418

419+
cm.c.writerMutex.RLock()
420+
defer cm.c.writerMutex.RUnlock()
421+
422+
if cm.c.writer == nil {
423+
return nil
424+
}
425+
463426
ok := cm.c.writer.push(func() error {
464427
return cm.writePacketRTCPInQueue(buf)
465428
})

0 commit comments

Comments
 (0)