Skip to content

Commit 07d909f

Browse files
authored
node: allow websocket and HTTP on the same port (#20810)
This change makes it possible to run geth with JSON-RPC over HTTP and WebSocket on the same TCP port. The default port for WebSocket is still 8546. geth --rpc --rpcport 8545 --ws --wsport 8545 This also removes a lot of deprecated API surface from package rpc. The rpc package is now purely about serving JSON-RPC and no longer provides a way to start an HTTP server.
1 parent 5065cde commit 07d909f

File tree

13 files changed

+443
-268
lines changed

13 files changed

+443
-268
lines changed

cmd/clef/main.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,9 +583,16 @@ func signer(c *cli.Context) error {
583583
vhosts := splitAndTrim(c.GlobalString(utils.RPCVirtualHostsFlag.Name))
584584
cors := splitAndTrim(c.GlobalString(utils.RPCCORSDomainFlag.Name))
585585

586+
srv := rpc.NewServer()
587+
err := node.RegisterApisFromWhitelist(rpcAPI, []string{"account"}, srv, false)
588+
if err != nil {
589+
utils.Fatalf("Could not register API: %w", err)
590+
}
591+
handler := node.NewHTTPHandlerStack(srv, cors, vhosts)
592+
586593
// start http server
587594
httpEndpoint := fmt.Sprintf("%s:%d", c.GlobalString(utils.RPCListenAddrFlag.Name), c.Int(rpcPortFlag.Name))
588-
listener, _, err := rpc.StartHTTPEndpoint(httpEndpoint, rpcAPI, []string{"account"}, cors, vhosts, rpc.DefaultHTTPTimeouts)
595+
listener, err := node.StartHTTPEndpoint(httpEndpoint, rpc.DefaultHTTPTimeouts, handler)
589596
if err != nil {
590597
utils.Fatalf("Could not start RPC api: %v", err)
591598
}

cmd/geth/retesteth.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -890,14 +890,22 @@ func retesteth(ctx *cli.Context) error {
890890
vhosts := splitAndTrim(ctx.GlobalString(utils.RPCVirtualHostsFlag.Name))
891891
cors := splitAndTrim(ctx.GlobalString(utils.RPCCORSDomainFlag.Name))
892892

893+
// register apis and create handler stack
894+
srv := rpc.NewServer()
895+
err := node.RegisterApisFromWhitelist(rpcAPI, []string{"test", "eth", "debug", "web3"}, srv, false)
896+
if err != nil {
897+
utils.Fatalf("Could not register RPC apis: %w", err)
898+
}
899+
handler := node.NewHTTPHandlerStack(srv, cors, vhosts)
900+
893901
// start http server
894902
var RetestethHTTPTimeouts = rpc.HTTPTimeouts{
895903
ReadTimeout: 120 * time.Second,
896904
WriteTimeout: 120 * time.Second,
897905
IdleTimeout: 120 * time.Second,
898906
}
899907
httpEndpoint := fmt.Sprintf("%s:%d", ctx.GlobalString(utils.RPCListenAddrFlag.Name), ctx.Int(rpcPortFlag.Name))
900-
listener, _, err := rpc.StartHTTPEndpoint(httpEndpoint, rpcAPI, []string{"test", "eth", "debug", "web3"}, cors, vhosts, RetestethHTTPTimeouts)
908+
listener, err := node.StartHTTPEndpoint(httpEndpoint, RetestethHTTPTimeouts, handler)
901909
if err != nil {
902910
utils.Fatalf("Could not start RPC api: %v", err)
903911
}

graphql/service.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/ethereum/go-ethereum/internal/ethapi"
2525
"github.com/ethereum/go-ethereum/log"
26+
"github.com/ethereum/go-ethereum/node"
2627
"github.com/ethereum/go-ethereum/p2p"
2728
"github.com/ethereum/go-ethereum/rpc"
2829
"github.com/graph-gophers/graphql-go"
@@ -68,7 +69,18 @@ func (s *Service) Start(server *p2p.Server) error {
6869
if s.listener, err = net.Listen("tcp", s.endpoint); err != nil {
6970
return err
7071
}
71-
go rpc.NewHTTPServer(s.cors, s.vhosts, s.timeouts, s.handler).Serve(s.listener)
72+
// create handler stack and wrap the graphql handler
73+
handler := node.NewHTTPHandlerStack(s.handler, s.cors, s.vhosts)
74+
// make sure timeout values are meaningful
75+
node.CheckTimeouts(&s.timeouts)
76+
// create http server
77+
httpSrv := &http.Server{
78+
Handler: handler,
79+
ReadTimeout: s.timeouts.ReadTimeout,
80+
WriteTimeout: s.timeouts.WriteTimeout,
81+
IdleTimeout: s.timeouts.IdleTimeout,
82+
}
83+
go httpSrv.Serve(s.listener)
7284
log.Info("GraphQL endpoint opened", "url", fmt.Sprintf("http://%s", s.endpoint))
7385
return nil
7486
}

node/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis
186186
}
187187
}
188188

189-
if err := api.node.startHTTP(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, allowedOrigins, allowedVHosts, api.node.config.HTTPTimeouts); err != nil {
189+
if err := api.node.startHTTP(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, allowedOrigins, allowedVHosts, api.node.config.HTTPTimeouts, api.node.config.WSOrigins); err != nil {
190190
return false, err
191191
}
192192
return true, nil

node/endpoints.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2018 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package node
18+
19+
import (
20+
"net"
21+
"net/http"
22+
"time"
23+
24+
"github.com/ethereum/go-ethereum/log"
25+
"github.com/ethereum/go-ethereum/rpc"
26+
)
27+
28+
// StartHTTPEndpoint starts the HTTP RPC endpoint.
29+
func StartHTTPEndpoint(endpoint string, timeouts rpc.HTTPTimeouts, handler http.Handler) (net.Listener, error) {
30+
// start the HTTP listener
31+
var (
32+
listener net.Listener
33+
err error
34+
)
35+
if listener, err = net.Listen("tcp", endpoint); err != nil {
36+
return nil, err
37+
}
38+
// make sure timeout values are meaningful
39+
CheckTimeouts(&timeouts)
40+
// Bundle and start the HTTP server
41+
httpSrv := &http.Server{
42+
Handler: handler,
43+
ReadTimeout: timeouts.ReadTimeout,
44+
WriteTimeout: timeouts.WriteTimeout,
45+
IdleTimeout: timeouts.IdleTimeout,
46+
}
47+
go httpSrv.Serve(listener)
48+
return listener, err
49+
}
50+
51+
// startWSEndpoint starts a websocket endpoint.
52+
func startWSEndpoint(endpoint string, handler http.Handler) (net.Listener, error) {
53+
// start the HTTP listener
54+
var (
55+
listener net.Listener
56+
err error
57+
)
58+
if listener, err = net.Listen("tcp", endpoint); err != nil {
59+
return nil, err
60+
}
61+
wsSrv := &http.Server{Handler: handler}
62+
go wsSrv.Serve(listener)
63+
return listener, err
64+
}
65+
66+
// checkModuleAvailability checks that all names given in modules are actually
67+
// available API services. It assumes that the MetadataApi module ("rpc") is always available;
68+
// the registration of this "rpc" module happens in NewServer() and is thus common to all endpoints.
69+
func checkModuleAvailability(modules []string, apis []rpc.API) (bad, available []string) {
70+
availableSet := make(map[string]struct{})
71+
for _, api := range apis {
72+
if _, ok := availableSet[api.Namespace]; !ok {
73+
availableSet[api.Namespace] = struct{}{}
74+
available = append(available, api.Namespace)
75+
}
76+
}
77+
for _, name := range modules {
78+
if _, ok := availableSet[name]; !ok && name != rpc.MetadataApi {
79+
bad = append(bad, name)
80+
}
81+
}
82+
return bad, available
83+
}
84+
85+
// CheckTimeouts ensures that timeout values are meaningful
86+
func CheckTimeouts(timeouts *rpc.HTTPTimeouts) {
87+
if timeouts.ReadTimeout < time.Second {
88+
log.Warn("Sanitizing invalid HTTP read timeout", "provided", timeouts.ReadTimeout, "updated", rpc.DefaultHTTPTimeouts.ReadTimeout)
89+
timeouts.ReadTimeout = rpc.DefaultHTTPTimeouts.ReadTimeout
90+
}
91+
if timeouts.WriteTimeout < time.Second {
92+
log.Warn("Sanitizing invalid HTTP write timeout", "provided", timeouts.WriteTimeout, "updated", rpc.DefaultHTTPTimeouts.WriteTimeout)
93+
timeouts.WriteTimeout = rpc.DefaultHTTPTimeouts.WriteTimeout
94+
}
95+
if timeouts.IdleTimeout < time.Second {
96+
log.Warn("Sanitizing invalid HTTP idle timeout", "provided", timeouts.IdleTimeout, "updated", rpc.DefaultHTTPTimeouts.IdleTimeout)
97+
timeouts.IdleTimeout = rpc.DefaultHTTPTimeouts.IdleTimeout
98+
}
99+
}

node/node.go

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -291,17 +291,21 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error {
291291
n.stopInProc()
292292
return err
293293
}
294-
if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts, n.config.HTTPTimeouts); err != nil {
294+
if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts, n.config.HTTPTimeouts, n.config.WSOrigins); err != nil {
295295
n.stopIPC()
296296
n.stopInProc()
297297
return err
298298
}
299-
if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
300-
n.stopHTTP()
301-
n.stopIPC()
302-
n.stopInProc()
303-
return err
299+
// if endpoints are not the same, start separate servers
300+
if n.httpEndpoint != n.wsEndpoint {
301+
if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
302+
n.stopHTTP()
303+
n.stopIPC()
304+
n.stopInProc()
305+
return err
306+
}
304307
}
308+
305309
// All API endpoints started successfully
306310
n.rpcAPIs = apis
307311
return nil
@@ -359,22 +363,36 @@ func (n *Node) stopIPC() {
359363
}
360364

