Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class RdmaEndPoint {
// Connection states stored in status_.
enum Status {
// NOTE(review): where INITIALIZING is set is not visible in this view.
INITIALIZING,
// Stored by disconnectUnlocked(); setupConnectionsByActive() only starts a
// handshake RPC when it observes this state.
UNCONNECTED,
// Stored by setupConnectionsByActive() before it sends the handshake RPC;
// other callers of that function spin while this state is observed.
CONNECTING,
// Stored by doSetupConnection() on success.
CONNECTED,
};

Expand Down Expand Up @@ -132,9 +133,12 @@ class RdmaEndPoint {
std::vector<ibv_qp *> qp_list_;

std::string peer_nic_path_;
std::vector<uint32_t> peer_qp_num_list_;

volatile int *wr_depth_list_;
int max_wr_depth_;
size_t max_sge_per_wr_;
size_t max_inline_bytes_;

volatile bool active_;
volatile int *cq_outstanding_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cassert>
#include <cstddef>

#include "common.h"
#include "config.h"

namespace mooncake {
Expand Down Expand Up @@ -48,6 +49,9 @@ int RdmaEndPoint::construct(ibv_cq *cq, size_t num_qp_list,
cq_outstanding_ = (volatile int *)cq->cq_context;

max_wr_depth_ = (int)max_wr_depth;
max_sge_per_wr_ = max_sge_per_wr;
max_inline_bytes_ = max_inline_bytes;

wr_depth_list_ = new volatile int[num_qp_list];
if (!wr_depth_list_) {
LOG(ERROR) << "Failed to allocate memory for work request depth list";
Expand Down Expand Up @@ -111,44 +115,97 @@ void RdmaEndPoint::setPeerNicPath(const std::string &peer_nic_path) {
}

int RdmaEndPoint::setupConnectionsByActive() {
RWSpinlock::WriteGuard guard(lock_);
if (connected()) {
LOG(INFO) << "Connection has been established";
return 0;
}
HandShakeDesc local_desc, peer_desc;
std::string peer_server_name, peer_nic_name;
bool do_rpc = false;

{
RWSpinlock::WriteGuard guard(lock_);
if (connected()) {
LOG(INFO) << "Connection has been established";
return 0;
}

// loopback mode
if (context_.nicPath() == peer_nic_path_) {
auto segment_desc =
context_.engine().meta()->getSegmentDescByID(LOCAL_SEGMENT_ID);
if (segment_desc) {
for (auto &nic : segment_desc->devices)
if (nic.name == context_.deviceName())
return doSetupConnection(nic.gid, nic.lid, qpNum());
// loopback mode
if (context_.nicPath() == peer_nic_path_) {
auto segment_desc =
context_.engine().meta()->getSegmentDescByID(LOCAL_SEGMENT_ID);
if (segment_desc) {
for (auto &nic : segment_desc->devices)
if (nic.name == context_.deviceName())
return doSetupConnection(nic.gid, nic.lid, qpNum());
}
LOG(ERROR) << "Peer NIC " << context_.deviceName()
<< " not found in localhost";
return ERR_DEVICE_NOT_FOUND;
}
LOG(ERROR) << "Peer NIC " << context_.deviceName()
<< " not found in localhost";
return ERR_DEVICE_NOT_FOUND;
}

HandShakeDesc local_desc, peer_desc;
local_desc.local_nic_path = context_.nicPath();
local_desc.peer_nic_path = peer_nic_path_;
local_desc.qp_num = qpNum();
// Only proceed with RPC if we are the first to transition from
// UNCONNECTED. This prevents duplicate concurrent handshake attempts
// from the same endpoint.
auto current_status = status_.load(std::memory_order_relaxed);
if (current_status == UNCONNECTED) {
status_.store(CONNECTING, std::memory_order_relaxed);
do_rpc = true;

peer_server_name = getServerNameFromNicPath(peer_nic_path_);
peer_nic_name = getNicNameFromNicPath(peer_nic_path_);
if (peer_server_name.empty() || peer_nic_name.empty()) {
LOG(ERROR) << "Parse peer nic path failed: " << peer_nic_path_;
disconnectUnlocked();
return ERR_INVALID_ARGUMENT;
}

auto peer_server_name = getServerNameFromNicPath(peer_nic_path_);
auto peer_nic_name = getNicNameFromNicPath(peer_nic_path_);
if (peer_server_name.empty() || peer_nic_name.empty()) {
LOG(ERROR) << "Parse peer nic path failed: " << peer_nic_path_;
return ERR_INVALID_ARGUMENT;
local_desc.local_nic_path = context_.nicPath();
local_desc.peer_nic_path = peer_nic_path_;
local_desc.qp_num = qpNum();
}
}

if (!do_rpc) {
Comment thread
caozhanhao marked this conversation as resolved.
LOG(INFO) << "Another thread is already performing the endpoint "
"handshake, waiting for it to complete";
uint64_t start_time = getCurrentTimeInNano();
while (status_.load(std::memory_order_acquire) == CONNECTING) {
PAUSE();
Comment thread
caozhanhao marked this conversation as resolved.
Outdated
// Prevent infinite wait with a 10-second timeout
if (getCurrentTimeInNano() - start_time > 10 * 1000000000ull) {
return ERR_ENDPOINT;
}
}
Comment thread
caozhanhao marked this conversation as resolved.
RWSpinlock::ReadGuard guard(lock_);
return connected() ? 0 : ERR_ENDPOINT;
}

// Perform the RPC without holding the lock to avoid deadlock and allow
// "simultaneous open" handshake handling.
int rc = context_.engine().sendHandshake(peer_server_name, local_desc,
peer_desc);
if (rc) return rc;

// Re-acquire lock after RPC to finalize state transition
RWSpinlock::WriteGuard guard(lock_);

// Handle simultaneous open: if the peer initiates a connection during our
// RPC and it is passively established in setupConnectionsByPassive, simply
// reuse the existing endpoint.
if (connected()) {
if (peer_qp_num_list_ == peer_desc.qp_num) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can peer_qp_num_list vectors be directly compared?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can compare them directly.

qp_num_list contains the peer QP numbers and is order-sensitive. If we receive a different qp_num_list from the same peer NIC path, we can conclude that the peer QPs have been re-initialized (e.g. due to a process restart), and therefore the connection must be re-established.

If both the peer NIC path and qp_num_list remain unchanged, we treat it as a duplicate connection setup and avoid resetting the connection. This situation can occur when multiple endpoints are being established concurrently from the same peer NIC path. Reusing the existing connection is necessary; otherwise, later requests could overwrite and invalidate previously created QPs (as reported in TENT's PR #1705; that PR also compares qp_num_list). Although this PR introduces a 'wait for existing handshake' mechanism that should prevent concurrent endpoint setup, I've chosen to keep the reuse logic to ensure safety and maintain consistency with TENT, because concurrent endpoint setup (in any case the 'wait for existing handshake' mechanism fails to cover) is more likely to happen than the rare edge case described below.

There is, however, a theoretically possible but rare edge case where peer QPs are re-initialized but happen to reuse the exact same QP numbers in the exact same order. In such cases, we would incorrectly skip the connection re-establishment.

A more robust solution could involve introducing a session ID, but that seems to add unnecessary complexity at the current stage. My initial thought is that simply attaching a session ID from the active side would not eliminate the need for the qp_num_list comparison in setupConnectionsByActive, because during simultaneous open a node acts as both the active and the passive side at the same time. Consequently, the session ID sent by the active-side thread would differ from the one received by the other, passive thread handling the peer's request. Perhaps a more viable solution would be for both sides to generate and exchange their own unique IDs -- similar to how QP numbers are assigned -- forming a (local_session_id, peer_session_id) pair to uniquely identify the connection. However, integrating this into the handshake process in a robust manner doesn't seem straightforward.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return 0;
}
}
Comment thread
caozhanhao marked this conversation as resolved.
Comment thread
caozhanhao marked this conversation as resolved.

if (rc) {
if (status_.load(std::memory_order_relaxed) == CONNECTING) {
disconnectUnlocked();
}
return rc;
}
if (!peer_desc.reply_msg.empty()) {
LOG(ERROR) << "Reject the handshake request by peer "
<< local_desc.peer_nic_path;
if (status_.load(std::memory_order_relaxed) == CONNECTING) {
disconnectUnlocked();
}
return ERR_REJECT_HANDSHAKE;
}

Expand All @@ -160,29 +217,89 @@ int RdmaEndPoint::setupConnectionsByActive() {
<< ", local.peer_nic_path: " << local_desc.peer_nic_path
<< ", peer.local_nic_path: " << peer_desc.local_nic_path
<< ", peer.peer_nic_path: " << peer_desc.peer_nic_path;
if (status_.load(std::memory_order_relaxed) == CONNECTING) {
disconnectUnlocked();
}
return ERR_REJECT_HANDSHAKE;
}

auto segment_desc =
context_.engine().meta()->getSegmentDescByName(peer_server_name);
if (segment_desc) {
for (auto &nic : segment_desc->devices)
if (nic.name == peer_nic_name)
return doSetupConnection(nic.gid, nic.lid, peer_desc.qp_num);
for (auto &nic : segment_desc->devices) {
if (nic.name == peer_nic_name) {
int ret = doSetupConnection(nic.gid, nic.lid, peer_desc.qp_num);
if (ret != 0 &&
status_.load(std::memory_order_relaxed) == CONNECTING) {
disconnectUnlocked();
}
return ret;
}
}
}
LOG(ERROR) << "Peer NIC " << peer_nic_name << " not found in "
<< peer_server_name;
if (status_.load(std::memory_order_relaxed) == CONNECTING) {
disconnectUnlocked();
}
return ERR_DEVICE_NOT_FOUND;
}

int RdmaEndPoint::setupConnectionsByPassive(const HandShakeDesc &peer_desc,
HandShakeDesc &local_desc) {
RWSpinlock::WriteGuard guard(lock_);
if (connected()) {
// If already connected with the same peer QP info, return success
if (peer_qp_num_list_ == peer_desc.qp_num) {
local_desc.local_nic_path = context_.nicPath();
local_desc.peer_nic_path = peer_nic_path_;
local_desc.qp_num = qpNum();
return 0;
}
// Different peer (e.g., peer restarted)
LOG(WARNING) << "Re-establish connection: " << toString();

// If ERDMA is defined, keep the same logic as before.
#ifdef CONFIG_ERDMA
// Save original construction parameters
size_t num_qp = qp_list_.size();
auto max_wr_depth = max_wr_depth_;
auto max_sge_per_wr = max_sge_per_wr_;
auto max_inline_bytes = max_inline_bytes_;

// Deconstruct and reconstruct to get fresh QPs (same as delete+create)
int ret = deconstruct();
if (ret) {
LOG(ERROR) << "Failed to deconstruct endpoint: " << ret;
return ret;
}

// Get CQ from context for reconstruction
ibv_cq *cq = context_.cq();
if (!cq) {
LOG(ERROR) << "No CQ available for endpoint reconstruction";
return ERR_ENDPOINT;
}

// Reconstruct with same parameters as original construction
ret = construct(cq, num_qp, max_sge_per_wr, max_wr_depth,
max_inline_bytes);
if (ret) {
LOG(ERROR) << "Failed to reconstruct endpoint: " << ret;
return ret;
}
#else
disconnectUnlocked();
#endif
}

// Handle simultaneous open: if the state is CONNECTING, we can safely
// proceed to establish the connection on this same endpoint. Because we're
// holding the lock, even if there are already Active RPCs sent to the same
// peer nic path by setupConnectionsByActive, it will be blocked after the
// RPC return. Once the lock is released, they will simply observe the
// CONNECTED state and safely reuse the QP.

if (peer_desc.peer_nic_path != context_.nicPath() ||
peer_desc.local_nic_path != peer_nic_path_) {
local_desc.reply_msg =
Expand All @@ -209,10 +326,20 @@ int RdmaEndPoint::setupConnectionsByPassive(const HandShakeDesc &peer_desc,
auto segment_desc =
context_.engine().meta()->getSegmentDescByName(peer_server_name);
if (segment_desc) {
for (auto &nic : segment_desc->devices)
if (nic.name == peer_nic_name)
return doSetupConnection(nic.gid, nic.lid, peer_desc.qp_num,
&local_desc.reply_msg);
for (auto &nic : segment_desc->devices) {
if (nic.name == peer_nic_name) {
int ret = doSetupConnection(nic.gid, nic.lid, peer_desc.qp_num,
&local_desc.reply_msg);
if (ret != 0) {
// Restore UNCONNECTED state on failure if we were
// CONNECTING
if (status_.load(std::memory_order_relaxed) == CONNECTING) {
disconnectUnlocked();
}
}
return ret;
}
}
}
local_desc.reply_msg =
"Peer nic not found in that server: " + peer_nic_path_;
Expand Down Expand Up @@ -244,6 +371,7 @@ void RdmaEndPoint::disconnectUnlocked() {
wr_depth_list_[i] = 0;
}
}
peer_qp_num_list_.clear();
status_.store(UNCONNECTED, std::memory_order_release);
}

Expand Down Expand Up @@ -346,6 +474,7 @@ int RdmaEndPoint::doSetupConnection(const std::string &peer_gid,
if (ret) return ret;
}

peer_qp_num_list_ = std::move(peer_qp_num_list);
status_.store(CONNECTED, std::memory_order_relaxed);
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,9 +634,7 @@ int RdmaTransport::onSetupRdmaConnections(const HandShakeDesc &peer_desc,
}
if (!context) return ERR_INVALID_ARGUMENT;

#ifdef CONFIG_ERDMA
Comment thread
caozhanhao marked this conversation as resolved.
if (context->deleteEndpoint(peer_desc.local_nic_path)) return ERR_ENDPOINT;
#endif
// Use existing endpoint or create new one.
auto endpoint = context->endpoint(peer_desc.local_nic_path);
if (!endpoint) return ERR_ENDPOINT;
return endpoint->setupConnectionsByPassive(peer_desc, local_desc);
Expand Down
Loading