diff --git a/benchmark/_http-benchmarkers.js b/benchmark/_http-benchmarkers.js
index ae5429fa721750..40860630f968e1 100644
--- a/benchmark/_http-benchmarkers.js
+++ b/benchmark/_http-benchmarkers.js
@@ -29,6 +29,12 @@ class AutocannonBenchmarker {
     for (const field in options.headers) {
       args.push('-H', `${field}=${options.headers[field]}`);
     }
+    if (options.upload) {
+      // TODO: the upload data file is never removed; clean it up after the run.
+      const upload_data_path = 'bench-upload.data';
+      fs.writeFileSync(upload_data_path, Buffer.alloc(options.upload, 'U'));
+      args.push('-m', 'POST', '-i', upload_data_path);
+    }
     const scheme = options.scheme || 'http';
     args.push(`${scheme}://127.0.0.1:${options.port}${options.path}`);
     const child = child_process.spawn(this.executable, args);
@@ -71,6 +77,19 @@ class WrkBenchmarker {
     for (const field in options.headers) {
       args.push('-H', `${field}: ${options.headers[field]}`);
     }
+    if (options.upload) {
+      // TODO: the upload data/script files are never removed; clean them up.
+      const upload_data_path = 'bench-upload.data';
+      const upload_lua_path = 'bench-upload.lua';
+      fs.writeFileSync(upload_data_path, Buffer.alloc(options.upload, 'U'));
+      fs.writeFileSync(upload_lua_path, `
+        wrk.method = "POST"
+        local file = assert(io.open("${upload_data_path}", "rb"))
+        wrk.body = file:read("*all")
+        file:close()
+      `);
+      args.push('-s', upload_lua_path);
+    }
     const child = child_process.spawn(this.executable, args);
     return child;
   }
diff --git a/benchmark/http/_buffer_pool.js b/benchmark/http/_buffer_pool.js
new file mode 100644
index 00000000000000..8db1b5a512a892
--- /dev/null
+++ b/benchmark/http/_buffer_pool.js
@@ -0,0 +1,66 @@
+// This is a utility that recycles buffers for benchmarking
+// the http parser kOnStreamAlloc callback option
+// See benchmark/http/server_upload.js
+'use strict';
+
+// Recycles buffers of one fixed size (`bufSize`). Requests for any other
+// length are not served; they are only counted in `stats` as misses.
+class BufferPool {
+
+  // `maxPoolSize` caps the bytes kept idle in the pool; `bufSize` is the only
+  // buffer length the pool allocates and accepts back.
+  constructor(maxPoolSize = 64 * 1024 * 1024, bufSize = 64 * 1024) {
+    this.maxPoolSize = maxPoolSize;
+    this.bufSize = bufSize;
+    this.pool = [];
+    this.stats = {
+      alloc: { reuse: 0, new: 0, miss: 0, miss_ranges: {} },
+      free: { reuse: 0, full: 0, miss: 0, miss_ranges: {} },
+    };
+  }
+
+  // Returns a `bufSize`-byte buffer, recycled when one is idle, or
+  // `undefined` (after recording a miss) when `length` is any other size.
+  alloc(length) {
+    if (length !== this.bufSize) {
+      this._miss(length, this.stats.alloc);
+      return undefined;
+    }
+    if (this.pool.length === 0) {
+      this.stats.alloc.new += 1;
+      return Buffer.allocUnsafeSlow(this.bufSize);
+    }
+    this.stats.alloc.reuse += 1;
+    return this.pool.pop();
+  }
+
+  // Takes `buf` back for reuse. Buffers of the wrong length are counted as
+  // misses, and buffers offered while the pool is full are dropped.
+  free(buf) {
+    if (buf.length !== this.bufSize) {
+      this._miss(buf.length, this.stats.free);
+    } else if (this.pool.length * this.bufSize >= this.maxPoolSize) {
+      this.stats.free.full += 1;
+    } else {
+      this.stats.free.reuse += 1;
+      this.pool.push(buf);
+    }
+  }
+
+  // Count the misses of alloc/free per length range (logarithmic)
+  _miss(length, op_stats) {
+    // Math.log2(0) is -Infinity, which would give the empty range "[0-0)",
+    // so lengths below 1 get a bucket of their own.
+    let range = '[0-1)';
+    if (length >= 1) {
+      const base = 2 ** Math.floor(Math.log2(length));
+      range = `[${base}-${2 * base})`;
+    }
+    op_stats.miss += 1;
+    op_stats.miss_ranges[range] = (op_stats.miss_ranges[range] || 0) + 1;
+  }
+
+}
+
+
+exports.BufferPool = BufferPool;
diff --git a/benchmark/http/server_upload.js b/benchmark/http/server_upload.js
new file mode 100644
index 00000000000000..f75f705eb3615f
--- /dev/null
+++ b/benchmark/http/server_upload.js
@@ -0,0 +1,100 @@
+'use strict';
+
+const common = require('../common.js');
+
+// Benchmarks HTTP request body uploads, with and without recycling the body
+// buffers through the http parser kOnStreamAlloc callback.
+const bench = common.createBenchmark(main, {
+  stats: [0],
+  duration: [10],
+  connections: [200],
+  bufSize: [64 * 1024],
+  upload: [1 * 1024 * 1024],
+  backpressure: [
+    0, // None - just listen on request data events
+    1, // Pipe to a writable stream
+  ],
+  delay: [
+    -1, // No delay
+    0, // -> setImmediate
+    1, // -> setTimeout
+    5, // -> setTimeout
+  ],
+  useBufferPool: [0, 1],
+});
+
+function main({
+  stats,
+  duration,
+  connections,
+  bufSize,
+  upload,
+  backpressure,
+  delay,
+  useBufferPool,
+}) {
+  const http = require('http');
+  const { pipeline, Writable } = require('stream');
+  const { HTTPParser } = require('_http_common');
+  const { BufferPool } = require('./_buffer_pool.js');
+
+  const maxPoolSize = connections * upload * 2;
+  const bufferPool = new BufferPool(maxPoolSize, bufSize);
+
+  function onRequest(req, res) {
+    const onData = (buf, enc, callback) => {
+      // Processing the data ... and when done, recycle the buffer.
+      if (useBufferPool) bufferPool.free(buf);
+      if (callback) callback();
+    };
+    // With a delay configured, the consumer runs from a timer instead; the
+    // timer passes the (buf, enc, callback) arguments it receives on to onData.
+    const onDataDelayed = delay < 0 ? onData : (
+      delay === 0 ?
+        setImmediate.bind(null, onData) :
+        setTimeout.bind(null, onData, delay)
+    );
+    if (useBufferPool) {
+      // Have the parser ask the pool for the buffers it reads into.
+      req.socket.parser[HTTPParser.kOnStreamAlloc | 0] =
+        (length) => bufferPool.alloc(length);
+    }
+    if (backpressure) {
+      const pipeTo = new Writable({
+        highWaterMark: upload,
+        write: onDataDelayed,
+      });
+      pipeline(req, pipeTo, () => res.end());
+    } else {
+      req.on('data', onDataDelayed);
+      req.on('end', () => res.end());
+    }
+  }
+
+  const headers = {
+    'Content-Length': upload,
+    'Content-Type': 'application/octet-stream',
+  };
+
+  const options = {
+    highWaterMark: upload,
+  };
+
+  const server = http.createServer(options, onRequest);
+
+  server.listen(0, () => {
+    bench.http({
+      connections,
+      duration,
+      headers,
+      upload,
+      port: server.address().port,
+    }, () => {
+      if (stats && useBufferPool) {
+        console.log('BufferPool: length', bufferPool.pool.length,
+                    'stats', bufferPool.stats);
+      }
+      server.close();
+    });
+  });
+}
diff --git a/lib/_http_common.js b/lib/_http_common.js
index 2c7866d350eea2..ca96952b750c49 100644
--- a/lib/_http_common.js
+++ b/lib/_http_common.js
@@ -48,6 +48,7 @@ const kOnBody = HTTPParser.kOnBody | 0;
 const kOnMessageComplete = HTTPParser.kOnMessageComplete | 0;
 const kOnExecute = HTTPParser.kOnExecute | 0;
 const kOnTimeout = HTTPParser.kOnTimeout | 0;
+const kOnStreamAlloc = HTTPParser.kOnStreamAlloc | 0;
 
 const MAX_HEADER_PAIRS = 2000;
 
@@ -234,6 +235,7 @@ function cleanParser(parser) {
   parser[kOnMessageBegin] = null;
   parser[kOnExecute] = null;
   parser[kOnTimeout] = null;
+  parser[kOnStreamAlloc] = null;
   parser._consumed = false;
   parser.onIncoming = null;
   parser.joinDuplicateHeaders = null;
diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc
index d6732a7ddc571f..27a6645cfdfb36 100644
--- a/src/node_http_parser.cc
+++ b/src/node_http_parser.cc
@@ -50,6 +50,8 @@ namespace node {
 namespace {  // NOLINT(build/namespaces)
 
 using v8::Array;
+using v8::ArrayBuffer;
+using v8::ArrayBufferView;
 using v8::Boolean;
 using v8::Context;
 using v8::EscapableHandleScope;
@@ -57,6 +59,7 @@ using v8::Exception;
 using v8::Function;
 using v8::FunctionCallbackInfo;
 using v8::FunctionTemplate;
+using v8::Global;
 using v8::HandleScope;
 using v8::Int32;
 using v8::Integer;
@@ -77,6 +80,7 @@ const uint32_t kOnBody = 3;
 const uint32_t kOnMessageComplete = 4;
 const uint32_t kOnExecute = 5;
 const uint32_t kOnTimeout = 6;
+const uint32_t kOnStreamAlloc = 7;
 // Any more fields than this will be flushed into JS
 const size_t kMaxHeaderFieldsCount = 32;
 // Maximum size of chunk extensions
@@ -473,7 +477,11 @@ class Parser : public AsyncWrap, public StreamListener {
     if (!cb->IsFunction())
       return 0;
 
-    Local buffer = Buffer::Copy(env, at, length).ToLocalChecked();
+    // Use the user-provided buffer, or fall back to copying into a new buffer
+    Local buffer = GetUserAllocBuffer(at, length);
+    if (buffer.IsEmpty()) {
+      buffer = Buffer::Copy(env, at, length).ToLocalChecked();
+    }
 
     MaybeLocal r = MakeCallback(cb.As(), 1, &buffer);
 
@@ -765,13 +773,26 @@ class Parser : public AsyncWrap, public StreamListener {
   static const size_t kAllocBufferSize = 64 * 1024;
 
   uv_buf_t OnStreamAlloc(size_t suggested_size) override {
+    HandleScope scope(env()->isolate());
+
+    // If a custom allocator callback is set, it is called to return the next
+    // buffer for the stream data, which are emitted to the kOnBody callback.
+    // Setting a custom allocator can be used to recycle buffers after the
+    // server processed their data, which improves server upload performance
+    // by reducing redundant memcpy and garbage collection work.
+
+    Local buf = CallJSOnStreamAlloc(suggested_size);
+    if (!buf.IsEmpty())
+      return uv_buf_init(Buffer::Data(buf), Buffer::Length(buf));
+
     // For most types of streams, OnStreamRead will be immediately after
     // OnStreamAlloc, and will consume all data, so using a static buffer for
     // reading is more efficient. For other streams, just use Malloc() directly.
+
     if (binding_data_->parser_buffer_in_use)
       return uv_buf_init(Malloc(suggested_size), suggested_size);
-    binding_data_->parser_buffer_in_use = true;
 
+    binding_data_->parser_buffer_in_use = true;
     if (binding_data_->parser_buffer.empty())
       binding_data_->parser_buffer.resize(kAllocBufferSize);
 
@@ -781,12 +802,19 @@ class Parser : public AsyncWrap, public StreamListener {
 
   void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override {
     HandleScope scope(env()->isolate());
+
+    const bool is_user_alloc_buffer =
+        !user_alloc_buffer_.IsEmpty() &&
+        buf.base == Buffer::Data(user_alloc_buffer_.Get(env()->isolate()));
+
     // Once we’re done here, either indicate that the HTTP parser buffer
     // is free for re-use, or free() the data if it didn’t come from there
     // in the first place.
     auto on_scope_leave = OnScopeLeave([&]() {
       if (buf.base == binding_data_->parser_buffer.data())
         binding_data_->parser_buffer_in_use = false;
+      else if (is_user_alloc_buffer)
+        user_alloc_buffer_.Reset();
       else
         free(buf.base);
     });
@@ -987,6 +1015,7 @@ class Parser : public AsyncWrap, public StreamListener {
     got_exception_ = false;
     headers_completed_ = false;
     max_http_header_size_ = max_http_header_size;
+    user_alloc_buffer_.Reset();
   }
 
 
@@ -1018,6 +1047,57 @@ class Parser : public AsyncWrap, public StreamListener {
     return true;
   }
 
+  // `CallJSOnStreamAlloc` calls the `kOnStreamAlloc` callback, if it is set,
+  // to provide the next buffer to use for `OnStreamAlloc`.
+  // This allows reading the data from the underlying stream directly into the
+  // user buffer, preventing unneeded memory copies, and the user can decide
+  // to recycle it when processing of onBody is done.
+  // The buffer is stored in a persistent reference so it can be retrieved
+  // during `OnStreamRead` -> `Execute` -> `on_body` by calling
+  // `GetUserAllocBuffer`, and the reference held by the parser is discarded
+  // at the end of `OnStreamRead`.
+  Local CallJSOnStreamAlloc(size_t suggested_size) {
+    if (!user_alloc_buffer_.IsEmpty()) return Local();
+
+    Local cb =
+        object()->Get(env()->context(), kOnStreamAlloc).ToLocalChecked();
+    if (!cb->IsFunction()) return Local();
+
+    Local arg =
+        Integer::NewFromUnsigned(env()->isolate(), kAllocBufferSize);
+    Local b =
+        MakeCallback(cb.As(), 1, &arg).FromMaybe(Local());
+    if (b.IsEmpty() || !Buffer::HasInstance(b)) return Local();
+
+    user_alloc_buffer_.Reset(env()->isolate(), b);
+    return b;
+  }
+
+  // `GetUserAllocBuffer` provides a buffer to `on_body` if it matches the
+  // buffer provided by the user `kOnStreamAlloc` callback.
+  // A partial match is also accepted, by slicing the buffer.
+  Local GetUserAllocBuffer(const char* at, size_t length) {
+    if (user_alloc_buffer_.IsEmpty()) return Local();
+
+    Local b = user_alloc_buffer_.Get(env()->isolate());
+    DCHECK(Buffer::HasInstance(b));
+    const size_t alloc_len = Buffer::Length(b);
+    const char* alloc_start = Buffer::Data(b);
+    const char* alloc_end = alloc_start + alloc_len;
+
+    // Exact match: hand the user buffer out as is.
+    if (alloc_start == at && alloc_len == length) return b;
+
+    // Slice match: wrap the matching sub-range of the same ArrayBuffer.
+    if (alloc_start <= at && alloc_end >= at + length) {
+      Local abv = b.As();
+      Local ab = abv->Buffer();
+      const size_t offset = abv->ByteOffset() + at - alloc_start;
+      return Buffer::New(env()->isolate(), ab, offset, length).ToLocalChecked();
+    }
+
+    return Local();
+  }
 
   llhttp_t parser_;
   StringPtr fields_[kMaxHeaderFieldsCount];  // header fields
@@ -1037,6 +1117,7 @@ class Parser : public AsyncWrap, public StreamListener {
   uint64_t max_http_header_size_;
   uint64_t last_message_start_;
   ConnectionsList* connectionsList_;
+  Global user_alloc_buffer_;
 
   BaseObjectPtr binding_data_;
 
@@ -1274,6 +1355,8 @@ void InitializeHttpParser(Local target,
          Integer::NewFromUnsigned(env->isolate(), kOnExecute));
   t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnTimeout"),
          Integer::NewFromUnsigned(env->isolate(), kOnTimeout));
+  t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnStreamAlloc"),
+         Integer::NewFromUnsigned(env->isolate(), kOnStreamAlloc));
 
   t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kLenientNone"),
          Integer::NewFromUnsigned(env->isolate(), kLenientNone));
diff --git a/typings/internalBinding/http_parser.d.ts b/typings/internalBinding/http_parser.d.ts
index 5ab5651b637176..557b92833a66d9 100644
--- a/typings/internalBinding/http_parser.d.ts
+++ b/typings/internalBinding/http_parser.d.ts
@@ -13,6 +13,7 @@ declare namespace InternalHttpParserBinding {
     static kOnMessageComplete: 4;
     static kOnExecute: 5;
     static kOnTimeout: 6;
+    static kOnStreamAlloc: 7;
 
     static kLenientNone: number;
     static kLenientHeaders: number;