361365
// startHTTP initializes and starts the HTTP RPC endpoint.
362-
func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) error {
366+
func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts, wsOrigins []string) error {
363367
// Short circuit if the HTTP endpoint isn't being exposed
364368
if endpoint == "" {
365369
return nil
366370
}
367-
listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts, timeouts)
371+
// register apis and create handler stack
372+
srv := rpc.NewServer()
373+
err := RegisterApisFromWhitelist(apis, modules, srv, false)
374+
if err != nil {
375+
return err
376+
}
377+
handler := NewHTTPHandlerStack(srv, cors, vhosts)
378+
// wrap handler in websocket handler only if websocket port is the same as http rpc
379+
if n.httpEndpoint == n.wsEndpoint {
380+
handler = NewWebsocketUpgradeHandler(handler, srv.WebsocketHandler(wsOrigins))
381+
}
382+
listener, err := StartHTTPEndpoint(endpoint, timeouts, handler)
368383
if err != nil {
369384
return err
370385
}
371386
n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", listener.Addr()),
372387
"cors", strings.Join(cors, ","),
373388
"vhosts", strings.Join(vhosts, ","))
389+
if n.httpEndpoint == n.wsEndpoint {
390+
n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", listener.Addr()))
391+
}
374392
// All listeners booted successfully
375393
n.httpEndpoint = endpoint
376394
n.httpListener = listener
377-
n.httpHandler = handler
395+
n.httpHandler = srv
378396

