Skip to content

Commit b312282

Browse files
kalenikalexanderrnavarych
authored andcommitted
handle backpressure when loading data from file
1 parent daecdda commit b312282

File tree

1 file changed

+38
-14
lines changed

1 file changed

+38
-14
lines changed

lib/connection.js

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,14 @@ class Connection extends EventEmitter {
3232
if (opts.config.socketPath) {
3333
this.stream = Net.connect(opts.config.socketPath);
3434
} else {
35-
this.stream = Net.connect(
36-
opts.config.port,
37-
opts.config.host
38-
);
35+
this.stream = Net.connect(opts.config.port, opts.config.host);
3936

4037
// Enable keep-alive on the socket. It's disabled by default, but the
4138
// user can enable it and supply an initial delay.
4239
this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay);
4340
}
4441
// if stream is a function, treat it as "stream agent / factory"
45-
} else if (typeof opts.config.stream === 'function') {
42+
} else if (typeof opts.config.stream === 'function') {
4643
this.stream = opts.config.stream(opts);
4744
} else {
4845
this.stream = opts.config.stream;
@@ -223,11 +220,15 @@ class Connection extends EventEmitter {
223220
}
224221

225222
write(buffer) {
226-
this.stream.write(buffer, err => {
223+
const result = this.stream.write(buffer, err => {
227224
if (err) {
228225
this._handleNetworkError(err);
229226
}
230227
});
228+
229+
if (!result) {
230+
this.stream.emit('pause');
231+
}
231232
}
232233

233234
// http://dev.mysql.com/doc/internals/en/sequence-id.html
@@ -260,11 +261,19 @@ class Connection extends EventEmitter {
260261
if (this.config.debug) {
261262
// eslint-disable-next-line no-console
262263
console.log(
263-
`${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
264+
`${this._internalId} ${this.connectionId} <== ${
265+
this._command._commandName
266+
}#${this._command.stateName()}(${[
267+
this.sequenceId,
268+
packet._name,
269+
packet.length()
270+
].join(',')})`
264271
);
265272
// eslint-disable-next-line no-console
266273
console.log(
267-
`${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}`
274+
`${this._internalId} ${
275+
this.connectionId
276+
} <== ${packet.buffer.toString('hex')}`
268277
);
269278
}
270279
this._bumpSequenceId(1);
@@ -277,7 +286,13 @@ class Connection extends EventEmitter {
277286
);
278287
// eslint-disable-next-line no-console
279288
console.log(
280-
`${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
289+
`${this._internalId} ${this.connectionId} <== ${
290+
this._command._commandName
291+
}#${this._command.stateName()}(${[
292+
this.sequenceId,
293+
packet._name,
294+
packet.length()
295+
].join(',')})`
281296
);
282297
}
283298
for (offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) {
@@ -402,7 +417,13 @@ class Connection extends EventEmitter {
402417
: '(no command)';
403418
// eslint-disable-next-line no-console
404419
console.log(
405-
`${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})`
420+
`${this._internalId} ${
421+
this.connectionId
422+
} ==> ${commandName}#${stateName}(${[
423+
packet.sequenceId,
424+
packet.type(),
425+
packet.length()
426+
].join(',')})`
406427
);
407428
}
408429
}
@@ -497,7 +518,10 @@ class Connection extends EventEmitter {
497518
cmdQuery = Connection.createQuery(sql, values, cb, this.config);
498519
}
499520
this._resolveNamedPlaceholders(cmdQuery);
500-
const rawSql = this.format(cmdQuery.sql, cmdQuery.values !== undefined ? cmdQuery.values : []);
521+
const rawSql = this.format(
522+
cmdQuery.sql,
523+
cmdQuery.values !== undefined ? cmdQuery.values : []
524+
);
501525
cmdQuery.sql = rawSql;
502526
return this.addCommand(cmdQuery);
503527
}
@@ -839,9 +863,9 @@ class Connection extends EventEmitter {
839863
}
840864

841865
static statementKey(options) {
842-
return (
843-
`${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}`
844-
);
866+
return `${typeof options.nestTables}/${options.nestTables}/${
867+
options.rowsAsArray
868+
}${options.sql}`;
845869
}
846870
}
847871

0 commit comments

Comments
 (0)