Skip to content

Commit 219922e

Browse files
committed
feat(NODE-6337): implement client bulk write batching
1 parent bb3bae8 commit 219922e

File tree

14 files changed

+735
-196
lines changed

14 files changed

+735
-196
lines changed

src/cmap/commands.ts

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,66 @@ export interface OpMsgOptions {
429429

430430
/** @internal */
431431
export class DocumentSequence {
432+
field: string;
432433
documents: Document[];
434+
serializedDocumentsLength: number;
435+
private chunks: Uint8Array[];
436+
private header?: Buffer;
433437

434-
constructor(documents: Document[]) {
435-
this.documents = documents;
438+
/**
439+
* Create a new document sequence for the provided field.
440+
* @param field - The field it will replace.
441+
*/
442+
constructor(field: string, documents?: Document[]) {
443+
this.field = field;
444+
this.documents = [];
445+
this.chunks = [];
446+
this.serializedDocumentsLength = 0;
447+
this.init();
448+
if (documents) {
449+
for (const doc of documents) {
450+
this.push(doc, BSON.serialize(doc));
451+
}
452+
}
453+
}
454+
455+
/**
456+
* Initialize the buffer chunks.
457+
*/
458+
private init() {
459+
// Document sequences start with type 1 at the first byte.
460+
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
461+
buffer[0] = 1;
462+
// Third part is the field name at offset 5 with trailing null byte.
463+
encodeUTF8Into(buffer, `${this.field}\0`, 5);
464+
this.chunks.push(buffer);
465+
this.header = buffer;
466+
}
467+
468+
/**
469+
* Push a document and its already-serialized BSON bytes to the document
470+
* sequence and return the current serialized length of the whole section.
471+
* @param document - The document to add.
472+
* @param buffer - The serialized document in raw BSON.
473+
* @returns The new total document sequence length.
474+
*/
475+
push(document: Document, buffer: Uint8Array): number {
476+
this.serializedDocumentsLength += buffer.length;
477+
// Push the document.
478+
this.documents.push(document);
479+
// Push the document raw bson.
480+
this.chunks.push(buffer);
481+
// Write the new length.
482+
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
483+
return this.serializedDocumentsLength + (this.header?.length ?? 0);
484+
}
485+
486+
/**
487+
* Get the fully serialized bytes for the document sequence section.
488+
* @returns The section bytes.
489+
*/
490+
toBin(): Uint8Array {
491+
return Buffer.concat(this.chunks);
436492
}
437493
}
438494

@@ -543,21 +599,7 @@ export class OpMsgRequest {
543599
const chunks = [];
544600
for (const [key, value] of Object.entries(document)) {
545601
if (value instanceof DocumentSequence) {
546-
// Document sequences starts with type 1 at the first byte.
547-
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
548-
buffer[0] = 1;
549-
// Third part is the field name at offset 5 with trailing null byte.
550-
encodeUTF8Into(buffer, `${key}\0`, 5);
551-
chunks.push(buffer);
552-
// Fourth part are the documents' bytes.
553-
let docsLength = 0;
554-
for (const doc of value.documents) {
555-
const docBson = this.serializeBson(doc);
556-
docsLength += docBson.length;
557-
chunks.push(docBson);
558-
}
559-
// Second part of the sequence is the length at offset 1;
560-
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
602+
chunks.push(value.toBin());
561603
// Why are we removing the field from the command? This is because it needs to be
562604
// removed in the OP_MSG request first section, and DocumentSequence is not a
563605
// BSON type and is specific to the MongoDB wire protocol so there's nothing

src/cursor/client_bulk_write_cursor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Document } from '../bson';
22
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
3-
import { MongoBulkWriteCursorError } from '../error';
3+
import { MongoClientBulkWriteCursorError } from '../error';
44
import type { MongoClient } from '../mongo_client';
55
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
66
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
@@ -44,7 +44,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
4444
*/
4545
get response(): ClientBulkWriteCursorResponse {
4646
if (this.cursorResponse) return this.cursorResponse;
47-
throw new MongoBulkWriteCursorError(
47+
throw new MongoClientBulkWriteCursorError(
4848
'No client bulk write cursor response returned from the server.'
4949
);
5050
}

src/error.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ export class MongoGCPError extends MongoOIDCError {
622622
* @public
623623
* @category Error
624624
*/
625-
export class MongoBulkWriteCursorError extends MongoRuntimeError {
625+
export class MongoClientBulkWriteCursorError extends MongoRuntimeError {
626626
/**
627627
* **Do not use this constructor!**
628628
*
@@ -639,7 +639,34 @@ export class MongoBulkWriteCursorError extends MongoRuntimeError {
639639
}
640640

641641
override get name(): string {
642-
return 'MongoBulkWriteCursorError';
642+
return 'MongoClientBulkWriteCursorError';
643+
}
644+
}
645+
646+
/**
647+
* An error indicating that an error occurred on the client when executing a client bulk write.
648+
*
649+
* @public
650+
* @category Error
651+
*/
652+
export class MongoClientBulkWriteExecutionError extends MongoRuntimeError {
653+
/**
654+
* **Do not use this constructor!**
655+
*
656+
* Meant for internal use only.
657+
*
658+
* @remarks
659+
* This class is only meant to be constructed within the driver. This constructor is
660+
* not subject to semantic versioning compatibility guarantees and may change at any time.
661+
*
662+
* @public
663+
**/
664+
constructor(message: string) {
665+
super(message);
666+
}
667+
668+
override get name(): string {
669+
return 'MongoClientBulkWriteExecutionError';
643670
}
644671
}
645672

src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ export {
4444
MongoAWSError,
4545
MongoAzureError,
4646
MongoBatchReExecutionError,
47-
MongoBulkWriteCursorError,
4847
MongoChangeStreamError,
48+
MongoClientBulkWriteCursorError,
49+
MongoClientBulkWriteExecutionError,
4950
MongoCompatibilityError,
5051
MongoCursorExhaustedError,
5152
MongoCursorInUseError,

src/operations/client_bulk_write/command_builder.ts

Lines changed: 132 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { type Document } from '../../bson';
1+
import { BSON, type Document } from '../../bson';
22
import { DocumentSequence } from '../../cmap/commands';
33
import { type PkFactory } from '../../mongo_client';
44
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
@@ -28,6 +28,11 @@ export interface ClientBulkWriteCommand {
2828
comment?: any;
2929
}
3030

31+
/**
32+
* The bytes overhead for the extra fields added post command generation.
33+
*/
34+
const MESSAGE_OVERHEAD_BYTES = 1000;
35+
3136
/** @internal */
3237
export class ClientBulkWriteCommandBuilder {
3338
models: AnyClientBulkWriteModel[];
@@ -62,32 +67,148 @@ export class ClientBulkWriteCommandBuilder {
6267
/**
6368
* Build the bulk write commands from the models.
6469
*/
65-
buildCommands(): ClientBulkWriteCommand[] {
70+
buildCommands(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand[] {
6671
// Iterate the models to build the ops and nsInfo fields.
67-
const operations = [];
72+
// We need to do this in a loop which creates one command each up
73+
// to the max bson size or max message size.
74+
const commands: ClientBulkWriteCommand[] = [];
75+
let currentCommandLength = 0;
6876
let currentNamespaceIndex = 0;
77+
let currentCommand: ClientBulkWriteCommand = this.baseCommand();
6978
const namespaces = new Map<string, number>();
79+
7080
for (const model of this.models) {
7181
const ns = model.namespace;
7282
const index = namespaces.get(ns);
83+
84+
/**
85+
* Convenience function for resetting everything when a new batch
86+
* is started.
87+
*/
88+
const reset = () => {
89+
commands.push(currentCommand);
90+
namespaces.clear();
91+
currentNamespaceIndex = 0;
92+
currentCommand = this.baseCommand();
93+
namespaces.set(ns, currentNamespaceIndex);
94+
};
95+
7396
if (index != null) {
74-
operations.push(buildOperation(model, index, this.pkFactory));
97+
// Pushing to the ops document sequence returns the bytes length added.
98+
const operation = buildOperation(model, index, this.pkFactory);
99+
const operationBuffer = BSON.serialize(operation);
100+
101+
// Check if the operation buffer can fit in the current command. If it can,
102+
// then add the operation to the document sequence and increment the
103+
// current length as long as the ops don't exceed the maxWriteBatchSize.
104+
if (
105+
currentCommandLength + operationBuffer.length < maxMessageSizeBytes &&
106+
currentCommand.ops.documents.length < maxWriteBatchSize
107+
) {
108+
// Pushing to the ops document sequence returns its new total serialized length.
109+
currentCommandLength =
110+
MESSAGE_OVERHEAD_BYTES + this.addOperation(currentCommand, operation, operationBuffer);
111+
} else {
112+
// We need to batch. Push the current command to the commands
113+
// array and create a new current command. We also need to clear the namespaces
114+
// map for the new command.
115+
reset();
116+
117+
const nsInfo = { ns: ns };
118+
const nsInfoBuffer = BSON.serialize(nsInfo);
119+
currentCommandLength =
120+
MESSAGE_OVERHEAD_BYTES +
121+
this.addOperationAndNsInfo(
122+
currentCommand,
123+
operation,
124+
operationBuffer,
125+
nsInfo,
126+
nsInfoBuffer
127+
);
128+
}
75129
} else {
76130
namespaces.set(ns, currentNamespaceIndex);
77-
operations.push(buildOperation(model, currentNamespaceIndex, this.pkFactory));
131+
const nsInfo = { ns: ns };
132+
const nsInfoBuffer = BSON.serialize(nsInfo);
133+
const operation = buildOperation(model, currentNamespaceIndex, this.pkFactory);
134+
const operationBuffer = BSON.serialize(operation);
135+
136+
// Check if the operation and nsInfo buffers can fit in the command. If they
137+
// can, then add the operation and nsInfo to their respective document
138+
// sequences and increment the current length as long as the ops don't exceed
139+
// the maxWriteBatchSize.
140+
if (
141+
currentCommandLength + nsInfoBuffer.length + operationBuffer.length <
142+
maxMessageSizeBytes &&
143+
currentCommand.ops.documents.length < maxWriteBatchSize
144+
) {
145+
currentCommandLength =
146+
MESSAGE_OVERHEAD_BYTES +
147+
this.addOperationAndNsInfo(
148+
currentCommand,
149+
operation,
150+
operationBuffer,
151+
nsInfo,
152+
nsInfoBuffer
153+
);
154+
} else {
155+
// We need to batch. Push the current command to the commands
156+
// array and create a new current command. Also clear the namespaces map.
157+
reset();
158+
159+
currentCommandLength =
160+
MESSAGE_OVERHEAD_BYTES +
161+
this.addOperationAndNsInfo(
162+
currentCommand,
163+
operation,
164+
operationBuffer,
165+
nsInfo,
166+
nsInfoBuffer
167+
);
168+
}
169+
// We've added a new namespace, increment the namespace index.
78170
currentNamespaceIndex++;
79171
}
80172
}
81173

82-
const nsInfo = Array.from(namespaces.keys(), ns => ({ ns }));
174+
// After we've finished iterating all the models put the last current command
175+
// only if there are operations in it.
176+
if (currentCommand.ops.documents.length > 0) {
177+
commands.push(currentCommand);
178+
}
83179

84-
// The base command.
180+
return commands;
181+
}
182+
183+
private addOperation(
184+
command: ClientBulkWriteCommand,
185+
operation: Document,
186+
operationBuffer: Uint8Array
187+
): number {
188+
// Pushing to the ops document sequence returns its new total serialized length.
189+
return command.ops.push(operation, operationBuffer);
190+
}
191+
192+
private addOperationAndNsInfo(
193+
command: ClientBulkWriteCommand,
194+
operation: Document,
195+
operationBuffer: Uint8Array,
196+
nsInfo: Document,
197+
nsInfoBuffer: Uint8Array
198+
): number {
199+
// Pushing to the nsInfo document sequence returns its new total serialized length.
200+
const nsInfoLength = command.nsInfo.push(nsInfo, nsInfoBuffer);
201+
const opsLength = this.addOperation(command, operation, operationBuffer);
202+
return nsInfoLength + opsLength;
203+
}
204+
205+
private baseCommand(): ClientBulkWriteCommand {
85206
const command: ClientBulkWriteCommand = {
86207
bulkWrite: 1,
87208
errorsOnly: this.errorsOnly,
88209
ordered: this.options.ordered ?? true,
89-
ops: new DocumentSequence(operations),
90-
nsInfo: new DocumentSequence(nsInfo)
210+
ops: new DocumentSequence('ops'),
211+
nsInfo: new DocumentSequence('nsInfo')
91212
};
92213
// Add bypassDocumentValidation if it was present in the options.
93214
if (this.options.bypassDocumentValidation != null) {
@@ -103,7 +224,8 @@ export class ClientBulkWriteCommandBuilder {
103224
if (this.options.comment !== undefined) {
104225
command.comment = this.options.comment;
105226
}
106-
return [command];
227+
228+
return command;
107229
}
108230
}
109231

0 commit comments

Comments
 (0)