Skip to content

node: allow websocket and HTTP on the same port #20810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 56 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
0ee8a31
errors in comments for http.go
renaynay Feb 28, 2020
b8aaf24
consistency
renaynay Feb 28, 2020
7b81c6c
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 1, 2020
8911212
correcting typos in documentation for subscription.go
renaynay Mar 1, 2020
94e2d2a
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 11, 2020
1a1aaaf
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 12, 2020
1f7c7d0
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 16, 2020
1fc5827
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 16, 2020
a3204ee
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 23, 2020
aef6981
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 23, 2020
991237a
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 24, 2020
e13d78d
dirty functional implementation, some tests broken, some commented out
renaynay Mar 24, 2020
6becdde
added space
renaynay Mar 24, 2020
46eb11c
handler stack creation moved from rpc to node pkg
renaynay Mar 24, 2020
62bb958
fixed retesteth
renaynay Mar 24, 2020
fdff70c
fixed some TODOs
renaynay Mar 24, 2020
5eb0146
moved whitelist generation and api registration into separate method
renaynay Mar 24, 2020
307f70b
Merge pull request #3 from renaynay/handle-ws-only-if-specified
renaynay Mar 24, 2020
631488c
Merge pull request #2 from renaynay/handler-stack
renaynay Mar 24, 2020
339ba89
move handler creation to pkg node, remove deprecated function
renaynay Mar 25, 2020
8adb2f2
updating retesteth.go to remove adding the ws handler
renaynay Mar 25, 2020
eced2db
remove gzip file
renaynay Mar 25, 2020
74cd4f2
dont overwrite registering apis
renaynay Mar 25, 2020
8766f6a
removing some unnecessary whitespace
renaynay Mar 25, 2020
a87383b
fixed merge conflict
renaynay Mar 25, 2020
3131d6e
Merge pull request #4 from renaynay/node-handles-handler-creation
renaynay Mar 25, 2020
3de74d2
fixed some errors related to crashing travis build
renaynay Mar 25, 2020
a305f5a
removing unnecessary function
renaynay Mar 25, 2020
440ccbe
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 26, 2020
ba2c128
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 27, 2020
c8c18ba
fixed a typo in graphql service
renaynay Mar 27, 2020
04bae1d
Merge branch 'master' into multiplex-ws-rpc
renaynay Mar 27, 2020
4605994
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 27, 2020
2321f64
Merge branch 'master' into multiplex-ws-rpc
renaynay Mar 27, 2020
142b5f7
some docs
renaynay Mar 30, 2020
ef49e55
Merge branch 'master' of github.com:ethereum/go-ethereum
renaynay Mar 30, 2020
2538a69
Merge branch 'master' into multiplex-ws-rpc
renaynay Mar 30, 2020
a08ccfe
graphql works with new http server setup (#7)
renaynay Mar 30, 2020
aab171f
Matches the startup process of websocket server to that of http serve…
renaynay Mar 30, 2020
bdd1091
using rpc.CheckTimeouts instead of duplicate timeout check
renaynay Mar 30, 2020
51c6d69
Tests for handling websocket and http requests on the same server (#6)
renaynay Mar 30, 2020
03172fa
fixed incorrect call to register apis from whitelist in clef/main.go
renaynay Mar 30, 2020
9651389
added error check
renaynay Mar 30, 2020
4261aea
fixed import issue
renaynay Mar 30, 2020
c4f6bfc
removed some unnecessary whitespace
renaynay Mar 30, 2020
f9f85c4
removed some todos and fixed comment according to martins suggestion
renaynay Mar 30, 2020
e4d31d2
fixed using a port thats already in use for a test
renaynay Mar 31, 2020
1772c8c
move http endpoint startup from rpc to node pkg (#9)
renaynay Mar 31, 2020
7ce555d
removed todos and changed http port in test once again to check travi…
renaynay Mar 31, 2020
6e135ec
tests were attempting to restart server, fixed
renaynay Mar 31, 2020
19f658d
deleting whitespace and re-ordering args for test function
renaynay Apr 8, 2020
eebca1d
removed unnecessary channel / goroutine from ws and http tests
renaynay Apr 8, 2020
4e71c2a
cmd/geth: fix whitespace
fjl Apr 8, 2020
1fb1018
cmd/clef: fix whitespace
fjl Apr 8, 2020
1751298
cmd/clef: fix lint
fjl Apr 8, 2020
97d89ca
cmd/geth: fix lint
fjl Apr 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/clef/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,9 +539,16 @@ func signer(c *cli.Context) error {
vhosts := splitAndTrim(c.GlobalString(utils.RPCVirtualHostsFlag.Name))
cors := splitAndTrim(c.GlobalString(utils.RPCCORSDomainFlag.Name))

srv := rpc.NewServer()
err := node.RegisterApisFromWhitelist(rpcAPI, []string{"account"}, srv, false)
if err != nil {
utils.Fatalf("Could not register API: %w", err)
}
handler := node.NewHTTPHandlerStack(srv, cors, vhosts)

// start http server
httpEndpoint := fmt.Sprintf("%s:%d", c.GlobalString(utils.RPCListenAddrFlag.Name), c.Int(rpcPortFlag.Name))
listener, _, err := rpc.StartHTTPEndpoint(httpEndpoint, rpcAPI, []string{"account"}, cors, vhosts, rpc.DefaultHTTPTimeouts)
listener, err := node.StartHTTPEndpoint(httpEndpoint, rpc.DefaultHTTPTimeouts, handler)
if err != nil {
utils.Fatalf("Could not start RPC api: %v", err)
}
Expand Down
10 changes: 9 additions & 1 deletion cmd/geth/retesteth.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,14 +888,22 @@ func retesteth(ctx *cli.Context) error {
vhosts := splitAndTrim(ctx.GlobalString(utils.RPCVirtualHostsFlag.Name))
cors := splitAndTrim(ctx.GlobalString(utils.RPCCORSDomainFlag.Name))

// register apis and create handler stack
srv := rpc.NewServer()
err := node.RegisterApisFromWhitelist(rpcAPI, []string{"test", "eth", "debug", "web3"}, srv, false)
if err != nil {
utils.Fatalf("Could not register RPC apis: %w", err)
}
handler := node.NewHTTPHandlerStack(srv, cors, vhosts)

// start http server
var RetestethHTTPTimeouts = rpc.HTTPTimeouts{
ReadTimeout: 120 * time.Second,
WriteTimeout: 120 * time.Second,
IdleTimeout: 120 * time.Second,
}
httpEndpoint := fmt.Sprintf("%s:%d", ctx.GlobalString(utils.RPCListenAddrFlag.Name), ctx.Int(rpcPortFlag.Name))
listener, _, err := rpc.StartHTTPEndpoint(httpEndpoint, rpcAPI, []string{"test", "eth", "debug", "web3"}, cors, vhosts, RetestethHTTPTimeouts)
listener, err := node.StartHTTPEndpoint(httpEndpoint, RetestethHTTPTimeouts, handler)
if err != nil {
utils.Fatalf("Could not start RPC api: %v", err)
}
Expand Down
14 changes: 13 additions & 1 deletion graphql/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"github.com/graph-gophers/graphql-go"
Expand Down Expand Up @@ -68,7 +69,18 @@ func (s *Service) Start(server *p2p.Server) error {
if s.listener, err = net.Listen("tcp", s.endpoint); err != nil {
return err
}
go rpc.NewHTTPServer(s.cors, s.vhosts, s.timeouts, s.handler).Serve(s.listener)
// create handler stack and wrap the graphql handler
handler := node.NewHTTPHandlerStack(s.handler, s.cors, s.vhosts)
// make sure timeout values are meaningful
node.CheckTimeouts(&s.timeouts)
// create http server
httpSrv := &http.Server{
Handler: handler,
ReadTimeout: s.timeouts.ReadTimeout,
WriteTimeout: s.timeouts.WriteTimeout,
IdleTimeout: s.timeouts.IdleTimeout,
}
go httpSrv.Serve(s.listener)
log.Info("GraphQL endpoint opened", "url", fmt.Sprintf("http://%s", s.endpoint))
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis
}
}

if err := api.node.startHTTP(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, allowedOrigins, allowedVHosts, api.node.config.HTTPTimeouts); err != nil {
if err := api.node.startHTTP(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, allowedOrigins, allowedVHosts, api.node.config.HTTPTimeouts, api.node.config.WSOrigins); err != nil {
return false, err
}
return true, nil
Expand Down
99 changes: 99 additions & 0 deletions node/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package node

import (
"net"
"net/http"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)

// StartHTTPEndpoint starts the HTTP RPC endpoint.
func StartHTTPEndpoint(endpoint string, timeouts rpc.HTTPTimeouts, handler http.Handler) (net.Listener, error) {
// start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, err
}
// make sure timeout values are meaningful
CheckTimeouts(&timeouts)
// Bundle and start the HTTP server
httpSrv := &http.Server{
Handler: handler,
ReadTimeout: timeouts.ReadTimeout,
WriteTimeout: timeouts.WriteTimeout,
IdleTimeout: timeouts.IdleTimeout,
}
go httpSrv.Serve(listener)
return listener, err
}

// startWSEndpoint starts a websocket endpoint.
func startWSEndpoint(endpoint string, handler http.Handler) (net.Listener, error) {
// start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, err
}
wsSrv := &http.Server{Handler: handler}
go wsSrv.Serve(listener)
return listener, err
}

// checkModuleAvailability checks that all names given in modules are actually
// available API services. It assumes that the MetadataApi module ("rpc") is always available;
// the registration of this "rpc" module happens in NewServer() and is thus common to all endpoints.
func checkModuleAvailability(modules []string, apis []rpc.API) (bad, available []string) {
availableSet := make(map[string]struct{})
for _, api := range apis {
if _, ok := availableSet[api.Namespace]; !ok {
availableSet[api.Namespace] = struct{}{}
available = append(available, api.Namespace)
}
}
for _, name := range modules {
if _, ok := availableSet[name]; !ok && name != rpc.MetadataApi {
bad = append(bad, name)
}
}
return bad, available
}

// CheckTimeouts ensures that timeout values are meaningful
func CheckTimeouts(timeouts *rpc.HTTPTimeouts) {
if timeouts.ReadTimeout < time.Second {
log.Warn("Sanitizing invalid HTTP read timeout", "provided", timeouts.ReadTimeout, "updated", rpc.DefaultHTTPTimeouts.ReadTimeout)
timeouts.ReadTimeout = rpc.DefaultHTTPTimeouts.ReadTimeout
}
if timeouts.WriteTimeout < time.Second {
log.Warn("Sanitizing invalid HTTP write timeout", "provided", timeouts.WriteTimeout, "updated", rpc.DefaultHTTPTimeouts.WriteTimeout)
timeouts.WriteTimeout = rpc.DefaultHTTPTimeouts.WriteTimeout
}
if timeouts.IdleTimeout < time.Second {
log.Warn("Sanitizing invalid HTTP idle timeout", "provided", timeouts.IdleTimeout, "updated", rpc.DefaultHTTPTimeouts.IdleTimeout)
timeouts.IdleTimeout = rpc.DefaultHTTPTimeouts.IdleTimeout
}
}
69 changes: 58 additions & 11 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,17 +291,21 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error {
n.stopInProc()
return err
}
if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts, n.config.HTTPTimeouts); err != nil {
if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts, n.config.HTTPTimeouts, n.config.WSOrigins); err != nil {
n.stopIPC()
n.stopInProc()
return err
}
if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
n.stopHTTP()
n.stopIPC()
n.stopInProc()
return err
// if endpoints are not the same, start separate servers
if n.httpEndpoint != n.wsEndpoint {
if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
n.stopHTTP()
n.stopIPC()
n.stopInProc()
return err
}
}

// All API endpoints started successfully
n.rpcAPIs = apis
return nil
Expand Down Expand Up @@ -359,22 +363,36 @@ func (n *Node) stopIPC() {
}

// startHTTP initializes and starts the HTTP RPC endpoint.
func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) error {
func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts, wsOrigins []string) error {
// Short circuit if the HTTP endpoint isn't being exposed
if endpoint == "" {
return nil
}
listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts, timeouts)
// register apis and create handler stack
srv := rpc.NewServer()
err := RegisterApisFromWhitelist(apis, modules, srv, false)
if err != nil {
return err
}
handler := NewHTTPHandlerStack(srv, cors, vhosts)
// wrap handler in websocket handler only if websocket port is the same as http rpc
if n.httpEndpoint == n.wsEndpoint {
handler = NewWebsocketUpgradeHandler(handler, srv.WebsocketHandler(wsOrigins))
}
listener, err := StartHTTPEndpoint(endpoint, timeouts, handler)
if err != nil {
return err
}
n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", listener.Addr()),
"cors", strings.Join(cors, ","),
"vhosts", strings.Join(vhosts, ","))
if n.httpEndpoint == n.wsEndpoint {
n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", listener.Addr()))
}
// All listeners booted successfully
n.httpEndpoint = endpoint
n.httpListener = listener
n.httpHandler = handler
n.httpHandler = srv

