Skip to content

Commit f7bf94c

Browse files
avifeneshclaude
authored andcommitted
perf: Reduce mutex contention and avoid batch clone (#5230)
* perf: Reduce mutex contention and avoid batch clone Two performance improvements: 1. Lock Optimization (glide-core cluster_async) Release mutex immediately after mem::take() instead of holding it during the entire request processing loop. This eliminates contention when multiple clients share the tokio runtime. Before: Mutex held while iterating and spawning futures After: Mutex released immediately after draining the queue 2. Clone Removal (java executeBatchAsync) Take ownership of batch instead of cloning it before the async spawn. For large batches, this avoids expensive deep clones of command data. Before: let batch_clone = batch.clone(); // Expensive for large batches After: Move batch directly into the async block Both changes are safe: - Lock optimization: mem::take atomically moves all requests out - Clone removal: batch is consumed by the async block anyway Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: Avi Fenesh <aviarchi1994@gmail.com> * Clean up verbose comments Signed-off-by: Ubuntu <ubuntu@ip-172-31-25-236.us-east-2.compute.internal> Signed-off-by: Avi Fenesh <aviarchi1994@gmail.com> * fix: Use expect() for mutex lock consistency Address Copilot review comment - use .expect(MUTEX_WRITE_ERR) instead of if let Ok() for consistency with line 3079 and the rest of the codebase. Mutex poisoning should not be silently ignored. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: Avi Fenesh <aviarchi1994@gmail.com> * perf(java): Optimize UTF-8 string decoding Replace decode().toString() with new String(bytes, UTF_8) for simpler and more consistent decoding. Benchmarks show this is equivalent in performance while being cleaner code. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: Avi Fenesh <aviarchi1994@gmail.com> * fix(java): Ensure consistent byte order for direct buffer decoding Set explicit BIG_ENDIAN byte order on duplicated buffer to ensure consistent behavior across platforms. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: Avi Fenesh <aviarchi1994@gmail.com> --------- Signed-off-by: Avi Fenesh <aviarchi1994@gmail.com> Signed-off-by: Ubuntu <ubuntu@ip-172-31-25-236.us-east-2.compute.internal> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Signed-off-by: Shoham Elias <shohame@amazon.com>
1 parent dec91f5 commit f7bf94c

File tree

4 files changed

+50
-49
lines changed

4 files changed

+50
-49
lines changed

glide-core/redis-rs/redis/src/cluster_async/mod.rs

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3207,27 +3207,34 @@ where
32073207
.get_cluster_param(|params| params.retry_params.clone())
32083208
.expect(MUTEX_READ_ERR);
32093209
let mut poll_flush_action = PollFlushAction::None;
3210-
let mut pending_requests_guard = self.inner.pending_requests.lock().unwrap();
3211-
if !pending_requests_guard.is_empty() {
3212-
let mut pending_requests = mem::take(&mut *pending_requests_guard);
3213-
for request in pending_requests.drain(..) {
3214-
// Drop the request if none is waiting for a response to free up resources for
3215-
// requests callers care about (load shedding). It will be ambiguous whether the
3216-
// request actually goes through regardless.
3217-
if request.sender.is_closed() {
3218-
continue;
3219-
}
3210+
let mut pending_requests = {
3211+
let mut guard = self.inner.pending_requests.lock().expect(MUTEX_WRITE_ERR);
3212+
mem::take(&mut *guard)
3213+
};
3214+
3215+
for request in pending_requests.drain(..) {
3216+
// Drop the request if none is waiting for a response to free up resources for
3217+
// requests callers care about (load shedding). It will be ambiguous whether the
3218+
// request actually goes through regardless.
3219+
if request.sender.is_closed() {
3220+
continue;
3221+
}
3222+
3223+
let future = Self::try_request(request.info.clone(), self.inner.clone()).boxed();
3224+
self.in_flight_requests.push(Box::pin(Request {
3225+
retry_params: retry_params.clone(),
3226+
request: Some(request),
3227+
future: RequestState::Future { future },
3228+
}));
3229+
}
32203230

3221-
let future = Self::try_request(request.info.clone(), self.inner.clone()).boxed();
3222-
self.in_flight_requests.push(Box::pin(Request {
3223-
retry_params: retry_params.clone(),
3224-
request: Some(request),
3225-
future: RequestState::Future { future },
3226-
}));
3231+
// Preserve capacity
3232+
{
3233+
let mut guard = self.inner.pending_requests.lock().expect(MUTEX_WRITE_ERR);
3234+
if guard.is_empty() {
3235+
*guard = pending_requests;
32273236
}
3228-
*pending_requests_guard = pending_requests;
32293237
}
3230-
drop(pending_requests_guard);
32313238

32323239
loop {
32333240
let retry_params = retry_params.clone();

java/client/src/main/java/glide/managers/CommandManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,7 @@ protected <T> CompletableFuture<T> submitCommandToJni(
544544

545545
private Object normalizeDirectBuffer(ByteBuffer buffer, boolean expectUtf8Response) {
546546
ByteBuffer dup = buffer.duplicate();
547+
dup.order(ByteOrder.BIG_ENDIAN);
547548
dup.rewind();
548549
if (dup.remaining() == 0) {
549550
return expectUtf8Response ? "" : glide.api.models.GlideString.gs(new byte[0]);

java/client/src/main/java/glide/utils/BufferUtils.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,9 @@ public static String decodeUtf8(ByteBuffer buffer, int length) {
2828
if (length == 0) {
2929
return "";
3030
}
31-
32-
// Save current limit and set temporary limit for decoding
33-
int savedLimit = buffer.limit();
34-
try {
35-
buffer.limit(buffer.position() + length);
36-
// decode() automatically advances the position
37-
return StandardCharsets.UTF_8.decode(buffer).toString();
38-
} finally {
39-
// Always restore the original limit
40-
buffer.limit(savedLimit);
41-
}
31+
byte[] bytes = new byte[length];
32+
buffer.get(bytes);
33+
return new String(bytes, StandardCharsets.UTF_8);
4234
}
4335

4436
/**
@@ -49,9 +41,12 @@ public static String decodeUtf8(ByteBuffer buffer, int length) {
4941
* @throws java.nio.charset.CharacterCodingException if the bytes are not valid UTF-8
5042
*/
5143
public static String decodeUtf8(ByteBuffer buffer) {
52-
if (buffer.remaining() == 0) {
44+
int length = buffer.remaining();
45+
if (length == 0) {
5346
return "";
5447
}
55-
return StandardCharsets.UTF_8.decode(buffer).toString();
48+
byte[] bytes = new byte[length];
49+
buffer.get(bytes);
50+
return new String(bytes, StandardCharsets.UTF_8);
5651
}
5752
}

java/src/lib.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,16 +1532,18 @@ pub extern "system" fn Java_glide_internal_GlideNativeBridge_executeBatchAsync(
15321532
}
15331533
};
15341534

1535-
// Extract the batch from the command request
1536-
let batch = match &command_request.command {
1535+
// Extract optional root span pointer from the request (if provided by Java)
1536+
let root_span_ptr_opt = command_request.root_span_ptr;
1537+
let route = command_request.route.0.map(|r| *r);
1538+
1539+
// Extract the batch from the command request (take ownership to avoid clone)
1540+
let batch = match command_request.command {
15371541
Some(command_request::Command::Batch(batch)) => batch,
15381542
_ => {
15391543
log::error!("Expected batch command in request");
15401544
return Some(());
15411545
}
15421546
};
1543-
// Extract optional root span pointer from the request (if provided by Java)
1544-
let root_span_ptr_opt = command_request.root_span_ptr;
15451547

15461548
let handle_id = client_ptr as u64;
15471549
let jvm = match env.get_java_vm() {
@@ -1551,10 +1553,6 @@ pub extern "system" fn Java_glide_internal_GlideNativeBridge_executeBatchAsync(
15511553
return Some(());
15521554
}
15531555
};
1554-
1555-
// Spawn async task for batch execution using existing glide-core patterns
1556-
let batch_clone = batch.clone();
1557-
let route = command_request.route.0.map(|r| *r);
15581556
let runtime = get_runtime();
15591557
runtime.spawn(async move {
15601558
let client_result = ensure_client_for_handle(handle_id).await;
@@ -1575,13 +1573,13 @@ pub extern "system" fn Java_glide_internal_GlideNativeBridge_executeBatchAsync(
15751573
}
15761574
// Create pipeline using existing FFI approach
15771575
let mut pipeline =
1578-
redis::Pipeline::with_capacity(batch_clone.commands.len());
1579-
if batch_clone.is_atomic {
1576+
redis::Pipeline::with_capacity(batch.commands.len());
1577+
if batch.is_atomic {
15801578
pipeline.atomic();
15811579
}
15821580

15831581
// Add commands to pipeline using existing bridge logic
1584-
for cmd in &batch_clone.commands {
1582+
for cmd in &batch.commands {
15851583
match protobuf_bridge::create_valkey_command(cmd) {
15861584
Ok(valkey_cmd) => pipeline.add_command(valkey_cmd),
15871585
Err(e) => {
@@ -1605,27 +1603,27 @@ pub extern "system" fn Java_glide_internal_GlideNativeBridge_executeBatchAsync(
16051603
})?;
16061604

16071605
// Execute using existing client methods
1608-
let exec_res = if batch_clone.is_atomic {
1606+
let exec_res = if batch.is_atomic {
16091607
client
16101608
.send_transaction(
16111609
&pipeline,
16121610
routing,
1613-
batch_clone.timeout,
1614-
batch_clone.raise_on_error.unwrap_or(true),
1611+
batch.timeout,
1612+
batch.raise_on_error.unwrap_or(true),
16151613
)
16161614
.await
16171615
} else {
16181616
client
16191617
.send_pipeline(
16201618
&pipeline,
16211619
routing,
1622-
batch_clone.raise_on_error.unwrap_or(true),
1623-
batch_clone.timeout,
1620+
batch.raise_on_error.unwrap_or(true),
1621+
batch.timeout,
16241622
redis::PipelineRetryStrategy {
1625-
retry_server_error: batch_clone
1623+
retry_server_error: batch
16261624
.retry_server_error
16271625
.unwrap_or(false),
1628-
retry_connection_error: batch_clone
1626+
retry_connection_error: batch
16291627
.retry_connection_error
16301628
.unwrap_or(false),
16311629
},

0 commit comments

Comments
 (0)