Skip to content

Commit 237bcdf

Browse files
authored
[streams] speed up Readable in some cases (#1708)
If `encoding` is set, no `Buffer`s are exposed, so `Uint8Array` can be used directly.
- fix data corruption in `BufferList.concat()`
- fix segfaults in `BufferList.join()`
1 parent 0b395ca commit 237bcdf

File tree

3 files changed

+140
-14
lines changed

3 files changed

+140
-14
lines changed

src/bun.js/bindings/JSBufferList.cpp

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,17 @@ JSC::JSValue JSBufferList::concat(JSC::VM& vm, JSC::JSGlobalObject* lexicalGloba
5353
size_t i = 0;
5454
for (auto iter = m_deque.begin(); iter != m_deque.end(); ++iter) {
5555
auto array = JSC::jsDynamicCast<JSC::JSUint8Array*>(iter->get());
56-
if (!array)
57-
continue;
58-
size_t length = array->byteLength();
59-
uint8Array->setFromTypedArray(lexicalGlobalObject, i, array, 0, length, JSC::CopyType::Unobservable);
56+
if (UNLIKELY(!array)) {
57+
return throwTypeError(lexicalGlobalObject, throwScope, "concat can only be called when all buffers are Uint8Array"_s);
58+
}
59+
const size_t length = array->byteLength();
60+
if (UNLIKELY(i + length > n)) {
61+
return throwRangeError(lexicalGlobalObject, throwScope, "specified size too small to fit all buffers"_s);
62+
}
63+
if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, i, array, 0, length, JSC::CopyType::Unobservable))) {
64+
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
65+
}
66+
i += length;
6067
}
6168

6269
RELEASE_AND_RETURN(throwScope, uint8Array);
@@ -68,16 +75,18 @@ JSC::JSValue JSBufferList::join(JSC::VM& vm, JSC::JSGlobalObject* lexicalGlobalO
6875
if (length() == 0) {
6976
RELEASE_AND_RETURN(throwScope, JSC::jsEmptyString(vm));
7077
}
71-
bool needSeq = false;
78+
const bool needSeq = seq->length() != 0;
79+
const auto end = m_deque.end();
7280
JSRopeString::RopeBuilder<RecordOverflow> ropeBuilder(vm);
73-
for (auto iter = m_deque.begin(); iter != m_deque.end(); ++iter) {
74-
auto str = JSC::jsCast<JSC::JSString*>(iter->get());
81+
for (auto iter = m_deque.begin(); ;) {
82+
auto str = iter->get().toString(lexicalGlobalObject);
83+
if (!ropeBuilder.append(str))
84+
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
85+
if (++iter == end)
86+
break;
7587
if (needSeq)
7688
if (!ropeBuilder.append(seq))
7789
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
78-
if (!ropeBuilder.append(str))
79-
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
80-
needSeq = seq->length() != 0;
8190
}
8291
RELEASE_AND_RETURN(throwScope, ropeBuilder.release());
8392
}
@@ -142,11 +151,13 @@ JSC::JSValue JSBufferList::_getBuffer(JSC::VM& vm, JSC::JSGlobalObject* lexicalG
142151
for (auto iter = m_deque.begin(); iter != m_deque.end() && n > 0; ++iter) {
143152
JSC::JSUint8Array* array = JSC::jsDynamicCast<JSC::JSUint8Array*>(iter->get());
144153
if (UNLIKELY(!array)) {
145-
return throwOutOfMemoryError(lexicalGlobalObject, throwScope, "_getBuffer can only be called when all buffers are Uint8Array"_s);
154+
return throwTypeError(lexicalGlobalObject, throwScope, "_getBuffer can only be called when all buffers are Uint8Array"_s);
146155
}
147156
size_t length = array->byteLength();
148157
if (length > n) {
149-
uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, n, JSC::CopyType::Unobservable);
158+
if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, n, JSC::CopyType::Unobservable))) {
159+
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
160+
}
150161
// create a new array of size length - n.
151162
// is there a faster way to do this?
152163
auto arrayBuffer = JSC::ArrayBuffer::tryCreateUninitialized(length - n, 1);
@@ -160,7 +171,9 @@ JSC::JSValue JSBufferList::_getBuffer(JSC::VM& vm, JSC::JSGlobalObject* lexicalG
160171
memcpy(newArray->typedVector(), array->typedVector() + n, length - n);
161172
iter->set(vm, this, newArray);
162173
} else {
163-
uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, length, JSC::CopyType::Unobservable);
174+
if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, length, JSC::CopyType::Unobservable))) {
175+
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
176+
}
164177
m_deque.removeFirst();
165178
}
166179
n -= static_cast<int32_t>(length);

