Skip to content

Add support for pinging #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/signalrclient/signalr_client_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ namespace signalr
SIGNALRCLIENT_API const std::shared_ptr<scheduler>& __cdecl get_scheduler() const noexcept;
SIGNALRCLIENT_API void set_handshake_timeout(std::chrono::milliseconds);
SIGNALRCLIENT_API std::chrono::milliseconds get_handshake_timeout() const noexcept;
SIGNALRCLIENT_API void set_server_timeout(std::chrono::milliseconds);
SIGNALRCLIENT_API std::chrono::milliseconds get_server_timeout() const noexcept;
SIGNALRCLIENT_API void set_keepalive_interval(std::chrono::milliseconds);
SIGNALRCLIENT_API std::chrono::milliseconds get_keepalive_interval() const noexcept;

private:
#ifdef USE_CPPRESTSDK
Expand All @@ -56,5 +60,7 @@ namespace signalr
std::map<std::string, std::string> m_http_headers;
std::shared_ptr<scheduler> m_scheduler;
std::chrono::milliseconds m_handshake_timeout;
std::chrono::milliseconds m_server_timeout;
std::chrono::milliseconds m_keepalive_interval;
};
}
132 changes: 131 additions & 1 deletion src/signalrclient/hub_connection_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ namespace signalr
callback(exception);
}, exception);
}
else
{
connection->start_keepalive();
}
};

auto handshake_request = handshake::write_handshake(connection->m_protocol);
Expand Down Expand Up @@ -348,6 +352,7 @@ namespace signalr
}
}

reset_server_timeout();
auto messages = m_protocol->parse_messages(response);

for (const auto& val : messages)
Expand Down Expand Up @@ -385,7 +390,10 @@ namespace signalr
// Sent to server only, should not be received by client
throw std::runtime_error("Received unexpected message type 'CancelInvocation'.");
case message_type::ping:
// TODO
if (m_logger.is_enabled(trace_level::debug))
{
m_logger.log(trace_level::debug, "ping message received.");
}
break;
case message_type::close:
// TODO
Expand Down Expand Up @@ -477,6 +485,8 @@ namespace signalr
}
}
});

reset_send_ping();
}
catch (const std::exception& e)
{
Expand Down Expand Up @@ -510,6 +520,126 @@ namespace signalr
m_disconnected = disconnected;
}

void hub_connection_impl::reset_send_ping()
{
auto timeMs = (std::chrono::steady_clock::now() + m_signalr_client_config.get_keepalive_interval()).time_since_epoch();
m_nextActivationSendPing.store(std::chrono::duration_cast<std::chrono::milliseconds>(timeMs).count());
}

void hub_connection_impl::reset_server_timeout()
{
auto timeMs = (std::chrono::steady_clock::now() + m_signalr_client_config.get_server_timeout()).time_since_epoch();
m_nextActivationServerTimeout.store(std::chrono::duration_cast<std::chrono::milliseconds>(timeMs).count());
}

void hub_connection_impl::start_keepalive()
{
if (m_logger.is_enabled(trace_level::debug))
{
m_logger.log(trace_level::debug, "starting keep alive timer.");
}

auto send_ping = [](std::shared_ptr<hub_connection_impl> connection)
{
if (!connection)
{
return;
}

if (connection->get_connection_state() != connection_state::connected)
{
return;
}

try
{
hub_message ping_msg(signalr::message_type::ping);
auto message = connection->m_protocol->write_message(&ping_msg);

std::weak_ptr<hub_connection_impl> weak_connection = connection;
connection->m_connection->send(
message,
connection->m_protocol->transfer_format(), [weak_connection](std::exception_ptr exception)
{
auto connection = weak_connection.lock();
if (connection)
{
if (exception)
{
if (connection->m_logger.is_enabled(trace_level::warning))
{
connection->m_logger.log(trace_level::warning, "failed to send ping!");
}
}
else
{
connection->reset_send_ping();
}
}
});
}
catch (const std::exception& e)
{
if (connection->m_logger.is_enabled(trace_level::warning))
{
connection->m_logger.log(trace_level::warning, std::string("failed to send ping: ").append(e.what()));
}
}
};

send_ping(shared_from_this());
reset_server_timeout();

std::weak_ptr<hub_connection_impl> weak_connection = shared_from_this();
timer(m_signalr_client_config.get_scheduler(),
[send_ping, weak_connection](std::chrono::milliseconds)
{
auto connection = weak_connection.lock();

if (!connection)
{
return true;
}

if (connection->get_connection_state() != connection_state::connected)
{
return true;
}

auto timeNowmSeconds =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();

if (timeNowmSeconds > connection->m_nextActivationServerTimeout.load())
{
if (connection->get_connection_state() == connection_state::connected)
{
auto error_msg = std::string("server timeout (")
.append(std::to_string(connection->m_signalr_client_config.get_server_timeout().count()))
.append(" ms) elapsed without receiving a message from the server.");
if (connection->m_logger.is_enabled(trace_level::warning))
{
connection->m_logger.log(trace_level::warning, error_msg);
}

connection->m_connection->stop([](std::exception_ptr)
{
}, std::make_exception_ptr(signalr_exception(error_msg)));
}
}

if (timeNowmSeconds > connection->m_nextActivationSendPing.load())
{
if (connection->m_logger.is_enabled(trace_level::debug))
{
connection->m_logger.log(trace_level::debug, "sending ping to server.");
}
send_ping(connection);
}

return false;
});
}

