From c39814985e6f5ea4e6d306ccb1adbb43edb1aaa6 Mon Sep 17 00:00:00 2001 From: Teo Koon Peng Date: Wed, 11 Oct 2023 14:34:21 +0800 Subject: [PATCH 1/9] add JobManager to manage task lifecycle; first succesful build Signed-off-by: Teo Koon Peng --- .../include/nexus_capabilities/context.hpp | 61 +-- .../src/capabilities/detection.cpp | 17 +- nexus_workcell_orchestrator/CMakeLists.txt | 1 + .../src/get_result.cpp | 4 +- .../src/get_result.hpp | 6 +- nexus_workcell_orchestrator/src/job.cpp | 117 ----- nexus_workcell_orchestrator/src/job.hpp | 36 +- .../src/job_manager.cpp | 274 +++++++++++ .../src/job_manager.hpp | 99 ++++ .../src/set_result.cpp | 4 +- nexus_workcell_orchestrator/src/signals.cpp | 14 +- .../src/task_results_test.cpp | 4 +- .../src/workcell_orchestrator.cpp | 428 +++++------------- .../src/workcell_orchestrator.hpp | 13 +- 14 files changed, 562 insertions(+), 516 deletions(-) delete mode 100644 nexus_workcell_orchestrator/src/job.cpp create mode 100644 nexus_workcell_orchestrator/src/job_manager.cpp create mode 100644 nexus_workcell_orchestrator/src/job_manager.hpp diff --git a/nexus_capabilities/include/nexus_capabilities/context.hpp b/nexus_capabilities/include/nexus_capabilities/context.hpp index e85a5f44..75d5cfa4 100644 --- a/nexus_capabilities/include/nexus_capabilities/context.hpp +++ b/nexus_capabilities/include/nexus_capabilities/context.hpp @@ -20,44 +20,53 @@ #include "task.hpp" -#include -#include -#include - -#include - -#include -#include #include -#include - -#include -#include -#include +#include +#include +#include +#include //============================================================================== namespace nexus { class Context { -public: using GoalHandlePtr = - std::shared_ptr>; -public: using TaskState = nexus_orchestrator_msgs::msg::TaskState; - - // using reference to prevent circular references -public: rclcpp_lifecycle::LifecycleNode& node; -public: BT::Tree bt; -public: GoalHandlePtr goal_handle; + /** + * There are several choices we can use here, each having their own pros and cons. + * + * shared_ptr: + * + Api friendly + * - Will cause circular references + * - Cannot create an instance in the node constructor (cannot call `shared_from_this` + * in constructor) + * + * weak_ptr: + * - Will almost always cause circular reference. Most ROS apis don't support weak_ptr, + * attempting to `lock()` and passing the `shared_ptr` will almost always cause a + * circular reference. + * - Cannot create create an instance in the node constructor (cannot call `weak_from_this` + * in constructor) + * + * raw_ptr: + * + Api friendly + * - Unsafe + * + * reference: + * - Unsafe. Reference does not protect from UAF, also most ROS apis don't support references, + * those that use templates may work with either shared or raw pointers. + * We can't get a shared_ptr from a reference so we may end up using raw pointers. + * + * `shared_ptr` is chosen due to api friendliness, the consequences of circular reference + * is negligible since the node should never go out of scope. + */ +public: std::shared_ptr node; public: Task task; public: std::vector errors; -public: std::unique_ptr bt_logging; public: std::unordered_set signals; -public: TaskState task_state; -public: uint64_t tick_count = 0; -public: Context(rclcpp_lifecycle::LifecycleNode& node) - : node(node) {} +public: Context(std::shared_ptr node) + : node(std::move(node)) {} }; } // namespace nexus diff --git a/nexus_capabilities/src/capabilities/detection.cpp b/nexus_capabilities/src/capabilities/detection.cpp index fbe677e5..f14e9a4b 100644 --- a/nexus_capabilities/src/capabilities/detection.cpp +++ b/nexus_capabilities/src/capabilities/detection.cpp @@ -219,13 +219,13 @@ void DetectAllItems::_send_next_request() BT::NodeStatus GetDetection::tick() { - auto ctx = this->_ctx_mgr->current_context(); - auto detections = this->getInput( + const auto ctx = this->_ctx_mgr->current_context(); + const auto detections = this->getInput( "detections"); if (!detections) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [detections] is required", + ctx->node->get_logger(), "%s: port [detections] is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } @@ -235,7 +235,7 @@ BT::NodeStatus GetDetection::tick() if (!idx && !id) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [idx] or [id] is required", + ctx->node->get_logger(), "%s: port [idx] or [id] is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } @@ -251,7 +251,7 @@ BT::NodeStatus GetDetection::tick() if (it == detections->detections.cend()) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: cannot find detection with id [%s]", + ctx->node->get_logger(), "%s: cannot find detection with id [%s]", this->name().c_str(), id->c_str()); return BT::NodeStatus::FAILURE; } @@ -266,12 +266,13 @@ BT::NodeStatus GetDetection::tick() BT::NodeStatus GetDetectionPose::tick() { - auto ctx = this->_ctx_mgr->current_context(); - auto detection = this->getInput("detection"); + const auto ctx = this->_ctx_mgr->current_context(); + const auto detection = this->getInput( + "detection"); if (!detection) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [detection] is required", + ctx->node->get_logger(), "%s: port [detection] is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } diff --git a/nexus_workcell_orchestrator/CMakeLists.txt b/nexus_workcell_orchestrator/CMakeLists.txt index badd69ef..d5699f13 100644 --- a/nexus_workcell_orchestrator/CMakeLists.txt +++ b/nexus_workcell_orchestrator/CMakeLists.txt @@ -29,6 +29,7 @@ endif() add_library(${PROJECT_NAME}_plugin SHARED src/get_result.cpp src/get_joint_constraints.cpp + src/job_manager.cpp src/make_transform.cpp src/serialization.cpp src/set_result.cpp diff --git a/nexus_workcell_orchestrator/src/get_result.cpp b/nexus_workcell_orchestrator/src/get_result.cpp index cbb2ea73..74686c2b 100644 --- a/nexus_workcell_orchestrator/src/get_result.cpp +++ b/nexus_workcell_orchestrator/src/get_result.cpp @@ -29,7 +29,7 @@ BT::NodeStatus GetResult::tick() if (!key) { RCLCPP_ERROR( - this->_node.get_logger(), "%s: port [key] is required", + ctx->node->get_logger(), "%s: port [key] is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } @@ -41,7 +41,7 @@ BT::NodeStatus GetResult::tick() catch (const YAML::Exception& e) { RCLCPP_ERROR( - this->_node.get_logger(), "[%s]: Failed to read result [%s]", + ctx->node->get_logger(), "[%s]: Failed to read result [%s]", this->name().c_str(), e.what()); return BT::NodeStatus::FAILURE; } diff --git a/nexus_workcell_orchestrator/src/get_result.hpp b/nexus_workcell_orchestrator/src/get_result.hpp index 556f79c0..a1043eb0 100644 --- a/nexus_workcell_orchestrator/src/get_result.hpp +++ b/nexus_workcell_orchestrator/src/get_result.hpp @@ -42,14 +42,12 @@ public: static BT::PortsList providedPorts() } public: GetResult(const std::string& name, const BT::NodeConfiguration& config, - std::shared_ptr ctx_mgr, - rclcpp_lifecycle::LifecycleNode& node) - : BT::SyncActionNode{name, config}, _ctx_mgr{ctx_mgr}, _node{node} {} + std::shared_ptr ctx_mgr) + : BT::SyncActionNode{name, config}, _ctx_mgr{ctx_mgr} {} public: BT::NodeStatus tick() override; private: std::shared_ptr _ctx_mgr; -private: rclcpp_lifecycle::LifecycleNode& _node; }; } diff --git a/nexus_workcell_orchestrator/src/job.cpp b/nexus_workcell_orchestrator/src/job.cpp deleted file mode 100644 index 8dd981f0..00000000 --- a/nexus_workcell_orchestrator/src/job.cpp +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright (C) 2023 Johnson & Johnson - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "job.hpp" - -#include "get_result.hpp" -#include "make_transform.hpp" -#include "serialization.hpp" -#include "get_joint_constraints.hpp" -#include "set_result.hpp" -#include "signals.hpp" -#include "transform_pose.hpp" - -#include - -namespace nexus::workcell_orchestrator { - -Job::Job(rclcpp_lifecycle::LifecycleNode::WeakPtr node, - tf2_ros::Buffer::SharedPtr tf2_buffer) -: _node(std::move(node)), _tf2_buffer(std::move(tf2_buffer)) -{ - // create IsPauseTriggered BT node, it is a simple action node with output port "paused". - this->_bt_factory.registerSimpleAction("IsPauseTriggered", - [this](BT::TreeNode& node) - { - node.setOutput("paused", this->_paused); - return BT::NodeStatus::SUCCESS; - }, { BT::OutputPort("paused") }); - - // register PausableSequence - this->_bt_factory.registerNodeType( - "PausableSequence"); - - this->_bt_factory.registerBuilder("SetResult", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, this->_ctx); - }); - this->_bt_factory.registerBuilder("GetResult", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, this->_ctx, - *this->_node.lock()); - }); - - this->_bt_factory.registerNodeType("MakeTransform"); - this->_bt_factory.registerBuilder( - "ApplyTransform", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, - *this->_node.lock(), - this->_tf2_buffer); - }); - - this->_bt_factory.registerBuilder( - "SerializeDetections", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, - *this->_node.lock()); - }); - this->_bt_factory.registerBuilder( - "DeserializeDetections", - [this]( - const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, - *this->_node.lock()); - }); - - this->_bt_factory.registerBuilder("WaitForSignal", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, this->_ctx); - }); - - this->_bt_factory.registerBuilder("SetSignal", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, this->_ctx); - }); - - this->_bt_factory.registerBuilder( - "GetJointConstraints", - [this]( - const std::string& name, - const BT::NodeConfiguration& config) - { - return std::make_unique( - name, - config, - this->_node - ); - }); -} - -void Job::load_capability(const Capability& cap) -{ - cap -} - -} diff --git a/nexus_workcell_orchestrator/src/job.hpp b/nexus_workcell_orchestrator/src/job.hpp index f8d63c9f..c91f71db 100644 --- a/nexus_workcell_orchestrator/src/job.hpp +++ b/nexus_workcell_orchestrator/src/job.hpp @@ -18,33 +18,33 @@ #ifndef NEXUS_WORKCELL_ORCHESTRATOR__JOB_HPP #define NEXUS_WORKCELL_ORCHESTRATOR__JOB_HPP -#include -#include +#include +#include +#include +#include #include -#include -#include +#include + +#include namespace nexus::workcell_orchestrator { -class Job +struct Job { -public: Job(rclcpp_lifecycle::LifecycleNode::WeakPtr node, - tf2_ros::Buffer::SharedPtr tf2_buffer); - -public: void pause() { this->_paused = true; } -public: void unpause() { this->_paused = false; } -public: void load_capability(const Capability& cap); - -private: rclcpp_lifecycle::LifecycleNode::WeakPtr _node; -private: tf2_ros::Buffer::SharedPtr _tf2_buffer; -private: BT::BehaviorTreeFactory _bt_factory; -private: std::shared_ptr _ctx; -private: bool _paused; + using TaskState = nexus_orchestrator_msgs::msg::TaskState; + using GoalHandlePtr = + std::shared_ptr>; + + std::shared_ptr ctx; + std::optional bt; + GoalHandlePtr goal_handle; + std::optional bt_logging; + TaskState task_state = TaskState(); + uint64_t tick_count = 0; }; } #endif - diff --git a/nexus_workcell_orchestrator/src/job_manager.cpp b/nexus_workcell_orchestrator/src/job_manager.cpp new file mode 100644 index 00000000..5b7fe4a8 --- /dev/null +++ b/nexus_workcell_orchestrator/src/job_manager.cpp @@ -0,0 +1,274 @@ +/* + * Copyright (C) 2023 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "job_manager.hpp" + +namespace nexus::workcell_orchestrator { + +using TaskState = nexus_orchestrator_msgs::msg::TaskState; +using WorkcellRequest = endpoints::WorkcellRequestAction::ActionType; + +/** + * Tick the behavior tree of a job and update the task status according to the result. + */ +void JobManager::_tick_bt(Job& job) +{ + this->_ctx_mgr->set_active_context(job.ctx); + const auto bt_status = job.bt->tickRoot(); + ++job.tick_count; + this->_ctx_mgr->set_active_context(nullptr); + + // update task status according to bt status + switch (bt_status) + { + case BT::NodeStatus::RUNNING: { + job.task_state.status = TaskState::STATUS_RUNNING; + break; + } + case BT::NodeStatus::FAILURE: { + RCLCPP_INFO( + job.ctx->node->get_logger(), "Task [%s] failed", + job.ctx->task.id.c_str()); + // Publish feedback. + job.task_state.status = TaskState::STATUS_FAILED; + auto fb = std::make_shared(); + fb->state = job.task_state; + job.goal_handle->publish_feedback(std::move(fb)); + + // Abort the action request. + auto result = + std::make_shared(); + result->success = false; + result->message = "Task failed"; + job.goal_handle->abort(result); + break; + } + case BT::NodeStatus::SUCCESS: { + // Publish feedback. + job.task_state.status = TaskState::STATUS_FINISHED; + auto fb = std::make_shared(); + fb->state = job.task_state; + job.goal_handle->publish_feedback(std::move(fb)); + + auto result = + std::make_shared(); + result->success = true; + result->result = YAML::Dump(job.ctx->task.previous_results); + auto report = job.bt_logging->generate_report(); + result->message = common::ReportConverter::to_string(report); + RCLCPP_INFO(job.ctx->node->get_logger(), "%s", result->message.c_str()); + job.goal_handle->succeed(result); + break; + } + default: { + std::ostringstream oss; + oss << "Error ticking task [" << job.ctx->task.id << + "]: Behavior tree returned invalid or unknown status [" << + bt_status << "]"; + RCLCPP_ERROR_STREAM(job.ctx->node->get_logger(), oss.str()); + throw std::runtime_error(oss.str()); + } + } +} + +uint8_t JobManager::_tick_job(Job& job) +{ + auto& task_status = job.task_state.status; + switch (task_status) + { + case TaskState::STATUS_ASSIGNED: { + RCLCPP_INFO_THROTTLE( + job.ctx->node->get_logger(), + *job.ctx->node->get_clock(), + std::chrono::milliseconds(1000).count(), + "Task [%s] is pending, call \"%s\" to start this task", + job.ctx->task.id.c_str(), + endpoints::WorkcellRequestAction::action_name( + job.ctx->node->get_name()).c_str()); + break; + } + case TaskState::STATUS_QUEUED: + case TaskState::STATUS_RUNNING: { + this->_tick_bt(job); + break; + } + case TaskState::STATUS_FAILED: { + RCLCPP_WARN( + job.ctx->node->get_logger(), "Cannot tick task [%s] that has already failed", + job.ctx->task.id.c_str()); + break; + } + case TaskState::STATUS_FINISHED: { + RCLCPP_WARN( + job.ctx->node->get_logger(), "Cannot tick task [%s] that has already finished", + job.ctx->task.id.c_str()); + break; + } + default: { + RCLCPP_ERROR( + job.ctx->node->get_logger(), "Error ticking task [%s]: Unknown status [%i]", + job.ctx->task.id.c_str(), + task_status); + break; + } + } + return task_status; +} + +std::pair JobManager::assign_task( + const std::string& task_id) +{ + const auto it = + std::find_if(this->_jobs.begin(), this->_jobs.end(), [&task_id]( + const Job& j) + { + return j.ctx->task.id == task_id; + }); + if (it == this->_jobs.end()) + { + std::ostringstream oss; + oss << "Failed to assign task [" << task_id << + "]: Another task with the same id already exist"; + std::string err_msg = oss.str(); + RCLCPP_ERROR_STREAM(this->_node->get_logger(), err_msg); + return {it, err_msg}; + } + + auto& j = this->_jobs.emplace_back(Job{nullptr, std::nullopt, nullptr, + std::nullopt}); + j.task_state.task_id = j.ctx->task.id; + j.task_state.workcell_id = j.ctx->node->get_name(); + j.task_state.status = TaskState::STATUS_ASSIGNED; + + RCLCPP_INFO( + this->_node->get_logger(), "Assigned task [%s]", + task_id.c_str()); + return {it, ""}; +} + +std::string JobManager::queue_task(const GoalHandlePtr& goal_handle, + const std::shared_ptr& ctx, BT::Tree&& bt) +{ + const auto& job_id = goal_handle->get_goal()->task.id; + const auto it = + std::find_if(this->_jobs.begin(), this->_jobs.end(), [&job_id](const Job& j) + { + return j.ctx->task.id == job_id; + }); + if (it == this->_jobs.end()) + { + std::ostringstream oss; + oss << "Error queuing task [" << job_id << "]: Task not found"; + std::string err_msg = oss.str(); + RCLCPP_ERROR_STREAM(this->_node->get_logger(), err_msg); + + // abort the goal + const auto result = std::make_shared(); + result->success = false; + result->message = "Task not found"; + goal_handle->abort(result); + return "Task not found"; + } + + it->ctx = ctx; + it->bt = std::move(bt); + it->bt_logging.emplace(common::BtLogging(*it->bt, this->_node)); + it->goal_handle = goal_handle; + it->task_state.status = TaskState::STATUS_QUEUED; + return ""; +} + +std::pair +JobManager::remove_assigned_task(const std::string& task_id) +{ + const auto it = std::find_if(this->_jobs.begin(), + this->_jobs.end(), [&task_id](const Job& j) + { + return j.ctx->task.id == task_id; + }); + if (it == this->_jobs.end()) + { + std::ostringstream oss; + oss << "Failed to remove assigned task [" << task_id << "]: Task not found"; + std::string err_msg = oss.str(); + RCLCPP_WARN_STREAM(this->_node->get_logger(), err_msg); + return {it, err_msg}; + } + if (it->task_state.status != TaskState::STATUS_ASSIGNED) + { + std::ostringstream oss; + oss << "Failed to remove assigned task [" << task_id << + "]: Task is not assigned"; + std::string err_msg = oss.str(); + RCLCPP_WARN_STREAM(this->_node->get_logger(), err_msg); + return {this->_jobs.end(), err_msg}; + } + this->_jobs.erase(it); + return {it, ""}; +} + +void JobManager::halt_all_jobs() +{ + RCLCPP_INFO(this->_node->get_logger(), "Halting all tasks"); + for (auto& j : this->_jobs) + { + // halt bts that are running + if (j.task_state.status == TaskState::STATUS_RUNNING) + { + this->_ctx_mgr->set_active_context(j.ctx); + j.bt->haltTree(); + this->_ctx_mgr->set_active_context(nullptr); + + RCLCPP_INFO( + j.ctx->node->get_logger(), "Task [%s] halted", j.ctx->task.id.c_str()); + // Publish feedback. + j.task_state.status = TaskState::STATUS_FAILED; + auto fb = std::make_shared(); + fb->state = j.task_state; + j.goal_handle->publish_feedback(std::move(fb)); + + // Abort the action request. + auto result = + std::make_shared(); + result->success = false; + result->message = "Task halted"; + j.goal_handle->abort(result); + break; + } + } + + this->_jobs.clear(); +} + +void JobManager::tick() +{ + size_t i = 0; + auto it = this->_jobs.begin(); + for (auto& j = *it; i < this->_max_concurrent && it != this->_jobs.end(); + ++it, ++i) + { + const auto task_status = this->_tick_job(j); + if (task_status == TaskState::STATUS_FAILED || + task_status == TaskState::STATUS_FINISHED) + { + this->_jobs.erase(it); + continue; + } + } +} + +} diff --git a/nexus_workcell_orchestrator/src/job_manager.hpp b/nexus_workcell_orchestrator/src/job_manager.hpp new file mode 100644 index 00000000..bad266d9 --- /dev/null +++ b/nexus_workcell_orchestrator/src/job_manager.hpp @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2023 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef NEXUS_WORKCELL_ORCHESTRATOR__JOB_MANAGER_HPP +#define NEXUS_WORKCELL_ORCHESTRATOR__JOB_MANAGER_HPP + +#include "job.hpp" + +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace nexus::workcell_orchestrator { + +/** + * Manage execution of jobs according to the nexus task lifecycle design. + */ +class JobManager +{ +public: using GoalHandlePtr = + std::shared_ptr>; +public: using JobIterator = std::list::iterator; + +public: JobManager(rclcpp_lifecycle::LifecycleNode::SharedPtr node, + size_t max_concurrent) + : _node(std::move(node)), _max_concurrent(max_concurrent) {} + +public: std::pair assign_task( + const std::string& task_id); + +public: std::string queue_task(const GoalHandlePtr& goal_handle, + const std::shared_ptr& ctx, BT::Tree&& bt); + +public: std::pair remove_assigned_task( + const std::string& task_id); + + /** + * Forcefully stop and clear all jobs. Jobs will be stopped as soon as possible, unlike + * `cancel_all_jobs`, the behavior trees cannot react to this event. + */ +public: void halt_all_jobs(); + + /** + * TODO: Implement on cancel bt node. + * Request to cancel all jobs, the behavior trees of the jobs can react to this event + * and stop the job gracefully. + */ +// public: void cancel_all_jobs(); + + /** + * Tick all active jobs. + */ +public: void tick(); + +public: const std::list& jobs() const + { + return this->_jobs; + } + +public: std::shared_ptr context_manager() const + { + return this->_ctx_mgr; + } + +private: rclcpp_lifecycle::LifecycleNode::SharedPtr _node; +private: size_t _max_concurrent; +private: std::list _jobs; +private: std::shared_ptr _ctx_mgr; + +private: void _tick_bt(Job& job); +private: uint8_t _tick_job(Job& job); + +}; + +} + +#endif diff --git a/nexus_workcell_orchestrator/src/set_result.cpp b/nexus_workcell_orchestrator/src/set_result.cpp index f685787b..420565ab 100644 --- a/nexus_workcell_orchestrator/src/set_result.cpp +++ b/nexus_workcell_orchestrator/src/set_result.cpp @@ -29,14 +29,14 @@ BT::NodeStatus SetResult::tick() if (!key) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [key] is required", + ctx->node->get_logger(), "%s: port [key] is required", this->name().c_str()); } auto val = this->getInput("value"); if (!val) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [value] is required", + ctx->node->get_logger(), "%s: port [value] is required", this->name().c_str()); } ctx->task.previous_results[*key] = *val; diff --git a/nexus_workcell_orchestrator/src/signals.cpp b/nexus_workcell_orchestrator/src/signals.cpp index 96725930..7ca5c80a 100644 --- a/nexus_workcell_orchestrator/src/signals.cpp +++ b/nexus_workcell_orchestrator/src/signals.cpp @@ -26,7 +26,7 @@ BT::NodeStatus WaitForSignal::onStart() if (!signal) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: [signal] port is required", + ctx->node->get_logger(), "%s: [signal] port is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } @@ -34,14 +34,14 @@ BT::NodeStatus WaitForSignal::onStart() if (already_set) { RCLCPP_DEBUG( - ctx->node.get_logger(), "%s: signal [%s] already set", + ctx->node->get_logger(), "%s: signal [%s] already set", this->name().c_str(), signal->c_str()); auto clear = this->getInput("clear"); if (clear && *clear) { ctx->signals.erase(*signal); RCLCPP_INFO( - ctx->node.get_logger(), "%s: cleared signal [%s]", + ctx->node->get_logger(), "%s: cleared signal [%s]", this->name().c_str(), signal->c_str()); } } @@ -55,7 +55,7 @@ BT::NodeStatus WaitForSignal::onRunning() if (!signal) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: [signal] port is required", + ctx->node->get_logger(), "%s: [signal] port is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } @@ -66,7 +66,7 @@ BT::NodeStatus WaitForSignal::onRunning() { ctx->signals.erase(*signal); RCLCPP_INFO( - ctx->node.get_logger(), "%s: cleared signal [%s]", + ctx->node->get_logger(), "%s: cleared signal [%s]", this->name().c_str(), signal->c_str()); } return BT::NodeStatus::SUCCESS; @@ -81,13 +81,13 @@ BT::NodeStatus SetSignal::tick() if (!signal) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [signal] is required", + ctx->node->get_logger(), "%s: port [signal] is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } ctx->signals.emplace(*signal); RCLCPP_DEBUG( - ctx->node.get_logger(), "%s: set signal [%s]", + ctx->node->get_logger(), "%s: set signal [%s]", this->name().c_str(), signal->c_str()); return BT::NodeStatus::SUCCESS; } diff --git a/nexus_workcell_orchestrator/src/task_results_test.cpp b/nexus_workcell_orchestrator/src/task_results_test.cpp index 3c93b91f..e4e4e5df 100644 --- a/nexus_workcell_orchestrator/src/task_results_test.cpp +++ b/nexus_workcell_orchestrator/src/task_results_test.cpp @@ -34,7 +34,7 @@ namespace nexus::workcell_orchestrator::test { TEST_CASE("get and set results") { auto fixture = common::test::RosFixture{}; auto ctx_mgr = std::make_shared(); - auto ctx = std::make_shared(Context{*fixture.node}); + auto ctx = std::make_shared(fixture.node); ctx_mgr->set_active_context(ctx); BT::BehaviorTreeFactory bt_factory; @@ -46,7 +46,7 @@ TEST_CASE("get and set results") { bt_factory.registerBuilder("GetResult", [&](const std::string& name, const BT::NodeConfiguration& config) { - return std::make_unique(name, config, ctx_mgr, *fixture.node); + return std::make_unique(name, config, ctx_mgr); }); auto bt = bt_factory.createTreeFromText( diff --git a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp index 53d0f8b7..30fd8e59 100644 --- a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp +++ b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp @@ -214,7 +214,7 @@ auto WorkcellOrchestrator::on_activate( RCLCPP_INFO(this->get_logger(), "Workcell activated"); this->_bt_timer = this->create_wall_timer(BT_TICK_RATE, [this]() { - this->_tick_all_bts(); + this->_job_mgr->tick(); }); return CallbackReturn::SUCCESS; } @@ -223,8 +223,8 @@ auto WorkcellOrchestrator::on_deactivate( const rclcpp_lifecycle::State& /* previous_state */) -> CallbackReturn { RCLCPP_INFO(this->get_logger(), - "Cancelling ongoing task before deactivating workcell"); - this->_cancel_all_tasks(); + "Halting ongoing task before deactivating workcell"); + this->_job_mgr->halt_all_jobs(); this->_bt_timer->cancel(); this->_bt_timer.reset(); @@ -245,7 +245,7 @@ auto WorkcellOrchestrator::on_cleanup( this->_signal_wc_srv.reset(); this->_task_doable_srv.reset(); this->_cmd_server.reset(); - this->_ctx_mgr.reset(); + this->_job_mgr->halt_all_jobs(); RCLCPP_INFO(this->get_logger(), "Cleaned up"); return CallbackReturn::SUCCESS; } @@ -253,7 +253,7 @@ auto WorkcellOrchestrator::on_cleanup( auto WorkcellOrchestrator::_configure( const rclcpp_lifecycle::State& /* previous_state */) -> CallbackReturn { - this->_ctx_mgr = std::make_shared(); + this->_job_mgr = JobManager(this->shared_from_this(), 1); this->_bt_store.on_register_handlers.emplace_back([this](const std:: string& key, const std::filesystem::path& filepath) { @@ -274,16 +274,16 @@ auto WorkcellOrchestrator::_configure( endpoints::WorkcellRequestAction::ActionType::Goal::ConstSharedPtr goal) { RCLCPP_DEBUG(this->get_logger(), "Got workcell task request"); + const auto& jobs = this->_job_mgr->jobs(); const auto it = - std::find_if(this->_ctxs.cbegin(), this->_ctxs.cend(), - [&goal](const std::shared_ptr& ctx) + std::find_if(jobs.begin(), jobs.end(), [&goal](const Job& j) { - return ctx->task.id == goal->task.id; + return j.ctx->task.id == goal->task.id; }); - if (it != this->_ctxs.cend() && (*it)->goal_handle) + if (it == jobs.end()) { RCLCPP_ERROR(this->get_logger(), - "A task with the same id is already executing"); + "Task [%s] not assigned to this workcell", goal->task.id.c_str()); return rclcpp_action::GoalResponse::REJECT; } return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE; @@ -293,26 +293,15 @@ auto WorkcellOrchestrator::_configure( { RCLCPP_DEBUG(this->get_logger(), "Got cancel request"); const auto& goal = goal_handle->get_goal(); - auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&goal](const std::shared_ptr& ctx) - { - return goal->task.id == ctx->task.id; - }); - if (it == this->_ctxs.end()) + if (!goal->task.id.empty()) { - RCLCPP_WARN(this->get_logger(), - "Fail to cancel task [%s]: task does not exist", goal->task.id.c_str()); - } - else - { - // we can just remove a task that is not running - if ((*it)->task_state.status != TaskState::STATUS_RUNNING) - { - this->_ctxs.erase(it); - } + RCLCPP_WARN( + this->get_logger(), + "Cancelling specific task is no longer supported, all tasks will be cancelled together to ensure line consistency"); } + + this->_job_mgr->halt_all_jobs(); return rclcpp_action::CancelResponse::ACCEPT; }, [this](std::shared_ptr> @@ -327,68 +316,73 @@ auto WorkcellOrchestrator::_configure( return; } - std::shared_ptr ctx = nullptr; - const auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&goal_handle](const std::shared_ptr& ctx) - { - return ctx->task.id == goal_handle->get_goal()->task.id; - }); - if (it != this->_ctxs.end()) - { - ctx = *it; - } - else - { - ctx = std::make_shared(*this); - } - ctx->goal_handle = goal_handle; - const auto goal = goal_handle->get_goal(); - try - { - ctx->task = this->_task_parser.parse_task(goal->task); - if (!this->_can_perform_task( - ctx->task)) - { - auto result = std::make_shared(); - result->message = "Workcell cannot perform task " + ctx->task.type; - result->success = false; - RCLCPP_ERROR_STREAM(this->get_logger(), result->message); - goal_handle->abort( - result); - return; - } - ctx->task_state.workcell_id = this->get_name(); - ctx->task_state.task_id = ctx->task.id; - ctx->bt = this->_create_bt(ctx); - ctx->bt_logging = std::make_unique(ctx->bt, - this->shared_from_this()); - } - catch (const std::exception& e) - { - std::ostringstream oss; - auto result = std::make_shared(); - oss << "Failed to create task: " << e.what(); - result->message = oss.str(); - result->success = false; - RCLCPP_ERROR_STREAM(this->get_logger(), result->message); - goal_handle->abort(result); - // make sure to clear previously queued task - this->_ctxs.erase(it); - return; - } - - RCLCPP_INFO(this->get_logger(), "Queuing task [%s]", - goal->task.id.c_str()); - ctx->task_state.status = TaskState::STATUS_QUEUED; - auto fb = std::make_shared(); - fb->state = ctx->task_state; - goal_handle->publish_feedback(fb); - - if (it == this->_ctxs.end()) - { - this->_ctxs.emplace_back(ctx); - } + auto ctx = std::make_shared(this->shared_from_this()); + ctx->task = this->_task_parser.parse_task(goal_handle->get_goal()->task); + auto bt = this->_create_bt(ctx); + this->_job_mgr->queue_task(goal_handle, ctx, std::move(bt)); + + // std::shared_ptr ctx = nullptr; + // const auto it = + // std::find_if(this->_ctxs.begin(), this->_ctxs.end(), + // [&goal_handle](const std::shared_ptr& ctx) + // { + // return ctx->task.id == goal_handle->get_goal()->task.id; + // }); + // if (it != this->_ctxs.end()) + // { + // ctx = *it; + // } + // else + // { + // ctx = std::make_shared(*this); + // } + // ctx->goal_handle = goal_handle; + // const auto goal = goal_handle->get_goal(); + // try + // { + // ctx->task = this->_task_parser.parse_task(goal->task); + // if (!this->_can_perform_task( + // ctx->task)) + // { + // auto result = std::make_shared(); + // result->message = "Workcell cannot perform task " + ctx->task.type; + // result->success = false; + // RCLCPP_ERROR_STREAM(this->get_logger(), result->message); + // goal_handle->abort( + // result); + // return; + // } + // ctx->task_state.workcell_id = this->get_name(); + // ctx->task_state.task_id = ctx->task.id; + // ctx->bt = this->_create_bt(ctx); + // ctx->bt_logging = std::make_unique(ctx->bt, + // this->shared_from_this()); + // } + // catch (const std::exception& e) + // { + // std::ostringstream oss; + // auto result = std::make_shared(); + // oss << "Failed to create task: " << e.what(); + // result->message = oss.str(); + // result->success = false; + // RCLCPP_ERROR_STREAM(this->get_logger(), result->message); + // goal_handle->abort(result); + // // make sure to clear previously queued task + // this->_ctxs.erase(it); + // return; + // } + + // RCLCPP_INFO(this->get_logger(), "Queuing task [%s]", + // goal->task.id.c_str()); + // ctx->task_state.status = TaskState::STATUS_QUEUED; + // auto fb = std::make_shared(); + // fb->state = ctx->task_state; + // goal_handle->publish_feedback(fb); + + // if (it == this->_ctxs.end()) + // { + // this->_ctxs.emplace_back(ctx); + // } }); this->_wc_state_pub = @@ -441,28 +435,16 @@ auto WorkcellOrchestrator::_configure( ConstSharedPtr req, endpoints::QueueWorkcellTaskService::ServiceType::Response::SharedPtr resp) { - const auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&req](const std::shared_ptr& ctx) - { - return ctx->task.id == req->task_id; - }); - if (it != this->_ctxs.end()) + const auto [_job, err] = this->_job_mgr->assign_task(req->task_id); + if (!err.empty()) { + resp->message = err; resp->success = false; - resp->message = "A task with the same id already exists"; - RCLCPP_INFO(this->get_logger(), "Failed to queue task [%s]: %s", - req->task_id.c_str(), resp->message.c_str()); return; } - const auto& ctx = - this->_ctxs.emplace_back(std::make_shared(*this)); - ctx->task.id = req->task_id; - ctx->task_state.task_id = req->task_id; - ctx->task_state.workcell_id = this->get_name(); - ctx->task_state.status = TaskState::STATUS_ASSIGNED; + resp->success = true; - RCLCPP_INFO(this->get_logger(), "queued task %s", ctx->task.id.c_str()); + return; }); this->_remove_pending_task_srv = @@ -474,24 +456,16 @@ auto WorkcellOrchestrator::_configure( { RCLCPP_DEBUG(this->get_logger(), "received request to remove pending task [%s]", req->task_id.c_str()); - const auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&req](const std::shared_ptr& ctx) - { - return ctx->task.id == req->task_id; - }); - if (it == this->_ctxs.end() || - (*it)->task_state.status != TaskState::STATUS_ASSIGNED) + const auto [_job, + err] = this->_job_mgr->remove_assigned_task(req->task_id); + if (!err.empty()) { resp->success = false; - resp->message = "task does not exist or is not pending"; - RCLCPP_DEBUG_STREAM(this->get_logger(), resp->message); + resp->message = err; return; } - this->_ctxs.erase(it); - RCLCPP_INFO(this->get_logger(), "removed task [%s]", - req->task_id.c_str()); resp->success = true; + return; }); this->_tf2_buffer = std::make_shared(this->get_clock()); @@ -561,12 +535,14 @@ auto WorkcellOrchestrator::_configure( this->_bt_factory->registerBuilder("SetResult", [this](const std::string& name, const BT::NodeConfiguration& config) { - return std::make_unique(name, config, this->_ctx_mgr); + return std::make_unique(name, config, + this->_job_mgr->context_manager()); }); this->_bt_factory->registerBuilder("GetResult", [this](const std::string& name, const BT::NodeConfiguration& config) { - return std::make_unique(name, config, this->_ctx_mgr, *this); + return std::make_unique(name, config, + this->_job_mgr->context_manager()); }); this->_bt_factory->registerNodeType("MakeTransform"); @@ -601,13 +577,15 @@ auto WorkcellOrchestrator::_configure( this->_bt_factory->registerBuilder("WaitForSignal", [this](const std::string& name, const BT::NodeConfiguration& config) { - return std::make_unique(name, config, this->_ctx_mgr); + return std::make_unique(name, config, + this->_job_mgr->context_manager()); }); this->_bt_factory->registerBuilder("SetSignal", [this](const std::string& name, const BT::NodeConfiguration& config) { - return std::make_unique(name, config, this->_ctx_mgr); + return std::make_unique(name, config, + this->_job_mgr->context_manager()); }); this->_bt_factory->registerBuilder( @@ -646,7 +624,8 @@ auto WorkcellOrchestrator::_configure( cap_id.c_str() ); cap->configure( - this->shared_from_this(), this->_ctx_mgr, *_bt_factory); + this->shared_from_this(), this->_job_mgr->context_manager(), + *_bt_factory); } } catch (const CapabilityError& e) @@ -683,165 +662,6 @@ auto WorkcellOrchestrator::_configure( return CallbackReturn::SUCCESS; } -void WorkcellOrchestrator::_tick_bt(const std::shared_ptr& ctx) -{ - if (ctx->task_state.status == TaskState::STATUS_FINISHED || - ctx->task_state.status == TaskState::STATUS_FAILED) - { - RCLCPP_WARN( - this->get_logger(), "Not executing task [%s] that is already finished", - ctx->task.id.c_str()); - return; - } - - this->_ctx_mgr->set_active_context(ctx); - const auto& goal_handle = ctx->goal_handle; - ++ctx->tick_count; - - if (ctx->task_state.status == TaskState::STATUS_ASSIGNED) - { - // only print on first tick - if (ctx->tick_count == 1) - { - RCLCPP_INFO( - this->get_logger(), "Task [%s] is pending, call \"%s\" to start this task", - ctx->task.id.c_str(), - endpoints::WorkcellRequestAction::action_name(this->get_name()).c_str()); - } - return; - } - - if (ctx->task_state.status == TaskState::STATUS_QUEUED) - { - RCLCPP_INFO(this->get_logger(), "Starting task [%s]", ctx->task.id.c_str()); - ctx->task_state.status = TaskState::STATUS_RUNNING; - auto fb = std::make_shared(); - fb->state = ctx->task_state; - goal_handle->publish_feedback(fb); - } - - if (ctx->task_state.status == TaskState::STATUS_RUNNING) - { - try - { - switch (ctx->bt.tickRoot()) - { - case BT::NodeStatus::RUNNING: - return; - case BT::NodeStatus::SUCCESS: { - this->_handle_command_success(ctx); - return; - } - case BT::NodeStatus::FAILURE: { - this->_handle_command_failed(ctx); - return; - } - default: { - RCLCPP_ERROR( - this->get_logger(), - "Failed executing task [%s]: unexpected BT status", - ctx->task.id.c_str()); - this->_handle_command_failed(ctx); - return; - } - } - } - catch (const std::exception& e) - { - RCLCPP_ERROR(this->get_logger(), "BT failed with exception: %s", - e.what()); - ctx->bt.haltTree(); - this->_handle_command_failed(ctx); - return; - } - } - - RCLCPP_WARN( - this->get_logger(), "Failed executing task [%s]: unexpected status [%u]", - ctx->task.id.c_str(), ctx->task_state.status); - ctx->bt.haltTree(); - this->_handle_command_failed(ctx); - return; -} - -void WorkcellOrchestrator::_tick_all_bts() -{ - // cancelling an ongoing task may leave the workcell in a undetermined state - // so we must cancel all tasks when an ongoing task is cancelled. - for (const auto& ctx : this->_ctxs) - { - if (ctx->task_state.status == TaskState::STATUS_RUNNING && - ctx->goal_handle->is_canceling()) - { - RCLCPP_WARN( - this->get_logger(), "Cancelling all tasks because an ongoing task [%s] is being cancelled", - ctx->task.id.c_str()); - // NOTE: iterators are invalidated, it is important to - // not access them after this line. - this->_cancel_all_tasks(); - break; - } - } - - size_t count = 0; - auto it = this->_ctxs.begin(); - for (; count < MAX_PARALLEL_TASK && it != this->_ctxs.end(); - ++count, ++it) - { - this->_tick_bt(*it); - const auto task_status = (*it)->task_state.status; - // `_tick_bt` returns true if the task is finished - if (task_status == TaskState::STATUS_FINISHED || - task_status == TaskState::STATUS_FAILED) - { - // NOTE: iterator is invalidated, it is important to - // not access it after this line. - this->_ctxs.erase(it); - } - // cancel all other tasks when any task fails. note that the failing task is - // removed from the list first because we don't want to cancel an already - // failed task. - if (task_status == TaskState::STATUS_FAILED) - { - this->_cancel_all_tasks(); - break; - } - } - - auto new_state = this->_cur_state; - // check if status has changed and publish new state - if (this->_ctxs.size() > 0) - { - new_state.status = WorkcellState::STATUS_BUSY; - new_state.task_id = this->_ctxs.front()->task.id; - } - else if (this->_ctxs.size() == 0) - { - new_state.status = WorkcellState::STATUS_IDLE; - new_state.task_id = ""; - } - if (new_state.task_id != this->_cur_state.task_id || - new_state.status != this->_cur_state.status) - { - this->_cur_state = new_state; - this->_wc_state_pub->publish(this->_cur_state); - } -} - -void WorkcellOrchestrator::_cancel_all_tasks() -{ - for (const auto& ctx: this->_ctxs) - { - RCLCPP_INFO(this->get_logger(), "Canceling task [%s]", - ctx->task.id.c_str()); - this->_ctx_mgr->set_active_context(ctx); - ctx->bt.haltTree(); - this->_handle_command_failed(ctx); - } - this->_ctxs.clear(); - return; -} - void WorkcellOrchestrator::_register() { if (!this->_lifecycle_mgr->changeStateForAllNodes( @@ -929,22 +749,23 @@ void WorkcellOrchestrator::_process_signal( endpoints::SignalWorkcellService::ServiceType::Request::ConstSharedPtr req, endpoints::SignalWorkcellService::ServiceType::Response::SharedPtr resp) { - for (const auto& ctx : this->_ctxs) + for (const auto& job : this->_job_mgr->jobs()) { - if (ctx->task.id == req->task_id) + if (job.ctx->task.id == req->task_id) { RCLCPP_INFO( this->get_logger(), "Received signal [%s] for task [%s] from system orchestrator", req->signal.c_str(), req->task_id.c_str()); - ctx->signals.emplace(req->signal); + job.ctx->signals.emplace(req->signal); resp->success = true; + return; } } std::ostringstream oss; - oss << "Received signal [" << req->signal << "] for task [" << req->task_id << - "] which is not currently running."; + oss << "Failed to set signal signal [" << req->signal << "] for task [" << + req->task_id << "]: Task not found"; resp->message = oss.str(); RCLCPP_WARN_STREAM(this->get_logger(), resp->message); resp->success = false; @@ -953,41 +774,10 @@ void WorkcellOrchestrator::_process_signal( BT::Tree WorkcellOrchestrator::_create_bt(const std::shared_ptr& ctx) { // To keep things simple, the task type is used as the key for the behavior tree to use. - this->_ctx_mgr->set_active_context(ctx); return this->_bt_factory->createTreeFromFile(this->_bt_store.get_bt( ctx->task.type)); } -void WorkcellOrchestrator::_handle_command_success( - const std::shared_ptr& ctx) -{ - RCLCPP_INFO(this->get_logger(), "Task finished successfully"); - ctx->task_state.status = TaskState::STATUS_FINISHED; - auto result = - std::make_shared(); - result->success = true; - result->result = YAML::Dump(ctx->task.previous_results); - auto report = ctx->bt_logging->generate_report(); - result->message = common::ReportConverter::to_string(report); - RCLCPP_INFO(this->get_logger(), "%s", result->message.c_str()); - ctx->goal_handle->succeed(result); -} - -void WorkcellOrchestrator::_handle_command_failed( - const std::shared_ptr& ctx) -{ - RCLCPP_ERROR(this->get_logger(), "Task failed"); - ctx->task_state.status = TaskState::STATUS_FAILED; - // Publish feedback. - auto fb = std::make_shared(); - fb->state = ctx->task_state; - ctx->goal_handle->publish_feedback(std::move(fb)); - // Abort the action request. - auto result = - std::make_shared(); - ctx->goal_handle->abort(result); -} - void WorkcellOrchestrator::_handle_task_doable( endpoints::IsTaskDoableService::ServiceType::Request::ConstSharedPtr req, endpoints::IsTaskDoableService::ServiceType::Response::SharedPtr resp) diff --git a/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp b/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp index 8e2b539d..12b8717c 100644 --- a/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp +++ b/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp @@ -18,6 +18,7 @@ #ifndef NEXUS_WORKCELL_ORCHESTRATOR__WORKCELL_ORCHESTRATOR_HPP #define NEXUS_WORKCELL_ORCHESTRATOR__WORKCELL_ORCHESTRATOR_HPP +#include "job_manager.hpp" #include "task_parser.hpp" #include @@ -119,28 +120,18 @@ private: rclcpp::Service:: SharedPtr _remove_pending_task_srv; private: std::atomic _paused; -private: std::list> _ctxs; -private: std::shared_ptr _ctx_mgr; private: std::unique_ptr> _lifecycle_mgr{ nullptr}; private: std::filesystem::path _bt_path; // mapping of mapped task type and the original private: std::unordered_map _task_remaps; private: TaskParser _task_parser; +private: std::optional _job_mgr; private: CallbackReturn _configure( const rclcpp_lifecycle::State& previous_state); - /** - * Tick a behavior tree. - */ -private: void _tick_bt(const std::shared_ptr& ctx); - -private: void _tick_all_bts(); - -private: void _cancel_all_tasks(); - /** * Register this workcell to the system orchestrator. */ From 4a064385cb1c4cd996dbdc490f3efbfa7fa8e392 Mon Sep 17 00:00:00 2001 From: Teo Koon Peng Date: Wed, 11 Oct 2023 14:36:45 +0800 Subject: [PATCH 2/9] fix copyright Signed-off-by: Teo Koon Peng --- nexus_workcell_orchestrator/src/job_manager.cpp | 2 +- nexus_workcell_orchestrator/src/job_manager.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nexus_workcell_orchestrator/src/job_manager.cpp b/nexus_workcell_orchestrator/src/job_manager.cpp index 5b7fe4a8..06bb644e 100644 --- a/nexus_workcell_orchestrator/src/job_manager.cpp +++ b/nexus_workcell_orchestrator/src/job_manager.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023 Open Source Robotics Foundation + * Copyright (C) 2023 Johnson & Johnson * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/nexus_workcell_orchestrator/src/job_manager.hpp b/nexus_workcell_orchestrator/src/job_manager.hpp index bad266d9..d73145d0 100644 --- a/nexus_workcell_orchestrator/src/job_manager.hpp +++ b/nexus_workcell_orchestrator/src/job_manager.hpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023 Open Source Robotics Foundation + * Copyright (C) 2023 Johnson & Johnson * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 48cef6f7865ad3bfd8e07711958f28ee00c0ffd9 Mon Sep 17 00:00:00 2001 From: Teo Koon Peng Date: Wed, 11 Oct 2023 17:38:47 +0800 Subject: [PATCH 3/9] add tests Signed-off-by: Teo Koon Peng --- nexus_workcell_orchestrator/CMakeLists.txt | 5 +- .../src/job_manager.cpp | 70 +++---- .../src/job_manager.hpp | 31 ++- .../src/job_manager_test.cpp | 177 ++++++++++++++++++ .../src/transform_pose_test.cpp | 1 - .../src/workcell_orchestrator.cpp | 106 +++-------- 6 files changed, 257 insertions(+), 133 deletions(-) create mode 100644 nexus_workcell_orchestrator/src/job_manager_test.cpp diff --git a/nexus_workcell_orchestrator/CMakeLists.txt b/nexus_workcell_orchestrator/CMakeLists.txt index d5699f13..57caf7c0 100644 --- a/nexus_workcell_orchestrator/CMakeLists.txt +++ b/nexus_workcell_orchestrator/CMakeLists.txt @@ -92,7 +92,7 @@ if(BUILD_TESTING) # Adds a ament catch2 test with some common libraries. # Usage: nexus_add_test( ...) function(nexus_add_test target) - ament_add_catch2(${target} ${ARGN}) + ament_add_catch2(${target} src/main_test.cpp ${ARGN}) target_link_libraries(${target} PRIVATE ${PROJECT_NAME}_plugin nexus_common::nexus_common_test @@ -100,14 +100,13 @@ if(BUILD_TESTING) endfunction() nexus_add_test(serialization_tests - src/main_test.cpp src/serialization_test.cpp ) nexus_add_test(task_results_tests - src/main_test.cpp src/task_results_test.cpp ) nexus_add_test(transform_pose_test src/transform_pose_test.cpp) + nexus_add_test(job_manager_test src/job_manager_test.cpp) endif() ament_package() diff --git a/nexus_workcell_orchestrator/src/job_manager.cpp b/nexus_workcell_orchestrator/src/job_manager.cpp index 06bb644e..9a77555b 100644 --- a/nexus_workcell_orchestrator/src/job_manager.cpp +++ b/nexus_workcell_orchestrator/src/job_manager.cpp @@ -92,13 +92,13 @@ uint8_t JobManager::_tick_job(Job& job) { case TaskState::STATUS_ASSIGNED: { RCLCPP_INFO_THROTTLE( - job.ctx->node->get_logger(), - *job.ctx->node->get_clock(), + this->_node->get_logger(), + *this->_node->get_clock(), std::chrono::milliseconds(1000).count(), "Task [%s] is pending, call \"%s\" to start this task", - job.ctx->task.id.c_str(), + job.task_state.task_id.c_str(), endpoints::WorkcellRequestAction::action_name( - job.ctx->node->get_name()).c_str()); + this->_node->get_name()).c_str()); break; } case TaskState::STATUS_QUEUED: @@ -129,59 +129,49 @@ uint8_t JobManager::_tick_job(Job& job) return task_status; } -std::pair JobManager::assign_task( +Job& JobManager::assign_task( const std::string& task_id) { const auto it = std::find_if(this->_jobs.begin(), this->_jobs.end(), [&task_id]( const Job& j) { - return j.ctx->task.id == task_id; + return j.task_state.task_id == task_id; }); - if (it == this->_jobs.end()) + if (it != this->_jobs.end()) { - std::ostringstream oss; - oss << "Failed to assign task [" << task_id << - "]: Another task with the same id already exist"; - std::string err_msg = oss.str(); - RCLCPP_ERROR_STREAM(this->_node->get_logger(), err_msg); - return {it, err_msg}; + throw JobError("Another task with the same id already exist"); } auto& j = this->_jobs.emplace_back(Job{nullptr, std::nullopt, nullptr, std::nullopt}); - j.task_state.task_id = j.ctx->task.id; - j.task_state.workcell_id = j.ctx->node->get_name(); + j.task_state.task_id = task_id; + j.task_state.workcell_id = this->_node->get_name(); j.task_state.status = TaskState::STATUS_ASSIGNED; RCLCPP_INFO( this->_node->get_logger(), "Assigned task [%s]", task_id.c_str()); - return {it, ""}; + return j; } -std::string JobManager::queue_task(const GoalHandlePtr& goal_handle, +Job& JobManager::queue_task(const GoalHandlePtr& goal_handle, const std::shared_ptr& ctx, BT::Tree&& bt) { const auto& job_id = goal_handle->get_goal()->task.id; const auto it = std::find_if(this->_jobs.begin(), this->_jobs.end(), [&job_id](const Job& j) { - return j.ctx->task.id == job_id; + return j.task_state.task_id == job_id; }); if (it == this->_jobs.end()) { - std::ostringstream oss; - oss << "Error queuing task [" << job_id << "]: Task not found"; - std::string err_msg = oss.str(); - RCLCPP_ERROR_STREAM(this->_node->get_logger(), err_msg); - - // abort the goal + // abort the goal and throw error const auto result = std::make_shared(); result->success = false; result->message = "Task not found"; goal_handle->abort(result); - return "Task not found"; + throw JobError("Task not found"); } it->ctx = ctx; @@ -189,11 +179,10 @@ std::string JobManager::queue_task(const GoalHandlePtr& goal_handle, it->bt_logging.emplace(common::BtLogging(*it->bt, this->_node)); it->goal_handle = goal_handle; it->task_state.status = TaskState::STATUS_QUEUED; - return ""; + return *it; } -std::pair -JobManager::remove_assigned_task(const std::string& task_id) +void JobManager::remove_assigned_task(const std::string& task_id) { const auto it = std::find_if(this->_jobs.begin(), this->_jobs.end(), [&task_id](const Job& j) @@ -202,23 +191,13 @@ JobManager::remove_assigned_task(const std::string& task_id) }); if (it == this->_jobs.end()) { - std::ostringstream oss; - oss << "Failed to remove assigned task [" << task_id << "]: Task not found"; - std::string err_msg = oss.str(); - RCLCPP_WARN_STREAM(this->_node->get_logger(), err_msg); - return {it, err_msg}; + return; } if (it->task_state.status != TaskState::STATUS_ASSIGNED) { - std::ostringstream oss; - oss << "Failed to remove assigned task [" << task_id << - "]: Task is not assigned"; - std::string err_msg = oss.str(); - RCLCPP_WARN_STREAM(this->_node->get_logger(), err_msg); - return {this->_jobs.end(), err_msg}; + throw JobError("Task is not assigned"); } this->_jobs.erase(it); - return {it, ""}; } void JobManager::halt_all_jobs() @@ -258,16 +237,19 @@ void JobManager::tick() { size_t i = 0; auto it = this->_jobs.begin(); - for (auto& j = *it; i < this->_max_concurrent && it != this->_jobs.end(); - ++it, ++i) + for (; i < this->_max_concurrent && it != this->_jobs.end(); ++i) { - const auto task_status = this->_tick_job(j); + const auto task_status = this->_tick_job(*it); if (task_status == TaskState::STATUS_FAILED || task_status == TaskState::STATUS_FINISHED) { - this->_jobs.erase(it); + it = this->_jobs.erase(it); continue; } + else + { + ++it; + } } } diff --git a/nexus_workcell_orchestrator/src/job_manager.hpp b/nexus_workcell_orchestrator/src/job_manager.hpp index d73145d0..d00209a7 100644 --- a/nexus_workcell_orchestrator/src/job_manager.hpp +++ b/nexus_workcell_orchestrator/src/job_manager.hpp @@ -26,14 +26,19 @@ #include #include +#include #include #include #include #include -#include namespace nexus::workcell_orchestrator { +class JobError : public std::runtime_error +{ +public: using std::runtime_error::runtime_error; +}; + /** * Manage execution of jobs according to the nexus task lifecycle design. */ @@ -47,14 +52,25 @@ public: JobManager(rclcpp_lifecycle::LifecycleNode::SharedPtr node, size_t max_concurrent) : _node(std::move(node)), _max_concurrent(max_concurrent) {} -public: std::pair assign_task( - const std::string& task_id); + /** + * Assigns a task. + * @throw JobError + */ +public: Job& assign_task(const std::string& task_id); -public: std::string queue_task(const GoalHandlePtr& goal_handle, + /** + * Queue a task. The task must already be assigned. If there is an error, the goal will + * immediately be aborted. + * @throw JobError + */ +public: Job& queue_task(const GoalHandlePtr& goal_handle, const std::shared_ptr& ctx, BT::Tree&& bt); -public: std::pair remove_assigned_task( - const std::string& task_id); + /** + * Remove a assigned task. + * @throw JobError + */ +public: void remove_assigned_task(const std::string& task_id); /** * Forcefully stop and clear all jobs. Jobs will be stopped as soon as possible, unlike @@ -87,7 +103,8 @@ public: std::shared_ptr context_manager() const private: rclcpp_lifecycle::LifecycleNode::SharedPtr _node; private: size_t _max_concurrent; private: std::list _jobs; -private: std::shared_ptr _ctx_mgr; +private: std::shared_ptr _ctx_mgr = + std::make_shared(); private: void _tick_bt(Job& job); private: uint8_t _tick_job(Job& job); diff --git a/nexus_workcell_orchestrator/src/job_manager_test.cpp b/nexus_workcell_orchestrator/src/job_manager_test.cpp new file mode 100644 index 00000000..20282c7f --- /dev/null +++ b/nexus_workcell_orchestrator/src/job_manager_test.cpp @@ -0,0 +1,177 @@ +/* + * Copyright (C) 2023 Johnson & Johnson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "job_manager.hpp" + +#include + +#include + +namespace nexus::workcell_orchestrator::test { + +using nexus_orchestrator_msgs::msg::TaskState; +using nexus_orchestrator_msgs::action::WorkcellTask; + +TEST_CASE("JobManager") { + auto fixture = common::test::RosFixture(); + BT::BehaviorTreeFactory bt_factory; + JobManager job_mgr(fixture.node, 2); + + std::function handle_accepted = + [](const JobManager::GoalHandlePtr&) {}; + + const auto server = rclcpp_action::create_server(fixture.node, + "test", + [](const rclcpp_action::GoalUUID&, + const endpoints::WorkcellRequestAction::ActionType::Goal::ConstSharedPtr&) + { + return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE; + }, + [](const JobManager::GoalHandlePtr&) + { + return rclcpp_action::CancelResponse::REJECT; + }, [&](const JobManager::GoalHandlePtr& goal_handle) + { + handle_accepted(goal_handle); + }); + const auto client = rclcpp_action::create_client(fixture.node, + "test"); + + std::string bt = + R"( + + + + + + +)"; + + fixture.spin_in_background(); + + SECTION("cannot assign same task twice") { + CHECK_NOTHROW(job_mgr.assign_task("test")); + CHECK_THROWS_AS(job_mgr.assign_task("test"), JobError); + } + + SECTION("cannot queue task that is not assigned") { + handle_accepted = [&](const JobManager::GoalHandlePtr& goal_handle) + { + CHECK_THROWS_AS(job_mgr.queue_task(goal_handle, nullptr, + bt_factory.createTreeFromText(bt)), JobError); + }; + + WorkcellTask::Goal goal; + goal.task.id = "test"; + std::promise done; + decltype(client)::element_type::SendGoalOptions goal_opts; + goal_opts.result_callback = + [&](const rclcpp_action::ClientGoalHandle::WrappedResult&) + { + done.set_value(); + }; + client->async_send_goal(goal, goal_opts); + REQUIRE(done.get_future().wait_for(std::chrono::seconds( + 1)) == std::future_status::ready); + } + + SECTION("assign task sets job data correctly") { + Job* job; + job = &job_mgr.assign_task("test"); + CHECK(!job->bt.has_value()); + CHECK(!job->bt_logging.has_value()); + CHECK(job->ctx == nullptr); + CHECK(job->goal_handle == nullptr); + CHECK(job->task_state.task_id == "test"); + CHECK(job->task_state.workcell_id == fixture.node->get_name()); + CHECK(job->task_state.status == TaskState::STATUS_ASSIGNED); + CHECK(job->tick_count == 0); + + SECTION("ticking assigned but unqueued task is a noop") { + job_mgr.tick(); + CHECK(job->tick_count == 0); + } + + SECTION("can queue an assigned task") { + handle_accepted = [&](const JobManager::GoalHandlePtr& goal_handle) + { + auto ctx = std::make_shared(fixture.node); + CHECK(&job_mgr.queue_task(goal_handle, ctx, + bt_factory.createTreeFromText(bt)) == job); + CHECK(job->bt.has_value()); + CHECK(job->bt_logging.has_value()); + REQUIRE(job->ctx == ctx); + CHECK(job->goal_handle == goal_handle); + CHECK(job->task_state.task_id == "test"); + CHECK(job->task_state.status == TaskState::STATUS_QUEUED); + CHECK(job->tick_count == 0); + goal_handle->succeed(std::make_shared()); + }; + + WorkcellTask::Goal goal; + goal.task.id = "test"; + std::promise done; + decltype(client)::element_type::SendGoalOptions goal_opts; + goal_opts.result_callback = + [&](const rclcpp_action::ClientGoalHandle::WrappedResult&) + { + done.set_value(); + }; + client->async_send_goal(goal, goal_opts); + REQUIRE(done.get_future().wait_for(std::chrono::seconds( + 1)) == std::future_status::ready); + } + } + + SECTION("tick at most max_concurrent jobs") { + std::vector task_ids{"test1", "test2", "test3"}; + for (const auto& task_id : task_ids) + { + job_mgr.assign_task(task_id); + } + + std::promise done; + size_t queued = 0; + handle_accepted = [&](const JobManager::GoalHandlePtr& goal_handle) + { + auto ctx = std::make_shared(fixture.node); + REQUIRE_NOTHROW(job_mgr.queue_task(goal_handle, ctx, + bt_factory.createTreeFromText(bt))); + if (++queued == task_ids.size()) + { + done.set_value(); + } + }; + + // queue all tasks + for (const auto& task_id : task_ids) + { + WorkcellTask::Goal goal; + goal.task.id = task_id; + decltype(client)::element_type::SendGoalOptions goal_opts; + client->async_send_goal(goal, goal_opts); + } + REQUIRE(done.get_future().wait_for(std::chrono::seconds( + 1)) == std::future_status::ready); + + job_mgr.tick(); + // test1 and test2 should be finished + CHECK(job_mgr.jobs().size() == 1); + } +} + +} diff --git a/nexus_workcell_orchestrator/src/transform_pose_test.cpp b/nexus_workcell_orchestrator/src/transform_pose_test.cpp index 2a10960a..2d688ede 100644 --- a/nexus_workcell_orchestrator/src/transform_pose_test.cpp +++ b/nexus_workcell_orchestrator/src/transform_pose_test.cpp @@ -15,7 +15,6 @@ * */ -#define CATCH_CONFIG_MAIN #include #include "transform_pose.hpp" diff --git a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp index 30fd8e59..d7d1f1d7 100644 --- a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp +++ b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp @@ -319,70 +319,15 @@ auto WorkcellOrchestrator::_configure( auto ctx = std::make_shared(this->shared_from_this()); ctx->task = this->_task_parser.parse_task(goal_handle->get_goal()->task); auto bt = this->_create_bt(ctx); - this->_job_mgr->queue_task(goal_handle, ctx, std::move(bt)); - - // std::shared_ptr ctx = nullptr; - // const auto it = - // std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - // [&goal_handle](const std::shared_ptr& ctx) - // { - // return ctx->task.id == goal_handle->get_goal()->task.id; - // }); - // if (it != this->_ctxs.end()) - // { - // ctx = *it; - // } - // else - // { - // ctx = std::make_shared(*this); - // } - // ctx->goal_handle = goal_handle; - // const auto goal = goal_handle->get_goal(); - // try - // { - // ctx->task = this->_task_parser.parse_task(goal->task); - // if (!this->_can_perform_task( - // ctx->task)) - // { - // auto result = std::make_shared(); - // result->message = "Workcell cannot perform task " + ctx->task.type; - // result->success = false; - // RCLCPP_ERROR_STREAM(this->get_logger(), result->message); - // goal_handle->abort( - // result); - // return; - // } - // ctx->task_state.workcell_id = this->get_name(); - // ctx->task_state.task_id = ctx->task.id; - // ctx->bt = this->_create_bt(ctx); - // ctx->bt_logging = std::make_unique(ctx->bt, - // this->shared_from_this()); - // } - // catch (const std::exception& e) - // { - // std::ostringstream oss; - // auto result = std::make_shared(); - // oss << "Failed to create task: " << e.what(); - // result->message = oss.str(); - // result->success = false; - // RCLCPP_ERROR_STREAM(this->get_logger(), result->message); - // goal_handle->abort(result); - // // make sure to clear previously queued task - // this->_ctxs.erase(it); - // return; - // } - - // RCLCPP_INFO(this->get_logger(), "Queuing task [%s]", - // goal->task.id.c_str()); - // ctx->task_state.status = TaskState::STATUS_QUEUED; - // auto fb = std::make_shared(); - // fb->state = ctx->task_state; - // goal_handle->publish_feedback(fb); - - // if (it == this->_ctxs.end()) - // { - // this->_ctxs.emplace_back(ctx); - // } + try + { + this->_job_mgr->queue_task(goal_handle, ctx, std::move(bt)); + } + catch (const JobError& e) + { + RCLCPP_ERROR(this->get_logger(), "Failed to queue task [%s]: %s", + goal_handle->get_goal()->task.id.c_str(), e.what()); + } }); this->_wc_state_pub = @@ -435,16 +380,18 @@ auto WorkcellOrchestrator::_configure( ConstSharedPtr req, endpoints::QueueWorkcellTaskService::ServiceType::Response::SharedPtr resp) { - const auto [_job, err] = this->_job_mgr->assign_task(req->task_id); - if (!err.empty()) + try + { + this->_job_mgr->assign_task(req->task_id); + resp->success = true; + } + catch (const JobError& e) { - resp->message = err; resp->success = false; - return; + resp->message = e.what(); + RCLCPP_ERROR(this->get_logger(), "Failed to assign task [%s]: %s", + req->task_id.c_str(), e.what()); } - - resp->success = true; - return; }); this->_remove_pending_task_srv = @@ -456,16 +403,19 @@ auto WorkcellOrchestrator::_configure( { RCLCPP_DEBUG(this->get_logger(), "received request to remove pending task [%s]", req->task_id.c_str()); - const auto [_job, - err] = this->_job_mgr->remove_assigned_task(req->task_id); - if (!err.empty()) + try + { + this->_job_mgr->remove_assigned_task(req->task_id); + resp->success = true; + } + catch (const JobError& e) { + RCLCPP_WARN(this->get_logger(), + "Failed to remove assigned task [%s]: %s", req->task_id.c_str(), + e.what()); resp->success = false; - resp->message = err; - return; + resp->message = e.what(); } - resp->success = true; - return; }); this->_tf2_buffer = std::make_shared(this->get_clock()); From 613410657856c521b9c0b24816390b97043cabea Mon Sep 17 00:00:00 2001 From: Teo Koon Peng Date: Wed, 11 Oct 2023 17:42:18 +0800 Subject: [PATCH 4/9] add tests Signed-off-by: Teo Koon Peng --- nexus_workcell_orchestrator/src/job_manager_test.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nexus_workcell_orchestrator/src/job_manager_test.cpp b/nexus_workcell_orchestrator/src/job_manager_test.cpp index 20282c7f..21cf583f 100644 --- a/nexus_workcell_orchestrator/src/job_manager_test.cpp +++ b/nexus_workcell_orchestrator/src/job_manager_test.cpp @@ -171,6 +171,10 @@ TEST_CASE("JobManager") { job_mgr.tick(); // test1 and test2 should be finished CHECK(job_mgr.jobs().size() == 1); + + job_mgr.tick(); + // test3 should now be finished + CHECK(job_mgr.jobs().size() == 0); } } From b8a21b3e194970e0dde400b4e918c9e0b14d6e34 Mon Sep 17 00:00:00 2001 From: Teo Koon Peng Date: Tue, 24 Oct 2023 08:54:02 +0800 Subject: [PATCH 5/9] fix tests Signed-off-by: Teo Koon Peng --- nexus_workcell_orchestrator/src/job.hpp | 2 +- nexus_workcell_orchestrator/src/job_manager.cpp | 5 +++-- nexus_workcell_orchestrator/src/job_manager_test.cpp | 4 ++-- nexus_workcell_orchestrator/src/workcell_orchestrator.cpp | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/nexus_workcell_orchestrator/src/job.hpp b/nexus_workcell_orchestrator/src/job.hpp index c91f71db..ed14e73c 100644 --- a/nexus_workcell_orchestrator/src/job.hpp +++ b/nexus_workcell_orchestrator/src/job.hpp @@ -40,7 +40,7 @@ struct Job std::shared_ptr ctx; std::optional bt; GoalHandlePtr goal_handle; - std::optional bt_logging; + std::unique_ptr bt_logging; TaskState task_state = TaskState(); uint64_t tick_count = 0; }; diff --git a/nexus_workcell_orchestrator/src/job_manager.cpp b/nexus_workcell_orchestrator/src/job_manager.cpp index 9a77555b..078a8052 100644 --- a/nexus_workcell_orchestrator/src/job_manager.cpp +++ b/nexus_workcell_orchestrator/src/job_manager.cpp @@ -144,7 +144,7 @@ Job& JobManager::assign_task( } auto& j = this->_jobs.emplace_back(Job{nullptr, std::nullopt, nullptr, - std::nullopt}); + nullptr}); j.task_state.task_id = task_id; j.task_state.workcell_id = this->_node->get_name(); j.task_state.status = TaskState::STATUS_ASSIGNED; @@ -176,7 +176,8 @@ Job& JobManager::queue_task(const GoalHandlePtr& goal_handle, it->ctx = ctx; it->bt = std::move(bt); - it->bt_logging.emplace(common::BtLogging(*it->bt, this->_node)); + it->bt_logging = + std::make_unique(*it->bt, this->_node); it->goal_handle = goal_handle; it->task_state.status = TaskState::STATUS_QUEUED; return *it; diff --git a/nexus_workcell_orchestrator/src/job_manager_test.cpp b/nexus_workcell_orchestrator/src/job_manager_test.cpp index 21cf583f..121dc489 100644 --- a/nexus_workcell_orchestrator/src/job_manager_test.cpp +++ b/nexus_workcell_orchestrator/src/job_manager_test.cpp @@ -93,7 +93,7 @@ TEST_CASE("JobManager") { Job* job; job = &job_mgr.assign_task("test"); CHECK(!job->bt.has_value()); - CHECK(!job->bt_logging.has_value()); + CHECK(!job->bt_logging); CHECK(job->ctx == nullptr); CHECK(job->goal_handle == nullptr); CHECK(job->task_state.task_id == "test"); @@ -113,7 +113,7 @@ TEST_CASE("JobManager") { CHECK(&job_mgr.queue_task(goal_handle, ctx, bt_factory.createTreeFromText(bt)) == job); CHECK(job->bt.has_value()); - CHECK(job->bt_logging.has_value()); + CHECK(job->bt_logging); REQUIRE(job->ctx == ctx); CHECK(job->goal_handle == goal_handle); CHECK(job->task_state.task_id == "test"); diff --git a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp index d7d1f1d7..4e4cf2fe 100644 --- a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp +++ b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp @@ -278,7 +278,7 @@ auto WorkcellOrchestrator::_configure( const auto it = std::find_if(jobs.begin(), jobs.end(), [&goal](const Job& j) { - return j.ctx->task.id == goal->task.id; + return j.task_state.task_id == goal->task.id; }); if (it == jobs.end()) { From 292c01d23f7577d73706fc7f43433a2deed5de8d Mon Sep 17 00:00:00 2001 From: Teo Koon Peng Date: Tue, 7 Nov 2023 16:01:14 +0800 Subject: [PATCH 6/9] log when task state is changed instead of every tick Signed-off-by: Teo Koon Peng --- .../src/job_manager.cpp | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/nexus_workcell_orchestrator/src/job_manager.cpp b/nexus_workcell_orchestrator/src/job_manager.cpp index 078a8052..c893b16a 100644 --- a/nexus_workcell_orchestrator/src/job_manager.cpp +++ b/nexus_workcell_orchestrator/src/job_manager.cpp @@ -90,17 +90,8 @@ uint8_t JobManager::_tick_job(Job& job) auto& task_status = job.task_state.status; switch (task_status) { - case TaskState::STATUS_ASSIGNED: { - RCLCPP_INFO_THROTTLE( - this->_node->get_logger(), - *this->_node->get_clock(), - std::chrono::milliseconds(1000).count(), - "Task [%s] is pending, call \"%s\" to start this task", - job.task_state.task_id.c_str(), - endpoints::WorkcellRequestAction::action_name( - this->_node->get_name()).c_str()); + case TaskState::STATUS_ASSIGNED: break; - } case TaskState::STATUS_QUEUED: case TaskState::STATUS_RUNNING: { this->_tick_bt(job); @@ -148,6 +139,12 @@ Job& JobManager::assign_task( j.task_state.task_id = task_id; j.task_state.workcell_id = this->_node->get_name(); j.task_state.status = TaskState::STATUS_ASSIGNED; + RCLCPP_INFO( + this->_node->get_logger(), + "Task [%s] is pending, call \"%s\" to start this task", + j.task_state.task_id.c_str(), + endpoints::WorkcellRequestAction::action_name( + this->_node->get_name()).c_str()); RCLCPP_INFO( this->_node->get_logger(), "Assigned task [%s]", @@ -180,6 +177,9 @@ Job& JobManager::queue_task(const GoalHandlePtr& goal_handle, std::make_unique(*it->bt, this->_node); it->goal_handle = goal_handle; it->task_state.status = TaskState::STATUS_QUEUED; + RCLCPP_INFO( + it->ctx->node->get_logger(), "Task [%s] is now queued", + it->task_state.task_id.c_str()); return *it; } From a49b4a856570e9850e093b1ab64fa0c41017d782 Mon Sep 17 00:00:00 2001 From: Teo Koon Peng Date: Tue, 7 Nov 2023 16:03:33 +0800 Subject: [PATCH 7/9] avoid UB if `_job_mgr` has no value Signed-off-by: Teo Koon Peng --- nexus_workcell_orchestrator/src/workcell_orchestrator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp index 4e4cf2fe..8ea88631 100644 --- a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp +++ b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp @@ -214,7 +214,7 @@ auto WorkcellOrchestrator::on_activate( RCLCPP_INFO(this->get_logger(), "Workcell activated"); this->_bt_timer = this->create_wall_timer(BT_TICK_RATE, [this]() { - this->_job_mgr->tick(); + this->_job_mgr.value().tick(); }); return CallbackReturn::SUCCESS; } From efa929c1ee88e80c25cdc38eaeaffb227436eaed Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Wed, 27 Nov 2024 13:03:31 +0800 Subject: [PATCH 8/9] Fix catch2 header Signed-off-by: Luca Della Vedova --- nexus_workcell_orchestrator/src/job_manager_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nexus_workcell_orchestrator/src/job_manager_test.cpp b/nexus_workcell_orchestrator/src/job_manager_test.cpp index 121dc489..5584b214 100644 --- a/nexus_workcell_orchestrator/src/job_manager_test.cpp +++ b/nexus_workcell_orchestrator/src/job_manager_test.cpp @@ -19,7 +19,7 @@ #include -#include +#include namespace nexus::workcell_orchestrator::test { From de2ae385ee1bdeea35335184606e937fd949bddb Mon Sep 17 00:00:00 2001 From: Luca Della Vedova Date: Tue, 3 Dec 2024 10:23:54 +0800 Subject: [PATCH 9/9] Revert CMake test change Signed-off-by: Luca Della Vedova --- nexus_workcell_orchestrator/CMakeLists.txt | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/nexus_workcell_orchestrator/CMakeLists.txt b/nexus_workcell_orchestrator/CMakeLists.txt index ecd68a8a..54edb393 100644 --- a/nexus_workcell_orchestrator/CMakeLists.txt +++ b/nexus_workcell_orchestrator/CMakeLists.txt @@ -94,7 +94,7 @@ if(BUILD_TESTING) # Adds a ament catch2 test with some common libraries. # Usage: nexus_add_test( ...) function(nexus_add_test target) - ament_add_catch2(${target} src/main_test.cpp ${ARGN}) + ament_add_catch2(${target} ${ARGN}) target_link_libraries(${target} PRIVATE ${PROJECT_NAME}_plugin nexus_common::nexus_common_test @@ -102,13 +102,18 @@ if(BUILD_TESTING) endfunction() nexus_add_test(serialization_tests + src/main_test.cpp src/serialization_test.cpp ) nexus_add_test(task_results_tests + src/main_test.cpp src/task_results_test.cpp ) nexus_add_test(transform_pose_test src/transform_pose_test.cpp) - nexus_add_test(job_manager_test src/job_manager_test.cpp) + nexus_add_test(job_manager_test + src/main_test.cpp + src/job_manager_test.cpp + ) endif() ament_package()