From 37d33872fe21bd36459024888bddd65f587b318c Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Wed, 24 Dec 2025 12:49:17 +0900 Subject: [PATCH 1/9] refactor(cie_thread_configurator): simplify spawn_cie_thread with isolated rclcpp context Remove node dependency from spawn_cie_thread function and create an isolated rclcpp context within each spawned thread. This change allows threads to be spawned without requiring the caller to manage ROS node dependencies, while maintaining callback group registration capabilities through autonomous context initialization and shutdown. The implementation: - Initializes a dedicated rclcpp Context with auto logging disabled - Creates a temporary node for publishing callback group information - Properly shuts down the context after publishing - Executes the user-provided function with forwarded arguments - Simplifies the API by removing node parameter requirements Update sample application to demonstrate usage with a non-ROS worker thread that leverages the simplified spawn_cie_thread API. --- cie_sample_application/CMakeLists.txt | 3 +- .../cie_sample_application/sample_node.hpp | 4 +++ cie_sample_application/package.xml | 1 + cie_sample_application/src/sample_node.cpp | 19 +++++++++++ .../cie_thread_configurator.hpp | 34 +++++++++++++++++++ 5 files changed, 60 insertions(+), 1 deletion(-) diff --git a/cie_sample_application/CMakeLists.txt b/cie_sample_application/CMakeLists.txt index 771b0b3..53e6889 100644 --- a/cie_sample_application/CMakeLists.txt +++ b/cie_sample_application/CMakeLists.txt @@ -11,6 +11,7 @@ find_package(rclcpp_components REQUIRED) find_package(std_msgs REQUIRED) find_package(callback_isolated_executor REQUIRED) +find_package(cie_thread_configurator REQUIRED) if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) @@ -26,7 +27,7 @@ endif() include_directories(include) add_library(sample_node SHARED src/sample_node.cpp) -ament_target_dependencies(sample_node rclcpp rclcpp_components std_msgs) +ament_target_dependencies(sample_node rclcpp rclcpp_components std_msgs cie_thread_configurator) rclcpp_components_register_nodes(sample_node "SampleNode") add_executable(sample_node_main src/sample_node_main.cpp) diff --git a/cie_sample_application/include/cie_sample_application/sample_node.hpp b/cie_sample_application/include/cie_sample_application/sample_node.hpp index 709f139..741f175 100644 --- a/cie_sample_application/include/cie_sample_application/sample_node.hpp +++ b/cie_sample_application/include/cie_sample_application/sample_node.hpp @@ -7,11 +7,13 @@ class SampleNode : public rclcpp::Node { public: explicit SampleNode( const rclcpp::NodeOptions &options = rclcpp::NodeOptions()); + ~SampleNode(); private: void timer_callback(); void timer_callback2(); void subscription_callback(const std_msgs::msg::Int32::SharedPtr msg); + void non_ros_thread_func(int value); rclcpp::TimerBase::SharedPtr timer_; rclcpp::TimerBase::SharedPtr timer2_; @@ -26,4 +28,6 @@ class SampleNode : public rclcpp::Node { size_t count_; size_t count2_; + + std::thread non_ros_thread_; }; diff --git a/cie_sample_application/package.xml b/cie_sample_application/package.xml index 026e5da..fb2b800 100644 --- a/cie_sample_application/package.xml +++ b/cie_sample_application/package.xml @@ -24,6 +24,7 @@ std_msgs callback_isolated_executor + cie_thread_configurator ament_cmake diff --git a/cie_sample_application/src/sample_node.cpp b/cie_sample_application/src/sample_node.cpp index 27744be..12af2ec 100644 --- a/cie_sample_application/src/sample_node.cpp +++ b/cie_sample_application/src/sample_node.cpp @@ -3,6 +3,7 @@ #include #include "cie_sample_application/sample_node.hpp" +#include "cie_thread_configurator/cie_thread_configurator.hpp" #include "rclcpp_components/register_node_macro.hpp" using namespace std::chrono_literals; @@ -31,6 +32,15 @@ SampleNode::SampleNode(const rclcpp::NodeOptions &options) std::bind(&SampleNode::subscription_callback, this, std::placeholders::_1), sub_options); + + non_ros_thread_ = cie_thread_configurator::spawn_cie_thread( + "sample_test_thread", &SampleNode::non_ros_thread_func, this, 42); +} + +SampleNode::~SampleNode() { + if (non_ros_thread_.joinable()) { + non_ros_thread_.join(); + } } void SampleNode::timer_callback() { @@ -61,4 +71,13 @@ void SampleNode::subscription_callback( msg->data); } +void SampleNode::non_ros_thread_func(int value) { + long tid = syscall(SYS_gettid); + for (int i = 0; i < 5; ++i) { + std::this_thread::sleep_for(2s); + RCLCPP_INFO(this->get_logger(), "Test thread running (tid=%ld), value: %d", + tid, value); + } +} + RCLCPP_COMPONENTS_REGISTER_NODE(SampleNode) diff --git a/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp b/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp index 730e5f5..5e02cb3 100644 --- a/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp +++ b/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include "cie_config_msgs/msg/callback_group_info.hpp" @@ -30,4 +31,37 @@ void publish_callback_group_info( // Get hardware information from lscpu command std::map get_hardware_info(); +/// Spawn a thread whose scheduling policy can be managed through +template +std::thread spawn_cie_thread(const char *thread_name, F &&f, Args &&...args) { + std::thread t( + [thread_name, func = std::forward(f), + captured_args = std::make_tuple(std::forward(args)...)]() mutable { + // Create a unique rclcpp context in case `rclcpp::init()` is not called + rclcpp::InitOptions init_options; + init_options.shutdown_on_signal = false; + init_options.auto_initialize_logging(false); + auto context = std::make_shared(); + context->init(0, nullptr, init_options); + + rclcpp::NodeOptions options; + options.context(context); + auto node = std::make_shared( + "cie_thread_client", "/cie_thread_configurator", options); + + auto publisher = + node->create_publisher( + "/cie_thread_configurator/callback_group_info", + rclcpp::QoS(1000).keep_all()); + auto tid = static_cast(syscall(SYS_gettid)); + publish_callback_group_info(publisher, tid, thread_name); + + context->shutdown("Publishing is finished."); + + std::apply(std::move(func), std::move(captured_args)); + }); + + return t; +} + } // namespace cie_thread_configurator From 95d6cbbe6828de11a9774d848c32ee9c18927ab3 Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Wed, 24 Dec 2025 13:08:49 +0900 Subject: [PATCH 2/9] test: add sample_non_ros_process --- cie_sample_application/CMakeLists.txt | 4 ++++ .../src/sample_non_ros_process.cpp | 12 ++++++++++++ 2 files changed, 16 insertions(+) create mode 100644 cie_sample_application/src/sample_non_ros_process.cpp diff --git a/cie_sample_application/CMakeLists.txt b/cie_sample_application/CMakeLists.txt index 53e6889..d8b2787 100644 --- a/cie_sample_application/CMakeLists.txt +++ b/cie_sample_application/CMakeLists.txt @@ -34,8 +34,12 @@ add_executable(sample_node_main src/sample_node_main.cpp) target_link_libraries(sample_node_main sample_node) ament_target_dependencies(sample_node_main rclcpp callback_isolated_executor) +add_executable(sample_non_ros_process src/sample_non_ros_process.cpp) +ament_target_dependencies(sample_non_ros_process cie_thread_configurator) + install(TARGETS sample_node_main + sample_non_ros_process DESTINATION lib/${PROJECT_NAME} ) diff --git a/cie_sample_application/src/sample_non_ros_process.cpp b/cie_sample_application/src/sample_non_ros_process.cpp new file mode 100644 index 0000000..4398748 --- /dev/null +++ b/cie_sample_application/src/sample_non_ros_process.cpp @@ -0,0 +1,12 @@ +#include + +#include "cie_thread_configurator/cie_thread_configurator.hpp" + +void worker_function() { std::cout << "Worker thread running" << std::endl; } + +int main(int /*argc*/, char ** /*argv*/) { + auto thread = cie_thread_configurator::spawn_cie_thread("standalone_worker", + worker_function); + thread.join(); + return 0; +} From 515d996e435dc46b5ad565793fd02ec4eb31f30b Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Wed, 24 Dec 2025 13:10:39 +0900 Subject: [PATCH 3/9] fix --- cie_sample_application/src/sample_node.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cie_sample_application/src/sample_node.cpp b/cie_sample_application/src/sample_node.cpp index 12af2ec..ad9fab2 100644 --- a/cie_sample_application/src/sample_node.cpp +++ b/cie_sample_application/src/sample_node.cpp @@ -34,7 +34,7 @@ SampleNode::SampleNode(const rclcpp::NodeOptions &options) sub_options); non_ros_thread_ = cie_thread_configurator::spawn_cie_thread( - "sample_test_thread", &SampleNode::non_ros_thread_func, this, 42); + "sample_non_ros_thread", &SampleNode::non_ros_thread_func, this, 42); } SampleNode::~SampleNode() { From f2af6000d8788fa7cd7456d6f2ac5e25ccff0018 Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Wed, 24 Dec 2025 13:12:36 +0900 Subject: [PATCH 4/9] fix --- .../include/cie_thread_configurator/cie_thread_configurator.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp b/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp index 5e02cb3..e74a77f 100644 --- a/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp +++ b/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp @@ -32,6 +32,7 @@ void publish_callback_group_info( std::map get_hardware_info(); /// Spawn a thread whose scheduling policy can be managed through +/// cie_thread_configurator template std::thread spawn_cie_thread(const char *thread_name, F &&f, Args &&...args) { std::thread t( From c30409d2ee49509b369c6e927ac73b932e499771 Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Wed, 24 Dec 2025 13:36:21 +0900 Subject: [PATCH 5/9] fix --- cie_thread_configurator/src/prerun_node.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cie_thread_configurator/src/prerun_node.cpp b/cie_thread_configurator/src/prerun_node.cpp index aeee45b..7b64fa9 100644 --- a/cie_thread_configurator/src/prerun_node.cpp +++ b/cie_thread_configurator/src/prerun_node.cpp @@ -20,8 +20,12 @@ PrerunNode::PrerunNode() : Node("prerun_node") { void PrerunNode::topic_callback( const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg) { if (callback_group_ids_.find(msg->callback_group_id) != - callback_group_ids_.end()) + callback_group_ids_.end()) { + RCLCPP_ERROR(this->get_logger(), + "Duplicate callback_group_id received: tid=%ld | %s", + msg->thread_id, msg->callback_group_id.c_str()); return; + } RCLCPP_INFO(this->get_logger(), "Received CallbackGroupInfo: tid=%ld | %s", msg->thread_id, msg->callback_group_id.c_str()); From 9570c1b040ef701470174b4dd8b340cf44d70f95 Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Wed, 24 Dec 2025 13:38:13 +0900 Subject: [PATCH 6/9] fix --- .../cie_thread_configurator/cie_thread_configurator.hpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp b/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp index e74a77f..354aa31 100644 --- a/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp +++ b/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp @@ -32,7 +32,9 @@ void publish_callback_group_info( std::map get_hardware_info(); /// Spawn a thread whose scheduling policy can be managed through -/// cie_thread_configurator +/// cie_thread_configurator. +/// Caution: the `thread_name` must be unique among threads managed by +/// cie_thread_configurator. template std::thread spawn_cie_thread(const char *thread_name, F &&f, Args &&...args) { std::thread t( From 6942e4b23ff4872447c25e515397379ce711612c Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Wed, 24 Dec 2025 18:55:37 +0900 Subject: [PATCH 7/9] feat: NonRosThreadInfo --- cie_config_msgs/CMakeLists.txt | 5 +- cie_config_msgs/msg/NonRosThreadInfo.msg | 2 + .../cie_thread_configurator.hpp | 57 ++++---- .../cie_thread_configurator/prerun_node.hpp | 10 +- .../thread_configurator_node.hpp | 22 ++-- cie_thread_configurator/src/main.cpp | 1 + cie_thread_configurator/src/prerun_node.cpp | 44 ++++++- .../src/thread_configurator_node.cpp | 122 +++++++++++++----- 8 files changed, 191 insertions(+), 72 deletions(-) create mode 100644 cie_config_msgs/msg/NonRosThreadInfo.msg diff --git a/cie_config_msgs/CMakeLists.txt b/cie_config_msgs/CMakeLists.txt index 2cd6a5c..6ff3bd8 100644 --- a/cie_config_msgs/CMakeLists.txt +++ b/cie_config_msgs/CMakeLists.txt @@ -20,6 +20,9 @@ if(BUILD_TESTING) ament_lint_auto_find_test_dependencies() endif() -rosidl_generate_interfaces(${PROJECT_NAME} "msg/CallbackGroupInfo.msg") +rosidl_generate_interfaces(${PROJECT_NAME} + "msg/CallbackGroupInfo.msg" + "msg/NonRosThreadInfo.msg" +) ament_package() diff --git a/cie_config_msgs/msg/NonRosThreadInfo.msg b/cie_config_msgs/msg/NonRosThreadInfo.msg new file mode 100644 index 0000000..4005f8b --- /dev/null +++ b/cie_config_msgs/msg/NonRosThreadInfo.msg @@ -0,0 +1,2 @@ +string thread_name +int64 thread_id diff --git a/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp b/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp index 354aa31..84d9927 100644 --- a/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp +++ b/cie_thread_configurator/include/cie_thread_configurator/cie_thread_configurator.hpp @@ -7,6 +7,7 @@ #include #include "cie_config_msgs/msg/callback_group_info.hpp" +#include "cie_config_msgs/msg/non_ros_thread_info.hpp" namespace cie_thread_configurator { @@ -37,32 +38,36 @@ std::map get_hardware_info(); /// cie_thread_configurator. template std::thread spawn_cie_thread(const char *thread_name, F &&f, Args &&...args) { - std::thread t( - [thread_name, func = std::forward(f), - captured_args = std::make_tuple(std::forward(args)...)]() mutable { - // Create a unique rclcpp context in case `rclcpp::init()` is not called - rclcpp::InitOptions init_options; - init_options.shutdown_on_signal = false; - init_options.auto_initialize_logging(false); - auto context = std::make_shared(); - context->init(0, nullptr, init_options); - - rclcpp::NodeOptions options; - options.context(context); - auto node = std::make_shared( - "cie_thread_client", "/cie_thread_configurator", options); - - auto publisher = - node->create_publisher( - "/cie_thread_configurator/callback_group_info", - rclcpp::QoS(1000).keep_all()); - auto tid = static_cast(syscall(SYS_gettid)); - publish_callback_group_info(publisher, tid, thread_name); - - context->shutdown("Publishing is finished."); - - std::apply(std::move(func), std::move(captured_args)); - }); + std::thread t([thread_name, func = std::forward(f), + captured_args = + std::make_tuple(std::forward(args)...)]() mutable { + // Create a unique rclcpp context in case `rclcpp::init()` is not called + rclcpp::InitOptions init_options; + init_options.shutdown_on_signal = false; + init_options.auto_initialize_logging(false); + auto context = std::make_shared(); + context->init(0, nullptr, init_options); + + rclcpp::NodeOptions options; + options.context(context); + auto node = std::make_shared( + "cie_thread_client", "/cie_thread_configurator", options); + + auto publisher = + node->create_publisher( + "/cie_thread_configurator/non_ros_thread_info", + rclcpp::QoS(1000).keep_all()); + auto tid = static_cast(syscall(SYS_gettid)); + + auto message = std::make_shared(); + message->thread_id = tid; + message->thread_name = thread_name; + publisher->publish(*message); + + context->shutdown("Publishing is finished."); + + std::apply(std::move(func), std::move(captured_args)); + }); return t; } diff --git a/cie_thread_configurator/include/cie_thread_configurator/prerun_node.hpp b/cie_thread_configurator/include/cie_thread_configurator/prerun_node.hpp index 54bafd5..d9acaac 100644 --- a/cie_thread_configurator/include/cie_thread_configurator/prerun_node.hpp +++ b/cie_thread_configurator/include/cie_thread_configurator/prerun_node.hpp @@ -8,6 +8,7 @@ #include "yaml-cpp/yaml.h" #include "cie_config_msgs/msg/callback_group_info.hpp" +#include "cie_config_msgs/msg/non_ros_thread_info.hpp" class PrerunNode : public rclcpp::Node { public: @@ -15,10 +16,15 @@ class PrerunNode : public rclcpp::Node { void dump_yaml_config(std::filesystem::path path); private: - void - topic_callback(const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg); + void callback_group_callback( + const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg); + void non_ros_thread_callback( + const cie_config_msgs::msg::NonRosThreadInfo::SharedPtr msg); rclcpp::Subscription::SharedPtr subscription_; + rclcpp::Subscription::SharedPtr + non_ros_thread_subscription_; std::set callback_group_ids_; + std::set non_ros_thread_names_; }; diff --git a/cie_thread_configurator/include/cie_thread_configurator/thread_configurator_node.hpp b/cie_thread_configurator/include/cie_thread_configurator/thread_configurator_node.hpp index 194d513..ea64992 100644 --- a/cie_thread_configurator/include/cie_thread_configurator/thread_configurator_node.hpp +++ b/cie_thread_configurator/include/cie_thread_configurator/thread_configurator_node.hpp @@ -7,10 +7,11 @@ #include "yaml-cpp/yaml.h" #include "cie_config_msgs/msg/callback_group_info.hpp" +#include "cie_config_msgs/msg/non_ros_thread_info.hpp" class ThreadConfiguratorNode : public rclcpp::Node { - struct CallbackGroupConfig { - std::string callback_group_id; + struct ThreadConfig { + std::string thread_str; // callback_group_id or thread_name int64_t thread_id = -1; std::vector affinity; std::string policy; @@ -35,18 +36,21 @@ class ThreadConfiguratorNode : public rclcpp::Node { private: bool set_affinity_by_cgroup(int64_t thread_id, const std::vector &cpus); - bool issue_syscalls(const CallbackGroupConfig &config); - void - topic_callback(const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg); + bool issue_syscalls(const ThreadConfig &config); + void callback_group_callback( + const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg); + void non_ros_thread_callback( + const cie_config_msgs::msg::NonRosThreadInfo::SharedPtr msg); rclcpp::Subscription::SharedPtr subscription_; + rclcpp::Subscription::SharedPtr + non_ros_thread_subscription_; - std::vector callback_group_configs_; - std::unordered_map - id_to_callback_group_config_; + std::vector thread_configs_; + std::unordered_map id_to_thread_config_; int unapplied_num_; int cgroup_num_; - std::vector deadline_configs_; + std::vector deadline_configs_; }; diff --git a/cie_thread_configurator/src/main.cpp b/cie_thread_configurator/src/main.cpp index dd3c4ae..7264471 100644 --- a/cie_thread_configurator/src/main.cpp +++ b/cie_thread_configurator/src/main.cpp @@ -71,6 +71,7 @@ static void spin_thread_configurator_node(const std::string &config_filename) { } std::cout << config["callback_groups"] << std::endl; + std::cout << config["non_ros_threads"] << std::endl; auto node = std::make_shared(config); auto executor = std::make_shared(); diff --git a/cie_thread_configurator/src/prerun_node.cpp b/cie_thread_configurator/src/prerun_node.cpp index 7b64fa9..552b5a5 100644 --- a/cie_thread_configurator/src/prerun_node.cpp +++ b/cie_thread_configurator/src/prerun_node.cpp @@ -4,6 +4,7 @@ #include #include "cie_config_msgs/msg/callback_group_info.hpp" +#include "cie_config_msgs/msg/non_ros_thread_info.hpp" #include "rclcpp/rclcpp.hpp" #include "yaml-cpp/yaml.h" @@ -14,10 +15,17 @@ PrerunNode::PrerunNode() : Node("prerun_node") { subscription_ = this->create_subscription( "/cie_thread_configurator/callback_group_info", 100, - std::bind(&PrerunNode::topic_callback, this, std::placeholders::_1)); + std::bind(&PrerunNode::callback_group_callback, this, + std::placeholders::_1)); + + non_ros_thread_subscription_ = + this->create_subscription( + "/cie_thread_configurator/non_ros_thread_info", 100, + std::bind(&PrerunNode::non_ros_thread_callback, this, + std::placeholders::_1)); } -void PrerunNode::topic_callback( +void PrerunNode::callback_group_callback( const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg) { if (callback_group_ids_.find(msg->callback_group_id) != callback_group_ids_.end()) { @@ -33,6 +41,22 @@ void PrerunNode::topic_callback( callback_group_ids_.insert(msg->callback_group_id); } +void PrerunNode::non_ros_thread_callback( + const cie_config_msgs::msg::NonRosThreadInfo::SharedPtr msg) { + if (non_ros_thread_names_.find(msg->thread_name) != + non_ros_thread_names_.end()) { + RCLCPP_ERROR(this->get_logger(), + "Duplicate thread_name received: tid=%ld | %s", msg->thread_id, + msg->thread_name.c_str()); + return; + } + + RCLCPP_INFO(this->get_logger(), "Received NonRosThreadInfo: tid=%ld | %s", + msg->thread_id, msg->thread_name.c_str()); + + non_ros_thread_names_.insert(msg->thread_name); +} + void PrerunNode::dump_yaml_config(std::filesystem::path path) { YAML::Emitter out; @@ -64,6 +88,22 @@ void PrerunNode::dump_yaml_config(std::filesystem::path path) { out << YAML::Newline; } + out << YAML::EndSeq; + + // Add non_ros_threads section + out << YAML::Key << "non_ros_threads"; + out << YAML::Value << YAML::BeginSeq; + + for (const std::string &thread_name : non_ros_thread_names_) { + out << YAML::BeginMap; + out << YAML::Key << "id" << YAML::Value << thread_name; + out << YAML::Key << "affinity" << YAML::Value << YAML::Null; + out << YAML::Key << "policy" << YAML::Value << "SCHED_OTHER"; + out << YAML::Key << "priority" << YAML::Value << 0; + out << YAML::EndMap; + out << YAML::Newline; + } + out << YAML::EndSeq; out << YAML::EndMap; diff --git a/cie_thread_configurator/src/thread_configurator_node.cpp b/cie_thread_configurator/src/thread_configurator_node.cpp index 83cc46f..f7b33cd 100644 --- a/cie_thread_configurator/src/thread_configurator_node.cpp +++ b/cie_thread_configurator/src/thread_configurator_node.cpp @@ -9,6 +9,7 @@ #include #include "cie_config_msgs/msg/callback_group_info.hpp" +#include "cie_config_msgs/msg/non_ros_thread_info.hpp" #include "rclcpp/rclcpp.hpp" #include "yaml-cpp/yaml.h" @@ -18,8 +19,13 @@ ThreadConfiguratorNode::ThreadConfiguratorNode(const YAML::Node &yaml) : Node("thread_configurator_node"), unapplied_num_(0), cgroup_num_(0) { YAML::Node callback_groups = yaml["callback_groups"]; - unapplied_num_ = callback_groups.size(); - callback_group_configs_.resize(callback_groups.size()); + YAML::Node non_ros_threads = yaml["non_ros_threads"]; + + size_t callback_groups_size = callback_groups ? callback_groups.size() : 0; + size_t non_ros_threads_size = non_ros_threads ? non_ros_threads.size() : 0; + + unapplied_num_ = callback_groups_size + non_ros_threads_size; + thread_configs_.resize(callback_groups_size + non_ros_threads_size); // For backward compatibility: remove trailing "Waitable@"s auto remove_trailing_waitable = [](std::string s) { @@ -37,25 +43,37 @@ ThreadConfiguratorNode::ThreadConfiguratorNode(const YAML::Node &yaml) return s; }; - for (size_t i = 0; i < callback_groups.size(); i++) { - const auto &callback_group = callback_groups[i]; - auto &config = callback_group_configs_[i]; - - config.callback_group_id = - remove_trailing_waitable(callback_group["id"].as()); - for (auto &cpu : callback_group["affinity"]) + // Common lambda to load thread configuration from YAML + auto load_thread_config = [](ThreadConfig &config, const YAML::Node &node) { + config.thread_str = node["id"].as(); + for (auto &cpu : node["affinity"]) config.affinity.push_back(cpu.as()); - config.policy = callback_group["policy"].as(); + config.policy = node["policy"].as(); if (config.policy == "SCHED_DEADLINE") { - config.runtime = callback_group["runtime"].as(); - config.period = callback_group["period"].as(); - config.deadline = callback_group["deadline"].as(); + config.runtime = node["runtime"].as(); + config.period = node["period"].as(); + config.deadline = node["deadline"].as(); } else { - config.priority = callback_group["priority"].as(); + config.priority = node["priority"].as(); } + }; + + size_t config_index = 0; - id_to_callback_group_config_[config.callback_group_id] = &config; + // Load callback_groups configuration + for (size_t i = 0; i < callback_groups_size; i++) { + auto &config = thread_configs_[config_index++]; + load_thread_config(config, callback_groups[i]); + config.thread_str = remove_trailing_waitable(config.thread_str); + id_to_thread_config_[config.thread_str] = &config; + } + + // Load non_ros_threads configuration + for (size_t i = 0; i < non_ros_threads_size; i++) { + auto &config = thread_configs_[config_index++]; + load_thread_config(config, non_ros_threads[i]); + id_to_thread_config_[config.thread_str] = &config; } auto qos = rclcpp::QoS(rclcpp::QoSInitialization( @@ -64,7 +82,13 @@ ThreadConfiguratorNode::ThreadConfiguratorNode(const YAML::Node &yaml) subscription_ = this->create_subscription( "/cie_thread_configurator/callback_group_info", qos, - std::bind(&ThreadConfiguratorNode::topic_callback, this, + std::bind(&ThreadConfiguratorNode::callback_group_callback, this, + std::placeholders::_1)); + + non_ros_thread_subscription_ = + this->create_subscription( + "/cie_thread_configurator/non_ros_thread_info", qos, + std::bind(&ThreadConfiguratorNode::non_ros_thread_callback, this, std::placeholders::_1)); } @@ -79,13 +103,11 @@ ThreadConfiguratorNode::~ThreadConfiguratorNode() { bool ThreadConfiguratorNode::all_applied() { return unapplied_num_ == 0; } void ThreadConfiguratorNode::print_all_unapplied() { - RCLCPP_WARN(this->get_logger(), - "Following callback groups are not yet configured"); + RCLCPP_WARN(this->get_logger(), "Following threads are not yet configured"); - for (auto &config : callback_group_configs_) { + for (auto &config : thread_configs_) { if (!config.applied) { - RCLCPP_WARN(this->get_logger(), " - %s", - config.callback_group_id.c_str()); + RCLCPP_WARN(this->get_logger(), " - %s", config.thread_str.c_str()); } } } @@ -122,7 +144,7 @@ bool ThreadConfiguratorNode::set_affinity_by_cgroup( return true; } -bool ThreadConfiguratorNode::issue_syscalls(const CallbackGroupConfig &config) { +bool ThreadConfiguratorNode::issue_syscalls(const ThreadConfig &config) { if (config.policy == "SCHED_OTHER" || config.policy == "SCHED_BATCH" || config.policy == "SCHED_IDLE") { struct sched_param param; @@ -137,7 +159,7 @@ bool ThreadConfiguratorNode::issue_syscalls(const CallbackGroupConfig &config) { if (sched_setscheduler(config.thread_id, m[config.policy], ¶m) == -1) { RCLCPP_ERROR( this->get_logger(), "Failed to configure policy (id=%s, tid=%ld): %s", - config.callback_group_id.c_str(), config.thread_id, strerror(errno)); + config.thread_str.c_str(), config.thread_id, strerror(errno)); return false; } @@ -145,7 +167,7 @@ bool ThreadConfiguratorNode::issue_syscalls(const CallbackGroupConfig &config) { if (setpriority(PRIO_PROCESS, config.thread_id, config.priority) == -1) { RCLCPP_ERROR(this->get_logger(), "Failed to configure nice value (id=%s, tid=%ld): %s", - config.callback_group_id.c_str(), config.thread_id, + config.thread_str.c_str(), config.thread_id, strerror(errno)); return false; } @@ -162,7 +184,7 @@ bool ThreadConfiguratorNode::issue_syscalls(const CallbackGroupConfig &config) { if (sched_setscheduler(config.thread_id, m[config.policy], ¶m) == -1) { RCLCPP_ERROR( this->get_logger(), "Failed to configure policy (id=%s, tid=%ld): %s", - config.callback_group_id.c_str(), config.thread_id, strerror(errno)); + config.thread_str.c_str(), config.thread_id, strerror(errno)); return false; } @@ -182,7 +204,7 @@ bool ThreadConfiguratorNode::issue_syscalls(const CallbackGroupConfig &config) { if (sched_setattr(config.thread_id, &attr, 0) == -1) { RCLCPP_ERROR( this->get_logger(), "Failed to configure policy (id=%s, tid=%ld): %s", - config.callback_group_id.c_str(), config.thread_id, strerror(errno)); + config.thread_str.c_str(), config.thread_id, strerror(errno)); return false; } } @@ -192,7 +214,7 @@ bool ThreadConfiguratorNode::issue_syscalls(const CallbackGroupConfig &config) { if (!set_affinity_by_cgroup(config.thread_id, config.affinity)) { RCLCPP_ERROR(this->get_logger(), "Failed to configure affinity (id=%s, tid=%ld): %s", - config.callback_group_id.c_str(), config.thread_id, + config.thread_str.c_str(), config.thread_id, "Please disable cgroup v2 if used: " "`systemd.unified_cgroup_hierarchy=0`"); return false; @@ -205,7 +227,7 @@ bool ThreadConfiguratorNode::issue_syscalls(const CallbackGroupConfig &config) { if (sched_setaffinity(config.thread_id, sizeof(set), &set) == -1) { RCLCPP_ERROR(this->get_logger(), "Failed to configure affinity (id=%s, tid=%ld): %s", - config.callback_group_id.c_str(), config.thread_id, + config.thread_str.c_str(), config.thread_id, strerror(errno)); return false; } @@ -215,10 +237,10 @@ bool ThreadConfiguratorNode::issue_syscalls(const CallbackGroupConfig &config) { return true; } -void ThreadConfiguratorNode::topic_callback( +void ThreadConfiguratorNode::callback_group_callback( const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg) { - auto it = id_to_callback_group_config_.find(msg->callback_group_id); - if (it == id_to_callback_group_config_.end()) { + auto it = id_to_thread_config_.find(msg->callback_group_id); + if (it == id_to_thread_config_.end()) { RCLCPP_INFO(this->get_logger(), "Received CallbackGroupInfo: but the yaml file does not " "contain configuration for id=%s (tid=%ld)", @@ -226,7 +248,7 @@ void ThreadConfiguratorNode::topic_callback( return; } - CallbackGroupConfig *config = it->second; + ThreadConfig *config = it->second; if (config->applied) { RCLCPP_INFO( this->get_logger(), @@ -252,6 +274,42 @@ void ThreadConfiguratorNode::topic_callback( unapplied_num_--; } +void ThreadConfiguratorNode::non_ros_thread_callback( + const cie_config_msgs::msg::NonRosThreadInfo::SharedPtr msg) { + auto it = id_to_thread_config_.find(msg->thread_name); + if (it == id_to_thread_config_.end()) { + RCLCPP_INFO(this->get_logger(), + "Received NonRosThreadInfo: but the yaml file does not " + "contain configuration for name=%s (tid=%ld)", + msg->thread_name.c_str(), msg->thread_id); + return; + } + + ThreadConfig *config = it->second; + if (config->applied) { + RCLCPP_INFO(this->get_logger(), + "This thread is already configured. skip (name=%s, tid=%ld)", + msg->thread_name.c_str(), msg->thread_id); + return; + } + + RCLCPP_INFO(this->get_logger(), "Received NonRosThreadInfo: tid=%ld | %s", + msg->thread_id, msg->thread_name.c_str()); + config->thread_id = msg->thread_id; + + if (config->policy == "SCHED_DEADLINE") { + // delayed applying + deadline_configs_.push_back(config); + } else { + bool success = issue_syscalls(*config); + if (!success) + return; + } + + config->applied = true; + unapplied_num_--; +} + bool ThreadConfiguratorNode::apply_deadline_configs() { for (auto config : deadline_configs_) { if (!issue_syscalls(*config)) From 5777eb1213b91f9ca86a3fbd580cbcc472e56952 Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Wed, 24 Dec 2025 19:02:15 +0900 Subject: [PATCH 8/9] docs: update README --- cie_sample_application/README.md | 7 +++++++ cie_thread_configurator/README.md | 30 +++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/cie_sample_application/README.md b/cie_sample_application/README.md index 700d65c..69be156 100644 --- a/cie_sample_application/README.md +++ b/cie_sample_application/README.md @@ -17,3 +17,10 @@ Or, load the node to the exsiting component container. $ ros2 run callback_isolated_executor component_container_callback_isolated --ros-args --remap __node:=sample_container $ ros2 launch cie_sample_application load_sample_node.launch.xml ``` + +## Standalone Non-ROS Process +```bash +$ ros2 run cie_sample_application sample_non_ros_process +``` + +This demonstrates using `spawn_cie_thread` in a standalone process without any ROS2 node. diff --git a/cie_thread_configurator/README.md b/cie_thread_configurator/README.md index 4a61ee9..fd04bf9 100644 --- a/cie_thread_configurator/README.md +++ b/cie_thread_configurator/README.md @@ -6,7 +6,7 @@ For instructions on how to use this tool, please refer to https://github.com/tie ## YAML Configuration File Format For each ROS 2 application, prepare a single YAML configuration file. The format of the YAML configuration file is as follows. -There is a top-level entry called `callback_groups`, under which there are arrays representing each callback group. +There are two top-level entries: `callback_groups` for ROS 2 callback groups, and `non_ros_threads` for non-ROS worker threads. The IDs for the callback groups are automatically generated by this tool according to the rules described in the next section. ```yaml @@ -25,6 +25,13 @@ callback_groups: policy: SCHED_FIFO priority: 50 +non_ros_threads: + - id: zzzzz + affinity: + - 4 + policy: SCHED_OTHER + priority: 0 + ... ``` @@ -133,3 +140,24 @@ Timers with the same period cannot be distinguished from each other, so if diffe For `rclcpp::Waitable`, no distinction is made between instances. Note: Future updates may exclude `rclcpp::Waitable` from being included in the CallbackGroup ID. + +## Non-ROS Worker Thread Management + +The `spawn_cie_thread` function enables thread scheduling management for non-ROS2 worker threads. Threads created with this function automatically publish their information to the `cie_thread_configurator` without requiring a ROS node. + +### Usage + +```cpp +#include "cie_thread_configurator/cie_thread_configurator.hpp" + +// Spawn a managed worker thread +auto thread = cie_thread_configurator::spawn_cie_thread( + "worker_thread_name", // Unique thread name (used as ID in YAML) + worker_function, // Thread function + arg1, arg2, ... // Optional arguments +); + +thread.join(); +``` + +The thread name specified as the first argument must match the `id` field in the `non_ros_threads` section of the YAML configuration file. This allows the thread's scheduling policy, priority, and CPU affinity to be configured through the same YAML file used for ROS 2 callback groups. From 562fdea0dc07d39623988df46ab572c5aa67c854 Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Thu, 25 Dec 2025 03:21:29 +0900 Subject: [PATCH 9/9] fix: for backward compatibility --- cie_thread_configurator/src/main.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cie_thread_configurator/src/main.cpp b/cie_thread_configurator/src/main.cpp index 7264471..f69c3f6 100644 --- a/cie_thread_configurator/src/main.cpp +++ b/cie_thread_configurator/src/main.cpp @@ -71,7 +71,9 @@ static void spin_thread_configurator_node(const std::string &config_filename) { } std::cout << config["callback_groups"] << std::endl; - std::cout << config["non_ros_threads"] << std::endl; + if (config["non_ros_threads"]) { + std::cout << config["non_ros_threads"] << std::endl; + } auto node = std::make_shared(config); auto executor = std::make_shared();