http: add parser kOnStreamAlloc callback for faster uploads #52176

Open
wants to merge 1 commit into base: main
18 changes: 18 additions & 0 deletions benchmark/_http-benchmarkers.js
@@ -29,6 +29,12 @@ class AutocannonBenchmarker {
for (const field in options.headers) {
args.push('-H', `${field}=${options.headers[field]}`);
}
if (options.upload) {
// TODO: the upload data file is not cleaned up after the benchmark run.
const upload_data_path = 'bench-upload.data';
fs.writeFileSync(upload_data_path, Buffer.alloc(options.upload, 'U'));
args.push('-i', upload_data_path);
}
const scheme = options.scheme || 'http';
args.push(`${scheme}://127.0.0.1:${options.port}${options.path}`);
const child = child_process.spawn(this.executable, args);
@@ -71,6 +77,18 @@ class WrkBenchmarker {
for (const field in options.headers) {
args.push('-H', `${field}: ${options.headers[field]}`);
}
if (options.upload) {
// TODO: the upload data and Lua script files are not cleaned up after the run.
const upload_data_path = 'bench-upload.data';
const upload_lua_path = 'bench-upload.lua';
fs.writeFileSync(upload_data_path, Buffer.alloc(options.upload, 'U'));
fs.writeFileSync(upload_lua_path, `
wrk.method = "POST"
file = io.open("${upload_data_path}", "rb")
wrk.body = file:read("*all")
`);
args.push('-s', upload_lua_path);
}
const child = child_process.spawn(this.executable, args);
return child;
}
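For context, a minimal sketch of how a benchmark file opts into these upload paths, modeled on benchmark/http/server_upload.js below (the `upload` option and its byte-count semantics come from this patch; the rest follows the existing benchmark conventions):

'use strict';
const common = require('../common.js');

const bench = common.createBenchmark(main, { duration: [10], connections: [50] });

function main({ duration, connections }) {
  const http = require('http');
  // Drain each request body and reply once it has been fully consumed.
  const server = http.createServer((req, res) => {
    req.on('end', () => res.end());
    req.resume();
  });
  server.listen(0, () => {
    bench.http({
      connections,
      duration,
      upload: 1024 * 1024, // ask the benchmarker to POST a 1 MiB body
      port: server.address().port,
    }, () => server.close());
  });
}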
52 changes: 52 additions & 0 deletions benchmark/http/_buffer_pool.js
@@ -0,0 +1,52 @@
// A utility that recycles fixed-size buffers, used to benchmark the
// HTTP parser kOnStreamAlloc callback option.
// See benchmark/http/server_upload.js.
'use strict';

class BufferPool {

constructor(maxPoolSize = 64 * 1024 * 1024, bufSize = 64 * 1024) {
this.maxPoolSize = maxPoolSize;
this.bufSize = bufSize;
this.pool = [];
this.stats = {
alloc: { reuse: 0, new: 0, miss: 0, miss_ranges: {} },
free: { reuse: 0, full: 0, miss: 0, miss_ranges: {} },
};
}

// Returns a pooled (or freshly allocated) Buffer of exactly `bufSize` bytes.
// Any other requested length is recorded as a miss and yields undefined, so
// the caller falls back to its default allocation path.
alloc(length) {
if (length !== this.bufSize) {
this._miss(length, this.stats.alloc);
} else if (!this.pool.length) {
this.stats.alloc.new += 1;
return Buffer.allocUnsafeSlow(this.bufSize);
} else {
this.stats.alloc.reuse += 1;
return this.pool.pop();
}
}

// Returns a buffer to the pool. Buffers of the wrong size, and buffers
// arriving while the pool is already at maxPoolSize, are counted and dropped.
free(buf) {
if (buf.length !== this.bufSize) {
this._miss(buf.length, this.stats.free);
} else if (this.pool.length * this.bufSize >= this.maxPoolSize) {
this.stats.free.full += 1;
} else {
this.stats.free.reuse += 1;
this.pool.push(buf);
}
}

// Count the misses of alloc/free per length range (logarithmic)
_miss(length, op_stats) {
const base = 2 ** Math.floor(Math.log2(length));
const range = `[${base}-${2 * base})`;
op_stats.miss += 1;
op_stats.miss_ranges[range] = (op_stats.miss_ranges[range] || 0) + 1;
}

}

exports.BufferPool = BufferPool;
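A minimal usage sketch (condensed from benchmark/http/server_upload.js below) of how the pool plugs into the parser hook added by this patch; note that `_http_common` and the socket's `parser` property are Node internals, so this is benchmark and experiment territory rather than public API:

'use strict';
const http = require('http');
const { HTTPParser } = require('_http_common');
const { BufferPool } = require('./_buffer_pool.js');

const pool = new BufferPool(64 * 1024 * 1024 /* maxPoolSize */, 64 * 1024 /* bufSize */);

http.createServer((req, res) => {
  // Let the parser read socket data straight into pooled buffers. Returning
  // a non-Buffer (e.g. undefined on a size mismatch) keeps the copying path.
  req.socket.parser[HTTPParser.kOnStreamAlloc | 0] = (size) => pool.alloc(size);
  req.on('data', (buf) => {
    // ... process buf ...
    pool.free(buf); // recycle once nothing references the chunk anymore
  });
  req.on('end', () => res.end());
}).listen(0);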
94 changes: 94 additions & 0 deletions benchmark/http/server_upload.js
@@ -0,0 +1,94 @@
'use strict';

const common = require('../common.js');

const bench = common.createBenchmark(main, {
stats: [0],
duration: [10],
connections: [200],
bufSize: [64 * 1024],
upload: [1 * 1024 * 1024],
backpressure: [
0, // None - just listen on request data events
1, // Pipe to a writable stream
],
delay: [
-1, // No delay
0, // -> setImmediate
1, // -> setTimeout
5, // ...
],
useBufferPool: [0, 1],
});

function main({
stats,
duration,
connections,
bufSize,
upload,
backpressure,
delay,
useBufferPool,
}) {
const http = require('http');
const { pipeline, Writable } = require('stream');
const { HTTPParser } = require('_http_common');
const { BufferPool } = require('./_buffer_pool.js');

const maxPoolSize = connections * upload * 2;
const bufferPool = new BufferPool(maxPoolSize, bufSize);

function onRequest(req, res) {
const onData = (buf, enc, callback) => {
// Processing the data ... and when done, recycle the buffer.
if (useBufferPool) bufferPool.free(buf);
if (callback) callback();
};
const onDataDelayed = delay < 0 ? onData : (
delay === 0 ?
setImmediate.bind(null, onData) :
setTimeout.bind(null, onData, delay)
);
if (useBufferPool) {
// Let the parser read socket data directly into pooled buffers.
req.socket.parser[HTTPParser.kOnStreamAlloc | 0] =
(length) => bufferPool.alloc(length);
}
if (backpressure) {
const pipeTo = new Writable({
highWaterMark: upload,
write: onDataDelayed,
});
pipeline(req, pipeTo, () => res.end());
} else {
req.on('data', onDataDelayed);
req.on('end', () => res.end());
}
}

const headers = {
'Content-Length': upload,
'Content-Type': 'application/octet-stream',
};

const options = {
highWaterMark: upload,
};

const server = http.createServer(options, onRequest);

server.listen(0, () => {
bench.http({
connections,
duration,
headers,
upload,
port: server.address().port,
}, () => {
if (stats && useBufferPool) {
console.log('BufferPool: length', bufferPool.pool.length, 'stats', bufferPool.stats);
}
server.close();
});
});
}
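A usage note, assuming the standard Node.js benchmark runner conventions and an installed autocannon or wrk: the parameter grid above can be narrowed on the command line with key=value arguments, for example `node benchmark/http/server_upload.js useBufferPool=1 backpressure=1 stats=1`, which also prints the pool statistics collected at the end of the run.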
2 changes: 2 additions & 0 deletions lib/_http_common.js
@@ -48,6 +48,7 @@ const kOnBody = HTTPParser.kOnBody | 0;
const kOnMessageComplete = HTTPParser.kOnMessageComplete | 0;
const kOnExecute = HTTPParser.kOnExecute | 0;
const kOnTimeout = HTTPParser.kOnTimeout | 0;
const kOnStreamAlloc = HTTPParser.kOnStreamAlloc | 0;

const MAX_HEADER_PAIRS = 2000;

@@ -234,6 +235,7 @@ function cleanParser(parser) {
parser[kOnMessageBegin] = null;
parser[kOnExecute] = null;
parser[kOnTimeout] = null;
parser[kOnStreamAlloc] = null;
parser._consumed = false;
parser.onIncoming = null;
parser.joinDuplicateHeaders = null;
87 changes: 85 additions & 2 deletions src/node_http_parser.cc
@@ -50,13 +50,16 @@ namespace node {
namespace { // NOLINT(build/namespaces)

using v8::Array;
using v8::ArrayBuffer;
using v8::ArrayBufferView;
using v8::Boolean;
using v8::Context;
using v8::EscapableHandleScope;
using v8::Exception;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Global;
using v8::HandleScope;
using v8::Int32;
using v8::Integer;
@@ -77,6 +80,7 @@ const uint32_t kOnBody = 3;
const uint32_t kOnMessageComplete = 4;
const uint32_t kOnExecute = 5;
const uint32_t kOnTimeout = 6;
const uint32_t kOnStreamAlloc = 7;
// Any more fields than this will be flushed into JS
const size_t kMaxHeaderFieldsCount = 32;
// Maximum size of chunk extensions
@@ -473,7 +477,11 @@ class Parser : public AsyncWrap, public StreamListener {
if (!cb->IsFunction())
return 0;

Local<Value> buffer = Buffer::Copy(env, at, length).ToLocalChecked();
// Get the user-provided buffer, or fall back to copying into a new buffer.
Local<Value> buffer = GetUserAllocBuffer(at, length);
if (buffer.IsEmpty()) {
buffer = Buffer::Copy(env, at, length).ToLocalChecked();
}

MaybeLocal<Value> r = MakeCallback(cb.As<Function>(), 1, &buffer);

@@ -765,13 +773,26 @@ class Parser : public AsyncWrap, public StreamListener {
static const size_t kAllocBufferSize = 64 * 1024;

uv_buf_t OnStreamAlloc(size_t suggested_size) override {
HandleScope scope(env()->isolate());

// If a custom allocator callback is set, it is called to return the next
// buffer for the stream data, which is emitted to the kOnBody callback.
// Setting a custom allocator can be used to recycle buffers after the
// server has processed their data, which improves server upload performance
// by reducing redundant memcpy and garbage collection work.

Local<Value> buf = CallJSOnStreamAlloc(suggested_size);
if (!buf.IsEmpty())
return uv_buf_init(Buffer::Data(buf), Buffer::Length(buf));

// For most types of streams, OnStreamRead will be called immediately after
// OnStreamAlloc, and will consume all data, so using a static buffer for
// reading is more efficient. For other streams, just use Malloc() directly.

if (binding_data_->parser_buffer_in_use)
return uv_buf_init(Malloc(suggested_size), suggested_size);

binding_data_->parser_buffer_in_use = true;
if (binding_data_->parser_buffer.empty())
binding_data_->parser_buffer.resize(kAllocBufferSize);

@@ -781,12 +802,19 @@

void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override {
HandleScope scope(env()->isolate());

const bool is_user_alloc_buffer =
!user_alloc_buffer_.IsEmpty() &&
buf.base == Buffer::Data(user_alloc_buffer_.Get(env()->isolate()));

// Once we're done here, either indicate that the HTTP parser buffer is
// free for re-use, release our reference to the user-allocated buffer, or
// free() the data if it didn't come from either of those places.
auto on_scope_leave = OnScopeLeave([&]() {
if (buf.base == binding_data_->parser_buffer.data())
binding_data_->parser_buffer_in_use = false;
else if (is_user_alloc_buffer)
user_alloc_buffer_.Reset();
else
free(buf.base);
});
@@ -987,6 +1015,7 @@ class Parser : public AsyncWrap, public StreamListener {
got_exception_ = false;
headers_completed_ = false;
max_http_header_size_ = max_http_header_size;
user_alloc_buffer_.Reset();
}


@@ -1018,6 +1047,57 @@ class Parser : public AsyncWrap, public StreamListener {
return true;
}

// `CallJSOnStreamAlloc` calls the `kOnStreamAlloc` callback, if one is set,
// to provide the next buffer to use for `OnStreamAlloc`.
// This allows reading the data from the underlying stream directly into the
// user's buffer, avoiding unneeded memory copies, and the user can decide
// to recycle that buffer once the onBody processing is done.
// The buffer is stored in a persistent reference so it can be retrieved
// during `OnStreamRead` -> `Execute` -> `on_body` by calling
// `GetUserAllocBuffer`, and the reference held by the parser is discarded
// at the end of `OnStreamRead`.
Local<Value> CallJSOnStreamAlloc(size_t suggested_size) {
if (!user_alloc_buffer_.IsEmpty()) return Local<Value>();

Local<Value> cb =
object()->Get(env()->context(), kOnStreamAlloc).ToLocalChecked();
if (!cb->IsFunction()) return Local<Value>();

Local<Value> arg =
Integer::NewFromUnsigned(env()->isolate(), kAllocBufferSize);
Local<Value> b =
MakeCallback(cb.As<Function>(), 1, &arg).FromMaybe(Local<Value>());
if (b.IsEmpty() || !Buffer::HasInstance(b)) return Local<Value>();

user_alloc_buffer_.Reset(env()->isolate(), b);
return b;
}

// `GetUserAllocBuffer` provides a buffer to `on_body` if it matches the
// buffer provided by the user's `kOnStreamAlloc` callback.
// A partial match is also accepted, in which case a slice of that buffer
// is returned.
Local<Value> GetUserAllocBuffer(const char* at, size_t length) {
if (user_alloc_buffer_.IsEmpty()) return Local<Value>();

Local<Value> b = user_alloc_buffer_.Get(env()->isolate());
DCHECK(Buffer::HasInstance(b));
const size_t alloc_len = Buffer::Length(b);
const char* alloc_start = Buffer::Data(b);
const char* alloc_end = alloc_start + alloc_len;

// exact match
if (alloc_start == at && alloc_len == length) return b;

// slice match
if (alloc_start <= at && alloc_end >= at + length) {
Local<ArrayBufferView> abv = b.As<ArrayBufferView>();
Local<ArrayBuffer> ab = abv->Buffer();
const size_t offset = abv->ByteOffset() + at - alloc_start;
return Buffer::New(env()->isolate(), ab, offset, length).ToLocalChecked();
}

return Local<Value>();
}

llhttp_t parser_;
StringPtr fields_[kMaxHeaderFieldsCount]; // header fields
@@ -1037,6 +1117,7 @@ class Parser : public AsyncWrap, public StreamListener {
uint64_t max_http_header_size_;
uint64_t last_message_start_;
ConnectionsList* connectionsList_;
Global<Value> user_alloc_buffer_;

BaseObjectPtr<BindingData> binding_data_;

@@ -1274,6 +1355,8 @@ void InitializeHttpParser(Local<Object> target,
Integer::NewFromUnsigned(env->isolate(), kOnExecute));
t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnTimeout"),
Integer::NewFromUnsigned(env->isolate(), kOnTimeout));
t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnStreamAlloc"),
Integer::NewFromUnsigned(env->isolate(), kOnStreamAlloc));

t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kLenientNone"),
Integer::NewFromUnsigned(env->isolate(), kLenientNone));
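Seen from JavaScript, the contract introduced above is small; the following hedged sketch restates it (argument value and fallback behaviour as implemented in CallJSOnStreamAlloc/GetUserAllocBuffer; the helper name is made up for illustration):

'use strict';
const { HTTPParser } = require('_http_common');

// `parser` is an HTTPParser instance bound to a socket, e.g. req.socket.parser.
// The callback receives the suggested byte length (the parser's 64 KiB
// kAllocBufferSize) and should return a Buffer for the stream to read into;
// that Buffer, or a slice of it, is what later reaches the kOnBody callback
// and the request's 'data' events. Returning anything that is not a Buffer
// keeps the existing copy-into-a-fresh-Buffer path.
function useCallerAllocation(parser, allocate) {
  parser[HTTPParser.kOnStreamAlloc | 0] = (suggestedSize) => allocate(suggestedSize);
}

module.exports = { useCallerAllocation };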
1 change: 1 addition & 0 deletions typings/internalBinding/http_parser.d.ts
@@ -13,6 +13,7 @@ declare namespace InternalHttpParserBinding {
static kOnMessageComplete: 4;
static kOnExecute: 5;
static kOnTimeout: 6;
static kOnStreamAlloc: 7;

static kLenientNone: number;
static kLenientHeaders: number;