Skip to content

Commit 03de306

Browse files
addaleaxtargos
authored andcommitted
zlib: do not coalesce multiple .flush() calls
This is an approach to address the issue linked below. Previously, when `.write()` and `.flush()` calls to a zlib stream were interleaved synchronously (i.e. without waiting for these operations to finish), multiple flush calls would have been coalesced into a single flushing operation. This patch changes behaviour so that each `.flush()` all corresponds to one flushing operation on the underlying zlib resource, and the order of operations is as if the `.flush()` call were a `.write()` call. One test had to be removed because it specifically tested the previous behaviour. As a drive-by fix, this also makes sure that all flush callbacks are called. Previously, that was not the case. Fixes: #28478 PR-URL: #28520 Reviewed-By: Rich Trott <[email protected]> Reviewed-By: Ruben Bridgewater <[email protected]> Reviewed-By: Luigi Pinca <[email protected]>
1 parent aee8694 commit 03de306

File tree

4 files changed

+79
-53
lines changed

4 files changed

+79
-53
lines changed

lib/zlib.js

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ const {
4949
} = require('buffer');
5050
const { owner_symbol } = require('internal/async_hooks').symbols;
5151

52+
const kFlushFlag = Symbol('kFlushFlag');
53+
5254
const constants = internalBinding('constants').zlib;
5355
const {
5456
// Zlib flush levels
@@ -261,7 +263,6 @@ function ZlibBase(opts, mode, handle, { flush, finishFlush, fullFlush }) {
261263
this._chunkSize = chunkSize;
262264
this._defaultFlushFlag = flush;
263265
this._finishFlushFlag = finishFlush;
264-
this._nextFlush = -1;
265266
this._defaultFullFlushFlag = fullFlush;
266267
this.once('end', this.close);
267268
this._info = opts && opts.info;
@@ -308,21 +309,35 @@ ZlibBase.prototype._flush = function(callback) {
308309

309310
// If a flush is scheduled while another flush is still pending, a way to figure
310311
// out which one is the "stronger" flush is needed.
312+
// This is currently only used to figure out which flush flag to use for the
313+
// last chunk.
311314
// Roughly, the following holds:
312315
// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH <
313316
// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH
314317
const flushiness = [];
315318
let i = 0;
316-
for (const flushFlag of [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
317-
Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH]) {
319+
const kFlushFlagList = [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
320+
Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH];
321+
for (const flushFlag of kFlushFlagList) {
318322
flushiness[flushFlag] = i++;
319323
}
320324

321325
function maxFlush(a, b) {
322326
return flushiness[a] > flushiness[b] ? a : b;
323327
}
324328

325-
const flushBuffer = Buffer.alloc(0);
329+
// Set up a list of 'special' buffers that can be written using .write()
330+
// from the .flush() code as a way of introducing flushing operations into the
331+
// write sequence.
332+
const kFlushBuffers = [];
333+
{
334+
const dummyArrayBuffer = new ArrayBuffer();
335+
for (const flushFlag of kFlushFlagList) {
336+
kFlushBuffers[flushFlag] = Buffer.from(dummyArrayBuffer);
337+
kFlushBuffers[flushFlag][kFlushFlag] = flushFlag;
338+
}
339+
}
340+
326341
ZlibBase.prototype.flush = function(kind, callback) {
327342
const ws = this._writableState;
328343

@@ -337,13 +352,8 @@ ZlibBase.prototype.flush = function(kind, callback) {
337352
} else if (ws.ending) {
338353
if (callback)
339354
this.once('end', callback);
340-
} else if (this._nextFlush !== -1) {
341-
// This means that there is a flush currently in the write queue.
342-
// We currently coalesce this flush into the pending one.
343-
this._nextFlush = maxFlush(this._nextFlush, kind);
344355
} else {
345-
this._nextFlush = kind;
346-
this.write(flushBuffer, '', callback);
356+
this.write(kFlushBuffers[kind], '', callback);
347357
}
348358
};
349359

@@ -361,9 +371,8 @@ ZlibBase.prototype._transform = function(chunk, encoding, cb) {
361371
var flushFlag = this._defaultFlushFlag;
362372
// We use a 'fake' zero-length chunk to carry information about flushes from
363373
// the public API to the actual stream implementation.
364-
if (chunk === flushBuffer) {
365-
flushFlag = this._nextFlush;
366-
this._nextFlush = -1;
374+
if (typeof chunk[kFlushFlag] === 'number') {
375+
flushFlag = chunk[kFlushFlag];
367376
}
368377

369378
// For the last chunk, also apply `_finishFlushFlag`.

test/parallel/test-zlib-flush-multiple-scheduled.js

Lines changed: 0 additions & 39 deletions
This file was deleted.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const { createGzip, createGunzip, Z_PARTIAL_FLUSH } = require('zlib');
5+
6+
// Verify that .flush() behaves like .write() in terms of ordering, e.g. in
7+
// a sequence like .write() + .flush() + .write() + .flush() each .flush() call
8+
// only affects the data written before it.
9+
// Refs: https://github.com/nodejs/node/issues/28478
10+
11+
const compress = createGzip();
12+
const decompress = createGunzip();
13+
decompress.setEncoding('utf8');
14+
15+
const events = [];
16+
const compressedChunks = [];
17+
18+
for (const chunk of ['abc', 'def', 'ghi']) {
19+
compress.write(chunk, common.mustCall(() => events.push({ written: chunk })));
20+
compress.flush(Z_PARTIAL_FLUSH, common.mustCall(() => {
21+
events.push('flushed');
22+
const chunk = compress.read();
23+
if (chunk !== null)
24+
compressedChunks.push(chunk);
25+
}));
26+
}
27+
28+
compress.end(common.mustCall(() => {
29+
events.push('compress end');
30+
writeToDecompress();
31+
}));
32+
33+
function writeToDecompress() {
34+
// Write the compressed chunks to a decompressor, one by one, in order to
35+
// verify that the flushes actually worked.
36+
const chunk = compressedChunks.shift();
37+
if (chunk === undefined) return decompress.end();
38+
decompress.write(chunk, common.mustCall(() => {
39+
events.push({ read: decompress.read() });
40+
writeToDecompress();
41+
}));
42+
}
43+
44+
process.on('exit', () => {
45+
assert.deepStrictEqual(events, [
46+
{ written: 'abc' },
47+
'flushed',
48+
{ written: 'def' },
49+
'flushed',
50+
{ written: 'ghi' },
51+
'flushed',
52+
'compress end',
53+
{ read: 'abc' },
54+
{ read: 'def' },
55+
{ read: 'ghi' }
56+
]);
57+
});

test/parallel/test-zlib-write-after-flush.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ for (const [ createCompress, createDecompress ] of [
3939
gunz.on('data', (c) => output += c);
4040
gunz.on('end', common.mustCall(() => {
4141
assert.strictEqual(output, input);
42-
assert.strictEqual(gzip._nextFlush, -1);
4342
}));
4443

4544
// Make sure that flush/write doesn't trigger an assert failure

0 commit comments

Comments
 (0)