Skip to content

Commit 157a379

Browse files
separate lock from common grid to avoid epoll contention
Epoll contention on TCP causes latency build-up under high-volume ingress. This PR is an attempt to relieve that pressure. Upstream issue golang/go#65064 appears to be a deeper problem; we have not yet tried the fix provided in that issue, but this change helps without requiring any change to the compiler. Of course, this is a workaround.
1 parent 6651c65 commit 157a379

File tree

10 files changed

+103
-25
lines changed

10 files changed

+103
-25
lines changed

cmd/generic-handlers.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,11 @@ func guessIsRPCReq(req *http.Request) bool {
247247
if req == nil {
248248
return false
249249
}
250-
if req.Method == http.MethodGet && req.URL != nil && req.URL.Path == grid.RoutePath {
251-
return true
250+
if req.Method == http.MethodGet && req.URL != nil {
251+
switch req.URL.Path {
252+
case grid.RoutePath, grid.RouteLockPath:
253+
return true
254+
}
252255
}
253256

254257
return (req.Method == http.MethodPost || req.Method == http.MethodGet) &&

cmd/generic-handlers_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ func TestGuessIsRPC(t *testing.T) {
6464
if !guessIsRPCReq(r) {
6565
t.Fatal("Grid RPC path not detected")
6666
}
67+
r = &http.Request{
68+
Proto: "HTTP/1.1",
69+
Method: http.MethodGet,
70+
URL: &url.URL{Path: grid.RouteLockPath},
71+
}
72+
if !guessIsRPCReq(r) {
73+
t.Fatal("Grid RPC path not detected")
74+
}
6775
}
6876

6977
var isHTTPHeaderSizeTooLargeTests = []struct {

cmd/grid.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,15 @@ import (
3131
// globalGrid is the global grid manager.
3232
var globalGrid atomic.Pointer[grid.Manager]
3333

34+
// globalLockGrid is the global lock grid manager.
35+
var globalLockGrid atomic.Pointer[grid.Manager]
36+
3437
// globalGridStart is a channel that will block startup of grid connections until closed.
3538
var globalGridStart = make(chan struct{})
3639

40+
// globalLockGridStart is a channel that will block startup of lock grid connections until closed.
41+
var globalLockGridStart = make(chan struct{})
42+
3743
func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error {
3844
hosts, local := eps.GridHosts()
3945
lookupHost := globalDNSCache.LookupHost
@@ -55,13 +61,47 @@ func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error {
5561
AuthFn: newCachedAuthToken(),
5662
BlockConnect: globalGridStart,
5763
// Record incoming and outgoing bytes.
58-
Incoming: globalConnStats.incInternodeInputBytes,
59-
Outgoing: globalConnStats.incInternodeOutputBytes,
60-
TraceTo: globalTrace,
64+
Incoming: globalConnStats.incInternodeInputBytes,
65+
Outgoing: globalConnStats.incInternodeOutputBytes,
66+
TraceTo: globalTrace,
67+
RoutePath: grid.RoutePath,
6168
})
6269
if err != nil {
6370
return err
6471
}
6572
globalGrid.Store(g)
6673
return nil
6774
}
75+
76+
func initGlobalLockGrid(ctx context.Context, eps EndpointServerPools) error {
77+
hosts, local := eps.GridHosts()
78+
lookupHost := globalDNSCache.LookupHost
79+
g, err := grid.NewManager(ctx, grid.ManagerOptions{
80+
// Pass Dialer for websocket grid, make sure we do not
81+
// provide any DriveOPTimeout() function, as that is not
82+
// useful over persistent connections.
83+
Dialer: grid.ConnectWSWithRoutePath(
84+
grid.ContextDialer(xhttp.DialContextWithLookupHost(lookupHost, xhttp.NewInternodeDialContext(rest.DefaultTimeout, globalTCPOptions.ForWebsocket()))),
85+
newCachedAuthToken(),
86+
&tls.Config{
87+
RootCAs: globalRootCAs,
88+
CipherSuites: fips.TLSCiphers(),
89+
CurvePreferences: fips.TLSCurveIDs(),
90+
}, grid.RouteLockPath),
91+
Local: local,
92+
Hosts: hosts,
93+
AuthToken: validateStorageRequestToken,
94+
AuthFn: newCachedAuthToken(),
95+
BlockConnect: globalLockGridStart, // gate lock grid connections on the dedicated lock-grid start channel
96+
// Record incoming and outgoing bytes.
97+
Incoming: globalConnStats.incInternodeInputBytes,
98+
Outgoing: globalConnStats.incInternodeOutputBytes,
99+
TraceTo: globalTrace,
100+
RoutePath: grid.RouteLockPath,
101+
})
102+
if err != nil {
103+
return err
104+
}
105+
globalLockGrid.Store(g)
106+
return nil
107+
}

cmd/lock-rest-client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,5 +107,5 @@ func newLockAPI(endpoint Endpoint) dsync.NetLocker {
107107

108108
// Returns a lock rest client.
109109
func newlockRESTClient(ep Endpoint) *lockRESTClient {
110-
return &lockRESTClient{globalGrid.Load().Connection(ep.GridHost())}
110+
return &lockRESTClient{globalLockGrid.Load().Connection(ep.GridHost())}
111111
}

cmd/lock-rest-client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestLockRESTlient(t *testing.T) {
3939

4040
ctx, cancel := context.WithCancel(context.Background())
4141
defer cancel()
42-
err = initGlobalGrid(ctx, []PoolEndpoints{{Endpoints: Endpoints{endpoint, endpointLocal}}})
42+
err = initGlobalLockGrid(ctx, []PoolEndpoints{{Endpoints: Endpoints{endpoint, endpointLocal}}})
4343
if err != nil {
4444
t.Fatal(err)
4545
}

cmd/lock-rest-server.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,17 +111,17 @@ func newLockHandler(h grid.HandlerID) *grid.SingleHandler[*dsync.LockArgs, *dsyn
111111
}
112112

113113
// registerLockRESTHandlers - register lock rest router.
114-
func registerLockRESTHandlers() {
114+
func registerLockRESTHandlers(gm *grid.Manager) {
115115
lockServer := &lockRESTServer{
116116
ll: newLocker(),
117117
}
118118

119-
logger.FatalIf(lockRPCForceUnlock.Register(globalGrid.Load(), lockServer.ForceUnlockHandler), "unable to register handler")
120-
logger.FatalIf(lockRPCRefresh.Register(globalGrid.Load(), lockServer.RefreshHandler), "unable to register handler")
121-
logger.FatalIf(lockRPCLock.Register(globalGrid.Load(), lockServer.LockHandler), "unable to register handler")
122-
logger.FatalIf(lockRPCUnlock.Register(globalGrid.Load(), lockServer.UnlockHandler), "unable to register handler")
123-
logger.FatalIf(lockRPCRLock.Register(globalGrid.Load(), lockServer.RLockHandler), "unable to register handler")
124-
logger.FatalIf(lockRPCRUnlock.Register(globalGrid.Load(), lockServer.RUnlockHandler), "unable to register handler")
119+
logger.FatalIf(lockRPCForceUnlock.Register(gm, lockServer.ForceUnlockHandler), "unable to register handler")
120+
logger.FatalIf(lockRPCRefresh.Register(gm, lockServer.RefreshHandler), "unable to register handler")
121+
logger.FatalIf(lockRPCLock.Register(gm, lockServer.LockHandler), "unable to register handler")
122+
logger.FatalIf(lockRPCUnlock.Register(gm, lockServer.UnlockHandler), "unable to register handler")
123+
logger.FatalIf(lockRPCRLock.Register(gm, lockServer.RLockHandler), "unable to register handler")
124+
logger.FatalIf(lockRPCRUnlock.Register(gm, lockServer.RUnlockHandler), "unable to register handler")
125125

126126
globalLockServer = lockServer.ll
127127

cmd/routers.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,28 @@ import (
2626

2727
// Composed function registering routers for only distributed Erasure setup.
2828
func registerDistErasureRouters(router *mux.Router, endpointServerPools EndpointServerPools) {
29+
var (
30+
lockGrid = globalLockGrid.Load()
31+
commonGrid = globalGrid.Load()
32+
)
33+
2934
// Register storage REST router only if its a distributed setup.
30-
registerStorageRESTHandlers(router, endpointServerPools, globalGrid.Load())
35+
registerStorageRESTHandlers(router, endpointServerPools, commonGrid)
3136

3237
// Register peer REST router only if its a distributed setup.
33-
registerPeerRESTHandlers(router, globalGrid.Load())
38+
registerPeerRESTHandlers(router, commonGrid)
3439

3540
// Register bootstrap REST router for distributed setups.
36-
registerBootstrapRESTHandlers(globalGrid.Load())
41+
registerBootstrapRESTHandlers(commonGrid)
3742

3843
// Register distributed namespace lock routers.
39-
registerLockRESTHandlers()
44+
registerLockRESTHandlers(lockGrid)
45+
46+
// Add lock grid to router
47+
router.Handle(grid.RouteLockPath, adminMiddleware(lockGrid.Handler(storageServerRequestValidate), noGZFlag, noObjLayerFlag))
4048

4149
// Add grid to router
42-
router.Handle(grid.RoutePath, adminMiddleware(globalGrid.Load().Handler(storageServerRequestValidate), noGZFlag, noObjLayerFlag))
50+
router.Handle(grid.RoutePath, adminMiddleware(commonGrid.Handler(storageServerRequestValidate), noGZFlag, noObjLayerFlag)) // common route must be served by the common grid, which holds the storage/peer/bootstrap handlers
4351
}
4452

4553
// List of some generic middlewares which are applied for all incoming requests.

cmd/server-main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -856,14 +856,20 @@ func serverMain(ctx *cli.Context) {
856856
logger.FatalIf(initGlobalGrid(GlobalContext, globalEndpoints), "Unable to configure server grid RPC services")
857857
})
858858

859+
// Initialize lock grid
860+
bootstrapTrace("initLockGrid", func() {
861+
logger.FatalIf(initGlobalLockGrid(GlobalContext, globalEndpoints), "Unable to configure server lock grid RPC services")
862+
})
863+
859864
// Configure server.
860865
bootstrapTrace("configureServer", func() {
861866
handler, err := configureServerHandler(globalEndpoints)
862867
if err != nil {
863868
logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")
864869
}
865870
// Allow grid to start after registering all services.
866-
xioutil.SafeClose(globalGridStart)
871+
close(globalGridStart)
872+
close(globalLockGridStart)
867873

868874
httpServer := xhttp.NewServer(getServerListenAddrs()).
869875
UseHandler(setCriticalErrorHandler(corsHandler(handler))).

internal/grid/grid.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,13 +202,12 @@ func bytesOrLength(b []byte) string {
202202
// The net.Conn must support all features as described by the net.Conn interface.
203203
type ConnDialer func(ctx context.Context, address string) (net.Conn, error)
204204

205-
// ConnectWS returns a function that dials a websocket connection to the given address.
206-
// Route and auth are added to the connection.
207-
func ConnectWS(dial ContextDialer, auth AuthFn, tls *tls.Config) func(ctx context.Context, remote string) (net.Conn, error) {
205+
// ConnectWSWithRoutePath is like ConnectWS but with a custom grid route path.
206+
func ConnectWSWithRoutePath(dial ContextDialer, auth AuthFn, tls *tls.Config, routePath string) func(ctx context.Context, remote string) (net.Conn, error) {
208207
return func(ctx context.Context, remote string) (net.Conn, error) {
209208
toDial := strings.Replace(remote, "http://", "ws://", 1)
210209
toDial = strings.Replace(toDial, "https://", "wss://", 1)
211-
toDial += RoutePath
210+
toDial += routePath
212211

213212
dialer := ws.DefaultDialer
214213
dialer.ReadBufferSize = readBufferSize
@@ -234,5 +233,11 @@ func ConnectWS(dial ContextDialer, auth AuthFn, tls *tls.Config) func(ctx contex
234233
}
235234
}
236235

236+
// ConnectWS returns a function that dials a websocket connection to the given address.
237+
// Route and auth are added to the connection.
238+
func ConnectWS(dial ContextDialer, auth AuthFn, tls *tls.Config) func(ctx context.Context, remote string) (net.Conn, error) {
239+
return ConnectWSWithRoutePath(dial, auth, tls, RoutePath)
240+
}
241+
237242
// ValidateTokenFn must validate the token and return an error if it is invalid.
238243
type ValidateTokenFn func(token string) error

internal/grid/manager.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ const (
4545

4646
// RoutePath is the remote path to connect to.
4747
RoutePath = "/minio/grid/" + apiVersion
48+
49+
// RouteLockPath is the remote lock path to connect to.
50+
RouteLockPath = "/minio/grid/lock/" + apiVersion
4851
)
4952

5053
// Manager will contain all the connections to the grid.
@@ -65,6 +68,9 @@ type Manager struct {
6568

6669
// authToken is a function that will validate a token.
6770
authToken ValidateTokenFn
71+
72+
// routePath indicates the dial route path
73+
routePath string
6874
}
6975

7076
// ManagerOptions are options for creating a new grid manager.
@@ -74,6 +80,7 @@ type ManagerOptions struct {
7480
Incoming func(n int64) // Record incoming bytes.
7581
Outgoing func(n int64) // Record outgoing bytes.
7682
BlockConnect chan struct{} // If set, incoming and outgoing connections will be blocked until closed.
83+
RoutePath string
7784
TraceTo *pubsub.PubSub[madmin.TraceInfo, madmin.TraceType]
7885
Dialer ConnDialer
7986
// Sign a token for the given audience.
@@ -99,6 +106,7 @@ func NewManager(ctx context.Context, o ManagerOptions) (*Manager, error) {
99106
targets: make(map[string]*Connection, len(o.Hosts)),
100107
local: o.Local,
101108
authToken: o.AuthToken,
109+
routePath: o.RoutePath,
102110
}
103111
m.handlers.init()
104112
if ctx == nil {
@@ -137,7 +145,7 @@ func NewManager(ctx context.Context, o ManagerOptions) (*Manager, error) {
137145

138146
// AddToMux will add the grid manager to the given mux.
139147
func (m *Manager) AddToMux(router *mux.Router, authReq func(r *http.Request) error) {
140-
router.Handle(RoutePath, m.Handler(authReq))
148+
router.Handle(m.routePath, m.Handler(authReq))
141149
}
142150

143151
// Handler returns a handler that can be used to serve grid requests.

0 commit comments

Comments
 (0)