diff --git a/event/monitoring.go b/event/monitoring.go index cc2c7a4e6c..ddc7abacf7 100644 --- a/event/monitoring.go +++ b/event/monitoring.go @@ -117,6 +117,7 @@ type PoolEvent struct { Address string `json:"address"` ConnectionID uint64 `json:"connectionId"` PoolOptions *MonitorPoolOptions `json:"options"` + Duration time.Duration `json:"duration"` Reason string `json:"reason"` // ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field // can be used to distinguish between individual servers in a load balanced deployment. diff --git a/testdata/connection-monitoring-and-pooling/logging/connection-logging.json b/testdata/connection-monitoring-and-pooling/logging/connection-logging.json index 86d4357420..3186f2be98 100644 --- a/testdata/connection-monitoring-and-pooling/logging/connection-logging.json +++ b/testdata/connection-monitoring-and-pooling/logging/connection-logging.json @@ -140,6 +140,13 @@ "int", "long" ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] } } }, @@ -162,6 +169,13 @@ "int", "long" ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] } } }, @@ -222,6 +236,13 @@ "int", "long" ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] } } }, @@ -444,7 +465,7 @@ { "level": "debug", "component": "connection", - "unordered": true, + "unordered": true, "data": { "message": "Connection closed", "driverConnectionId": { @@ -471,7 +492,7 @@ { "level": "debug", "component": "connection", - "unordered": true, + "unordered": true, "data": { "message": "Connection checkout failed", "serverHost": { @@ -486,6 +507,13 @@ "reason": "An error occurred while trying to establish a new connection", "error": { "$$exists": true + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] } } } diff --git a/testdata/connection-monitoring-and-pooling/logging/connection-logging.yml 
b/testdata/connection-monitoring-and-pooling/logging/connection-logging.yml index ef5576d753..61d9ed3297 100644 --- a/testdata/connection-monitoring-and-pooling/logging/connection-logging.yml +++ b/testdata/connection-monitoring-and-pooling/logging/connection-logging.yml @@ -66,6 +66,7 @@ tests: driverConnectionId: { $$type: [int, long] } serverHost: { $$type: string } serverPort: { $$type: [int, long] } + durationMS: { $$type: [double, int, long] } - level: debug component: connection @@ -74,6 +75,7 @@ tests: driverConnectionId: { $$type: [int, long] } serverHost: { $$type: string } serverPort: { $$type: [int, long] } + durationMS: { $$type: [double, int, long] } - level: debug component: connection @@ -98,6 +100,7 @@ tests: driverConnectionId: { $$type: [int, long] } serverHost: { $$type: string } serverPort: { $$type: [int, long] } + durationMS: { $$type: [double, int, long] } - level: debug component: connection @@ -219,4 +222,5 @@ tests: serverPort: { $$type: [int, long] } reason: "An error occurred while trying to establish a new connection" error: { $$exists: true } + durationMS: { $$type: [double, int, long] } unordered: true diff --git a/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.json b/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.json index 1d4cc01a9f..7055a54869 100644 --- a/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.json +++ b/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.json @@ -1,5 +1,5 @@ { - "description": "connection-pool-logging", + "description": "connection-pool-options", "schemaVersion": "1.13", "runOnRequirements": [ { @@ -128,6 +128,13 @@ "int", "long" ] + }, + "durationMS": { + "$$type": [ + "double", + "int", + "long" + ] } } } diff --git a/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.yml b/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.yml index 
beebb474a7..790dab6fed 100644 --- a/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.yml +++ b/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.yml @@ -1,4 +1,4 @@ -description: "connection-pool-logging" +description: "connection-pool-options" schemaVersion: "1.13" @@ -71,6 +71,7 @@ tests: driverConnectionId: { $$type: [int, long] } serverHost: { $$type: string } serverPort: { $$type: [int, long] } + durationMS: { $$type: [double, int, long] } # Drivers who have not done DRIVERS-1943 will need to skip this test. - description: "maxConnecting should be included in connection pool created message when specified" diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 6ca23c071b..bfbda4fa48 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -467,6 +467,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { }) } + start := time.Now() // Check the pool state while holding a stateMu read lock. If the pool state is not "ready", // return an error. Do all of this while holding the stateMu read lock to prevent a state change between // checking the state and entering the wait queue. 
Not holding the stateMu read lock here may @@ -477,8 +478,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { case poolClosed: p.stateMu.RUnlock() + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ + logger.KeyDurationMS, duration.Milliseconds(), logger.KeyReason, logger.ReasonConnCheckoutFailedPoolClosed, } @@ -487,9 +490,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ - Type: event.GetFailed, - Address: p.address.String(), - Reason: event.ReasonPoolClosed, + Type: event.GetFailed, + Address: p.address.String(), + Duration: duration, + Reason: event.ReasonPoolClosed, }) } return nil, ErrPoolClosed @@ -497,8 +501,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { err := poolClearedError{err: p.lastClearErr, address: p.address} p.stateMu.RUnlock() + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ + logger.KeyDurationMS, duration.Milliseconds(), logger.KeyReason, logger.ReasonConnCheckoutFailedError, } @@ -507,10 +513,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ - Type: event.GetFailed, - Address: p.address.String(), - Reason: event.ReasonConnectionErrored, - Error: err, + Type: event.GetFailed, + Address: p.address.String(), + Duration: duration, + Reason: event.ReasonConnectionErrored, + Error: err, }) } return nil, err @@ -539,9 +546,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { // or an error, so unlock the stateMu lock here. 
p.stateMu.RUnlock() + duration := time.Since(start) if w.err != nil { if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ + logger.KeyDurationMS, duration.Milliseconds(), logger.KeyReason, logger.ReasonConnCheckoutFailedError, } @@ -550,18 +559,21 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ - Type: event.GetFailed, - Address: p.address.String(), - Reason: event.ReasonConnectionErrored, - Error: w.err, + Type: event.GetFailed, + Address: p.address.String(), + Duration: duration, + Reason: event.ReasonConnectionErrored, + Error: w.err, }) } return nil, w.err } + duration = time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ logger.KeyDriverConnectionID, w.conn.driverConnectionID, + logger.KeyDurationMS, duration.Milliseconds(), } logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...) @@ -572,6 +584,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { Type: event.GetSucceeded, Address: p.address.String(), ConnectionID: w.conn.driverConnectionID, + Duration: duration, }) } @@ -584,12 +597,14 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { p.stateMu.RUnlock() // Wait for either the wantConn to be ready or for the Context to time out. 
- start := time.Now() + waitQueueStart := time.Now() select { case <-w.ready: if w.err != nil { + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ + logger.KeyDurationMS, duration.Milliseconds(), logger.KeyReason, logger.ReasonConnCheckoutFailedError, logger.KeyError, w.err.Error(), } @@ -599,19 +614,22 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ - Type: event.GetFailed, - Address: p.address.String(), - Reason: event.ReasonConnectionErrored, - Error: w.err, + Type: event.GetFailed, + Address: p.address.String(), + Duration: duration, + Reason: event.ReasonConnectionErrored, + Error: w.err, }) } return nil, w.err } + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ logger.KeyDriverConnectionID, w.conn.driverConnectionID, + logger.KeyDurationMS, duration.Milliseconds(), } logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...) 
@@ -622,14 +640,17 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { Type: event.GetSucceeded, Address: p.address.String(), ConnectionID: w.conn.driverConnectionID, + Duration: duration, }) } return w.conn, nil case <-ctx.Done(): - duration := time.Since(start) + waitQueueDuration := time.Since(waitQueueStart) + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ + logger.KeyDurationMS, duration.Milliseconds(), logger.KeyReason, logger.ReasonConnCheckoutFailedTimout, } @@ -638,10 +659,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ - Type: event.GetFailed, - Address: p.address.String(), - Reason: event.ReasonTimedOut, - Error: ctx.Err(), + Type: event.GetFailed, + Address: p.address.String(), + Duration: duration, + Reason: event.ReasonTimedOut, + Error: ctx.Err(), }) } @@ -650,7 +672,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { maxPoolSize: p.maxSize, totalConnections: p.totalConnectionCount(), availableConnections: p.availableConnectionCount(), - waitDuration: duration, + waitDuration: waitQueueDuration, } if p.loadBalanced { err.pinnedConnections = &pinnedConnections{ @@ -1085,6 +1107,7 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) { }) } + start := time.Now() // Pass the createConnections context to connect to allow pool close to cancel connection // establishment so shutdown doesn't block indefinitely if connectTimeout=0. err := conn.connect(ctx) @@ -1111,9 +1134,11 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) { continue } + duration := time.Since(start) if mustLogPoolMessage(p) { keysAndValues := logger.KeyValues{ logger.KeyDriverConnectionID, conn.driverConnectionID, + logger.KeyDurationMS, duration.Milliseconds(), } logPoolMessage(p, logger.ConnectionReady, keysAndValues...) 
@@ -1124,6 +1149,7 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
 				Type:         event.ConnectionReady,
 				Address:      p.address.String(),
 				ConnectionID: conn.driverConnectionID,
