Skip to content

Commit 878ab39

Browse files
Yong Tanmeta-codesync[bot]
authored andcommitted
Enable CPU compression offload for EB mode in Thrift server
Summary: D97565705 moved Thrift server response compression from IO threads to CPU threads, but only when executor_ is non-null. In EB mode, executor_ is null (the defaultSync resource pool has no executor), so compression still runs on the IO thread — defeating the purpose. This diff fixes the EB mode gap by computing the compression executor as a local variable in sendReply() with a fallback chain: executor_ (TM mode) → server's handler executor via the context chain (EB mode) → folly::getGlobalCPUExecutor() (safety net). The dispatch methods now accept the executor as a parameter instead of reading executor_ directly. ## Key properties: - No new members on any object, no new virtual methods - executor_ is never mutated — EB method semantics are unchanged - The fallback chain (~4ns) only runs when: flag is on, payload exceeds the compression threshold, and we're on the IO thread in EB mode — negligible compared to the compression work it enables (us–ms) - Gated by the existing thrift_server_compress_response_on_cpu flag ## Benchmark Results: CPU Compression Offload (Echo32k_semi_random_eb) | Metric | Baseline | Server Offload | Change | |---|---|---|---| | Average QPS | 27,824 | 50,614 | +82% | | p50 Latency (ms) | 5.352 | 3.153 | -41% | | p99 Latency (ms) | 10.275 | 4.196 | -59% | | p100 Latency (ms) | 26.030 | 11.664 | -55% | | Server CPU Utilization | 2.27 | 4.89 | +115% | | Client CPU Utilization | 2.37 | 4.04 | +71% | ### Summary Offloading compression to CPU threads delivers an **82% QPS improvement** and cuts **p99 latency by 59%** for semi-compressible 32KB payloads on EB-thread handlers. The trade-off is higher CPU utilization (+115% server-side), which is expected — the IO threads are no longer blocked by compression and can accept requests faster, driving more total throughput. The CPU threads absorb the compression work in parallel, converting idle CPU capacity into lower latency and higher throughput. ### Limitations - **IO thread saturation required.** The feature only helps when IO threads are the bottleneck. If IO threads have spare capacity, inline compression is fast enough and the dispatch overhead provides no benefit. - **Thread-hop cost.** Each dispatched response pays a fixed overhead for executor queue insertion, CPU thread dequeue, reply queue notification (eventfd syscall), and IO thread wakeup. This fixed cost is independent of payload size, so it becomes proportionally less significant for larger payloads. - **Minimum payload size.** Payloads must be large enough that compression time significantly exceeds the thread-hop overhead. A minimum threshold of 1KB (`thrift_server_min_cpu_compression_payload_size`) is enforced to prevent small responses (e.g., pings) from being dispatched at a net loss. - **Data compressibility matters.** The feature benefits semi-compressible data (structured Thrift responses, JSON-like content) where compression is both CPU-expensive and effective at reducing wire size. Trivially compressible data (repeated bytes) compresses too fast to justify the hop. Incompressible data (random bytes) gains nothing from compression and bottlenecks on network IO instead. ### Two-threshold interaction There are two independent size thresholds that gate compression behavior. They serve different purposes and do not conflict: - `compressionSizeLimit` (existing, per-connection) — configured via the client's compression config (compressionConfig_.compressionSizeLimit()). Controls whether compression happens at all. Payloads at or below this limit skip compression entirely (no algorithm is selected). This threshold is unchanged by this diff. - `thrift_server_min_cpu_compression_payload_size` (new, global flag, default 1024) — controls where compression runs (CPU thread vs inline on IO thread). Payloads below this threshold still get compressed, but inline on the current thread rather than being dispatched to a CPU executor. This avoids the thread-hop overhead for small payloads where inline compression is cheaper than the dispatch cost. Evaluation order in `shouldDispatchCompressionToCpu(payloadSize)`: - If payloadSize < `thrift_server_min_cpu_compression_payload_size` → compress inline (no dispatch) - If `getEligibleCompressionAlgorithm(payloadSize)` returns nullopt (no algorithm, or payload ≤ `compressionSizeLimit`) → no compression at all - Otherwise → dispatch compression to CPU thread This does not change existing behavior. Both thresholds are only evaluated when `thrift_server_compress_response_on_cpu` is true (default false). Services that have not opted in see zero behavior change. For services that have opted in, the new minimum size threshold adds a small-payload bypass that wasn't previously needed (because the prior code only dispatched when executor_ was non-null, which excluded EB mode entirely). Reviewed By: robertroeser Differential Revision: D100902596 fbshipit-source-id: 583199eb8d05d14af0a5119a7cf72a17736b91c3
1 parent fdfa45a commit 878ab39

