diff --git a/nexus_capabilities/include/nexus_capabilities/context.hpp b/nexus_capabilities/include/nexus_capabilities/context.hpp index e85a5f44..75d5cfa4 100644 --- a/nexus_capabilities/include/nexus_capabilities/context.hpp +++ b/nexus_capabilities/include/nexus_capabilities/context.hpp @@ -20,44 +20,53 @@ #include "task.hpp" -#include -#include -#include - -#include - -#include -#include #include -#include - -#include -#include -#include +#include +#include +#include +#include //============================================================================== namespace nexus { class Context { -public: using GoalHandlePtr = - std::shared_ptr>; -public: using TaskState = nexus_orchestrator_msgs::msg::TaskState; - - // using reference to prevent circular references -public: rclcpp_lifecycle::LifecycleNode& node; -public: BT::Tree bt; -public: GoalHandlePtr goal_handle; + /** + * There are several choices we can use here, each having their own pros and cons. + * + * shared_ptr: + * + Api friendly + * - Will cause circular references + * - Cannot create an instance in the node constructor (cannot call `shared_from_this` + * in constructor) + * + * weak_ptr: + * - Will almost always cause circular reference. Most ROS apis don't support weak_ptr, + * attempting to `lock()` and passing the `shared_ptr` will almost always cause a + * circular reference. + * - Cannot create create an instance in the node constructor (cannot call `weak_from_this` + * in constructor) + * + * raw_ptr: + * + Api friendly + * - Unsafe + * + * reference: + * - Unsafe. Reference does not protect from UAF, also most ROS apis don't support references, + * those that use templates may work with either shared or raw pointers. + * We can't get a shared_ptr from a reference so we may end up using raw pointers. + * + * `shared_ptr` is chosen due to api friendliness, the consequences of circular reference + * is negligible since the node should never go out of scope. + */ +public: std::shared_ptr node; public: Task task; public: std::vector errors; -public: std::unique_ptr bt_logging; public: std::unordered_set signals; -public: TaskState task_state; -public: uint64_t tick_count = 0; -public: Context(rclcpp_lifecycle::LifecycleNode& node) - : node(node) {} +public: Context(std::shared_ptr node) + : node(std::move(node)) {} }; } // namespace nexus diff --git a/nexus_capabilities/src/capabilities/detection.cpp b/nexus_capabilities/src/capabilities/detection.cpp index fbe677e5..f14e9a4b 100644 --- a/nexus_capabilities/src/capabilities/detection.cpp +++ b/nexus_capabilities/src/capabilities/detection.cpp @@ -219,13 +219,13 @@ void DetectAllItems::_send_next_request() BT::NodeStatus GetDetection::tick() { - auto ctx = this->_ctx_mgr->current_context(); - auto detections = this->getInput( + const auto ctx = this->_ctx_mgr->current_context(); + const auto detections = this->getInput( "detections"); if (!detections) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [detections] is required", + ctx->node->get_logger(), "%s: port [detections] is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } @@ -235,7 +235,7 @@ BT::NodeStatus GetDetection::tick() if (!idx && !id) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [idx] or [id] is required", + ctx->node->get_logger(), "%s: port [idx] or [id] is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } @@ -251,7 +251,7 @@ BT::NodeStatus GetDetection::tick() if (it == detections->detections.cend()) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: cannot find detection with id [%s]", + ctx->node->get_logger(), "%s: cannot find detection with id [%s]", this->name().c_str(), id->c_str()); return BT::NodeStatus::FAILURE; } @@ -266,12 +266,13 @@ BT::NodeStatus GetDetection::tick() BT::NodeStatus GetDetectionPose::tick() { - auto ctx = this->_ctx_mgr->current_context(); - auto detection = this->getInput("detection"); + const auto ctx = this->_ctx_mgr->current_context(); + const auto detection = this->getInput( + "detection"); if (!detection) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [detection] is required", + ctx->node->get_logger(), "%s: port [detection] is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } diff --git a/nexus_workcell_orchestrator/CMakeLists.txt b/nexus_workcell_orchestrator/CMakeLists.txt index 8234887e..54edb393 100644 --- a/nexus_workcell_orchestrator/CMakeLists.txt +++ b/nexus_workcell_orchestrator/CMakeLists.txt @@ -30,6 +30,7 @@ endif() add_library(${PROJECT_NAME}_plugin SHARED src/get_result.cpp src/get_joint_constraints.cpp + src/job_manager.cpp src/make_transform.cpp src/serialization.cpp src/set_result.cpp @@ -109,6 +110,10 @@ if(BUILD_TESTING) src/task_results_test.cpp ) nexus_add_test(transform_pose_test src/transform_pose_test.cpp) + nexus_add_test(job_manager_test + src/main_test.cpp + src/job_manager_test.cpp + ) endif() ament_package() diff --git a/nexus_workcell_orchestrator/src/get_result.cpp b/nexus_workcell_orchestrator/src/get_result.cpp index cbb2ea73..74686c2b 100644 --- a/nexus_workcell_orchestrator/src/get_result.cpp +++ b/nexus_workcell_orchestrator/src/get_result.cpp @@ -29,7 +29,7 @@ BT::NodeStatus GetResult::tick() if (!key) { RCLCPP_ERROR( - this->_node.get_logger(), "%s: port [key] is required", + ctx->node->get_logger(), "%s: port [key] is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } @@ -41,7 +41,7 @@ BT::NodeStatus GetResult::tick() catch (const YAML::Exception& e) { RCLCPP_ERROR( - this->_node.get_logger(), "[%s]: Failed to read result [%s]", + ctx->node->get_logger(), "[%s]: Failed to read result [%s]", this->name().c_str(), e.what()); return BT::NodeStatus::FAILURE; } diff --git a/nexus_workcell_orchestrator/src/get_result.hpp b/nexus_workcell_orchestrator/src/get_result.hpp index 556f79c0..a1043eb0 100644 --- a/nexus_workcell_orchestrator/src/get_result.hpp +++ b/nexus_workcell_orchestrator/src/get_result.hpp @@ -42,14 +42,12 @@ public: static BT::PortsList providedPorts() } public: GetResult(const std::string& name, const BT::NodeConfiguration& config, - std::shared_ptr ctx_mgr, - rclcpp_lifecycle::LifecycleNode& node) - : BT::SyncActionNode{name, config}, _ctx_mgr{ctx_mgr}, _node{node} {} + std::shared_ptr ctx_mgr) + : BT::SyncActionNode{name, config}, _ctx_mgr{ctx_mgr} {} public: BT::NodeStatus tick() override; private: std::shared_ptr _ctx_mgr; -private: rclcpp_lifecycle::LifecycleNode& _node; }; } diff --git a/nexus_workcell_orchestrator/src/job.cpp b/nexus_workcell_orchestrator/src/job.cpp deleted file mode 100644 index 8dd981f0..00000000 --- a/nexus_workcell_orchestrator/src/job.cpp +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright (C) 2023 Johnson & Johnson - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "job.hpp" - -#include "get_result.hpp" -#include "make_transform.hpp" -#include "serialization.hpp" -#include "get_joint_constraints.hpp" -#include "set_result.hpp" -#include "signals.hpp" -#include "transform_pose.hpp" - -#include - -namespace nexus::workcell_orchestrator { - -Job::Job(rclcpp_lifecycle::LifecycleNode::WeakPtr node, - tf2_ros::Buffer::SharedPtr tf2_buffer) -: _node(std::move(node)), _tf2_buffer(std::move(tf2_buffer)) -{ - // create IsPauseTriggered BT node, it is a simple action node with output port "paused". - this->_bt_factory.registerSimpleAction("IsPauseTriggered", - [this](BT::TreeNode& node) - { - node.setOutput("paused", this->_paused); - return BT::NodeStatus::SUCCESS; - }, { BT::OutputPort("paused") }); - - // register PausableSequence - this->_bt_factory.registerNodeType( - "PausableSequence"); - - this->_bt_factory.registerBuilder("SetResult", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, this->_ctx); - }); - this->_bt_factory.registerBuilder("GetResult", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, this->_ctx, - *this->_node.lock()); - }); - - this->_bt_factory.registerNodeType("MakeTransform"); - this->_bt_factory.registerBuilder( - "ApplyTransform", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, - *this->_node.lock(), - this->_tf2_buffer); - }); - - this->_bt_factory.registerBuilder( - "SerializeDetections", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, - *this->_node.lock()); - }); - this->_bt_factory.registerBuilder( - "DeserializeDetections", - [this]( - const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, - *this->_node.lock()); - }); - - this->_bt_factory.registerBuilder("WaitForSignal", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, this->_ctx); - }); - - this->_bt_factory.registerBuilder("SetSignal", - [this](const std::string& name, const BT::NodeConfiguration& config) - { - return std::make_unique(name, config, this->_ctx); - }); - - this->_bt_factory.registerBuilder( - "GetJointConstraints", - [this]( - const std::string& name, - const BT::NodeConfiguration& config) - { - return std::make_unique( - name, - config, - this->_node - ); - }); -} - -void Job::load_capability(const Capability& cap) -{ - cap -} - -} diff --git a/nexus_workcell_orchestrator/src/job.hpp b/nexus_workcell_orchestrator/src/job.hpp index f8d63c9f..ed14e73c 100644 --- a/nexus_workcell_orchestrator/src/job.hpp +++ b/nexus_workcell_orchestrator/src/job.hpp @@ -18,33 +18,33 @@ #ifndef NEXUS_WORKCELL_ORCHESTRATOR__JOB_HPP #define NEXUS_WORKCELL_ORCHESTRATOR__JOB_HPP -#include -#include +#include +#include +#include +#include #include -#include -#include +#include + +#include namespace nexus::workcell_orchestrator { -class Job +struct Job { -public: Job(rclcpp_lifecycle::LifecycleNode::WeakPtr node, - tf2_ros::Buffer::SharedPtr tf2_buffer); - -public: void pause() { this->_paused = true; } -public: void unpause() { this->_paused = false; } -public: void load_capability(const Capability& cap); - -private: rclcpp_lifecycle::LifecycleNode::WeakPtr _node; -private: tf2_ros::Buffer::SharedPtr _tf2_buffer; -private: BT::BehaviorTreeFactory _bt_factory; -private: std::shared_ptr _ctx; -private: bool _paused; + using TaskState = nexus_orchestrator_msgs::msg::TaskState; + using GoalHandlePtr = + std::shared_ptr>; + + std::shared_ptr ctx; + std::optional bt; + GoalHandlePtr goal_handle; + std::unique_ptr bt_logging; + TaskState task_state = TaskState(); + uint64_t tick_count = 0; }; } #endif - diff --git a/nexus_workcell_orchestrator/src/job_manager.cpp b/nexus_workcell_orchestrator/src/job_manager.cpp new file mode 100644 index 00000000..c893b16a --- /dev/null +++ b/nexus_workcell_orchestrator/src/job_manager.cpp @@ -0,0 +1,257 @@ +/* + * Copyright (C) 2023 Johnson & Johnson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "job_manager.hpp" + +namespace nexus::workcell_orchestrator { + +using TaskState = nexus_orchestrator_msgs::msg::TaskState; +using WorkcellRequest = endpoints::WorkcellRequestAction::ActionType; + +/** + * Tick the behavior tree of a job and update the task status according to the result. + */ +void JobManager::_tick_bt(Job& job) +{ + this->_ctx_mgr->set_active_context(job.ctx); + const auto bt_status = job.bt->tickRoot(); + ++job.tick_count; + this->_ctx_mgr->set_active_context(nullptr); + + // update task status according to bt status + switch (bt_status) + { + case BT::NodeStatus::RUNNING: { + job.task_state.status = TaskState::STATUS_RUNNING; + break; + } + case BT::NodeStatus::FAILURE: { + RCLCPP_INFO( + job.ctx->node->get_logger(), "Task [%s] failed", + job.ctx->task.id.c_str()); + // Publish feedback. + job.task_state.status = TaskState::STATUS_FAILED; + auto fb = std::make_shared(); + fb->state = job.task_state; + job.goal_handle->publish_feedback(std::move(fb)); + + // Abort the action request. + auto result = + std::make_shared(); + result->success = false; + result->message = "Task failed"; + job.goal_handle->abort(result); + break; + } + case BT::NodeStatus::SUCCESS: { + // Publish feedback. + job.task_state.status = TaskState::STATUS_FINISHED; + auto fb = std::make_shared(); + fb->state = job.task_state; + job.goal_handle->publish_feedback(std::move(fb)); + + auto result = + std::make_shared(); + result->success = true; + result->result = YAML::Dump(job.ctx->task.previous_results); + auto report = job.bt_logging->generate_report(); + result->message = common::ReportConverter::to_string(report); + RCLCPP_INFO(job.ctx->node->get_logger(), "%s", result->message.c_str()); + job.goal_handle->succeed(result); + break; + } + default: { + std::ostringstream oss; + oss << "Error ticking task [" << job.ctx->task.id << + "]: Behavior tree returned invalid or unknown status [" << + bt_status << "]"; + RCLCPP_ERROR_STREAM(job.ctx->node->get_logger(), oss.str()); + throw std::runtime_error(oss.str()); + } + } +} + +uint8_t JobManager::_tick_job(Job& job) +{ + auto& task_status = job.task_state.status; + switch (task_status) + { + case TaskState::STATUS_ASSIGNED: + break; + case TaskState::STATUS_QUEUED: + case TaskState::STATUS_RUNNING: { + this->_tick_bt(job); + break; + } + case TaskState::STATUS_FAILED: { + RCLCPP_WARN( + job.ctx->node->get_logger(), "Cannot tick task [%s] that has already failed", + job.ctx->task.id.c_str()); + break; + } + case TaskState::STATUS_FINISHED: { + RCLCPP_WARN( + job.ctx->node->get_logger(), "Cannot tick task [%s] that has already finished", + job.ctx->task.id.c_str()); + break; + } + default: { + RCLCPP_ERROR( + job.ctx->node->get_logger(), "Error ticking task [%s]: Unknown status [%i]", + job.ctx->task.id.c_str(), + task_status); + break; + } + } + return task_status; +} + +Job& JobManager::assign_task( + const std::string& task_id) +{ + const auto it = + std::find_if(this->_jobs.begin(), this->_jobs.end(), [&task_id]( + const Job& j) + { + return j.task_state.task_id == task_id; + }); + if (it != this->_jobs.end()) + { + throw JobError("Another task with the same id already exist"); + } + + auto& j = this->_jobs.emplace_back(Job{nullptr, std::nullopt, nullptr, + nullptr}); + j.task_state.task_id = task_id; + j.task_state.workcell_id = this->_node->get_name(); + j.task_state.status = TaskState::STATUS_ASSIGNED; + RCLCPP_INFO( + this->_node->get_logger(), + "Task [%s] is pending, call \"%s\" to start this task", + j.task_state.task_id.c_str(), + endpoints::WorkcellRequestAction::action_name( + this->_node->get_name()).c_str()); + + RCLCPP_INFO( + this->_node->get_logger(), "Assigned task [%s]", + task_id.c_str()); + return j; +} + +Job& JobManager::queue_task(const GoalHandlePtr& goal_handle, + const std::shared_ptr& ctx, BT::Tree&& bt) +{ + const auto& job_id = goal_handle->get_goal()->task.id; + const auto it = + std::find_if(this->_jobs.begin(), this->_jobs.end(), [&job_id](const Job& j) + { + return j.task_state.task_id == job_id; + }); + if (it == this->_jobs.end()) + { + // abort the goal and throw error + const auto result = std::make_shared(); + result->success = false; + result->message = "Task not found"; + goal_handle->abort(result); + throw JobError("Task not found"); + } + + it->ctx = ctx; + it->bt = std::move(bt); + it->bt_logging = + std::make_unique(*it->bt, this->_node); + it->goal_handle = goal_handle; + it->task_state.status = TaskState::STATUS_QUEUED; + RCLCPP_INFO( + it->ctx->node->get_logger(), "Task [%s] is now queued", + it->task_state.task_id.c_str()); + return *it; +} + +void JobManager::remove_assigned_task(const std::string& task_id) +{ + const auto it = std::find_if(this->_jobs.begin(), + this->_jobs.end(), [&task_id](const Job& j) + { + return j.ctx->task.id == task_id; + }); + if (it == this->_jobs.end()) + { + return; + } + if (it->task_state.status != TaskState::STATUS_ASSIGNED) + { + throw JobError("Task is not assigned"); + } + this->_jobs.erase(it); +} + +void JobManager::halt_all_jobs() +{ + RCLCPP_INFO(this->_node->get_logger(), "Halting all tasks"); + for (auto& j : this->_jobs) + { + // halt bts that are running + if (j.task_state.status == TaskState::STATUS_RUNNING) + { + this->_ctx_mgr->set_active_context(j.ctx); + j.bt->haltTree(); + this->_ctx_mgr->set_active_context(nullptr); + + RCLCPP_INFO( + j.ctx->node->get_logger(), "Task [%s] halted", j.ctx->task.id.c_str()); + // Publish feedback. + j.task_state.status = TaskState::STATUS_FAILED; + auto fb = std::make_shared(); + fb->state = j.task_state; + j.goal_handle->publish_feedback(std::move(fb)); + + // Abort the action request. + auto result = + std::make_shared(); + result->success = false; + result->message = "Task halted"; + j.goal_handle->abort(result); + break; + } + } + + this->_jobs.clear(); +} + +void JobManager::tick() +{ + size_t i = 0; + auto it = this->_jobs.begin(); + for (; i < this->_max_concurrent && it != this->_jobs.end(); ++i) + { + const auto task_status = this->_tick_job(*it); + if (task_status == TaskState::STATUS_FAILED || + task_status == TaskState::STATUS_FINISHED) + { + it = this->_jobs.erase(it); + continue; + } + else + { + ++it; + } + } +} + +} diff --git a/nexus_workcell_orchestrator/src/job_manager.hpp b/nexus_workcell_orchestrator/src/job_manager.hpp new file mode 100644 index 00000000..d00209a7 --- /dev/null +++ b/nexus_workcell_orchestrator/src/job_manager.hpp @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2023 Johnson & Johnson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef NEXUS_WORKCELL_ORCHESTRATOR__JOB_MANAGER_HPP +#define NEXUS_WORKCELL_ORCHESTRATOR__JOB_MANAGER_HPP + +#include "job.hpp" + +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace nexus::workcell_orchestrator { + +class JobError : public std::runtime_error +{ +public: using std::runtime_error::runtime_error; +}; + +/** + * Manage execution of jobs according to the nexus task lifecycle design. + */ +class JobManager +{ +public: using GoalHandlePtr = + std::shared_ptr>; +public: using JobIterator = std::list::iterator; + +public: JobManager(rclcpp_lifecycle::LifecycleNode::SharedPtr node, + size_t max_concurrent) + : _node(std::move(node)), _max_concurrent(max_concurrent) {} + + /** + * Assigns a task. + * @throw JobError + */ +public: Job& assign_task(const std::string& task_id); + + /** + * Queue a task. The task must already be assigned. If there is an error, the goal will + * immediately be aborted. + * @throw JobError + */ +public: Job& queue_task(const GoalHandlePtr& goal_handle, + const std::shared_ptr& ctx, BT::Tree&& bt); + + /** + * Remove a assigned task. + * @throw JobError + */ +public: void remove_assigned_task(const std::string& task_id); + + /** + * Forcefully stop and clear all jobs. Jobs will be stopped as soon as possible, unlike + * `cancel_all_jobs`, the behavior trees cannot react to this event. + */ +public: void halt_all_jobs(); + + /** + * TODO: Implement on cancel bt node. + * Request to cancel all jobs, the behavior trees of the jobs can react to this event + * and stop the job gracefully. + */ +// public: void cancel_all_jobs(); + + /** + * Tick all active jobs. + */ +public: void tick(); + +public: const std::list& jobs() const + { + return this->_jobs; + } + +public: std::shared_ptr context_manager() const + { + return this->_ctx_mgr; + } + +private: rclcpp_lifecycle::LifecycleNode::SharedPtr _node; +private: size_t _max_concurrent; +private: std::list _jobs; +private: std::shared_ptr _ctx_mgr = + std::make_shared(); + +private: void _tick_bt(Job& job); +private: uint8_t _tick_job(Job& job); + +}; + +} + +#endif diff --git a/nexus_workcell_orchestrator/src/job_manager_test.cpp b/nexus_workcell_orchestrator/src/job_manager_test.cpp new file mode 100644 index 00000000..5584b214 --- /dev/null +++ b/nexus_workcell_orchestrator/src/job_manager_test.cpp @@ -0,0 +1,181 @@ +/* + * Copyright (C) 2023 Johnson & Johnson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "job_manager.hpp" + +#include + +#include + +namespace nexus::workcell_orchestrator::test { + +using nexus_orchestrator_msgs::msg::TaskState; +using nexus_orchestrator_msgs::action::WorkcellTask; + +TEST_CASE("JobManager") { + auto fixture = common::test::RosFixture(); + BT::BehaviorTreeFactory bt_factory; + JobManager job_mgr(fixture.node, 2); + + std::function handle_accepted = + [](const JobManager::GoalHandlePtr&) {}; + + const auto server = rclcpp_action::create_server(fixture.node, + "test", + [](const rclcpp_action::GoalUUID&, + const endpoints::WorkcellRequestAction::ActionType::Goal::ConstSharedPtr&) + { + return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE; + }, + [](const JobManager::GoalHandlePtr&) + { + return rclcpp_action::CancelResponse::REJECT; + }, [&](const JobManager::GoalHandlePtr& goal_handle) + { + handle_accepted(goal_handle); + }); + const auto client = rclcpp_action::create_client(fixture.node, + "test"); + + std::string bt = + R"( + + + + + + +)"; + + fixture.spin_in_background(); + + SECTION("cannot assign same task twice") { + CHECK_NOTHROW(job_mgr.assign_task("test")); + CHECK_THROWS_AS(job_mgr.assign_task("test"), JobError); + } + + SECTION("cannot queue task that is not assigned") { + handle_accepted = [&](const JobManager::GoalHandlePtr& goal_handle) + { + CHECK_THROWS_AS(job_mgr.queue_task(goal_handle, nullptr, + bt_factory.createTreeFromText(bt)), JobError); + }; + + WorkcellTask::Goal goal; + goal.task.id = "test"; + std::promise done; + decltype(client)::element_type::SendGoalOptions goal_opts; + goal_opts.result_callback = + [&](const rclcpp_action::ClientGoalHandle::WrappedResult&) + { + done.set_value(); + }; + client->async_send_goal(goal, goal_opts); + REQUIRE(done.get_future().wait_for(std::chrono::seconds( + 1)) == std::future_status::ready); + } + + SECTION("assign task sets job data correctly") { + Job* job; + job = &job_mgr.assign_task("test"); + CHECK(!job->bt.has_value()); + CHECK(!job->bt_logging); + CHECK(job->ctx == nullptr); + CHECK(job->goal_handle == nullptr); + CHECK(job->task_state.task_id == "test"); + CHECK(job->task_state.workcell_id == fixture.node->get_name()); + CHECK(job->task_state.status == TaskState::STATUS_ASSIGNED); + CHECK(job->tick_count == 0); + + SECTION("ticking assigned but unqueued task is a noop") { + job_mgr.tick(); + CHECK(job->tick_count == 0); + } + + SECTION("can queue an assigned task") { + handle_accepted = [&](const JobManager::GoalHandlePtr& goal_handle) + { + auto ctx = std::make_shared(fixture.node); + CHECK(&job_mgr.queue_task(goal_handle, ctx, + bt_factory.createTreeFromText(bt)) == job); + CHECK(job->bt.has_value()); + CHECK(job->bt_logging); + REQUIRE(job->ctx == ctx); + CHECK(job->goal_handle == goal_handle); + CHECK(job->task_state.task_id == "test"); + CHECK(job->task_state.status == TaskState::STATUS_QUEUED); + CHECK(job->tick_count == 0); + goal_handle->succeed(std::make_shared()); + }; + + WorkcellTask::Goal goal; + goal.task.id = "test"; + std::promise done; + decltype(client)::element_type::SendGoalOptions goal_opts; + goal_opts.result_callback = + [&](const rclcpp_action::ClientGoalHandle::WrappedResult&) + { + done.set_value(); + }; + client->async_send_goal(goal, goal_opts); + REQUIRE(done.get_future().wait_for(std::chrono::seconds( + 1)) == std::future_status::ready); + } + } + + SECTION("tick at most max_concurrent jobs") { + std::vector task_ids{"test1", "test2", "test3"}; + for (const auto& task_id : task_ids) + { + job_mgr.assign_task(task_id); + } + + std::promise done; + size_t queued = 0; + handle_accepted = [&](const JobManager::GoalHandlePtr& goal_handle) + { + auto ctx = std::make_shared(fixture.node); + REQUIRE_NOTHROW(job_mgr.queue_task(goal_handle, ctx, + bt_factory.createTreeFromText(bt))); + if (++queued == task_ids.size()) + { + done.set_value(); + } + }; + + // queue all tasks + for (const auto& task_id : task_ids) + { + WorkcellTask::Goal goal; + goal.task.id = task_id; + decltype(client)::element_type::SendGoalOptions goal_opts; + client->async_send_goal(goal, goal_opts); + } + REQUIRE(done.get_future().wait_for(std::chrono::seconds( + 1)) == std::future_status::ready); + + job_mgr.tick(); + // test1 and test2 should be finished + CHECK(job_mgr.jobs().size() == 1); + + job_mgr.tick(); + // test3 should now be finished + CHECK(job_mgr.jobs().size() == 0); + } +} + +} diff --git a/nexus_workcell_orchestrator/src/set_result.cpp b/nexus_workcell_orchestrator/src/set_result.cpp index f685787b..420565ab 100644 --- a/nexus_workcell_orchestrator/src/set_result.cpp +++ b/nexus_workcell_orchestrator/src/set_result.cpp @@ -29,14 +29,14 @@ BT::NodeStatus SetResult::tick() if (!key) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [key] is required", + ctx->node->get_logger(), "%s: port [key] is required", this->name().c_str()); } auto val = this->getInput("value"); if (!val) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [value] is required", + ctx->node->get_logger(), "%s: port [value] is required", this->name().c_str()); } ctx->task.previous_results[*key] = *val; diff --git a/nexus_workcell_orchestrator/src/signals.cpp b/nexus_workcell_orchestrator/src/signals.cpp index 96725930..7ca5c80a 100644 --- a/nexus_workcell_orchestrator/src/signals.cpp +++ b/nexus_workcell_orchestrator/src/signals.cpp @@ -26,7 +26,7 @@ BT::NodeStatus WaitForSignal::onStart() if (!signal) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: [signal] port is required", + ctx->node->get_logger(), "%s: [signal] port is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } @@ -34,14 +34,14 @@ BT::NodeStatus WaitForSignal::onStart() if (already_set) { RCLCPP_DEBUG( - ctx->node.get_logger(), "%s: signal [%s] already set", + ctx->node->get_logger(), "%s: signal [%s] already set", this->name().c_str(), signal->c_str()); auto clear = this->getInput("clear"); if (clear && *clear) { ctx->signals.erase(*signal); RCLCPP_INFO( - ctx->node.get_logger(), "%s: cleared signal [%s]", + ctx->node->get_logger(), "%s: cleared signal [%s]", this->name().c_str(), signal->c_str()); } } @@ -55,7 +55,7 @@ BT::NodeStatus WaitForSignal::onRunning() if (!signal) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: [signal] port is required", + ctx->node->get_logger(), "%s: [signal] port is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } @@ -66,7 +66,7 @@ BT::NodeStatus WaitForSignal::onRunning() { ctx->signals.erase(*signal); RCLCPP_INFO( - ctx->node.get_logger(), "%s: cleared signal [%s]", + ctx->node->get_logger(), "%s: cleared signal [%s]", this->name().c_str(), signal->c_str()); } return BT::NodeStatus::SUCCESS; @@ -81,13 +81,13 @@ BT::NodeStatus SetSignal::tick() if (!signal) { RCLCPP_ERROR( - ctx->node.get_logger(), "%s: port [signal] is required", + ctx->node->get_logger(), "%s: port [signal] is required", this->name().c_str()); return BT::NodeStatus::FAILURE; } ctx->signals.emplace(*signal); RCLCPP_DEBUG( - ctx->node.get_logger(), "%s: set signal [%s]", + ctx->node->get_logger(), "%s: set signal [%s]", this->name().c_str(), signal->c_str()); return BT::NodeStatus::SUCCESS; } diff --git a/nexus_workcell_orchestrator/src/task_results_test.cpp b/nexus_workcell_orchestrator/src/task_results_test.cpp index cd6196b5..21b2b372 100644 --- a/nexus_workcell_orchestrator/src/task_results_test.cpp +++ b/nexus_workcell_orchestrator/src/task_results_test.cpp @@ -34,7 +34,7 @@ namespace nexus::workcell_orchestrator::test { TEST_CASE("get and set results") { auto fixture = common::test::RosFixture{}; auto ctx_mgr = std::make_shared(); - auto ctx = std::make_shared(Context{*fixture.node}); + auto ctx = std::make_shared(fixture.node); ctx_mgr->set_active_context(ctx); BT::BehaviorTreeFactory bt_factory; @@ -46,7 +46,7 @@ TEST_CASE("get and set results") { bt_factory.registerBuilder("GetResult", [&](const std::string& name, const BT::NodeConfiguration& config) { - return std::make_unique(name, config, ctx_mgr, *fixture.node); + return std::make_unique(name, config, ctx_mgr); }); auto bt = bt_factory.createTreeFromText( diff --git a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp index 63adf071..3512fb13 100644 --- a/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp +++ b/nexus_workcell_orchestrator/src/workcell_orchestrator.cpp @@ -214,7 +214,7 @@ auto WorkcellOrchestrator::on_activate( RCLCPP_INFO(this->get_logger(), "Workcell activated"); this->_bt_timer = this->create_wall_timer(BT_TICK_RATE, [this]() { - this->_tick_all_bts(); + this->_job_mgr.value().tick(); }); return CallbackReturn::SUCCESS; } @@ -223,8 +223,8 @@ auto WorkcellOrchestrator::on_deactivate( const rclcpp_lifecycle::State& /* previous_state */) -> CallbackReturn { RCLCPP_INFO(this->get_logger(), - "Cancelling ongoing task before deactivating workcell"); - this->_cancel_all_tasks(); + "Halting ongoing task before deactivating workcell"); + this->_job_mgr->halt_all_jobs(); this->_bt_timer->cancel(); this->_bt_timer.reset(); @@ -245,7 +245,7 @@ auto WorkcellOrchestrator::on_cleanup( this->_signal_wc_srv.reset(); this->_task_doable_srv.reset(); this->_cmd_server.reset(); - this->_ctx_mgr.reset(); + this->_job_mgr->halt_all_jobs(); RCLCPP_INFO(this->get_logger(), "Cleaned up"); return CallbackReturn::SUCCESS; } @@ -253,7 +253,7 @@ auto WorkcellOrchestrator::on_cleanup( auto WorkcellOrchestrator::_configure( const rclcpp_lifecycle::State& /* previous_state */) -> CallbackReturn { - this->_ctx_mgr = std::make_shared(); + this->_job_mgr = JobManager(this->shared_from_this(), 1); this->_bt_store.on_register_handlers.emplace_back([this](const std:: string& key, const std::filesystem::path& filepath) { @@ -274,16 +274,16 @@ auto WorkcellOrchestrator::_configure( endpoints::WorkcellRequestAction::ActionType::Goal::ConstSharedPtr goal) { RCLCPP_DEBUG(this->get_logger(), "Got workcell task request"); + const auto& jobs = this->_job_mgr->jobs(); const auto it = - std::find_if(this->_ctxs.cbegin(), this->_ctxs.cend(), - [&goal](const std::shared_ptr& ctx) + std::find_if(jobs.begin(), jobs.end(), [&goal](const Job& j) { - return ctx->task.id == goal->task.id; + return j.task_state.task_id == goal->task.id; }); - if (it != this->_ctxs.cend() && (*it)->goal_handle) + if (it == jobs.end()) { RCLCPP_ERROR(this->get_logger(), - "A task with the same id is already executing"); + "Task [%s] not assigned to this workcell", goal->task.id.c_str()); return rclcpp_action::GoalResponse::REJECT; } return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE; @@ -293,26 +293,15 @@ auto WorkcellOrchestrator::_configure( { RCLCPP_DEBUG(this->get_logger(), "Got cancel request"); const auto& goal = goal_handle->get_goal(); - auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&goal](const std::shared_ptr& ctx) - { - return goal->task.id == ctx->task.id; - }); - if (it == this->_ctxs.end()) - { - RCLCPP_WARN(this->get_logger(), - "Fail to cancel task [%s]: task does not exist", goal->task.id.c_str()); - } - else + if (!goal->task.id.empty()) { - // we can just remove a task that is not running - if ((*it)->task_state.status != TaskState::STATUS_RUNNING) - { - this->_ctxs.erase(it); - } + RCLCPP_WARN( + this->get_logger(), + "Cancelling specific task is no longer supported, all tasks will be cancelled together to ensure line consistency"); } + + this->_job_mgr->halt_all_jobs(); return rclcpp_action::CancelResponse::ACCEPT; }, [this](std::shared_ptr> @@ -327,67 +316,17 @@ auto WorkcellOrchestrator::_configure( return; } - std::shared_ptr ctx = nullptr; - const auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&goal_handle](const std::shared_ptr& ctx) - { - return ctx->task.id == goal_handle->get_goal()->task.id; - }); - if (it != this->_ctxs.end()) - { - ctx = *it; - } - else - { - ctx = std::make_shared(*this); - } - ctx->goal_handle = goal_handle; - const auto goal = goal_handle->get_goal(); + auto ctx = std::make_shared(this->shared_from_this()); + ctx->task = this->_task_parser.parse_task(goal_handle->get_goal()->task); + auto bt = this->_create_bt(ctx); try { - ctx->task = this->_task_parser.parse_task(goal->task); - if (!this->_can_perform_task( - ctx->task)) - { - auto result = std::make_shared(); - result->message = "Workcell cannot perform task " + ctx->task.type; - result->success = false; - RCLCPP_ERROR_STREAM(this->get_logger(), result->message); - goal_handle->abort( - result); - return; - } - ctx->task_state.workcell_id = this->get_name(); - ctx->task_state.task_id = ctx->task.id; - ctx->bt = this->_create_bt(ctx); - ctx->bt_logging = std::make_unique(ctx->bt, - this->shared_from_this()); - } - catch (const std::exception& e) - { - std::ostringstream oss; - auto result = std::make_shared(); - oss << "Failed to create task: " << e.what(); - result->message = oss.str(); - result->success = false; - RCLCPP_ERROR_STREAM(this->get_logger(), result->message); - goal_handle->abort(result); - // make sure to clear previously queued task - this->_ctxs.erase(it); - return; + this->_job_mgr->queue_task(goal_handle, ctx, std::move(bt)); } - - RCLCPP_INFO(this->get_logger(), "Queuing task [%s]", - goal->task.id.c_str()); - ctx->task_state.status = TaskState::STATUS_QUEUED; - auto fb = std::make_shared(); - fb->state = ctx->task_state; - goal_handle->publish_feedback(fb); - - if (it == this->_ctxs.end()) + catch (const JobError& e) { - this->_ctxs.emplace_back(ctx); + RCLCPP_ERROR(this->get_logger(), "Failed to queue task [%s]: %s", + goal_handle->get_goal()->task.id.c_str(), e.what()); } }); @@ -441,28 +380,18 @@ auto WorkcellOrchestrator::_configure( ConstSharedPtr req, endpoints::QueueWorkcellTaskService::ServiceType::Response::SharedPtr resp) { - const auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&req](const std::shared_ptr& ctx) + try { - return ctx->task.id == req->task_id; - }); - if (it != this->_ctxs.end()) + this->_job_mgr->assign_task(req->task_id); + resp->success = true; + } + catch (const JobError& e) { resp->success = false; - resp->message = "A task with the same id already exists"; - RCLCPP_INFO(this->get_logger(), "Failed to queue task [%s]: %s", - req->task_id.c_str(), resp->message.c_str()); - return; + resp->message = e.what(); + RCLCPP_ERROR(this->get_logger(), "Failed to assign task [%s]: %s", + req->task_id.c_str(), e.what()); } - const auto& ctx = - this->_ctxs.emplace_back(std::make_shared(*this)); - ctx->task.id = req->task_id; - ctx->task_state.task_id = req->task_id; - ctx->task_state.workcell_id = this->get_name(); - ctx->task_state.status = TaskState::STATUS_ASSIGNED; - resp->success = true; - RCLCPP_INFO(this->get_logger(), "queued task %s", ctx->task.id.c_str()); }); this->_remove_pending_task_srv = @@ -474,24 +403,19 @@ auto WorkcellOrchestrator::_configure( { RCLCPP_DEBUG(this->get_logger(), "received request to remove pending task [%s]", req->task_id.c_str()); - const auto it = - std::find_if(this->_ctxs.begin(), this->_ctxs.end(), - [&req](const std::shared_ptr& ctx) + try { - return ctx->task.id == req->task_id; - }); - if (it == this->_ctxs.end() || - (*it)->task_state.status != TaskState::STATUS_ASSIGNED) + this->_job_mgr->remove_assigned_task(req->task_id); + resp->success = true; + } + catch (const JobError& e) { + RCLCPP_WARN(this->get_logger(), + "Failed to remove assigned task [%s]: %s", req->task_id.c_str(), + e.what()); resp->success = false; - resp->message = "task does not exist or is not pending"; - RCLCPP_DEBUG_STREAM(this->get_logger(), resp->message); - return; + resp->message = e.what(); } - this->_ctxs.erase(it); - RCLCPP_INFO(this->get_logger(), "removed task [%s]", - req->task_id.c_str()); - resp->success = true; }); this->_tf2_buffer = std::make_shared(this->get_clock()); @@ -561,12 +485,14 @@ auto WorkcellOrchestrator::_configure( this->_bt_factory->registerBuilder("SetResult", [this](const std::string& name, const BT::NodeConfiguration& config) { - return std::make_unique(name, config, this->_ctx_mgr); + return std::make_unique(name, config, + this->_job_mgr->context_manager()); }); this->_bt_factory->registerBuilder("GetResult", [this](const std::string& name, const BT::NodeConfiguration& config) { - return std::make_unique(name, config, this->_ctx_mgr, *this); + return std::make_unique(name, config, + this->_job_mgr->context_manager()); }); this->_bt_factory->registerNodeType("MakeTransform"); @@ -601,13 +527,15 @@ auto WorkcellOrchestrator::_configure( this->_bt_factory->registerBuilder("WaitForSignal", [this](const std::string& name, const BT::NodeConfiguration& config) { - return std::make_unique(name, config, this->_ctx_mgr); + return std::make_unique(name, config, + this->_job_mgr->context_manager()); }); this->_bt_factory->registerBuilder("SetSignal", [this](const std::string& name, const BT::NodeConfiguration& config) { - return std::make_unique(name, config, this->_ctx_mgr); + return std::make_unique(name, config, + this->_job_mgr->context_manager()); }); this->_bt_factory->registerBuilder( @@ -646,7 +574,8 @@ auto WorkcellOrchestrator::_configure( cap_id.c_str() ); cap->configure( - this->shared_from_this(), this->_ctx_mgr, *_bt_factory); + this->shared_from_this(), this->_job_mgr->context_manager(), + *_bt_factory); } } catch (const CapabilityError& e) @@ -683,165 +612,6 @@ auto WorkcellOrchestrator::_configure( return CallbackReturn::SUCCESS; } -void WorkcellOrchestrator::_tick_bt(const std::shared_ptr& ctx) -{ - if (ctx->task_state.status == TaskState::STATUS_FINISHED || - ctx->task_state.status == TaskState::STATUS_FAILED) - { - RCLCPP_WARN( - this->get_logger(), "Not executing task [%s] that is already finished", - ctx->task.id.c_str()); - return; - } - - this->_ctx_mgr->set_active_context(ctx); - const auto& goal_handle = ctx->goal_handle; - ++ctx->tick_count; - - if (ctx->task_state.status == TaskState::STATUS_ASSIGNED) - { - // only print on first tick - if (ctx->tick_count == 1) - { - RCLCPP_INFO( - this->get_logger(), "Task [%s] is pending, call \"%s\" to start this task", - ctx->task.id.c_str(), - endpoints::WorkcellRequestAction::action_name(this->get_name()).c_str()); - } - return; - } - - if (ctx->task_state.status == TaskState::STATUS_QUEUED) - { - RCLCPP_INFO(this->get_logger(), "Starting task [%s]", ctx->task.id.c_str()); - ctx->task_state.status = TaskState::STATUS_RUNNING; - auto fb = std::make_shared(); - fb->state = ctx->task_state; - goal_handle->publish_feedback(fb); - } - - if (ctx->task_state.status == TaskState::STATUS_RUNNING) - { - try - { - switch (ctx->bt.tickRoot()) - { - case BT::NodeStatus::RUNNING: - return; - case BT::NodeStatus::SUCCESS: { - this->_handle_command_success(ctx); - return; - } - case BT::NodeStatus::FAILURE: { - this->_handle_command_failed(ctx); - return; - } - default: { - RCLCPP_ERROR( - this->get_logger(), - "Failed executing task [%s]: unexpected BT status", - ctx->task.id.c_str()); - this->_handle_command_failed(ctx); - return; - } - } - } - catch (const std::exception& e) - { - RCLCPP_ERROR(this->get_logger(), "BT failed with exception: %s", - e.what()); - ctx->bt.haltTree(); - this->_handle_command_failed(ctx); - return; - } - } - - RCLCPP_WARN( - this->get_logger(), "Failed executing task [%s]: unexpected status [%u]", - ctx->task.id.c_str(), ctx->task_state.status); - ctx->bt.haltTree(); - this->_handle_command_failed(ctx); - return; -} - -void WorkcellOrchestrator::_tick_all_bts() -{ - // cancelling an ongoing task may leave the workcell in a undetermined state - // so we must cancel all tasks when an ongoing task is cancelled. - for (const auto& ctx : this->_ctxs) - { - if (ctx->task_state.status == TaskState::STATUS_RUNNING && - ctx->goal_handle->is_canceling()) - { - RCLCPP_WARN( - this->get_logger(), "Cancelling all tasks because an ongoing task [%s] is being cancelled", - ctx->task.id.c_str()); - // NOTE: iterators are invalidated, it is important to - // not access them after this line. - this->_cancel_all_tasks(); - break; - } - } - - size_t count = 0; - auto it = this->_ctxs.begin(); - for (; count < MAX_PARALLEL_TASK && it != this->_ctxs.end(); - ++count, ++it) - { - this->_tick_bt(*it); - const auto task_status = (*it)->task_state.status; - // `_tick_bt` returns true if the task is finished - if (task_status == TaskState::STATUS_FINISHED || - task_status == TaskState::STATUS_FAILED) - { - // NOTE: iterator is invalidated, it is important to - // not access it after this line. - this->_ctxs.erase(it); - } - // cancel all other tasks when any task fails. note that the failing task is - // removed from the list first because we don't want to cancel an already - // failed task. - if (task_status == TaskState::STATUS_FAILED) - { - this->_cancel_all_tasks(); - break; - } - } - - auto new_state = this->_cur_state; - // check if status has changed and publish new state - if (this->_ctxs.size() > 0) - { - new_state.status = WorkcellState::STATUS_BUSY; - new_state.task_id = this->_ctxs.front()->task.id; - } - else if (this->_ctxs.size() == 0) - { - new_state.status = WorkcellState::STATUS_IDLE; - new_state.task_id = ""; - } - if (new_state.task_id != this->_cur_state.task_id || - new_state.status != this->_cur_state.status) - { - this->_cur_state = new_state; - this->_wc_state_pub->publish(this->_cur_state); - } -} - -void WorkcellOrchestrator::_cancel_all_tasks() -{ - for (const auto& ctx: this->_ctxs) - { - RCLCPP_INFO(this->get_logger(), "Canceling task [%s]", - ctx->task.id.c_str()); - this->_ctx_mgr->set_active_context(ctx); - ctx->bt.haltTree(); - this->_handle_command_failed(ctx); - } - this->_ctxs.clear(); - return; -} - void WorkcellOrchestrator::_register() { if (!this->_lifecycle_mgr->changeStateForAllNodes( @@ -930,22 +700,23 @@ void WorkcellOrchestrator::_process_signal( endpoints::SignalWorkcellService::ServiceType::Request::ConstSharedPtr req, endpoints::SignalWorkcellService::ServiceType::Response::SharedPtr resp) { - for (const auto& ctx : this->_ctxs) + for (const auto& job : this->_job_mgr->jobs()) { - if (ctx->task.id == req->task_id) + if (job.ctx->task.id == req->task_id) { RCLCPP_INFO( this->get_logger(), "Received signal [%s] for task [%s] from system orchestrator", req->signal.c_str(), req->task_id.c_str()); - ctx->signals.emplace(req->signal); + job.ctx->signals.emplace(req->signal); resp->success = true; + return; } } std::ostringstream oss; - oss << "Received signal [" << req->signal << "] for task [" << req->task_id << - "] which is not currently running."; + oss << "Failed to set signal signal [" << req->signal << "] for task [" << + req->task_id << "]: Task not found"; resp->message = oss.str(); RCLCPP_WARN_STREAM(this->get_logger(), resp->message); resp->success = false; @@ -954,41 +725,10 @@ void WorkcellOrchestrator::_process_signal( BT::Tree WorkcellOrchestrator::_create_bt(const std::shared_ptr& ctx) { // To keep things simple, the task type is used as the key for the behavior tree to use. - this->_ctx_mgr->set_active_context(ctx); return this->_bt_factory->createTreeFromFile(this->_bt_store.get_bt( ctx->task.type)); } -void WorkcellOrchestrator::_handle_command_success( - const std::shared_ptr& ctx) -{ - RCLCPP_INFO(this->get_logger(), "Task finished successfully"); - ctx->task_state.status = TaskState::STATUS_FINISHED; - auto result = - std::make_shared(); - result->success = true; - result->result = YAML::Dump(ctx->task.previous_results); - auto report = ctx->bt_logging->generate_report(); - result->message = common::ReportConverter::to_string(report); - RCLCPP_INFO(this->get_logger(), "%s", result->message.c_str()); - ctx->goal_handle->succeed(result); -} - -void WorkcellOrchestrator::_handle_command_failed( - const std::shared_ptr& ctx) -{ - RCLCPP_ERROR(this->get_logger(), "Task failed"); - ctx->task_state.status = TaskState::STATUS_FAILED; - // Publish feedback. - auto fb = std::make_shared(); - fb->state = ctx->task_state; - ctx->goal_handle->publish_feedback(std::move(fb)); - // Abort the action request. - auto result = - std::make_shared(); - ctx->goal_handle->abort(result); -} - void WorkcellOrchestrator::_handle_task_doable( endpoints::IsTaskDoableService::ServiceType::Request::ConstSharedPtr req, endpoints::IsTaskDoableService::ServiceType::Response::SharedPtr resp) diff --git a/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp b/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp index 8e2b539d..12b8717c 100644 --- a/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp +++ b/nexus_workcell_orchestrator/src/workcell_orchestrator.hpp @@ -18,6 +18,7 @@ #ifndef NEXUS_WORKCELL_ORCHESTRATOR__WORKCELL_ORCHESTRATOR_HPP #define NEXUS_WORKCELL_ORCHESTRATOR__WORKCELL_ORCHESTRATOR_HPP +#include "job_manager.hpp" #include "task_parser.hpp" #include @@ -119,28 +120,18 @@ private: rclcpp::Service:: SharedPtr _remove_pending_task_srv; private: std::atomic _paused; -private: std::list> _ctxs; -private: std::shared_ptr _ctx_mgr; private: std::unique_ptr> _lifecycle_mgr{ nullptr}; private: std::filesystem::path _bt_path; // mapping of mapped task type and the original private: std::unordered_map _task_remaps; private: TaskParser _task_parser; +private: std::optional _job_mgr; private: CallbackReturn _configure( const rclcpp_lifecycle::State& previous_state); - /** - * Tick a behavior tree. - */ -private: void _tick_bt(const std::shared_ptr& ctx); - -private: void _tick_all_bts(); - -private: void _cancel_all_tasks(); - /** * Register this workcell to the system orchestrator. */