diff --git a/Makefile b/Makefile index 8436b0b1..302c8e23 100644 --- a/Makefile +++ b/Makefile @@ -28,12 +28,16 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\ :MetricsTests.Integration_Cassandra_Requests\ :MetricsTests.Integration_Cassandra_StatsShardConnections\ +:ExecutionProfileTest.Integration_Cassandra_InvalidName\ +:ExecutionProfileTest.Integration_Cassandra_RequestTimeout\ +:ExecutionProfileTest.Integration_Cassandra_Consistency\ +:ExecutionProfileTest.Integration_Cassandra_SerialConsistency\ +:ExecutionProfileTest.Integration_Cassandra_LatencyAwareRouting\ :-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\ :HeartbeatTests.Integration_Cassandra_HeartbeatFailed\ :ControlConnectionTests.Integration_Cassandra_TopologyChange\ :ControlConnectionTests.Integration_Cassandra_FullOutage\ :ControlConnectionTests.Integration_Cassandra_TerminatedUsingMultipleIoThreadsWithError\ -:ExecutionProfileTest.InvalidName\ :*NoCompactEnabledConnection\ :PreparedMetadataTests.Integration_Cassandra_AlterProperlyUpdatesColumnCount\ :UseKeyspaceCaseSensitiveTests.Integration_Cassandra_ConnectWithKeyspace) @@ -70,6 +74,11 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\ :MetricsTests.Integration_Cassandra_Requests\ :MetricsTests.Integration_Cassandra_StatsShardConnections\ +:ExecutionProfileTest.Integration_Cassandra_InvalidName\ +:ExecutionProfileTest.Integration_Cassandra_RequestTimeout\ +:ExecutionProfileTest.Integration_Cassandra_Consistency\ +:ExecutionProfileTest.Integration_Cassandra_SerialConsistency\ +:ExecutionProfileTest.Integration_Cassandra_LatencyAwareRouting\ :-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\ :PreparedTests.Integration_Cassandra_FailFastWhenPreparedIDChangesDuringReprepare\ :HeartbeatTests.Integration_Cassandra_HeartbeatFailed\ @@ -77,7 +86,6 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\ :ControlConnectionTests.Integration_Cassandra_FullOutage\ :ControlConnectionTests.Integration_Cassandra_TerminatedUsingMultipleIoThreadsWithError\ :SslTests.Integration_Cassandra_ReconnectAfterClusterCrashAndRestart\ -:ExecutionProfileTest.InvalidName\ :*NoCompactEnabledConnection\ :PreparedMetadataTests.Integration_Cassandra_AlterProperlyUpdatesColumnCount\ :UseKeyspaceCaseSensitiveTests.Integration_Cassandra_ConnectWithKeyspace) diff --git a/scylla-rust-wrapper/src/integration_testing.rs b/scylla-rust-wrapper/src/integration_testing.rs index 96157df2..bf372eaa 100644 --- a/scylla-rust-wrapper/src/integration_testing.rs +++ b/scylla-rust-wrapper/src/integration_testing.rs @@ -8,6 +8,7 @@ use scylla::observability::history::{AttemptId, HistoryListener, RequestId, Spec use scylla::policies::retry::RetryDecision; use crate::argconv::{BoxFFI, CMut, CassBorrowedExclusivePtr}; +use crate::batch::CassBatch; use crate::cluster::CassCluster; use crate::statement::{BoundStatement, CassStatement}; use crate::types::{cass_int32_t, cass_uint16_t, cass_uint64_t, size_t}; @@ -116,3 +117,18 @@ pub unsafe extern "C" fn testing_statement_set_sleeping_history_listener( .set_history_listener(history_listener), } } + +#[unsafe(no_mangle)] +pub unsafe extern "C" fn testing_batch_set_sleeping_history_listener( + batch_raw: CassBorrowedExclusivePtr, + sleep_time_ms: cass_uint64_t, +) { + let sleep_time = Duration::from_millis(sleep_time_ms); + let history_listener = Arc::new(SleepingHistoryListener(sleep_time)); + + let batch = BoxFFI::as_mut_ref(batch_raw).unwrap(); + + Arc::make_mut(&mut batch.state) + .batch + .set_history_listener(history_listener) +} diff --git a/src/testing.cpp b/src/testing.cpp index b120e49a..7e397901 100644 --- a/src/testing.cpp +++ b/src/testing.cpp @@ -102,4 +102,8 @@ void set_sleeping_history_listener_on_statement(CassStatement* statement, uint64 testing_statement_set_sleeping_history_listener(statement, sleep_time_ms); } +void set_sleeping_history_listener_on_batch(CassBatch* batch, uint64_t sleep_time_ms) { + testing_batch_set_sleeping_history_listener(batch, sleep_time_ms); +} + }}} // namespace datastax::internal::testing diff --git a/src/testing.hpp b/src/testing.hpp index 197c7228..75bb9629 100644 --- a/src/testing.hpp +++ b/src/testing.hpp @@ -52,6 +52,8 @@ CASS_EXPORT void set_record_attempted_hosts(CassStatement* statement, bool enabl CASS_EXPORT void set_sleeping_history_listener_on_statement(CassStatement* statement, uint64_t sleep_time_ms); +CASS_EXPORT void set_sleeping_history_listener_on_batch(CassBatch* batch, uint64_t sleep_time_ms); + }}} // namespace datastax::internal::testing #endif diff --git a/src/testing_rust_impls.h b/src/testing_rust_impls.h index cda25cff..e6f07800 100644 --- a/src/testing_rust_impls.h +++ b/src/testing_rust_impls.h @@ -26,6 +26,11 @@ CASS_EXPORT void testing_free_contact_points(char* contact_points); // This can be used to enforce a sleep time during statement execution, which increases the latency. CASS_EXPORT void testing_statement_set_sleeping_history_listener(CassStatement *statement, cass_uint64_t sleep_time_ms); + +// Sets a sleeping history listener on the batch. +// This can be used to enforce a sleep time during batch execution, which increases the latency. +CASS_EXPORT void testing_batch_set_sleeping_history_listener(CassBatch *batch, + cass_uint64_t sleep_time_ms); } #endif diff --git a/tests/src/integration/objects/statement.hpp b/tests/src/integration/objects/statement.hpp index 7080effe..1940333c 100644 --- a/tests/src/integration/objects/statement.hpp +++ b/tests/src/integration/objects/statement.hpp @@ -365,6 +365,16 @@ class Batch : public Object { ASSERT_EQ(CASS_OK, cass_batch_set_execution_profile(get(), name.c_str())); } + /** + * Set a sleeping history listener on the batch. + * This can be used to enforce a sleep time during batch execution, which increases the latency. + * + * @param sleep_time_ms Sleep time in milliseconds + */ + void set_sleep_time(uint64_t sleep_time_ms) { + datastax::internal::testing::set_sleeping_history_listener_on_batch(get(), sleep_time_ms); + } + /** * Enable/Disable whether the statements in a batch are idempotent. Idempotent * batches are able to be automatically retried after timeouts/errors and can diff --git a/tests/src/integration/tests/test_exec_profile.cpp b/tests/src/integration/tests/test_exec_profile.cpp index fd14ebec..989903b8 100644 --- a/tests/src/integration/tests/test_exec_profile.cpp +++ b/tests/src/integration/tests/test_exec_profile.cpp @@ -42,9 +42,9 @@ class ExecutionProfileTest : public Integration { // Create the execution profiles for the test cases if (!skip_base_execution_profile_) { profiles_["request_timeout"] = ExecutionProfile::build().with_request_timeout(1); - // Setting bad consistency type for exec profile is forbidden in cpp-rust-driver - // profiles_["consistency"] = - // ExecutionProfile::build().with_consistency(CASS_CONSISTENCY_SERIAL); + profiles_["consistency"] = + ExecutionProfile::build().with_consistency(CASS_CONSISTENCY_SERIAL); + // Setting bad serial-consistency type for exec profile is forbidden in cpp-rust-driver // profiles_["serial_consistency"] = // ExecutionProfile::build().with_serial_consistency(CASS_CONSISTENCY_ONE); profiles_["round_robin"] = @@ -274,11 +274,13 @@ CASSANDRA_INTEGRATION_TEST_F(ExecutionProfileTest, RequestTimeout) { Batch batch; batch.add(statement); batch.set_execution_profile("request_timeout"); + batch.set_sleep_time(2); // Simulate >=2ms latency result = session_.execute(batch, false); ASSERT_EQ(CASS_ERROR_LIB_REQUEST_TIMED_OUT, result.error_code()); // Execute a simple query with assigned profile (should timeout) statement.set_execution_profile("request_timeout"); + statement.set_sleep_time(2); // Simulate >=2ms latency result = session_.execute(statement, false); ASSERT_EQ(CASS_ERROR_LIB_REQUEST_TIMED_OUT, result.error_code()); } @@ -315,7 +317,7 @@ CASSANDRA_INTEGRATION_TEST_F(ExecutionProfileTest, Consistency) { cass_version = static_cast(cass_version).get_cass_version(); } std::string expected_message = "SERIAL is not supported as conditional update commit consistency"; - if (cass_version >= "4.0.0") { + if (!Options::is_scylla() && cass_version >= "4.0.0") { expected_message = "You must use conditional updates for serializable writes"; } ASSERT_TRUE(contains(result.error_message(), expected_message)); @@ -329,38 +331,48 @@ CASSANDRA_INTEGRATION_TEST_F(ExecutionProfileTest, Consistency) { } /** - * Utilize the execution profile to override statement serial consistency - * - * This test will perform an insert query using the execution profile - * 'serial_consistency'; overriding the default setting. The execution profile - * should fail due to an invalid serial consistency applied to profile. + * Attempt to utilize an invalid serial consistency level on a statement. * * @jira_ticket CPP-492 * @test_category execution_profiles * @since DSE 1.4.0 - * @expected_result Execution profile will fail (invalid serial consistency) + * @expected_result Setting bad consistency level as serial consistency fails */ CASSANDRA_INTEGRATION_TEST_F(ExecutionProfileTest, SerialConsistency) { SKIP_IF_CASSANDRA_VERSION_LT(2.0.0); CHECK_FAILURE; - // Execute a batched query with assigned profile (should fail - Batch batch; - batch.add(insert_); - batch.set_execution_profile("serial_consistency"); - Result result = session_.execute(batch, false); - ASSERT_EQ(CASS_ERROR_SERVER_INVALID_QUERY, result.error_code()); - ASSERT_TRUE(contains( - result.error_message(), - "Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL")); - - // Execute a simple query with assigned profile (should fail) - insert_.set_execution_profile("serial_consistency"); - result = session_.execute(insert_, false); - ASSERT_EQ(CASS_ERROR_SERVER_INVALID_QUERY, result.error_code()); - ASSERT_TRUE(contains( - result.error_message(), - "Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL")); + // Original test case expected us to set consistency ONE in place of serial consistency. + // The statement would then be executed, resulting in server error. + // This is something that we could technically do, but rust-driver disallows it on a type level. + // In result, we do not allow this in cpp-rust-driver as well. + // We can at least check that setting the bad consistency level for serial consistency + // fails on client side. + ExecutionProfile builder = ExecutionProfile::build(); + ASSERT_EQ(CASS_ERROR_LIB_BAD_PARAMS, cass_execution_profile_set_serial_consistency( + builder.get(), CASS_CONSISTENCY_ONE)); + + // --- ORIGINAL TEST CASE BEGIN --- + + // // Execute a batched query with assigned profile (should fail + // Batch batch; + // batch.add(insert_); + // batch.set_execution_profile("serial_consistency"); + // Result result = session_.execute(batch, false); + // ASSERT_EQ(CASS_ERROR_SERVER_INVALID_QUERY, result.error_code()); + // ASSERT_TRUE(contains( + // result.error_message(), + // "Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL")); + + // // Execute a simple query with assigned profile (should fail) + // insert_.set_execution_profile("serial_consistency"); + // result = session_.execute(insert_, false); + // ASSERT_EQ(CASS_ERROR_SERVER_INVALID_QUERY, result.error_code()); + // ASSERT_TRUE(contains( + // result.error_message(), + // "Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL")); + + // --- ORIGINAL TEST CASE BEGIN --- } /** @@ -432,7 +444,7 @@ CASSANDRA_INTEGRATION_TEST_F(ExecutionProfileTest, LatencyAwareRouting) { CHECK_FAILURE; // Execute batch with the assigned profile and add criteria for the logger - logger_.add_critera("Calculated new minimum"); + logger_.add_critera("Latency awareness: updated min average latency to"); for (int i = 0; i < 1000; ++i) { Batch batch; batch.add(insert_);