diff --git a/trpc/metrics/prometheus/prometheus_metrics.cc b/trpc/metrics/prometheus/prometheus_metrics.cc
index 8ed324e2..458fdeb6 100644
--- a/trpc/metrics/prometheus/prometheus_metrics.cc
+++ b/trpc/metrics/prometheus/prometheus_metrics.cc
@@ -88,7 +88,7 @@ void PrometheusMetrics::Start() noexcept {
 
 void PrometheusMetrics::Stop() noexcept {
   if (push_gateway_task_id_ != 0) {
-    PeripheryTaskScheduler::GetInstance()->StopInnerTask(push_gateway_task_id_);
+    PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(push_gateway_task_id_);
     PeripheryTaskScheduler::GetInstance()->JoinInnerTask(push_gateway_task_id_);
     push_gateway_task_id_ = 0;
   }
diff --git a/trpc/naming/domain/selector_domain.cc b/trpc/naming/domain/selector_domain.cc
index 1da0ef7b..ba76f5ef 100644
--- a/trpc/naming/domain/selector_domain.cc
+++ b/trpc/naming/domain/selector_domain.cc
@@ -306,7 +306,7 @@ void SelectorDomain::Start() noexcept {
 
 void SelectorDomain::Stop() noexcept {
   if (task_id_) {
-    PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
+    PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
     task_id_ = 0;
   }
 }
diff --git a/trpc/rpcz/collector.cc b/trpc/rpcz/collector.cc
index 416c3f25..76c6acd1 100644
--- a/trpc/rpcz/collector.cc
+++ b/trpc/rpcz/collector.cc
@@ -84,7 +84,7 @@ void RpczCollectorTask::Destroy() {
 
 void RpczCollectorTask::Stop() {
   if (task_id_) {
-    trpc::PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
+    trpc::PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
     task_id_ = 0;
   }
 }
diff --git a/trpc/runtime/common/heartbeat/heartbeat_report.cc b/trpc/runtime/common/heartbeat/heartbeat_report.cc
index ad16889f..ebe235af 100644
--- a/trpc/runtime/common/heartbeat/heartbeat_report.cc
+++ b/trpc/runtime/common/heartbeat/heartbeat_report.cc
@@ -117,7 +117,7 @@ void HeartBeatReport::Stop() {
 
   enable_ = false;
 
-  PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
+  PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
   PeripheryTaskScheduler::GetInstance()->JoinInnerTask(task_id_);
 
   task_id_ = 0;
diff --git a/trpc/runtime/common/periphery_task_scheduler.cc b/trpc/runtime/common/periphery_task_scheduler.cc
index d9a6705c..8e49d9ac 100644
--- a/trpc/runtime/common/periphery_task_scheduler.cc
+++ b/trpc/runtime/common/periphery_task_scheduler.cc
@@ -47,9 +47,9 @@ void PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::Stop() {
 }
 
 void PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::Join() {
-  for (unsigned i = 0; i < thread_num_; ++i) {
-    if (workers_[i].joinable()) {
-      workers_[i].join();
+  for (auto & worker : workers_) {
+    if (worker.joinable()) {
+      worker.join();
     }
   }
   workers_.clear();
@@ -61,6 +61,11 @@ void PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::Join() {
   }
 
   while (!tasks_.empty()) {
+    // Compatibility handling for scenarios where the user does not call DetachTask/RemoveTask:
+    // actively call Deref when the program exits.
+    if (tasks_.top()->UnsafeRefCount() > 1) {
+      tasks_.top()->Deref();
+    }
     tasks_.pop();
   }
 }
@@ -135,6 +140,19 @@ bool PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::StopTaskImpl(uint64_t t
   return StopAndDestroyTask(task_id, false);
 }
 
+bool PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::DetachTaskImpl(uint64_t task_id) {
+  if (TRPC_UNLIKELY(exited_.load(std::memory_order_relaxed))) {
+    return false;
+  }
+
+  TaskPtr task_ptr = GetTaskPtr(task_id);
+  if (task_ptr.Get() == nullptr) {
+    return false;
+  }
+
+  return true;
+}
+
 bool PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::JoinTaskImpl(uint64_t task_id) {
   if (TRPC_UNLIKELY(exited_.load(std::memory_order_relaxed))) {
     TRPC_FMT_ERROR("PeripheryTaskScheduler is already exited.");
@@ -323,6 +341,13 @@ bool PeripheryTaskScheduler::RemoveTask(uint64_t task_id) {
   return scheduler_->RemoveTaskImpl(task_id);
 }
 
+bool PeripheryTaskScheduler::DetachTask(std::uint64_t task_id) {
+  if (!scheduler_) {
+    return false;
+  }
+  return scheduler_->DetachTaskImpl(task_id);
+}
+
 bool PeripheryTaskScheduler::StopTask(uint64_t task_id) {
   if (!scheduler_) {
     return false;
   }
@@ -368,6 +393,13 @@ bool PeripheryTaskScheduler::RemoveInnerTask(uint64_t task_id) {
   return inner_scheduler_->RemoveTaskImpl(task_id);
 }
 
+bool PeripheryTaskScheduler::DetachInnerTask(std::uint64_t task_id) {
+  if (!inner_scheduler_) {
+    return false;
+  }
+  return inner_scheduler_->DetachTaskImpl(task_id);
+}
+
 bool PeripheryTaskScheduler::StopInnerTask(uint64_t task_id) {
   if (!inner_scheduler_) {
     return false;
   }
diff --git a/trpc/runtime/common/periphery_task_scheduler.h b/trpc/runtime/common/periphery_task_scheduler.h
index 63640a9f..2adf18e2 100644
--- a/trpc/runtime/common/periphery_task_scheduler.h
+++ b/trpc/runtime/common/periphery_task_scheduler.h
@@ -102,9 +102,16 @@ class PeripheryTaskScheduler {
   /// @brief Remove task, used in scenarios where it is not necessary to wait for tasks to complete before exiting.
   /// @param task_id task id
   /// @return on success, return true. on error, return false
-  /// @note This interface can only be called once with the same ID.
+  /// @note This interface can only be called once with the same ID. After calling it, the task_id can't be used again.
   bool RemoveTask(std::uint64_t task_id);
 
+  /// @brief Detach task. After calling this interface, the lifecycle of the task is managed by the scheduler,
+  ///        and users no longer need to concern themselves with releasing the task.
+  /// @param task_id task id
+  /// @return on success return true, otherwise return false
+  /// @note the task_id can't be used again after calling this interface
+  bool DetachTask(std::uint64_t task_id);
+
   /// @brief Same as 'SubmitTask', but is used only internally by the framework.
   std::uint64_t SubmitInnerTask(Function&& task, const std::string& name = "");
@@ -119,10 +126,13 @@ class PeripheryTaskScheduler {
   /// @brief Same as 'RemoveTask', but is used only internally by the framework.
   bool RemoveInnerTask(std::uint64_t task_id);
 
-  /// @brief Same as 'StopTask', but is used only internally by the framework.
+  /// @brief Same as 'DetachTask', but is used only internally by the framework.
+  bool DetachInnerTask(std::uint64_t task_id);
+
+  /// @brief Same as 'StopTask', but is used only internally by the framework.
   bool StopInnerTask(std::uint64_t task_id);
 
-  /// @brief Same as 'Jointask', but is used only internally by the framework.
+  /// @brief Same as 'JoinTask', but is used only internally by the framework.
   bool JoinInnerTask(std::uint64_t task_id);
 
   /// @brief Used to destroy resources accessed by scheduled tasks after all scheduled task execution threads have
@@ -158,6 +168,7 @@ class PeripheryTaskScheduler {
   bool StopTaskImpl(std::uint64_t task_id);
   bool JoinTaskImpl(std::uint64_t task_id);
   bool RemoveTaskImpl(std::uint64_t task_id);
+  bool DetachTaskImpl(uint64_t task_id);
 
   void Init(size_t thread_num);
   void Start();
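For context, here is a rough sketch of how the new `DetachTask` interface might be used from application code. It is based only on the signatures and doc comments above; the exact `SubmitTask` signature, the return value on failure, and the task body are assumptions for illustration, not the framework's documented behavior.

```cpp
#include <cstdint>

#include "trpc/runtime/common/periphery_task_scheduler.h"

// Hypothetical helper: submit a background task and hand its lifecycle over to
// the scheduler, so the caller never needs to call RemoveTask/StopTask later.
void SubmitFireAndForgetTask() {
  auto* scheduler = trpc::PeripheryTaskScheduler::GetInstance();

  // Assumed to return a task id > 0 on success, as in the unit tests below.
  std::uint64_t task_id = scheduler->SubmitTask([]() { /* background work */ }, "fire_and_forget_demo");
  if (task_id == 0) {
    return;  // submission failed
  }

  // After DetachTask succeeds, the task id must not be used again; the scheduler
  // releases the task itself (see the Deref compatibility handling in Join()).
  scheduler->DetachTask(task_id);
}
```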
diff --git a/trpc/runtime/common/periphery_task_scheduler_test.cc b/trpc/runtime/common/periphery_task_scheduler_test.cc
index 1f59b2e2..e0e6cf92 100644
--- a/trpc/runtime/common/periphery_task_scheduler_test.cc
+++ b/trpc/runtime/common/periphery_task_scheduler_test.cc
@@ -74,6 +74,13 @@ void TestRemoveTask() {
   ASSERT_TRUE(PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id));
 }
 
+void TestDetachTask() {
+  std::uint64_t task_id = PeripheryTaskScheduler::GetInstance()->SubmitInnerTask([]() {});
+  ASSERT_TRUE(task_id > 0);
+  ASSERT_FALSE(PeripheryTaskScheduler::GetInstance()->DetachInnerTask(task_id + 1));
+  ASSERT_TRUE(PeripheryTaskScheduler::GetInstance()->DetachInnerTask(task_id));
+}
+
 void TestSubmitPeriodicTask() {
   int count = 0;
   Latch latch(1);
@@ -268,6 +275,8 @@ TEST_F(PeripheryTaskSchedulerTest, SubmitTaskTest) { TestSubmitTask(); }
 
 TEST_F(PeripheryTaskSchedulerTest, RemoveTaskTest) { TestRemoveTask(); }
 
+TEST_F(PeripheryTaskSchedulerTest, DetachTaskTest) { TestDetachTask(); }
+
 TEST_F(PeripheryTaskSchedulerTest, SubmitPeriodicTaskTest) { TestSubmitPeriodicTask(); }
 
 TEST_F(PeripheryTaskSchedulerTest, RemoveTaskAdvanceTest) { TestRemoveTaskAdvance(); }
diff --git a/trpc/runtime/common/runtime_info_report/runtime_info_reporter.cc b/trpc/runtime/common/runtime_info_report/runtime_info_reporter.cc
index 13993ba2..2e9ac96b 100644
--- a/trpc/runtime/common/runtime_info_report/runtime_info_reporter.cc
+++ b/trpc/runtime/common/runtime_info_report/runtime_info_reporter.cc
@@ -173,7 +173,7 @@ void StopReportRuntimeInfo() {
     return;
   }
 
-  PeripheryTaskScheduler::GetInstance()->StopInnerTask(report_task_id);
+  PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(report_task_id);
   PeripheryTaskScheduler::GetInstance()->JoinInnerTask(report_task_id);
 
   report_task_id = 0;
diff --git a/trpc/runtime/common/stats/frame_stats.cc b/trpc/runtime/common/stats/frame_stats.cc
index a78dd7cd..55523012 100644
--- a/trpc/runtime/common/stats/frame_stats.cc
+++ b/trpc/runtime/common/stats/frame_stats.cc
@@ -37,7 +37,7 @@ void FrameStats::Start() {
 
 void FrameStats::Stop() {
   if (task_id_) {
-    PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
+    PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
    PeripheryTaskScheduler::GetInstance()->JoinInnerTask(task_id_);
     task_id_ = 0;
   }
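The caller-side change is the same in each of the affected plugins (prometheus_metrics, heartbeat_report, runtime_info_reporter, frame_stats): `StopInnerTask` is replaced by `RemoveInnerTask`, with the existing `JoinInnerTask` barrier kept where it was already present. A condensed sketch of that `Stop()` sequence follows; the class and member names are invented for illustration and only the two scheduler calls are taken from the patch.

```cpp
#include <cstdint>

#include "trpc/runtime/common/periphery_task_scheduler.h"

// Hypothetical plugin holding one inner periodic task, mirroring the Stop()
// implementations changed in this patch.
class DemoReporter {
 public:
  void Stop() {
    if (task_id_ != 0) {
      // Remove the task from the scheduler so it is not scheduled again ...
      trpc::PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
      // ... then wait for any in-flight execution before dropping the id.
      trpc::PeripheryTaskScheduler::GetInstance()->JoinInnerTask(task_id_);
      task_id_ = 0;
    }
  }

 private:
  std::uint64_t task_id_ = 0;
};
```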
diff --git a/trpc/runtime/iomodel/reactor/default/tcp_connection.cc b/trpc/runtime/iomodel/reactor/default/tcp_connection.cc
index edfaeddf..494158ad 100644
--- a/trpc/runtime/iomodel/reactor/default/tcp_connection.cc
+++ b/trpc/runtime/iomodel/reactor/default/tcp_connection.cc
@@ -180,6 +180,9 @@ int TcpConnection::HandleReadEvent() {
   int ret = ReadIoData(read_buffer_.buffer);
   if (ret > 0) {
+    SetConnActiveTime(trpc::time::GetMilliSeconds());
+    GetConnectionHandler()->UpdateConnection();
+
     std::deque data;
     RefPtr ref(ref_ptr, this);
     int checker_ret = GetConnectionHandler()->CheckMessage(ref, read_buffer_.buffer, data);
@@ -197,9 +200,6 @@ int TcpConnection::HandleReadEvent() {
       if (TRPC_UNLIKELY(GetConnectionState() == ConnectionState::kUnconnected)) {
         return -1;
       }
-
-      SetConnActiveTime(trpc::time::GetMilliSeconds());
-      GetConnectionHandler()->UpdateConnection();
     }
   } else if (checker_ret == kPacketError) {
     TRPC_LOG_ERROR("TcpConnection::HandleReadEvent fd:" << socket_.GetFd() << ", ip:" << GetPeerIp()
diff --git a/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc b/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc
index 2c79f9e3..119d4a26 100644
--- a/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc
+++ b/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc
@@ -194,6 +194,8 @@ FiberConnection::EventAction FiberTcpConnection::OnReadable() {
   do {
     status = ReadData();
     if (TRPC_LIKELY(status != ReadStatus::kError)) {
+      SetConnActiveTime(trpc::time::GetMilliSeconds());
+      GetConnectionHandler()->UpdateConnection();
       auto rc = ConsumeReadData();
       if (TRPC_UNLIKELY(rc != EventAction::kReady)) {
         TRPC_LOG_WARN("FiberTcpConnection::OnReadable ConsumeReadData failed, ip:"
@@ -256,9 +258,6 @@ FiberConnection::EventAction FiberTcpConnection::ConsumeReadData() {
       return EventAction::kLeaving;
     }
 
-    SetConnActiveTime(trpc::time::GetMilliSeconds());
-    GetConnectionHandler()->UpdateConnection();
-
    return EventAction::kReady;
   } else if (checker_ret == kPacketError) {
     return EventAction::kLeaving;
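In both connection implementations, `SetConnActiveTime`/`UpdateConnection` now run as soon as a read succeeds, rather than only after a complete message has been decoded. Presumably this keeps a connection that is still receiving data (for example, a large request arriving in many partial reads) from being treated as idle. A minimal, self-contained sketch of the kind of idle check such a timestamp typically feeds; the struct, field, and timeout names here are stand-ins, not tRPC's actual connection API.

```cpp
#include <cstdint>

// Hypothetical idle-connection sweep: a connection whose last-active timestamp
// is older than the idle timeout gets closed. Refreshing the timestamp on every
// successful read keeps a half-received message from tripping this check.
struct DemoConnection {
  std::uint64_t active_time_ms = 0;  // what SetConnActiveTime() would record
};

inline bool ShouldCloseIdleConnection(const DemoConnection& conn, std::uint64_t now_ms,
                                      std::uint64_t idle_timeout_ms) {
  return now_ms - conn.active_time_ms > idle_timeout_ms;
}
```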
diff --git a/trpc/stream/http/http_client_stream_connection_handler.cc b/trpc/stream/http/http_client_stream_connection_handler.cc
index 2b5e7627..e4a220cc 100644
--- a/trpc/stream/http/http_client_stream_connection_handler.cc
+++ b/trpc/stream/http/http_client_stream_connection_handler.cc
@@ -63,6 +63,14 @@ int FiberHttpClientStreamConnectionHandler::CheckMessage(const ConnectionPtr& co
   HttpClientStreamHandlerPtr handler_ptr = static_pointer_cast(stream_handler_);
   if (p) {
     (*p)->GetStream() = handler_ptr->GetHttpStream();
+    // When CheckMessage is repeatedly entered on a stream, the underlying objects of (*p)->GetStream() and
+    // handler_ptr->GetHttpStream() actually point to the same stream object. In this scenario, assigning the
+    // HttpClientStreamPtr would cause its internal reference count to first decrease by 1 and then increase by 1.
+    // If the connection happens to close at this moment, it would lead to incorrect reference counting, causing
+    // the stream object to be destructed prematurely; subsequent operations on this stream would result in a core dump.
+    if (((*p)->GetStream()).get() != (handler_ptr->GetHttpStream()).get()) {
+      (*p)->GetStream() = handler_ptr->GetHttpStream();
+    }
     return FiberClientConnectionHandler::CheckMessage(conn, in, out);
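The guard only reassigns the stream pointer when the two handles refer to different objects. To illustrate why a self-aliasing assignment is risky for the kind of pointer the comment describes, here is an intentionally naive intrusive pointer (not tRPC's actual HttpClientStreamPtr) whose assignment releases before acquiring, together with the same `.get()` comparison used in the patch.

```cpp
// Deliberately naive intrusive pointer: operator= releases the current object
// before acquiring the new one, so the reference count dips by one during the
// assignment -- the window the comment above describes, where a concurrent
// connection close can drive the count to zero and destroy the stream while it
// is still in use. Comparing .get() first skips the assignment entirely when
// both handles already manage the same object.
struct Stream {
  int ref_count = 0;
};

class StreamPtr {
 public:
  explicit StreamPtr(Stream* p = nullptr) : p_(p) { Acquire(); }
  StreamPtr(const StreamPtr& other) : p_(other.p_) { Acquire(); }
  StreamPtr& operator=(const StreamPtr& other) {
    Release();    // count dips here ...
    p_ = other.p_;
    Acquire();    // ... and recovers here
    return *this;
  }
  ~StreamPtr() { Release(); }
  Stream* get() const { return p_; }

 private:
  void Acquire() {
    if (p_) ++p_->ref_count;
  }
  void Release() {
    if (p_ && --p_->ref_count == 0) delete p_;
  }
  Stream* p_ = nullptr;
};

// Mirrors the guard added in this patch: assign only if the two handles do not
// already manage the same underlying object.
inline void AssignIfDifferent(StreamPtr& dst, const StreamPtr& src) {
  if (dst.get() != src.get()) {
    dst = src;
  }
}
```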
diff --git a/trpc/util/thread/cpu.cc b/trpc/util/thread/cpu.cc
index bfc7a0b8..38cd6cfc 100644
--- a/trpc/util/thread/cpu.cc
+++ b/trpc/util/thread/cpu.cc
@@ -117,7 +117,22 @@ void InitializeProcessorInfoOnce() {
   // I don't think it's possible to print log reliably here, unfortunately.
   static std::once_flag once;
   std::call_once(once, [&] {
-    node_of_cpus.resize(GetNumberOfProcessorsConfigured(), -1);
+    // In a container environment with strict CPU affinity binding, the number of CPU cores obtained by
+    // GetNumberOfProcessorsConfigured may reflect the number of cores allocated to the container (rather than
+    // the total cores of the deployment host). For example, if the host machine has 48 cores [0, 47] and the container
+    // is bound to 6 cores [2-7], the corresponding CPU affinity would be {2, 3, 4, 5, 6, 7}. In this case,
+    // GetNumberOfProcessorsConfigured returns 6 (the container's available cores) instead of 48 (the host's total
+    // cores). This causes CPUs 6 and 7 to be incorrectly judged as inaccessible, preventing fiber worker threads from
+    // running on these CPUs. Therefore, we adjust the actual number of machine cores based on the maximum CPU index in
+    // the CPU affinity, to prevent certain CPUs from being misidentified as inaccessible in container environments.
+    std::size_t number_of_processors = GetNumberOfProcessorsConfigured();
+    auto affinity = GetCurrentThreadAffinity();
+    if (!affinity.empty()) {
+      if (affinity.back() >= number_of_processors) {
+        number_of_processors = affinity.back() + 1;
+      }
+    }
+    node_of_cpus.resize(number_of_processors, -1);
     for (std::size_t i = 0; i != node_of_cpus.size(); ++i) {
       auto n = GetNodeOfProcessorImpl(static_cast(i));
       if (n == -1) {
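The adjustment amounts to: size the per-CPU node table by the configured processor count, unless the highest CPU index in the affinity mask is at least that large, in which case use that index plus one. A standalone sketch of the arithmetic using the container example from the comment above; `AdjustedProcessorCount` is a stand-in, not the framework's `GetNumberOfProcessorsConfigured`/`GetCurrentThreadAffinity`.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for the adjustment in InitializeProcessorInfoOnce(): the node table
// must be indexable by every CPU in the affinity mask, even when the configured
// processor count only reflects the container's quota.
std::size_t AdjustedProcessorCount(std::size_t configured, const std::vector<std::size_t>& sorted_affinity) {
  if (!sorted_affinity.empty() && sorted_affinity.back() >= configured) {
    return sorted_affinity.back() + 1;
  }
  return configured;
}

int main() {
  // Container example from the comment: 48-core host, affinity {2..7}, configured
  // count reported as 6. CPUs 6 and 7 would fall outside a table of size 6, so
  // the table is grown to 8 entries instead.
  assert(AdjustedProcessorCount(6, {2, 3, 4, 5, 6, 7}) == 8);

  // Unrestricted case: affinity covers {0..47}, configured count 48, unchanged.
  std::vector<std::size_t> full(48);
  for (std::size_t i = 0; i < full.size(); ++i) full[i] = i;
  assert(AdjustedProcessorCount(48, full) == 48);
  return 0;
}
```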