GODRIVER-2037 Don't clear the connection pool on client-side connect timeout errors. #688


Merged: 9 commits, Jun 30, 2021
59 changes: 59 additions & 0 deletions mongo/integration/primary_stepdown_test.go
@@ -7,6 +7,7 @@
 package integration

 import (
+	"sync"
 	"testing"

 	"go.mongodb.org/mongo-driver/bson"
@@ -23,7 +24,65 @@ const (
 	errorInterruptedAtShutdown int32 = 11600
 )

+// testPoolMonitor exposes an *event.PoolMonitor and collects all events logged to that
+// *event.PoolMonitor. It is safe to use from multiple concurrent goroutines.
Inline review comment from a Contributor:

Oh very nice. Optional: should this be in a separate file, like connection_pool_helpers_test.go? Though isPoolCleared was used by other files, the name of this file suggests it is only for the tests specified in https://github.com/mongodb/specifications/blob/master/source/connections-survive-step-down/tests/README.rst

Reply from matthewdale (Collaborator, Author), Jun 29, 2021:

Yeah, I wanted to locate it "next to" the existing poolMonitor so that there aren't two implementations of the same thing in different places. I think the testPoolMonitor could actually be a good candidate for a "test utilities" package because numerous test packages need to record events (including the Server tests added in this PR). However, I'm not sure it's worth moving into a separate file until we want to move it into a test utilities package, which probably shouldn't be part of this PR.

I've added investigating a move of testPoolMonitor to a shared test utilities package to the description of GODRIVER-2068.
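
To make the idea concrete, a shared test-utilities version might look roughly like the sketch below. The package name eventtest, its location, and the exported names are all hypothetical and not part of this PR.

```go
// Hypothetical shared test-utilities package sketching how testPoolMonitor
// could be exported for reuse across test packages. Names and location are
// illustrative only, not part of this PR.
package eventtest

import (
	"sync"

	"go.mongodb.org/mongo-driver/event"
)

// PoolMonitor records every pool event delivered to the wrapped
// *event.PoolMonitor. It is safe for concurrent use.
type PoolMonitor struct {
	*event.PoolMonitor

	mu     sync.RWMutex
	events []*event.PoolEvent
}

// NewPoolMonitor returns a PoolMonitor that records all pool events.
func NewPoolMonitor() *PoolMonitor {
	pm := &PoolMonitor{}
	pm.PoolMonitor = &event.PoolMonitor{
		Event: func(evt *event.PoolEvent) {
			pm.mu.Lock()
			defer pm.mu.Unlock()
			pm.events = append(pm.events, evt)
		},
	}
	return pm
}
```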

+type testPoolMonitor struct {
+	*event.PoolMonitor
+
+	events []*event.PoolEvent
+	mu     sync.RWMutex
+}
+
+func newTestPoolMonitor() *testPoolMonitor {
+	tpm := &testPoolMonitor{
+		events: make([]*event.PoolEvent, 0),
+	}
+	tpm.PoolMonitor = &event.PoolMonitor{
+		Event: func(evt *event.PoolEvent) {
+			tpm.mu.Lock()
+			defer tpm.mu.Unlock()
+			tpm.events = append(tpm.events, evt)
+		},
+	}
+	return tpm
+}
+
+// Events returns a copy of the events collected by the testPoolMonitor. Filters can optionally be
+// applied to the returned events set and are applied using AND logic (i.e. all filters must return
+// true to include the event in the result).
+func (tpm *testPoolMonitor) Events(filters ...func(*event.PoolEvent) bool) []*event.PoolEvent {
+	filtered := make([]*event.PoolEvent, 0, len(tpm.events))
+	tpm.mu.RLock()
+	defer tpm.mu.RUnlock()
+
+	for _, evt := range tpm.events {
+		keep := true
+		for _, filter := range filters {
+			if !filter(evt) {
+				keep = false
+				break
+			}
+		}
+		if keep {
+			filtered = append(filtered, evt)
+		}
+	}
+
+	return filtered
+}
+
+// IsPoolCleared returns true if there are any events of type "event.PoolCleared" in the events
+// recorded by the testPoolMonitor.
+func (tpm *testPoolMonitor) IsPoolCleared() bool {
+	poolClearedEvents := tpm.Events(func(evt *event.PoolEvent) bool {
+		return evt.Type == event.PoolCleared
+	})
+	return len(poolClearedEvents) > 0
+}

 var poolChan = make(chan *event.PoolEvent, 100)

+// TODO(GODRIVER-2068): Replace all uses of poolMonitor with individual instances of testPoolMonitor.
 var poolMonitor = &event.PoolMonitor{
 	Event: func(event *event.PoolEvent) {
 		poolChan <- event
[rest of file unchanged]
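
As a quick illustration of how the new helper is meant to be used, here is a sketch of a test that wires a testPoolMonitor into a client and asserts on the recorded events. The URI is a placeholder, and the ConnectionClosed filter is only an example of composing Events filters; the mtest-based tests in this PR wire the monitor through mt.ResetClient instead.

```go
package integration

import (
	"context"
	"testing"

	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// Sketch: record pool events for a client and assert on them.
func TestPoolMonitorUsageSketch(t *testing.T) {
	tpm := newTestPoolMonitor()
	clientOpts := options.Client().
		ApplyURI("mongodb://localhost:27017"). // placeholder URI
		SetPoolMonitor(tpm.PoolMonitor)

	client, err := mongo.Connect(context.Background(), clientOpts)
	if err != nil {
		t.Fatalf("Connect error: %v", err)
	}
	defer func() { _ = client.Disconnect(context.Background()) }()

	// ... run operations that may generate pool events ...

	// Events filters compose with AND logic; this one keeps only
	// "ConnectionClosed" events.
	closed := tpm.Events(func(evt *event.PoolEvent) bool {
		return evt.Type == event.ConnectionClosed
	})
	t.Logf("connections closed: %d", len(closed))

	if tpm.IsPoolCleared() {
		t.Error("expected pool not to be cleared")
	}
}
```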
160 changes: 106 additions & 54 deletions mongo/integration/sdam_error_handling_test.go
@@ -28,13 +28,12 @@ func TestSDAMErrorHandling(t *testing.T) {
 		return options.Client().
 			ApplyURI(mtest.ClusterURI()).
 			SetRetryWrites(false).
-			SetPoolMonitor(poolMonitor).
 			SetWriteConcern(mtest.MajorityWc)
 	}
 	baseMtOpts := func() *mtest.Options {
 		mtOpts := mtest.NewOptions().
-			Topologies(mtest.ReplicaSet). // Don't run on sharded clusters to avoid complexity of sharded failpoints.
-			MinServerVersion("4.0").      // 4.0+ is required to use failpoints on replica sets.
+			Topologies(mtest.ReplicaSet, mtest.Single). // Don't run on sharded clusters to avoid complexity of sharded failpoints.
+			MinServerVersion("4.0").                    // 4.0+ is required to use failpoints on replica sets.
 			ClientOptions(baseClientOpts())

 		if mtest.ClusterTopologyKind() == mtest.Sharded {
@@ -48,13 +47,14 @@
 	// blockConnection and appName.
 	mt.RunOpts("before handshake completes", baseMtOpts().Auth(true).MinServerVersion("4.4"), func(mt *mtest.T) {
 		mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
-			mt.Run("pool cleared on network timeout", func(mt *mtest.T) {
-				// Assert that the pool is cleared when a connection created by an application operation thread
-				// encounters a network timeout during handshaking. Unlike the non-timeout test below, we only test
-				// connections created in the foreground for timeouts because connections created by the pool
-				// maintenance routine can't be timed out using a context.
-
-				appName := "authNetworkTimeoutTest"
+			mt.Run("pool not cleared on operation-scoped network timeout", func(mt *mtest.T) {
+				// Assert that the pool is not cleared when a connection created by an application
+				// operation thread encounters an operation timeout during handshaking. Unlike the
+				// non-timeout test below, we only test connections created in the foreground for
+				// timeouts because connections created by the pool maintenance routine can't be
+				// timed out using a context.
+
+				appName := "authOperationTimeoutTest"
 				// Set failpoint on saslContinue instead of saslStart because saslStart isn't done when using
 				// speculative auth.
 				mt.SetFailPoint(mtest.FailPoint{
@@ -70,24 +70,61 @@
 					},
 				})

-				// Reset the client with the appName specified in the failpoint.
-				clientOpts := options.Client().
-					SetAppName(appName).
-					SetRetryWrites(false).
-					SetPoolMonitor(poolMonitor)
-				mt.ResetClient(clientOpts)
-				clearPoolChan()
+				// Reset the client with the appName specified in the failpoint and the pool monitor.
+				tpm := newTestPoolMonitor()
+				mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

-				// The saslContinue blocks for 150ms so run the InsertOne with a 100ms context to cause a network
-				// timeout during auth and assert that the pool was cleared.
+				// Use a context with a 100ms timeout so that the saslContinue delay of 150ms causes
+				// an operation-scoped context timeout (i.e. a timeout not caused by a client timeout
+				// like connectTimeoutMS or socketTimeoutMS).
 				timeoutCtx, cancel := context.WithTimeout(mtest.Background, 100*time.Millisecond)
 				defer cancel()
 				_, err := mt.Coll.InsertOne(timeoutCtx, bson.D{{"test", 1}})
 				assert.NotNil(mt, err, "expected InsertOne error, got nil")
 				assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
 				assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
-				assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
+				assert.False(mt, tpm.IsPoolCleared(), "expected pool not to be cleared but was cleared")
 			})

+			mt.Run("pool cleared on non-operation-scoped network timeout", func(mt *mtest.T) {
+				// Assert that the pool is cleared when a connection created by an application
+				// operation thread encounters a timeout caused by connectTimeoutMS during
+				// handshaking.
+
+				appName := "authConnectTimeoutTest"
+				// Set failpoint on saslContinue instead of saslStart because saslStart isn't done when using
+				// speculative auth.
+				mt.SetFailPoint(mtest.FailPoint{
+					ConfigureFailPoint: "failCommand",
+					Mode: mtest.FailPointMode{
+						Times: 1,
+					},
+					Data: mtest.FailPointData{
+						FailCommands:    []string{"saslContinue"},
+						BlockConnection: true,
+						BlockTimeMS:     150,
+						AppName:         appName,
+					},
+				})
+
+				// Reset the client with the appName specified in the failpoint and the pool monitor.
+				tpm := newTestPoolMonitor()
+				mt.ResetClient(baseClientOpts().
+					SetAppName(appName).
+					SetPoolMonitor(tpm.PoolMonitor).
+					// Set a 100ms socket timeout so that the saslContinue delay of 150ms causes a
+					// timeout during socket read (i.e. a timeout not caused by the InsertOne context).
+					SetSocketTimeout(100 * time.Millisecond))
+
+				// Use context.Background() so that the new connection will not time out due to an
+				// operation-scoped timeout.
+				_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
+				assert.NotNil(mt, err, "expected InsertOne error, got nil")
+				assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
+				assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
+				assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
+			})

 			mt.RunOpts("pool cleared on non-timeout network error", noClientOpts, func(mt *mtest.T) {
 				mt.Run("background", func(mt *mtest.T) {
 					// Assert that the pool is cleared when a connection created by the background pool maintenance
@@ -106,16 +143,19 @@
 						},
 					})

-					clientOpts := options.Client().
+					// Reset the client with the appName specified in the failpoint.
+					tpm := newTestPoolMonitor()
+					mt.ResetClient(baseClientOpts().
 						SetAppName(appName).
-						SetMinPoolSize(5).
-						SetPoolMonitor(poolMonitor)
-					mt.ResetClient(clientOpts)
-					clearPoolChan()
+						SetPoolMonitor(tpm.PoolMonitor).
+						// Set minPoolSize to enable the background pool maintenance goroutine.
+						SetMinPoolSize(5))

 					time.Sleep(200 * time.Millisecond)
-					assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
+
+					assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
 				})

 				mt.Run("foreground", func(mt *mtest.T) {
 					// Assert that the pool is cleared when a connection created by an application thread connection
 					// checkout encounters a non-timeout network error during handshaking.
@@ -133,24 +173,23 @@
 						},
 					})

-					clientOpts := options.Client().
-						SetAppName(appName).
-						SetPoolMonitor(poolMonitor)
-					mt.ResetClient(clientOpts)
-					clearPoolChan()
+					// Reset the client with the appName specified in the failpoint.
+					tpm := newTestPoolMonitor()
+					mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

 					_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
 					assert.NotNil(mt, err, "expected InsertOne error, got nil")
 					assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
-					assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
+					assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
 				})
 			})
 		})
 	})
 	mt.RunOpts("after handshake completes", baseMtOpts(), func(mt *mtest.T) {
 		mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
 			mt.Run("pool cleared on non-timeout network error", func(mt *mtest.T) {
-				clearPoolChan()
+				appName := "afterHandshakeNetworkError"

 				mt.SetFailPoint(mtest.FailPoint{
 					ConfigureFailPoint: "failCommand",
 					Mode: mtest.FailPointMode{
@@ -159,16 +198,22 @@
 					Data: mtest.FailPointData{
 						FailCommands:    []string{"insert"},
 						CloseConnection: true,
+						AppName:         appName,
 					},
 				})

+				// Reset the client with the appName specified in the failpoint.
+				tpm := newTestPoolMonitor()
+				mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
+
 				_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"test", 1}})
 				assert.NotNil(mt, err, "expected InsertOne error, got nil")
 				assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
-				assert.True(mt, isPoolCleared(), "expected pool to be cleared but was not")
+				assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
 			})
 			mt.Run("pool not cleared on timeout network error", func(mt *mtest.T) {
-				clearPoolChan()
+				tpm := newTestPoolMonitor()
+				mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))

 				_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
 				assert.Nil(mt, err, "InsertOne error: %v", err)
@@ -181,11 +226,11 @@
 				_, err = mt.Coll.Find(timeoutCtx, filter)
 				assert.NotNil(mt, err, "expected Find error, got %v", err)
 				assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
-
-				assert.False(mt, isPoolCleared(), "expected pool to not be cleared but was")
+				assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
 			})
 			mt.Run("pool not cleared on context cancellation", func(mt *mtest.T) {
-				clearPoolChan()
+				tpm := newTestPoolMonitor()
+				mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))

 				_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
 				assert.Nil(mt, err, "InsertOne error: %v", err)
@@ -204,8 +249,7 @@
 				assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
 				assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
 				assert.True(mt, errors.Is(err, context.Canceled), "expected error %v to be context.Canceled", err)
-
-				assert.False(mt, isPoolCleared(), "expected pool to not be cleared but was")
+				assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
 			})
 		})
 		mt.RunOpts("server errors", noClientOpts, func(mt *mtest.T) {
@@ -242,28 +286,32 @@
 			}
 			for _, tc := range testCases {
 				mt.RunOpts(fmt.Sprintf("command error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
-					clearPoolChan()
+					appName := fmt.Sprintf("command_error_%s", tc.name)

 					// Cause the next insert to fail with an ok:0 response.
-					fp := mtest.FailPoint{
+					mt.SetFailPoint(mtest.FailPoint{
 						ConfigureFailPoint: "failCommand",
 						Mode: mtest.FailPointMode{
 							Times: 1,
 						},
 						Data: mtest.FailPointData{
 							FailCommands: []string{"insert"},
 							ErrorCode:    tc.errorCode,
+							AppName:      appName,
 						},
-					}
-					mt.SetFailPoint(fp)
+					})

+					// Reset the client with the appName specified in the failpoint.
+					tpm := newTestPoolMonitor()
+					mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

-					runServerErrorsTest(mt, tc.isShutdownError)
+					runServerErrorsTest(mt, tc.isShutdownError, tpm)
 				})
 				mt.RunOpts(fmt.Sprintf("write concern error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
-					clearPoolChan()
+					appName := fmt.Sprintf("write_concern_error_%s", tc.name)

 					// Cause the next insert to fail with a write concern error.
-					fp := mtest.FailPoint{
+					mt.SetFailPoint(mtest.FailPoint{
 						ConfigureFailPoint: "failCommand",
 						Mode: mtest.FailPointMode{
 							Times: 1,
@@ -273,32 +321,36 @@
 							WriteConcernError: &mtest.WriteConcernErrorData{
 								Code: tc.errorCode,
 							},
+							AppName: appName,
 						},
-					}
-					mt.SetFailPoint(fp)
+					})

+					// Reset the client with the appName specified in the failpoint.
+					tpm := newTestPoolMonitor()
+					mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

-					runServerErrorsTest(mt, tc.isShutdownError)
+					runServerErrorsTest(mt, tc.isShutdownError, tpm)
 				})
 			}
 		})
 	})
 }

-func runServerErrorsTest(mt *mtest.T, isShutdownError bool) {
+func runServerErrorsTest(mt *mtest.T, isShutdownError bool, tpm *testPoolMonitor) {
 	mt.Helper()

 	_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
 	assert.NotNil(mt, err, "expected InsertOne error, got nil")

 	// The pool should always be cleared for shutdown errors, regardless of server version.
 	if isShutdownError {
-		assert.True(mt, isPoolCleared(), "expected pool to be cleared, but was not")
+		assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared, but was not")
 		return
 	}

 	// For non-shutdown errors, the pool is only cleared if the error is from a pre-4.2 server.
 	wantCleared := mtest.CompareServerVersions(mtest.ServerVersion(), "4.2") < 0
-	gotCleared := isPoolCleared()
-	assert.Equal(mt, wantCleared, gotCleared, "expected pool to be cleared: %v; pool was cleared: %v",
+	gotCleared := tpm.IsPoolCleared()
+	assert.Equal(mt, wantCleared, gotCleared, "expected pool to be cleared: %t; pool was cleared: %t",
 		wantCleared, gotCleared)
 }
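
Taken together, these tests pin down when a handshake or operation error should clear the connection pool. The sketch below is a conceptual summary of that rule, not the driver's actual implementation (the real logic lives in the driver's topology package and uses its own error types; all names here are illustrative).

```go
// Conceptual sketch of the pool-clearing rule the tests above assert.
package sketch

import (
	"context"
	"errors"
	"net"
)

// shouldClearPool reports whether an error should clear the server's
// connection pool, per the behavior asserted by the tests above.
func shouldClearPool(err error, beforeHandshake bool) bool {
	if err == nil {
		return false
	}
	// Errors caused by the operation's own context (an operation-scoped
	// timeout or a cancellation) say nothing about server health, so
	// they never clear the pool.
	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
		return false
	}
	if beforeHandshake {
		// Any other network error during connection establishment,
		// including timeouts from connectTimeoutMS or socketTimeoutMS,
		// clears the pool.
		return true
	}
	// After the handshake, timeouts are not treated as server health
	// problems; only non-timeout network errors clear the pool.
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		return false
	}
	return true
}
```

(Server errors such as "not master" and shutdown errors follow the separate rule exercised by runServerErrorsTest: shutdown errors always clear the pool, and other state-change errors clear it only for pre-4.2 servers.)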