return nil
}
Expand All @@ -399,15 +417,22 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
if endpoint == "" {
return nil
}
listener, handler, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll)

srv := rpc.NewServer()
handler := srv.WebsocketHandler(wsOrigins)
err := RegisterApisFromWhitelist(apis, modules, srv, exposeAll)
if err != nil {
return err
}
listener, err := startWSEndpoint(endpoint, handler)
if err != nil {
return err
}
n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr()))
// All listeners booted successfully
n.wsEndpoint = endpoint
n.wsListener = listener
n.wsHandler = handler
n.wsHandler = srv

return nil
}
Expand Down Expand Up @@ -664,3 +689,25 @@ func (n *Node) apis() []rpc.API {
},
}
}

// RegisterApisFromWhitelist checks the given modules' availability, generates a whitelist based on the allowed modules,
// and then registers all of the APIs exposed by the services.
func RegisterApisFromWhitelist(apis []rpc.API, modules []string, srv *rpc.Server, exposeAll bool) error {
if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 {
log.Error("Unavailable modules in HTTP API list", "unavailable", bad, "available", available)
}
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
for _, api := range apis {
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := srv.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
}
}
return nil
}
58 changes: 58 additions & 0 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package node
import (
"errors"
"io/ioutil"
"net/http"
"os"
"reflect"
"testing"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"

"github.com/stretchr/testify/assert"
)

