Skip to content

refactor(NODE-5679): introduce timeout abstraction and use for server selection and connection check out #4078

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 31 additions & 29 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,8 @@ import {
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import {
type Callback,
List,
makeCounter,
promiseWithResolvers,
TimeoutController
} from '../utils';
import { CSOTError, Timeout } from '../timeout';
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -107,7 +102,7 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
export interface WaitQueueMember {
resolve: (conn: Connection) => void;
reject: (err: AnyError) => void;
timeoutController: TimeoutController;
timeout: Timeout;
[kCancelled]?: boolean;
}

Expand Down Expand Up @@ -149,6 +144,7 @@ export type ConnectionPoolEvents = {
export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
options: Readonly<ConnectionPoolOptions>;
[kPoolState]: (typeof PoolState)[keyof typeof PoolState];
/** @internal */
[kServer]: Server;
[kConnections]: List<Connection>;
[kPending]: number;
Expand Down Expand Up @@ -368,33 +364,39 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();

const timeout = Timeout.expires(waitQueueTimeoutMS);

const waitQueueMember: WaitQueueMember = {
resolve,
reject,
timeoutController: new TimeoutController(waitQueueTimeoutMS)
timeout
};
waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timeoutController.clear();

this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
waitQueueMember.reject(
new WaitQueueTimeoutError(
this[kWaitQueue].push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());

try {
return await Promise.race([promise, waitQueueMember.timeout]);
} catch (error) {
if (error instanceof CSOTError) {
waitQueueMember[kCancelled] = true;
waitQueueMember.timeout.clear();

this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
const timeoutError = new WaitQueueTimeoutError(
this.loadBalanced
? this.waitQueueErrorMetrics()
: 'Timed out while checking out a connection from connection pool',
this.address
)
);
});

this[kWaitQueue].push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());

return await promise;
);
throw timeoutError;
}
throw error;
}
}

/**
Expand Down Expand Up @@ -758,7 +760,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, reason, error)
);
waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();
this[kWaitQueue].shift();
waitQueueMember.reject(error);
continue;
Expand All @@ -779,7 +781,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();

this[kWaitQueue].shift();
waitQueueMember.resolve(connection);
Expand Down Expand Up @@ -818,7 +820,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
waitQueueMember.resolve(connection);
}

waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();
}
process.nextTick(() => this.processWaitQueue());
});
Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ export type {
WithTransactionCallback
} from './sessions';
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
export type { Timeout } from './timeout';
export type { Transaction, TransactionOptions, TxnState } from './transactions';
export type {
BufferPool,
Expand All @@ -555,7 +556,6 @@ export type {
HostAddress,
List,
MongoDBCollectionNamespace,
MongoDBNamespace,
TimeoutController
MongoDBNamespace
} from './utils';
export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern';
77 changes: 43 additions & 34 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mong
import { TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import { CSOTError, Timeout } from '../timeout';
import type { Transaction } from '../transactions';
import {
type Callback,
Expand All @@ -43,8 +44,7 @@ import {
now,
ns,
promiseWithResolvers,
shuffle,
TimeoutController
shuffle
} from '../utils';
import {
_advanceClusterTime,
Expand Down Expand Up @@ -107,7 +107,7 @@ export interface ServerSelectionRequest {
resolve: (server: Server) => void;
reject: (error: MongoError) => void;
[kCancelled]?: boolean;
timeoutController: TimeoutController;
timeout: Timeout;
operationName: string;
waitingLogged: boolean;
previousServer?: ServerDescription;
Expand Down Expand Up @@ -178,6 +178,8 @@ export interface SelectServerOptions {
session?: ClientSession;
operationName: string;
previousServer?: ServerDescription;
/** @internal*/
timeout?: Timeout;
}

/** @public */
Expand Down Expand Up @@ -580,50 +582,57 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

const { promise: serverPromise, resolve, reject } = promiseWithResolvers<Server>();
const timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0);
const waitQueueMember: ServerSelectionRequest = {
serverSelector,
topologyDescription: this.description,
mongoLogger: this.client.mongoLogger,
transaction,
resolve,
reject,
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS),
timeout,
startTime: now(),
operationName: options.operationName,
waitingLogged: false,
previousServer: options.previousServer
};

waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timeoutController.clear();
const timeoutError = new MongoServerSelectionError(
`Server selection timed out after ${options.serverSelectionTimeoutMS} ms`,
this.description
);
if (
this.client.mongoLogger?.willLog(
MongoLoggableComponent.SERVER_SELECTION,
SeverityLevel.DEBUG
)
) {
this.client.mongoLogger?.debug(
MongoLoggableComponent.SERVER_SELECTION,
new ServerSelectionFailedEvent(
selector,
this.description,
timeoutError,
options.operationName
)
);
}
waitQueueMember.reject(timeoutError);
});

this[kWaitQueue].push(waitQueueMember);
processWaitQueue(this);

return await serverPromise;
try {
return await Promise.race([serverPromise, waitQueueMember.timeout]);
} catch (error) {
if (error instanceof CSOTError) {
// Timeout
waitQueueMember[kCancelled] = true;
timeout.clear();
const timeoutError = new MongoServerSelectionError(
`Server selection timed out after ${options.serverSelectionTimeoutMS} ms`,
this.description
);
if (
this.client.mongoLogger?.willLog(
MongoLoggableComponent.SERVER_SELECTION,
SeverityLevel.DEBUG
)
) {
this.client.mongoLogger?.debug(
MongoLoggableComponent.SERVER_SELECTION,
new ServerSelectionFailedEvent(
selector,
this.description,
timeoutError,
options.operationName
)
);
}

throw timeoutError;
}
// Other server selection error
throw error;
}
}
/**
* Update the internal TopologyDescription with a ServerDescription
Expand Down Expand Up @@ -880,7 +889,7 @@ function drainWaitQueue(queue: List<ServerSelectionRequest>, drainError: MongoDr
continue;
}

waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();

if (!waitQueueMember[kCancelled]) {
if (
Expand Down Expand Up @@ -935,7 +944,7 @@ function processWaitQueue(topology: Topology) {
)
: serverDescriptions;
} catch (selectorError) {
waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();
if (
topology.client.mongoLogger?.willLog(
MongoLoggableComponent.SERVER_SELECTION,
Expand Down Expand Up @@ -1023,7 +1032,7 @@ function processWaitQueue(topology: Topology) {
transaction.pinServer(selectedServer);
}

waitQueueMember.timeoutController.clear();
waitQueueMember.timeout.clear();

if (
topology.client.mongoLogger?.willLog(
Expand Down
Loading
Loading