Skip to content

Commit 9825478

Browse files
Add connection management methods to VectorsCollectionClient and enhance YarpRobotLoggerDevice for signal reconnections
1 parent f87f2a4 commit 9825478

File tree

5 files changed

+212
-1
lines changed

5 files changed

+212
-1
lines changed

bindings/python/YarpUtilities/src/VectorsCollection.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ void CreateVectorsCollectionClient(pybind11::module& module)
6969
py::arg("handler"))
7070
.def("connect", &VectorsCollectionClient::connect)
7171
.def("disconnect", &VectorsCollectionClient::disconnect)
72+
.def("is_connected", &VectorsCollectionClient::isConnected)
73+
.def("check_connection", &VectorsCollectionClient::checkConnection)
7274
.def("get_metadata",
7375
[](VectorsCollectionClient& impl)
7476
-> BipedalLocomotion::YarpUtilities::VectorsCollectionMetadata {

devices/YarpRobotLoggerDevice/src/YarpRobotLoggerDevice.cpp

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1088,6 +1088,29 @@ bool BipedalLocomotion::YarpRobotLoggerDevice::record()
10881088
m_lookForNewLogsThread = std::thread([this] { this->lookForNewLogs(); });
10891089
}
10901090

1091+
// Refresh all exogenous signal connections so they are re-established cleanly
1092+
auto refreshExogenousConnections = [](auto& signals) {
1093+
for (auto& [name, signal] : signals)
1094+
{
1095+
signal.disconnect();
1096+
signal.connected = false;
1097+
signal.dataArrived = false;
1098+
}
1099+
};
1100+
refreshExogenousConnections(m_vectorsCollectionSignals);
1101+
refreshExogenousConnections(m_vectorSignals);
1102+
refreshExogenousConnections(m_stringSignals);
1103+
refreshExogenousConnections(m_humanStateSignals);
1104+
refreshExogenousConnections(m_wearableTargetsSignals);
1105+
refreshExogenousConnections(m_wearableDataSignals);
1106+
refreshExogenousConnections(m_imageSignals);
1107+
1108+
// Also clear cached metadata for VectorsCollection signals
1109+
for (auto& [name, signal] : m_vectorsCollectionSignals)
1110+
{
1111+
signal.metadata.vectors.clear();
1112+
}
1113+
10911114
// run the thread for reading the exogenous signals
10921115
m_lookForNewExogenousSignalThread = std::thread([this] { this->lookForExogenousSignals(); });
10931116

@@ -1625,6 +1648,29 @@ void YarpRobotLoggerDevice::lookForExogenousSignals()
16251648
}
16261649
};
16271650

