Skip to content

Commit f078e12

Browse files
committed
server: fix panic due to regression in #887
This happened when writing a TCP packet to a conn after a read error.
1 parent ead4471 commit f078e12

File tree

7 files changed

+67
-42
lines changed

7 files changed

+67
-42
lines changed

client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -975,7 +975,7 @@ func (c *Client) doClose() {
975975

976976
if c.reader != nil {
977977
c.nconn.Close()
978-
c.reader.wait()
978+
c.reader.close()
979979
c.reader = nil
980980
c.nconn = nil
981981
c.conn = nil

client_reader.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,16 @@ type clientReader struct {
1212

1313
mutex sync.Mutex
1414
allowInterleavedFrames bool
15+
16+
terminate chan struct{}
17+
18+
done chan struct{}
1519
}
1620

1721
func (r *clientReader) start() {
22+
r.terminate = make(chan struct{})
23+
r.done = make(chan struct{})
24+
1825
go r.run()
1926
}
2027

@@ -24,19 +31,20 @@ func (r *clientReader) setAllowInterleavedFrames(v bool) {
2431
r.allowInterleavedFrames = v
2532
}
2633

27-
func (r *clientReader) wait() {
28-
for {
29-
select {
30-
case <-r.c.chResponse:
31-
case <-r.c.chRequest:
32-
case <-r.c.chReadError:
33-
return
34-
}
35-
}
34+
func (r *clientReader) close() {
35+
close(r.terminate)
36+
<-r.done
3637
}
3738

3839
func (r *clientReader) run() {
39-
r.c.chReadError <- r.runInner()
40+
defer close(r.done)
41+
42+
err := r.runInner()
43+
44+
select {
45+
case r.c.chReadError <- err:
46+
case <-r.terminate:
47+
}
4048
}
4149

4250
func (r *clientReader) runInner() error {
@@ -48,10 +56,16 @@ func (r *clientReader) runInner() error {
4856

4957
switch what := what.(type) {
5058
case *base.Response:
51-
r.c.chResponse <- what
59+
select {
60+
case r.c.chResponse <- what:
61+
case <-r.terminate:
62+
}
5263

5364
case *base.Request:
54-
r.c.chRequest <- what
65+
select {
66+
case r.c.chRequest <- what:
67+
case <-r.terminate:
68+
}
5569

5670
case *base.InterleavedFrame:
5771
r.mutex.Lock()

server.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,9 @@ func (s *Server) runInner() error {
401401
sc.Close()
402402

403403
case req := <-s.chHandleHTTPChannel:
404+
if _, ok := s.conns[req.sc]; !ok {
405+
continue
406+
}
404407
if !req.write {
405408
req.sc.httpReadTunnelID = req.tunnelID
406409
s.httpReadChannels[req.sc] = req.res
@@ -551,8 +554,6 @@ func (s *Server) handleHTTPChannel(req sessionHandleHTTPChannelReq) error {
551554

552555
select {
553556
case s.chHandleHTTPChannel <- req:
554-
case <-req.sc.ctx.Done():
555-
return fmt.Errorf("terminated")
556557
case <-s.ctx.Done():
557558
return fmt.Errorf("terminated")
558559
}

server_conn.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/bluenviron/gortsplib/v4/pkg/auth"
1717
"github.com/bluenviron/gortsplib/v4/pkg/base"
1818
"github.com/bluenviron/gortsplib/v4/pkg/bytecounter"
19+
"github.com/bluenviron/gortsplib/v4/pkg/conn"
1920
"github.com/bluenviron/gortsplib/v4/pkg/description"
2021
"github.com/bluenviron/gortsplib/v4/pkg/headers"
2122
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
@@ -205,8 +206,8 @@ type ServerConn struct {
205206
userData interface{}
206207
remoteAddr *net.TCPAddr
207208
bc *bytecounter.ByteCounter
209+
conn *conn.Conn
208210
session *ServerSession
209-
reader *serverConnReader
210211
authNonce string
211212
httpReadBuf *bufio.Reader
212213
httpReadTunnelID string
@@ -362,10 +363,10 @@ func (sc *ServerConn) run() {
362363
})
363364
}
364365

365-
sc.reader = &serverConnReader{
366+
reader := &serverConnReader{
366367
sc: sc,
367368
}
368-
sc.reader.initialize()
369+
reader.initialize()
369370

370371
err := sc.runInner()
371372

@@ -375,9 +376,7 @@ func (sc *ServerConn) run() {
375376
sc.nconn.Close()
376377
}
377378

378-
if sc.reader != nil {
379-
sc.reader.wait()
380-
}
379+
reader.wait()
381380

382381
if sc.session != nil {
383382
sc.session.removeConn(sc)
@@ -400,7 +399,6 @@ func (sc *ServerConn) runInner() error {
400399
req.res <- sc.handleRequestOuter(req.req)
401400

402401
case err := <-sc.chReadError:
403-
sc.reader = nil
404402
return err
405403

406404
case ss := <-sc.chRemoveSession:
@@ -629,7 +627,7 @@ func (sc *ServerConn) handleRequestOuter(req *base.Request) error {
629627
}
630628

631629
sc.nconn.SetWriteDeadline(time.Now().Add(sc.s.WriteTimeout))
632-
err2 := sc.reader.conn.WriteResponse(res)
630+
err2 := sc.conn.WriteResponse(res)
633631
if err == nil && err2 != nil {
634632
err = err2
635633
}

server_conn_reader.go

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,28 +36,30 @@ func isSwitchReadFuncError(err error) bool {
3636
}
3737

3838
type serverConnReader struct {
39-
sc *ServerConn
40-
conn *conn.Conn
39+
sc *ServerConn
40+
41+
done chan struct{}
4142
}
4243

4344
func (cr *serverConnReader) initialize() {
45+
cr.done = make(chan struct{})
46+
4447
go cr.run()
4548
}
4649

4750
func (cr *serverConnReader) wait() {
48-
for {
49-
select {
50-
case <-cr.sc.chReadError:
51-
return
52-
53-
case req := <-cr.sc.chRequest:
54-
req.res <- fmt.Errorf("terminated")
55-
}
56-
}
51+
<-cr.done
5752
}
5853

5954
func (cr *serverConnReader) run() {
60-
cr.sc.chReadError <- cr.runInner()
55+
defer close(cr.done)
56+
57+
err := cr.runInner()
58+
59+
select {
60+
case cr.sc.chReadError <- err:
61+
case <-cr.sc.ctx.Done():
62+
}
6163
}
6264

6365
func (cr *serverConnReader) runInner() error {
@@ -71,7 +73,7 @@ func (cr *serverConnReader) runInner() error {
7173
}
7274
}
7375

74-
cr.conn = conn.NewConn(bufio.NewReader(rw), rw)
76+
cr.sc.conn = conn.NewConn(bufio.NewReader(rw), rw)
7577

7678
readFunc := cr.readFuncStandard
7779

@@ -171,7 +173,7 @@ func (cr *serverConnReader) readFuncStandard() error {
171173
cr.sc.nconn.SetReadDeadline(time.Time{})
172174

173175
for {
174-
what, err := cr.conn.Read()
176+
what, err := cr.sc.conn.Read()
175177
if err != nil {
176178
return err
177179
}
@@ -180,7 +182,12 @@ func (cr *serverConnReader) readFuncStandard() error {
180182
case *base.Request:
181183
cres := make(chan error)
182184
req := readReq{req: what, res: cres}
183-
cr.sc.chRequest <- req
185+
186+
select {
187+
case cr.sc.chRequest <- req:
188+
case <-cr.sc.ctx.Done():
189+
return fmt.Errorf("terminated")
190+
}
184191

185192
err = <-cres
186193
if err != nil {
@@ -207,7 +214,7 @@ func (cr *serverConnReader) readFuncTCP() error {
207214
cr.sc.nconn.SetReadDeadline(time.Now().Add(cr.sc.s.ReadTimeout))
208215
}
209216

210-
what, err := cr.conn.Read()
217+
what, err := cr.sc.conn.Read()
211218
if err != nil {
212219
return err
213220
}
@@ -216,7 +223,12 @@ func (cr *serverConnReader) readFuncTCP() error {
216223
case *base.Request:
217224
cres := make(chan error)
218225
req := readReq{req: what, res: cres}
219-
cr.sc.chRequest <- req
226+
227+
select {
228+
case cr.sc.chRequest <- req:
229+
case <-cr.sc.ctx.Done():
230+
return fmt.Errorf("terminated")
231+
}
220232

221233
err = <-cres
222234
if err != nil {

server_session_format.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func (sf *serverSessionFormat) writePacketRTPInQueueTCP(payload []byte) error {
179179
sf.sm.ss.tcpFrame.Channel = sf.sm.tcpChannel
180180
sf.sm.ss.tcpFrame.Payload = payload
181181
sf.sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sf.sm.ss.s.WriteTimeout))
182-
err := sf.sm.ss.tcpConn.reader.conn.WriteInterleavedFrame(sf.sm.ss.tcpFrame, sf.sm.ss.tcpBuffer)
182+
err := sf.sm.ss.tcpConn.conn.WriteInterleavedFrame(sf.sm.ss.tcpFrame, sf.sm.ss.tcpBuffer)
183183
if err != nil {
184184
return err
185185
}

server_session_media.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error {
483483
sm.ss.tcpFrame.Channel = sm.tcpChannel + 1
484484
sm.ss.tcpFrame.Payload = payload
485485
sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout))
486-
err := sm.ss.tcpConn.reader.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
486+
err := sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
487487
if err != nil {
488488
return err
489489
}

0 commit comments

Comments
 (0)