379397
return nil
380398
}
@@ -399,15 +417,22 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
399417
if endpoint == "" {
400418
return nil
401419
}
402-
listener, handler, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll)
420+
421+
srv := rpc.NewServer()
422+
handler := srv.WebsocketHandler(wsOrigins)
423+
err := RegisterApisFromWhitelist(apis, modules, srv, exposeAll)
424+
if err != nil {
425+
return err
426+
}
427+
listener, err := startWSEndpoint(endpoint, handler)
403428
if err != nil {
404429
return err
405430
}
406431
n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr()))
407432
// All listeners booted successfully
408433
n.wsEndpoint = endpoint
409434
n.wsListener = listener
410-
n.wsHandler = handler
435+
n.wsHandler = srv
411436

412437
return nil
413438
}
@@ -664,3 +689,25 @@ func (n *Node) apis() []rpc.API {
664689
},
665690
}
666691
}
692+
693+
// RegisterApisFromWhitelist checks the given modules' availability, generates a whitelist based on the allowed modules,
694+
// and then registers all of the APIs exposed by the services.
695+
func RegisterApisFromWhitelist(apis []rpc.API, modules []string, srv *rpc.Server, exposeAll bool) error {
696+
if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 {
697+
log.Error("Unavailable modules in HTTP API list", "unavailable", bad, "available", available)
698+
}
699+
// Generate the whitelist based on the allowed modules
700+
whitelist := make(map[string]bool)
701+
for _, module := range modules {
702+
whitelist[module] = true
703+
}
704+
// Register all the APIs exposed by the services
705+
for _, api := range apis {
706+
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
707+
if err := srv.RegisterName(api.Namespace, api.Service); err != nil {
708+
return err
709+
}
710+
}
711+
}
712+
return nil
713+
}