var (
Expand Down Expand Up @@ -597,3 +600,58 @@ func TestAPIGather(t *testing.T) {
}
}
}

func TestWebsocketHTTPOnSamePort_WebsocketRequest(t *testing.T) {
node := startHTTP(t)
defer node.stopHTTP()

wsReq, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:7453", nil)
if err != nil {
t.Error("could not issue new http request ", err)
}
wsReq.Header.Set("Connection", "upgrade")
wsReq.Header.Set("Upgrade", "websocket")
wsReq.Header.Set("Sec-WebSocket-Version", "13")
wsReq.Header.Set("Sec-Websocket-Key", "SGVsbG8sIHdvcmxkIQ==")

resp := doHTTPRequest(t, wsReq)
assert.Equal(t, "websocket", resp.Header.Get("Upgrade"))
}

func TestWebsocketHTTPOnSamePort_HTTPRequest(t *testing.T) {
node := startHTTP(t)
defer node.stopHTTP()

httpReq, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:7453", nil)
if err != nil {
t.Error("could not issue new http request ", err)
}
httpReq.Header.Set("Accept-Encoding", "gzip")

resp := doHTTPRequest(t, httpReq)
assert.Equal(t, "gzip", resp.Header.Get("Content-Encoding"))
}

func startHTTP(t *testing.T) *Node {
conf := &Config{HTTPPort: 7453, WSPort: 7453}
node, err := New(conf)
if err != nil {
t.Error("could not create a new node ", err)
}

err = node.startHTTP("127.0.0.1:7453", []rpc.API{}, []string{}, []string{}, []string{}, rpc.HTTPTimeouts{}, []string{})
if err != nil {
t.Error("could not start http service on node ", err)
}

return node
}

func doHTTPRequest(t *testing.T, req *http.Request) *http.Response {
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
t.Error("could not issue a GET request to the given endpoint", err)
}
return resp
}
Loading