Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 104 additions & 6 deletions velox/functions/remote/client/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,29 @@

#include "velox/functions/remote/client/Remote.h"

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <chrono>
#include <random>
#include <thread>

#include <folly/io/async/AsyncSocketException.h>
#include <folly/io/async/EventBase.h>
#include <thrift/lib/cpp/transport/TTransportException.h>
#include "velox/functions/remote/client/RemoteVectorFunction.h"
#include "velox/functions/remote/client/ThriftClient.h"
#include "velox/functions/remote/if/GetSerde.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionServiceAsyncClient.h"

// Number of additional attempts made after the first failed call. The retry
// handler in this file compares its retry counter against this flag, so a
// request is attempted at most (flag + 1) times before the error is rethrown.
DEFINE_int32(
remote_function_retry_count,
3,
"Number of retries for remote function calls on transport errors");

// Upper bound, in seconds, applied to the backoff interval each time the retry
// handler doubles it. The interval starts at 1 second in invokeRemoteFunction.
DEFINE_int32(
remote_function_retry_max_backoff_sec,
8,
"Maximum exponential backoff in seconds for remote function retries");

namespace facebook::velox::functions {
namespace {

Expand All @@ -32,25 +49,106 @@
const std::vector<exec::VectorFunctionArg>& inputArgs,
const RemoteThriftVectorFunctionMetadata& metadata)
: RemoteVectorFunction(functionName, inputArgs, metadata),
functionName_(functionName),
location_(metadata.location),
thriftClient_(getThriftClient(location_, &eventBase_)) {}
client_(createClient(metadata)) {
VLOG(1) << "Created RemoteThriftFunction '" << functionName_ << "' for "
<< location_.describe();
}

// Sends `request` to the remote function service and returns the response.
//
// The call is retried when it throws TTransportException or
// folly::AsyncSocketException. handleRetryableError decides whether another
// attempt is allowed; once it returns false the in-flight exception is
// rethrown to the caller. Any other exception type propagates immediately.
std::unique_ptr<remote::RemoteFunctionResponse> invokeRemoteFunction(
const remote::RemoteFunctionRequest& request) const override {
auto remoteResponse = std::make_unique<remote::RemoteFunctionResponse>();
// NOTE(review): the next two statements look like the pre-change
// implementation kept by the diff view. As written they would return before
// the retry loop below is reached - confirm which version is intended.
thriftClient_->sync_invokeFunction(*remoteResponse, request);
return remoteResponse;

// Retry bookkeeping. Both values are passed by reference to
// handleRetryableError, which advances them after every failed attempt.
int retryCount = 0;
int expIntervalSec = 1;

// Loop exits only by returning a response or by rethrowing an exception.
while (true) {
try {
VLOG(2) << "Invoking remote function '" << functionName_
<< "' (socket=" << location_.describe() << ")";

// The same response object is reused across attempts.
client_->invokeFunction(*remoteResponse, request);

VLOG(2) << "Remote function '" << functionName_ << "' call succeeded";
return remoteResponse;

} catch (const apache::thrift::transport::TTransportException& e) {
// Bare throw rethrows the original exception once retries are exhausted.
if (!handleRetryableError(e.what(), retryCount, expIntervalSec)) {
throw;
}
} catch (const folly::AsyncSocketException& e) {
// The numeric exception type is appended to the message that gets logged.
std::string errorMsg = fmt::format(

Check warning on line 81 in velox/functions/remote/client/Remote.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

misc-include-cleaner

no header providing "fmt::format" is directly included

Check warning on line 81 in velox/functions/remote/client/Remote.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

misc-const-correctness

variable 'errorMsg' of type 'std::string' (aka 'basic_string<char>') can be declared 'const'
"{} (type={})", e.what(), static_cast<int>(e.getType()));
if (!handleRetryableError(errorMsg, retryCount, expIntervalSec)) {
throw;
}
}
}
}

// Returns the textual description of the remote endpoint this function talks
// to, as produced by folly::SocketAddress::describe().
std::string remoteLocationToString() const override {
  std::string description = location_.describe();
  return description;
}

private:
folly::SocketAddress location_;
folly::EventBase eventBase_;
// Picks the client factory for this function and builds the initial client.
//
// A factory supplied through `metadata.clientFactory` takes precedence;
// otherwise the default thrift-backed factory is used. The chosen factory is
// remembered in clientFactory_ so that reconnectClient() can reuse it.
//
// This runs from the constructor's initializer for client_, so it relies on
// eventBase_ and clientFactory_ being declared (and therefore constructed)
// before client_.
std::unique_ptr<IRemoteFunctionClient> createClient(
    const RemoteThriftVectorFunctionMetadata& metadata) {
  if (metadata.clientFactory) {
    clientFactory_ = metadata.clientFactory;
  } else {
    clientFactory_ = getDefaultRemoteFunctionClient;
  }
  return clientFactory_(metadata.location, &eventBase_);
}

std::unique_ptr<RemoteFunctionClient> thriftClient_;
// Handles a retryable transport error using exponential backoff.
//
// Logs the failure, and if the retry budget
// (FLAGS_remote_function_retry_count) is not yet used up: recreates the
// client, sleeps for a jittered interval, doubles `expIntervalSec` (capped at
// FLAGS_remote_function_retry_max_backoff_sec) and increments `retryCount`.
//
// `errorMsg` is the text included in the log line. `retryCount` and
// `expIntervalSec` are in-out state owned by the caller.
//
// Returns true if the caller should retry, false if retries are exhausted.
bool handleRetryableError(
const std::string& errorMsg,
int& retryCount,
int& expIntervalSec) const {
// Attempts are reported 1-based; the total is the retry budget plus the
// initial attempt.
LOG(ERROR) << "Transport error in remote function '" << functionName_
<< "': " << errorMsg << " (attempt=" << (retryCount + 1) << "/"
<< (FLAGS_remote_function_retry_count + 1) << ")";

if (retryCount < FLAGS_remote_function_retry_count) {
// The client is replaced before sleeping, so the backoff happens with the
// new client already constructed.
reconnectClient();
sleepWithJitter(expIntervalSec);
expIntervalSec = std::min(

Check warning on line 118 in velox/functions/remote/client/Remote.cpp

View workflow job for this annotation

GitHub Actions / Build with GCC / Linux release with adapters

misc-include-cleaner

no header providing "std::min" is directly included
expIntervalSec * 2, FLAGS_remote_function_retry_max_backoff_sec);
++retryCount;
return true;
}

LOG(ERROR) << "Remote function '" << functionName_ << "' call failed after "
<< FLAGS_remote_function_retry_count << " retries";
return false;
}

// Replaces the current client with a freshly created one for the same
// location, using the factory chosen at construction time. Declared const
// because it is reached from the const invoke path; it only touches mutable
// members.
void reconnectClient() const {
  LOG(WARNING) << "Reconnecting thrift client for '" << functionName_
               << "' to " << location_.describe();
  // Build the replacement first; if the factory throws, client_ is untouched.
  auto replacement = clientFactory_(location_, &eventBase_);
  // After the swap `replacement` holds the previous client, which is released
  // when this scope ends.
  client_.swap(replacement);
}

// Blocks the calling thread for a randomized backoff interval before a retry.
//
// The interval is drawn uniformly from [0.5, expIntervalSec + 0.5) seconds and
// is slept with sub-second precision. `expIntervalSec` is the current
// exponential backoff step in seconds; values below 1 are treated as 1.
//
// The random engine is thread-local, so each thread draws from its own
// independently seeded sequence.
void sleepWithJitter(int expIntervalSec) const {
  static thread_local std::mt19937 rng(std::random_device{}());
  // Guard against a non-positive step (possible when the max-backoff flag is
  // configured as <= 0), which would hand the distribution invalid bounds.
  const int boundedIntervalSec = expIntervalSec > 0 ? expIntervalSec : 1;
  // Use range [0.5, expIntervalSec + 0.5) to ensure meaningful backoff
  std::uniform_real_distribution<double> dist(0.5, boundedIntervalSec + 0.5);
  // Keep the fractional part. Truncating the draw to whole seconds turned
  // every value below 1.0 into no sleep at all, which on the first retry
  // (range [0.5, 1.5)) skipped the backoff about half of the time.
  const std::chrono::duration<double> sleepInterval(dist(rng));

  LOG(INFO) << "Sleeping for " << sleepInterval.count()
            << " seconds before retry for '" << functionName_ << "'";
  /* sleep override: intentional backoff for retry logic */
  std::this_thread::sleep_for(sleepInterval);
}

// Function name, used in the log messages emitted by this class.
const std::string functionName_;
// Remote endpoint. Passed to the client factory on reconnect and reported by
// remoteLocationToString().
folly::SocketAddress location_;
// Event base whose address is handed to the client factory. Mutable because
// const methods pass a non-const pointer to it. It is declared before client_,
// so it is constructed before the client is created and destroyed after the
// client is destroyed.
mutable folly::EventBase eventBase_;
// Factory used to create the initial client and every replacement client.
// Assigned in createClient(), which runs while client_ is being initialized,
// so it must be declared before client_.
mutable RemoteFunctionClientFactory clientFactory_;
// Current client. Replaced from const methods when a transport error triggers
// a reconnect.
// NOTE(review): these mutable members are modified from const methods without
// any visible synchronization - confirm the instance is not shared across
// threads.
mutable std::unique_ptr<IRemoteFunctionClient> client_;
};

