src/ex/thread_pool.cpp

89.4% Lines (76/85) 88.2% Functions (15/17)
src/ex/thread_pool.cpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/detail/intrusive.hpp>
13 #include <boost/capy/test/thread_name.hpp>
14 #include <atomic>
15 #include <condition_variable>
16 #include <cstdio>
17 #include <mutex>
18 #include <thread>
19 #include <vector>
20
21 /*
22 Thread pool implementation using a shared work queue.
23
24 Work items are coroutine handles wrapped in intrusive list nodes, stored
25 in a single queue protected by a mutex. Worker threads wait on a
26 condition_variable until work is available or stop is requested.
27
28 Threads are started lazily on first post() via std::call_once to avoid
29 spawning threads for pools that are constructed but never used. Each
30 thread is named with a configurable prefix plus index for debugger
31 visibility.
32
33 Shutdown sequence: stop() sets the stop flag and notifies all threads;
34 workers drain the items already queued and then exit. The destructor joins
35 the threads and destroys any work still queued without executing it.
36 */
37
38 namespace boost {
39 namespace capy {
40
41 //------------------------------------------------------------------------------
42
43 class thread_pool::impl
44 {
45 struct work : detail::intrusive_queue<work>::node
46 {
47 std::coroutine_handle<> h_;
48
49 128 explicit work(std::coroutine_handle<> h) noexcept
50 128 : h_(h)
51 {
52 128 }
53
54 128 void run()
55 {
56 128 auto h = h_;
57 128 delete this;
58 128 h.resume();
59 128 }
60
61 void destroy()
62 {
63 delete this;
64 }
65 };
66
67 std::mutex mutex_;
68 std::condition_variable cv_;
69 detail::intrusive_queue<work> q_;
70 std::vector<std::thread> threads_;
71 std::atomic<bool> stop_{false};
72 std::size_t num_threads_;
73 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
74 std::once_flag start_flag_;
75
76 public:
77 63 ~impl()
78 {
79 63 stop();
80 105 for(auto& t : threads_)
81 42 if(t.joinable())
82 t.join();
83
84 63 while(auto* w = q_.pop())
85 w->destroy();
86 63 }
87
88 63 impl(std::size_t num_threads, std::string_view thread_name_prefix)
89 63 : num_threads_(num_threads)
90 {
91 63 if(num_threads_ == 0)
92 2 num_threads_ = std::thread::hardware_concurrency();
93 63 if(num_threads_ == 0)
94 num_threads_ = 1;
95
96 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
97 63 auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
98 63 thread_name_prefix_[n] = '\0';
99 63 }
100
101 void
102 128 post(std::coroutine_handle<> h)
103 {
104 128 ensure_started();
105 128 auto* w = new work(h);
106 {
107 128 std::lock_guard<std::mutex> lock(mutex_);
108 128 q_.push(w);
109 128 }
110 128 cv_.notify_one();
111 128 }
112
113 void
114 63 join() noexcept
115 {
116 63 stop();
117 105 for(auto& t : threads_)
118 42 if(t.joinable())
119 42 t.join();
120 63 }
121
122 void
123 126 stop() noexcept
124 {
125 126 stop_.store(true, std::memory_order_release);
126 126 cv_.notify_all();
127 126 }
128
129 private:
130 void
131 128 ensure_started()
132 {
133 128 std::call_once(start_flag_, [this]{
134 24 threads_.reserve(num_threads_);
135 66 for(std::size_t i = 0; i < num_threads_; ++i)
136 84 threads_.emplace_back([this, i]{ run(i); });
137 24 });
138 128 }
139
140 void
141 42 run(std::size_t index)
142 {
143 // Build name; set_current_thread_name truncates to platform limits.
144 char name[16];
145 42 std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
146 42 set_current_thread_name(name);
147
148 for(;;)
149 {
150 170 work* w = nullptr;
151 {
152 170 std::unique_lock<std::mutex> lock(mutex_);
153 170 cv_.wait(lock, [this]{
154 346 return !q_.empty() ||
155 346 stop_.load(std::memory_order_acquire);
156 });
157 170 if(stop_.load(std::memory_order_acquire) && q_.empty())
158 84 return;
159 128 w = q_.pop();
160 170 }
161 128 if(w)
162 128 w->run();
163 128 }
164 }
165 };
166
167 //------------------------------------------------------------------------------
168
169 63 thread_pool::
170 ~thread_pool()
171 {
172 63 impl_->join();
173 63 shutdown();
174 63 destroy();
175 63 delete impl_;
176 63 }
177
178 63 thread_pool::
179 63 thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
180 63 : impl_(new impl(num_threads, thread_name_prefix))
181 {
182 63 this->set_frame_allocator(std::allocator<void>{});
183 63 }
184
185 void
186 thread_pool::
187 stop() noexcept
188 {
189 impl_->stop();
190 }
191
192 //------------------------------------------------------------------------------
193
194 void
195 128 thread_pool::executor_type::
196 post(std::coroutine_handle<> h) const
197 {
198 128 pool_->impl_->post(h);
199 128 }
200
201 } // capy
202 } // boost
203