Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/lib/xbot_monitoring/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ find_package(catkin REQUIRED COMPONENTS
grid_map_core
grid_map_ros
grid_map_msgs
xbot_rpc
)


Expand Down
1 change: 1 addition & 0 deletions src/lib/xbot_monitoring/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<depend>grid_map_core</depend>
<depend>grid_map_ros</depend>
<depend>grid_map_msgs</depend>
<depend>xbot_rpc</depend>

<!-- The export tag contains other, unspecified, tags -->
<export>
Expand Down
122 changes: 119 additions & 3 deletions src/lib/xbot_monitoring/src/xbot_monitoring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,35 @@
#include "xbot_msgs/RobotState.h"
#include <mqtt/async_client.h>
#include <nlohmann/json.hpp>
#include <vector>
#include "geometry_msgs/Twist.h"
#include "std_msgs/String.h"
#include "xbot_msgs/RegisterActionsSrv.h"
#include "xbot_msgs/ActionInfo.h"
#include "xbot_msgs/MapOverlay.h"
#include "xbot_rpc/RpcError.h"
#include "xbot_rpc/RpcRequest.h"
#include "xbot_rpc/RpcResponse.h"
#include "xbot_rpc/constants.h"
#include "xbot_rpc/provider.h"
#include "xbot_rpc/RegisterMethodsSrv.h"

using json = nlohmann::json;
using json = nlohmann::ordered_json;

void publish_sensor_metadata();
void publish_map();
void publish_map_overlay();
void publish_actions();
void publish_version();
void rpc_request_callback(const std::string &payload);

// Stores registered actions (prefix to vector<action>)
std::map<std::string, std::vector<xbot_msgs::ActionInfo>> registered_actions;

// Stores registered RPC methods
std::map<std::string, std::vector<std::string>> registered_methods;
std::mutex registered_methods_mutex;

// Maps a topic to a subscriber.
std::map<std::string, ros::Subscriber> active_subscribers;
std::map<std::string, xbot_msgs::SensorInfo> found_sensors;
Expand All @@ -47,6 +59,7 @@ std::mutex mqtt_callback_mutex;
// Publisher for cmd_vel and commands
ros::Publisher cmd_vel_pub;
ros::Publisher action_pub;
ros::Publisher rpc_request_pub;

// properties for external mqtt
bool external_mqtt_enable = false;
Expand Down Expand Up @@ -77,6 +90,7 @@ class MqttCallback : public mqtt::callback {
client_->subscribe(this->mqtt_topic_prefix + "teleop", 0);
client_->subscribe(this->mqtt_topic_prefix + "command", 0);
client_->subscribe(this->mqtt_topic_prefix + "action", 0);
client_->subscribe(this->mqtt_topic_prefix + "rpc/request", 0);
}

public:
Expand Down Expand Up @@ -107,6 +121,9 @@ class MqttCallback : public mqtt::callback {
action_msg.data = ptr->get_payload_str();
action_pub.publish(action_msg);
// END: Deprecated code (2/2)
} else if (ptr->get_topic() == this->mqtt_topic_prefix + "rpc/request") {
std::string payload = ptr->get_payload_str();
rpc_request_callback(payload);
}
}
private:
Expand All @@ -122,6 +139,23 @@ json map_overlay;
bool has_map = false;
bool has_map_overlay = false;

xbot_rpc::RpcProvider rpc_provider("xbot_monitoring", {{
RPC_METHOD("rpc.ping", {
return "pong";
}),
RPC_METHOD("rpc.methods", {
std::lock_guard<std::mutex> lk(registered_methods_mutex);
json methods = json::array();
for (const auto& [_, method_ids] : registered_methods) {
for (const auto& method_id : method_ids) {
methods.push_back(method_id);
}
}
std::sort(methods.begin(), methods.end());
return methods;
}),
}});

