diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 0c8a8b8b575..7c271e8a97f 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -26,13 +26,8 @@ import { } from '../error'; import { CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { Server } from '../sdam/server'; -import { - type Callback, - List, - makeCounter, - promiseWithResolvers, - TimeoutController -} from '../utils'; +import { Timeout, TimeoutError } from '../timeout'; +import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils'; import { connect } from './connect'; import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection'; import { @@ -107,7 +102,7 @@ export interface ConnectionPoolOptions extends Omit void; reject: (err: AnyError) => void; - timeoutController: TimeoutController; + timeout: Timeout; [kCancelled]?: boolean; } @@ -368,33 +363,40 @@ export class ConnectionPool extends TypedEventEmitter { const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; const { promise, resolve, reject } = promiseWithResolvers(); + + const timeout = Timeout.expires(waitQueueTimeoutMS); + const waitQueueMember: WaitQueueMember = { resolve, reject, - timeoutController: new TimeoutController(waitQueueTimeoutMS) + timeout }; - waitQueueMember.timeoutController.signal.addEventListener('abort', () => { - waitQueueMember[kCancelled] = true; - waitQueueMember.timeoutController.clear(); - this.emitAndLog( - ConnectionPool.CONNECTION_CHECK_OUT_FAILED, - new ConnectionCheckOutFailedEvent(this, 'timeout') - ); - waitQueueMember.reject( - new WaitQueueTimeoutError( + this[kWaitQueue].push(waitQueueMember); + process.nextTick(() => this.processWaitQueue()); + + try { + return await Promise.race([promise, waitQueueMember.timeout]); + } catch (error) { + if (TimeoutError.is(error)) { + waitQueueMember[kCancelled] = true; + + waitQueueMember.timeout.clear(); + + this.emitAndLog( + ConnectionPool.CONNECTION_CHECK_OUT_FAILED, + new ConnectionCheckOutFailedEvent(this, 'timeout') + ); + const timeoutError = new WaitQueueTimeoutError( this.loadBalanced ? this.waitQueueErrorMetrics() : 'Timed out while checking out a connection from connection pool', this.address - ) - ); - }); - - this[kWaitQueue].push(waitQueueMember); - process.nextTick(() => this.processWaitQueue()); - - return await promise; + ); + throw timeoutError; + } + throw error; + } } /** @@ -758,7 +760,7 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new ConnectionCheckOutFailedEvent(this, reason, error) ); - waitQueueMember.timeoutController.clear(); + waitQueueMember.timeout.clear(); this[kWaitQueue].shift(); waitQueueMember.reject(error); continue; @@ -779,7 +781,7 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CHECKED_OUT, new ConnectionCheckedOutEvent(this, connection) ); - waitQueueMember.timeoutController.clear(); + waitQueueMember.timeout.clear(); this[kWaitQueue].shift(); waitQueueMember.resolve(connection); @@ -818,7 +820,7 @@ export class ConnectionPool extends TypedEventEmitter { waitQueueMember.resolve(connection); } - waitQueueMember.timeoutController.clear(); + waitQueueMember.timeout.clear(); } process.nextTick(() => this.processWaitQueue()); }); diff --git a/src/index.ts b/src/index.ts index f9188d5952c..bb83a774bf7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -547,6 +547,7 @@ export type { WithTransactionCallback } from './sessions'; export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort'; +export type { Timeout } from './timeout'; export type { Transaction, TransactionOptions, TxnState } from './transactions'; export type { BufferPool, @@ -555,7 +556,6 @@ export type { HostAddress, List, MongoDBCollectionNamespace, - MongoDBNamespace, - TimeoutController + MongoDBNamespace } from './utils'; export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern'; diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 8af37f5e0d7..73b0e92a09a 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -33,6 +33,7 @@ import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mong import { TypedEventEmitter } from '../mongo_types'; import { ReadPreference, type ReadPreferenceLike } from '../read_preference'; import type { ClientSession } from '../sessions'; +import { Timeout, TimeoutError } from '../timeout'; import type { Transaction } from '../transactions'; import { type Callback, @@ -43,8 +44,7 @@ import { now, ns, promiseWithResolvers, - shuffle, - TimeoutController + shuffle } from '../utils'; import { _advanceClusterTime, @@ -107,7 +107,7 @@ export interface ServerSelectionRequest { resolve: (server: Server) => void; reject: (error: MongoError) => void; [kCancelled]?: boolean; - timeoutController: TimeoutController; + timeout: Timeout; operationName: string; waitingLogged: boolean; previousServer?: ServerDescription; @@ -178,6 +178,8 @@ export interface SelectServerOptions { session?: ClientSession; operationName: string; previousServer?: ServerDescription; + /** @internal*/ + timeout?: Timeout; } /** @public */ @@ -580,6 +582,7 @@ export class Topology extends TypedEventEmitter { } const { promise: serverPromise, resolve, reject } = promiseWithResolvers(); + const timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0); const waitQueueMember: ServerSelectionRequest = { serverSelector, topologyDescription: this.description, @@ -587,43 +590,49 @@ export class Topology extends TypedEventEmitter { transaction, resolve, reject, - timeoutController: new TimeoutController(options.serverSelectionTimeoutMS), + timeout, startTime: now(), operationName: options.operationName, waitingLogged: false, previousServer: options.previousServer }; - waitQueueMember.timeoutController.signal.addEventListener('abort', () => { - waitQueueMember[kCancelled] = true; - waitQueueMember.timeoutController.clear(); - const timeoutError = new MongoServerSelectionError( - `Server selection timed out after ${options.serverSelectionTimeoutMS} ms`, - this.description - ); - if ( - this.client.mongoLogger?.willLog( - MongoLoggableComponent.SERVER_SELECTION, - SeverityLevel.DEBUG - ) - ) { - this.client.mongoLogger?.debug( - MongoLoggableComponent.SERVER_SELECTION, - new ServerSelectionFailedEvent( - selector, - this.description, - timeoutError, - options.operationName - ) - ); - } - waitQueueMember.reject(timeoutError); - }); - this[kWaitQueue].push(waitQueueMember); processWaitQueue(this); - return await serverPromise; + try { + return await Promise.race([serverPromise, waitQueueMember.timeout]); + } catch (error) { + if (TimeoutError.is(error)) { + // Timeout + waitQueueMember[kCancelled] = true; + timeout.clear(); + const timeoutError = new MongoServerSelectionError( + `Server selection timed out after ${options.serverSelectionTimeoutMS} ms`, + this.description + ); + if ( + this.client.mongoLogger?.willLog( + MongoLoggableComponent.SERVER_SELECTION, + SeverityLevel.DEBUG + ) + ) { + this.client.mongoLogger?.debug( + MongoLoggableComponent.SERVER_SELECTION, + new ServerSelectionFailedEvent( + selector, + this.description, + timeoutError, + options.operationName + ) + ); + } + + throw timeoutError; + } + // Other server selection error + throw error; + } } /** * Update the internal TopologyDescription with a ServerDescription @@ -880,7 +889,7 @@ function drainWaitQueue(queue: List, drainError: MongoDr continue; } - waitQueueMember.timeoutController.clear(); + waitQueueMember.timeout.clear(); if (!waitQueueMember[kCancelled]) { if ( @@ -935,7 +944,7 @@ function processWaitQueue(topology: Topology) { ) : serverDescriptions; } catch (selectorError) { - waitQueueMember.timeoutController.clear(); + waitQueueMember.timeout.clear(); if ( topology.client.mongoLogger?.willLog( MongoLoggableComponent.SERVER_SELECTION, @@ -1023,7 +1032,7 @@ function processWaitQueue(topology: Topology) { transaction.pinServer(selectedServer); } - waitQueueMember.timeoutController.clear(); + waitQueueMember.timeout.clear(); if ( topology.client.mongoLogger?.willLog( diff --git a/src/timeout.ts b/src/timeout.ts new file mode 100644 index 00000000000..7191f981de4 --- /dev/null +++ b/src/timeout.ts @@ -0,0 +1,100 @@ +import { clearTimeout, setTimeout } from 'timers'; + +import { MongoInvalidArgumentError } from './error'; +import { noop } from './utils'; + +/** @internal */ +export class TimeoutError extends Error { + override get name(): 'TimeoutError' { + return 'TimeoutError'; + } + + constructor(message: string, options?: { cause?: Error }) { + super(message, options); + } + + static is(error: unknown): error is TimeoutError { + return ( + error != null && typeof error === 'object' && 'name' in error && error.name === 'TimeoutError' + ); + } +} + +type Executor = ConstructorParameters>[0]; +type Reject = Parameters>[0]>[1]; +/** + * @internal + * This class is an abstraction over timeouts + * The Timeout class can only be in the pending or rejected states. It is guaranteed not to resolve + * if interacted with exclusively through its public API + * */ +export class Timeout extends Promise { + get [Symbol.toStringTag](): 'MongoDBTimeout' { + return 'MongoDBTimeout'; + } + + private timeoutError: TimeoutError; + private id?: NodeJS.Timeout; + + public readonly start: number; + public ended: number | null = null; + public duration: number; + public timedOut = false; + + /** Create a new timeout that expires in `duration` ms */ + private constructor(executor: Executor = () => null, duration: number) { + let reject!: Reject; + + if (duration < 0) { + throw new MongoInvalidArgumentError('Cannot create a Timeout with a negative duration'); + } + + super((_, promiseReject) => { + reject = promiseReject; + + executor(noop, promiseReject); + }); + + // NOTE: Construct timeout error at point of Timeout instantiation to preserve stack traces + this.timeoutError = new TimeoutError(`Expired after ${duration}ms`); + + this.duration = duration; + this.start = Math.trunc(performance.now()); + + if (this.duration > 0) { + this.id = setTimeout(() => { + this.ended = Math.trunc(performance.now()); + this.timedOut = true; + reject(this.timeoutError); + }, this.duration); + // Ensure we do not keep the Node.js event loop running + if (typeof this.id.unref === 'function') { + this.id.unref(); + } + } + } + + /** + * Clears the underlying timeout. This method is idempotent + */ + clear(): void { + clearTimeout(this.id); + this.id = undefined; + } + + public static expires(durationMS: number): Timeout { + return new Timeout(undefined, durationMS); + } + + static is(timeout: unknown): timeout is Timeout { + return ( + typeof timeout === 'object' && + timeout != null && + Symbol.toStringTag in timeout && + timeout[Symbol.toStringTag] === 'MongoDBTimeout' && + 'then' in timeout && + // eslint-disable-next-line github/no-then + typeof timeout.then === 'function' + ); + } +} diff --git a/src/utils.ts b/src/utils.ts index b25b3ebb0fc..bf34a3d5196 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -3,7 +3,6 @@ import type { SrvRecord } from 'dns'; import { type EventEmitter } from 'events'; import { promises as fs } from 'fs'; import * as http from 'http'; -import { clearTimeout, setTimeout } from 'timers'; import * as url from 'url'; import { URL } from 'url'; import { promisify } from 'util'; @@ -1203,33 +1202,6 @@ export async function request( }); } -/** - * A custom AbortController that aborts after a specified timeout. - * - * If `timeout` is undefined or \<=0, the abort controller never aborts. - * - * This class provides two benefits over the built-in AbortSignal.timeout() method. - * - This class provides a mechanism for cancelling the timeout - * - This class supports infinite timeouts by interpreting a timeout of 0 as infinite. This is - * consistent with existing timeout options in the Node driver (serverSelectionTimeoutMS, for example). - * @internal - */ -export class TimeoutController extends AbortController { - constructor( - timeout = 0, - private timeoutId = timeout > 0 ? setTimeout(() => this.abort(), timeout) : null - ) { - super(); - } - - clear() { - if (this.timeoutId != null) { - clearTimeout(this.timeoutId); - } - this.timeoutId = null; - } -} - /** @internal */ export const DOCUMENT_DB_CHECK = /(\.docdb\.amazonaws\.com$)|(\.docdb-elastic\.amazonaws\.com$)/; /** @internal */ @@ -1344,3 +1316,7 @@ export async function fileIsAccessible(fileName: string, mode?: number) { return false; } } + +export function noop() { + return; +} diff --git a/test/mongodb.ts b/test/mongodb.ts index 9f9f1185eba..d6c78208695 100644 --- a/test/mongodb.ts +++ b/test/mongodb.ts @@ -201,6 +201,7 @@ export * from '../src/sdam/topology'; export * from '../src/sdam/topology_description'; export * from '../src/sessions'; export * from '../src/sort'; +export * from '../src/timeout'; export * from '../src/transactions'; export * from '../src/utils'; export * from '../src/write_concern'; diff --git a/test/unit/timeout.test.ts b/test/unit/timeout.test.ts new file mode 100644 index 00000000000..571050ce41d --- /dev/null +++ b/test/unit/timeout.test.ts @@ -0,0 +1,117 @@ +import { expect } from 'chai'; + +import { MongoInvalidArgumentError, Timeout, TimeoutError } from '../mongodb'; + +describe('Timeout', function () { + let timeout: Timeout; + + beforeEach(() => { + if (Timeout.is(timeout)) { + timeout.clear(); + } + }); + + describe('expires()', function () { + context('when called with a duration of 0', function () { + it('does not create a timeout instance (creates infinite timeout)', function () { + timeout = Timeout.expires(0); + expect(timeout).to.not.have.property('id'); + }); + }); + + context('when called with a duration greater than 0', function () { + beforeEach(() => { + timeout = Timeout.expires(2000); + }); + it('creates a timeout instance that will not keep the Node.js event loop active', function () { + expect(timeout).to.have.property('id'); + // @ts-expect-error: accessing private property + const id = timeout.id; + expect(id?.hasRef()).to.be.false; + }); + it('throws a TimeoutError when it expires', async function () { + try { + await timeout; + expect.fail('Expected to throw error'); + } catch (err) { + expect(err).to.be.instanceof(TimeoutError); + } + }); + }); + + context('when called with a duration less than 0', function () { + it('throws a MongoInvalidArgumentError', function () { + try { + timeout = Timeout.expires(-1); + expect.fail('Expected to throw error'); + } catch (error) { + expect(error).to.be.instanceof(MongoInvalidArgumentError); + } + }); + }); + }); + + describe('clear()', function () { + beforeEach(() => { + timeout = Timeout.expires(1000); + expect(timeout).to.have.property('id').that.is.not.undefined; + }); + context('when called on a non-expired timeout with a non-zero duration', function () { + it('clears the underlying NodeJS.Timeout instance', function () { + timeout.clear(); + expect(timeout).to.have.property('id').that.is.undefined; + }); + }); + }); + + describe('is()', function () { + context('when called on a Timeout instance', function () { + it('returns true', function () { + expect(Timeout.is(Timeout.expires(0))).to.be.true; + }); + }); + + context('when called on a nullish object ', function () { + it('returns false', function () { + expect(Timeout.is(undefined)).to.be.false; + expect(Timeout.is(null)).to.be.false; + }); + }); + + context('when called on a primitive type', function () { + it('returns false', function () { + expect(Timeout.is(1)).to.be.false; + expect(Timeout.is('hello')).to.be.false; + expect(Timeout.is(true)).to.be.false; + expect(Timeout.is(1n)).to.be.false; + expect(Timeout.is(Symbol.for('test'))).to.be.false; + }); + }); + + context('when called on a Promise-like object with a matching toStringTag', function () { + it('returns true', function () { + const timeoutLike = { + [Symbol.toStringTag]: 'MongoDBTimeout', + then() { + return 0; + } + }; + + expect(Timeout.is(timeoutLike)).to.be.true; + }); + }); + + context('when called on a Promise-like object without a matching toStringTag', function () { + it('returns false', function () { + const timeoutLike = { + [Symbol.toStringTag]: 'lol', + then() { + return 0; + } + }; + + expect(Timeout.is(timeoutLike)).to.be.false; + }); + }); + }); +}); diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index d36ddbeb3be..802b9bc5645 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1,5 +1,4 @@ import { expect } from 'chai'; -import * as sinon from 'sinon'; import { BufferPool, @@ -15,10 +14,8 @@ import { MongoDBNamespace, MongoRuntimeError, ObjectId, - shuffle, - TimeoutController + shuffle } from '../mongodb'; -import { createTimerSandbox } from './timer_sandbox'; describe('driver utils', function () { describe('.hostMatchesWildcards', function () { @@ -984,65 +981,4 @@ describe('driver utils', function () { }); }); }); - - describe('class TimeoutController', () => { - let timerSandbox, clock, spy; - - beforeEach(function () { - timerSandbox = createTimerSandbox(); - clock = sinon.useFakeTimers(); - spy = sinon.spy(); - }); - - afterEach(function () { - clock.restore(); - timerSandbox.restore(); - }); - - describe('constructor', () => { - it('when no timeout is provided, it creates an infinite timeout', () => { - const controller = new TimeoutController(); - // @ts-expect-error Accessing a private field on TimeoutController - expect(controller.timeoutId).to.be.null; - }); - - it('when timeout is 0, it creates an infinite timeout', () => { - const controller = new TimeoutController(0); - // @ts-expect-error Accessing a private field on TimeoutController - expect(controller.timeoutId).to.be.null; - }); - - it('when timeout <0, it creates an infinite timeout', () => { - const controller = new TimeoutController(-5); - // @ts-expect-error Accessing a private field on TimeoutController - expect(controller.timeoutId).to.be.null; - }); - - context('when timeout > 0', () => { - let timeoutController: TimeoutController; - - beforeEach(function () { - timeoutController = new TimeoutController(3000); - timeoutController.signal.addEventListener('abort', spy); - }); - - afterEach(function () { - timeoutController.clear(); - }); - - it('it creates a timeout', () => { - // @ts-expect-error Accessing a private field on TimeoutController - expect(timeoutController.timeoutId).not.to.be.null; - }); - - it('times out after `timeout` milliseconds', () => { - expect(spy, 'spy was called after creation').not.to.have.been.called; - clock.tick(2999); - expect(spy, 'spy was called before 3000ms has expired').not.to.have.been.called; - clock.tick(1); - expect(spy, 'spy was not called after 3000ms').to.have.been.called; - }); - }); - }); - }); });