std::shared_ptr<exec::VectorFunction> createRemoteFunction(
Expand Down
6 changes: 6 additions & 0 deletions velox/functions/remote/client/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <folly/SocketAddress.h>
#include "velox/functions/remote/client/RemoteVectorFunction.h"
#include "velox/functions/remote/client/ThriftClient.h"

namespace facebook::velox::functions {

Expand All @@ -27,6 +28,11 @@ struct RemoteThriftVectorFunctionMetadata
/// Note that this can hold a network location (ip/port pair) or a unix domain
/// socket path (see SocketAddress::makeFromPath()).
folly::SocketAddress location;

/// Optional factory for creating remote function clients. If not set, the
/// default thrift client factory is used. This enables dependency injection
/// for testing with mock clients.
RemoteFunctionClientFactory clientFactory;
};

/// Registers a new remote function. It will use the metadata defined in
Expand Down
43 changes: 43 additions & 0 deletions velox/functions/remote/client/ThriftClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,51 @@ namespace facebook::velox::functions {
using RemoteFunctionClient =
apache::thrift::Client<remote::RemoteFunctionService>;

/// Abstract interface for the remote function client, enabling dependency
/// injection and mocking in tests.
///
/// Implementations are owned through std::unique_ptr<IRemoteFunctionClient>,
/// hence the virtual destructor.
class IRemoteFunctionClient {
public:
virtual ~IRemoteFunctionClient() = default;

/// Invokes the remote function synchronously.
///
/// @param response Output object to be filled in by the implementation.
/// @param request Request to send to the remote function service.
///
/// The caller in Remote.cpp retries this call when it throws
/// apache::thrift::transport::TTransportException or
/// folly::AsyncSocketException; other exception types propagate unchanged.
virtual void invokeFunction(
remote::RemoteFunctionResponse& response,
const remote::RemoteFunctionRequest& request) = 0;
};

/// Default implementation that wraps the actual thrift client.
class ThriftRemoteFunctionClient : public IRemoteFunctionClient {
public:
explicit ThriftRemoteFunctionClient(
std::unique_ptr<RemoteFunctionClient> client)
: client_(std::move(client)) {}

void invokeFunction(
remote::RemoteFunctionResponse& response,
const remote::RemoteFunctionRequest& request) override {
client_->sync_invokeFunction(response, request);
}

private:
std::unique_ptr<RemoteFunctionClient> client_;
};

/// Factory function type for creating remote function clients.
///
/// Parameters: location (socket address, passed by value), eventBase (raw,
/// non-owning pointer to the event base the client should use).
/// Returns: A unique_ptr to an IRemoteFunctionClient implementation; the
/// caller takes ownership.
///
/// Remote.cpp calls the factory once at construction and again each time it
/// reconnects after a transport error, so a factory may be invoked several
/// times for the same location.
using RemoteFunctionClientFactory = std::function<std::unique_ptr<
IRemoteFunctionClient>(folly::SocketAddress, folly::EventBase*)>;

/// Creates the concrete thrift client used by the default factory below.
/// Only declared here; the definition is not part of this header.
/// NOTE(review): presumably the returned client is connected to `location`
/// and driven by `eventBase`, which would then have to outlive the client -
/// confirm against the definition.
std::unique_ptr<RemoteFunctionClient> getThriftClient(
folly::SocketAddress location,
folly::EventBase* eventBase);

/// Default factory that creates ThriftRemoteFunctionClient instances.
///
/// Builds a thrift client for `location` on `eventBase` via getThriftClient()
/// and wraps it in the IRemoteFunctionClient adapter. The signature matches
/// RemoteFunctionClientFactory, so this function can be stored directly as a
/// factory.
inline std::unique_ptr<IRemoteFunctionClient> getDefaultRemoteFunctionClient(
    folly::SocketAddress location,
    folly::EventBase* eventBase) {
  // `location` is already a private copy (taken by value) and is not used
  // again, so move it into the callee instead of copying it a second time.
  return std::make_unique<ThriftRemoteFunctionClient>(
      getThriftClient(std::move(location), eventBase));
}

} // namespace facebook::velox::functions
Loading
Loading