Skip to content

Commit 6107dea

Browse files
authored
add RTSP-over-WebSocket (#891) (#898)
1 parent 1616c8c commit 6107dea

16 files changed

+493
-69
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Features:
1414

1515
* Client
1616
* Support secure protocol variants (RTSPS, TLS, SRTP, MIKEY)
17-
* Support RTSP-over-HTTP, RTSP-over-HTTPS
17+
* Support tunneling (RTSP-over-HTTP, RTSP-over-WebSocket)
1818
* Query servers about available media streams
1919
* Read media streams from a server ("play")
2020
* Read streams with the UDP, UDP-multicast or TCP transport protocol
@@ -30,7 +30,7 @@ Features:
3030
* Pause without disconnecting from the server
3131
* Server
3232
* Support secure protocol variants (RTSPS, TLS, SRTP, MIKEY)
33-
* Support RTSP-over-HTTP, RTSP-over-HTTPS
33+
* Support tunneling (RTSP-over-HTTP, RTSP-over-WebSocket)
3434
* Handle requests from clients
3535
* Validate client credentials
3636
* Read media streams from clients ("record")

client.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,13 +1137,22 @@ func (c *Client) connOpen() error {
11371137

11381138
var nconn net.Conn
11391139

1140-
if c.Tunnel == TunnelHTTP {
1140+
switch c.Tunnel {
1141+
case TunnelHTTP:
11411142
var err error
1142-
nconn, err = newClientHTTPTunnel(dialCtx, c.DialContext, addr, tlsConfig)
1143+
nconn, err = newClientTunnelHTTP(dialCtx, c.DialContext, addr, tlsConfig)
11431144
if err != nil {
11441145
return err
11451146
}
1146-
} else {
1147+
1148+
case TunnelWebSocket:
1149+
var err error
1150+
nconn, err = newClientTunnelWebSocket(dialCtx, c.DialContext, addr, tlsConfig)
1151+
if err != nil {
1152+
return err
1153+
}
1154+
1155+
default:
11471156
var err error
11481157
nconn, err = c.DialContext(dialCtx, "tcp", addr)
11491158
if err != nil {

client_test.go

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gortsplib
33
import (
44
"bufio"
55
"bytes"
6+
"context"
67
"crypto/tls"
78
"net"
89
"net/http"
@@ -593,7 +594,7 @@ func TestClientRelativeContentBase(t *testing.T) {
593594
require.Equal(t, "rtsp://localhost:8554/relative-content-base", desc.BaseURL.String())
594595
}
595596

596-
func TestClientHTTPTunnel(t *testing.T) {
597+
func TestClientTunnelHTTP(t *testing.T) {
597598
for _, ca := range []string{"http", "https"} {
598599
t.Run(ca, func(t *testing.T) {
599600
var l net.Listener
@@ -768,8 +769,103 @@ func TestClientHTTPTunnel(t *testing.T) {
768769
require.NoError(t, err)
769770
defer c.Close()
770771

771-
_, _, err = c.Describe(u)
772+
_, res, err := c.Describe(u)
772773
require.NoError(t, err)
774+
require.Equal(t, base.StatusOK, res.StatusCode)
775+
})
776+
}
777+
}
778+
779+
func TestClientTunnelWebSocket(t *testing.T) {
780+
for _, ca := range []string{"ws", "wss"} {
781+
t.Run(ca, func(t *testing.T) {
782+
var scheme string
783+
if ca == "ws" {
784+
scheme = "rtsp"
785+
} else {
786+
scheme = "rtsps"
787+
}
788+
789+
s := &http.Server{
790+
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
791+
require.Equal(t, r.Header.Get("Sec-WebSocket-Protocol"), "rtsp.onvif.org")
792+
793+
wconn, err := upgrader.Upgrade(w, r, nil)
794+
require.NoError(t, err)
795+
defer wconn.Close() //nolint:errcheck
796+
797+
conn := conn.NewConn(bufio.NewReader(&wsReader{wc: wconn}), &wsWriter{wc: wconn})
798+
799+
req, err2 := conn.ReadRequest()
800+
require.NoError(t, err2)
801+
require.Equal(t, base.Options, req.Method)
802+
803+
err2 = conn.WriteResponse(&base.Response{
804+
StatusCode: base.StatusOK,
805+
Header: base.Header{
806+
"Public": base.HeaderValue{strings.Join([]string{
807+
string(base.Describe),
808+
}, ", ")},
809+
},
810+
})
811+
require.NoError(t, err2)
812+
813+
req, err2 = conn.ReadRequest()
814+
require.NoError(t, err2)
815+
require.Equal(t, base.Describe, req.Method)
816+
require.Equal(t, mustParseURL(scheme+"://localhost:8554/teststream"), req.URL)
817+
818+
medias := []*description.Media{testH264Media}
819+
820+
err2 = conn.WriteResponse(&base.Response{
821+
StatusCode: base.StatusOK,
822+
Header: base.Header{
823+
"Content-Type": base.HeaderValue{"application/sdp; charset=utf-8"},
824+
"Content-Base": base.HeaderValue{"/relative-content-base"},
825+
},
826+
Body: mediasToSDP(medias),
827+
})
828+
require.NoError(t, err2)
829+
}),
830+
}
831+
832+
var ln net.Listener
833+
834+
if ca == "ws" {
835+
var err error
836+
ln, err = net.Listen("tcp", "localhost:8554")
837+
require.NoError(t, err)
838+
} else {
839+
cert, err := tls.X509KeyPair(serverCert, serverKey)
840+
require.NoError(t, err)
841+
842+
ln, err = tls.Listen("tcp", "localhost:8554", &tls.Config{Certificates: []tls.Certificate{cert}})
843+
require.NoError(t, err)
844+
defer ln.Close()
845+
}
846+
847+
go s.Serve(ln)
848+
defer s.Shutdown(context.Background())
849+
850+
u, err := base.ParseURL(scheme + "://localhost:8554/teststream")
851+
require.NoError(t, err)
852+
853+
c := Client{
854+
Scheme: u.Scheme,
855+
Host: u.Host,
856+
Tunnel: TunnelWebSocket,
857+
TLSConfig: &tls.Config{
858+
InsecureSkipVerify: true,
859+
},
860+
}
861+
862+
err = c.Start()
863+
require.NoError(t, err)
864+
defer c.Close()
865+
866+
_, res, err := c.Describe(u)
867+
require.NoError(t, err)
868+
require.Equal(t, base.StatusOK, res.StatusCode)
773869
})
774870
}
775871
}

client_http_tunnel.go renamed to client_tunnel_http.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,53 +14,53 @@ import (
1414
"github.com/google/uuid"
1515
)
1616

17-
type clientHTTPTunnel struct {
17+
type clientTunnelHTTP struct {
1818
readChan net.Conn
1919
readBuf *bufio.Reader
2020
writeChan net.Conn
2121
}
2222

23-
func (c *clientHTTPTunnel) Read(p []byte) (n int, err error) {
23+
func (c *clientTunnelHTTP) Read(p []byte) (n int, err error) {
2424
return c.readBuf.Read(p)
2525
}
2626

27-
func (c *clientHTTPTunnel) Write(p []byte) (n int, err error) {
27+
func (c *clientTunnelHTTP) Write(p []byte) (n int, err error) {
2828
return c.writeChan.Write([]byte(base64.StdEncoding.EncodeToString(p)))
2929
}
3030

31-
func (c *clientHTTPTunnel) Close() error {
31+
func (c *clientTunnelHTTP) Close() error {
3232
c.readChan.Close()
3333
c.writeChan.Close()
3434
return nil
3535
}
3636

37-
func (c *clientHTTPTunnel) LocalAddr() net.Addr {
37+
func (c *clientTunnelHTTP) LocalAddr() net.Addr {
3838
return c.readChan.LocalAddr()
3939
}
4040

41-
func (c *clientHTTPTunnel) RemoteAddr() net.Addr {
41+
func (c *clientTunnelHTTP) RemoteAddr() net.Addr {
4242
return c.readChan.RemoteAddr()
4343
}
4444

45-
func (c *clientHTTPTunnel) SetDeadline(_ time.Time) error {
45+
func (c *clientTunnelHTTP) SetDeadline(_ time.Time) error {
4646
panic("unimplemented")
4747
}
4848

49-
func (c *clientHTTPTunnel) SetReadDeadline(t time.Time) error {
49+
func (c *clientTunnelHTTP) SetReadDeadline(t time.Time) error {
5050
return c.readChan.SetReadDeadline(t)
5151
}
5252

53-
func (c *clientHTTPTunnel) SetWriteDeadline(t time.Time) error {
53+
func (c *clientTunnelHTTP) SetWriteDeadline(t time.Time) error {
5454
return c.writeChan.SetWriteDeadline(t)
5555
}
5656

57-
func newClientHTTPTunnel(
57+
func newClientTunnelHTTP(
5858
ctx context.Context,
5959
dialContext func(ctx context.Context, network, address string) (net.Conn, error),
6060
addr string,
6161
tlsConfig *tls.Config,
6262
) (net.Conn, error) {
63-
c := &clientHTTPTunnel{}
63+
c := &clientTunnelHTTP{}
6464

6565
var err error
6666
c.readChan, err = dialContext(ctx, "tcp", addr)

client_tunnel_websocket.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package gortsplib
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"io"
7+
"net"
8+
"time"
9+
10+
"github.com/gorilla/websocket"
11+
)
12+
13+
type clientTunnelWebSocket struct {
14+
wconn *websocket.Conn
15+
r io.Reader
16+
w io.Writer
17+
}
18+
19+
func (tu *clientTunnelWebSocket) Read(b []byte) (int, error) {
20+
return tu.r.Read(b)
21+
}
22+
23+
func (tu *clientTunnelWebSocket) Write(b []byte) (int, error) {
24+
return tu.w.Write(b)
25+
}
26+
27+
func (tu *clientTunnelWebSocket) Close() error {
28+
return tu.wconn.Close()
29+
}
30+
31+
func (tu *clientTunnelWebSocket) LocalAddr() net.Addr {
32+
return tu.wconn.LocalAddr()
33+
}
34+
35+
func (tu *clientTunnelWebSocket) RemoteAddr() net.Addr {
36+
return tu.wconn.RemoteAddr()
37+
}
38+
39+
func (tu *clientTunnelWebSocket) SetDeadline(_ time.Time) error {
40+
return nil
41+
}
42+
43+
func (tu *clientTunnelWebSocket) SetReadDeadline(t time.Time) error {
44+
return tu.wconn.SetReadDeadline(t)
45+
}
46+
47+
func (tu *clientTunnelWebSocket) SetWriteDeadline(t time.Time) error {
48+
return tu.wconn.SetWriteDeadline(t)
49+
}
50+
51+
func newClientTunnelWebSocket(
52+
ctx context.Context,
53+
dialContext func(ctx context.Context, network, address string) (net.Conn, error),
54+
addr string,
55+
tlsConfig *tls.Config,
56+
) (net.Conn, error) {
57+
c := &clientTunnelWebSocket{}
58+
59+
var ur string
60+
if tlsConfig != nil {
61+
ur = "wss"
62+
} else {
63+
ur = "ws"
64+
}
65+
ur += "://" + addr + "/"
66+
67+
var err error
68+
c.wconn, _, err = (&websocket.Dialer{
69+
NetDialContext: dialContext,
70+
TLSClientConfig: tlsConfig,
71+
Subprotocols: []string{"rtsp.onvif.org"},
72+
}).DialContext(ctx, ur, nil) //nolint:bodyclose
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
c.r = &wsReader{wc: c.wconn}
78+
c.w = &wsWriter{wc: c.wconn}
79+
80+
return c, nil
81+
}

conn_transport.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ type Tunnel int
77
const (
88
TunnelNone Tunnel = iota
99
TunnelHTTP
10+
TunnelWebSocket
1011
)
1112

1213
// ConnTransport contains details about the transport of a connection.

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.24.0
55
require (
66
github.com/bluenviron/mediacommon/v2 v2.4.2
77
github.com/google/uuid v1.6.0
8+
github.com/gorilla/websocket v1.5.3
89
github.com/pion/rtcp v1.2.15
910
github.com/pion/rtp v1.8.22
1011
github.com/pion/sdp/v3 v3.0.16

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
99
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1010
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
1111
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
12+
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
13+
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
1214
github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8=
1315
github.com/pion/logging v0.2.4/go.mod h1:DffhXTKYdNZU+KtJ5pyQDjvOAh/GsNSyv1lbkFbe3so=
1416
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=

internal/teste2e/client_vs_server_test.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,22 @@ func TestClientVsServer(t *testing.T) {
132132
readerProto: "tcp",
133133
readerTunnel: "http",
134134
},
135+
{
136+
publisherScheme: "rtsp",
137+
publisherProto: "tcp",
138+
publisherTunnel: "websocket",
139+
readerScheme: "rtsp",
140+
readerProto: "udp",
141+
readerTunnel: "none",
142+
},
143+
{
144+
publisherScheme: "rtsp",
145+
publisherProto: "tcp",
146+
publisherTunnel: "none",
147+
readerScheme: "rtsp",
148+
readerProto: "tcp",
149+
readerTunnel: "websocket",
150+
},
135151
} {
136152
t.Run(strings.Join([]string{
137153
ca.publisherScheme,
@@ -166,9 +182,12 @@ func TestClientVsServer(t *testing.T) {
166182
}
167183

168184
var publisherTunnel gortsplib.Tunnel
169-
if ca.publisherTunnel == "http" {
185+
switch ca.publisherTunnel {
186+
case "http":
170187
publisherTunnel = gortsplib.TunnelHTTP
171-
} else {
188+
case "websocket":
189+
publisherTunnel = gortsplib.TunnelWebSocket
190+
default:
172191
publisherTunnel = gortsplib.TunnelNone
173192
}
174193

@@ -192,9 +211,12 @@ func TestClientVsServer(t *testing.T) {
192211
time.Sleep(1 * time.Second)
193212

194213
var readerTunnel gortsplib.Tunnel
195-
if ca.readerTunnel == "http" {
214+
switch ca.readerTunnel {
215+
case "http":
196216
readerTunnel = gortsplib.TunnelHTTP
197-
} else {
217+
case "websocket":
218+
readerTunnel = gortsplib.TunnelWebSocket
219+
default:
198220
readerTunnel = gortsplib.TunnelNone
199221
}
200222

server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ func (s *Server) runInner() error {
420420
sc := &ServerConn{
421421
s: s,
422422
nconn: nconn,
423-
isHTTP: true,
423+
tunnel: TunnelHTTP,
424424
}
425425
sc.initialize()
426426
s.conns[sc] = struct{}{}

0 commit comments

Comments
 (0)