diff --git a/.gitignore b/.gitignore index d0fdc406efd..36a3aac2c16 100644 --- a/.gitignore +++ b/.gitignore @@ -103,6 +103,7 @@ secrets-export.fish mo-expansion.sh mo-expansion.yml expansions.sh +uri.txt .drivers-tools/ diff --git a/.mocharc.js b/.mocharc.js index a289ec1a9de..13d28e27c2f 100644 --- a/.mocharc.js +++ b/.mocharc.js @@ -7,6 +7,7 @@ module.exports = { require: [ 'source-map-support/register', 'ts-node/register', + 'test/tools/runner/throw_rejections.cjs', 'test/tools/runner/chai_addons.ts', 'test/tools/runner/ee_checker.ts' ], diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index cf9ea8293dd..791fab6f584 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -1146,50 +1146,59 @@ class ReadableCursorStream extends Readable { return; } - this._cursor.next().then( - result => { - if (result == null) { - this.push(null); - } else if (this.destroyed) { - this._cursor.close().then(undefined, squashError); - } else { - if (this.push(result)) { - return this._readNext(); + this._cursor + .next() + .then( + // result from next() + result => { + if (result == null) { + this.push(null); + } else if (this.destroyed) { + this._cursor.close().then(undefined, squashError); + } else { + if (this.push(result)) { + return this._readNext(); + } + + this._readInProgress = false; + } + }, + // error from next() + err => { + // NOTE: This is questionable, but we have a test backing the behavior. It seems the + // desired behavior is that a stream ends cleanly when a user explicitly closes + // a client during iteration. Alternatively, we could do the "right" thing and + // propagate the error message by removing this special case. + if (err.message.match(/server is closed/)) { + this._cursor.close().then(undefined, squashError); + return this.push(null); } - this._readInProgress = false; - } - }, - err => { - // NOTE: This is questionable, but we have a test backing the behavior. It seems the - // desired behavior is that a stream ends cleanly when a user explicitly closes - // a client during iteration. Alternatively, we could do the "right" thing and - // propagate the error message by removing this special case. - if (err.message.match(/server is closed/)) { - this._cursor.close().then(undefined, squashError); - return this.push(null); - } + // NOTE: This is also perhaps questionable. The rationale here is that these errors tend + // to be "operation was interrupted", where a cursor has been closed but there is an + // active getMore in-flight. This used to check if the cursor was killed but once + // that changed to happen in cleanup legitimate errors would not destroy the + // stream. There are change streams test specifically test these cases. + if (err.message.match(/operation was interrupted/)) { + return this.push(null); + } - // NOTE: This is also perhaps questionable. The rationale here is that these errors tend - // to be "operation was interrupted", where a cursor has been closed but there is an - // active getMore in-flight. This used to check if the cursor was killed but once - // that changed to happen in cleanup legitimate errors would not destroy the - // stream. There are change streams test specifically test these cases. - if (err.message.match(/operation was interrupted/)) { - return this.push(null); + // NOTE: The two above checks on the message of the error will cause a null to be pushed + // to the stream, thus closing the stream before the destroy call happens. This means + // that either of those error messages on a change stream will not get a proper + // 'error' event to be emitted (the error passed to destroy). Change stream resumability + // relies on that error event to be emitted to create its new cursor and thus was not + // working on 4.4 servers because the error emitted on failover was "interrupted at + // shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down". + // See NODE-4475. + return this.destroy(err); } - - // NOTE: The two above checks on the message of the error will cause a null to be pushed - // to the stream, thus closing the stream before the destroy call happens. This means - // that either of those error messages on a change stream will not get a proper - // 'error' event to be emitted (the error passed to destroy). Change stream resumability - // relies on that error event to be emitted to create its new cursor and thus was not - // working on 4.4 servers because the error emitted on failover was "interrupted at - // shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down". - // See NODE-4475. - return this.destroy(err); - } - ); + ) + // if either of the above handlers throw + .catch(error => { + this._readInProgress = false; + this.destroy(error); + }); } } diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 55a1765b24b..c6798316974 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -48,6 +48,7 @@ import { maxWireVersion, type MongoDBNamespace, noop, + squashError, supportsRetryableWrites } from '../utils'; import { throwIfWriteConcernError } from '../write_concern'; @@ -345,9 +346,10 @@ export class Server extends TypedEventEmitter { operationError instanceof MongoError && operationError.code === MONGODB_ERROR_CODES.Reauthenticate ) { - reauthPromise = this.pool.reauthenticate(conn).catch(error => { + reauthPromise = this.pool.reauthenticate(conn); + reauthPromise.then(undefined, error => { reauthPromise = null; - throw error; + squashError(error); }); await abortable(reauthPromise, options); @@ -368,9 +370,10 @@ export class Server extends TypedEventEmitter { if (session?.pinnedConnection !== conn) { if (reauthPromise != null) { // The reauth promise only exists if it hasn't thrown. - void reauthPromise.finally(() => { + const checkBackIn = () => { this.pool.checkIn(conn); - }); + }; + void reauthPromise.then(checkBackIn, checkBackIn); } else { this.pool.checkIn(conn); } diff --git a/src/timeout.ts b/src/timeout.ts index 3b1dbcb2346..26e04943f6a 100644 --- a/src/timeout.ts +++ b/src/timeout.ts @@ -3,7 +3,7 @@ import { clearTimeout, setTimeout } from 'timers'; import { type Document } from './bson'; import { MongoInvalidArgumentError, MongoOperationTimeoutError, MongoRuntimeError } from './error'; import { type ClientSession } from './sessions'; -import { csotMin, noop } from './utils'; +import { csotMin, noop, squashError } from './utils'; /** @internal */ export class TimeoutError extends Error { @@ -102,7 +102,13 @@ export class Timeout extends Promise { } throwIfExpired(): void { - if (this.timedOut) throw new TimeoutError('Timed out', { duration: this.duration }); + if (this.timedOut) { + // This method is invoked when someone wants to throw immediately instead of await the result of this promise + // Since they won't be handling the rejection from the promise (because we're about to throw here) + // attach handling to prevent this from bubbling up to Node.js + this.then(undefined, squashError); + throw new TimeoutError('Timed out', { duration: this.duration }); + } } public static expires(duration: number, unref?: true): Timeout { diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index a115caf29e6..f8aabb83215 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -819,6 +819,7 @@ describe('Change Streams', function () { const write = lastWrite(); const nextP = changeStream.next(); + nextP.catch(() => null); await changeStream.close(); diff --git a/test/integration/client-side-operations-timeout/node_csot.test.ts b/test/integration/client-side-operations-timeout/node_csot.test.ts index ec69dcc1b7b..b544917229e 100644 --- a/test/integration/client-side-operations-timeout/node_csot.test.ts +++ b/test/integration/client-side-operations-timeout/node_csot.test.ts @@ -1023,7 +1023,8 @@ describe('CSOT driver tests', metadata, () => { beforeEach(async function () { cs = client.db('db').collection('coll').watch([], { timeoutMS: 120 }); - const _changePromise = once(cs, 'change'); + cs.once('change', () => null); + await once(cs.cursor, 'init'); await internalClient.db().admin().command(failpoint); diff --git a/test/integration/node-specific/abort_signal.test.ts b/test/integration/node-specific/abort_signal.test.ts index a7527479382..7aa59a88ff6 100644 --- a/test/integration/node-specific/abort_signal.test.ts +++ b/test/integration/node-specific/abort_signal.test.ts @@ -753,7 +753,7 @@ describe('AbortSignal support', () => { if (args[1].find != null) { commandStub.restore(); controller.abort(); - throw new ReAuthenticationError({}); + throw new ReAuthenticationError({ message: 'This is a fake reauthentication error' }); } return commandStub.wrappedMethod.apply(this, args); }); @@ -792,8 +792,9 @@ describe('AbortSignal support', () => { describe('if reauth throws', () => { beforeEach(() => { sinon.stub(ConnectionPool.prototype, 'reauthenticate').callsFake(async function () { + const error = new Error('Rejecting reauthenticate for testing'); await sleep(1000); - throw new Error(); + throw error; }); }); diff --git a/test/integration/node-specific/examples/change_streams.test.js b/test/integration/node-specific/examples/change_streams.test.js index 5285da5cf14..90737e2ddfc 100644 --- a/test/integration/node-specific/examples/change_streams.test.js +++ b/test/integration/node-specific/examples/change_streams.test.js @@ -60,7 +60,9 @@ maybeDescribe('examples(change-stream):', function () { it('Open A Change Stream', { metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.6.0' } }, test: async function () { - const looper = new Looper(() => db.collection('inventory').insertOne({ a: 1 })); + const looper = new Looper(async () => { + await db.collection('inventory').insertOne({ a: 1 }); + }); looper.run(); // Start Changestream Example 1 diff --git a/test/integration/shared.js b/test/integration/shared.js index 6bbbff6beb0..5019e4fad9c 100644 --- a/test/integration/shared.js +++ b/test/integration/shared.js @@ -90,32 +90,25 @@ function ignoreNsNotFound(err) { if (!err.message.match(/ns not found/)) throw err; } -function setupDatabase(configuration, dbsToClean) { +async function setupDatabase(configuration, dbsToClean) { dbsToClean = Array.isArray(dbsToClean) ? dbsToClean : []; - var configDbName = configuration.db; - var client = configuration.newClient(configuration.writeConcernMax(), { - maxPoolSize: 1 - }); + const configDbName = configuration.db; dbsToClean.push(configDbName); - return client - .connect() - .then(() => - dbsToClean.reduce( - (result, dbName) => - result - .then(() => - client.db(dbName).command({ dropAllUsersFromDatabase: 1, writeConcern: { w: 1 } }) - ) - .then(() => client.db(dbName).dropDatabase({ writeConcern: { w: 1 } })), - Promise.resolve() - ) - ) - .then( - () => client.close(), - err => client.close(() => Promise.reject(err)) - ); + const client = configuration.newClient(); + try { + for (const dbName of dbsToClean) { + const db = await client.db(dbName); + for await (const { name } of db.listCollections({}, { nameOnly: true })) { + const collection = db.collection(name); + await collection.deleteMany({}).catch(() => null); + await collection.drop().catch(() => null); + } + } + } finally { + await client.close(); + } } /** diff --git a/test/manual/mocharc.js b/test/manual/mocharc.js index 04c59052424..fcb5e647971 100644 --- a/test/manual/mocharc.js +++ b/test/manual/mocharc.js @@ -4,7 +4,11 @@ const [major] = process.versions.node.split('.'); /** @type {import("mocha").MochaOptions} */ module.exports = { - require: ['ts-node/register', 'test/tools/runner/chai_addons.ts'], + require: [ + 'ts-node/register', + 'test/tools/runner/throw_rejections.cjs', + 'test/tools/runner/chai_addons.ts' + ], reporter: 'test/tools/reporter/mongodb_reporter.js', failZero: true, color: true, diff --git a/test/mocha_lambda.js b/test/mocha_lambda.js index cb1079aed33..b5d117b179a 100644 --- a/test/mocha_lambda.js +++ b/test/mocha_lambda.js @@ -4,7 +4,10 @@ const [major] = process.versions.node.split('.'); /** @type {import("mocha").MochaOptions} */ module.exports = { - require: ['test/integration/node-specific/examples/setup.js'], + require: [ + 'test/tools/runner/throw_rejections.cjs', + 'test/integration/node-specific/examples/setup.js' + ], extension: ['js'], ui: 'test/tools/runner/metadata_ui.js', recursive: true, diff --git a/test/mocha_mongodb.js b/test/mocha_mongodb.js index eeb18ae4f8d..b33ea65622f 100644 --- a/test/mocha_mongodb.js +++ b/test/mocha_mongodb.js @@ -7,6 +7,7 @@ module.exports = { require: [ 'source-map-support/register', 'ts-node/register', + 'test/tools/runner/throw_rejections.cjs', 'test/tools/runner/chai_addons.ts', 'test/tools/runner/ee_checker.ts', 'test/tools/runner/hooks/configuration.ts', diff --git a/test/tools/runner/throw_rejections.cjs b/test/tools/runner/throw_rejections.cjs new file mode 100644 index 00000000000..7dcf448c5d6 --- /dev/null +++ b/test/tools/runner/throw_rejections.cjs @@ -0,0 +1,6 @@ +// eslint-disable-next-line @typescript-eslint/no-require-imports +const process = require('process'); + +process.on('unhandledRejection', error => { + throw error; +}); diff --git a/test/unit/assorted/optional_require.test.js b/test/unit/assorted/optional_require.test.js deleted file mode 100644 index 754ab4e5035..00000000000 --- a/test/unit/assorted/optional_require.test.js +++ /dev/null @@ -1,69 +0,0 @@ -'use strict'; - -const { expect } = require('chai'); -const { existsSync } = require('fs'); -const { resolve } = require('path'); - -const { compress } = require('../../mongodb'); -const { GSSAPI } = require('../../mongodb'); -const { AuthContext } = require('../../mongodb'); -const { MongoDBAWS } = require('../../mongodb'); -const { HostAddress } = require('../../mongodb'); - -function moduleExistsSync(moduleName) { - return existsSync(resolve(__dirname, `../../../node_modules/${moduleName}`)); -} - -describe('optionalRequire', function () { - context('Snappy', function () { - it('should error if not installed', function () { - const moduleName = 'snappy'; - if (moduleExistsSync(moduleName)) { - return this.skip(); - } - compress( - { - options: { - agreedCompressor: 'snappy' - } - }, - Buffer.alloc(1), - error => { - expect(error).to.exist; - expect(error.message).includes('not found'); - } - ); - }); - }); - - context('Kerberos', function () { - it('should error if not installed', function () { - const moduleName = 'kerberos'; - if (moduleExistsSync(moduleName)) { - return this.skip(); - } - const gssapi = new GSSAPI(); - gssapi.auth( - new AuthContext(null, true, { hostAddress: new HostAddress('a'), credentials: true }), - error => { - expect(error).to.exist; - expect(error.message).includes('not found'); - } - ); - }); - }); - - context('aws4', function () { - it('should error if not installed', function () { - const moduleName = 'aws4'; - if (moduleExistsSync(moduleName)) { - return this.skip(); - } - const mdbAWS = new MongoDBAWS(); - mdbAWS.auth(new AuthContext({ hello: { maxWireVersion: 9 } }, true, null), error => { - expect(error).to.exist; - expect(error.message).includes('not found'); - }); - }); - }); -}); diff --git a/test/unit/assorted/optional_require.test.ts b/test/unit/assorted/optional_require.test.ts new file mode 100644 index 00000000000..83eb32cfc24 --- /dev/null +++ b/test/unit/assorted/optional_require.test.ts @@ -0,0 +1,72 @@ +import { expect } from 'chai'; +import { existsSync } from 'fs'; +import { resolve } from 'path'; + +import { + AuthContext, + compress, + GSSAPI, + HostAddress, + MongoDBAWS, + MongoMissingDependencyError +} from '../../mongodb'; + +function moduleExistsSync(moduleName) { + return existsSync(resolve(__dirname, `../../../node_modules/${moduleName}`)); +} + +describe('optionalRequire', function () { + describe('Snappy', function () { + it('should error if not installed', async function () { + const moduleName = 'snappy'; + if (moduleExistsSync(moduleName)) { + return this.skip(); + } + + const error = await compress( + { zlibCompressionLevel: 0, agreedCompressor: 'snappy' }, + Buffer.alloc(1) + ).then( + () => null, + e => e + ); + + expect(error).to.be.instanceOf(MongoMissingDependencyError); + }); + }); + + describe('Kerberos', function () { + it('should error if not installed', async function () { + const moduleName = 'kerberos'; + if (moduleExistsSync(moduleName)) { + return this.skip(); + } + const gssapi = new GSSAPI(); + + const error = await gssapi + .auth(new AuthContext(null, true, { hostAddress: new HostAddress('a'), credentials: true })) + .then( + () => null, + e => e + ); + + expect(error).to.be.instanceOf(MongoMissingDependencyError); + }); + }); + + describe('aws4', function () { + it('should error if not installed', async function () { + const moduleName = 'aws4'; + if (moduleExistsSync(moduleName)) { + return this.skip(); + } + const mdbAWS = new MongoDBAWS(); + + const error = await mdbAWS + .auth(new AuthContext({ hello: { maxWireVersion: 9 } }, true, null)) + .catch(error => error); + + expect(error).to.be.instanceOf(MongoMissingDependencyError); + }); + }); +}); diff --git a/test/unit/cmap/auth/auth_provider.test.ts b/test/unit/cmap/auth/auth_provider.test.ts index 96d49c650c3..7c9b530e69f 100644 --- a/test/unit/cmap/auth/auth_provider.test.ts +++ b/test/unit/cmap/auth/auth_provider.test.ts @@ -1,19 +1,26 @@ import { expect } from 'chai'; -import { AuthProvider, MongoRuntimeError } from '../../../mongodb'; +import { type AuthContext, AuthProvider, MongoRuntimeError } from '../../../mongodb'; describe('AuthProvider', function () { describe('#reauth', function () { context('when the provider is already reauthenticating', function () { - const provider = new AuthProvider(); + const provider = new (class extends AuthProvider { + override auth(_context: AuthContext): Promise { + throw new Error('Method not implemented.'); + } + })(); + const context = { reauthenticating: true }; - it('returns an error', function () { - provider.reauth(context, error => { - expect(error).to.exist; - expect(error).to.be.instanceOf(MongoRuntimeError); - expect(error?.message).to.equal('Reauthentication already in progress.'); - }); + it('returns an error', async function () { + const error = await provider.reauth(context).then( + () => null, + error => error + ); + expect(error).to.exist; + expect(error).to.be.instanceOf(MongoRuntimeError); + expect(error?.message).to.equal('Reauthentication already in progress.'); }); }); });