Skip to content

Commit 0ed7ce8

Browse files
committed
refactor: build commands with connection
1 parent d15f7cd commit 0ed7ce8

File tree

7 files changed

+164
-247
lines changed

7 files changed

+164
-247
lines changed

src/cursor/client_bulk_write_cursor.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import type { Document } from '../bson';
1+
import { type Document } from 'bson';
2+
23
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
34
import { MongoClientBulkWriteCursorError } from '../error';
45
import type { MongoClient } from '../mongo_client';
56
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
7+
import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder';
68
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
79
import { executeOperation } from '../operations/execute_operation';
810
import type { ClientSession } from '../sessions';
@@ -24,17 +26,21 @@ export interface ClientBulkWriteCursorOptions
2426
* @internal
2527
*/
2628
export class ClientBulkWriteCursor extends AbstractCursor {
27-
public readonly command: Document;
29+
commandBuilder: ClientBulkWriteCommandBuilder;
2830
/** @internal */
2931
private cursorResponse?: ClientBulkWriteCursorResponse;
3032
/** @internal */
3133
private clientBulkWriteOptions: ClientBulkWriteOptions;
3234

3335
/** @internal */
34-
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
36+
constructor(
37+
client: MongoClient,
38+
commandBuilder: ClientBulkWriteCommandBuilder,
39+
options: ClientBulkWriteOptions = {}
40+
) {
3541
super(client, new MongoDBNamespace('admin', '$cmd'), options);
3642

37-
this.command = command;
43+
this.commandBuilder = commandBuilder;
3844
this.clientBulkWriteOptions = options;
3945
}
4046

@@ -49,17 +55,24 @@ export class ClientBulkWriteCursor extends AbstractCursor {
4955
);
5056
}
5157

58+
/**
59+
* Get the last set of operations the cursor executed.
60+
*/
61+
get operations(): Document[] {
62+
return this.commandBuilder.lastOperations;
63+
}
64+
5265
clone(): ClientBulkWriteCursor {
5366
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
5467
delete clonedOptions.session;
55-
return new ClientBulkWriteCursor(this.client, this.command, {
68+
return new ClientBulkWriteCursor(this.client, this.commandBuilder, {
5669
...clonedOptions
5770
});
5871
}
5972

6073
/** @internal */
6174
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
62-
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
75+
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.commandBuilder, {
6376
...this.clientBulkWriteOptions,
6477
...this.cursorOptions,
6578
session
Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,28 @@
1-
import { type Document } from 'bson';
2-
1+
import { ServerType } from '../../beta';
32
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
43
import type { Server } from '../../sdam/server';
54
import type { ClientSession } from '../../sessions';
65
import { MongoDBNamespace } from '../../utils';
76
import { CommandOperation } from '../command';
87
import { Aspect, defineAspects } from '../operation';
8+
import { type ClientBulkWriteCommandBuilder } from './command_builder';
99
import { type ClientBulkWriteOptions } from './common';
1010

1111
/**
1212
* Executes a single client bulk write operation within a potential batch.
1313
* @internal
1414
*/
1515
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
16-
command: Document;
16+
commandBuilder: ClientBulkWriteCommandBuilder;
1717
override options: ClientBulkWriteOptions;
1818

1919
override get commandName() {
2020
return 'bulkWrite' as const;
2121
}
2222

23-
constructor(command: Document, options: ClientBulkWriteOptions) {
23+
constructor(commandBuilder: ClientBulkWriteCommandBuilder, options: ClientBulkWriteOptions) {
2424
super(undefined, options);
25-
this.command = command;
25+
this.commandBuilder = commandBuilder;
2626
this.options = options;
2727
this.ns = new MongoDBNamespace('admin', '$cmd');
2828
}
@@ -37,9 +37,34 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
3737
server: Server,
3838
session: ClientSession | undefined
3939
): Promise<ClientBulkWriteCursorResponse> {
40-
return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse);
40+
let command;
41+
42+
if (server.description.type === ServerType.LoadBalancer) {
43+
// Checkout a connection to build the command.
44+
const connection = await server.pool.checkOut();
45+
command = this.commandBuilder.buildBatch(
46+
connection.hello?.maxMessageSizeBytes,
47+
connection.hello?.maxWriteBatchSize
48+
);
49+
// Pin the connection to the session so it get used to execute the command and we do not
50+
// perform a double check-in/check-out.
51+
session?.pin(connection);
52+
} else {
53+
// At this point we have a server and the auto connect code has already
54+
// run in executeOperation, so the server description will be populated.
55+
// We can use that to build the command.
56+
command = this.commandBuilder.buildBatch(
57+
server.description.maxMessageSizeBytes,
58+
server.description.maxWriteBatchSize
59+
);
60+
}
61+
return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse);
4162
}
4263
}
4364