void setupMqttClient() {
// setup mqtt client for app use
{
Expand Down Expand Up @@ -550,6 +584,82 @@ bool registerActions(xbot_msgs::RegisterActionsSrvRequest &req, xbot_msgs::Regis
return true;
}

void rpc_publish_error(const int16_t code, const std::string &message, const nlohmann::basic_json<> &id = nullptr) {
json err_resp = {{"jsonrpc", "2.0"},
{"error", {{"code", code}, {"message", message}}},
{"id", id}};
try_publish("rpc/response", err_resp.dump(2));
}

void rpc_request_callback(const std::string &payload) {
// Parse
json req;
try {
req = json::parse(payload);
} catch (const json::parse_error &e) {
return rpc_publish_error(xbot_rpc::RpcError::ERROR_INVALID_JSON, "Could not parse request JSON");
}

// Validate
if (!req.is_object()) {
return rpc_publish_error(xbot_rpc::RpcError::ERROR_INVALID_REQUEST, "Request is not a JSON object");
}
json id = req.contains("id") ? req["id"] : nullptr;
if (id != nullptr && !id.is_string()) {
return rpc_publish_error(xbot_rpc::RpcError::ERROR_INVALID_REQUEST, "ID is not a string", id);
} else if (!req.contains("jsonrpc") || !req["jsonrpc"].is_string() || req["jsonrpc"] != "2.0") {
return rpc_publish_error(xbot_rpc::RpcError::ERROR_INVALID_REQUEST, "Invalid JSON-RPC version");
} else if (!req.contains("method") || !req["method"].is_string()) {
return rpc_publish_error(xbot_rpc::RpcError::ERROR_INVALID_REQUEST, "Method is not a string", req["id"]);
}

// Check if the method is registered
const std::string method = req["method"];
bool is_registered = false;
{
std::lock_guard<std::mutex> lk(registered_methods_mutex);
for (const auto& [_, method_ids] : registered_methods) {
if (std::find(method_ids.begin(), method_ids.end(), method) != method_ids.end()) {
is_registered = true;
break;
}
}
}
if (!is_registered) {
return rpc_publish_error(xbot_rpc::RpcError::ERROR_METHOD_NOT_FOUND, "Method \"" + method + "\" not found", req["id"]);
}

// Forward to the providers as ROS message
xbot_rpc::RpcRequest msg;
msg.method = method;
msg.params = req.contains("params") ? req["params"].dump() : "";
msg.id = id != nullptr ? id : "";
rpc_request_pub.publish(msg);
}
Comment thread
rovo89 marked this conversation as resolved.

void rpc_response_callback(const xbot_rpc::RpcResponse::ConstPtr &msg) {
json result;
try {
result = json::parse(msg->result);
} catch (const json::parse_error &e) {
return rpc_publish_error(xbot_rpc::RpcError::ERROR_INTERNAL, "Internal error while parsing result JSON: " + std::string(e.what()), msg->id);
}

json j = {{"jsonrpc", "2.0"}, {"result", result}, {"id", msg->id}};
try_publish("rpc/response", j.dump(2));
}

void rpc_error_callback(const xbot_rpc::RpcError::ConstPtr &msg) {
rpc_publish_error(msg->code, msg->message, msg->id);
}
Comment thread
rovo89 marked this conversation as resolved.

bool register_methods(xbot_rpc::RegisterMethodsSrvRequest &req, xbot_rpc::RegisterMethodsSrvResponse &res) {
std::lock_guard<std::mutex> lk(registered_methods_mutex);
registered_methods[req.node_id] = req.methods;
ROS_INFO_STREAM("new methods registered: " << req.node_id << " registered " << req.methods.size() << " methods.");
return true;
}

int main(int argc, char **argv) {
ros::init(argc, argv, "xbot_monitoring");
has_map = false;
Expand Down Expand Up @@ -592,10 +702,16 @@ int main(int argc, char **argv) {
cmd_vel_pub = n->advertise<geometry_msgs::Twist>("xbot_monitoring/remote_cmd_vel", 1);
action_pub = n->advertise<std_msgs::String>("xbot/action", 1);

rpc_request_pub = n->advertise<xbot_rpc::RpcRequest>(xbot_rpc::TOPIC_REQUEST, 100);
ros::Subscriber rpc_response_sub = n->subscribe(xbot_rpc::TOPIC_RESPONSE, 100, rpc_response_callback);
ros::Subscriber rpc_error_sub = n->subscribe(xbot_rpc::TOPIC_ERROR, 100, rpc_error_callback);
ros::ServiceServer register_methods_service = n->advertiseService(xbot_rpc::SERVICE_REGISTER_METHODS, register_methods);

ros::AsyncSpinner spinner(1);
spinner.start();

rpc_provider.init();

ros::Rate sensor_check_rate(10.0);

boost::regex topic_regex("/xbot_monitoring/sensors/.*/info");
Expand All @@ -605,7 +721,7 @@ int main(int argc, char **argv) {
ros::master::V_TopicInfo topics;
ros::master::getTopics(topics);
std::for_each(topics.begin(), topics.end(), [&](const ros::master::TopicInfo &item) {

if (!boost::regex_match(item.name, topic_regex) || active_subscribers.count(item.name) != 0)
return;

Expand All @@ -618,7 +734,7 @@ int main(int argc, char **argv) {
// Sensor already known and sensor-info equals?
if(exist != 0 && found_sensors[topic] == *msg)
return;

{
// Sensor is new or sensor-info differ from the buffered one
std::unique_lock<std::mutex> lk(mqtt_callback_mutex);
Expand Down
2 changes: 0 additions & 2 deletions src/lib/xbot_msgs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ add_message_files(
Map.msg
MapArea.msg
ActionInfo.msg
ActionRequest.msg
ActionResponse.msg
MapOverlay.msg
MapOverlayPolygon.msg
)
Expand Down
3 changes: 0 additions & 3 deletions src/lib/xbot_msgs/msg/ActionRequest.msg

This file was deleted.

3 changes: 0 additions & 3 deletions src/lib/xbot_msgs/msg/ActionResponse.msg

This file was deleted.

57 changes: 57 additions & 0 deletions src/lib/xbot_rpc/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
cmake_minimum_required(VERSION 3.0.2)
project(xbot_rpc)

add_compile_options(-std=c++17)

find_package(catkin REQUIRED COMPONENTS
std_msgs
message_generation)

add_message_files(
FILES
RpcRequest.msg
RpcResponse.msg
RpcError.msg
)

add_service_files(
FILES
RegisterMethodsSrv.srv
)

generate_messages(
DEPENDENCIES std_msgs
)

catkin_package(
INCLUDE_DIRS include
LIBRARIES xbot_rpc
CATKIN_DEPENDS std_msgs message_runtime
)

add_subdirectory(../json json EXCLUDE_FROM_ALL)

include_directories(
${catkin_INCLUDE_DIRS}
include
)

add_library(xbot_rpc
src/provider.cpp
)

target_link_libraries(xbot_rpc nlohmann_json::nlohmann_json)

add_dependencies(xbot_rpc
${PROJECT_NAME}_gencpp # <-- depends on generated C++ message headers
${catkin_EXPORTED_TARGETS}
)

install(TARGETS xbot_rpc
LIBRARY DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION}
)

install(DIRECTORY include/${PROJECT_NAME}/
DESTINATION ${CATKIN_PACKAGE_INCLUDE_DESTINATION}
FILES_MATCHING PATTERN "*.h"
)
10 changes: 10 additions & 0 deletions src/lib/xbot_rpc/include/xbot_rpc/constants.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#pragma once

namespace xbot_rpc {

inline constexpr const char* TOPIC_REQUEST = "/xbot/rpc/request";
inline constexpr const char* TOPIC_RESPONSE = "/xbot/rpc/response";
inline constexpr const char* TOPIC_ERROR = "/xbot/rpc/error";
inline constexpr const char* SERVICE_REGISTER_METHODS = "/xbot/rpc/register";

} // namespace xbot_rpc
62 changes: 62 additions & 0 deletions src/lib/xbot_rpc/include/xbot_rpc/provider.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#pragma once

#include <ros/ros.h>
#include <xbot_rpc/RpcError.h>
#include <xbot_rpc/RpcRequest.h>
#include <xbot_rpc/RpcResponse.h>
#include <xbot_rpc/constants.h>

#include <nlohmann/json.hpp>

#define RPC_METHOD(id, body) \
{ id, [](const std::string& method, const nlohmann::basic_json<>& params) body }

namespace xbot_rpc {

typedef std::function<nlohmann::basic_json<>(const std::string& method, const nlohmann::basic_json<>& params)> callback_t;

class RpcException : public std::exception {
public:
const int16_t code;
const std::string message;

RpcException(int16_t code, const std::string& message)
: code(code), message(message) {
}

const char* what() const noexcept override {
return message.c_str();
}
};

class RpcProvider {
private:
std::string node_id;
std::map<std::string, callback_t> methods;

ros::Subscriber request_sub;
ros::Publisher response_pub;
ros::Publisher error_pub;
ros::ServiceClient registration_client;

void handleRequest(const xbot_rpc::RpcRequest::ConstPtr& request);
void publishResponse(const xbot_rpc::RpcRequest::ConstPtr& request, const nlohmann::basic_json<>& response);
void publishError(const xbot_rpc::RpcRequest::ConstPtr& request, int16_t code, const std::string& message);

public:
RpcProvider(const std::string& node_id, const std::map<std::string, callback_t>& methods = {}) : node_id(node_id), methods(methods) {}

void init();

void addMethod(const std::string& id, callback_t callback) {
methods.emplace(id, callback);
}

void addMethod(const std::pair<std::string, callback_t>& method) {
methods.insert(method);
}

void publishMethods();
};

} // namespace xbot_rpc
12 changes: 12 additions & 0 deletions src/lib/xbot_rpc/msg/RpcError.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
int16 code
string message
string id

int16 ERROR_INVALID_JSON = -32700 # Invalid JSON was received by the server.
# An error occurred on the server while parsing the JSON text.
int16 ERROR_INVALID_REQUEST = -32600 # The JSON sent is not a valid Request object.
int16 ERROR_METHOD_NOT_FOUND = -32601 # The method does not exist / is not available.
int16 ERROR_INVALID_PARAMS = -32602 # Invalid method parameter(s).
int16 ERROR_INTERNAL = -32603 # Internal JSON-RPC error.
int16 ERROR_SERVER_FIRST = -32000 # Reserved for implementation-defined server-errors.
int16 ERROR_SERVER_LAST = -32099 # Reserved for implementation-defined server-errors.
3 changes: 3 additions & 0 deletions src/lib/xbot_rpc/msg/RpcRequest.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
string method
string params
string id
Comment thread
rovo89 marked this conversation as resolved.
2 changes: 2 additions & 0 deletions src/lib/xbot_rpc/msg/RpcResponse.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
string result
string id
Comment thread
rovo89 marked this conversation as resolved.
Loading