Skip to content

GODRIVER-3181 Read server responses in the background after op timeout. #1719

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
522 changes: 522 additions & 0 deletions internal/integration/csot_test.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions internal/integration/mtest/mongotest.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,15 @@ func (t *T) cleanup() {
// Run creates a new T instance for a sub-test and runs the given callback. It also creates a new collection using the
// given name which is available to the callback through the T.Coll variable and is dropped after the callback
// returns.
func (t *T) Run(name string, callback func(*T)) {
func (t *T) Run(name string, callback func(mt *T)) {
t.RunOpts(name, NewOptions(), callback)
}

// RunOpts creates a new T instance for a sub-test with the given options. If the current environment does not satisfy
// constraints specified in the options, the new sub-test will be skipped automatically. If the test is not skipped,
// the callback will be run with the new T instance. RunOpts creates a new collection with the given name which is
// available to the callback through the T.Coll variable and is dropped after the callback returns.
func (t *T) RunOpts(name string, opts *Options, callback func(*T)) {
func (t *T) RunOpts(name string, opts *Options, callback func(mt *T)) {
t.T.Run(name, func(wrapped *testing.T) {
sub := newT(wrapped, t.baseOpts, opts)

Expand Down
25 changes: 25 additions & 0 deletions internal/integration/unified/unified_spec_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,31 @@ var (
"unpin when a new transaction is started": "Implement GODRIVER-3034",
"unpin when a non-transaction write operation uses a session": "Implement GODRIVER-3034",
"unpin when a non-transaction read operation uses a session": "Implement GODRIVER-3034",

// DRIVERS-2722: Setting "maxTimeMS" on a command that creates a cursor
// also limits the lifetime of the cursor. That may be surprising to
// users, so omit "maxTimeMS" from operations that return user-managed
// cursors.
"timeoutMS can be overridden for a find": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
"timeoutMS can be configured for an operation - find on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
"timeoutMS can be configured for an operation - aggregate on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
"timeoutMS can be configured for an operation - aggregate on database": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
"timeoutMS can be configured on a MongoClient - find on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
"timeoutMS can be configured on a MongoClient - aggregate on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
"timeoutMS can be configured on a MongoClient - aggregate on database": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
"operation is retried multiple times for non-zero timeoutMS - find on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
"operation is retried multiple times for non-zero timeoutMS - aggregate on collection": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
"operation is retried multiple times for non-zero timeoutMS - aggregate on database": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",
"timeoutMS applied to find command": "maxTimeMS is disabled on find and aggregate. See DRIVERS-2722.",

// DRIVERS-2953: This test requires that the driver sends a "getMore"
// with "maxTimeMS" set. However, "getMore" can only include "maxTimeMS"
// for tailable awaitData cursors. Including "maxTimeMS" on "getMore"
// for any other cursor type results in a server error:
//
// (BadValue) cannot set maxTimeMS on getMore command for a non-awaitData cursor
//
"Non-tailable cursor lifetime remaining timeoutMS applied to getMore if timeoutMode is unset": "maxTimeMS can't be set on a getMore. See DRIVERS-2953",
}

logMessageValidatorTimeout = 10 * time.Millisecond
Expand Down
27 changes: 21 additions & 6 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,12 @@ func aggregate(a aggregateParams, opts ...options.Lister[options.AggregateOption
ServerAPI(a.client.serverAPI).
HasOutputStage(hasOutputStage).
Timeout(a.client.timeout).
Authenticator(a.client.authenticator)
Authenticator(a.client.authenticator).
// Omit "maxTimeMS" from operations that return a user-managed cursor to
// prevent confusing "cursor not found" errors.
//
// See DRIVERS-2722 for more detail.
OmitMaxTimeMS(true)

if args.AllowDiskUse != nil {
op.AllowDiskUse(*args.AllowDiskUse)
Expand Down Expand Up @@ -1293,11 +1298,20 @@ func (coll *Collection) Find(ctx context.Context, filter interface{},
if err != nil {
return nil, err
}
return coll.find(ctx, filter, args)

// Omit "maxTimeMS" from operations that return a user-managed cursor to
// prevent confusing "cursor not found" errors.
//
// See DRIVERS-2722 for more detail.
return coll.find(ctx, filter, true, args)
}

func (coll *Collection) find(ctx context.Context, filter interface{},
args *options.FindOptions) (cur *Cursor, err error) {
func (coll *Collection) find(
ctx context.Context,
filter interface{},
omitMaxTimeMS bool,
args *options.FindOptions,
) (cur *Cursor, err error) {

if ctx == nil {
ctx = context.Background()
Expand Down Expand Up @@ -1335,7 +1349,8 @@ func (coll *Collection) find(ctx context.Context, filter interface{},
CommandMonitor(coll.client.monitor).ServerSelector(selector).
ClusterClock(coll.client.clock).Database(coll.db.name).Collection(coll.name).
Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).ServerAPI(coll.client.serverAPI).
Timeout(coll.client.timeout).Logger(coll.client.logger).Authenticator(coll.client.authenticator)
Timeout(coll.client.timeout).Logger(coll.client.logger).Authenticator(coll.client.authenticator).
OmitMaxTimeMS(omitMaxTimeMS)

cursorOpts := coll.client.createBaseCursorOptions()

Expand Down Expand Up @@ -1500,7 +1515,7 @@ func (coll *Collection) FindOne(ctx context.Context, filter interface{},
if err != nil {
return nil
}
cursor, err := coll.find(ctx, filter, newFindArgsFromFindOneArgs(args))
cursor, err := coll.find(ctx, filter, false, newFindArgsFromFindOneArgs(args))
return &SingleResult{
ctx: ctx,
cur: cursor,
Expand Down
6 changes: 6 additions & 0 deletions x/mongo/driver/batch_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,12 @@ func (bc *BatchCursor) getMore(ctx context.Context) {
Crypt: bc.crypt,
ServerAPI: bc.serverAPI,

// Omit the automatically-calculated maxTimeMS because setting maxTimeMS
// on a non-awaitData cursor causes a server error. For awaitData
// cursors, maxTimeMS is set when maxAwaitTime is specified by the above
// CommandFn.
OmitMaxTimeMS: true,

// No read preference is passed to the getMore command,
// resulting in the default read preference: "primaryPreferred".
// Since this could be confusing, and there is no requirement
Expand Down
16 changes: 15 additions & 1 deletion x/mongo/driver/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,14 +509,28 @@ func ExtractErrorFromServerResponse(doc bsoncore.Document) error {
errmsg = "command failed"
}

return Error{
err := Error{
Code: code,
Message: errmsg,
Name: codeName,
Labels: labels,
TopologyVersion: tv,
Raw: doc,
}

// If we get a MaxTimeMSExpired error, assume that the error was caused
// by setting "maxTimeMS" on the command based on the context deadline
// or on "timeoutMS". In that case, make the error wrap
// context.DeadlineExceeded so that users can always check
//
// errors.Is(err, context.DeadlineExceeded)
//
// for either client-side or server-side timeouts.
if err.Code == 50 {
err.Wrapped = context.DeadlineExceeded
}

return err
}

if len(wcError.WriteErrors) > 0 || wcError.WriteConcernError != nil {
Expand Down
104 changes: 0 additions & 104 deletions x/mongo/driver/integration/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,113 +10,21 @@ import (
"bytes"
"context"
"testing"
"time"

"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/event"
"go.mongodb.org/mongo-driver/v2/internal/integtest"
"go.mongodb.org/mongo-driver/v2/internal/require"
"go.mongodb.org/mongo-driver/v2/internal/serverselector"
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/topology"
)

// setUpMonitor builds a command monitor whose callbacks forward every started,
// succeeded, and failed command event onto its own channel. Each channel is
// created with a buffer of one, so a callback blocks when an event is already
// pending on its channel and has not been received yet.
//
// It returns the monitor followed by the started, succeeded, and failed
// channels, in that order.
func setUpMonitor() (*event.CommandMonitor, chan *event.CommandStartedEvent, chan *event.CommandSucceededEvent, chan *event.CommandFailedEvent) {
	var (
		startedCh   = make(chan *event.CommandStartedEvent, 1)
		succeededCh = make(chan *event.CommandSucceededEvent, 1)
		failedCh    = make(chan *event.CommandFailedEvent, 1)
	)

	monitor := &event.CommandMonitor{}
	monitor.Started = func(_ context.Context, evt *event.CommandStartedEvent) {
		startedCh <- evt
	}
	monitor.Succeeded = func(_ context.Context, evt *event.CommandSucceededEvent) {
		succeededCh <- evt
	}
	monitor.Failed = func(_ context.Context, evt *event.CommandFailedEvent) {
		failedCh <- evt
	}

	return monitor, startedCh, succeededCh, failedCh
}

// skipIfBelow32 skips the calling test when the server selected from topo
// reports a version lower than 3.2.
//
// It selects a server with a write selector, runs "serverStatus" against
// dbName, and compares the reported "version" string with "3.2" using
// integtest.CompareVersions. Any error from selecting the server, running the
// command, or looking up the version field is handed to noerr.
func skipIfBelow32(ctx context.Context, t *testing.T, topo *topology.Topology) {
	// Mark this function as a test helper so that the skip is attributed to
	// the calling test's line rather than to this function.
	t.Helper()

	server, err := topo.SelectServer(ctx, &serverselector.Write{})
	noerr(t, err)

	versionCmd := bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "serverStatus", 1))
	serverStatus, err := runCommand(server, dbName, versionCmd)
	noerr(t, err)

	version, err := serverStatus.LookupErr("version")
	noerr(t, err)

	if v := version.StringValue(); integtest.CompareVersions(t, v, "3.2") < 0 {
		// Always give a reason so skipped runs are explainable from the test
		// output alone.
		t.Skipf("skipping: server version %q is below 3.2", v)
	}
}

func TestAggregate(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}

t.Run("TestMaxTimeMSInGetMore", func(t *testing.T) {
ctx := context.Background()
monitor, started, succeeded, failed := setUpMonitor()
dbName := "TestAggMaxTimeDB"
collName := "TestAggMaxTimeColl"
top := integtest.MonitoredTopology(t, dbName, monitor)
clearChannels(started, succeeded, failed)
skipIfBelow32(ctx, t, top)

clearChannels(started, succeeded, failed)
err := operation.NewInsert(
bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "x", 1)),
bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "x", 1)),
bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "x", 1)),
).Collection(collName).Database(dbName).
Deployment(top).ServerSelector(&serverselector.Write{}).Execute(context.Background())
noerr(t, err)

clearChannels(started, succeeded, failed)
op := operation.NewAggregate(bsoncore.BuildDocumentFromElements(nil)).
Collection(collName).Database(dbName).Deployment(top).ServerSelector(&serverselector.Write{}).
CommandMonitor(monitor).BatchSize(2)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

err = op.Execute(ctx)
noerr(t, err)
batchCursor, err := op.Result(driver.CursorOptions{BatchSize: 2, CommandMonitor: monitor})
noerr(t, err)

var e *event.CommandStartedEvent
select {
case e = <-started:
case <-time.After(2000 * time.Millisecond):
t.Fatal("timed out waiting for aggregate")
}

require.Equal(t, "aggregate", e.CommandName)

clearChannels(started, succeeded, failed)
// first Next() should automatically return true
require.True(t, batchCursor.Next(ctx), "expected true from first Next, got false")
clearChannels(started, succeeded, failed)
batchCursor.Next(ctx) // should do getMore

select {
case e = <-started:
case <-time.After(200 * time.Millisecond):
t.Fatal("timed out waiting for getMore")
}
require.Equal(t, "getMore", e.CommandName)
_, err = e.Command.LookupErr("maxTimeMS")
noerr(t, err)
})
t.Run("Multiple Batches", func(t *testing.T) {
ds := []bsoncore.Document{
bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "_id", 1)),
Expand Down Expand Up @@ -185,15 +93,3 @@ func TestAggregate(t *testing.T) {
})

}

// clearChannels discards the events currently buffered on the started,
// succeeded, and failed channels, in that order. A channel is only received
// from while its buffer reports at least one pending event.
func clearChannels(s chan *event.CommandStartedEvent, succ chan *event.CommandSucceededEvent, f chan *event.CommandFailedEvent) {
	drainBuffered(s)
	drainBuffered(succ)
	drainBuffered(f)
}

// drainBuffered receives from ch for as long as len(ch) is non-zero.
func drainBuffered[T any](ch chan T) {
	for len(ch) != 0 {
		<-ch
	}
}
12 changes: 6 additions & 6 deletions x/mongo/driver/integration/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ func addCompressorToURI(uri string) string {
return uri + "compressors=" + comp
}

// runCommand runs an arbitrary command on a given database of target server
func runCommand(s driver.Server, db string, cmd bsoncore.Document) (bsoncore.Document, error) {
// runCommand runs an arbitrary command on a given database of the target
// server.
func runCommand(s driver.Server, db string, cmd bsoncore.Document) error {
op := operation.NewCommand(cmd).
Database(db).Deployment(driver.SingleServerDeployment{Server: s})
err := op.Execute(context.Background())
res := op.Result()
return res, err
Database(db).
Deployment(driver.SingleServerDeployment{Server: s})
return op.Execute(context.Background())
}

// dropCollection drops the collection in the test cluster.
Expand Down
5 changes: 2 additions & 3 deletions x/mongo/driver/integration/scram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ func runScramAuthTest(t *testing.T, credential options.Credential) error {
noerr(t, err)

cmd := bsoncore.BuildDocument(nil, bsoncore.AppendInt32Element(nil, "dbstats", 1))
_, err = runCommand(server, integtest.DBName(t), cmd)
return err
return runCommand(server, integtest.DBName(t), cmd)
}

func createScramUsers(t *testing.T, s driver.Server, cases []scramTestCase) error {
Expand All @@ -169,7 +168,7 @@ func createScramUsers(t *testing.T, s driver.Server, cases []scramTestCase) erro
)),
bsoncore.AppendArrayElement(nil, "mechanisms", bsoncore.BuildArray(nil, values...)),
)
_, err := runCommand(s, db, newUserCmd)
err := runCommand(s, db, newUserCmd)
if err != nil {
return fmt.Errorf("Couldn't create user '%s' on db '%s': %w", c.username, integtest.DBName(t), err)
}
Expand Down
Loading
Loading