diff --git a/src/Servers/Kestrel/test/BindTests/AddressRegistrationTests.cs b/src/Servers/Kestrel/test/BindTests/AddressRegistrationTests.cs index 88c37750f604..50fbbfb73dc9 100644 --- a/src/Servers/Kestrel/test/BindTests/AddressRegistrationTests.cs +++ b/src/Servers/Kestrel/test/BindTests/AddressRegistrationTests.cs @@ -100,6 +100,7 @@ public async Task RegisterIPEndPoint_IPv6StaticPort_Success() [ConditionalTheory] [MemberData(nameof(IPEndPointRegistrationDataDynamicPort))] [IPv6SupportedCondition] + [Flaky("https://github.com/aspnet/AspNetCore-Internal/issues/2074", FlakyOn.AzP.macOS)] public async Task RegisterIPEndPoint_DynamicPort_Success(IPEndPoint endPoint, string testUrl) { await RegisterIPEndPoint_Success(endPoint, testUrl); @@ -447,6 +448,7 @@ public Task DefaultsServerAddress_BindsToIPv4WithHttps() [ConditionalFact] [IPv6SupportedCondition] + [Flaky("https://github.com/aspnet/AspNetCore-Internal/issues/1756", FlakyOn.AzP.macOS)] public Task DefaultsServerAddress_BindsToIPv6WithHttps() { if (!CanBindToEndpoint(IPAddress.Loopback, 5000) || !CanBindToEndpoint(IPAddress.IPv6Loopback, 5000) diff --git a/src/SignalR/clients/cpp/include/signalrclient/http_client.h b/src/SignalR/clients/cpp/include/signalrclient/http_client.h new file mode 100644 index 000000000000..3b3b9b390afd --- /dev/null +++ b/src/SignalR/clients/cpp/include/signalrclient/http_client.h @@ -0,0 +1,54 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +#pragma once + +#include +#include +#include +#include + +namespace signalr +{ + enum class http_method + { + GET, + POST + }; + + class http_request + { + public: + http_method method; + std::map headers; + std::string content; + std::chrono::seconds timeout; + }; + + class http_response + { + public: + http_response() {} + http_response(http_response&& rhs) noexcept : status_code(rhs.status_code), content(std::move(rhs.content)) {} + http_response(int code, const std::string& content) : status_code(code), content(content) {} + + http_response& operator=(http_response&& rhs) + { + status_code = rhs.status_code; + content = std::move(rhs.content); + + return *this; + } + + int status_code = 0; + std::string content; + }; + + class http_client + { + public: + virtual void send(std::string url, http_request request, std::function callback) = 0; + + virtual ~http_client() {} + }; +} diff --git a/src/SignalR/clients/cpp/include/signalrclient/transfer_format.h b/src/SignalR/clients/cpp/include/signalrclient/transfer_format.h new file mode 100644 index 000000000000..8355d25309f1 --- /dev/null +++ b/src/SignalR/clients/cpp/include/signalrclient/transfer_format.h @@ -0,0 +1,13 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +#pragma once + +namespace signalr +{ + enum class transfer_format + { + text, + binary + }; +} diff --git a/src/SignalR/clients/cpp/include/signalrclient/transport_type.h b/src/SignalR/clients/cpp/include/signalrclient/transport_type.h index 2db53381825c..bc30ee464602 100644 --- a/src/SignalR/clients/cpp/include/signalrclient/transport_type.h +++ b/src/SignalR/clients/cpp/include/signalrclient/transport_type.h @@ -10,4 +10,4 @@ namespace signalr long_polling, websockets }; -} \ No newline at end of file +} diff --git a/src/SignalR/clients/cpp/include/signalrclient/websocket_client.h b/src/SignalR/clients/cpp/include/signalrclient/websocket_client.h new file mode 100644 index 000000000000..b45e73995f3b --- /dev/null +++ b/src/SignalR/clients/cpp/include/signalrclient/websocket_client.h @@ -0,0 +1,23 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +#pragma once + +#include "transfer_format.h" + +namespace signalr +{ + class websocket_client + { + public: + virtual ~websocket_client() {}; + + virtual void start(std::string url, transfer_format format, std::function callback) = 0; + + virtual void stop(std::function callback) = 0; + + virtual void send(std::string payload, std::function callback) = 0; + + virtual void receive(std::function callback) = 0; + }; +} diff --git a/src/SignalR/clients/cpp/samples/HubConnectionSample/HubConnectionSample.cpp b/src/SignalR/clients/cpp/samples/HubConnectionSample/HubConnectionSample.cpp index c67e9f1a2543..d1e8896986a4 100644 --- a/src/SignalR/clients/cpp/samples/HubConnectionSample/HubConnectionSample.cpp +++ b/src/SignalR/clients/cpp/samples/HubConnectionSample/HubConnectionSample.cpp @@ -17,14 +17,13 @@ class logger : public signalr::log_writer } }; -void send_message(signalr::hub_connection& connection, const std::string& name, const std::string& message) +void send_message(signalr::hub_connection& connection, const std::string& message) { web::json::value args{}; - args[0] = web::json::value::string(utility::conversions::to_string_t(name)); - args[1] = web::json::value(utility::conversions::to_string_t(message)); + args[0] = web::json::value(utility::conversions::to_string_t(message)); // if you get an internal compiler error uncomment the lambda below or install VS Update 4 - connection.invoke("Invoke", args/*, [](const web::json::value&){}*/) + connection.invoke("Send", args) .then([](pplx::task invoke_task) // fire and forget but we need to observe exceptions { try @@ -39,7 +38,7 @@ void send_message(signalr::hub_connection& connection, const std::string& name, }); } -void chat(const std::string& name) +void chat() { signalr::hub_connection connection("http://localhost:5000/default", signalr::trace_level::all, std::make_shared()); connection.on("Send", [](const web::json::value& m) @@ -48,7 +47,7 @@ void chat(const std::string& name) }); connection.start() - .then([&connection, name]() + .then([&connection]() { ucout << U("Enter your message:"); for (;;) @@ -61,7 +60,7 @@ void chat(const std::string& name) break; } - send_message(connection, name, message); + send_message(connection, message); } }) .then([&connection]() // fine to capture by reference - we are blocking so it is guaranteed to be valid @@ -84,11 +83,7 @@ void chat(const std::string& name) int main() { - ucout << U("Enter your name: "); - std::string name; - std::getline(std::cin, name); - - chat(name); + chat(); return 0; } diff --git a/src/SignalR/clients/cpp/src/signalrclient/Build/VS/signalrclient.vcxproj b/src/SignalR/clients/cpp/src/signalrclient/Build/VS/signalrclient.vcxproj index c74fbdc9a9ae..88168fdf9844 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/Build/VS/signalrclient.vcxproj +++ b/src/SignalR/clients/cpp/src/signalrclient/Build/VS/signalrclient.vcxproj @@ -41,18 +41,22 @@ + + + + @@ -60,13 +64,12 @@ - + - @@ -75,12 +78,13 @@ + - + Create @@ -105,4 +109,4 @@ - + \ No newline at end of file diff --git a/src/SignalR/clients/cpp/src/signalrclient/Build/VS/signalrclient.vcxproj.filters b/src/SignalR/clients/cpp/src/signalrclient/Build/VS/signalrclient.vcxproj.filters index eb4f38f008b6..aceb1a98079d 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/Build/VS/signalrclient.vcxproj.filters +++ b/src/SignalR/clients/cpp/src/signalrclient/Build/VS/signalrclient.vcxproj.filters @@ -45,9 +45,6 @@ Header Files - - Header Files - Header Files @@ -57,9 +54,6 @@ Header Files - - Header Files - Header Files @@ -111,6 +105,21 @@ Header Files + + Header Files + + + Header Files + + + Header Files + + + Header Files + + + Header Files + @@ -164,7 +173,10 @@ Source Files - + + Source Files + + Source Files diff --git a/src/SignalR/clients/cpp/src/signalrclient/connection_impl.cpp b/src/SignalR/clients/cpp/src/signalrclient/connection_impl.cpp index 26d40e67970f..3ac69d7382da 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/connection_impl.cpp +++ b/src/SignalR/clients/cpp/src/signalrclient/connection_impl.cpp @@ -6,11 +6,12 @@ #include #include "constants.h" #include "connection_impl.h" -#include "request_sender.h" +#include "negotiate.h" #include "url_builder.h" #include "trace_log_writer.h" #include "make_unique.h" #include "signalrclient/signalr_exception.h" +#include "default_http_client.h" namespace signalr { @@ -23,22 +24,30 @@ namespace signalr std::shared_ptr connection_impl::create(const std::string& url, trace_level trace_level, const std::shared_ptr& log_writer) { - return connection_impl::create(url, trace_level, log_writer, std::make_unique(), std::make_unique()); + return connection_impl::create(url, trace_level, log_writer, nullptr, std::make_unique()); } std::shared_ptr connection_impl::create(const std::string& url, trace_level trace_level, const std::shared_ptr& log_writer, - std::unique_ptr web_request_factory, std::unique_ptr transport_factory) + std::unique_ptr http_client, std::unique_ptr transport_factory) { return std::shared_ptr(new connection_impl(url, trace_level, - log_writer ? log_writer : std::make_shared(), std::move(web_request_factory), std::move(transport_factory))); + log_writer ? log_writer : std::make_shared(), std::move(http_client), std::move(transport_factory))); } connection_impl::connection_impl(const std::string& url, trace_level trace_level, const std::shared_ptr& log_writer, - std::unique_ptr web_request_factory, std::unique_ptr transport_factory) - : m_base_url(url), m_connection_state(connection_state::disconnected), m_logger(log_writer, trace_level), - m_transport(nullptr), m_web_request_factory(std::move(web_request_factory)), m_transport_factory(std::move(transport_factory)), - m_message_received([](const std::string&) noexcept {}), m_disconnected([]() noexcept {}) - { } + std::unique_ptr http_client, std::unique_ptr transport_factory) + : m_base_url(url), m_connection_state(connection_state::disconnected), m_logger(log_writer, trace_level), m_transport(nullptr), + m_transport_factory(std::move(transport_factory)), m_message_received([](const std::string&) noexcept {}), m_disconnected([]() noexcept {}) + { + if (http_client != nullptr) + { + m_http_client = std::move(http_client); + } + else + { + m_http_client = std::unique_ptr(new default_http_client()); + } + } connection_impl::~connection_impl() { @@ -105,7 +114,7 @@ namespace signalr { return pplx::task_from_exception("connection no longer exists"); } - return request_sender::negotiate(*connection->m_web_request_factory, url, connection->m_signalr_client_config); + return negotiate::negotiate(*connection->m_http_client, url, connection->m_signalr_client_config); }, m_disconnect_cts.get_token()) .then([weak_connection, start_tce, redirect_count, url](negotiation_response negotiation_response) { @@ -224,52 +233,54 @@ namespace signalr const auto& disconnect_cts = m_disconnect_cts; const auto& logger = m_logger; - auto process_response_callback = - [weak_connection, disconnect_cts, logger](const std::string& response) mutable - { - // When a connection is stopped we don't wait for its transport to stop. As a result if the same connection - // is immediately re-started the old transport can still invoke this callback. To prevent this we capture - // the disconnect_cts by value which allows distinguishing if the message is for the running connection - // or for the one that was already stopped. If this is the latter we just ignore it. - if (disconnect_cts.get_token().is_canceled()) - { - logger.log(trace_level::info, - std::string{ "ignoring stray message received after connection was restarted. message: " } - .append(response)); - return; - } + auto transport = connection->m_transport_factory->create_transport( + transport_type::websockets, connection->m_logger, connection->m_signalr_client_config); - auto connection = weak_connection.lock(); - if (connection) + transport->on_receive([disconnect_cts, connect_request_tce, logger, weak_connection](std::string message, std::exception_ptr exception) + { + if (exception != nullptr) { - connection->process_response(response); - } - }; + try + { + // Rethrowing the exception so we can log it + std::rethrow_exception(exception); + } + catch (const std::exception & e) + { + // When a connection is stopped we don't wait for its transport to stop. As a result if the same connection + // is immediately re-started the old transport can still invoke this callback. To prevent this we capture + // the disconnect_cts by value which allows distinguishing if the error is for the running connection + // or for the one that was already stopped. If this is the latter we just ignore it. + if (disconnect_cts.get_token().is_canceled()) + { + logger.log(trace_level::info, + std::string{ "ignoring stray error received after connection was restarted. error: " } + .append(e.what())); + return; + } - auto error_callback = - [weak_connection, connect_request_tce, disconnect_cts, logger](const std::exception &e) mutable - { - // When a connection is stopped we don't wait for its transport to stop. As a result if the same connection - // is immediately re-started the old transport can still invoke this callback. To prevent this we capture - // the disconnect_cts by value which allows distinguishing if the error is for the running connection - // or for the one that was already stopped. If this is the latter we just ignore it. - if (disconnect_cts.get_token().is_canceled()) + // no op after connection started successfully + connect_request_tce.set_exception(exception); + } + } + else { - logger.log(trace_level::info, - std::string{ "ignoring stray error received after connection was restarted. error: " } - .append(e.what())); + if (disconnect_cts.get_token().is_canceled()) + { + logger.log(trace_level::info, + std::string{ "ignoring stray message received after connection was restarted. message: " } + .append(message)); + return; + } - return; + auto connection = weak_connection.lock(); + if (connection) + { + connection->process_response(message); + } } - - // no op after connection started successfully - connect_request_tce.set_exception(e); - }; - - auto transport = connection->m_transport_factory->create_transport( - transport_type::websockets, connection->m_logger, connection->m_signalr_client_config, - process_response_callback, error_callback); + }); pplx::create_task([connect_request_tce, disconnect_cts, weak_connection]() { @@ -300,12 +311,14 @@ namespace signalr auto query_string = "id=" + m_connection_id; auto connect_url = url_builder::build_connect(url, transport->get_transport_type(), query_string); - transport->connect(connect_url) - .then([transport, connect_request_tce, logger](pplx::task connect_task) + transport->start(connect_url, transfer_format::text, [transport, connect_request_tce, logger](std::exception_ptr exception) mutable { try { - connect_task.get(); + if (exception != nullptr) + { + std::rethrow_exception(exception); + } connect_request_tce.set(); } catch (const std::exception& e) @@ -368,12 +381,16 @@ namespace signalr logger.log(trace_level::info, std::string("sending data: ").append(data)); - return transport->send(data) - .then([logger](pplx::task send_task) + pplx::task_completion_event event; + transport->send(data, [logger, event](std::exception_ptr exception) mutable { try { - send_task.get(); + if (exception != nullptr) + { + std::rethrow_exception(exception); + } + event.set(); } catch (const std::exception &e) { @@ -382,9 +399,11 @@ namespace signalr std::string("error sending data: ") .append(e.what())); - throw; + event.set_exception(exception); } }); + + return pplx::create_task(event); } pplx::task connection_impl::stop() @@ -471,7 +490,20 @@ namespace signalr change_state(connection_state::disconnecting); } - return m_transport->disconnect(); + pplx::task_completion_event tce; + m_transport->stop([tce](std::exception_ptr exception) + { + if (exception != nullptr) + { + tce.set_exception(exception); + } + else + { + tce.set(); + } + }); + + return pplx::create_task(tce); } connection_state connection_impl::get_connection_state() const noexcept diff --git a/src/SignalR/clients/cpp/src/signalrclient/connection_impl.h b/src/SignalR/clients/cpp/src/signalrclient/connection_impl.h index cd712e59217d..1972c4ebc118 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/connection_impl.h +++ b/src/SignalR/clients/cpp/src/signalrclient/connection_impl.h @@ -6,6 +6,7 @@ #include #include #include "cpprest/http_client.h" +#include "signalrclient/http_client.h" #include "signalrclient/trace_level.h" #include "signalrclient/connection_state.h" #include "signalrclient/signalr_client_config.h" @@ -28,7 +29,7 @@ namespace signalr static std::shared_ptr create(const std::string& url, trace_level trace_level, const std::shared_ptr& log_writer); static std::shared_ptr create(const std::string& url, trace_level trace_level, const std::shared_ptr& log_writer, - std::unique_ptr web_request_factory, std::unique_ptr transport_factory); + std::unique_ptr http_client, std::unique_ptr transport_factory); connection_impl(const connection_impl&) = delete; @@ -52,7 +53,6 @@ namespace signalr std::atomic m_connection_state; logger m_logger; std::shared_ptr m_transport; - std::unique_ptr m_web_request_factory; std::unique_ptr m_transport_factory; std::function m_message_received; @@ -63,9 +63,10 @@ namespace signalr std::mutex m_stop_lock; event m_start_completed_event; std::string m_connection_id; + std::unique_ptr m_http_client; connection_impl(const std::string& url, trace_level trace_level, const std::shared_ptr& log_writer, - std::unique_ptr web_request_factory, std::unique_ptr transport_factory); + std::unique_ptr http_client, std::unique_ptr transport_factory); pplx::task> start_transport(const std::string& url); pplx::task send_connect_request(const std::shared_ptr& transport, diff --git a/src/SignalR/clients/cpp/src/signalrclient/default_http_client.cpp b/src/SignalR/clients/cpp/src/signalrclient/default_http_client.cpp new file mode 100644 index 000000000000..6a56e5c0d723 --- /dev/null +++ b/src/SignalR/clients/cpp/src/signalrclient/default_http_client.cpp @@ -0,0 +1,73 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +#include "stdafx.h" +#include "default_http_client.h" + +namespace signalr +{ + void default_http_client::send(std::string url, http_request request, std::function callback) + { + web::http::method method; + if (request.method == http_method::GET) + { + method = U("GET"); + } + else if (request.method == http_method::POST) + { + method = U("POST"); + } + else + { + callback(http_response(), std::make_exception_ptr(std::runtime_error("unknown http method"))); + return; + } + + web::http::http_request http_request; + http_request.set_method(method); + http_request.set_body(request.content); + if (request.headers.size() > 0) + { + web::http::http_headers headers; + for (auto& header : request.headers) + { + headers.add(utility::conversions::to_string_t(header.first), utility::conversions::to_string_t(header.second)); + } + http_request.headers() = headers; + } + + auto milliseconds = std::chrono::milliseconds(request.timeout).count(); + pplx::cancellation_token_source cts; + if (milliseconds != 0) + { + pplx::create_task([milliseconds, cts]() + { + pplx::wait((unsigned int)milliseconds); + cts.cancel(); + }); + } + + web::http::client::http_client client(utility::conversions::to_string_t(url)); + client.request(http_request, cts.get_token()) + .then([callback](pplx::task response_task) + { + try + { + auto http_response = response_task.get(); + auto status_code = http_response.status_code(); + http_response.extract_utf8string() + .then([callback, status_code](std::string response_body) + { + signalr::http_response response; + response.content = response_body; + response.status_code = status_code; + callback(std::move(response), nullptr); + }); + } + catch (...) + { + callback(http_response(), std::current_exception()); + } + }); + } +} diff --git a/src/SignalR/clients/cpp/src/signalrclient/default_http_client.h b/src/SignalR/clients/cpp/src/signalrclient/default_http_client.h new file mode 100644 index 000000000000..6cffb8d37a11 --- /dev/null +++ b/src/SignalR/clients/cpp/src/signalrclient/default_http_client.h @@ -0,0 +1,16 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +#pragma once + +#include "signalrclient/http_client.h" +#include "cpprest/http_client.h" + +namespace signalr +{ + class default_http_client : public http_client + { + public: + void send(std::string url, http_request request, std::function callback) override; + }; +} diff --git a/src/SignalR/clients/cpp/src/signalrclient/default_websocket_client.cpp b/src/SignalR/clients/cpp/src/signalrclient/default_websocket_client.cpp index 2bf07b93f9f9..e8c683c141b5 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/default_websocket_client.cpp +++ b/src/SignalR/clients/cpp/src/signalrclient/default_websocket_client.cpp @@ -21,30 +21,73 @@ namespace signalr : m_underlying_client(create_client_config(signalr_client_config)) { } - pplx::task default_websocket_client::connect(const std::string& url) + void default_websocket_client::start(std::string url, transfer_format, std::function callback) { - return m_underlying_client.connect(utility::conversions::to_string_t(url)); + m_underlying_client.connect(utility::conversions::to_string_t(url)) + .then([callback](pplx::task task) + { + try + { + task.get(); + callback(nullptr); + } + catch (...) + { + callback(std::current_exception()); + } + }); } - pplx::task default_websocket_client::send(const std::string &message) + void default_websocket_client::stop(std::function callback) { - web::websockets::client::websocket_outgoing_message msg; - msg.set_utf8_message(message); - return m_underlying_client.send(msg); + m_underlying_client.close() + .then([callback](pplx::task task) + { + try + { + callback(nullptr); + } + catch (...) + { + callback(std::current_exception()); + } + }); } - pplx::task default_websocket_client::receive() + void default_websocket_client::send(std::string payload, std::function callback) { - // the caller is responsible for observing exceptions - return m_underlying_client.receive() - .then([](web::websockets::client::websocket_incoming_message msg) + web::websockets::client::websocket_outgoing_message msg; + msg.set_utf8_message(payload); + m_underlying_client.send(msg) + .then([callback](pplx::task task) { - return msg.extract_string(); + try + { + task.get(); + callback(nullptr); + } + catch (...) + { + callback(std::current_exception()); + } }); } - pplx::task default_websocket_client::close() + void default_websocket_client::receive(std::function callback) { - return m_underlying_client.close(); + m_underlying_client.receive() + .then([callback](pplx::task task) + { + try + { + auto response = task.get(); + auto msg = response.extract_string().get(); + callback(msg, nullptr); + } + catch (...) + { + callback("", std::current_exception()); + } + }); } } diff --git a/src/SignalR/clients/cpp/src/signalrclient/default_websocket_client.h b/src/SignalR/clients/cpp/src/signalrclient/default_websocket_client.h index 7a7386db9dcc..7da7760a1f5c 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/default_websocket_client.h +++ b/src/SignalR/clients/cpp/src/signalrclient/default_websocket_client.h @@ -5,7 +5,7 @@ #include "cpprest/ws_client.h" #include "signalrclient/signalr_client_config.h" -#include "websocket_client.h" +#include "signalrclient/websocket_client.h" namespace signalr { @@ -14,14 +14,10 @@ namespace signalr public: explicit default_websocket_client(const signalr_client_config& signalr_client_config = {}) noexcept; - pplx::task connect(const std::string& url) override; - - pplx::task send(const std::string& message) override; - - pplx::task receive() override; - - pplx::task close() override; - + void start(std::string url, transfer_format format, std::function callback) override; + void stop(std::function callback) override; + void send(std::string payload, std::function callback) override; + void receive(std::function callback) override; private: web::websockets::client::websocket_client m_underlying_client; }; diff --git a/src/SignalR/clients/cpp/src/signalrclient/hub_connection_impl.cpp b/src/SignalR/clients/cpp/src/signalrclient/hub_connection_impl.cpp index 8f9d7a029020..a8d9a83f2113 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/hub_connection_impl.cpp +++ b/src/SignalR/clients/cpp/src/signalrclient/hub_connection_impl.cpp @@ -24,15 +24,15 @@ namespace signalr const std::shared_ptr& log_writer) { return hub_connection_impl::create(url, trace_level, log_writer, - std::make_unique(), std::make_unique()); + nullptr, std::make_unique()); } std::shared_ptr hub_connection_impl::create(const std::string& url, trace_level trace_level, - const std::shared_ptr& log_writer, std::unique_ptr web_request_factory, + const std::shared_ptr& log_writer, std::unique_ptr http_client, std::unique_ptr transport_factory) { auto connection = std::shared_ptr(new hub_connection_impl(url, trace_level, - log_writer ? log_writer : std::make_shared(), std::move(web_request_factory), std::move(transport_factory))); + log_writer ? log_writer : std::make_shared(), std::move(http_client), std::move(transport_factory))); connection->initialize(); @@ -40,10 +40,10 @@ namespace signalr } hub_connection_impl::hub_connection_impl(const std::string& url, trace_level trace_level, - const std::shared_ptr& log_writer, std::unique_ptr web_request_factory, + const std::shared_ptr& log_writer, std::unique_ptr http_client, std::unique_ptr transport_factory) : m_connection(connection_impl::create(url, trace_level, log_writer, - std::move(web_request_factory), std::move(transport_factory))), m_logger(log_writer, trace_level), + std::move(http_client), std::move(transport_factory))), m_logger(log_writer, trace_level), m_callback_manager(json::value::parse(_XPLATSTR("{ \"error\" : \"connection went out of scope before invocation result was received\"}"))), m_disconnected([]() noexcept {}), m_handshakeReceived(false) { } diff --git a/src/SignalR/clients/cpp/src/signalrclient/hub_connection_impl.h b/src/SignalR/clients/cpp/src/signalrclient/hub_connection_impl.h index 1548c2c4980a..ca3b6c586a8d 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/hub_connection_impl.h +++ b/src/SignalR/clients/cpp/src/signalrclient/hub_connection_impl.h @@ -24,7 +24,7 @@ namespace signalr const std::shared_ptr& log_writer); static std::shared_ptr create(const std::string& url, trace_level trace_level, - const std::shared_ptr& log_writer, std::unique_ptr web_request_factory, + const std::shared_ptr& log_writer, std::unique_ptr http_client, std::unique_ptr transport_factory); hub_connection_impl(const hub_connection_impl&) = delete; @@ -46,7 +46,7 @@ namespace signalr private: hub_connection_impl(const std::string& url, trace_level trace_level, const std::shared_ptr& log_writer, - std::unique_ptr web_request_factory, std::unique_ptr transport_factory); + std::unique_ptr http_client, std::unique_ptr transport_factory); std::shared_ptr m_connection; logger m_logger; diff --git a/src/SignalR/clients/cpp/src/signalrclient/negotiate.cpp b/src/SignalR/clients/cpp/src/signalrclient/negotiate.cpp new file mode 100644 index 000000000000..a38d1e31e3a9 --- /dev/null +++ b/src/SignalR/clients/cpp/src/signalrclient/negotiate.cpp @@ -0,0 +1,103 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +#include "stdafx.h" +#include "negotiate.h" +#include "url_builder.h" +#include "signalrclient/signalr_exception.h" + +namespace signalr +{ + namespace negotiate + { + pplx::task negotiate(http_client& client, const std::string& base_url, + const signalr_client_config& config) + { + auto negotiate_url = url_builder::build_negotiate(base_url); + + pplx::task_completion_event tce; + + // TODO: signalr_client_config + http_request request; + request.method = http_method::POST; + + for (auto& header : config.get_http_headers()) + { + request.headers.insert(std::make_pair(utility::conversions::to_utf8string(header.first), utility::conversions::to_utf8string(header.second))); + } + + client.send(negotiate_url, request, [tce](http_response http_response, std::exception_ptr exception) + { + if (exception != nullptr) + { + tce.set_exception(exception); + return; + } + + if (http_response.status_code != 200) + { + tce.set_exception(signalr_exception("negotiate failed with status code " + std::to_string(http_response.status_code))); + return; + } + + try + { + auto negotiation_response_json = web::json::value::parse(utility::conversions::to_string_t(http_response.content)); + + negotiation_response response; + + if (negotiation_response_json.has_field(_XPLATSTR("error"))) + { + response.error = utility::conversions::to_utf8string(negotiation_response_json[_XPLATSTR("error")].as_string()); + tce.set(std::move(response)); + return; + } + + if (negotiation_response_json.has_field(_XPLATSTR("connectionId"))) + { + response.connectionId = utility::conversions::to_utf8string(negotiation_response_json[_XPLATSTR("connectionId")].as_string()); + } + + if (negotiation_response_json.has_field(_XPLATSTR("availableTransports"))) + { + for (auto transportData : negotiation_response_json[_XPLATSTR("availableTransports")].as_array()) + { + available_transport transport; + transport.transport = utility::conversions::to_utf8string(transportData[_XPLATSTR("transport")].as_string()); + + for (auto format : transportData[_XPLATSTR("transferFormats")].as_array()) + { + transport.transfer_formats.push_back(utility::conversions::to_utf8string(format.as_string())); + } + + response.availableTransports.push_back(transport); + } + } + + if (negotiation_response_json.has_field(_XPLATSTR("url"))) + { + response.url = utility::conversions::to_utf8string(negotiation_response_json[_XPLATSTR("url")].as_string()); + + if (negotiation_response_json.has_field(_XPLATSTR("accessToken"))) + { + response.accessToken = utility::conversions::to_utf8string(negotiation_response_json[_XPLATSTR("accessToken")].as_string()); + } + } + + if (negotiation_response_json.has_field(_XPLATSTR("ProtocolVersion"))) + { + tce.set_exception(signalr_exception("Detected a connection attempt to an ASP.NET SignalR Server. This client only supports connecting to an ASP.NET Core SignalR Server. See https://aka.ms/signalr-core-differences for details.")); + } + + tce.set(std::move(response)); + } + catch (...) + { + tce.set_exception(std::current_exception()); + } + }); + + return pplx::create_task(tce); + } + } +} diff --git a/src/SignalR/clients/cpp/src/signalrclient/request_sender.h b/src/SignalR/clients/cpp/src/signalrclient/negotiate.h similarity index 74% rename from src/SignalR/clients/cpp/src/signalrclient/request_sender.h rename to src/SignalR/clients/cpp/src/signalrclient/negotiate.h index 73becd3eb06d..a627e533d78c 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/request_sender.h +++ b/src/SignalR/clients/cpp/src/signalrclient/negotiate.h @@ -7,13 +7,13 @@ #include "signalrclient/transport_type.h" #include "web_request_factory.h" #include "negotiation_response.h" - +#include "signalrclient/http_client.h" namespace signalr { - namespace request_sender + namespace negotiate { - pplx::task negotiate(web_request_factory& request_factory, const std::string& base_url, + pplx::task negotiate(http_client& client, const std::string& base_url, const signalr_client_config& signalr_client_config = signalr::signalr_client_config{}); } } diff --git a/src/SignalR/clients/cpp/src/signalrclient/request_sender.cpp b/src/SignalR/clients/cpp/src/signalrclient/request_sender.cpp deleted file mode 100644 index b3d2e99d3914..000000000000 --- a/src/SignalR/clients/cpp/src/signalrclient/request_sender.cpp +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -#include "stdafx.h" -#include "request_sender.h" -#include "http_sender.h" -#include "url_builder.h" -#include "signalrclient/signalr_exception.h" - -namespace signalr -{ - namespace request_sender - { - pplx::task negotiate(web_request_factory& request_factory, const std::string& base_url, - const signalr_client_config& signalr_client_config) - { - auto negotiate_url = url_builder::build_negotiate(base_url); - - return http_sender::post(request_factory, negotiate_url, signalr_client_config) - .then([](std::string body) - { - auto negotiation_response_json = web::json::value::parse(utility::conversions::to_string_t(body)); - - negotiation_response response; - - if (negotiation_response_json.has_field(_XPLATSTR("error"))) - { - response.error = utility::conversions::to_utf8string(negotiation_response_json[_XPLATSTR("error")].as_string()); - return std::move(response); - } - - if (negotiation_response_json.has_field(_XPLATSTR("connectionId"))) - { - response.connectionId = utility::conversions::to_utf8string(negotiation_response_json[_XPLATSTR("connectionId")].as_string()); - } - - if (negotiation_response_json.has_field(_XPLATSTR("availableTransports"))) - { - for (auto transportData : negotiation_response_json[_XPLATSTR("availableTransports")].as_array()) - { - available_transport transport; - transport.transport = utility::conversions::to_utf8string(transportData[_XPLATSTR("transport")].as_string()); - - for (auto format : transportData[_XPLATSTR("transferFormats")].as_array()) - { - transport.transfer_formats.push_back(utility::conversions::to_utf8string(format.as_string())); - } - - response.availableTransports.push_back(transport); - } - } - - if (negotiation_response_json.has_field(_XPLATSTR("url"))) - { - response.url = utility::conversions::to_utf8string(negotiation_response_json[_XPLATSTR("url")].as_string()); - - if (negotiation_response_json.has_field(_XPLATSTR("accessToken"))) - { - response.accessToken = utility::conversions::to_utf8string(negotiation_response_json[_XPLATSTR("accessToken")].as_string()); - } - } - - if (negotiation_response_json.has_field(_XPLATSTR("ProtocolVersion"))) - { - throw signalr_exception("Detected a connection attempt to an ASP.NET SignalR Server. This client only supports connecting to an ASP.NET Core SignalR Server. See https://aka.ms/signalr-core-differences for details."); - } - - return std::move(response); - }); - } - } -} diff --git a/src/SignalR/clients/cpp/src/signalrclient/transport.cpp b/src/SignalR/clients/cpp/src/signalrclient/transport.cpp index 89de95eeaea0..4f235e16f438 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/transport.cpp +++ b/src/SignalR/clients/cpp/src/signalrclient/transport.cpp @@ -7,23 +7,12 @@ namespace signalr { - transport::transport(const logger& logger, const std::function& process_response_callback, - std::function error_callback) - : m_logger(logger), m_process_response_callback(process_response_callback), m_error_callback(error_callback) + transport::transport(const logger& logger) + : m_logger(logger) {} // Do NOT remove this destructor. Letting the compiler generate and inline the default dtor may lead to // undefinded behavior since we are using an incomplete type. More details here: http://herbsutter.com/gotw/_100/ transport::~transport() { } - - void transport::process_response(const std::string &message) - { - m_process_response_callback(message); - } - - void transport::error(const std::exception& e) - { - m_error_callback(e); - } -} \ No newline at end of file +} diff --git a/src/SignalR/clients/cpp/src/signalrclient/transport.h b/src/SignalR/clients/cpp/src/signalrclient/transport.h index 3316b2af03c1..4bb3a5643df4 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/transport.h +++ b/src/SignalR/clients/cpp/src/signalrclient/transport.h @@ -5,6 +5,7 @@ #include "pplx/pplxtasks.h" #include "signalrclient/transport_type.h" +#include "signalrclient/transfer_format.h" #include "logger.h" namespace signalr @@ -12,28 +13,21 @@ namespace signalr class transport { public: - virtual pplx::task connect(const std::string &url) = 0; - - virtual pplx::task send(const std::string &data) = 0; - - virtual pplx::task disconnect() = 0; - virtual transport_type get_transport_type() const = 0; virtual ~transport(); - protected: - transport(const logger& logger, const std::function& process_response_callback, - std::function error_callback); + virtual void start(const std::string& url, transfer_format format, std::function callback) = 0; + virtual void stop(std::function callback) = 0; + virtual void on_close(std::function callback) = 0; - void process_response(const std::string &message); - void error(const std::exception &e); + virtual void send(std::string payload, std::function callback) = 0; - logger m_logger; + virtual void on_receive(std::function callback) = 0; - private: - std::function m_process_response_callback; + protected: + transport(const logger& logger); - std::function m_error_callback; + logger m_logger; }; } diff --git a/src/SignalR/clients/cpp/src/signalrclient/transport_factory.cpp b/src/SignalR/clients/cpp/src/signalrclient/transport_factory.cpp index 125b5b3bacc9..8c74b6aeed28 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/transport_factory.cpp +++ b/src/SignalR/clients/cpp/src/signalrclient/transport_factory.cpp @@ -8,15 +8,13 @@ namespace signalr { std::shared_ptr transport_factory::create_transport(transport_type transport_type, const logger& logger, - const signalr_client_config& signalr_client_config, - std::function process_response_callback, - std::function error_callback) + const signalr_client_config& signalr_client_config) { if (transport_type == signalr::transport_type::websockets) { return websocket_transport::create( [signalr_client_config](){ return std::make_shared(signalr_client_config); }, - logger, process_response_callback, error_callback); + logger); } throw std::runtime_error("not implemented"); @@ -24,4 +22,4 @@ namespace signalr transport_factory::~transport_factory() { } -} \ No newline at end of file +} diff --git a/src/SignalR/clients/cpp/src/signalrclient/transport_factory.h b/src/SignalR/clients/cpp/src/signalrclient/transport_factory.h index a6c80df4c31a..3d55b83a9285 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/transport_factory.h +++ b/src/SignalR/clients/cpp/src/signalrclient/transport_factory.h @@ -14,10 +14,8 @@ namespace signalr { public: virtual std::shared_ptr create_transport(transport_type transport_type, const logger& logger, - const signalr_client_config& signalr_client_config, - std::function process_response_callback, - std::function error_callback); + const signalr_client_config& signalr_client_config); virtual ~transport_factory(); }; -} \ No newline at end of file +} diff --git a/src/SignalR/clients/cpp/src/signalrclient/websocket_client.h b/src/SignalR/clients/cpp/src/signalrclient/websocket_client.h deleted file mode 100644 index 9be26a27e8c3..000000000000 --- a/src/SignalR/clients/cpp/src/signalrclient/websocket_client.h +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. - -#pragma once - -#include "pplx/pplxtasks.h" - -namespace signalr -{ - class websocket_client - { - public: - virtual pplx::task connect(const std::string& url) = 0; - - virtual pplx::task send(const std::string& message) = 0; - - virtual pplx::task receive() = 0; - - virtual pplx::task close() = 0; - - virtual ~websocket_client() {}; - }; -} diff --git a/src/SignalR/clients/cpp/src/signalrclient/websocket_transport.cpp b/src/SignalR/clients/cpp/src/signalrclient/websocket_transport.cpp index 96812decc931..85e4f556eecd 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/websocket_transport.cpp +++ b/src/SignalR/clients/cpp/src/signalrclient/websocket_transport.cpp @@ -5,21 +5,21 @@ #include "websocket_transport.h" #include "logger.h" #include "signalrclient/signalr_exception.h" +#include namespace signalr { std::shared_ptr websocket_transport::create(const std::function()>& websocket_client_factory, - const logger& logger, const std::function& process_response_callback, - std::function error_callback) + const logger& logger) { return std::shared_ptr( - new websocket_transport(websocket_client_factory, logger, process_response_callback, error_callback)); + new websocket_transport(websocket_client_factory, logger)); } websocket_transport::websocket_transport(const std::function()>& websocket_client_factory, - const logger& logger, const std::function& process_response_callback, - std::function error_callback) - : transport(logger, process_response_callback, error_callback), m_websocket_client_factory(websocket_client_factory) + const logger& logger) + : transport(logger), m_websocket_client_factory(websocket_client_factory), m_close_callback([](std::exception_ptr) {}), + m_process_response_callback([](std::string, std::exception_ptr) {}) { // we use this cts to check if the receive loop is running so it should be // initially cancelled to indicate that the receive loop is not running @@ -30,7 +30,9 @@ namespace signalr { try { - disconnect().get(); + pplx::task_completion_event event; + stop([event](std::exception_ptr) { event.set(); }); + pplx::create_task(event).get(); } catch (...) // must not throw from the destructor {} @@ -41,7 +43,106 @@ namespace signalr return transport_type::websockets; } - pplx::task websocket_transport::connect(const std::string& url) + // Note that the connection assumes that the error callback won't be fired when the result is being processed. This + // may no longer be true when we replace the `receive_loop` with "on_message_received" and "on_close" events if they + // can be fired on different threads in which case we will have to lock before setting groups token and message id. + void websocket_transport::receive_loop(pplx::cancellation_token_source cts) + { + auto this_transport = shared_from_this(); + auto logger = this_transport->m_logger; + + // Passing the `std::weak_ptr` prevents from a memory leak where we would capture the shared_ptr to + // the transport in the continuation lambda and as a result as long as the loop runs the ref count would never get to + // zero. Now we capture the weak pointer and get the shared pointer only when the continuation runs so the ref count is + // incremented when the shared pointer is acquired and then decremented when it goes out of scope of the continuation. + auto weak_transport = std::weak_ptr(this_transport); + + auto websocket_client = this_transport->safe_get_websocket_client(); + + // There are two cases when we exit the loop. The first case is implicit - we pass the cancellation_token + // to `then` (note this is after the lambda body) and if the token is cancelled the continuation will not + // run at all. The second - explicit - case happens if the token gets cancelled after the continuation has + // been started in which case we just stop the loop by not scheduling another receive task. + websocket_client->receive([weak_transport, cts, logger, websocket_client](std::string message, std::exception_ptr exception) + { + if (exception != nullptr) + { + try + { + std::rethrow_exception(exception); + } + catch (const std::exception & e) + { + logger.log( + trace_level::errors, + std::string("[websocket transport] error receiving response from websocket: ") + .append(e.what())); + } + catch (...) + { + logger.log( + trace_level::errors, + std::string("[websocket transport] unknown error occurred when receiving response from websocket")); + + exception = std::make_exception_ptr(signalr_exception("unknown error")); + } + + cts.cancel(); + + std::promise promise; + websocket_client->stop([&promise](std::exception_ptr exception) + { + if (exception != nullptr) + { + promise.set_exception(exception); + } + else + { + promise.set_value(); + } + }); + + try + { + promise.get_future().get(); + } + // We prefer the outer exception bubbling up to the user + // REVIEW: log here? + catch (...) {} + + auto transport = weak_transport.lock(); + if (transport) + { + transport->m_close_callback(exception); + } + + return; + } + + auto transport = weak_transport.lock(); + if (transport) + { + transport->m_process_response_callback(message, nullptr); + + if (!cts.get_token().is_canceled()) + { + transport->receive_loop(cts); + } + } + }); + } + + std::shared_ptr websocket_transport::safe_get_websocket_client() + { + { + const std::lock_guard lock(m_websocket_client_lock); + auto websocket_client = m_websocket_client; + + return websocket_client; + } + } + + void websocket_transport::start(const std::string& url, transfer_format format, std::function callback) { web::uri uri(utility::conversions::to_string_t(url)); _ASSERTE(uri.scheme() == _XPLATSTR("ws") || uri.scheme() == _XPLATSTR("wss")); @@ -66,20 +167,21 @@ namespace signalr } pplx::cancellation_token_source receive_loop_cts; - pplx::task_completion_event connect_tce; auto transport = shared_from_this(); - websocket_client->connect(url) - .then([transport, connect_tce, receive_loop_cts](pplx::task connect_task) + websocket_client->start(url, format, [transport, receive_loop_cts, callback](std::exception_ptr exception) { try { - connect_task.get(); + if (exception != nullptr) + { + std::rethrow_exception(exception); + } transport->receive_loop(receive_loop_cts); - connect_tce.set(); + callback(nullptr); } - catch (const std::exception &e) + catch (const std::exception & e) { transport->m_logger.log( trace_level::errors, @@ -87,23 +189,15 @@ namespace signalr .append(e.what())); receive_loop_cts.cancel(); - connect_tce.set_exception(std::current_exception()); + callback(std::current_exception()); } }); m_receive_loop_cts = receive_loop_cts; - - return pplx::create_task(connect_tce); } } - pplx::task websocket_transport::send(const std::string &data) - { - // send will return a faulted task if client has disconnected - return safe_get_websocket_client()->send(data); - } - - pplx::task websocket_transport::disconnect() + void websocket_transport::stop(std::function callback) { std::shared_ptr websocket_client = nullptr; @@ -112,7 +206,8 @@ namespace signalr if (m_receive_loop_cts.get_token().is_canceled()) { - return pplx::task_from_result(); + callback(nullptr); + return; } m_receive_loop_cts.cancel(); @@ -121,126 +216,54 @@ namespace signalr } auto logger = m_logger; + auto close_callback = m_close_callback; - return websocket_client->close() - .then([logger](pplx::task close_task) - mutable { + websocket_client->stop([logger, callback, close_callback](std::exception_ptr exception) + { try { - close_task.get(); + if (exception != nullptr) + { + std::rethrow_exception(exception); + } + callback(nullptr); } - catch (const std::exception &e) + catch (const std::exception & e) { logger.log( trace_level::errors, std::string("[websocket transport] exception when closing websocket: ") .append(e.what())); + + callback(exception); } + + close_callback(exception); }); } - // Note that the connection assumes that the error callback won't be fired when the result is being processed. This - // may no longer be true when we replace the `receive_loop` with "on_message_received" and "on_close" events if they - // can be fired on different threads in which case we will have to lock before setting groups token and message id. - void websocket_transport::receive_loop(pplx::cancellation_token_source cts) + void websocket_transport::on_close(std::function callback) { - auto this_transport = shared_from_this(); - auto logger = this_transport->m_logger; - - // Passing the `std::weak_ptr` prevents from a memory leak where we would capture the shared_ptr to - // the transport in the continuation lambda and as a result as long as the loop runs the ref count would never get to - // zero. Now we capture the weak pointer and get the shared pointer only when the continuation runs so the ref count is - // incremented when the shared pointer is acquired and then decremented when it goes out of scope of the continuation. - auto weak_transport = std::weak_ptr(this_transport); + m_close_callback = callback; + } - auto websocket_client = this_transport->safe_get_websocket_client(); + void websocket_transport::on_receive(std::function callback) + { + m_process_response_callback = callback; + } - websocket_client->receive() - // There are two cases when we exit the loop. The first case is implicit - we pass the cancellation_token - // to `then` (note this is after the lambda body) and if the token is cancelled the continuation will not - // run at all. The second - explicit - case happens if the token gets cancelled after the continuation has - // been started in which case we just stop the loop by not scheduling another receive task. - .then([weak_transport, cts](std::string message) + void websocket_transport::send(std::string payload, std::function callback) + { + safe_get_websocket_client()->send(payload, [callback](std::exception_ptr exception) { - auto transport = weak_transport.lock(); - if (transport) - { - transport->process_response(message); - - if (!cts.get_token().is_canceled()) - { - transport->receive_loop(cts); - } - } - }, cts.get_token()) - // this continuation is used to observe exceptions from the previous tasks. It will run always - even if one of - // the previous continuations throws or was not scheduled due to the cancellation token being set to cancelled - .then([weak_transport, logger, websocket_client, cts](pplx::task task) - mutable { - try - { - task.get(); - } - catch (const pplx::task_canceled&) - { - cts.cancel(); - - logger.log(trace_level::info, - std::string("[websocket transport] receive task canceled.")); - } - catch (const std::exception& e) + if (exception != nullptr) { - cts.cancel(); - - logger.log( - trace_level::errors, - std::string("[websocket transport] error receiving response from websocket: ") - .append(e.what())); - - websocket_client->close() - .then([](pplx::task task) - { - try { task.get(); } - catch (...) {} - }); - - auto transport = weak_transport.lock(); - if (transport) - { - transport->error(e); - } + callback(exception); } - catch (...) + else { - cts.cancel(); - - logger.log( - trace_level::errors, - std::string("[websocket transport] unknown error occurred when receiving response from websocket")); - - websocket_client->close() - .then([](pplx::task task) - { - try { task.get(); } - catch (...) {} - }); - - auto transport = weak_transport.lock(); - if (transport) - { - transport->error(signalr_exception("unknown error")); - } + callback(nullptr); } }); } - - std::shared_ptr websocket_transport::safe_get_websocket_client() - { - { - const std::lock_guard lock(m_websocket_client_lock); - auto websocket_client = m_websocket_client; - - return websocket_client; - } - } } diff --git a/src/SignalR/clients/cpp/src/signalrclient/websocket_transport.h b/src/SignalR/clients/cpp/src/signalrclient/websocket_transport.h index 6176bcc35843..8dc362c696c4 100644 --- a/src/SignalR/clients/cpp/src/signalrclient/websocket_transport.h +++ b/src/SignalR/clients/cpp/src/signalrclient/websocket_transport.h @@ -16,8 +16,7 @@ namespace signalr { public: static std::shared_ptr create(const std::function()>& websocket_client_factory, - const logger& logger, const std::function& process_response_callback, - std::function error_callback); + const logger& logger); ~websocket_transport(); @@ -25,31 +24,31 @@ namespace signalr websocket_transport& operator=(const websocket_transport&) = delete; - pplx::task connect(const std::string& url) override; + transport_type get_transport_type() const noexcept override; - pplx::task send(const std::string &data) override; + void start(const std::string& url, transfer_format format, std::function callback) override; + void stop(std::function callback) override; + void on_close(std::function callback) override; - pplx::task disconnect() override; + void send(std::string payload, std::function callback) override; - transport_type get_transport_type() const noexcept override; + void on_receive(std::function) override; private: websocket_transport(const std::function()>& websocket_client_factory, - const logger& logger, const std::function& process_response_callback, - std::function error_callback); + const logger& logger); std::function()> m_websocket_client_factory; std::shared_ptr m_websocket_client; std::mutex m_websocket_client_lock; std::mutex m_start_stop_lock; + std::function m_process_response_callback; + std::function m_close_callback; pplx::cancellation_token_source m_receive_loop_cts; void receive_loop(pplx::cancellation_token_source cts); - void handle_receive_error(const std::exception &e, pplx::cancellation_token_source cts, - logger logger, std::weak_ptr weak_transport); - std::shared_ptr safe_get_websocket_client(); }; } diff --git a/src/SignalR/clients/cpp/src/signalrclientdll/Build/VS/signalrclientdll.vcxproj b/src/SignalR/clients/cpp/src/signalrclientdll/Build/VS/signalrclientdll.vcxproj index 2fba0aff2f18..8639ac5c6d29 100644 --- a/src/SignalR/clients/cpp/src/signalrclientdll/Build/VS/signalrclientdll.vcxproj +++ b/src/SignalR/clients/cpp/src/signalrclientdll/Build/VS/signalrclientdll.vcxproj @@ -44,29 +44,32 @@ + + + + + - - @@ -76,13 +79,14 @@ + - + Create @@ -133,4 +137,4 @@ - + \ No newline at end of file diff --git a/src/SignalR/clients/cpp/src/signalrclientdll/Build/VS/signalrclientdll.vcxproj.filters b/src/SignalR/clients/cpp/src/signalrclientdll/Build/VS/signalrclientdll.vcxproj.filters index 08233634b386..16b4115e9b8d 100644 --- a/src/SignalR/clients/cpp/src/signalrclientdll/Build/VS/signalrclientdll.vcxproj.filters +++ b/src/SignalR/clients/cpp/src/signalrclientdll/Build/VS/signalrclientdll.vcxproj.filters @@ -39,9 +39,6 @@ Header Files - - Header Files - Header Files @@ -60,9 +57,6 @@ Header Files - - Header Files - Header Files @@ -105,6 +99,21 @@ Header Files + + Header Files + + + Header Files + + + Header Files + + + Header Files + + + Header Files + @@ -128,9 +137,6 @@ Source Files - - Source Files - Source Files @@ -164,6 +170,12 @@ Source Files + + Source Files + + + Source Files + diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/Build/VS/signalrclienttests.vcxproj b/src/SignalR/clients/cpp/test/signalrclienttests/Build/VS/signalrclienttests.vcxproj index 2ce3bed5f235..e88d39671070 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/Build/VS/signalrclienttests.vcxproj +++ b/src/SignalR/clients/cpp/test/signalrclienttests/Build/VS/signalrclienttests.vcxproj @@ -45,6 +45,7 @@ + @@ -60,11 +61,12 @@ - + Create + @@ -97,4 +99,4 @@ true Flaky, due to https://github.com/aspnet/AspNetCore/issues/8421 - + \ No newline at end of file diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/Build/VS/signalrclienttests.vcxproj.filters b/src/SignalR/clients/cpp/test/signalrclienttests/Build/VS/signalrclienttests.vcxproj.filters index ff4d349b3b19..f8329224c0b3 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/Build/VS/signalrclienttests.vcxproj.filters +++ b/src/SignalR/clients/cpp/test/signalrclienttests/Build/VS/signalrclienttests.vcxproj.filters @@ -39,6 +39,9 @@ Header Files + + Header Files + @@ -53,9 +56,6 @@ Source Files - - Source Files - Source Files @@ -98,8 +98,11 @@ Source Files - - - + + Source Files + + + Source Files + \ No newline at end of file diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/connection_impl_tests.cpp b/src/SignalR/clients/cpp/test/signalrclienttests/connection_impl_tests.cpp index e95a78f66a7c..9da56d17d477 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/connection_impl_tests.cpp +++ b/src/SignalR/clients/cpp/test/signalrclienttests/connection_impl_tests.cpp @@ -13,13 +13,14 @@ #include "cpprest/ws_client.h" #include "signalrclient/signalr_exception.h" #include "signalrclient/web_exception.h" +#include "test_http_client.h" using namespace signalr; static std::shared_ptr create_connection(std::shared_ptr websocket_client = create_test_websocket_client(), std::shared_ptr log_writer = std::make_shared(), trace_level trace_level = trace_level::all) { - return connection_impl::create(create_uri(), trace_level, log_writer, create_test_web_request_factory(), + return connection_impl::create(create_uri(), trace_level, log_writer, create_test_http_client(), std::make_unique(websocket_client)); } @@ -34,7 +35,7 @@ TEST(connection_impl_connection_state, initial_connection_state_is_disconnected) TEST(connection_impl_start, cannot_start_non_disconnected_exception) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto connection = create_connection(websocket_client); connection->start().wait(); @@ -55,12 +56,9 @@ TEST(connection_impl_start, connection_state_is_connecting_when_connection_is_be std::shared_ptr writer(std::make_shared()); auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* send function */ [](const std::string){ return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* connect function */[](const std::string&) - { - return pplx::task_from_exception(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed"))); - }); + /* receive function */ [](std::function callback) { callback("", std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* send function */ [](const std::string&, std::function callback) { callback(std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* connect function */[](const std::string&, std::function callback) { callback(std::make_exception_ptr(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed")))); }); auto connection = create_connection(websocket_client, writer, trace_level::errors); @@ -84,7 +82,7 @@ TEST(connection_impl_start, connection_state_is_connecting_when_connection_is_be TEST(connection_impl_start, connection_state_is_connected_when_connection_established_succesfully) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto connection = create_connection(websocket_client); connection->start().get(); ASSERT_EQ(connection->get_connection_state(), connection_state::connected); @@ -92,14 +90,14 @@ TEST(connection_impl_start, connection_state_is_connected_when_connection_establ TEST(connection_impl_start, connection_state_is_disconnected_when_connection_cannot_be_established) { - auto web_request_factory = std::make_unique([](const std::string&) -> std::unique_ptr + auto http_client = std::make_unique([](const std::string&, http_request) { - return std::unique_ptr(new web_request_stub((unsigned short)404, "Bad request", "")); + return http_response { 404, "" }; }); auto connection = connection_impl::create(create_uri(), trace_level::none, std::make_shared(), - std::move(web_request_factory), std::make_unique()); + std::move(http_client), std::make_unique()); try { @@ -116,9 +114,9 @@ TEST(connection_impl_start, throws_for_invalid_uri) std::shared_ptr writer(std::make_shared()); auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); - auto connection = connection_impl::create(":1\t ä bad_uri&a=b", trace_level::errors, writer, create_test_web_request_factory(), std::make_unique(websocket_client)); + auto connection = connection_impl::create(":1\t ä bad_uri&a=b", trace_level::errors, writer, create_test_http_client(), std::make_unique(websocket_client)); try { @@ -139,15 +137,15 @@ TEST(connection_impl_start, start_sets_id_query_string) std::string query_string; auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* send function */ [](const std::string&) { return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* connect function */[&query_string](const std::string& url) + /* receive function */ [](std::function callback) { callback("", std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* send function */ [](const std::string&, std::function callback) { callback(std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* connect function */[&query_string](const std::string& url, std::function callback) { query_string = utility::conversions::to_utf8string(url.substr(url.find('?') + 1)); - return pplx::task_from_exception(web::websockets::client::websocket_exception("connecting failed")); + callback(std::make_exception_ptr(web::websockets::client::websocket_exception("connecting failed"))); }); - auto connection = connection_impl::create(create_uri(""), trace_level::errors, writer, create_test_web_request_factory(), std::make_unique(websocket_client)); + auto connection = connection_impl::create(create_uri(""), trace_level::errors, writer, create_test_http_client(), std::make_unique(websocket_client)); try { @@ -166,15 +164,15 @@ TEST(connection_impl_start, start_appends_id_query_string) std::string query_string; auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* send function */ [](const std::string) { return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* connect function */[&query_string](const std::string& url) + /* receive function */ [](std::function callback) { callback("", std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* send function */ [](const std::string&, std::function callback) { callback(std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* connect function */[&query_string](const std::string& url, std::function callback) { query_string = utility::conversions::to_utf8string(url.substr(url.find('?') + 1)); - return pplx::task_from_exception(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed"))); + callback(std::make_exception_ptr(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed")))); }); - auto connection = connection_impl::create(create_uri("a=b&c=d"), trace_level::errors, writer, create_test_web_request_factory(), std::make_unique(websocket_client)); + auto connection = connection_impl::create(create_uri("a=b&c=d"), trace_level::errors, writer, create_test_http_client(), std::make_unique(websocket_client)); try { @@ -191,14 +189,14 @@ TEST(connection_impl_start, start_logs_exceptions) { std::shared_ptr writer(std::make_shared()); - auto web_request_factory = std::make_unique([](const std::string&) -> std::unique_ptr + auto http_client = std::make_unique([](const std::string&, http_request) { - return std::unique_ptr(new web_request_stub((unsigned short)404, "Bad request", "")); + return http_response{ 404, "" }; }); auto connection = connection_impl::create(create_uri(), trace_level::errors, writer, - std::move(web_request_factory), std::make_unique()); + std::move(http_client), std::make_unique()); try { @@ -211,20 +209,20 @@ TEST(connection_impl_start, start_logs_exceptions) ASSERT_FALSE(log_entries.empty()); auto entry = remove_date_from_log_entry(log_entries[0]); - ASSERT_EQ("[error ] connection could not be started due to: web exception - 404 Bad request\n", entry); + ASSERT_EQ("[error ] connection could not be started due to: negotiate failed with status code 404\n", entry); } TEST(connection_impl_start, start_propagates_exceptions_from_negotiate) { - auto web_request_factory = std::make_unique([](const std::string&) -> std::unique_ptr + auto http_client = std::make_unique([](const std::string&, http_request) { - return std::unique_ptr(new web_request_stub((unsigned short)404, "Bad request", "")); + return http_response{ 404, "" }; }); auto connection = connection_impl::create(create_uri(), trace_level::none, std::make_shared(), - std::move(web_request_factory), std::make_unique()); + std::move(http_client), std::make_unique()); try { @@ -233,7 +231,7 @@ TEST(connection_impl_start, start_propagates_exceptions_from_negotiate) } catch (const std::exception &e) { - ASSERT_EQ(_XPLATSTR("web exception - 404 Bad request"), utility::conversions::to_string_t(e.what())); + ASSERT_STREQ("negotiate failed with status code 404", e.what()); } } @@ -242,11 +240,11 @@ TEST(connection_impl_start, start_fails_if_transport_connect_throws) std::shared_ptr writer(std::make_shared()); auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* send function */ [](const std::string){ return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* connect function */[](const std::string&) + /* receive function */ [](std::function callback) { callback("", std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* send function */ [](const std::string&, std::function callback){ callback(std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* connect function */[](const std::string&, std::function callback) { - return pplx::task_from_exception(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed"))); + callback(std::make_exception_ptr(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed")))); }); auto connection = create_connection(websocket_client, writer, trace_level::errors); @@ -274,10 +272,10 @@ TEST(connection_impl_send, send_fails_if_transport_fails_when_receiving_messages { std::shared_ptr writer(std::make_shared()); - auto websocket_client = create_test_websocket_client([]() { return pplx::task_from_result(std::string("")); }, - /* send function */ [](const std::string &) + auto websocket_client = create_test_websocket_client([](std::function callback) { callback("", nullptr); }, + /* send function */ [](const std::string &, std::function callback) { - return pplx::task_from_exception(std::runtime_error("send error")); + callback(std::make_exception_ptr(std::runtime_error("send error"))); }); auto connection = create_connection(websocket_client, writer, trace_level::errors); @@ -307,29 +305,29 @@ TEST(connection_impl_start, start_fails_if_negotiate_request_fails) { std::shared_ptr writer(std::make_shared()); - auto web_request_factory = std::make_unique([](const std::string&) + auto http_client = std::make_unique([](const std::string&, http_request) { - return std::unique_ptr(new web_request_stub((unsigned short)400, "Bad Request")); + return http_response{ 400, "" }; }); auto websocket_client = std::make_shared(); - websocket_client->set_receive_function([]()->pplx::task + websocket_client->set_receive_function([](std::function callback) { - return pplx::task_from_result(std::string("{ }\x1e")); + callback("{ }\x1e", nullptr); }); auto connection = connection_impl::create(create_uri(), trace_level::messages, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); try { connection->start().get(); ASSERT_TRUE(false); // exception not thrown } - catch (const web_exception &e) + catch (const std::exception &e) { - ASSERT_STREQ("web exception - 400 Bad Request", e.what()); + ASSERT_STREQ("negotiate failed with status code 400", e.what()); } } @@ -337,26 +335,31 @@ TEST(connection_impl_start, start_fails_if_negotiate_response_has_error) { std::shared_ptr writer(std::make_shared()); - auto web_request_factory = std::make_unique([](const std::string& url) + auto http_client = std::make_unique([](const std::string& url, http_request) { auto response_body = url.find("/negotiate") != std::string::npos ? "{ \"error\": \"bad negotiate\" }" : ""; - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response{ 200, response_body }; }); pplx::task_completion_event tce; auto websocket_client = std::make_shared(); - websocket_client->set_connect_function([tce](const std::string&) mutable + websocket_client->set_connect_function([tce](const std::string&, std::function callback) { - return pplx::task(tce); + pplx::task(tce) + .then([callback](pplx::task prev_task) + { + prev_task.get(); + callback(nullptr); + }); }); auto connection = connection_impl::create(create_uri(), trace_level::messages, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); try { @@ -367,32 +370,39 @@ TEST(connection_impl_start, start_fails_if_negotiate_response_has_error) { ASSERT_STREQ("bad negotiate", e.what()); } + + tce.set(); } TEST(connection_impl_start, start_fails_if_negotiate_response_does_not_have_websockets) { std::shared_ptr writer(std::make_shared()); - auto web_request_factory = std::make_unique([](const std::string& url) + auto http_client = std::make_unique([](const std::string& url, http_request) { auto response_body = url.find("/negotiate") != std::string::npos ? "{ \"availableTransports\": [ { \"transport\": \"ServerSentEvents\", \"transferFormats\": [ \"Text\" ] } ] }" : ""; - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response{ 200, response_body }; }); pplx::task_completion_event tce; auto websocket_client = std::make_shared(); - websocket_client->set_connect_function([tce](const std::string&) mutable + websocket_client->set_connect_function([tce](const std::string&, std::function callback) { - return pplx::task(tce); + pplx::task(tce) + .then([callback](pplx::task prev_task) + { + prev_task.get(); + callback(nullptr); + }); }); auto connection = connection_impl::create(create_uri(), trace_level::messages, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); try { @@ -403,32 +413,39 @@ TEST(connection_impl_start, start_fails_if_negotiate_response_does_not_have_webs { ASSERT_STREQ("The server does not support WebSockets which is currently the only transport supported by this client.", e.what()); } + + tce.set(); } TEST(connection_impl_start, start_fails_if_negotiate_response_does_not_have_transports) { std::shared_ptr writer(std::make_shared()); - auto web_request_factory = std::make_unique([](const std::string& url) + auto http_client = std::make_unique([](const std::string& url, http_request) { auto response_body = url.find("/negotiate") != std::string::npos ? "{ \"availableTransports\": [ ] }" : ""; - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response{ 200, response_body }; }); pplx::task_completion_event tce; auto websocket_client = std::make_shared(); - websocket_client->set_connect_function([tce](const std::string&) mutable + websocket_client->set_connect_function([tce](const std::string&, std::function callback) { - return pplx::task(tce); + pplx::task(tce) + .then([callback](pplx::task prev_task) + { + prev_task.get(); + callback(nullptr); + }); }); auto connection = connection_impl::create(create_uri(), trace_level::messages, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); try { @@ -439,32 +456,39 @@ TEST(connection_impl_start, start_fails_if_negotiate_response_does_not_have_tran { ASSERT_STREQ("The server does not support WebSockets which is currently the only transport supported by this client.", e.what()); } + + tce.set(); } TEST(connection_impl_start, start_fails_if_negotiate_response_is_invalid) { std::shared_ptr writer(std::make_shared()); - auto web_request_factory = std::make_unique([](const std::string& url) + auto http_client = std::make_unique([](const std::string& url, http_request) { auto response_body = url.find("/negotiate") != std::string::npos ? "{ \"availableTransports\": [ " : ""; - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response{ 200, response_body }; }); pplx::task_completion_event tce; auto websocket_client = std::make_shared(); - websocket_client->set_connect_function([tce](const std::string&) mutable + websocket_client->set_connect_function([tce](const std::string&, std::function callback) { - return pplx::task(tce); + pplx::task(tce) + .then([callback](pplx::task prev_task) + { + prev_task.get(); + callback(nullptr); + }); }); auto connection = connection_impl::create(create_uri(), trace_level::messages, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); try { @@ -475,13 +499,15 @@ TEST(connection_impl_start, start_fails_if_negotiate_response_is_invalid) { ASSERT_STREQ("* Line 1, Column 28 Syntax error: Malformed token", e.what()); } + + tce.set(); } TEST(connection_impl_start, negotiate_follows_redirect) { std::shared_ptr writer(std::make_shared()); - auto web_request_factory = std::make_unique([](const std::string& url) + auto http_client = std::make_unique([](const std::string& url, http_request) { std::string response_body = ""; if (url.find("/negotiate") != std::string::npos) @@ -497,21 +523,21 @@ TEST(connection_impl_start, negotiate_follows_redirect) } } - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response{ 200, response_body }; }); auto websocket_client = std::make_shared(); std::string connectUrl; - websocket_client->set_connect_function([&connectUrl](const std::string& url) + websocket_client->set_connect_function([&connectUrl](const std::string& url, std::function callback) { connectUrl = url; - return pplx::task_from_result(); + callback(nullptr); }); auto connection = connection_impl::create(create_uri(), trace_level::messages, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); connection->start().get(); @@ -523,7 +549,7 @@ TEST(connection_impl_start, negotiate_redirect_uses_accessToken) std::shared_ptr writer(std::make_shared()); std::string accessToken; - auto web_request_factory = std::make_unique([&accessToken](const std::string& url) + auto http_client = std::make_unique([&accessToken](const std::string& url, http_request request) { std::string response_body = ""; if (url.find("/negotiate") != std::string::npos) @@ -539,26 +565,22 @@ TEST(connection_impl_start, negotiate_redirect_uses_accessToken) } } - auto request = new web_request_stub((unsigned short)200, "OK", response_body); - request->on_get_response = [&accessToken](web_request_stub& stub) - { - accessToken = utility::conversions::to_utf8string(stub.m_signalr_client_config.get_http_headers()[_XPLATSTR("Authorization")]); - }; - return std::unique_ptr(request); + accessToken = request.headers["Authorization"]; + return http_response{ 200, response_body }; }); auto websocket_client = std::make_shared(); std::string connectUrl; - websocket_client->set_connect_function([&connectUrl](const std::string& url) + websocket_client->set_connect_function([&connectUrl](const std::string& url, std::function callback) { connectUrl = url; - return pplx::task_from_result(); + callback(nullptr); }); auto connection = connection_impl::create(create_uri(), trace_level::messages, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); connection->start().get(); @@ -570,7 +592,7 @@ TEST(connection_impl_start, negotiate_fails_after_too_many_redirects) { std::shared_ptr writer(std::make_shared()); - auto web_request_factory = std::make_unique([](const std::string& url) + auto http_client = std::make_unique([](const std::string& url, http_request) { std::string response_body = ""; if (url.find("/negotiate") != std::string::npos) @@ -579,14 +601,14 @@ TEST(connection_impl_start, negotiate_fails_after_too_many_redirects) response_body = "{ \"url\": \"http://redirected\" }"; } - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response{ 200, response_body }; }); auto websocket_client = std::make_shared(); auto connection = connection_impl::create(create_uri(), trace_level::messages, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); try { @@ -602,7 +624,7 @@ TEST(connection_impl_start, negotiate_fails_if_ProtocolVersion_in_response) { std::shared_ptr writer(std::make_shared()); - auto web_request_factory = std::make_unique([](const std::string& url) + auto http_client = std::make_unique([](const std::string& url, http_request) { std::string response_body = ""; if (url.find("/negotiate") != std::string::npos) @@ -610,14 +632,14 @@ TEST(connection_impl_start, negotiate_fails_if_ProtocolVersion_in_response) response_body = "{\"ProtocolVersion\" : \"\" }"; } - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response{ 200, response_body }; }); auto websocket_client = std::make_shared(); auto connection = connection_impl::create(create_uri(), trace_level::messages, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); try { @@ -634,7 +656,7 @@ TEST(connection_impl_start, negotiate_redirect_does_not_overwrite_url) std::shared_ptr writer(std::make_shared()); int redirectCount = 0; - auto web_request_factory = std::make_unique([&redirectCount](const std::string& url) + auto http_client = std::make_unique([&redirectCount](const std::string& url, http_request) { std::string response_body = ""; if (url.find("/negotiate") != std::string::npos) @@ -651,14 +673,14 @@ TEST(connection_impl_start, negotiate_redirect_does_not_overwrite_url) } } - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response{ 200, response_body }; }); auto websocket_client = std::make_shared(); auto connection = connection_impl::create(create_uri(), trace_level::messages, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); connection->start().get(); ASSERT_EQ(1, redirectCount); @@ -673,15 +695,15 @@ TEST(connection_impl_start, negotiate_redirect_uses_own_query_string) std::string query_string; auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* send function */ [](const std::string) { return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* connect function */[&query_string](const std::string& url) + /* receive function */ [](std::function callback) { callback("", std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* send function */ [](const std::string&, std::function callback) { callback(std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* connect function */[&query_string](const std::string& url, std::function callback) { query_string = url.substr(url.find('?') + 1); - return pplx::task_from_exception(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed"))); + callback(std::make_exception_ptr(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed")))); }); - auto web_request_factory = std::make_unique([](const std::string& url) + auto http_client = std::make_unique([](const std::string& url, http_request) { std::string response_body = ""; if (url.find("/negotiate") != std::string::npos) @@ -697,10 +719,10 @@ TEST(connection_impl_start, negotiate_redirect_uses_own_query_string) } } - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response{ 200, response_body }; }); - auto connection = connection_impl::create(create_uri("a=b&c=d"), trace_level::errors, writer, std::move(web_request_factory), std::make_unique(websocket_client)); + auto connection = connection_impl::create(create_uri("a=b&c=d"), trace_level::errors, writer, std::move(http_client), std::make_unique(websocket_client)); try { @@ -717,18 +739,23 @@ TEST(connection_impl_start, start_fails_if_connect_request_times_out) { std::shared_ptr writer(std::make_shared()); - auto web_request_factory = create_test_web_request_factory(); + auto http_client = create_test_http_client(); pplx::task_completion_event tce; auto websocket_client = std::make_shared(); - websocket_client->set_connect_function([tce](const std::string&) mutable + websocket_client->set_connect_function([tce](const std::string&, std::function callback) { - return pplx::task(tce); + pplx::task(tce) + .then([callback](pplx::task prev_task) + { + prev_task.get(); + callback(nullptr); + }); }); auto connection = connection_impl::create(create_uri(), trace_level::messages, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); try { @@ -739,6 +766,8 @@ TEST(connection_impl_start, start_fails_if_connect_request_times_out) { ASSERT_STREQ("transport timed out when trying to connect", e.what()); } + + tce.set(); } TEST(connection_impl_process_response, process_response_logs_messages) @@ -746,10 +775,10 @@ TEST(connection_impl_process_response, process_response_logs_messages) std::shared_ptr writer(std::make_shared()); auto wait_receive = std::make_shared(); auto websocket_client = create_test_websocket_client( - /* receive function */ [wait_receive]() + /* receive function */ [wait_receive](std::function callback) { wait_receive->set(); - return pplx::task_from_result(std::string("{ }")); + callback("{ }", nullptr); }); auto connection = create_connection(websocket_client, writer, trace_level::messages); @@ -769,11 +798,11 @@ TEST(connection_impl_send, message_sent) std::string actual_message; auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }, - /* send function */ [&actual_message](const std::string& message) + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }, + /* send function */ [&actual_message](const std::string& message, std::function callback) { actual_message = message; - return pplx::task_from_result(); + callback(nullptr); }); auto connection = create_connection(websocket_client); @@ -809,13 +838,13 @@ TEST(connection_impl_send, exceptions_from_send_logged_and_propagated) { std::shared_ptr writer(std::make_shared()); auto websocket_client = create_test_websocket_client( - /* receive function */ []() + /* receive function */ [](std::function callback) { - return pplx::task_from_result(std::string("{}")); + callback("{}", nullptr); }, - /* send function */ [](const std::string&) + /* send function */ [](const std::string&, std::function callback) { - return pplx::task_from_exception(std::runtime_error("error")); + callback(std::make_exception_ptr(std::runtime_error("error"))); }); auto connection = create_connection(websocket_client, writer, trace_level::errors); @@ -846,7 +875,7 @@ TEST(connection_impl_set_message_received, callback_invoked_when_message_receive { int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number]() + /* receive function */ [call_number](std::function callback) mutable { std::string responses[] { @@ -857,7 +886,7 @@ TEST(connection_impl_set_message_received, callback_invoked_when_message_receive call_number = std::min(call_number + 1, 2); - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); auto connection = create_connection(websocket_client); @@ -889,7 +918,7 @@ TEST(connection_impl_set_message_received, exception_from_callback_caught_and_lo { int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number]() + /* receive function */ [call_number](std::function callback) mutable { std::string responses[] { @@ -900,7 +929,7 @@ TEST(connection_impl_set_message_received, exception_from_callback_caught_and_lo call_number = std::min(call_number + 1, 2); - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); std::shared_ptr writer(std::make_shared()); @@ -935,7 +964,7 @@ TEST(connection_impl_set_message_received, non_std_exception_from_callback_caugh { int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number]() + /* receive function */ [call_number](std::function callback) mutable { std::string responses[] { @@ -946,7 +975,7 @@ TEST(connection_impl_set_message_received, non_std_exception_from_callback_caugh call_number = std::min(call_number + 1, 2); - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); std::shared_ptr writer(std::make_shared()); @@ -980,7 +1009,7 @@ TEST(connection_impl_set_message_received, non_std_exception_from_callback_caugh void can_be_set_only_in_disconnected_state(std::function callback, const char* expected_exception_message) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto connection = create_connection(websocket_client); connection->start().get(); @@ -1030,14 +1059,15 @@ TEST(connection_impl_stop, stopping_disconnecting_connection_returns_cancelled_t auto writer = std::shared_ptr{std::make_shared()}; auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }, - /* send function */ [](const std::string){ return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* connect function */ [&close_event](const std::string&) { return pplx::task_from_result(); }, - /* close function */ [&close_event]() + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }, + /* send function */ [](const std::string, std::function callback){ callback(std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* connect function */ [&close_event](const std::string&, std::function callback) { callback(nullptr); }, + /* close function */ [&close_event](std::function callback) { - return pplx::create_task([&close_event]() + pplx::create_task([&close_event, callback]() { close_event.wait(); + callback(nullptr); }); }); @@ -1072,7 +1102,7 @@ TEST(connection_impl_stop, can_start_and_stop_connection) auto writer = std::shared_ptr{std::make_shared()}; auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto connection = create_connection(websocket_client, writer, trace_level::state_changes); connection->start() @@ -1095,7 +1125,7 @@ TEST(connection_impl_stop, can_start_and_stop_connection_multiple_times) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto connection = create_connection(websocket_client, writer, trace_level::state_changes); connection->start() @@ -1137,10 +1167,10 @@ TEST(connection_impl_stop, dtor_stops_the_connection) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() + /* receive function */ [](std::function callback) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); - return pplx::task_from_result(std::string("{ }\x1e")); + callback("{ }\x1e", nullptr); }); auto connection = create_connection(websocket_client, writer, trace_level::state_changes); @@ -1170,10 +1200,10 @@ TEST(connection_impl_stop, stop_cancels_ongoing_start_request) auto disconnect_completed_event = std::make_shared(); auto websocket_client = create_test_websocket_client( - /* receive function */ [disconnect_completed_event]() + /* receive function */ [disconnect_completed_event](std::function callback) { disconnect_completed_event->wait(); - return pplx::task_from_result(std::string("{ }\x1e")); + callback("{ }\x1e", nullptr); }); auto writer = std::shared_ptr{std::make_shared()}; @@ -1207,7 +1237,7 @@ TEST(connection_impl_stop, stop_cancels_ongoing_start_request) TEST(connection_impl_stop, ongoing_start_request_canceled_if_connection_stopped_before_init_message_received) { - auto web_request_factory = std::make_unique([](const std::string& url) + auto http_client = std::make_unique([](const std::string& url, http_request) { auto response_body = url.find("/negotiate") != std::string::npos @@ -1215,17 +1245,17 @@ TEST(connection_impl_stop, ongoing_start_request_canceled_if_connection_stopped_ "\"availableTransports\" : [ { \"transport\": \"WebSockets\", \"transferFormats\": [ \"Text\", \"Binary\" ] } ] }" : ""; - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response{ 200, response_body }; }); - auto websocket_client = create_test_websocket_client(/*receive function*/ []() + auto websocket_client = create_test_websocket_client(/*receive function*/ [](std::function callback) { - return pplx::task_from_result("{}"); + callback("{}", nullptr); }); auto writer = std::shared_ptr{std::make_shared()}; auto connection = connection_impl::create(create_uri(), trace_level::all, writer, - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); auto start_task = connection->start(); connection->stop().get(); @@ -1253,7 +1283,7 @@ TEST(connection_impl_stop, ongoing_start_request_canceled_if_connection_stopped_ TEST(connection_impl_stop, stop_invokes_disconnected_callback) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto connection = create_connection(websocket_client); auto disconnected_invoked = false; @@ -1274,7 +1304,7 @@ TEST(connection_impl_stop, std_exception_for_disconnected_callback_caught_and_lo int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number]() + /* receive function */ [call_number](std::function callback) mutable { std::string responses[] { @@ -1284,7 +1314,7 @@ TEST(connection_impl_stop, std_exception_for_disconnected_callback_caught_and_lo call_number = std::min(call_number + 1, 1); - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); auto connection = create_connection(websocket_client, writer, trace_level::errors); @@ -1307,7 +1337,7 @@ TEST(connection_impl_stop, exception_for_disconnected_callback_caught_and_logged int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number]() + /* receive function */ [call_number](std::function callback) mutable { std::string responses[] { @@ -1317,7 +1347,7 @@ TEST(connection_impl_stop, exception_for_disconnected_callback_caught_and_logged call_number = std::min(call_number + 1, 1); - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); auto connection = create_connection(websocket_client, writer, trace_level::errors); @@ -1338,7 +1368,7 @@ TEST(connection_impl_config, custom_headers_set_in_requests) { auto writer = std::shared_ptr{std::make_shared()}; - auto web_request_factory = std::make_unique([](const std::string& url) + auto http_client = std::make_unique([](const std::string& url, http_request) { auto response_body = url.find("/negotiate") != std::string::npos @@ -1346,23 +1376,23 @@ TEST(connection_impl_config, custom_headers_set_in_requests) "\"availableTransports\" : [ { \"transport\": \"WebSockets\", \"transferFormats\": [ \"Text\", \"Binary\" ] } ] }" : ""; - auto request = new web_request_stub((unsigned short)200, "OK", response_body); + /*auto request = new web_request_stub((unsigned short)200, "OK", response_body); request->on_get_response = [](web_request_stub& request) { auto http_headers = request.m_signalr_client_config.get_http_headers(); ASSERT_EQ(1U, http_headers.size()); ASSERT_EQ(_XPLATSTR("42"), http_headers[_XPLATSTR("Answer")]); - }; + };*/ - return std::unique_ptr(request); + return http_response{ 200, response_body }; }); auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto connection = connection_impl::create(create_uri(), trace_level::state_changes, - writer, std::move(web_request_factory), std::make_unique(websocket_client)); + writer, std::move(http_client), std::make_unique(websocket_client)); signalr::signalr_client_config signalr_client_config{}; auto http_headers = signalr_client_config.get_http_headers(); @@ -1393,7 +1423,7 @@ TEST(connection_impl_change_state, change_state_logs) { std::shared_ptr writer(std::make_shared()); auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto connection = create_connection(websocket_client, writer, trace_level::state_changes); connection->start().wait(); @@ -1410,11 +1440,11 @@ TEST(connection_id, connection_id_is_set_if_start_fails_but_negotiate_request_su std::shared_ptr writer(std::make_shared()); auto websocket_client = create_test_websocket_client( - /* receive function */ [](){ return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* send function */ [](const std::string){ return pplx::task_from_exception(std::runtime_error("should not be invoked")); }, - /* connect function */[](const std::string&) + /* receive function */ [](std::function callback){ callback("", std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* send function */ [](const std::string, std::function callback){ callback(std::make_exception_ptr(std::runtime_error("should not be invoked"))); }, + /* connect function */[](const std::string&, std::function callback) { - return pplx::task_from_exception(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed"))); + callback(std::make_exception_ptr(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed")))); }); auto connection = create_connection(websocket_client, writer, trace_level::errors); @@ -1443,7 +1473,7 @@ TEST(connection_id, can_get_connection_id_when_connection_in_connected_state) auto writer = std::shared_ptr{ std::make_shared() }; auto websocket_client = create_test_websocket_client( - /* receive function */ [](){ return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback){ callback("{ }\x1e", nullptr); }); auto connection = create_connection(websocket_client, writer, trace_level::state_changes); std::string connection_id; @@ -1462,7 +1492,7 @@ TEST(connection_id, can_get_connection_id_after_connection_has_stopped) auto writer = std::shared_ptr{ std::make_shared() }; auto websocket_client = create_test_websocket_client( - /* receive function */ [](){ return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback){ callback("{ }\x1e", nullptr); }); auto connection = create_connection(websocket_client, writer, trace_level::state_changes); connection->start() @@ -1481,9 +1511,9 @@ TEST(connection_id, connection_id_reset_when_starting_connection) auto writer = std::shared_ptr{ std::make_shared() }; auto websocket_client = create_test_websocket_client( - /* receive function */ [](){ return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback){ callback("{ }\x1e", nullptr); }); - auto web_request_factory = std::make_unique([&fail_http_requests](const std::string &url) -> std::unique_ptr + auto http_client = std::make_unique([&fail_http_requests](const std::string& url, http_request) { if (!fail_http_requests) { auto response_body = @@ -1492,15 +1522,15 @@ TEST(connection_id, connection_id_reset_when_starting_connection) "\"availableTransports\" : [ { \"transport\": \"WebSockets\", \"transferFormats\": [ \"Text\", \"Binary\" ] } ] }" : ""; - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response{ 200, response_body }; } - return std::unique_ptr(new web_request_stub((unsigned short)500, "Internal Server Error", "")); + return http_response{ 500, "" }; }); auto connection = connection_impl::create(create_uri(), trace_level::none, std::make_shared(), - std::move(web_request_factory), std::make_unique(websocket_client)); + std::move(http_client), std::make_unique(websocket_client)); connection->start() .then([connection]() diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/hub_connection_impl_tests.cpp b/src/SignalR/clients/cpp/test/signalrclienttests/hub_connection_impl_tests.cpp index 20b1436da568..8739dc38f14c 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/hub_connection_impl_tests.cpp +++ b/src/SignalR/clients/cpp/test/signalrclienttests/hub_connection_impl_tests.cpp @@ -4,7 +4,7 @@ #include "stdafx.h" #include "test_utils.h" #include "test_transport_factory.h" -#include "test_web_request_factory.h" +#include "test_http_client.h" #include "hub_connection_impl.h" #include "trace_log_writer.h" #include "memory_log_writer.h" @@ -17,7 +17,7 @@ std::shared_ptr create_hub_connection(std::shared_ptr log_writer = std::make_shared(), trace_level trace_level = trace_level::all) { return hub_connection_impl::create(create_uri(), trace_level, log_writer, - create_test_web_request_factory(), std::make_unique(websocket_client)); + create_test_http_client(), std::make_unique(websocket_client)); } TEST(url, negotiate_appended_to_url) @@ -27,14 +27,14 @@ TEST(url, negotiate_appended_to_url) for (const auto& base_url : base_urls) { std::string requested_url; - auto web_request_factory = std::make_unique([&requested_url](const std::string& url) + auto http_client = std::make_unique([&requested_url](const std::string& url, http_request) { requested_url = url; - return std::unique_ptr(new web_request_stub((unsigned short)404, "Bad request", "")); + return http_response{ 404, "" }; }); auto hub_connection = hub_connection_impl::create(base_url, trace_level::none, - std::make_shared(), std::move(web_request_factory), + std::make_shared(), std::move(http_client), std::make_unique(create_test_websocket_client())); try @@ -50,7 +50,7 @@ TEST(url, negotiate_appended_to_url) TEST(start, start_starts_connection) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto hub_connection = create_hub_connection(websocket_client); hub_connection->start().get(); @@ -62,8 +62,8 @@ TEST(start, start_sends_handshake) { auto message = std::make_shared(); auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }, - /* send function */ [message](const std::string& msg) { *message = msg; return pplx::task_from_result(); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }, + /* send function */ [message](const std::string& msg, std::function callback) { *message = msg; callback(nullptr); }); auto hub_connection = create_hub_connection(websocket_client); hub_connection->start().get(); @@ -78,11 +78,11 @@ TEST(start, start_waits_for_handshake_response) pplx::task_completion_event tce; pplx::task_completion_event tceWaitForSend; auto websocket_client = create_test_websocket_client( - /* receive function */ [tce, tceWaitForSend]() + /* receive function */ [tce, tceWaitForSend](std::function callback) { tceWaitForSend.set(); pplx::task(tce).get(); - return pplx::task_from_result(std::string("{ }\x1e")); + callback("{ }\x1e", nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -98,7 +98,7 @@ TEST(start, start_waits_for_handshake_response) TEST(start, start_fails_for_handshake_response_with_error) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{\"error\":\"bad things\"}\x1e")); }); + /* receive function */ [](std::function callback) { callback("{\"error\":\"bad things\"}\x1e", nullptr); }); auto hub_connection = create_hub_connection(websocket_client); try @@ -119,11 +119,15 @@ TEST(start, start_fails_if_stop_called_before_handshake_response) pplx::task_completion_event tce; pplx::task_completion_event tceWaitForSend; auto websocket_client = create_test_websocket_client( - /* receive function */ [tce]() { return pplx::task(tce); }, - /* send function */ [tceWaitForSend](const std::string &) + /* receive function */ [tce](std::function callback) + { + auto str = pplx::task(tce).get(); + callback(str, nullptr); + }, + /* send function */ [tceWaitForSend](const std::string &, std::function callback) { tceWaitForSend.set(); - return pplx::task_from_result(); + callback(nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -147,7 +151,7 @@ TEST(start, start_fails_if_stop_called_before_handshake_response) TEST(stop, stop_stops_connection) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto hub_connection = create_hub_connection(websocket_client); hub_connection->start().get(); @@ -159,7 +163,7 @@ TEST(stop, stop_stops_connection) TEST(stop, disconnected_callback_called_when_hub_connection_stops) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto hub_connection = create_hub_connection(websocket_client); auto disconnected_invoked = false; @@ -177,7 +181,7 @@ TEST(stop, connection_stopped_when_going_out_of_scope) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto hub_connection = create_hub_connection(websocket_client, writer, trace_level::state_changes); hub_connection->start().get(); @@ -205,7 +209,7 @@ TEST(stop, stop_cancels_pending_callbacks) { int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number]() + /* receive function */ [call_number](std::function callback) mutable { std::string responses[] { @@ -218,7 +222,7 @@ TEST(stop, stop_cancels_pending_callbacks) call_number++; } - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -241,7 +245,7 @@ TEST(stop, pending_callbacks_finished_if_hub_connections_goes_out_of_scope) { int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number]() + /* receive function */ [call_number](std::function callback) mutable { std::string responses[] { @@ -254,7 +258,7 @@ TEST(stop, pending_callbacks_finished_if_hub_connections_goes_out_of_scope) call_number++; } - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); pplx::task t; @@ -280,7 +284,7 @@ TEST(hub_invocation, hub_connection_invokes_users_code_on_hub_invocations) { int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number]() + /* receive function */ [call_number](std::function callback) mutable { std::string responses[] { @@ -290,7 +294,7 @@ TEST(hub_invocation, hub_connection_invokes_users_code_on_hub_invocations) call_number = std::min(call_number + 1, 1); - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -315,16 +319,17 @@ TEST(send, creates_correct_payload) bool handshakeReceived = false; auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }, - /* send function */[&payload, &handshakeReceived](const std::string& m) + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }, + /* send function */[&payload, &handshakeReceived](const std::string& m, std::function callback) { if (handshakeReceived) { payload = m; - return pplx::task_from_result(); + callback(nullptr); + return; } handshakeReceived = true; - return pplx::task_from_result(); + callback(nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -341,7 +346,7 @@ TEST(send, does_not_wait_for_server_response) pplx::task_completion_event waitForSend; auto websocket_client = create_test_websocket_client( - /* receive function */ [waitForSend, call_number]() mutable + /* receive function */ [waitForSend, call_number](std::function callback) mutable { std::string responses[] { @@ -356,7 +361,7 @@ TEST(send, does_not_wait_for_server_response) pplx::task(waitForSend).get(); } - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -373,16 +378,17 @@ TEST(invoke, creates_correct_payload) bool handshakeReceived = false; auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }, - /* send function */[&payload, &handshakeReceived](const std::string& m) + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }, + /* send function */[&payload, &handshakeReceived](const std::string& m, std::function callback) { if (handshakeReceived) { payload = m; - return pplx::task_from_exception(std::runtime_error("error")); + callback(std::make_exception_ptr(std::runtime_error("error"))); + return; } handshakeReceived = true; - return pplx::task_from_result(); + callback(nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -404,15 +410,16 @@ TEST(invoke, callback_not_called_if_send_throws) { bool handshakeReceived = false; auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }, - /* send function */[handshakeReceived](const std::string&) mutable + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }, + /* send function */[handshakeReceived](const std::string&, std::function callback) mutable { if (handshakeReceived) { - return pplx::task_from_exception(std::runtime_error("error")); + callback(std::make_exception_ptr(std::runtime_error("error"))); + return; } handshakeReceived = true; - return pplx::task_from_result(); + callback(nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -440,7 +447,7 @@ TEST(invoke, invoke_returns_value_returned_from_the_server) int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number, callback_registered_event]() + /* receive function */ [call_number, callback_registered_event](std::function callback) mutable { std::string responses[] { @@ -455,7 +462,7 @@ TEST(invoke, invoke_returns_value_returned_from_the_server) callback_registered_event->wait(); } - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -476,7 +483,7 @@ TEST(invoke, invoke_propagates_errors_from_server_as_hub_exceptions) int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number, callback_registered_event]() + /* receive function */ [call_number, callback_registered_event](std::function callback) mutable { std::string responses[] { @@ -491,7 +498,7 @@ TEST(invoke, invoke_propagates_errors_from_server_as_hub_exceptions) callback_registered_event->wait(); } - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -519,7 +526,7 @@ TEST(invoke, unblocks_task_when_server_completes_call) int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number, callback_registered_event]() + /* receive function */ [call_number, callback_registered_event](std::function callback) mutable { std::string responses[] { @@ -534,7 +541,7 @@ TEST(invoke, unblocks_task_when_server_completes_call) callback_registered_event->wait(); } - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -557,7 +564,7 @@ TEST(receive, logs_if_callback_for_given_id_not_found) int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number, message_received_event, handshake_sent]() + /* receive function */ [call_number, message_received_event, handshake_sent](std::function callback) mutable { std::string responses[] { @@ -575,12 +582,12 @@ TEST(receive, logs_if_callback_for_given_id_not_found) message_received_event->set(); } - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }, - [handshake_sent](const std::string&) + [handshake_sent](const std::string&, std::function callback) { handshake_sent->set(); - return pplx::task_from_result(); + callback(nullptr); }); std::shared_ptr writer(std::make_shared()); @@ -602,7 +609,7 @@ TEST(invoke_void, invoke_creates_runtime_error) int call_number = -1; auto websocket_client = create_test_websocket_client( - /* receive function */ [call_number, callback_registered_event]() + /* receive function */ [call_number, callback_registered_event](std::function callback) mutable { std::string responses[] { @@ -617,7 +624,7 @@ TEST(invoke_void, invoke_creates_runtime_error) callback_registered_event->wait(); } - return pplx::task_from_result(responses[call_number]); + callback(responses[call_number], nullptr); }); auto hub_connection = create_hub_connection(websocket_client); @@ -643,7 +650,7 @@ TEST(invoke_void, invoke_creates_runtime_error) TEST(connection_id, can_get_connection_id) { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto hub_connection = create_hub_connection(websocket_client); ASSERT_EQ("", hub_connection->get_connection_id()); @@ -692,7 +699,7 @@ TEST(on, cannot_register_handler_if_connection_not_in_disconnected_state) try { auto websocket_client = create_test_websocket_client( - /* receive function */ []() { return pplx::task_from_result(std::string("{ }\x1e")); }); + /* receive function */ [](std::function callback) { callback("{ }\x1e", nullptr); }); auto hub_connection = create_hub_connection(websocket_client); hub_connection->start().get(); diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/request_sender_tests.cpp b/src/SignalR/clients/cpp/test/signalrclienttests/negotiate_tests.cpp similarity index 54% rename from src/SignalR/clients/cpp/test/signalrclienttests/request_sender_tests.cpp rename to src/SignalR/clients/cpp/test/signalrclienttests/negotiate_tests.cpp index d332848f7729..bc69936b33b2 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/request_sender_tests.cpp +++ b/src/SignalR/clients/cpp/test/signalrclienttests/negotiate_tests.cpp @@ -2,46 +2,42 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. #include "stdafx.h" -#include "cpprest/details/basic_types.h" -#include "signalrclient/web_exception.h" -#include "request_sender.h" -#include "web_request_stub.h" -#include "test_web_request_factory.h" -#include "signalrclient/signalr_exception.h" +#include "negotiate.h" +#include "test_http_client.h" using namespace signalr; -TEST(request_sender_negotiate, request_created_with_correct_url) +TEST(negotiate, request_created_with_correct_url) { std::string requested_url; - auto request_factory = test_web_request_factory([&requested_url](const std::string& url) -> std::unique_ptr + auto http_client = test_http_client([&requested_url](const std::string& url, http_request request) { std::string response_body( "{ \"connectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", " "\"availableTransports\" : [] }"); requested_url = url; - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response(200, response_body); }); - request_sender::negotiate(request_factory, "http://fake/signalr").get(); + negotiate::negotiate(http_client, "http://fake/signalr").get(); ASSERT_EQ("http://fake/signalr/negotiate", requested_url); } -TEST(request_sender_negotiate, negotiation_request_sent_and_response_serialized) +TEST(negotiate, negotiation_request_sent_and_response_serialized) { - auto request_factory = test_web_request_factory([](const std::string&) -> std::unique_ptr + auto request_factory = test_http_client([](const std::string&, http_request request) { std::string response_body( "{\"connectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", " "\"availableTransports\" : [ { \"transport\": \"WebSockets\", \"transferFormats\": [ \"Text\", \"Binary\" ] }," "{ \"transport\": \"ServerSentEvents\", \"transferFormats\": [ \"Text\" ] } ] }"); - return std::unique_ptr(new web_request_stub((unsigned short)200, "OK", response_body)); + return http_response(200, response_body); }); - auto response = request_sender::negotiate(request_factory, "http://fake/signalr").get(); + auto response = negotiate::negotiate(request_factory, "http://fake/signalr").get(); ASSERT_EQ("f7707523-307d-4cba-9abf-3eef701241e8", response.connectionId); ASSERT_EQ(2u, response.availableTransports.size()); @@ -51,3 +47,20 @@ TEST(request_sender_negotiate, negotiation_request_sent_and_response_serialized) ASSERT_EQ(1u, response.availableTransports[1].transfer_formats.size()); ASSERT_EQ("Text", response.availableTransports[1].transfer_formats[0]); } + +TEST(negotiate, negotiation_response_with_redirect) +{ + auto request_factory = test_http_client([](const std::string&, http_request request) + { + std::string response_body( + "{\"url\" : \"http://redirect\", " + "\"accessToken\" : \"secret\" }"); + + return http_response(200, response_body); + }); + + auto response = negotiate::negotiate(request_factory, "http://fake/signalr").get(); + + ASSERT_EQ("http://redirect", response.url); + ASSERT_EQ("secret", response.accessToken); +} diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/test_http_client.cpp b/src/SignalR/clients/cpp/test/signalrclienttests/test_http_client.cpp new file mode 100644 index 000000000000..9fd9c11f0e01 --- /dev/null +++ b/src/SignalR/clients/cpp/test/signalrclienttests/test_http_client.cpp @@ -0,0 +1,26 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +#include "stdafx.h" +#include "test_http_client.h" + +test_http_client::test_http_client(std::function create_http_response_fn) + : m_http_response(create_http_response_fn) +{ +} + +void test_http_client::send(std::string url, http_request request, std::function callback) +{ + http_response response; + std::exception_ptr exception; + try + { + response = m_http_response(url, request); + } + catch (...) + { + exception = std::current_exception(); + } + + callback(std::move(response), exception); +} diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/test_http_client.h b/src/SignalR/clients/cpp/test/signalrclienttests/test_http_client.h new file mode 100644 index 000000000000..74f0d4d8bb93 --- /dev/null +++ b/src/SignalR/clients/cpp/test/signalrclienttests/test_http_client.h @@ -0,0 +1,17 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +#pragma once + +#include "signalrclient/http_client.h" + +using namespace signalr; + +class test_http_client : public http_client +{ +public: + test_http_client(std::function create_http_response_fn); + void send(std::string url, http_request request, std::function callback) override; +private: + std::function m_http_response; +}; diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/test_transport_factory.cpp b/src/SignalR/clients/cpp/test/signalrclienttests/test_transport_factory.cpp index 1d4e3cbe00c5..1406dce8595f 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/test_transport_factory.cpp +++ b/src/SignalR/clients/cpp/test/signalrclienttests/test_transport_factory.cpp @@ -10,12 +10,11 @@ test_transport_factory::test_transport_factory(const std::shared_ptr test_transport_factory::create_transport(transport_type transport_type, const logger& logger, - const signalr_client_config&, std::function process_message_callback, - std::function error_callback) + const signalr_client_config&) { if (transport_type == signalr::transport_type::websockets) { - return websocket_transport::create([&](){ return m_websocket_client; }, logger, process_message_callback, error_callback); + return websocket_transport::create([&](){ return m_websocket_client; }, logger); } throw std::runtime_error("not supported"); diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/test_transport_factory.h b/src/SignalR/clients/cpp/test/signalrclienttests/test_transport_factory.h index 2f7f6c9a81ad..f4fdc4761920 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/test_transport_factory.h +++ b/src/SignalR/clients/cpp/test/signalrclienttests/test_transport_factory.h @@ -4,7 +4,7 @@ #pragma once #include "transport_factory.h" -#include "websocket_client.h" +#include "signalrclient/websocket_client.h" using namespace signalr; @@ -14,9 +14,7 @@ class test_transport_factory : public transport_factory test_transport_factory(const std::shared_ptr& websocket_client); std::shared_ptr create_transport(transport_type transport_type, const logger& logger, - const signalr_client_config& signalr_client_config, - std::function process_message_callback, - std::function error_callback) override; + const signalr_client_config& signalr_client_config) override; private: std::shared_ptr m_websocket_client; diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/test_utils.cpp b/src/SignalR/clients/cpp/test/signalrclienttests/test_utils.cpp index 39f4b0fb0224..0863492fc577 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/test_utils.cpp +++ b/src/SignalR/clients/cpp/test/signalrclienttests/test_utils.cpp @@ -5,6 +5,7 @@ #include "test_utils.h" #include "test_websocket_client.h" #include "test_web_request_factory.h" +#include "test_http_client.h" using namespace signalr; @@ -17,10 +18,10 @@ std::string remove_date_from_log_entry(const std::string &log_entry) return log_entry.substr(date_end_index + 1); } -std::shared_ptr create_test_websocket_client(std::function()> receive_function, - std::function(const std::string& msg)> send_function, - std::function(const std::string& url)> connect_function, - std::function()> close_function) +std::shared_ptr create_test_websocket_client(std::function)> receive_function, + std::function)> send_function, + std::function)> connect_function, + std::function)> close_function) { auto websocket_client = std::make_shared(); websocket_client->set_receive_function(receive_function); @@ -45,6 +46,20 @@ std::unique_ptr create_test_web_request_factory() }); } +std::unique_ptr create_test_http_client() +{ + return std::make_unique([](const std::string & url, http_request request) + { + auto response_body = + url.find_first_of("/negotiate") != 0 + ? "{\"connectionId\" : \"f7707523-307d-4cba-9abf-3eef701241e8\", " + "\"availableTransports\" : [ { \"transport\": \"WebSockets\", \"transferFormats\": [ \"Text\", \"Binary\" ] } ] }" + : ""; + + return http_response{ 200, response_body }; + }); +} + std::string create_uri() { auto unit_test = ::testing::UnitTest::GetInstance(); diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/test_utils.h b/src/SignalR/clients/cpp/test/signalrclienttests/test_utils.h index 4e007c657884..eb7d98c25625 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/test_utils.h +++ b/src/SignalR/clients/cpp/test/signalrclienttests/test_utils.h @@ -4,19 +4,97 @@ #pragma once #include "cpprest/details/basic_types.h" -#include "websocket_client.h" +#include "signalrclient/websocket_client.h" #include "web_request_factory.h" +#include "signalrclient/http_client.h" +#include std::string remove_date_from_log_entry(const std::string &log_entry); std::shared_ptr create_test_websocket_client( - std::function()> receive_function = [](){ return pplx::task_from_result(""); }, - std::function(const std::string& msg)> send_function = [](const std::string&){ return pplx::task_from_result(); }, - std::function(const std::string& url)> connect_function = [](const std::string&){ return pplx::task_from_result(); }, - std::function()> close_function = [](){ return pplx::task_from_result(); }); + std::function)> receive_function = [](std::function callback) { callback("", nullptr); }, + std::function)> send_function = [](const std::string&, std::function callback) { callback(nullptr); }, + std::function)> connect_function = [](const std::string&, std::function callback) { callback(nullptr); }, + std::function)> close_function = [](std::function callback) { callback(nullptr); }); std::unique_ptr create_test_web_request_factory(); +std::unique_ptr create_test_http_client(); std::string create_uri(); std::string create_uri(const std::string& query_string); std::vector filter_vector(const std::vector& source, const std::string& string); std::string dump_vector(const std::vector& source); + +template +class manual_reset_event +{ +public: + void set(T value) + { + m_promise.set_value(value); + } + + void set_exception(std::exception exception) + { + m_promise.set_exception(std::make_exception_ptr(exception)); + } + + void set_exception(std::exception_ptr exception) + { + m_promise.set_exception(exception); + } + + T get() + { + // TODO: timeout + try + { + auto ret = m_promise.get_future().get(); + m_promise = std::promise(); + return ret; + } + catch (...) + { + m_promise = std::promise(); + std::rethrow_exception(std::current_exception()); + } + } +private: + std::promise m_promise; +}; + +template <> +class manual_reset_event +{ +public: + void set() + { + m_promise.set_value(); + } + + void set_exception(std::exception exception) + { + m_promise.set_exception(std::make_exception_ptr(exception)); + } + + void set_exception(std::exception_ptr exception) + { + m_promise.set_exception(exception); + } + + void get() + { + try + { + m_promise.get_future().get(); + } + catch (...) + { + m_promise = std::promise(); + std::rethrow_exception(std::current_exception()); + } + + m_promise = std::promise(); + } +private: + std::promise m_promise; +}; diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/test_websocket_client.cpp b/src/SignalR/clients/cpp/test/signalrclienttests/test_websocket_client.cpp index fbf2e8ef9334..d24b7724a62c 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/test_websocket_client.cpp +++ b/src/SignalR/clients/cpp/test/signalrclienttests/test_websocket_client.cpp @@ -3,51 +3,63 @@ #include "stdafx.h" #include "test_websocket_client.h" +#include "pplx/pplxtasks.h" test_websocket_client::test_websocket_client() - : m_connect_function([](const std::string&){ return pplx::task_from_result(); }), - m_send_function ([](const std::string msg){ return pplx::task_from_result(); }), - m_receive_function([](){ return pplx::task_from_result(""); }), - m_close_function([](){ return pplx::task_from_result(); }) - + : m_connect_function([](const std::string&, std::function callback) { callback(nullptr); }), + m_send_function([](const std::string msg, std::function callback) { callback(nullptr); }), + m_receive_function([](std::function callback) { callback("", nullptr); }), + m_close_function([](std::function callback) { callback(nullptr); }) { } -pplx::task test_websocket_client::connect(const std::string& url) +void test_websocket_client::start(std::string url, transfer_format, std::function callback) { - return m_connect_function(url); + pplx::create_task([url, callback, this]() + { + m_connect_function(url, callback); + }); } -pplx::task test_websocket_client::send(const std::string &msg) +void test_websocket_client::stop(std::function callback) { - return m_send_function(msg); + pplx::create_task([callback, this]() + { + m_close_function(callback); + }); } -pplx::task test_websocket_client::receive() +void test_websocket_client::send(std::string payload, std::function callback) { - return pplx::create_task([this]() { return m_receive_function(); }); + pplx::create_task([payload, callback, this]() + { + m_send_function(payload, callback); + }); } -pplx::task test_websocket_client::close() +void test_websocket_client::receive(std::function callback) { - return m_close_function(); + pplx::create_task([callback, this]() + { + m_receive_function(callback); + }); } -void test_websocket_client::set_connect_function(std::function(const std::string& url)> connect_function) +void test_websocket_client::set_connect_function(std::function)> connect_function) { m_connect_function = connect_function; } -void test_websocket_client::set_send_function(std::function(const std::string& msg)> send_function) +void test_websocket_client::set_send_function(std::function)> send_function) { m_send_function = send_function; } -void test_websocket_client::set_receive_function(std::function()> receive_function) +void test_websocket_client::set_receive_function(std::function)> receive_function) { m_receive_function = receive_function; } -void test_websocket_client::set_close_function(std::function()> close_function) +void test_websocket_client::set_close_function(std::function)> close_function) { m_close_function = close_function; } diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/test_websocket_client.h b/src/SignalR/clients/cpp/test/signalrclienttests/test_websocket_client.h index bea66aded20d..85ec4e460014 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/test_websocket_client.h +++ b/src/SignalR/clients/cpp/test/signalrclienttests/test_websocket_client.h @@ -4,7 +4,7 @@ #pragma once #include -#include "websocket_client.h" +#include "signalrclient/websocket_client.h" using namespace signalr; @@ -13,28 +13,28 @@ class test_websocket_client : public websocket_client public: test_websocket_client(); - pplx::task connect(const std::string& url) override; + void start(std::string url, transfer_format format, std::function callback) override; - pplx::task send(const std::string& msg) override; + void stop(std::function callback) override; - pplx::task receive() override; + void send(std::string payload, std::function callback) override; - pplx::task close() override; + void receive(std::function callback) override; - void set_connect_function(std::function(const std::string& url)> connect_function); + void set_connect_function(std::function)> connect_function); - void set_send_function(std::function(const std::string& msg)> send_function); + void set_send_function(std::function)> send_function); - void set_receive_function(std::function()> receive_function); + void set_receive_function(std::function)> receive_function); - void set_close_function(std::function()> close_function); + void set_close_function(std::function)> close_function); private: - std::function(const std::string& url)> m_connect_function; + std::function)> m_connect_function; - std::function(const std::string&)> m_send_function; + std::function)> m_send_function; - std::function()> m_receive_function; + std::function)> m_receive_function; - std::function()> m_close_function; + std::function)> m_close_function; }; diff --git a/src/SignalR/clients/cpp/test/signalrclienttests/websocket_transport_tests.cpp b/src/SignalR/clients/cpp/test/signalrclienttests/websocket_transport_tests.cpp index 2c620a0ee1d0..03114e44b18a 100644 --- a/src/SignalR/clients/cpp/test/signalrclienttests/websocket_transport_tests.cpp +++ b/src/SignalR/clients/cpp/test/signalrclienttests/websocket_transport_tests.cpp @@ -7,6 +7,7 @@ #include "test_websocket_client.h" #include "websocket_transport.h" #include "memory_log_writer.h" +#include using namespace signalr; @@ -16,24 +17,29 @@ TEST(websocket_transport_connect, connect_connects_and_starts_receive_loop) auto receive_called = std::make_shared(); auto client = std::make_shared(); - client->set_connect_function([&connect_called](const std::string&) -> pplx::task + client->set_connect_function([&connect_called](const std::string&, std::function callback) { connect_called = true; - return pplx::task_from_result(); + callback(nullptr); }); - client->set_receive_function([receive_called]()->pplx::task + client->set_receive_function([receive_called](std::function callback) { receive_called->set(); - return pplx::task_from_result(std::string("")); + callback("", nullptr); }); std::shared_ptr writer(std::make_shared()); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level::info), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level::info)); - ws_transport->connect("ws://fakeuri.org/connect?param=42").get(); + auto mre = manual_reset_event(); + ws_transport->start("ws://fakeuri.org/connect?param=42", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + + mre.get(); ASSERT_TRUE(connect_called); ASSERT_FALSE(receive_called->wait(5000)); @@ -48,18 +54,22 @@ TEST(websocket_transport_connect, connect_connects_and_starts_receive_loop) TEST(websocket_transport_connect, connect_propagates_exceptions) { auto client = std::make_shared(); - client->set_connect_function([](const std::string&)->pplx::task + client->set_connect_function([](const std::string&, std::function callback) { - return pplx::task_from_exception(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed"))); + callback(std::make_exception_ptr(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed")))); }); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none)); try { - ws_transport->connect("ws://fakeuri.org").get(); - ASSERT_TRUE(false); // exception not thrown + auto mre = manual_reset_event(); + ws_transport->start("ws://fakeuri.org", transfer_format::text, [&mre](std::exception_ptr exception) + { + mre.set_exception(exception); + }); + mre.get(); + ASSERT_TRUE(false); } catch (const std::exception &e) { @@ -70,21 +80,24 @@ TEST(websocket_transport_connect, connect_propagates_exceptions) TEST(websocket_transport_connect, connect_logs_exceptions) { auto client = std::make_shared(); - client->set_connect_function([](const std::string&) -> pplx::task + client->set_connect_function([](const std::string&, std::function callback) { - return pplx::task_from_exception(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed"))); + callback(std::make_exception_ptr(web::websockets::client::websocket_exception(_XPLATSTR("connecting failed")))); }); std::shared_ptr writer(std::make_shared()); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level::errors), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level::errors)); + auto mre = manual_reset_event(); + ws_transport->start("ws://fakeuri.org", transfer_format::text, [&mre](std::exception_ptr exception) + { + mre.set_exception(exception); + }); try { - ws_transport->connect("ws://fakeuri.org").wait(); + mre.get(); } - catch (...) - { } + catch (...) {} auto log_entries = std::dynamic_pointer_cast(writer)->get_log_entries(); @@ -100,15 +113,23 @@ TEST(websocket_transport_connect, connect_logs_exceptions) TEST(websocket_transport_connect, cannot_call_connect_on_already_connected_transport) { auto client = std::make_shared(); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none)); - ws_transport->connect("ws://fakeuri.org").wait(); + auto mre = manual_reset_event(); + ws_transport->start("ws://fakeuri.org", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); try { - ws_transport->connect("ws://fakeuri.org").wait(); - ASSERT_TRUE(false); // exception not thrown + ws_transport->start("ws://fakeuri.org", transfer_format::text, [&mre](std::exception_ptr exception) + { + mre.set_exception(exception); + }); + mre.get(); + ASSERT_TRUE(false); } catch (const std::exception &e) { @@ -119,13 +140,26 @@ TEST(websocket_transport_connect, cannot_call_connect_on_already_connected_trans TEST(websocket_transport_connect, can_connect_after_disconnecting) { auto client = std::make_shared(); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none)); + + auto mre = manual_reset_event(); + ws_transport->start("ws://fakeuri.org", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); + + ws_transport->stop([&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); - ws_transport->connect("ws://fakeuri.org").get(); - ws_transport->disconnect().get(); - ws_transport->connect("ws://fakeuri.org").get(); - // shouldn't throw or crash + ws_transport->start("ws://fakeuri.org", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); } TEST(websocket_transport_send, send_creates_and_sends_websocket_messages) @@ -134,18 +168,26 @@ TEST(websocket_transport_send, send_creates_and_sends_websocket_messages) auto client = std::make_shared(); - client->set_send_function([&send_called](const std::string&) -> pplx::task + client->set_send_function([&send_called](const std::string&, std::function callback) { send_called = true; - return pplx::task_from_result(); + callback(nullptr); }); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none)); - ws_transport->connect("ws://url") - .then([ws_transport](){ return ws_transport->send("ABC"); }) - .wait(); + auto mre = manual_reset_event(); + ws_transport->start("ws://url", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); + + ws_transport->send("ABC", [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); ASSERT_TRUE(send_called); } @@ -156,43 +198,67 @@ TEST(websocket_transport_disconnect, disconnect_closes_websocket) auto client = std::make_shared(); - client->set_close_function([&close_called]() -> pplx::task + client->set_close_function([&close_called](std::function callback) { close_called = true; - return pplx::task_from_result(); + callback(nullptr); }); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none)); - ws_transport->connect("ws://url") - .then([ws_transport]() - { - return ws_transport->disconnect(); - }).get(); + auto mre = manual_reset_event(); + ws_transport->start("ws://url", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); + + ws_transport->stop([&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); ASSERT_TRUE(close_called); } -TEST(websocket_transport_disconnect, disconnect_does_not_throw) +TEST(websocket_transport_stop, propogates_exception_from_close) { auto client = std::make_shared(); bool close_called = false; - client->set_close_function([&close_called]() -> pplx::task + client->set_close_function([&close_called](std::function callback) { close_called = true; - return pplx::task_from_exception(std::exception()); + callback(std::make_exception_ptr(std::exception())); }); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none)); - ws_transport->connect("ws://url") - .then([ws_transport]() + auto mre = manual_reset_event(); + ws_transport->start("ws://url", transfer_format::text, [&mre](std::exception_ptr) { - return ws_transport->disconnect(); - }).get(); + mre.set(); + }); + mre.get(); + + ws_transport->stop([&mre](std::exception_ptr exception) + { + if (exception) + { + mre.set_exception(exception); + } + else + { + mre.set(); + } + }); + try + { + mre.get(); + ASSERT_TRUE(false); + } + catch (...) { } ASSERT_TRUE(close_called); } @@ -200,21 +266,27 @@ TEST(websocket_transport_disconnect, disconnect_does_not_throw) TEST(websocket_transport_disconnect, disconnect_logs_exceptions) { auto client = std::make_shared(); - client->set_close_function([]()->pplx::task + client->set_close_function([](std::function callback) { - return pplx::task_from_exception(web::websockets::client::websocket_exception(_XPLATSTR("connection closing failed"))); + callback(std::make_exception_ptr(web::websockets::client::websocket_exception(_XPLATSTR("connection closing failed")))); }); std::shared_ptr writer(std::make_shared()); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level::errors), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level::errors)); - ws_transport->connect("ws://url") - .then([ws_transport]() - { - return ws_transport->disconnect(); - }).get(); + auto mre = manual_reset_event(); + ws_transport->start("ws://url", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); + + ws_transport->stop([&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); auto log_entries = std::dynamic_pointer_cast(writer)->get_log_entries(); @@ -241,34 +313,59 @@ TEST(websocket_transport_disconnect, receive_not_called_after_disconnect) // receive_task_tce is captured by reference since we assign it a new value after the first disconnect. This is // safe here because we are blocking on disconnect and therefore we won't get into a state where we would be using // an invalid reference because the tce went out of scope and was destroyed. - client->set_close_function([&receive_task_tce]() + client->set_close_function([&receive_task_tce](std::function callback) { // unblock receive receive_task_tce.set(std::string("")); - return pplx::task_from_result(); + callback(nullptr); }); int num_called = 0; - - client->set_receive_function([&receive_task_tce, &receive_task_started_tce, &num_called]() -> pplx::task + client->set_receive_function([&receive_task_tce, &receive_task_started_tce, &num_called](std::function callback) { num_called++; receive_task_started_tce.set(); - return pplx::create_task(receive_task_tce); + pplx::create_task(receive_task_tce) + .then([callback](pplx::task prev) + { + prev.get(); + callback("", nullptr); + }); }); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none)); + + auto mre = manual_reset_event(); + ws_transport->start("ws://fakeuri.org", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); - ws_transport->connect("ws://fakeuri.org").get(); pplx::create_task(receive_task_started_tce).get(); - ws_transport->disconnect().get(); + + ws_transport->stop([&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); receive_task_tce = pplx::task_completion_event(); receive_task_started_tce = pplx::task_completion_event(); - ws_transport->connect("ws://fakeuri.org").get(); + + ws_transport->start("ws://fakeuri.org", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); + pplx::create_task(receive_task_started_tce).get(); - ws_transport->disconnect().get(); + + ws_transport->stop([&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); ASSERT_EQ(2, num_called); } @@ -279,16 +376,20 @@ TEST(websocket_transport_disconnect, disconnect_is_no_op_if_transport_not_starte auto close_called = false; - client->set_close_function([&close_called]() + client->set_close_function([&close_called](std::function callback) { close_called = true; - return pplx::task_from_result(); + callback(nullptr); }); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none)); - ws_transport->disconnect().get(); + auto mre = manual_reset_event(); + ws_transport->stop([&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); ASSERT_FALSE(close_called); } @@ -298,20 +399,29 @@ TEST(websocket_transport_disconnect, exceptions_from_outstanding_receive_task_ob auto client = std::make_shared(); auto receive_event = std::make_shared(); - client->set_receive_function([receive_event]() + client->set_receive_function([receive_event](std::function callback) { - return pplx::create_task([receive_event]() + pplx::create_task([receive_event, callback]() { receive_event->wait(); - return pplx::task_from_exception(std::runtime_error("exception from receive")); + callback("", std::make_exception_ptr(std::runtime_error("exception from receive"))); }); }); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none)); + + auto mre = manual_reset_event(); + ws_transport->start("ws://fakeuri.org", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); - ws_transport->connect("ws://fakeuri.org").get(); - ws_transport->disconnect().get(); + ws_transport->stop([&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); // at this point the cancellation token that closes the receive loop is set to cancelled so // we can unblock the the receive task which throws an exception that should be observed otwherwise the test will crash @@ -333,8 +443,8 @@ TEST(websocket_transport_receive_loop, receive_loop_logs_if_receive_task_cancele { receive_loop_logs_exception_runner( pplx::task_canceled("canceled"), - "[info ] [websocket transport] receive task canceled.\n", - trace_level::info); + "[error ] [websocket transport] error receiving response from websocket: canceled\n", + trace_level::errors); } TEST(websocket_transport_receive_loop, receive_loop_logs_std_exception) @@ -351,22 +461,24 @@ void receive_loop_logs_exception_runner(const T& e, const std::string& expected_ event receive_event; auto client = std::make_shared(); - client->set_receive_function([&receive_event, &e]()->pplx::task + client->set_receive_function([&receive_event, &e](std::function callback) { + callback("", std::make_exception_ptr(e)); receive_event.set(); - return pplx::task_from_exception(e); }); std::shared_ptr writer(std::make_shared()); - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level), - [](const std::string&){}, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(writer, trace_level)); - ws_transport->connect("ws://url") - .then([&receive_event]() + auto mre = manual_reset_event(); + ws_transport->start("ws://url", transfer_format::text, [&mre](std::exception_ptr) { - receive_event.wait(); - }).get(); + mre.set(); + }); + mre.get(); + + receive_event.wait(); // this is race'y but there is nothing we can block on std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -375,30 +487,36 @@ void receive_loop_logs_exception_runner(const T& e, const std::string& expected_ ASSERT_NE(std::find_if(log_entries.begin(), log_entries.end(), [&expected_message](std::string entry) { return remove_date_from_log_entry(entry) == expected_message; }), - log_entries.end()); + log_entries.end()) << dump_vector(log_entries); } TEST(websocket_transport_receive_loop, process_response_callback_called_when_message_received) { auto client = std::make_shared(); - client->set_receive_function([]() -> pplx::task + client->set_receive_function([](std::function callback) { - return pplx::task_from_result(std::string("msg")); + callback("msg", nullptr); }); auto process_response_event = std::make_shared(); auto msg = std::make_shared(); - auto process_response = [msg, process_response_event](const std::string& message) + auto process_response = [msg, process_response_event](const std::string& message, std::exception_ptr exception) { + ASSERT_FALSE(exception); *msg = message; process_response_event->set(); }; - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none), - process_response, [](const std::exception&){}); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none)); + ws_transport->on_receive(process_response); - ws_transport->connect("ws://fakeuri.org").get(); + auto mre = manual_reset_event(); + ws_transport->start("ws://fakeuri.org", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); process_response_event->wait(1000); @@ -408,31 +526,43 @@ TEST(websocket_transport_receive_loop, process_response_callback_called_when_mes TEST(websocket_transport_receive_loop, error_callback_called_when_exception_thrown) { auto client = std::make_shared(); - client->set_receive_function([]() + client->set_receive_function([](std::function callback) { - return pplx::task_from_exception(std::runtime_error("error")); + callback("", std::make_exception_ptr(std::runtime_error("error"))); }); auto close_invoked = std::make_shared(false); - client->set_close_function([close_invoked]() + client->set_close_function([close_invoked](std::function callback) { *close_invoked = true; - return pplx::task_from_result(); + callback(nullptr); }); auto error_event = std::make_shared(); auto exception_msg = std::make_shared(); - auto error_callback = [exception_msg, error_event](const std::exception& e) + auto error_callback = [exception_msg, error_event](std::exception_ptr exception) { - *exception_msg = e.what(); + try + { + std::rethrow_exception(exception); + } + catch (const std::exception& e) + { + *exception_msg = e.what(); + } error_event->set(); }; - auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none), - [](const std::string&){}, error_callback); + auto ws_transport = websocket_transport::create([&](){ return client; }, logger(std::make_shared(), trace_level::none)); + ws_transport->on_close(error_callback); - ws_transport->connect("ws://fakeuri.org").get(); + auto mre = manual_reset_event(); + ws_transport->start("ws://fakeuri.org", transfer_format::text, [&mre](std::exception_ptr) + { + mre.set(); + }); + mre.get(); error_event->wait(1000); @@ -444,8 +574,7 @@ TEST(websocket_transport_get_transport_type, get_transport_type_returns_websocke { auto ws_transport = websocket_transport::create( [](){ return std::make_shared(); }, - logger(std::make_shared(), trace_level::none), - [](const std::string&){}, [](const std::exception&){}); + logger(std::make_shared(), trace_level::none)); ASSERT_EQ(transport_type::websockets, ws_transport->get_transport_type()); }