LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 89.4 % 85 76 9
Test Date: 2026-03-03 16:12:35 Functions: 88.2 % 17 15 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Michael Vandeberg
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/boostorg/capy
       9                 : //
      10                 : 
      11                 : #include <boost/capy/ex/thread_pool.hpp>
      12                 : #include <boost/capy/detail/intrusive.hpp>
      13                 : #include <boost/capy/test/thread_name.hpp>
      14                 : #include <atomic>
      15                 : #include <condition_variable>
      16                 : #include <cstdio>
      17                 : #include <mutex>
      18                 : #include <thread>
      19                 : #include <vector>
      20                 : 
      21                 : /*
      22                 :     Thread pool implementation using a shared work queue.
      23                 : 
      24                 :     Work items are coroutine handles wrapped in intrusive list nodes, stored
      25                 :     in a single queue protected by a mutex. Worker threads wait on a
      26                 :     condition_variable until work is available or stop is requested.
      27                 : 
      28                 :     Threads are started lazily on first post() via std::call_once to avoid
      29                 :     spawning threads for pools that are constructed but never used. Each
      30                 :     thread is named with a configurable prefix plus index for debugger
      31                 :     visibility.
      32                 : 
      33                 :     Shutdown sequence: stop() sets the stop flag and notifies all threads,
      34                 :     then the destructor joins threads and destroys any remaining queued
      35                 :     work without executing it.
      36                 : */
      37                 : 
      38                 : namespace boost {
      39                 : namespace capy {
      40                 : 
      41                 : //------------------------------------------------------------------------------
      42                 : 
      43                 : class thread_pool::impl
      44                 : {
      45                 :     struct work : detail::intrusive_queue<work>::node
      46                 :     {
      47                 :         std::coroutine_handle<> h_;
      48                 : 
      49 HIT         128 :         explicit work(std::coroutine_handle<> h) noexcept
      50             128 :             : h_(h)
      51                 :         {
      52             128 :         }
      53                 : 
      54             128 :         void run()
      55                 :         {
      56             128 :             auto h = h_;
      57             128 :             delete this;
      58             128 :             h.resume();
      59             128 :         }
      60                 : 
      61 MIS           0 :         void destroy()
      62                 :         {
      63               0 :             delete this;
      64               0 :         }
      65                 :     };
      66                 : 
      67                 :     std::mutex mutex_;
      68                 :     std::condition_variable cv_;
      69                 :     detail::intrusive_queue<work> q_;
      70                 :     std::vector<std::thread> threads_;
      71                 :     std::atomic<bool> stop_{false};
      72                 :     std::size_t num_threads_;
      73                 :     char thread_name_prefix_[13]{};  // 12 chars max + null terminator
      74                 :     std::once_flag start_flag_;
      75                 : 
      76                 : public:
      77 HIT          63 :     ~impl()
      78                 :     {
      79              63 :         stop();
      80             105 :         for(auto& t : threads_)
      81              42 :             if(t.joinable())
      82 MIS           0 :                 t.join();
      83                 : 
      84 HIT          63 :         while(auto* w = q_.pop())
      85 MIS           0 :             w->destroy();
      86 HIT          63 :     }
      87                 : 
      88              63 :     impl(std::size_t num_threads, std::string_view thread_name_prefix)
      89              63 :         : num_threads_(num_threads)
      90                 :     {
      91              63 :         if(num_threads_ == 0)
      92               2 :             num_threads_ = std::thread::hardware_concurrency();
      93              63 :         if(num_threads_ == 0)
      94 MIS           0 :             num_threads_ = 1;
      95                 : 
      96                 :         // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
      97 HIT          63 :         auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
      98              63 :         thread_name_prefix_[n] = '\0';
      99              63 :     }
     100                 : 
     101                 :     void
     102             128 :     post(std::coroutine_handle<> h)
     103                 :     {
     104             128 :         ensure_started();
     105             128 :         auto* w = new work(h);
     106                 :         {
     107             128 :             std::lock_guard<std::mutex> lock(mutex_);
     108             128 :             q_.push(w);
     109             128 :         }
     110             128 :         cv_.notify_one();
     111             128 :     }
     112                 : 
     113                 :     void
     114              63 :     join() noexcept
     115                 :     {
     116              63 :         stop();
     117             105 :         for(auto& t : threads_)
     118              42 :             if(t.joinable())
     119              42 :                 t.join();
     120              63 :     }
     121                 : 
     122                 :     void
     123             126 :     stop() noexcept
     124                 :     {
     125             126 :         stop_.store(true, std::memory_order_release);
     126             126 :         cv_.notify_all();
     127             126 :     }
     128                 : 
     129                 : private:
     130                 :     void
     131             128 :     ensure_started()
     132                 :     {
     133             128 :         std::call_once(start_flag_, [this]{
     134              24 :             threads_.reserve(num_threads_);
     135              66 :             for(std::size_t i = 0; i < num_threads_; ++i)
     136              84 :                 threads_.emplace_back([this, i]{ run(i); });
     137              24 :         });
     138             128 :     }
     139                 : 
     140                 :     void
     141              42 :     run(std::size_t index)
     142                 :     {
     143                 :         // Build name; set_current_thread_name truncates to platform limits.
     144                 :         char name[16];
     145              42 :         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
     146              42 :         set_current_thread_name(name);
     147                 : 
     148                 :         for(;;)
     149                 :         {
     150             170 :             work* w = nullptr;
     151                 :             {
     152             170 :                 std::unique_lock<std::mutex> lock(mutex_);
     153             170 :                 cv_.wait(lock, [this]{
     154             346 :                     return !q_.empty() ||
     155             346 :                         stop_.load(std::memory_order_acquire);
     156                 :                 });
     157             170 :                 if(stop_.load(std::memory_order_acquire) && q_.empty())
     158              84 :                     return;
     159             128 :                 w = q_.pop();
     160             170 :             }
     161             128 :             if(w)
     162             128 :                 w->run();
     163             128 :         }
     164                 :     }
     165                 : };
     166                 : 
     167                 : //------------------------------------------------------------------------------
     168                 : 
     169              63 : thread_pool::
     170                 : ~thread_pool()
     171                 : {
     172              63 :     impl_->join();
     173              63 :     shutdown();
     174              63 :     destroy();
     175              63 :     delete impl_;
     176              63 : }
     177                 : 
     178              63 : thread_pool::
     179              63 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
     180              63 :     : impl_(new impl(num_threads, thread_name_prefix))
     181                 : {
     182              63 :     this->set_frame_allocator(std::allocator<void>{});
     183              63 : }
     184                 : 
     185                 : void
     186 MIS           0 : thread_pool::
     187                 : stop() noexcept
     188                 : {
     189               0 :     impl_->stop();
     190               0 : }
     191                 : 
     192                 : //------------------------------------------------------------------------------
     193                 : 
     194                 : void
     195 HIT         128 : thread_pool::executor_type::
     196                 : post(std::coroutine_handle<> h) const
     197                 : {
     198             128 :     pool_->impl_->post(h);
     199             128 : }
     200                 : 
     201                 : } // capy
     202                 : } // boost
        

Generated by: LCOV version 2.3