src/bun.js/streams.exports.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2949,7 +2949,9 @@ var require_readable = __commonJS({
29492949
} else if (chunk instanceof Buffer) {
29502950
encoding = "";
29512951
} else if (Stream._isUint8Array(chunk)) {
2952-
chunk = Stream._uint8ArrayToBuffer(chunk);
2952+
if (addToFront || !state.decoder) {
2953+
chunk = Stream._uint8ArrayToBuffer(chunk);
2954+
}
29532955
encoding = "";
29542956
} else if (chunk != null) {
29552957
err = new ERR_INVALID_ARG_TYPE(
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import { beforeEach, describe, expect, it } from "bun:test";
2+
import { Readable, Writable } from "stream";
3+
4+
const ABC = new Uint8Array([0x41, 0x42, 0x43]);
5+
const DEF = new Uint8Array([0x44, 0x45, 0x46]);
6+
const GHI = new Uint8Array([0x47, 0x48, 0x49]);
7+
8+
describe("Writable", () => {
9+
let called;
10+
11+
function logCall(fn, id) {
12+
return function() {
13+
called[id] = (called[id] || 0) + 1;
14+
return fn.apply(this, arguments);
15+
};
16+
}
17+
18+
beforeEach(() => {
19+
called = [];
20+
});
21+
22+
it("should perform simple operations", () => {
23+
let n = 0;
24+
const writable = new Writable({
25+
write: logCall((chunk, encoding, cb) => {
26+
expect(chunk instanceof Buffer).toBe(true);
27+
if (n++ === 0) {
28+
expect(String(chunk)).toBe("ABC");
29+
} else {
30+
expect(String(chunk)).toBe("DEF");
31+
}
32+
33+
cb();
34+
}, 0),
35+
});
36+
37+
writable.write(ABC);
38+
writable.end(DEF);
39+
expect(called).toEqual([ 2 ]);
40+
});
41+
42+
it("should pass in Uint8Array in object mode", () => {
43+
const writable = new Writable({
44+
objectMode: true,
45+
write: logCall((chunk, encoding, cb) => {
46+
expect(chunk instanceof Buffer).toBe(false);
47+
expect(chunk instanceof Uint8Array).toBe(true);
48+
expect(chunk).toStrictEqual(ABC);
49+
expect(encoding).toBe("utf8");
50+
cb();
51+
}, 0),
52+
});
53+
54+
writable.end(ABC);
55+
expect(called).toEqual([ 1 ]);
56+
});
57+
58+
it("should handle multiple writes carried out via writev()", () => {
59+
let callback;
60+
61+
const writable = new Writable({
62+
write: logCall((chunk, encoding, cb) => {
63+
expect(chunk instanceof Buffer).toBe(true);
64+
expect(encoding).toBe("buffer");
65+
expect(String(chunk)).toBe("ABC");
66+
callback = cb;
67+
}, 0),
68+
writev: logCall((chunks, cb) => {
69+
expect(chunks.length).toBe(2);
70+
expect(chunks[0].encoding).toBe("buffer");
71+
expect(chunks[1].encoding).toBe("buffer");
72+
expect(chunks[0].chunk + chunks[1].chunk).toBe("DEFGHI");
73+
}, 1),
74+
});
75+
76+
writable.write(ABC);
77+
writable.write(DEF);
78+
writable.end(GHI);
79+
callback();
80+
expect(called).toEqual([ 1, 1 ]);
81+
});
82+
});
83+
84+
describe("Readable", () => {
85+
it("should perform simple operations", () => {
86+
const readable = new Readable({
87+
read() {}
88+
});
89+
90+
readable.push(DEF);
91+
readable.unshift(ABC);
92+
93+
const buf = readable.read();
94+
expect(buf instanceof Buffer).toBe(true);
95+
expect([ ...buf ]).toEqual([ ...ABC, ...DEF ]);
96+
});
97+
98+
it("should work with setEncoding()", () => {
99+
const readable = new Readable({
100+
read() {}
101+
});
102+
103+
readable.setEncoding("utf8");
104+
105+
readable.push(DEF);
106+
readable.unshift(ABC);
107+
108+
const out = readable.read();
109+
expect(out).toBe("ABCDEF");
110+
});
111+
});

0 commit comments

Comments
 (0)