Skip to content

Commit f1eb220

Browse files
committed
feat(NODE-6337): implement client bulk write batching
1 parent a2d9dcc commit f1eb220

File tree

6 files changed

+334
-176
lines changed

6 files changed

+334
-176
lines changed

src/cmap/commands.ts

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,68 @@ export interface OpMsgOptions {
429429

430430
/** @internal */
431431
export class DocumentSequence {
432+
field: string;
432433
documents: Document[];
434+
serializedDocumentsLength: number;
435+
private chunks: Uint8Array[];
436+
private header?: Buffer;
433437

434-
constructor(documents: Document[]) {
435-
this.documents = documents;
438+
/**
439+
* Create a new document sequence for the provided field.
440+
* @param field - The field it will replace.
441+
*/
442+
constructor(field: string, documents?: Document[]) {
443+
this.field = field;
444+
this.documents = [];
445+
this.chunks = [];
446+
this.serializedDocumentsLength = 0;
447+
this.init();
448+
if (documents) {
449+
for (const doc of documents) {
450+
this.push(doc);
451+
}
452+
}
453+
}
454+
455+
/**
456+
* Initialize the buffer chunks.
457+
*/
458+
private init() {
459+
// Document sequences starts with type 1 at the first byte.
460+
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
461+
buffer[0] = 1;
462+
// Third part is the field name at offset 5 with trailing null byte.
463+
encodeUTF8Into(buffer, `${this.field}\0`, 5);
464+
this.chunks.push(buffer);
465+
this.header = buffer;
466+
}
467+
468+
/**
469+
* Push a document to the document sequence. Will serialize the document
470+
* as well and return the current serialized length of all documents.
471+
* @param document - The document to add.
472+
* @returns The serialized documents length.
473+
*/
474+
push(document: Document): number {
475+
// First serialize the document and recalculate the documents length.
476+
const docBuffer = BSON.serialize(document);
477+
this.serializedDocumentsLength += docBuffer.length;
478+
// Push the document.
479+
this.documents.push(document);
480+
// Push the document raw bson.
481+
this.chunks.push(docBuffer);
482+
// Write the new length.
483+
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
484+
return this.serializedDocumentsLength;
485+
}
486+
487+
/**
488+
* Get the fully serialized bytes for the document sequence section.
489+
* @returns The section bytes.
490+
*/
491+
toBin(): Uint8Array {
492+
// TODO: What to do if no documents?
493+
return Buffer.concat(this.chunks);
436494
}
437495
}
438496

@@ -543,21 +601,7 @@ export class OpMsgRequest {
543601
const chunks = [];
544602
for (const [key, value] of Object.entries(document)) {
545603
if (value instanceof DocumentSequence) {
546-
// Document sequences starts with type 1 at the first byte.
547-
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
548-
buffer[0] = 1;
549-
// Third part is the field name at offset 5 with trailing null byte.
550-
encodeUTF8Into(buffer, `${key}\0`, 5);
551-
chunks.push(buffer);
552-
// Fourth part are the documents' bytes.
553-
let docsLength = 0;
554-
for (const doc of value.documents) {
555-
const docBson = this.serializeBson(doc);
556-
docsLength += docBson.length;
557-
chunks.push(docBson);
558-
}
559-
// Second part of the sequence is the length at offset 1;
560-
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
604+
chunks.push(value.toBin());
561605
// Why are we removing the field from the command? This is because it needs to be
562606
// removed in the OP_MSG request first section, and DocumentSequence is not a
563607
// BSON type and is specific to the MongoDB wire protocol so there's nothing

src/operations/client_bulk_write/command_builder.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ export class ClientBulkWriteCommandBuilder {
6464
*/
6565
buildCommands(): ClientBulkWriteCommand[] {
6666
// Iterate the models to build the ops and nsInfo fields.
67+
// We need to do this in a loop which creates one command each up
68+
// to the max bson size or max message size.
6769
const operations = [];
6870
let currentNamespaceIndex = 0;
6971
const namespaces = new Map<string, number>();
@@ -86,8 +88,8 @@ export class ClientBulkWriteCommandBuilder {
8688
bulkWrite: 1,
8789
errorsOnly: this.errorsOnly,
8890
ordered: this.options.ordered ?? true,
89-
ops: new DocumentSequence(operations),
90-
nsInfo: new DocumentSequence(nsInfo)
91+
ops: new DocumentSequence('ops', operations),
92+
nsInfo: new DocumentSequence('nsInfo', nsInfo)
9193
};
9294
// Add bypassDocumentValidation if it was present in the options.
9395
if (this.options.bypassDocumentValidation != null) {

src/operations/client_bulk_write/executor.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,14 @@ async function executeAcknowledged(
7575
): Promise<ClientBulkWriteResult> {
7676
const resultsMerger = new ClientBulkWriteResultsMerger(options);
7777
// For each command will will create and exhaust a cursor for the results.
78+
let currentBatchOffset = 0;
7879
for (const command of commands) {
7980
const cursor = new ClientBulkWriteCursor(client, command, options);
8081
const docs = await cursor.toArray();
81-
resultsMerger.merge(command.ops.documents, cursor.response, docs);
82+
const operations = command.ops.documents;
83+
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
84+
// Set the new batch index so we can back back to the index in the original models.
85+
currentBatchOffset += operations.length;
8286
}
8387
return resultsMerger.result;
8488
}

src/operations/client_bulk_write/results_merger.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@ export class ClientBulkWriteResultsMerger {
4242

4343
/**
4444
* Merge the results in the cursor to the existing result.
45+
* @param currentBatchOffset - The offset index to the original models.
4546
* @param response - The cursor response.
4647
* @param documents - The documents in the cursor.
4748
* @returns The current result.
4849
*/
4950
merge(
51+
currentBatchOffset: number,
5052
operations: Document[],
5153
response: ClientBulkWriteCursorResponse,
5254
documents: Document[]
@@ -67,7 +69,9 @@ export class ClientBulkWriteResultsMerger {
6769
const operation = operations[document.idx];
6870
// Handle insert results.
6971
if ('insert' in operation) {
70-
this.result.insertResults?.set(document.idx, { insertedId: operation.document._id });
72+
this.result.insertResults?.set(document.idx + currentBatchOffset, {
73+
insertedId: operation.document._id
74+
});
7175
}
7276
// Handle update results.
7377
if ('update' in operation) {
@@ -81,11 +85,13 @@ export class ClientBulkWriteResultsMerger {
8185
if (document.upserted) {
8286
result.upsertedId = document.upserted._id;
8387
}
84-
this.result.updateResults?.set(document.idx, result);
88+
this.result.updateResults?.set(document.idx + currentBatchOffset, result);
8589
}
8690
// Handle delete results.
8791
if ('delete' in operation) {
88-
this.result.deleteResults?.set(document.idx, { deletedCount: document.n });
92+
this.result.deleteResults?.set(document.idx + currentBatchOffset, {
93+
deletedCount: document.n
94+
});
8995
}
9096
}
9197
}

test/unit/cmap/commands.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ describe('commands', function () {
1515
context('when there is one document sequence', function () {
1616
const command = {
1717
test: 1,
18-
field: new DocumentSequence([{ test: 1 }])
18+
field: new DocumentSequence('field', [{ test: 1 }])
1919
};
2020
const msg = new OpMsgRequest('admin', command, {});
2121
const buffers = msg.toBin();
@@ -53,8 +53,8 @@ describe('commands', function () {
5353
context('when there are multiple document sequences', function () {
5454
const command = {
5555
test: 1,
56-
fieldOne: new DocumentSequence([{ test: 1 }]),
57-
fieldTwo: new DocumentSequence([{ test: 1 }])
56+
fieldOne: new DocumentSequence('fieldOne', [{ test: 1 }]),
57+
fieldTwo: new DocumentSequence('fieldTwo', [{ test: 1 }])
5858
};
5959
const msg = new OpMsgRequest('admin', command, {});
6060
const buffers = msg.toBin();

0 commit comments

Comments
 (0)