Skip to content

Commit b5395d8

Browse files
committed
Even Unicast and Multicast needs to handle reconnect
Signed-off-by: gxu <georgexu420@163.com>
1 parent f3eeb9f commit b5395d8

File tree

2 files changed

+19
-10
lines changed

2 files changed

+19
-10
lines changed

scaler/io/ymq/io_socket.cpp

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ void IOSocket::sendMessage(Message message, SendMessageCallback onMessageSent) n
3434
} else if (this->socketType() == IOSocketType::Multicast) {
3535
callback(0); // SUCCESS
3636
for (const auto& [addr, conn]: _identityToConnection) {
37+
// TODO: Currently doing N copies of the messages. Find a place to
38+
// store this message and pass in reference.
3739
if (addr.starts_with(address))
3840
conn->sendMessage(message, [](int) {});
3941
}
@@ -66,10 +68,10 @@ void IOSocket::recvMessage(RecvMessageCallback onRecvMessage) noexcept {
6668
}
6769
}
6870

69-
if (socketType() == IOSocketType::Unicast) {
70-
_pendingRecvMessages->front()({});
71-
_pendingRecvMessages->pop();
72-
}
71+
// if (socketType() == IOSocketType::Unicast) {
72+
// _pendingRecvMessages->front()({});
73+
// _pendingRecvMessages->pop();
74+
// }
7375
});
7476
}
7577

@@ -110,15 +112,21 @@ void IOSocket::onConnectionDisconnected(MessageConnectionTCP* conn) noexcept {
110112

111113
auto connIt = this->_identityToConnection.find(*conn->_remoteIOSocketIdentity);
112114

113-
if (socketType() == IOSocketType::Unicast || socketType() == IOSocketType::Multicast) {
114-
this->_identityToConnection.erase(connIt);
115-
return;
116-
}
117-
118115
_unestablishedConnection.push_back(std::move(connIt->second));
119116
this->_identityToConnection.erase(connIt);
120-
121117
auto& connPtr = _unestablishedConnection.back();
118+
119+
if (socketType() == IOSocketType::Unicast || socketType() == IOSocketType::Multicast) {
120+
auto destructWriteOp = std::move(connPtr->_writeOperations);
121+
connPtr->_writeOperations.clear();
122+
while (_pendingRecvMessages->size()) {
123+
// TODO: Replace this with error didNotReceive or something like that
124+
_pendingRecvMessages->front()({});
125+
_pendingRecvMessages->pop();
126+
}
127+
auto destructReadOp = std::move(connPtr->_receivedReadOperations);
128+
}
129+
122130
if (connPtr->_responsibleForRetry) {
123131
connectTo(connPtr->_remoteAddr, [](int) {}); // as the user callback is one-shot
124132
}

scaler/io/ymq/message_connection_tcp.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,5 @@ class MessageConnectionTCP: public MessageConnection {
6767

6868
constexpr static bool isCompleteMessage(const TcpReadOperation& x);
6969
friend void IOSocket::onConnectionIdentityReceived(MessageConnectionTCP* conn) noexcept;
70+
friend void IOSocket::onConnectionDisconnected(MessageConnectionTCP* conn) noexcept;
7071
};

0 commit comments

Comments
 (0)