Skip to content

Commit 5915c5a

Browse files
committed
feat(NODE-6337): implement client bulk write batching
1 parent bb3bae8 commit 5915c5a

File tree

14 files changed

+622
-197
lines changed

14 files changed

+622
-197
lines changed

src/cmap/commands.ts

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,66 @@ export interface OpMsgOptions {
429429

430430
/** @internal */
431431
export class DocumentSequence {
432+
field: string;
432433
documents: Document[];
434+
serializedDocumentsLength: number;
435+
private chunks: Uint8Array[];
436+
private header?: Buffer;
433437

434-
constructor(documents: Document[]) {
435-
this.documents = documents;
438+
/**
439+
* Create a new document sequence for the provided field.
440+
* @param field - The field it will replace.
441+
*/
442+
constructor(field: string, documents?: Document[]) {
443+
this.field = field;
444+
this.documents = [];
445+
this.chunks = [];
446+
this.serializedDocumentsLength = 0;
447+
this.init();
448+
if (documents) {
449+
for (const doc of documents) {
450+
this.push(doc, BSON.serialize(doc));
451+
}
452+
}
453+
}
454+
455+
/**
456+
* Initialize the buffer chunks.
457+
*/
458+
private init() {
459+
// Document sequences start with type 1 at the first byte.
460+
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
461+
buffer[0] = 1;
462+
// Third part is the field name at offset 5 with trailing null byte.
463+
encodeUTF8Into(buffer, `${this.field}\0`, 5);
464+
this.chunks.push(buffer);
465+
this.header = buffer;
466+
}
467+
468+
/**
469+
* Push a document to the document sequence. Will serialize the document
470+
* as well and return the current serialized length of all documents.
471+
* @param document - The document to add.
472+
* @param buffer - The serialized document in raw BSON.
473+
* @returns The serialized documents length.
474+
*/
475+
push(document: Document, buffer: Uint8Array): number {
476+
this.serializedDocumentsLength += buffer.length;
477+
// Push the document.
478+
this.documents.push(document);
479+
// Push the document raw bson.
480+
this.chunks.push(buffer);
481+
// Write the new length.
482+
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
483+
return this.serializedDocumentsLength;
484+
}
485+
486+
/**
487+
* Get the fully serialized bytes for the document sequence section.
488+
* @returns The section bytes.
489+
*/
490+
toBin(): Uint8Array {
491+
return Buffer.concat(this.chunks);
436492
}
437493
}
438494

@@ -543,21 +599,7 @@ export class OpMsgRequest {
543599
const chunks = [];
544600
for (const [key, value] of Object.entries(document)) {
545601
if (value instanceof DocumentSequence) {
546-
// Document sequences starts with type 1 at the first byte.
547-
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
548-
buffer[0] = 1;
549-
// Third part is the field name at offset 5 with trailing null byte.
550-
encodeUTF8Into(buffer, `${key}\0`, 5);
551-
chunks.push(buffer);
552-
// Fourth part are the documents' bytes.
553-
let docsLength = 0;
554-
for (const doc of value.documents) {
555-
const docBson = this.serializeBson(doc);
556-
docsLength += docBson.length;
557-
chunks.push(docBson);
558-
}
559-
// Second part of the sequence is the length at offset 1;
560-
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
602+
chunks.push(value.toBin());
561603
// Why are we removing the field from the command? This is because it needs to be
562604
// removed in the OP_MSG request first section, and DocumentSequence is not a
563605
// BSON type and is specific to the MongoDB wire protocol so there's nothing

src/cursor/client_bulk_write_cursor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Document } from '../bson';
22
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
3-
import { MongoBulkWriteCursorError } from '../error';
3+
import { MongoClientBulkWriteCursorError } from '../error';
44
import type { MongoClient } from '../mongo_client';
55
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
66
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
@@ -44,7 +44,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
4444
*/
4545
get response(): ClientBulkWriteCursorResponse {
4646
if (this.cursorResponse) return this.cursorResponse;
47-
throw new MongoBulkWriteCursorError(
47+
throw new MongoClientBulkWriteCursorError(
4848
'No client bulk write cursor response returned from the server.'
4949
);
5050
}

src/error.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ export class MongoGCPError extends MongoOIDCError {
622622
* @public
623623
* @category Error
624624
*/
625-
export class MongoBulkWriteCursorError extends MongoRuntimeError {
625+
export class MongoClientBulkWriteCursorError extends MongoRuntimeError {
626626
/**
627627
* **Do not use this constructor!**
628628
*
@@ -639,7 +639,34 @@ export class MongoBulkWriteCursorError extends MongoRuntimeError {
639639
}
640640

641641
override get name(): string {
642-
return 'MongoBulkWriteCursorError';
642+
return 'MongoClientBulkWriteCursorError';
643+
}
644+
}
645+
646+
/**
647+
* An error indicating that a failure occurred on the client when executing a client bulk write.
648+
*
649+
* @public
650+
* @category Error
651+
*/
652+
export class MongoClientBulkWriteExecutionError extends MongoRuntimeError {
653+
/**
654+
* **Do not use this constructor!**
655+
*
656+
* Meant for internal use only.
657+
*
658+
* @remarks
659+
* This class is only meant to be constructed within the driver. This constructor is
660+
* not subject to semantic versioning compatibility guarantees and may change at any time.
661+
*
662+
* @public
663+
**/
664+
constructor(message: string) {
665+
super(message);
666+
}
667+
668+
override get name(): string {
669+
return 'MongoClientBulkWriteExecutionError';
643670
}
644671
}
645672

src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ export {
4444
MongoAWSError,
4545
MongoAzureError,
4646
MongoBatchReExecutionError,
47-
MongoBulkWriteCursorError,
4847
MongoChangeStreamError,
48+
MongoClientBulkWriteCursorError,
49+
MongoClientBulkWriteExecutionError,
4950
MongoCompatibilityError,
5051
MongoCursorExhaustedError,
5152
MongoCursorInUseError,

src/operations/client_bulk_write/command_builder.ts

Lines changed: 108 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { type Document } from '../../bson';
1+
import { BSON, type Document } from '../../bson';
22
import { DocumentSequence } from '../../cmap/commands';
33
import { type PkFactory } from '../../mongo_client';
44
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
@@ -28,6 +28,11 @@ export interface ClientBulkWriteCommand {
2828
comment?: any;
2929
}
3030

31+
/**
32+
* The bytes overhead for the extra fields added post command generation.
33+
*/
34+
const MESSAGE_OVERHEAD_BYTES = 1000;
35+
3136
/** @internal */
3237
export class ClientBulkWriteCommandBuilder {
3338
models: AnyClientBulkWriteModel[];
@@ -62,32 +67,123 @@ export class ClientBulkWriteCommandBuilder {
6267
/**
6368
* Build the bulk write commands from the models.
6469
*/
65-
buildCommands(): ClientBulkWriteCommand[] {
70+
buildCommands(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand[] {
6671
// Iterate the models to build the ops and nsInfo fields.
67-
const operations = [];
72+
// We need to do this in a loop which creates one command at a time, each up
73+
// to the max bson size or max message size.
74+
const commands: ClientBulkWriteCommand[] = [];
75+
let currentCommandLength = MESSAGE_OVERHEAD_BYTES;
6876
let currentNamespaceIndex = 0;
77+
let currentCommand: ClientBulkWriteCommand = this.baseCommand();
6978
const namespaces = new Map<string, number>();
7079
for (const model of this.models) {
7180
const ns = model.namespace;
7281
const index = namespaces.get(ns);
7382
if (index != null) {
74-
operations.push(buildOperation(model, index, this.pkFactory));
83+
// Pushing to the ops document sequence returns the bytes length added.
84+
const operation = buildOperation(model, index, this.pkFactory);
85+
const operationBuffer = BSON.serialize(operation);
86+
87+
// Check if the operation buffer can fit in the current command. If it can,
88+
// then add the operation to the document sequence and increment the
89+
// current length as long as the ops don't exceed the maxWriteBatchSize.
90+
if (
91+
currentCommandLength + operationBuffer.length < maxMessageSizeBytes &&
92+
currentCommand.ops.documents.length < maxWriteBatchSize
93+
) {
94+
// Pushing to the ops document sequence returns the bytes length added.
95+
currentCommandLength += this.addOperation(currentCommand, operation, operationBuffer);
96+
} else {
97+
// We need to batch. Push the current command to the commands
98+
// array and create a new current command.
99+
commands.push(currentCommand);
100+
currentCommand = this.baseCommand();
101+
currentCommandLength += this.addOperation(currentCommand, operation, operationBuffer);
102+
}
75103
} else {
76104
namespaces.set(ns, currentNamespaceIndex);
77-
operations.push(buildOperation(model, currentNamespaceIndex, this.pkFactory));
78-
currentNamespaceIndex++;
105+
const nsInfo = { ns: ns };
106+
const nsInfoBuffer = BSON.serialize(nsInfo);
107+
const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
108+
const operationBuffer = BSON.serialize(operation);
109+
110+
// Check if the operation and nsInfo buffers can fit in the command. If they
111+
// can, then add the operation and nsInfo to their respective document
112+
// sequences and increment the current length as long as the ops don't exceed
113+
// the maxWriteBatchSize.
114+
if (
115+
currentCommandLength + nsInfoBuffer.length + operationBuffer.length <
116+
maxMessageSizeBytes &&
117+
currentCommand.ops.documents.length < maxWriteBatchSize
118+
) {
119+
currentCommandLength += this.addOperationAndNsInfo(
120+
currentCommand,
121+
operation,
122+
operationBuffer,
123+
nsInfo,
124+
nsInfoBuffer
125+
);
126+
127+
// We've added a new namespace, increment the namespace index.
128+
currentNamespaceIndex++;
129+
} else {
130+
// We need to batch. Push the current command to the commands
131+
// array and create a new current command.
132+
commands.push(currentCommand);
133+
currentCommand = this.baseCommand();
134+
135+
currentCommandLength += this.addOperationAndNsInfo(
136+
currentCommand,
137+
operation,
138+
operationBuffer,
139+
nsInfo,
140+
nsInfoBuffer
141+
);
142+
143+
// We've added a new namespace, increment the namespace index.
144+
currentNamespaceIndex++;
145+
}
79146
}
80147
}
81148

82-
const nsInfo = Array.from(namespaces.keys(), ns => ({ ns }));
149+
* After we've finished iterating all the models put the last current command
150+
// only if there are operations in it.
151+
if (currentCommand.ops.documents.length > 0) {
152+
commands.push(currentCommand);
153+
}
154+
155+
return commands;
156+
}
157+
158+
private addOperation(
159+
command: ClientBulkWriteCommand,
160+
operation: Document,
161+
operationBuffer: Uint8Array
162+
): number {
163+
// Pushing to the ops document sequence returns the bytes length added.
164+
return command.ops.push(operation, operationBuffer);
165+
}
166+
167+
private addOperationAndNsInfo(
168+
command: ClientBulkWriteCommand,
169+
operation: Document,
170+
operationBuffer: Uint8Array,
171+
nsInfo: Document,
172+
nsInfoBuffer: Uint8Array
173+
): number {
174+
// Pushing to the nsInfo document sequence returns the bytes length added.
175+
const nsInfoLength = command.nsInfo.push(nsInfo, nsInfoBuffer);
176+
const opsLength = this.addOperation(command, operation, operationBuffer);
177+
return nsInfoLength + opsLength;
178+
}
83179

84-
// The base command.
180+
private baseCommand(): ClientBulkWriteCommand {
85181
const command: ClientBulkWriteCommand = {
86182
bulkWrite: 1,
87183
errorsOnly: this.errorsOnly,
88184
ordered: this.options.ordered ?? true,
89-
ops: new DocumentSequence(operations),
90-
nsInfo: new DocumentSequence(nsInfo)
185+
ops: new DocumentSequence('ops'),
186+
nsInfo: new DocumentSequence('nsInfo')
91187
};
92188
// Add bypassDocumentValidation if it was present in the options.
93189
if (this.options.bypassDocumentValidation != null) {
@@ -103,7 +199,8 @@ export class ClientBulkWriteCommandBuilder {
103199
if (this.options.comment !== undefined) {
104200
command.comment = this.options.comment;
105201
}
106-
return [command];
202+
203+
return command;
107204
}
108205
}
109206

src/operations/client_bulk_write/executor.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { type Document } from 'bson';
22

33
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
4+
import { MongoClientBulkWriteExecutionError } from '../../error';
45
import { type MongoClient } from '../../mongo_client';
56
import { WriteConcern } from '../../write_concern';
67
import { executeOperation } from '../execute_operation';
@@ -49,6 +50,23 @@ export class ClientBulkWriteExecutor {
4950
* @returns The result.
5051
*/
5152
async execute(): Promise<ClientBulkWriteResult | { ok: 1 }> {
53+
const topologyDescription = this.client.topology?.description;
54+
const maxMessageSizeBytes = topologyDescription?.maxMessageSizeBytes;
55+
const maxWriteBatchSize = topologyDescription?.maxWriteBatchSize;
56+
// If we don't know the maxMessageSizeBytes or for some reason it's 0
57+
// then we cannot calculate the batch.
58+
if (!maxMessageSizeBytes) {
59+
throw new MongoClientBulkWriteExecutionError(
60+
'No maxMessageSizeBytes value found - client bulk writes cannot execute without this value set from the monitoring connections.'
61+
);
62+
}
63+
64+
if (!maxWriteBatchSize) {
65+
throw new MongoClientBulkWriteExecutionError(
66+
'No maxWriteBatchSize value found - client bulk writes cannot execute without this value set from the monitoring connections.'
67+
);
68+
}
69+
5270
// The command builder will take the user provided models and potentially split the batch
5371
// into multiple commands due to size.
5472
const pkFactory = this.client.s.options.pkFactory;
@@ -57,7 +75,7 @@ export class ClientBulkWriteExecutor {
5775
this.options,
5876
pkFactory
5977
);
60-
const commands = commandBuilder.buildCommands();
78+
const commands = commandBuilder.buildCommands(maxMessageSizeBytes, maxWriteBatchSize);
6179
if (this.options.writeConcern?.w === 0) {
6280
return await executeUnacknowledged(this.client, this.options, commands);
6381
}
@@ -75,10 +93,14 @@ async function executeAcknowledged(
7593
): Promise<ClientBulkWriteResult> {
7694
const resultsMerger = new ClientBulkWriteResultsMerger(options);
7795
// For each command we will create and exhaust a cursor for the results.
96+
let currentBatchOffset = 0;
7897
for (const command of commands) {
7998
const cursor = new ClientBulkWriteCursor(client, command, options);
8099
const docs = await cursor.toArray();
81-
resultsMerger.merge(command.ops.documents, cursor.response, docs);
100+
const operations = command.ops.documents;
101+
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
102+
// Set the new batch offset so we can map back to the index in the original models.
103+
currentBatchOffset += operations.length;
82104
}
83105
return resultsMerger.result;
84106
}

0 commit comments

Comments
 (0)