TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/boostorg/capy
9 : //
10 :
11 : #include <boost/capy/ex/thread_pool.hpp>
12 : #include <boost/capy/detail/intrusive.hpp>
13 : #include <boost/capy/test/thread_name.hpp>
14 : #include <atomic>
15 : #include <condition_variable>
16 : #include <cstdio>
17 : #include <mutex>
18 : #include <thread>
19 : #include <vector>
20 :
21 : /*
22 : Thread pool implementation using a shared work queue.
23 :
24 : Work items are coroutine handles wrapped in intrusive list nodes, stored
25 : in a single queue protected by a mutex. Worker threads wait on a
26 : condition_variable until work is available or stop is requested.
27 :
28 : Threads are started lazily on first post() via std::call_once to avoid
29 : spawning threads for pools that are constructed but never used. Each
30 : thread is named with a configurable prefix plus index for debugger
31 : visibility.
32 :
33 : Shutdown sequence: stop() sets the stop flag and notifies all threads,
34 : then the destructor joins threads and destroys any remaining queued
35 : work without executing it.
36 : */
37 :
38 : namespace boost {
39 : namespace capy {
40 :
41 : //------------------------------------------------------------------------------
42 :
43 : class thread_pool::impl
44 : {
45 : struct work : detail::intrusive_queue<work>::node
46 : {
47 : std::coroutine_handle<> h_;
48 :
49 HIT 128 : explicit work(std::coroutine_handle<> h) noexcept
50 128 : : h_(h)
51 : {
52 128 : }
53 :
54 128 : void run()
55 : {
56 128 : auto h = h_;
57 128 : delete this;
58 128 : h.resume();
59 128 : }
60 :
61 MIS 0 : void destroy()
62 : {
63 0 : delete this;
64 0 : }
65 : };
66 :
67 : std::mutex mutex_;
68 : std::condition_variable cv_;
69 : detail::intrusive_queue<work> q_;
70 : std::vector<std::thread> threads_;
71 : std::atomic<bool> stop_{false};
72 : std::size_t num_threads_;
73 : char thread_name_prefix_[13]{}; // 12 chars max + null terminator
74 : std::once_flag start_flag_;
75 :
76 : public:
77 HIT 63 : ~impl()
78 : {
79 63 : stop();
80 105 : for(auto& t : threads_)
81 42 : if(t.joinable())
82 MIS 0 : t.join();
83 :
84 HIT 63 : while(auto* w = q_.pop())
85 MIS 0 : w->destroy();
86 HIT 63 : }
87 :
88 63 : impl(std::size_t num_threads, std::string_view thread_name_prefix)
89 63 : : num_threads_(num_threads)
90 : {
91 63 : if(num_threads_ == 0)
92 2 : num_threads_ = std::thread::hardware_concurrency();
93 63 : if(num_threads_ == 0)
94 MIS 0 : num_threads_ = 1;
95 :
96 : // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
97 HIT 63 : auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
98 63 : thread_name_prefix_[n] = '\0';
99 63 : }
100 :
101 : void
102 128 : post(std::coroutine_handle<> h)
103 : {
104 128 : ensure_started();
105 128 : auto* w = new work(h);
106 : {
107 128 : std::lock_guard<std::mutex> lock(mutex_);
108 128 : q_.push(w);
109 128 : }
110 128 : cv_.notify_one();
111 128 : }
112 :
113 : void
114 63 : join() noexcept
115 : {
116 63 : stop();
117 105 : for(auto& t : threads_)
118 42 : if(t.joinable())
119 42 : t.join();
120 63 : }
121 :
122 : void
123 126 : stop() noexcept
124 : {
125 126 : stop_.store(true, std::memory_order_release);
126 126 : cv_.notify_all();
127 126 : }
128 :
129 : private:
130 : void
131 128 : ensure_started()
132 : {
133 128 : std::call_once(start_flag_, [this]{
134 24 : threads_.reserve(num_threads_);
135 66 : for(std::size_t i = 0; i < num_threads_; ++i)
136 84 : threads_.emplace_back([this, i]{ run(i); });
137 24 : });
138 128 : }
139 :
140 : void
141 42 : run(std::size_t index)
142 : {
143 : // Build name; set_current_thread_name truncates to platform limits.
144 : char name[16];
145 42 : std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
146 42 : set_current_thread_name(name);
147 :
148 : for(;;)
149 : {
150 170 : work* w = nullptr;
151 : {
152 170 : std::unique_lock<std::mutex> lock(mutex_);
153 170 : cv_.wait(lock, [this]{
154 346 : return !q_.empty() ||
155 346 : stop_.load(std::memory_order_acquire);
156 : });
157 170 : if(stop_.load(std::memory_order_acquire) && q_.empty())
158 84 : return;
159 128 : w = q_.pop();
160 170 : }
161 128 : if(w)
162 128 : w->run();
163 128 : }
164 : }
165 : };
166 :
167 : //------------------------------------------------------------------------------
168 :
169 63 : thread_pool::
170 : ~thread_pool()
171 : {
172 63 : impl_->join();
173 63 : shutdown();
174 63 : destroy();
175 63 : delete impl_;
176 63 : }
177 :
178 63 : thread_pool::
179 63 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
180 63 : : impl_(new impl(num_threads, thread_name_prefix))
181 : {
182 63 : this->set_frame_allocator(std::allocator<void>{});
183 63 : }
184 :
185 : void
186 MIS 0 : thread_pool::
187 : stop() noexcept
188 : {
189 0 : impl_->stop();
190 0 : }
191 :
192 : //------------------------------------------------------------------------------
193 :
194 : void
195 HIT 128 : thread_pool::executor_type::
196 : post(std::coroutine_handle<> h) const
197 : {
198 128 : pool_->impl_->post(h);
199 128 : }
200 :
201 : } // capy
202 : } // boost
|