Skip to content

fix(NODE-6845): ensure internal rejections are handled #4448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ secrets-export.fish
mo-expansion.sh
mo-expansion.yml
expansions.sh
uri.txt

.drivers-tools/

Expand Down
1 change: 1 addition & 0 deletions .mocharc.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module.exports = {
require: [
'source-map-support/register',
'ts-node/register',
'test/tools/runner/throw_rejections.cjs',
'test/tools/runner/chai_addons.ts',
'test/tools/runner/ee_checker.ts'
],
Expand Down
89 changes: 49 additions & 40 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1146,50 +1146,59 @@ class ReadableCursorStream extends Readable {
return;
}

this._cursor.next().then(
result => {
if (result == null) {
this.push(null);
} else if (this.destroyed) {
this._cursor.close().then(undefined, squashError);
} else {
if (this.push(result)) {
return this._readNext();
this._cursor
.next()
.then(
// result from next()
result => {
if (result == null) {
this.push(null);
} else if (this.destroyed) {
this._cursor.close().then(undefined, squashError);
} else {
if (this.push(result)) {
return this._readNext();
}

this._readInProgress = false;
}
},
// error from next()
err => {
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
// desired behavior is that a stream ends cleanly when a user explicitly closes
// a client during iteration. Alternatively, we could do the "right" thing and
// propagate the error message by removing this special case.
if (err.message.match(/server is closed/)) {
this._cursor.close().then(undefined, squashError);
return this.push(null);
}

this._readInProgress = false;
}
},
err => {
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
// desired behavior is that a stream ends cleanly when a user explicitly closes
// a client during iteration. Alternatively, we could do the "right" thing and
// propagate the error message by removing this special case.
if (err.message.match(/server is closed/)) {
this._cursor.close().then(undefined, squashError);
return this.push(null);
}
// NOTE: This is also perhaps questionable. The rationale here is that these errors tend
// to be "operation was interrupted", where a cursor has been closed but there is an
// active getMore in-flight. This used to check if the cursor was killed but once
// that changed to happen in cleanup legitimate errors would not destroy the
// stream. There are change streams test specifically test these cases.
if (err.message.match(/operation was interrupted/)) {
return this.push(null);
}

// NOTE: This is also perhaps questionable. The rationale here is that these errors tend
// to be "operation was interrupted", where a cursor has been closed but there is an
// active getMore in-flight. This used to check if the cursor was killed but once
// that changed to happen in cleanup legitimate errors would not destroy the
// stream. There are change streams test specifically test these cases.
if (err.message.match(/operation was interrupted/)) {
return this.push(null);
// NOTE: The two above checks on the message of the error will cause a null to be pushed
// to the stream, thus closing the stream before the destroy call happens. This means
// that either of those error messages on a change stream will not get a proper
// 'error' event to be emitted (the error passed to destroy). Change stream resumability
// relies on that error event to be emitted to create its new cursor and thus was not
// working on 4.4 servers because the error emitted on failover was "interrupted at
// shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
// See NODE-4475.
return this.destroy(err);
}

// NOTE: The two above checks on the message of the error will cause a null to be pushed
// to the stream, thus closing the stream before the destroy call happens. This means
// that either of those error messages on a change stream will not get a proper
// 'error' event to be emitted (the error passed to destroy). Change stream resumability
// relies on that error event to be emitted to create its new cursor and thus was not
// working on 4.4 servers because the error emitted on failover was "interrupted at
// shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
// See NODE-4475.
return this.destroy(err);
}
);
)
// if either of the above handlers throw
.catch(error => {
this._readInProgress = false;
this.destroy(error);
});
}
}

Expand Down
11 changes: 7 additions & 4 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
maxWireVersion,
type MongoDBNamespace,
noop,
squashError,
supportsRetryableWrites
} from '../utils';
import { throwIfWriteConcernError } from '../write_concern';
Expand Down Expand Up @@ -345,9 +346,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
operationError instanceof MongoError &&
operationError.code === MONGODB_ERROR_CODES.Reauthenticate
) {
reauthPromise = this.pool.reauthenticate(conn).catch(error => {
reauthPromise = this.pool.reauthenticate(conn);
reauthPromise.then(undefined, error => {
reauthPromise = null;
throw error;
squashError(error);
});

await abortable(reauthPromise, options);
Expand All @@ -368,9 +370,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
if (session?.pinnedConnection !== conn) {
if (reauthPromise != null) {
// The reauth promise only exists if it hasn't thrown.
void reauthPromise.finally(() => {
const checkBackIn = () => {
this.pool.checkIn(conn);
});
};
void reauthPromise.then(checkBackIn, checkBackIn);
} else {
this.pool.checkIn(conn);
}
Expand Down
10 changes: 8 additions & 2 deletions src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { clearTimeout, setTimeout } from 'timers';
import { type Document } from './bson';
import { MongoInvalidArgumentError, MongoOperationTimeoutError, MongoRuntimeError } from './error';
import { type ClientSession } from './sessions';
import { csotMin, noop } from './utils';
import { csotMin, noop, squashError } from './utils';

/** @internal */
export class TimeoutError extends Error {
Expand Down Expand Up @@ -102,7 +102,13 @@ export class Timeout extends Promise<never> {
}

throwIfExpired(): void {
if (this.timedOut) throw new TimeoutError('Timed out', { duration: this.duration });
if (this.timedOut) {
// This method is invoked when someone wants to throw immediately instead of await the result of this promise
// Since they won't be handling the rejection from the promise (because we're about to throw here)
// attach handling to prevent this from bubbling up to Node.js
this.then(undefined, squashError);
throw new TimeoutError('Timed out', { duration: this.duration });
}
}

public static expires(duration: number, unref?: true): Timeout {
Expand Down
1 change: 1 addition & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,7 @@ describe('Change Streams', function () {
const write = lastWrite();

const nextP = changeStream.next();
nextP.catch(() => null);

await changeStream.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,8 @@ describe('CSOT driver tests', metadata, () => {

beforeEach(async function () {
cs = client.db('db').collection('coll').watch([], { timeoutMS: 120 });
const _changePromise = once(cs, 'change');
cs.once('change', () => null);

await once(cs.cursor, 'init');

await internalClient.db().admin().command(failpoint);
Expand Down
5 changes: 3 additions & 2 deletions test/integration/node-specific/abort_signal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ describe('AbortSignal support', () => {
if (args[1].find != null) {
commandStub.restore();
controller.abort();
throw new ReAuthenticationError({});
throw new ReAuthenticationError({ message: 'This is a fake reauthentication error' });
}
return commandStub.wrappedMethod.apply(this, args);
});
Expand Down Expand Up @@ -792,8 +792,9 @@ describe('AbortSignal support', () => {
describe('if reauth throws', () => {
beforeEach(() => {
sinon.stub(ConnectionPool.prototype, 'reauthenticate').callsFake(async function () {
const error = new Error('Rejecting reauthenticate for testing');
await sleep(1000);
throw new Error();
throw error;
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ maybeDescribe('examples(change-stream):', function () {
it('Open A Change Stream', {
metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.6.0' } },
test: async function () {
const looper = new Looper(() => db.collection('inventory').insertOne({ a: 1 }));
const looper = new Looper(async () => {
await db.collection('inventory').insertOne({ a: 1 });
});
looper.run();

// Start Changestream Example 1
Expand Down
37 changes: 15 additions & 22 deletions test/integration/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,32 +90,25 @@ function ignoreNsNotFound(err) {
if (!err.message.match(/ns not found/)) throw err;
}

function setupDatabase(configuration, dbsToClean) {
async function setupDatabase(configuration, dbsToClean) {
dbsToClean = Array.isArray(dbsToClean) ? dbsToClean : [];
var configDbName = configuration.db;
var client = configuration.newClient(configuration.writeConcernMax(), {
maxPoolSize: 1
});
const configDbName = configuration.db;

dbsToClean.push(configDbName);

return client
.connect()
.then(() =>
dbsToClean.reduce(
(result, dbName) =>
result
.then(() =>
client.db(dbName).command({ dropAllUsersFromDatabase: 1, writeConcern: { w: 1 } })
)
.then(() => client.db(dbName).dropDatabase({ writeConcern: { w: 1 } })),
Promise.resolve()
)
)
.then(
() => client.close(),
err => client.close(() => Promise.reject(err))
);
const client = configuration.newClient();
try {
for (const dbName of dbsToClean) {
const db = await client.db(dbName);
for await (const { name } of db.listCollections({}, { nameOnly: true })) {
const collection = db.collection(name);
await collection.deleteMany({}).catch(() => null);
await collection.drop().catch(() => null);
}
}
} finally {
await client.close();
}
}

/**
Expand Down
6 changes: 5 additions & 1 deletion test/manual/mocharc.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ const [major] = process.versions.node.split('.');

/** @type {import("mocha").MochaOptions} */
module.exports = {
require: ['ts-node/register', 'test/tools/runner/chai_addons.ts'],
require: [
'ts-node/register',
'test/tools/runner/throw_rejections.cjs',
'test/tools/runner/chai_addons.ts'
],
reporter: 'test/tools/reporter/mongodb_reporter.js',
failZero: true,
color: true,
Expand Down
5 changes: 4 additions & 1 deletion test/mocha_lambda.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ const [major] = process.versions.node.split('.');

/** @type {import("mocha").MochaOptions} */
module.exports = {
require: ['test/integration/node-specific/examples/setup.js'],
require: [
'test/tools/runner/throw_rejections.cjs',
'test/integration/node-specific/examples/setup.js'
],
extension: ['js'],
ui: 'test/tools/runner/metadata_ui.js',
recursive: true,
Expand Down
1 change: 1 addition & 0 deletions test/mocha_mongodb.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module.exports = {
require: [
'source-map-support/register',
'ts-node/register',
'test/tools/runner/throw_rejections.cjs',
'test/tools/runner/chai_addons.ts',
'test/tools/runner/ee_checker.ts',
'test/tools/runner/hooks/configuration.ts',
Expand Down
6 changes: 6 additions & 0 deletions test/tools/runner/throw_rejections.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// eslint-disable-next-line @typescript-eslint/no-require-imports
const process = require('process');

process.on('unhandledRejection', error => {
throw error;
});
69 changes: 0 additions & 69 deletions test/unit/assorted/optional_require.test.js

This file was deleted.

Loading