node/node_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package node
1919
import (
2020
"errors"
2121
"io/ioutil"
22+
"net/http"
2223
"os"
2324
"reflect"
2425
"testing"
@@ -27,6 +28,8 @@ import (
2728
"github.com/ethereum/go-ethereum/crypto"
2829
"github.com/ethereum/go-ethereum/p2p"
2930
"github.com/ethereum/go-ethereum/rpc"
31+
32+
"github.com/stretchr/testify/assert"
3033
)
3134

3235
var (
@@ -597,3 +600,58 @@ func TestAPIGather(t *testing.T) {
597600
}
598601
}
599602
}
603+
604+
func TestWebsocketHTTPOnSamePort_WebsocketRequest(t *testing.T) {
605+
node := startHTTP(t)
606+
defer node.stopHTTP()
607+
608+
wsReq, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:7453", nil)
609+
if err != nil {
610+
t.Error("could not issue new http request ", err)
611+
}
612+
wsReq.Header.Set("Connection", "upgrade")
613+
wsReq.Header.Set("Upgrade", "websocket")
614+
wsReq.Header.Set("Sec-WebSocket-Version", "13")
615+
wsReq.Header.Set("Sec-Websocket-Key", "SGVsbG8sIHdvcmxkIQ==")
616+
617+
resp := doHTTPRequest(t, wsReq)
618+
assert.Equal(t, "websocket", resp.Header.Get("Upgrade"))
619+
}
620+
621+
func TestWebsocketHTTPOnSamePort_HTTPRequest(t *testing.T) {
622+
node := startHTTP(t)
623+
defer node.stopHTTP()
624+
625+
httpReq, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:7453", nil)
626+
if err != nil {
627+
t.Error("could not issue new http request ", err)
628+
}
629+
httpReq.Header.Set("Accept-Encoding", "gzip")
630+
631+
resp := doHTTPRequest(t, httpReq)
632+
assert.Equal(t, "gzip", resp.Header.Get("Content-Encoding"))
633+
}
634+
635+
func startHTTP(t *testing.T) *Node {
636+
conf := &Config{HTTPPort: 7453, WSPort: 7453}
637+
node, err := New(conf)
638+
if err != nil {
639+
t.Error("could not create a new node ", err)
640+
}
641+
642+
err = node.startHTTP("127.0.0.1:7453", []rpc.API{}, []string{}, []string{}, []string{}, rpc.HTTPTimeouts{}, []string{})
643+
if err != nil {
644+
t.Error("could not start http service on node ", err)
645+
}
646+
647+
return node
648+
}
649+
650+
func doHTTPRequest(t *testing.T, req *http.Request) *http.Response {
651+
client := &http.Client{}
652+
resp, err := client.Do(req)
653+
if err != nil {
654+
t.Error("could not issue a GET request to the given endpoint", err)
655+
}
656+
return resp
657+
}

0 commit comments

Comments
 (0)