4465
// Skipping the collation as it goes on the individual ops.
45-
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);
66+
defineAspects(ClientBulkWriteOperation, [
67+
Aspect.WRITE_OPERATION,
68+
Aspect.SKIP_COLLATION,
69+
Aspect.CURSOR_CREATING
70+
]);

src/operations/client_bulk_write/command_builder.ts

Lines changed: 52 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ export class ClientBulkWriteCommandBuilder {
3838
models: AnyClientBulkWriteModel[];
3939
options: ClientBulkWriteOptions;
4040
pkFactory: PkFactory;
41+
currentModelIndex: number;
42+
lastOperations: Document[];
4143

4244
/**
4345
* Create the command builder.
@@ -51,6 +53,8 @@ export class ClientBulkWriteCommandBuilder {
5153
this.models = models;
5254
this.options = options;
5355
this.pkFactory = pkFactory ?? DEFAULT_PK_FACTORY;
56+
this.currentModelIndex = 0;
57+
this.lastOperations = [];
5458
}
5559

5660
/**
@@ -65,68 +69,55 @@ export class ClientBulkWriteCommandBuilder {
6569
}
6670

6771
/**
68-
* Build the bulk write commands from the models.
72+
* Determines if there is another batch to process.
73+
* @returns True if not all batches have been built.
6974
*/
70-
buildCommands(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand[] {
71-
// Iterate the models to build the ops and nsInfo fields.
72-
// We need to do this in a loop which creates one command each up
73-
// to the max bson size or max message size.
74-
const commands: ClientBulkWriteCommand[] = [];
75-
let currentCommandLength = 0;
75+
hasNextBatch(): boolean {
76+
return this.currentModelIndex < this.models.length;
77+
}
78+
79+
/**
80+
* Build a single batch of a client bulk write command.
81+
* @param maxMessageSizeBytes - The max message size in bytes.
82+
* @param maxWriteBatchSize - The max write batch size.
83+
* @returns The client bulk write command.
84+
*/
85+
buildBatch(maxMessageSizeBytes: number, maxWriteBatchSize: number): ClientBulkWriteCommand {
86+
let commandLength = 0;
7687
let currentNamespaceIndex = 0;
77-
let currentCommand: ClientBulkWriteCommand = this.baseCommand();
88+
const command: ClientBulkWriteCommand = this.baseCommand();
7889
const namespaces = new Map<string, number>();
7990

80-
for (const model of this.models) {
91+
while (this.currentModelIndex < this.models.length) {
92+
const model = this.models[this.currentModelIndex];
8193
const ns = model.namespace;
82-
const index = namespaces.get(ns);
83-
84-
/**
85-
* Convenience function for resetting everything when a new batch
86-
* is started.
87-
*/
88-
const reset = () => {
89-
commands.push(currentCommand);
90-
namespaces.clear();
91-
currentNamespaceIndex = 0;
92-
currentCommand = this.baseCommand();
93-
namespaces.set(ns, currentNamespaceIndex);
94-
};
94+
const nsIndex = namespaces.get(ns);
9595

96-
if (index != null) {
97-
// Pushing to the ops document sequence returns the bytes length added.
98-
const operation = buildOperation(model, index, this.pkFactory);
96+
if (nsIndex != null) {
97+
// Build the operation and serialize it to get the bytes buffer.
98+
const operation = buildOperation(model, nsIndex, this.pkFactory);
9999
const operationBuffer = BSON.serialize(operation);
100100

101-
// Check if the operation buffer can fit in the current command. If it can,
101+
// Check if the operation buffer can fit in the command. If it can,
102102
// then add the operation to the document sequence and increment the
103103
// current length as long as the ops don't exceed the maxWriteBatchSize.
104104
if (
105-
currentCommandLength + operationBuffer.length < maxMessageSizeBytes &&
106-
currentCommand.ops.documents.length < maxWriteBatchSize
105+
commandLength + operationBuffer.length < maxMessageSizeBytes &&
106+
command.ops.documents.length < maxWriteBatchSize
107107
) {
108108
// Pushing to the ops document sequence returns the total byte length of the document sequence.
109-
currentCommandLength =
110-
MESSAGE_OVERHEAD_BYTES + this.addOperation(currentCommand, operation, operationBuffer);
109+
commandLength = MESSAGE_OVERHEAD_BYTES + command.ops.push(operation, operationBuffer);
110+
// Increment the builder's current model index.
111+
this.currentModelIndex++;
111112
} else {
112-
// We need to batch. Push the current command to the commands
113-
// array and create a new current command. We aslo need to clear the namespaces
114-
// map for the new command.
115-
reset();
116-
117-
const nsInfo = { ns: ns };
118-
const nsInfoBuffer = BSON.serialize(nsInfo);
119-
currentCommandLength =
120-
MESSAGE_OVERHEAD_BYTES +
121-
this.addOperationAndNsInfo(
122-
currentCommand,
123-
operation,
124-
operationBuffer,
125-
nsInfo,
126-
nsInfoBuffer
127-
);
113+
// The operation cannot fit in the current command and will need to
114+
// go in the next batch. Exit the loop and set the last ops.
115+
this.lastOperations = command.ops.documents;
116+
break;
128117
}
129118
} else {
119+
// The namespace is not already in the nsInfo so we will set it in the map, and
120+
// construct our nsInfo and ops documents and buffers.
130121
namespaces.set(ns, currentNamespaceIndex);
131122
const nsInfo = { ns: ns };
132123
const nsInfoBuffer = BSON.serialize(nsInfo);
@@ -138,68 +129,27 @@ export class ClientBulkWriteCommandBuilder {
138129
// sequences and increment the current length as long as the ops don't exceed
139130
// the maxWriteBatchSize.
140131
if (
141-
currentCommandLength + nsInfoBuffer.length + operationBuffer.length <
142-
maxMessageSizeBytes &&
143-
currentCommand.ops.documents.length < maxWriteBatchSize
132+
commandLength + nsInfoBuffer.length + operationBuffer.length < maxMessageSizeBytes &&
133+
command.ops.documents.length < maxWriteBatchSize
144134
) {
145-
currentCommandLength =
135+
// Pushing to the ops document sequence returns the total byte length of the document sequence.
136+
commandLength =
146137
MESSAGE_OVERHEAD_BYTES +
147-
this.addOperationAndNsInfo(
148-
currentCommand,
149-
operation,
150-
operationBuffer,
151-
nsInfo,
152-
nsInfoBuffer
153-
);
138+
command.nsInfo.push(nsInfo, nsInfoBuffer) +
139+
command.ops.push(operation, operationBuffer);
140+
// We've added a new namespace, increment the namespace index.
141+
currentNamespaceIndex++;
142+
// Increment the builder's current model index.
143+
this.currentModelIndex++;
154144
} else {
155-
// We need to batch. Push the current command to the commands
156-
// array and create a new current command. Aslo clear the namespaces map.
157-
reset();
158-
159-
currentCommandLength =
160-
MESSAGE_OVERHEAD_BYTES +
161-
this.addOperationAndNsInfo(
162-
currentCommand,
163-
operation,
164-
operationBuffer,
165-
nsInfo,
166-
nsInfoBuffer
167-
);
145+
// The operation cannot fit in the current command and will need to
146+
// go in the next batch. Exit the loop and set the last ops.
147+
this.lastOperations = command.ops.documents;
148+
break;
168149
}
169-
// We've added a new namespace, increment the namespace index.
170-
currentNamespaceIndex++;
171150
}
172151
}
173-
174-
// After we've finisihed iterating all the models put the last current command
175-
// only if there are operations in it.
176-
if (currentCommand.ops.documents.length > 0) {
177-
commands.push(currentCommand);
178-
}
179-
180-
return commands;
181-
}
182-
183-
private addOperation(
184-
command: ClientBulkWriteCommand,
185-
operation: Document,
186-
operationBuffer: Uint8Array
187-
): number {
188-
// Pushing to the ops document sequence returns the total byte length of the document sequence.
189-
return command.ops.push(operation, operationBuffer);
190-
}
191-
192-
private addOperationAndNsInfo(
193-
command: ClientBulkWriteCommand,
194-
operation: Document,
195-
operationBuffer: Uint8Array,
196-
nsInfo: Document,
197-
nsInfoBuffer: Uint8Array
198-
): number {
199-
// Pushing to the nsInfo document sequence returns the total byte length of the document sequence.
200-
const nsInfoLength = command.nsInfo.push(nsInfo, nsInfoBuffer);
201-
const opsLength = this.addOperation(command, operation, operationBuffer);
202-
return nsInfoLength + opsLength;
152+
return command;
203153
}
204154

205155
private baseCommand(): ClientBulkWriteCommand {

0 commit comments

Comments
 (0)