// unnamed namespace makes it invisble outside this translation unit
namespace
{
Expand Down
8 changes: 8 additions & 0 deletions src/signalrclient/hub_connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ namespace signalr
signalr_client_config m_signalr_client_config;
std::unique_ptr<hub_protocol> m_protocol;

std::atomic<int64_t> m_nextActivationServerTimeout;
std::atomic<int64_t> m_nextActivationSendPing;

std::mutex m_stop_callback_lock;
std::vector<std::function<void(std::exception_ptr)>> m_stop_callbacks;

Expand All @@ -75,5 +78,10 @@ namespace signalr
void invoke_hub_method(const std::string& method_name, const std::vector<signalr::value>& arguments, const std::string& callback_id,
std::function<void()> set_completion, std::function<void(const std::exception_ptr)> set_exception) noexcept;
bool invoke_callback(completion_message* completion);

void reset_send_ping();
void reset_server_timeout();

void start_keepalive();
};
}
32 changes: 32 additions & 0 deletions src/signalrclient/signalr_client_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ namespace signalr

signalr_client_config::signalr_client_config()
: m_handshake_timeout(std::chrono::seconds(15))
, m_server_timeout(std::chrono::seconds(30))
, m_keepalive_interval(std::chrono::seconds(15))
{
m_scheduler = std::make_shared<signalr_default_scheduler>();
}
Expand Down Expand Up @@ -92,4 +94,34 @@ namespace signalr
{
return m_handshake_timeout;
}

void signalr_client_config::set_server_timeout(std::chrono::milliseconds timeout)
{
if (timeout <= std::chrono::seconds(0))
{
throw std::runtime_error("timeout must be greater than 0.");
}

m_server_timeout = timeout;
}

std::chrono::milliseconds signalr_client_config::get_server_timeout() const noexcept
{
return m_server_timeout;
}

void signalr_client_config::set_keepalive_interval(std::chrono::milliseconds interval)
{
if (interval <= std::chrono::seconds(0))
{
throw std::runtime_error("interval must be greater than 0.");
}

m_keepalive_interval = interval;
}

std::chrono::milliseconds signalr_client_config::get_keepalive_interval() const noexcept
{
return m_keepalive_interval;
}
}
136 changes: 134 additions & 2 deletions test/signalrclienttests/hub_connection_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1738,9 +1738,9 @@ TEST(config, can_replace_scheduler)

mre.get();

// http_client->send (negotiate), websocket_client->start, handshake timeout timer, websocket_client->send, websocket_client->send, websocket_client->stop
// http_client->send (negotiate), websocket_client->start, handshake timeout timer, websocket_client->send, websocket_client->send, keep alive timer, websocket_client->send ping, websocket_client->stop
// handshake timeout timer can trigger more than once if test takes more than 1 second
ASSERT_GE(6, scheduler->schedule_count);
ASSERT_GE(scheduler->schedule_count, 8);
}

class throw_hub_protocol : public hub_protocol
Expand Down Expand Up @@ -1814,3 +1814,135 @@ TEST(send, throws_if_protocol_fails)

ASSERT_EQ(connection_state::connected, hub_connection->get_connection_state());
}

TEST(keepalive, sends_ping_messages)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have a test that a ping isn't sent if other messages are being sent from the hub_connection.

