Skip to content

Commit e4870d8

Browse files
authored
perf(ext/node): native vectored write for server streams (#19752)
``` # main $ ./load_test 10 0.0.0.0 8080 0 0 Using message size of 20 bytes Running benchmark now... Msg/sec: 106182.250000 Msg/sec: 110279.750000 ^C # this PR $ ./load_test 10 0.0.0.0 8080 0 0 Using message size of 20 bytes Running benchmark now... Msg/sec: 131632.250000 Msg/sec: 134754.250000 ^C ```
1 parent 7d022ad commit e4870d8

File tree

4 files changed

+75
-2
lines changed

4 files changed

+75
-2
lines changed

ext/http/http_next.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use deno_core::ByteString;
3232
use deno_core::CancelFuture;
3333
use deno_core::CancelHandle;
3434
use deno_core::CancelTryFuture;
35+
use deno_core::JsBuffer;
3536
use deno_core::OpState;
3637
use deno_core::RcRef;
3738
use deno_core::Resource;
@@ -1034,6 +1035,34 @@ impl UpgradeStream {
10341035
.try_or_cancel(cancel_handle)
10351036
.await
10361037
}
1038+
1039+
async fn write_vectored(
1040+
self: Rc<Self>,
1041+
buf1: &[u8],
1042+
buf2: &[u8],
1043+
) -> Result<usize, AnyError> {
1044+
let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await;
1045+
1046+
let total = buf1.len() + buf2.len();
1047+
let mut bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)];
1048+
let mut nwritten = wr.write_vectored(&bufs).await?;
1049+
if nwritten == total {
1050+
return Ok(nwritten);
1051+
}
1052+
1053+
// Slightly more optimized than (unstable) write_all_vectored for 2 iovecs.
1054+
while nwritten <= buf1.len() {
1055+
bufs[0] = std::io::IoSlice::new(&buf1[nwritten..]);
1056+
nwritten += wr.write_vectored(&bufs).await?;
1057+
}
1058+
1059+
// First buffer out of the way.
1060+
if nwritten < total && nwritten > buf1.len() {
1061+
wr.write_all(&buf2[nwritten - buf1.len()..]).await?;
1062+
}
1063+
1064+
Ok(total)
1065+
}
10371066
}
10381067

10391068
impl Resource for UpgradeStream {
@@ -1048,3 +1077,16 @@ impl Resource for UpgradeStream {
10481077
self.cancel_handle.cancel();
10491078
}
10501079
}
1080+
1081+
#[op]
1082+
pub async fn op_raw_write_vectored(
1083+
state: Rc<RefCell<OpState>>,
1084+
rid: ResourceId,
1085+
buf1: JsBuffer,
1086+
buf2: JsBuffer,
1087+
) -> Result<usize, AnyError> {
1088+
let resource: Rc<UpgradeStream> =
1089+
state.borrow().resource_table.get::<UpgradeStream>(rid)?;
1090+
let nwritten = resource.write_vectored(&buf1, &buf2).await?;
1091+
Ok(nwritten)
1092+
}

ext/http/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ deno_core::extension!(
120120
http_next::op_http_track,
121121
http_next::op_http_upgrade_websocket_next,
122122
http_next::op_http_upgrade_raw,
123+
http_next::op_raw_write_vectored,
123124
http_next::op_http_try_wait,
124125
http_next::op_http_wait,
125126
],

ext/node/polyfills/internal/stream_base_commons.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,9 @@ export function onStreamRead(
253253
}
254254
} else {
255255
const offset = streamBaseState[kArrayBufferOffset];
256-
const buf = Buffer.from(arrayBuffer, offset, nread);
256+
// Performance note: Pass ArrayBuffer to Buffer#from to avoid
257+
// copy.
258+
const buf = Buffer.from(arrayBuffer.buffer, offset, nread);
257259
result = stream.push(buf);
258260
}
259261

ext/node/polyfills/internal_binding/stream_wrap.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ import {
4040
} from "ext:deno_node/internal_binding/async_wrap.ts";
4141
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";
4242

43+
const core = globalThis.Deno.core;
44+
const { ops } = core;
45+
4346
interface Reader {
4447
read(p: Uint8Array): Promise<number | null>;
4548
}
@@ -54,7 +57,7 @@ export interface Closer {
5457

5558
type Ref = { ref(): void; unref(): void };
5659

57-
enum StreamBaseStateFields {
60+
const enum StreamBaseStateFields {
5861
kReadBytesOrError,
5962
kArrayBufferOffset,
6063
kBytesWritten,
@@ -195,6 +198,31 @@ export class LibuvStreamWrap extends HandleWrap {
195198
chunks: Buffer[] | (string | Buffer)[],
196199
allBuffers: boolean,
197200
): number {
201+
const supportsWritev = this.provider === providerType.TCPSERVERWRAP;
202+
// Fast case optimization: two chunks, and all buffers.
203+
if (chunks.length === 2 && allBuffers && supportsWritev) {
204+
// String chunks.
205+
if (typeof chunks[0] === "string") chunks[0] = Buffer.from(chunks[0]);
206+
if (typeof chunks[1] === "string") chunks[1] = Buffer.from(chunks[1]);
207+
208+
ops.op_raw_write_vectored(
209+
this[kStreamBaseField]!.rid,
210+
chunks[0],
211+
chunks[1],
212+
).then((nwritten) => {
213+
try {
214+
req.oncomplete(0);
215+
} catch {
216+
// swallow callback errors.
217+
}
218+
219+
streamBaseState[kBytesWritten] = nwritten;
220+
this.bytesWritten += nwritten;
221+
});
222+
223+
return 0;
224+
}
225+
198226
const count = allBuffers ? chunks.length : chunks.length >> 1;
199227
const buffers: Buffer[] = new Array(count);
200228

0 commit comments

Comments
 (0)