Skip to content

Commit cb5ab51

Browse files
committed
Addressing PR comments
Signed-off-by: barshaul <barshaul@amazon.com>
1 parent 1b896ce commit cb5ab51

File tree

4 files changed

+45
-31
lines changed

4 files changed

+45
-31
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
* Go: Add `Client Set & Get` ([#3302](https://github.com/valkey-io/valkey-glide/pull/3302))* Go: Add `Move` ([#3369](https://github.com/valkey-io/valkey-glide/pull/3369))
3737
* Go: Add `Scan` ([#3378](https://github.com/valkey-io/valkey-glide/pull/3378))
3838
* Core/FFI/Go: Add Support for Async and Sync Client Types in FFI ([#3451](https://github.com/valkey-io/valkey-glide/pull/3451))
39+
* Core/FFI/Go: Add support for async and sync client types in FFI ([#3451](https://github.com/valkey-io/valkey-glide/pull/3451))
3940

4041
#### Breaking Changes
4142

ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ tokio = { version = "^1", features = ["rt", "macros", "rt-multi-thread", "time"]
1717
[dev-dependencies]
1818
rstest = "^0.23"
1919
serial_test = "3"
20+
lazy_static = "1"
2021

2122
[profile.release]
2223
lto = true

ffi/src/lib.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,7 @@ pub unsafe extern "C" fn free_command_result(command_result_ptr: *mut CommandRes
217217
if !command_error.command_error_message.is_null() {
218218
free_error_message(command_error.command_error_message as *mut c_char);
219219
}
220-
drop(command_error);
221220
}
222-
drop(command_result);
223221
}
224222
}
225223

@@ -359,7 +357,7 @@ impl ClientAdapter {
359357
}
360358
}
361359

362-
// Invokes the asynchronous failure callback with an error.
360+
/// Invokes the asynchronous failure callback with an error.
363361
///
364362
/// This function is used in async client flows to report command execution failures
365363
/// back to the calling language (e.g., Go) via a registered failure callback.
@@ -795,7 +793,7 @@ pub unsafe extern "C" fn command(
795793

796794
/// Creates a heap-allocated `CommandResult` containing a `CommandError`.
797795
///
798-
/// This function is used to construct an error response when a Redis command fails,
796+
/// This function is used to construct an error response when a Valkey command fails,
799797
/// intended to be returned through FFI to the calling language.
800798
///
801799
/// The resulting `CommandResult` contains:
@@ -841,7 +839,7 @@ fn create_error_result(err: RedisError) -> *mut CommandResult {
841839
/// This function will panic if the error message cannot be converted into a `CString`.
842840
///
843841
/// # Safety
844-
/// The returned C string must be freed using [`free_error_message`] after it is no longer needed.
842+
/// The returned C string must be freed using [`free_error_message`].
845843
fn to_c_error(err: RedisError) -> (*const c_char, RequestErrorType) {
846844
let message = errors::error_message(&err);
847845
let error_type = errors::error_type(&err);

ffi/tests/ffi_client_tests.rs

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use glide_core::connection_request::{ConnectionRequest, NodeAddress, TlsMode};
22
use glide_core::errors::RequestErrorType;
33
use glide_core::request_type::RequestType;
44
use glide_ffi::*;
5+
use lazy_static::lazy_static;
56
use protobuf::Message;
67
use rstest::rstest;
78
use std::collections::HashMap;
@@ -10,26 +11,35 @@ use std::net::TcpListener;
1011
use std::process::{Child, Command};
1112
use std::sync::{
1213
atomic::{AtomicUsize, Ordering},
13-
Arc,
14+
Arc, RwLock,
1415
};
15-
use std::sync::{LazyLock, Mutex};
1616
use tokio::runtime::Runtime;
1717
use tokio::time::{sleep, Duration};
1818

19-
static ASYNC_SUCCESS_COUNTER: LazyLock<Arc<AtomicUsize>> =
20-
LazyLock::new(|| Arc::new(AtomicUsize::new(0)));
21-
static ASYNC_FAILURE_COUNTER: LazyLock<Arc<AtomicUsize>> =
22-
LazyLock::new(|| Arc::new(AtomicUsize::new(0)));
23-
type StringResultMap = LazyLock<Mutex<HashMap<usize, Result<String, (String, RequestErrorType)>>>>;
24-
static ASYNC_RESULTS_MAP: StringResultMap = LazyLock::new(|| Mutex::new(HashMap::new()));
19+
pub(crate) struct AsyncMetrics {
20+
pub success_count: AtomicUsize,
21+
pub failure_count: AtomicUsize,
22+
pub results: HashMap<usize, Result<String, (String, RequestErrorType)>>,
23+
}
24+
25+
lazy_static! {
26+
static ref ASYNC_METRICS: Arc<RwLock<AsyncMetrics>> = Arc::new(RwLock::new(AsyncMetrics {
27+
success_count: AtomicUsize::new(0),
28+
failure_count: AtomicUsize::new(0),
29+
results: HashMap::new(),
30+
}));
31+
}
32+
33+
const ASYNC_WRITE_LOCK_ERR: &str = "Failed to aquire ASYNC_METRICS the write lock";
34+
const ASYNC_READ_LOCK_ERR: &str = "Failed to aquire ASYNC_METRICS the write lock";
2535

2636
/// Success callback function for String responses for the async client
2737
extern "C" fn string_success_callback(index: usize, response_ptr: *const CommandResponse) {
28-
let mut map = ASYNC_RESULTS_MAP
29-
.lock()
30-
.expect("Failed to aquire the results' lock");
31-
map.insert(index, Ok(parse_string_res(response_ptr)));
32-
ASYNC_SUCCESS_COUNTER.fetch_add(1, Ordering::SeqCst);
38+
let mut metrics = ASYNC_METRICS.write().expect(ASYNC_WRITE_LOCK_ERR);
39+
metrics
40+
.results
41+
.insert(index, Ok(parse_string_res(response_ptr)));
42+
metrics.success_count.fetch_add(1, Ordering::SeqCst);
3343
}
3444

3545
/// Failure callback function for the async client
@@ -38,11 +48,11 @@ extern "C" fn failure_callback(
3848
err_msg_ptr: *const c_char,
3949
error_type: RequestErrorType,
4050
) {
41-
let mut map = ASYNC_RESULTS_MAP
42-
.lock()
43-
.expect("Failed to aquire the results' lock");
44-
map.insert(index, Err((parse_error_msg(err_msg_ptr), error_type)));
45-
ASYNC_FAILURE_COUNTER.fetch_add(1, Ordering::SeqCst);
51+
let mut metrics = ASYNC_METRICS.write().expect(ASYNC_WRITE_LOCK_ERR);
52+
metrics
53+
.results
54+
.insert(index, Err((parse_error_msg(err_msg_ptr), error_type)));
55+
metrics.failure_count.fetch_add(1, Ordering::SeqCst);
4656
}
4757

4858
fn parse_string_res(response_ptr: *const CommandResponse) -> String {
@@ -169,7 +179,8 @@ fn execute_command(
169179
)
170180
};
171181
if command_res_ptr.is_null() {
172-
// Async client is being used, let the async callback to be called
182+
// If the returned CommandResult pointer is a null it means that the Async client is being used.
183+
// We shall let the async callback to be called.
173184
let rt = Runtime::new().unwrap();
174185
rt.block_on(async {
175186
sleep(Duration::from_millis(1)).await;
@@ -186,8 +197,9 @@ fn get_sync_response(cmd_resp: *mut CommandResponse) -> String {
186197
}
187198

188199
fn get_async_response(index: usize) -> String {
189-
let map = ASYNC_RESULTS_MAP.lock().unwrap();
190-
let result = map
200+
let metrics = ASYNC_METRICS.read().expect(ASYNC_READ_LOCK_ERR);
201+
let result = metrics
202+
.results
191203
.get(&index)
192204
.expect("Couldn't find the relevant idx in the map");
193205
assert!(result.is_ok());
@@ -204,8 +216,9 @@ fn get_sync_error(command_error: *mut CommandError) -> (String, RequestErrorType
204216
}
205217

206218
fn get_async_error(index: usize) -> (String, RequestErrorType) {
207-
let map = ASYNC_RESULTS_MAP.lock().unwrap();
208-
let result = map
219+
let metrics = ASYNC_METRICS.read().expect(ASYNC_READ_LOCK_ERR);
220+
let result = metrics
221+
.results
209222
.get(&index)
210223
.expect("Couldn't find the relevant idx in the map");
211224
assert!(result.is_err());
@@ -214,7 +227,6 @@ fn get_async_error(index: usize) -> (String, RequestErrorType) {
214227
}
215228

216229
#[rstest]
217-
#[serial_test::serial]
218230
fn test_ffi_client_command_execution(#[values(false, true)] async_client: bool) {
219231
let server = Server::new();
220232
let connection_request_bytes = create_connection_request(server.port);
@@ -252,7 +264,8 @@ fn test_ffi_client_command_execution(#[values(false, true)] async_client: bool)
252264
);
253265
let ping_res = if async_client {
254266
assert!(good_res.is_none()); // result should be returned through callback
255-
assert_eq!(ASYNC_SUCCESS_COUNTER.load(Ordering::SeqCst), 1);
267+
let metrics = ASYNC_METRICS.read().expect(ASYNC_READ_LOCK_ERR);
268+
assert_eq!(metrics.success_count.load(Ordering::SeqCst), 1);
256269
get_async_response(good_cmd_idx)
257270
} else {
258271
assert!(good_res.is_some());
@@ -270,7 +283,8 @@ fn test_ffi_client_command_execution(#[values(false, true)] async_client: bool)
270283
);
271284
let (err_msg, err_type) = if async_client {
272285
assert!(bad_res.is_none()); // result should be returned through callback
273-
assert_eq!(ASYNC_FAILURE_COUNTER.load(Ordering::SeqCst), 1);
286+
let metrics = ASYNC_METRICS.read().expect(ASYNC_READ_LOCK_ERR);
287+
assert_eq!(metrics.failure_count.load(Ordering::SeqCst), 1);
274288
get_async_error(bad_cmd_idx)
275289
} else {
276290
assert!(bad_res.is_some());

0 commit comments

Comments
 (0)