{
signalr_client_config config;
config.set_keepalive_interval(std::chrono::seconds(1));
config.set_server_timeout(std::chrono::seconds(3));
auto ping_mre = manual_reset_event<void>();
auto messages = std::make_shared<std::deque<std::string>>();
auto websocket_client = create_test_websocket_client(
/* send function */ [messages, &ping_mre](const std::string& msg, std::function<void(std::exception_ptr)> callback)
{
if (messages->size() < 3)
{
messages->push_back(msg);
}
if (messages->size() == 3)
{
ping_mre.set();
}
callback(nullptr);
},
[](const std::string&, std::function<void(std::exception_ptr)> callback) { callback(nullptr); },
[](std::function<void(std::exception_ptr)> callback) { callback(nullptr); },
false);
auto hub_connection = create_hub_connection(websocket_client);
hub_connection.set_client_config(config);

auto mre = manual_reset_event<void>();
hub_connection.start([&mre](std::exception_ptr exception)
{
mre.set(exception);
});

ASSERT_FALSE(websocket_client->receive_loop_started.wait(5000));
ASSERT_FALSE(websocket_client->handshake_sent.wait(5000));
websocket_client->receive_message("{}\x1e");

mre.get();

ping_mre.get();

ASSERT_EQ(3, messages->size());
ASSERT_EQ("{\"protocol\":\"json\",\"version\":1}\x1e", (*messages)[0]);
ASSERT_EQ("{\"type\":6}\x1e", (*messages)[1]);
ASSERT_EQ("{\"type\":6}\x1e", (*messages)[2]);
ASSERT_EQ(connection_state::connected, hub_connection.get_connection_state());
}

TEST(keepalive, server_timeout_on_no_ping_from_server)
{
signalr_client_config config;
config.set_keepalive_interval(std::chrono::seconds(1));
config.set_server_timeout(std::chrono::seconds(1));
auto websocket_client = create_test_websocket_client();
auto hub_connection = create_hub_connection(websocket_client);
hub_connection.set_client_config(config);

auto disconnected_called = false;

auto disconnect_mre = manual_reset_event<void>();
hub_connection.set_disconnected([&disconnected_called, &disconnect_mre](std::exception_ptr ex)
{
disconnect_mre.set(ex);
});

auto mre = manual_reset_event<void>();
hub_connection.start([&mre](std::exception_ptr exception)
{
mre.set(exception);
});

ASSERT_FALSE(websocket_client->receive_loop_started.wait(5000));
ASSERT_FALSE(websocket_client->handshake_sent.wait(5000));
websocket_client->receive_message("{}\x1e");

mre.get();

try
{
disconnect_mre.get();
ASSERT_TRUE(false);
}
catch (const std::exception& ex)
{
ASSERT_STREQ("server timeout (1000 ms) elapsed without receiving a message from the server.", ex.what());
}
ASSERT_EQ(connection_state::disconnected, hub_connection.get_connection_state());
}

TEST(keepalive, resets_server_timeout_timer_on_any_message_from_server)
{
signalr_client_config config;
config.set_keepalive_interval(std::chrono::seconds(1));
config.set_server_timeout(std::chrono::seconds(1));
auto websocket_client = create_test_websocket_client();
auto hub_connection = create_hub_connection(websocket_client);
hub_connection.set_client_config(config);

auto disconnect_mre = manual_reset_event<void>();
hub_connection.set_disconnected([&disconnect_mre](std::exception_ptr ex)
{
disconnect_mre.set(ex);
});

auto mre = manual_reset_event<void>();
hub_connection.start([&mre](std::exception_ptr exception)
{
mre.set(exception);
});

ASSERT_FALSE(websocket_client->receive_loop_started.wait(5000));
ASSERT_FALSE(websocket_client->handshake_sent.wait(5000));
websocket_client->receive_message("{}\x1e");

mre.get();

std::this_thread::sleep_for(config.get_server_timeout() - std::chrono::milliseconds(500));
websocket_client->receive_message("{\"type\":6}\x1e");
std::this_thread::sleep_for(std::chrono::seconds(1));
ASSERT_EQ(connection_state::connected, hub_connection.get_connection_state());

try
{
disconnect_mre.get();
ASSERT_TRUE(false);
}
catch (const std::exception& ex)
{
ASSERT_STREQ("server timeout (1000 ms) elapsed without receiving a message from the server.", ex.what());
}
ASSERT_EQ(connection_state::disconnected, hub_connection.get_connection_state());
}
Loading