1651+
auto checkExogeneousConnections
1652+
= [this](
1653+
std::unordered_map<std::string, VectorsCollectionSignal>& signals) -> void {
1654+
for (auto& [name, signal] : signals)
1655+
{
1656+
if (!signal.connected)
1657+
{
1658+
continue;
1659+
}
1660+
1661+
std::lock_guard<std::mutex> lock(signal.mutex);
1662+
if (!signal.client.checkConnection())
1663+
{
1664+
log()->warn("[YarpRobotLoggerDevice::lookForExogenousSignals] Connection lost "
1665+
"for exogenous signal '{}'. Will attempt to reconnect.",
1666+
name);
1667+
signal.connected = false;
1668+
signal.dataArrived = false;
1669+
signal.metadata.vectors.clear();
1670+
}
1671+
}
1672+
};
1673+
16281674
while (m_lookForNewExogenousSignalIsRunning)
16291675
{
16301676
// detect if a clock has been reset
@@ -1647,7 +1693,8 @@ void YarpRobotLoggerDevice::lookForExogenousSignals()
16471693
connectToExogeneous(m_wearableDataSignals);
16481694
connectToExogeneous(m_imageSignals);
16491695

1650-
// TODO check for updated metadata from already connected signals
1696+
// check if already connected VectorsCollection signals are still alive
1697+
checkExogeneousConnections(m_vectorsCollectionSignals);
16511698

16521699
// Start the logging for exogenous images
16531700
for (auto& [name, signal] : m_imageSignals)

src/YarpUtilities/include/BipedalLocomotion/YarpUtilities/VectorsCollectionClient.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,23 @@ class VectorsCollectionClient
6565
*/
6666
bool disconnect();
6767

68+
/**
69+
* Check if the client is currently connected (cached state).
70+
* @return true if the client believes it is connected, false otherwise.
71+
* @note This returns the cached connection state. Use checkConnection() to actively verify
72+
* YARP port connectivity.
73+
*/
74+
bool isConnected() const;
75+
76+
/**
77+
* Actively verify that the YARP port connections are still alive.
78+
* If any connection (data or RPC) is found to be broken, the internal state is updated to
79+
* disconnected.
80+
* @return true if all connections are still alive, false otherwise.
81+
* @note After this method returns false, call connect() to re-establish the connections.
82+
*/
83+
bool checkConnection();
84+
6885
/**
6986
* Check if new metadata is available.
7087
* @return true if new metadata is available, false otherwise.

src/YarpUtilities/src/VectorsCollectionClient.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,37 @@ bool VectorsCollectionClient::disconnect()
123123
return true;
124124
}
125125

126+
bool VectorsCollectionClient::isConnected() const
127+
{
128+
return m_pimpl->isConnected;
129+
}
130+
131+
bool VectorsCollectionClient::checkConnection()
132+
{
133+
if (!m_pimpl->isConnected)
134+
{
135+
return false;
136+
}
137+
138+
// Actively verify that both YARP connections (data and RPC) are still alive
139+
const bool dataConnected
140+
= yarp::os::Network::isConnected(m_pimpl->remotePortName, m_pimpl->localPortName);
141+
const bool rpcConnected
142+
= yarp::os::Network::isConnected(m_pimpl->localRpcPortName, m_pimpl->remoteRpcPortName);
143+
144+
if (!dataConnected || !rpcConnected)
145+
{
146+
log()->warn("[VectorsCollectionClient::checkConnection] Connection lost. Data port "
147+
"connected: {}, RPC port connected: {}.",
148+
dataConnected,
149+
rpcConnected);
150+
m_pimpl->isConnected = false;
151+
return false;
152+
}
153+
154+
return true;
155+
}
156+
126157
bool VectorsCollectionClient::connect()
127158
{
128159
constexpr auto rpcCarrier = "tcp";

src/YarpUtilities/tests/VectorsCollectionTest.cpp

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,13 +406,42 @@ TEST_CASE_METHOD(VectorsCollectionFixture, "VectorsCollectionClient - Connection
406406

407407
REQUIRE(client.initialize(clientHandler));
408408

409+
SECTION("isConnected returns false before connect") {
410+
// Test: Client should report disconnected state before connect
411+
REQUIRE_FALSE(client.isConnected());
412+
}
413+
414+
SECTION("checkConnection returns false before connect") {
415+
// Test: checkConnection should return false when not connected
416+
REQUIRE_FALSE(client.checkConnection());
417+
}
418+
409419
SECTION("Successful connection to available server") {
410420
// Test: Client should connect when server is available
411421
// Behavior: Establishes both data and RPC port connections
412422

413423
// Allow server ports to fully open
414424
std::this_thread::sleep_for(std::chrono::milliseconds(100));
415425

426+
REQUIRE(client.connect());
427+
REQUIRE(client.isConnected());
428+
}
429+
430+
SECTION("checkConnection returns true when connected") {
431+
// Test: checkConnection should return true when both ports are alive
432+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
433+
REQUIRE(client.connect());
434+
REQUIRE(client.checkConnection());
435+
REQUIRE(client.isConnected());
436+
}
437+
438+
SECTION("Graceful connection failure when server unavailable") {
439+
// Test: Client should connect when server is available
440+
// Behavior: Establishes both data and RPC port connections
441+
442+
// Allow server ports to fully open
443+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
444+
416445
REQUIRE(client.connect());
417446
}
418447

@@ -859,3 +888,88 @@ TEST_CASE_METHOD(VectorsCollectionFixture,
859888
BipedalLocomotion::YarpUtilities::VectorsCollectionMetadata meta;
860889
REQUIRE_FALSE(client.getMetadata(meta));
861890
}
891+
892+
TEST_CASE_METHOD(VectorsCollectionFixture,
893+
"VectorsCollectionClient - checkConnection detects server shutdown")
894+
{
895+
// Use a heap-allocated server so we can destroy it mid-test
896+
auto server = std::make_unique<VectorsCollectionServer>();
897+
REQUIRE(server->initialize(serverHandler));
898+
server->populateMetadata("sig", {"a", "b"});
899+
REQUIRE(server->finalizeMetadata());
900+
901+
VectorsCollectionClient client;
902+
REQUIRE(client.initialize(clientHandler));
903+
904+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
905+
REQUIRE(client.connect());
906+
REQUIRE(client.isConnected());
907+
REQUIRE(client.checkConnection());
908+
909+
// Destroy the server, which closes its ports
910+
server.reset();
911+
std::this_thread::sleep_for(std::chrono::milliseconds(200));
912+
913+
// checkConnection should detect the broken connections
914+
REQUIRE_FALSE(client.checkConnection());
915+
REQUIRE_FALSE(client.isConnected());
916+
917+
// After detecting disconnection, connect should fail (no server)
918+
REQUIRE_FALSE(client.connect());
919+
}
920+
921+
TEST_CASE_METHOD(VectorsCollectionFixture,
922+
"VectorsCollectionClient - reconnect after server restart")
923+
{
924+
// Start a server, connect, then destroy and recreate it
925+
auto server = std::make_unique<VectorsCollectionServer>();
926+
REQUIRE(server->initialize(serverHandler));
927+
server->populateMetadata("sig", {"x"});
928+
REQUIRE(server->finalizeMetadata());
929+
930+
VectorsCollectionClient client;
931+
REQUIRE(client.initialize(clientHandler));
932+
933+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
934+
REQUIRE(client.connect());
935+
REQUIRE(client.checkConnection());
936+
937+
// Send some data
938+
server->prepareData();
939+
REQUIRE(server->populateData("sig", std::vector<double>{1.0}));
940+
server->sendData();
941+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
942+
943+
auto* data = client.readData(false);
944+
REQUIRE(data != nullptr);
945+
REQUIRE(data->vectors.at("sig")[0] == Catch::Approx(1.0));
946+
947+
// Destroy the server
948+
server.reset();
949+
std::this_thread::sleep_for(std::chrono::milliseconds(200));
950+
REQUIRE_FALSE(client.checkConnection());
951+
952+
// Create a new server on the same ports
953+
server = std::make_unique<VectorsCollectionServer>();
954+
REQUIRE(server->initialize(serverHandler));
955+
server->populateMetadata("sig", {"x"});
956+
REQUIRE(server->finalizeMetadata());
957+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
958+
959+
// Reconnect
960+
REQUIRE(client.connect());
961+
REQUIRE(client.isConnected());
962+
REQUIRE(client.checkConnection());
963+
964+
// Verify data flows again
965+
server->prepareData();
966+
REQUIRE(server->populateData("sig", std::vector<double>{42.0}));
967+
server->sendData();
968+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
969+
970+
data = client.readData(false);
971+
if (data != nullptr)
972+
{
973+
REQUIRE(data->vectors.at("sig")[0] == Catch::Approx(42.0));
974+
}
975+
}

0 commit comments

Comments
 (0)