Skip to content

rpcs: simplify API for BlockService to handle multiple HTTP paths #5718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 26, 2024
7 changes: 7 additions & 0 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ func (b *basicRPCNode) RegisterHTTPHandler(path string, handler http.Handler) {
b.rmux.Handle(path, handler)
}

func (b *basicRPCNode) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
if b.rmux == nil {
b.rmux = mux.NewRouter()
}
b.rmux.HandleFunc(path, handler)
}

func (b *basicRPCNode) RegisterHandlers(dispatch []network.TaggedMessageHandler) {
}

Expand Down
2 changes: 1 addition & 1 deletion catchup/pref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func BenchmarkServiceFetchBlocks(b *testing.B) {
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.TestingLog(b), config.GetDefaultLocal(), remote, net, "test genesisID")
nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down
22 changes: 11 additions & 11 deletions catchup/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestServiceFetchBlocksSameRange(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestSyncRound(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestPeriodicSync(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -379,7 +379,7 @@ func TestServiceFetchBlocksOneBlock(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestAbruptWrites(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestServiceFetchBlocksMultiBlocks(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestServiceFetchBlocksMalformed(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -709,7 +709,7 @@ func helperTestOnSwitchToUnSupportedProtocol(
ls := rpcs.MakeBlockService(logging.Base(), config, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -932,7 +932,7 @@ func TestCatchupUnmatchedCertificate(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -1064,7 +1064,7 @@ func TestServiceLedgerUnavailable(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -1110,7 +1110,7 @@ func TestServiceNoBlockForRound(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down
2 changes: 1 addition & 1 deletion catchup/universalFetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestUGetBlockHTTP(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down
4 changes: 4 additions & 0 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (network *MockNetwork) ClearProcessors() {
func (network *MockNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
}

// RegisterHTTPHandlerFunc - empty implementation
func (network *MockNetwork) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
}

// OnNetworkAdvance - empty implementation
func (network *MockNetwork) OnNetworkAdvance() {}

Expand Down
3 changes: 2 additions & 1 deletion network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ type GossipNode interface {
Disconnect(badnode DisconnectablePeer)
DisconnectPeers() // only used by testing

// RegisterHTTPHandler path accepts gorilla/mux path annotations
// RegisterHTTPHandler and RegisterHTTPHandlerFunc: path accepts gorilla/mux path annotations
RegisterHTTPHandler(path string, handler http.Handler)
RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request))

// RequestConnectOutgoing asks the system to actually connect to peers.
// `replace` optionally drops existing connections before making new ones.
Expand Down
6 changes: 6 additions & 0 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@
n.wsNetwork.RegisterHTTPHandler(path, handler)
}

// RegisterHTTPHandlerFunc implements GossipNode
func (n *HybridP2PNetwork) RegisterHTTPHandlerFunc(path string, handlerFunc func(http.ResponseWriter, *http.Request)) {
n.p2pNetwork.RegisterHTTPHandlerFunc(path, handlerFunc)
n.wsNetwork.RegisterHTTPHandlerFunc(path, handlerFunc)

Check warning on line 152 in network/hybridNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/hybridNetwork.go#L150-L152

Added lines #L150 - L152 were not covered by tests
}

// RequestConnectOutgoing implements GossipNode
func (n *HybridP2PNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {}

Expand Down
8 changes: 8 additions & 0 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@
})
}

// RegisterHTTPHandlerFunc registers a http handler with a given path.
func (s *HTTPServer) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
s.p2phttpMux.HandleFunc(path, handler)
s.p2phttpMuxRegistrarOnce.Do(func() {
s.Host.SetHTTPHandlerAtPath(algorandP2pHTTPProtocol, "/", s.p2phttpMux)
})

Check warning on line 65 in network/p2p/http.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/http.go#L61-L65

Added lines #L61 - L65 were not covered by tests
}

// MakeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) {
clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs)
Expand Down
6 changes: 6 additions & 0 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,12 @@
n.httpServer.RegisterHTTPHandler(path, handler)
}

// RegisterHTTPHandlerFunc is like RegisterHTTPHandler but accepts
// a callback handler function instead of a method receiver.
func (n *P2PNetwork) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
n.httpServer.RegisterHTTPHandlerFunc(path, handler)

Check warning on line 583 in network/p2pNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/p2pNetwork.go#L582-L583

Added lines #L582 - L583 were not covered by tests
}

// RequestConnectOutgoing asks the system to actually connect to peers.
// `replace` optionally drops existing connections before making new ones.
// `quit` chan allows cancellation.
Expand Down
5 changes: 5 additions & 0 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@
wn.router.Handle(path, handler)
}

// RegisterHTTPHandlerFunc path accepts gorilla/mux path annotations
func (wn *WebsocketNetwork) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
wn.router.HandleFunc(path, handler)

Check warning on line 528 in network/wsNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/wsNetwork.go#L527-L528

Added lines #L527 - L528 were not covered by tests
}

// RequestConnectOutgoing tries to actually do the connect to new peers.
// `replace` drop all connections first and find new peers.
func (wn *WebsocketNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {
Expand Down
17 changes: 11 additions & 6 deletions rpcs/blockService.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
const BlockResponseLatestRoundHeader = "X-Latest-Round"

// BlockServiceBlockPath is the path to register BlockService as a handler for when using gorilla/mux
// e.g. .Handle(BlockServiceBlockPath, &ls)
// e.g. .HandleFunc(BlockServiceBlockPath, ls.ServeBlockPath)
const BlockServiceBlockPath = "/v{version:[0-9.]+}/{genesisID}/block/{round:[0-9a-z]+}"

// Constant strings used as keys for topics
Expand Down Expand Up @@ -147,11 +147,16 @@
memoryCap: config.BlockServiceMemCap,
}
if service.enableService {
net.RegisterHTTPHandler(BlockServiceBlockPath, service)
service.RegisterHandlers(net)

Check warning on line 150 in rpcs/blockService.go

View check run for this annotation

Codecov / codecov/patch

rpcs/blockService.go#L150

Added line #L150 was not covered by tests
}
return service
}

// RegisterHandlers registers the request handlers for BlockService's paths with the registrar.
func (bs *BlockService) RegisterHandlers(registrar Registrar) {
registrar.RegisterHTTPHandlerFunc(BlockServiceBlockPath, bs.ServeBlockPath)
}

// Start listening to catchup requests over ws
func (bs *BlockService) Start() {
bs.mu.Lock()
Expand Down Expand Up @@ -179,10 +184,10 @@
bs.closeWaitGroup.Wait()
}

// ServerHTTP returns blocks
// ServeBlockPath returns blocks
// Either /v{version}/{genesisID}/block/{round} or ?b={round}&v={version}
// Uses gorilla/mux for path argument parsing.
func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Request) {
func (bs *BlockService) ServeBlockPath(response http.ResponseWriter, request *http.Request) {
pathVars := mux.Vars(request)
versionStr, hasVersionStr := pathVars["version"]
roundStr, hasRoundStr := pathVars["round"]
Expand Down Expand Up @@ -260,13 +265,13 @@
if !ok {
response.Header().Set("Retry-After", blockResponseRetryAfter)
response.WriteHeader(http.StatusServiceUnavailable)
bs.log.Debugf("ServeHTTP: returned retry-after: %v", err)
bs.log.Debugf("ServeBlockPath: returned retry-after: %v", err)
}
httpBlockMessagesDroppedCounter.Inc(nil)
return
default:
// unexpected error.
bs.log.Warnf("ServeHTTP : failed to retrieve block %d %v", round, err)
bs.log.Warnf("ServeBlockPath: failed to retrieve block %d %v", round, err)

Check warning on line 274 in rpcs/blockService.go

View check run for this annotation

Codecov / codecov/patch

rpcs/blockService.go#L274

Added line #L274 was not covered by tests
response.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down
19 changes: 9 additions & 10 deletions rpcs/blockService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ func TestRedirectFallbackEndpoints(t *testing.T) {
bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID")
bs2 := MakeBlockService(log, config, ledger2, net2, "test-genesis-ID")

nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
nodeB.RegisterHTTPHandler(BlockServiceBlockPath, bs2)
bs1.RegisterHandlers(nodeA)
bs2.RegisterHandlers(nodeB)

parsedURL, err := addr.ParseHostOrURL(nodeA.rootURL())
require.NoError(t, err)
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestBlockServiceShutdown(t *testing.T) {

nodeA := &basicRPCNode{}

nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
bs1.RegisterHandlers(nodeA)
nodeA.start()
defer nodeA.stop()

Expand Down Expand Up @@ -292,9 +292,8 @@ func TestRedirectOnFullCapacity(t *testing.T) {
bs1.memoryCap = 250
bs2.memoryCap = 250

nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)

nodeB.RegisterHTTPHandler(BlockServiceBlockPath, bs2)
bs1.RegisterHandlers(nodeA)
bs2.RegisterHandlers(nodeB)

parsedURL, err := addr.ParseHostOrURL(nodeA.rootURL())
require.NoError(t, err)
Expand Down Expand Up @@ -371,11 +370,11 @@ forloop:

// First node redirects, does not return retry
require.True(t, strings.Contains(logBuffer1.String(), "redirectRequest: redirected block request to"))
require.False(t, strings.Contains(logBuffer1.String(), "ServeHTTP: returned retry-after: block service memory over capacity"))
require.False(t, strings.Contains(logBuffer1.String(), "ServeBlockPath: returned retry-after: block service memory over capacity"))

// Second node cannot redirect, it returns retry-after when over capacity
require.False(t, strings.Contains(logBuffer2.String(), "redirectRequest: redirected block request to"))
require.True(t, strings.Contains(logBuffer2.String(), "ServeHTTP: returned retry-after: block service memory over capacity"))
require.True(t, strings.Contains(logBuffer2.String(), "ServeBlockPath: returned retry-after: block service memory over capacity"))
}

// TestWsBlockLimiting ensures that limits are applied correctly on the websocket side of the service
Expand Down Expand Up @@ -474,8 +473,8 @@ func TestRedirectExceptions(t *testing.T) {
bs1 := MakeBlockService(log1, configInvalidRedirects, ledger1, net1, "{genesisID}")
bs2 := MakeBlockService(log2, configWithRedirectToSelf, ledger2, net2, "{genesisID}")

nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
nodeB.RegisterHTTPHandler(BlockServiceBlockPath, bs2)
bs1.RegisterHandlers(nodeA)
bs2.RegisterHandlers(nodeB)

parsedURL, err := addr.ParseHostOrURL(nodeA.rootURL())
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions rpcs/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
type Registrar interface {
// RegisterHTTPHandler path accepts gorilla/mux path annotations
RegisterHTTPHandler(path string, handler http.Handler)
// RegisterHTTPHandlerFunc path accepts gorilla/mux path annotations and a HandlerFunc
RegisterHTTPHandlerFunc(path string, handler func(response http.ResponseWriter, request *http.Request))
// RegisterHandlers exposes global websocket handler registration
RegisterHandlers(dispatch []network.TaggedMessageHandler)
}
7 changes: 7 additions & 0 deletions rpcs/txService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ func (b *basicRPCNode) RegisterHTTPHandler(path string, handler http.Handler) {
b.rmux.Handle(path, handler)
}

func (b *basicRPCNode) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
if b.rmux == nil {
b.rmux = mux.NewRouter()
}
b.rmux.HandleFunc(path, handler)
}

func (b *basicRPCNode) RegisterHandlers(dispatch []network.TaggedMessageHandler) {
}

Expand Down
Loading