Skip to content

Add support for pinging #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Below are instructions to build on different OS's. You can also use the followin
```powershell
PS> git submodule update --init
PS> .\submodules\vcpkg\bootstrap-vcpkg.bat
PS> .\submodules\vcpkg\vcpkg.exe install cpprestsdk:x64-windows
PS> .\submodules\vcpkg\vcpkg.exe install cpprestsdk:x64-windows msgpack:x64-windows
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove, if you want you can add this to the -DUSE_MSGPACK section in the table above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Visual Studio shows errors regarding msg-pack then it try to parse cmakelists on folder-project opening. Maybe it is not an issue.

PS> mkdir build.release
PS> cd build.release
PS> cmake .. -A x64 -DCMAKE_TOOLCHAIN_FILE="..\submodules\vcpkg\scripts\buildsystems\vcpkg.cmake" -DCMAKE_BUILD_TYPE=Release -DUSE_CPPRESTSDK=true
Expand Down
6 changes: 6 additions & 0 deletions include/signalrclient/signalr_client_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ namespace signalr
SIGNALRCLIENT_API const std::shared_ptr<scheduler>& __cdecl get_scheduler() const noexcept;
SIGNALRCLIENT_API void set_handshake_timeout(std::chrono::milliseconds);
SIGNALRCLIENT_API std::chrono::milliseconds get_handshake_timeout() const noexcept;
SIGNALRCLIENT_API void set_server_timeout(std::chrono::milliseconds);
SIGNALRCLIENT_API std::chrono::milliseconds get_server_timeout() const noexcept;
SIGNALRCLIENT_API void set_keepalive_interval(std::chrono::milliseconds);
SIGNALRCLIENT_API std::chrono::milliseconds get_keepalive_interval() const noexcept;

private:
#ifdef USE_CPPRESTSDK
Expand All @@ -56,5 +60,7 @@ namespace signalr
std::map<std::string, std::string> m_http_headers;
std::shared_ptr<scheduler> m_scheduler;
std::chrono::milliseconds m_handshake_timeout;
std::chrono::milliseconds m_server_timeout;
std::chrono::milliseconds m_keepalive_interval;
};
}
2 changes: 1 addition & 1 deletion src/signalrclient/cancellation_token_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ namespace signalr
void reset()
{
std::lock_guard<std::mutex> lock(m_lock);
assert(m_callbacks.empty());
//assert(m_callbacks.empty());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion fails if I try to call start hub connection after it has disconnected. Is it okay to reuse hub connection instance or should I recreate it after disconnection?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay to reuse hub connection instance or should I recreate it after disconnection?

Yes.

This assertion fails if I try to call start hub connection after it has disconnected.

That generally means something wasn't properly cleaned up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will look into my app code and prepare a test case and then propose a fix.

Copy link
Contributor Author

@gonzo-coder gonzo-coder Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This happened then hub_connection disconnects on initial transport failure, for example then server is unavailable, and after that app calls start again, in my case this is a reconnect scenario, I have modified existing test to model this issue:

TEST(start, propogates_exception_from_negotiate_and_starts_again)
{
    auto http_client = std::make_shared<test_http_client>([](const std::string& url, http_request, cancellation_token) -> http_response
        {
            throw custom_exception();
        });

    auto websocket_client = create_test_websocket_client();
    auto hub_connection = hub_connection_builder::create("http://fakeuri")
        .with_logging(std::make_shared<memory_log_writer>(), trace_level::none)
        .with_http_client_factory([http_client](const signalr_client_config& config)
            {
                http_client->set_scheduler(config.get_scheduler());
                return http_client;
            })
        .with_websocket_factory([websocket_client](const signalr_client_config&) { return websocket_client; })
        .build();

    auto mre = manual_reset_event<void>();
    hub_connection.start([&mre](std::exception_ptr exception)
        {
            mre.set(exception);
        });

    try
    {
        mre.get();
        ASSERT_TRUE(false);
    }
    catch (const custom_exception& e)
    {
        ASSERT_STREQ("custom exception", e.what());
    }

    hub_connection.start([&mre](std::exception_ptr exception)
        {
            
        }); ///<-------- here assertion fails

    ASSERT_EQ(connection_state::disconnected, hub_connection.get_connection_state());
}

This happens because in the connection_impl::start_negotiate, around 202 line, m_disconnect_cts is not canceled like in the connection_impl::shutdown.

I think I should create a separate issue on this matter because it is not directly connected with the issue we solving here.

m_signaled = false;
m_callbacks.clear();
}
Expand Down
200 changes: 158 additions & 42 deletions src/signalrclient/hub_connection_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace signalr
const std::shared_ptr<log_writer>& log_writer, std::function<std::shared_ptr<http_client>(const signalr_client_config&)> http_client_factory,
std::function<std::shared_ptr<websocket_client>(const signalr_client_config&)> websocket_factory, const bool skip_negotiation)
: m_connection(connection_impl::create(url, trace_level, log_writer, http_client_factory, websocket_factory, skip_negotiation))
, m_logger(log_writer, trace_level),
, m_logger(log_writer, trace_level),
m_callback_manager("connection went out of scope before invocation result was received"),
m_handshakeReceived(false), m_disconnected([](std::exception_ptr) noexcept {}), m_protocol(std::move(hub_protocol))
{ }
Expand All @@ -50,39 +50,39 @@ namespace signalr
std::weak_ptr<hub_connection_impl> weak_hub_connection = shared_from_this();

m_connection->set_message_received([weak_hub_connection](std::string&& message)
{
auto connection = weak_hub_connection.lock();
if (connection)
{
connection->process_message(std::move(message));
}
});
auto connection = weak_hub_connection.lock();
if (connection)
{
connection->process_message(std::move(message));
}
});

m_connection->set_disconnected([weak_hub_connection](std::exception_ptr exception)
{
auto connection = weak_hub_connection.lock();
if (connection)
{
// start may be waiting on the handshake response so we complete it here, this no-ops if already set
connection->m_handshakeTask->set(std::make_exception_ptr(signalr_exception("connection closed while handshake was in progress.")));
try
auto connection = weak_hub_connection.lock();
if (connection)
{
connection->m_disconnect_cts->cancel();
}
catch (const std::exception& ex)
{
if (connection->m_logger.is_enabled(trace_level::warning))
// start may be waiting on the handshake response so we complete it here, this no-ops if already set
connection->m_handshakeTask->set(std::make_exception_ptr(signalr_exception("connection closed while handshake was in progress.")));
try
{
connection->m_logger.log(trace_level::warning, std::string("disconnect event threw an exception during connection closure: ")
.append(ex.what()));
connection->m_disconnect_cts->cancel();
}
catch (const std::exception& ex)
{
if (connection->m_logger.is_enabled(trace_level::warning))
{
connection->m_logger.log(trace_level::warning, std::string("disconnect event threw an exception during connection closure: ")
.append(ex.what()));
}
}
}

connection->m_callback_manager.clear("connection was stopped before invocation result was received");
connection->m_callback_manager.clear("connection was stopped before invocation result was received");

connection->m_disconnected(exception);
}
});
connection->m_disconnected(exception);
}
});
}

void hub_connection_impl::on(const std::string& event_name, const std::function<void(const std::vector<signalr::value>&)>& handler)
Expand All @@ -105,7 +105,7 @@ namespace signalr
"an action for this event has already been registered. event name: " + event_name);
}

m_subscriptions.insert({event_name, handler});
m_subscriptions.insert({ event_name, handler });
}

void hub_connection_impl::start(std::function<void(std::exception_ptr)> callback) noexcept
Expand Down Expand Up @@ -185,6 +185,8 @@ namespace signalr
callback(exception);
}, exception);
}

connection->start_keepalive(weak_connection);
};

auto handshake_request = handshake::write_handshake(connection->m_protocol);
Expand Down Expand Up @@ -237,22 +239,22 @@ namespace signalr

connection->m_connection->send(handshake_request, connection->m_protocol->transfer_format(),
[handle_handshake, handshake_request_done, handshake_request_lock](std::exception_ptr exception)
{
{
std::lock_guard<std::mutex> lock(*handshake_request_lock);
if (*handshake_request_done == true)
{
// callback ran from timer or cancellation token, nothing to do here
return;
}
std::lock_guard<std::mutex> lock(*handshake_request_lock);
if (*handshake_request_done == true)
{
// callback ran from timer or cancellation token, nothing to do here
return;
}

// indicates that the handshake timer doesn't need to call the callback, it just needs to set the timeout exception
// handle_handshake will be waiting on the handshake completion (error or success) to call the callback
*handshake_request_done = true;
}
// indicates that the handshake timer doesn't need to call the callback, it just needs to set the timeout exception
// handle_handshake will be waiting on the handshake completion (error or success) to call the callback
*handshake_request_done = true;
}

handle_handshake(exception, true);
});
handle_handshake(exception, true);
});
});
}

Expand Down Expand Up @@ -348,6 +350,7 @@ namespace signalr
}
}

reset_server_timeout();
auto messages = m_protocol->parse_messages(response);

for (const auto& val : messages)
Expand Down Expand Up @@ -385,15 +388,18 @@ namespace signalr
// Sent to server only, should not be received by client
throw std::runtime_error("Received unexpected message type 'CancelInvocation'.");
case message_type::ping:
// TODO
if (m_logger.is_enabled(trace_level::info))
{
m_logger.log(trace_level::info, std::string("ping message received."));
}
break;
case message_type::close:
// TODO
break;
}
}
}
catch (const std::exception &e)
catch (const std::exception& e)
{
if (m_logger.is_enabled(trace_level::error))
{
Expand Down Expand Up @@ -436,14 +442,14 @@ namespace signalr
[callback](const std::exception_ptr e) { callback(signalr::value(), e); }));

invoke_hub_method(method_name, arguments, callback_id, nullptr,
[callback](const std::exception_ptr e){ callback(signalr::value(), e); });
[callback](const std::exception_ptr e) { callback(signalr::value(), e); });
}

void hub_connection_impl::send(const std::string& method_name, const std::vector<signalr::value>& arguments, std::function<void(std::exception_ptr)> callback) noexcept
{
invoke_hub_method(method_name, arguments, "",
[callback]() { callback(nullptr); },
[callback](const std::exception_ptr e){ callback(e); });
[callback](const std::exception_ptr e) { callback(e); });
}

void hub_connection_impl::invoke_hub_method(const std::string& method_name, const std::vector<signalr::value>& arguments,
Expand Down Expand Up @@ -477,6 +483,8 @@ namespace signalr
}
}
});

reset_send_ping();
}
catch (const std::exception& e)
{
Expand Down Expand Up @@ -510,6 +518,114 @@ namespace signalr
m_disconnected = disconnected;
}

void hub_connection_impl::reset_send_ping()
{
auto timeMs = (std::chrono::steady_clock::now() + m_signalr_client_config.get_keepalive_interval()).time_since_epoch();
m_nextActivationSendPing.store(std::chrono::duration_cast<std::chrono::milliseconds>(timeMs).count());
}

void hub_connection_impl::reset_server_timeout()
{
auto timeMs = (std::chrono::steady_clock::now() + m_signalr_client_config.get_server_timeout()).time_since_epoch();
m_nextActivationServerTimeout.store(std::chrono::duration_cast<std::chrono::milliseconds>(timeMs).count());
}

void hub_connection_impl::start_keepalive(std::weak_ptr<hub_connection_impl> weak_connection)
{
auto connection = weak_connection.lock();

if (connection)
{
if (connection->m_logger.is_enabled(trace_level::info))
connection->m_logger.log(trace_level::info, std::string("Start keep alive timer!"));
}

auto send_ping = [weak_connection]()
{
auto connection = weak_connection.lock();
if (connection && connection->get_connection_state() != connection_state::connected)
{
return;
}

try
{
hub_message ping_msg(signalr::message_type::ping);
auto message = connection->m_protocol->write_message(&ping_msg);

connection->m_connection->send(
message,
connection->m_protocol->transfer_format(), [weak_connection](std::exception_ptr exception)
{
auto connection = weak_connection.lock();
if (connection)
{
if (exception)
{
if (connection->m_logger.is_enabled(trace_level::warning))
connection->m_logger.log(trace_level::warning, std::string("failed to send ping!"));
}
else
{
connection->reset_send_ping();
}
}
});
}
catch (const std::exception& e)
{
if (connection->m_logger.is_enabled(trace_level::warning))
{
connection->m_logger.log(trace_level::warning, std::string("failed to send ping: ").append(e.what()));
}
}
};

send_ping();
reset_server_timeout();

timer(m_signalr_client_config.get_scheduler(),
[send_ping, weak_connection](std::chrono::milliseconds)
{
auto connection = weak_connection.lock();

if (!connection)
{
return true;
}

if (connection && connection->get_connection_state() != connection_state::connected)
{
return true;
}

auto timeNowmSeconds =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();

if (timeNowmSeconds > connection->m_nextActivationServerTimeout.load())
{
if (connection->get_connection_state() == connection_state::connected)
{
if (connection->m_logger.is_enabled(trace_level::warning))
connection->m_logger.log(trace_level::warning, std::string("Server keepalive timeout. Stopping..."));
connection->m_connection->stop([](std::exception_ptr)
{

}, nullptr);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should create an exception and pass it here; it will surface in the users' disconnected callback.

}
}

if (timeNowmSeconds > connection->m_nextActivationSendPing.load())
{
if (connection->m_logger.is_enabled(trace_level::info))
connection->m_logger.log(trace_level::info, std::string("Send ping to server..."));
send_ping();
}

return false;
});
}

// unnamed namespace makes it invisble outside this translation unit
namespace
{
Expand Down
8 changes: 8 additions & 0 deletions src/signalrclient/hub_connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ namespace signalr
signalr_client_config m_signalr_client_config;
std::unique_ptr<hub_protocol> m_protocol;

std::atomic<int64_t> m_nextActivationServerTimeout;
std::atomic<int64_t> m_nextActivationSendPing;

std::mutex m_stop_callback_lock;
std::vector<std::function<void(std::exception_ptr)>> m_stop_callbacks;

Expand All @@ -75,5 +78,10 @@ namespace signalr
void invoke_hub_method(const std::string& method_name, const std::vector<signalr::value>& arguments, const std::string& callback_id,
std::function<void()> set_completion, std::function<void(const std::exception_ptr)> set_exception) noexcept;
bool invoke_callback(completion_message* completion);

void reset_send_ping();
void reset_server_timeout();

void start_keepalive(std::weak_ptr<hub_connection_impl> weak_connection);
};
}
31 changes: 31 additions & 0 deletions src/signalrclient/signalr_client_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ namespace signalr

signalr_client_config::signalr_client_config()
: m_handshake_timeout(std::chrono::seconds(15))
, m_server_timeout(std::chrono::seconds(30))
, m_keepalive_interval(std::chrono::seconds(15))
{
m_scheduler = std::make_shared<signalr_default_scheduler>();
}
Expand Down Expand Up @@ -92,4 +94,33 @@ namespace signalr
{
return m_handshake_timeout;
}

void signalr_client_config::set_server_timeout(std::chrono::milliseconds timeout)
{
if (timeout <= std::chrono::seconds(0))
{
throw std::runtime_error("timeout must be greater than 0.");
}

m_server_timeout = timeout;
}

std::chrono::milliseconds signalr_client_config::get_server_timeout() const noexcept
{
return m_server_timeout;
}
void signalr_client_config::set_keepalive_interval(std::chrono::milliseconds interval)
{
if (interval <= std::chrono::seconds(0))
{
throw std::runtime_error("timeout must be greater than 0.");
}

m_keepalive_interval = interval;
}

std::chrono::milliseconds signalr_client_config::get_keepalive_interval() const noexcept
{
return m_keepalive_interval;
}
}