Skip to content

Commit cba271f

Browse files
authored
rpcdaemon: use zlib for streaming (#2871)
1 parent 7fcccf2 commit cba271f

File tree

6 files changed

+106
-27
lines changed

6 files changed

+106
-27
lines changed

.github/actions/perf-common-steps/action.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ runs:
182182
run_perf mainnet eth_getTransactionByHash 5 stress_test_eth_getTransactionByHash_13M 1:1,100:30,1000:20,10000:20
183183
run_perf mainnet eth_getTransactionReceipt 5 stress_test_eth_getTransactionReceipt_14M 1:1,100:30,1000:20,5000:20,10000:20,20000:20
184184
run_perf mainnet eth_createAccessList 5 stress_test_eth_createAccessList_16M 1:1,100:30,1000:20,10000:20,20000:20
185+
run_perf mainnet debug_traceTransaction 5 stress_test_debug_trace_transaction_21M.tar 1:1,100:30,1000:20,2000:20
185186
fi
186187
187188
if [ $failed_test -eq 0 ]; then

silkworm/rpc/core/evm_debug.cpp

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,30 +59,32 @@ std::string DebugConfig::to_string() const {
5959

6060
return out.str();
6161
}
62-
6362
std::string uint256_to_hex(const evmone::uint256& x) {
64-
std::stringstream ss;
65-
ss << "0x";
63+
static constexpr std::string_view kHexDigits{"0123456789abcdef"};
64+
std::string out(2 + 64, '\0');
65+
char* dest = out.data();
66+
67+
*dest++ = '0';
68+
*dest++ = 'x';
6669

6770
bool leading_zeros = true;
6871
const uint64_t* px = &x[0];
6972
for (int i = 3; i >= 0; --i) {
70-
if (px[i] == 0 && leading_zeros) {
71-
continue;
72-
}
73-
if (leading_zeros) {
74-
ss << std::hex << px[i];
75-
leading_zeros = false;
76-
} else {
77-
ss << std::setfill('0') << std::setw(16) << std::hex << px[i];
73+
for (int shift = 60; shift >= 0; shift -= 4) {
74+
char hex_digit = kHexDigits[(px[i] >> shift) & 0xF];
75+
if (hex_digit != '0' || !leading_zeros) {
76+
*dest++ = hex_digit;
77+
leading_zeros = false;
78+
}
7879
}
7980
}
8081

8182
if (leading_zeros) {
82-
ss << "0";
83+
*dest++ = '0';
8384
}
8485

85-
return ss.str();
86+
out.resize(static_cast<size_t>(dest - out.data()));
87+
return out;
8688
}
8789

8890
static void output_stack(std::vector<std::string>& vect, const evmone::uint256* stack, int stack_size) {

silkworm/rpc/http/chunker.hpp

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
#pragma once
55

66
#include <algorithm>
7+
#include <deque>
78
#include <memory>
8-
#include <queue>
99

1010
namespace silkworm::rpc::http {
1111

@@ -27,28 +27,27 @@ class Chunker {
2727
// creates chunk: even if new:buffer is greater kDefaultMaxChunkSize
2828
while (position < new_buffer.size()) {
2929
size_t available_space = kDefaultMaxChunkSize - current_chunk_.size();
30-
size_t chunk_size = std::min(available_space, new_buffer.size() - position);
30+
const size_t chunk_size = std::min(available_space, new_buffer.size() - position);
3131

3232
current_chunk_.append(new_buffer, position, chunk_size);
3333
position += chunk_size;
3434

35-
// one chunk is completed copy it in complet_chunk
36-
if (current_chunk_.size() == kDefaultMaxChunkSize) {
37-
complete_chunk_.push(current_chunk_);
35+
// one chunk is completed copy it in complete_chunk queue
36+
if (current_chunk_.size() >= kDefaultMaxChunkSize) {
37+
complete_chunk_.push_back(std::move(current_chunk_));
3838
current_chunk_.clear();
39-
current_chunk_.reserve(kDefaultMaxChunkSize);
4039
}
4140
}
4241
}
4342

4443
std::pair<std::string, bool> get_complete_chunk() {
4544
if (!complete_chunk_.empty()) {
46-
// at least one chunk is availble return it , indicating if first chunk or not
45+
// at least one chunk is available return it , indicating if first chunk or not
4746
auto ret_first_chunk = !first_chunk_completed_;
4847
first_chunk_completed_ = true;
49-
std::string chunk = complete_chunk_.front();
50-
complete_chunk_.pop();
51-
return std::make_pair(chunk, ret_first_chunk);
48+
std::string chunk = std::move(complete_chunk_.front());
49+
complete_chunk_.pop_front();
50+
return std::make_pair(std::move(chunk), ret_first_chunk);
5251
}
5352
// queue is empty no chunk are available
5453
return std::make_pair("", false);
@@ -58,18 +57,18 @@ class Chunker {
5857
return !complete_chunk_.empty();
5958
}
6059

61-
std::pair<std::string, bool> get_remainder() const {
60+
std::pair<std::string, bool> get_remainder() {
6261
if (current_chunk_.empty()) {
6362
// no bytes are present on current_chunk so return empty string and indication if first chunk or not
6463
// we are in two possible cases: at least one completed chunk is already produced, or any chunk are produced
6564
return std::make_pair("", !first_chunk_completed_);
6665
}
6766
// returns the chunk
68-
return std::make_pair(current_chunk_, !first_chunk_completed_);
67+
return std::make_pair(std::move(current_chunk_), !first_chunk_completed_);
6968
}
7069

7170
private:
72-
std::queue<std::string> complete_chunk_;
71+
std::deque<std::string> complete_chunk_;
7372
std::string current_chunk_;
7473
bool first_chunk_completed_{false};
7574
};

silkworm/rpc/http/connection.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,10 @@ Task<void> Connection::open_stream(uint64_t request_id) {
268268

269269
// add chunking supports
270270
request_data.chunk = std::make_unique<Chunker>();
271+
if (request_data.gzip_encoding_requested) {
272+
request_data.zlib_compressor = std::make_unique<ZlibCompressor>();
273+
}
274+
request_data.chunk = std::make_unique<Chunker>();
271275

272276
co_return;
273277
}
@@ -557,4 +561,10 @@ Task<void> Connection::compress(const std::string& clear_data, std::string& comp
557561
});
558562
}
559563

564+
Task<void> Connection::compress_stream(const std::string& clear_data, std::string& compressed_data, const RequestData& req_data, bool last) {
565+
co_await async_task(workers_.executor(), [&]() -> void {
566+
req_data.zlib_compressor->compress_chunk(clear_data, compressed_data, last);
567+
});
568+
}
569+
560570
} // namespace silkworm::rpc::http

silkworm/rpc/http/connection.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <silkworm/rpc/common/interface_log.hpp>
2121
#include <silkworm/rpc/common/worker_pool.hpp>
2222
#include <silkworm/rpc/http/chunker.hpp>
23+
#include <silkworm/rpc/http/zlib_compressor.hpp>
2324
#include <silkworm/rpc/transport/request_handler.hpp>
2425
#include <silkworm/rpc/transport/stream_writer.hpp>
2526
#include <silkworm/rpc/ws/connection.hpp>
@@ -28,7 +29,7 @@ namespace silkworm::rpc::http {
2829

2930
using RequestWithStringBody = boost::beast::http::request<boost::beast::http::string_body>;
3031

31-
inline constexpr size_t kDefaultCapacity = 4 * 1024;
32+
inline constexpr size_t kDefaultCapacity = 1 * 1024 * 1024;
3233

3334
struct RequestData {
3435
bool request_keep_alive{false};
@@ -38,6 +39,7 @@ struct RequestData {
3839
std::string origin;
3940
boost::beast::http::verb method{boost::beast::http::verb::unknown};
4041
std::unique_ptr<Chunker> chunk;
42+
std::unique_ptr<ZlibCompressor> zlib_compressor;
4143
};
4244

4345
//! Represents a single connection from a client.
@@ -98,6 +100,7 @@ class Connection : public StreamWriter {
98100
static std::string get_date_time();
99101

100102
Task<void> compress(const std::string& clear_data, std::string& compressed_data);
103+
Task<void> compress_stream(const std::string& clear_data, std::string& compressed_data, const RequestData& req_data, bool last);
101104
Task<void> create_chunk_header(RequestData& request_data);
102105
Task<size_t> send_chunk(const std::string& content);
103106

silkworm/rpc/http/zlib_compressor.hpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright 2025 The Silkworm Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#pragma once
5+
6+
#include <zlib.h>
7+
8+
#include <memory>
9+
#include <stdexcept>
10+
#include <string>
11+
12+
namespace silkworm::rpc::http {
13+
14+
inline constexpr int kZlibCompressionBufferSize = 65536;
15+
16+
class ZlibCompressor {
17+
public:
18+
ZlibCompressor(const ZlibCompressor&) = delete;
19+
20+
ZlibCompressor() {
21+
memset(&stream_, 0, sizeof(z_stream));
22+
stream_.zalloc = Z_NULL;
23+
stream_.zfree = Z_NULL;
24+
stream_.opaque = Z_NULL;
25+
26+
if (deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
27+
throw std::runtime_error("zlib initialization error");
28+
}
29+
}
30+
31+
~ZlibCompressor() {
32+
deflateEnd(&stream_);
33+
}
34+
35+
void compress_chunk(const std::string& clear_data, std::string& compressed_data, const bool flush) {
36+
stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(clear_data.data()));
37+
stream_.avail_in = static_cast<unsigned int>(clear_data.size());
38+
size_t offset = 0;
39+
do {
40+
compressed_data.resize(kZlibCompressionBufferSize + offset);
41+
42+
stream_.next_out = reinterpret_cast<Bytef*>(compressed_data.data() + offset);
43+
stream_.avail_out = kZlibCompressionBufferSize;
44+
45+
const int ret = deflate(&stream_, flush ? Z_FINISH : Z_NO_FLUSH);
46+
if (ret == Z_STREAM_ERROR) {
47+
throw std::runtime_error("zlib compression error");
48+
}
49+
offset += kZlibCompressionBufferSize - stream_.avail_out;
50+
51+
if (flush && ret == Z_STREAM_END) {
52+
break;
53+
}
54+
55+
} while (stream_.avail_in > 0 || stream_.avail_out == 0);
56+
57+
compressed_data.resize(offset);
58+
}
59+
60+
private:
61+
z_stream stream_;
62+
};
63+
64+
} // namespace silkworm::rpc::http

0 commit comments

Comments
 (0)