+				Duration:     duration,
 			})
 		}
 
diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go
index 013403981d..bc7115ee2c 100644
--- a/x/mongo/driver/topology/pool_test.go
+++ b/x/mongo/driver/topology/pool_test.go
@@ -14,8 +14,12 @@ import (
 	"testing"
 	"time"
 
+	"go.mongodb.org/mongo-driver/event"
 	"go.mongodb.org/mongo-driver/internal/assert"
+	"go.mongodb.org/mongo-driver/internal/eventtest"
+	"go.mongodb.org/mongo-driver/internal/require"
 	"go.mongodb.org/mongo-driver/mongo/address"
+	"go.mongodb.org/mongo-driver/x/mongo/driver"
 	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
 )
 
@@ -1157,3 +1161,94 @@ func assertConnectionsOpened(t *testing.T, dialer *dialer, count int) {
 		time.Sleep(100 * time.Millisecond)
 	}
 }
+
+// TestPool_PoolMonitor verifies that the pool reports a positive Duration on
+// the ConnectionReady, ConnectionCheckedOut, and ConnectionCheckOutFailed
+// events it publishes to a pool monitor.
+func TestPool_PoolMonitor(t *testing.T) {
+	t.Parallel()
+
+	t.Run("records durations", func(t *testing.T) {
+		t.Parallel()
+
+		cleanup := make(chan struct{})
+		defer close(cleanup)
+
+		// Create a listener that responds to exactly 1 connection. All
+		// subsequent connection requests should fail.
+		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
+			<-cleanup
+			_ = nc.Close()
+		})
+
+		tpm := eventtest.NewTestPoolMonitor()
+		p := newPool(
+			poolConfig{
+				Address:     address.Address(addr.String()),
+				PoolMonitor: tpm.PoolMonitor,
+			},
+			// Add a 10ms delay in the handshake so the test is reliable on
+			// operating systems that can't measure very short durations (e.g.
+			// Windows).
+			WithHandshaker(func(Handshaker) Handshaker {
+				return &testHandshaker{
+					getHandshakeInformation: func(context.Context, address.Address, driver.Connection) (driver.HandshakeInformation, error) {
+						time.Sleep(10 * time.Millisecond)
+						return driver.HandshakeInformation{}, nil
+					},
+				}
+			}))
+
+		err := p.ready()
+		require.NoError(t, err, "ready error")
+
+		// Check out a connection to trigger "ConnectionReady" and
+		// "ConnectionCheckedOut" events.
+		conn, err := p.checkOut(context.Background())
+		require.NoError(t, err, "checkOut error")
+
+		// Close the connection so the next checkOut tries to create a new
+		// connection.
+		err = conn.close()
+		require.NoError(t, err, "close error")
+
+		err = p.checkIn(conn)
+		require.NoError(t, err, "checkIn error")
+
+		// Try to check out another connection to trigger a
+		// "ConnectionCheckOutFailed" event.
+		_, err = p.checkOut(context.Background())
+		require.Error(t, err, "expected a checkOut error")
+
+		p.close(context.Background())
+
+		// Only the three event types asserted on below pass the filter; every
+		// other pool event is rejected by it.
+		events := tpm.Events(func(evt *event.PoolEvent) bool {
+			switch evt.Type {
+			case "ConnectionReady", "ConnectionCheckedOut", "ConnectionCheckOutFailed":
+				return true
+			}
+			return false
+		})
+
+		require.Lenf(t, events, 3, "expected there to be 3 pool events")
+
+		// Expected value first, then actual, so a failed assertion labels the
+		// two values correctly.
+		assert.Equal(t, "ConnectionReady", events[0].Type)
+		assert.Positive(t,
+			events[0].Duration,
+			"expected ConnectionReady Duration to be set")
+
+		assert.Equal(t, "ConnectionCheckedOut", events[1].Type)
+		assert.Positive(t,
+			events[1].Duration,
+			"expected ConnectionCheckedOut Duration to be set")
+
+		assert.Equal(t, "ConnectionCheckOutFailed", events[2].Type)
+		assert.Positive(t,
+			events[2].Duration,
+			"expected ConnectionCheckOutFailed Duration to be set")
+	})
+}