5 files changed

Lines changed: 86 additions & 25 deletions

File tree

thrift/lib/cpp2/async/processor/HandlerCallbackBase.cpp

Lines changed: 58 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <folly/ExceptionWrapper.h>
1818
#include <folly/Executor.h>
19+
#include <folly/executors/GlobalExecutor.h>
1920
#include <folly/stop_watch.h>
2021
#include <thrift/lib/cpp/TApplicationException.h>
2122
#include <thrift/lib/cpp/concurrency/ThreadManager.h>
@@ -149,6 +150,26 @@ void HandlerCallbackBase::doExceptionWrapped(folly::exception_wrapper ew) {
149150
}
150151
}
151152

153+
folly::Executor::KeepAlive<>
154+
HandlerCallbackBase::getCompressionExecutorFallback() {
155+
// Walk the context chain to the server's handler executor (defaultAsync pool
156+
// in resource-pools mode, or the ThreadManager in legacy mode). Fall back to
157+
// the global CPU executor as a safety net — it is always non-null.
158+
if (reqCtx_) {
159+
if (auto* connCtx = reqCtx_->getConnectionContext()) {
160+
if (auto* workerCtx = connCtx->getWorkerContext()) {
161+
if (auto* serverCtx = workerCtx->getServerContext()) {
162+
auto exec = serverCtx->getHandlerExecutorKeepAlive();
163+
if (exec) {
164+
return exec;
165+
}
166+
}
167+
}
168+
}
169+
}
170+
return folly::getGlobalCPUExecutor();
171+
}
172+
152173
void HandlerCallbackBase::sendReply(SerializedResponse response) {
153174
this->ctx_.reset();
154175

@@ -161,9 +182,14 @@ void HandlerCallbackBase::sendReply(SerializedResponse response) {
161182
auto payloadSize =
162183
response.buffer ? response.buffer->computeChainDataLength() : 0;
163184
if (req_->shouldDispatchCompressionToCpu(payloadSize) && getEventBase() &&
164-
getEventBase()->inRunningEventBaseThread() && executor_) {
165-
dispatchReplyToCpuThread(std::move(response), payloadSize);
166-
return;
185+
getEventBase()->inRunningEventBaseThread()) {
186+
auto compressionExec =
187+
executor_ ? executor_ : getCompressionExecutorFallback();
188+
if (compressionExec) {
189+
dispatchReplyToCpuThread(
190+
std::move(response), payloadSize, compressionExec);
191+
return;
192+
}
167193
}
168194
preCompressed = req_->compressResponse(response, reqCtx_, payloadSize);
169195
}
@@ -194,7 +220,9 @@ void HandlerCallbackBase::sendReply(SerializedResponse response) {
194220
}
195221

196222
void HandlerCallbackBase::dispatchReplyToCpuThread(
197-
SerializedResponse response, size_t payloadSize) {
223+
SerializedResponse response,
224+
size_t payloadSize,
225+
const folly::Executor::KeepAlive<>& compressionExecutor) {
198226
// Capture all state needed on the CPU thread. After this method returns,
199227
// HandlerCallbackBase may be destroyed (req_ is moved out, so the
200228
// destructor's cleanup path will skip the active-request error).
@@ -204,13 +232,13 @@ void HandlerCallbackBase::dispatchReplyToCpuThread(
204232
auto writeTransforms = reqCtx->getHeader()->getWriteTTransforms();
205233
auto* replyQueue = &getReplyQueue();
206234

207-
executor_->add([req = std::move(req),
208-
reqCtx,
209-
protoSeqId,
210-
writeTransforms = std::move(writeTransforms),
211-
replyQueue,
212-
payloadSize,
213-
response = std::move(response)]() mutable {
235+
compressionExecutor->add([req = std::move(req),
236+
reqCtx,
237+
protoSeqId,
238+
writeTransforms = std::move(writeTransforms),
239+
replyQueue,
240+
payloadSize,
241+
response = std::move(response)]() mutable {
214242
// On CPU thread: attempt compression.
215243
bool preCompressed = req->compressResponse(response, reqCtx, payloadSize);
216244

@@ -252,9 +280,14 @@ void HandlerCallbackBase::sendReply(
252280
? responseAndStream.response.buffer->computeChainDataLength()
253281
: 0;
254282
if (req_->shouldDispatchCompressionToCpu(payloadSize) && getEventBase() &&
255-
getEventBase()->inRunningEventBaseThread() && executor_) {
256-
dispatchStreamReplyToCpuThread(std::move(responseAndStream), payloadSize);
257-
return;
283+
getEventBase()->inRunningEventBaseThread()) {
284+
auto compressionExec =
285+
executor_ ? executor_ : getCompressionExecutorFallback();
286+
if (compressionExec) {
287+
dispatchStreamReplyToCpuThread(
288+
std::move(responseAndStream), payloadSize, compressionExec);
289+
return;
290+
}
258291
}
259292
preCompressed = req_->compressResponse(
260293
responseAndStream.response, reqCtx_, payloadSize);
@@ -325,7 +358,9 @@ void HandlerCallbackBase::setupStreamFactory(
325358
}
326359

327360
void HandlerCallbackBase::dispatchStreamReplyToCpuThread(
328-
ResponseAndServerStreamFactory&& responseAndStream, size_t payloadSize) {
361+
ResponseAndServerStreamFactory&& responseAndStream,
362+
size_t payloadSize,
363+
const folly::Executor::KeepAlive<>& compressionExecutor) {
329364
// Capture all state needed on the CPU thread. After this method returns,
330365
// HandlerCallbackBase may be destroyed (req_ is moved out).
331366
auto req = std::move(req_);
@@ -338,13 +373,14 @@ void HandlerCallbackBase::dispatchStreamReplyToCpuThread(
338373
auto& stream = responseAndStream.stream;
339374
setupStreamFactory(stream);
340375

341-
executor_->add([req = std::move(req),
342-
reqCtx,
343-
protoSeqId,
344-
writeTransforms = std::move(writeTransforms),
345-
replyQueue,
346-
payloadSize,
347-
responseAndStream = std::move(responseAndStream)]() mutable {
376+
compressionExecutor->add([req = std::move(req),
377+
reqCtx,
378+
protoSeqId,
379+
writeTransforms = std::move(writeTransforms),
380+
replyQueue,
381+
payloadSize,
382+
responseAndStream =
383+
std::move(responseAndStream)]() mutable {
348384
// On CPU thread: attempt compression.
349385
bool preCompressed =
350386
req->compressResponse(responseAndStream.response, reqCtx, payloadSize);

thrift/lib/cpp2/async/processor/HandlerCallbackBase.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -380,13 +380,22 @@ class HandlerCallbackBase {
380380
void sendReply(ResponseAndServerStreamFactory&& responseAndStream);
381381

382382
private:
383-
// Dispatches compression + reply to the CPU executor when sendReply is
383+
// Returns an executor for compression offload when executor_ is null (EB
384+
// mode). Walks the context chain to the server's handler executor, falling
385+
// back to folly::getGlobalCPUExecutor() as a safety net.
386+
folly::Executor::KeepAlive<> getCompressionExecutorFallback();
387+
388+
// Dispatches compression + reply to the given CPU executor when sendReply is
384389
// called on the IO thread. Moves all needed state into the lambda so
385390
// HandlerCallbackBase can be destroyed after this returns.
386391
void dispatchReplyToCpuThread(
387-
SerializedResponse response, size_t payloadSize);
392+
SerializedResponse response,
393+
size_t payloadSize,
394+
const folly::Executor::KeepAlive<>& compressionExecutor);
388395
void dispatchStreamReplyToCpuThread(
389-
ResponseAndServerStreamFactory&& responseAndStream, size_t payloadSize);
396+
ResponseAndServerStreamFactory&& responseAndStream,
397+
size_t payloadSize,
398+
const folly::Executor::KeepAlive<>& compressionExecutor);
390399

391400
// Sets up stream factory with interaction, context stack, method name, and
392401
// interceptor context. Shared by sendReply and

thrift/lib/cpp2/server/ServerFlags.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ THRIFT_FLAG_DEFINE_bool(
3939

4040
THRIFT_FLAG_DEFINE_bool(thrift_server_compress_response_on_cpu, false);
4141

42+
THRIFT_FLAG_DEFINE_int64(thrift_server_min_cpu_compression_payload_size, 1024);
43+
4244
FOLLY_GFLAGS_DEFINE_bool(
4345
thrift_use_token_bucket_concurrency_controller,
4446
false,

thrift/lib/cpp2/server/ServerFlags.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ THRIFT_FLAG_DECLARE_bool(allow_resource_pools_set_thread_manager_from_executor);
3232

3333
THRIFT_FLAG_DECLARE_bool(thrift_server_compress_response_on_cpu);
3434

35+
// This flag does not control whether compression happens — that is solely
36+
// determined by compressionSizeLimit. It only controls where compression runs:
37+
// payloads below this threshold are compressed inline on the IO thread
38+
// (skipping the thread-hop overhead), while larger payloads are dispatched to a
39+
// CPU thread. Only effective when thrift_server_compress_response_on_cpu is
40+
// enabled.
41+
THRIFT_FLAG_DECLARE_int64(thrift_server_min_cpu_compression_payload_size);
42+
3543
// Use TokenBucketConcurrencyController as a standard concurrency controller in
3644
// ThriftServer
3745
FOLLY_GFLAGS_DECLARE_bool(thrift_use_token_bucket_concurrency_controller);

thrift/lib/cpp2/transport/core/ThriftRequest.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
THRIFT_FLAG_DEFINE_int64(queue_time_logging_threshold_ms, 5);
2929
THRIFT_FLAG_DEFINE_bool(enable_request_event_logging, true);
3030
THRIFT_FLAG_DECLARE_bool(thrift_server_compress_response_on_cpu);
31+
THRIFT_FLAG_DECLARE_int64(thrift_server_min_cpu_compression_payload_size);
3132

3233
namespace apache::thrift {
3334

@@ -521,6 +522,11 @@ ThriftRequestCore::getEligibleCompressionAlgorithm(size_t payloadSize) const {
521522

522523
bool ThriftRequestCore::shouldDispatchCompressionToCpu(
523524
size_t payloadSize) const {
525+
auto minSize = static_cast<size_t>(
526+
THRIFT_FLAG(thrift_server_min_cpu_compression_payload_size));
527+
if (payloadSize < minSize) {
528+
return false;
529+
}
524530
return getEligibleCompressionAlgorithm(payloadSize).has_value();
525531
}
526532

0 commit comments

Comments
 (0)