include/boost/capy/ex/thread_pool.hpp

100.0% Lines (7/7) 100.0% List of functions (3/3)
thread_pool.hpp
f(x) Functions (3)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/capy
9 //
10
11 #ifndef BOOST_CAPY_EX_THREAD_POOL_HPP
12 #define BOOST_CAPY_EX_THREAD_POOL_HPP
13
14 #include <boost/capy/detail/config.hpp>
15 #include <boost/capy/continuation.hpp>
16 #include <coroutine>
17 #include <boost/capy/ex/execution_context.hpp>
18 #include <cstddef>
19 #include <string_view>
20
21 namespace boost {
22 namespace capy {
23
24 /** A pool of threads for executing work concurrently.
25
26 Use this when you need to run coroutines on multiple threads
27 without the overhead of creating and destroying threads for
28 each task. Work items are distributed across the pool using
29 a shared queue.
30
31 @par Thread Safety
32 Distinct objects: Safe.
33 Shared objects: Unsafe.
34
35 @par Example
36 @code
37 thread_pool pool(4); // 4 worker threads
38 auto ex = pool.get_executor();
39 ex.post(some_coroutine);
40 pool.join(); // wait for outstanding work to complete
41 // pool destructor stops the pool, discarding any pending work
42 @endcode
43 */
44 class BOOST_CAPY_DECL
45 thread_pool
46 : public execution_context
47 {
48 class impl; // Implementation type; only forward-declared in this header.
49 impl* impl_; // Handle to the implementation object; its allocation and release are not visible in this header.
50
51 public:
52 class executor_type; // Executor handle type; defined after this class.
53
54 /** Destroy the thread pool.
55
56 Signals all worker threads to stop, waits for them to
57 finish, and destroys any pending work items.
58 */
59 ~thread_pool();
60
61 /** Construct a thread pool.
62
63 Creates a pool with the specified number of worker threads.
64 If `num_threads` is zero, the number of threads is set to
65 the hardware concurrency, or one if that cannot be determined.
66
67 @param num_threads The number of worker threads, or zero
68 for automatic selection.
69
70 @param thread_name_prefix The prefix for worker thread names.
71 Thread names appear as "{prefix}0", "{prefix}1", etc.
72 The prefix is truncated to 12 characters. Defaults to
73 "capy-pool-".
74 */
75 explicit
76 thread_pool(
77 std::size_t num_threads = 0,
78 std::string_view thread_name_prefix = "capy-pool-");
79 // Non-copyable: copy construction and copy assignment are both deleted.
80 thread_pool(thread_pool const&) = delete;
81 thread_pool& operator=(thread_pool const&) = delete;
82
83 /** Wait for all outstanding work to complete.
84
85 Releases the internal work guard, then blocks the calling
86 thread until all outstanding work tracked by
87 @ref executor_type::on_work_started and
88 @ref executor_type::on_work_finished completes. After all
89 work finishes, joins the worker threads.
90
91 If @ref stop is called while `join()` is blocking, the
92 pool stops without waiting for remaining work to
93 complete. Worker threads finish their current item and
94 exit; `join()` still waits for all threads to be joined
95 before returning.
96
97 This function is idempotent. The first call performs the
98 join; subsequent calls return immediately.
99
100 @par Preconditions
101 Must not be called from a thread in this pool (undefined
102 behavior).
103
104 @par Postconditions
105 All worker threads have been joined. The pool cannot be
106 reused.
107
108 @par Thread Safety
109 May be called from any thread not in this pool.
110 */
111 void
112 join() noexcept;
113
114 /** Request all worker threads to stop.
115
116 Signals all threads to exit after finishing their current
117 work item. Queued work that has not started is abandoned.
118 Does not wait for threads to exit.
119
120 If @ref join is blocking on another thread, calling
121 `stop()` causes it to stop waiting for outstanding
122 work. The `join()` call still waits for worker threads
123 to finish their current item and exit before returning.
124 */
125 void
126 stop() noexcept;
127
128 /** Return an executor for this thread pool.
129
130 @return An executor associated with this thread pool.
131 */
132 executor_type
133 get_executor() const noexcept;
134 };
135
136 /** An executor that submits work to a thread_pool.
137
138 Executors are lightweight handles that can be copied and stored.
139 All copies refer to the same underlying thread pool.
140
141 @par Thread Safety
142 Distinct objects: Safe.
143 Shared objects: Safe.
144 */
145 class thread_pool::executor_type
146 {
147 friend class thread_pool; // Lets thread_pool reach the private pool-binding constructor below.
148 // Pointer to the associated pool; null for a default-constructed executor.
149 thread_pool* pool_ = nullptr;
150 // Bind the executor to `pool` by storing its address. Private: reachable only through the friend class.
151 explicit
152 11582x executor_type(thread_pool& pool) noexcept
153 11582x : pool_(&pool)
154 {
155 11582x }
156
157 public:
158 /** Construct a default null executor.
159
160 The resulting executor is not associated with any pool.
161 `context()`, `dispatch()`, and `post()` require the
162 executor to be associated with a pool before use.
163 */
164 executor_type() = default;
165
166 /// Return the underlying thread pool. Dereferences the stored pointer, so the executor must be associated with a pool.
167 thread_pool&
168 11817x context() const noexcept
169 {
170 11817x return *pool_;
171 }
172
173 /** Notify that work has started.
174
175 Increments the outstanding work count. Must be paired
176 with a subsequent call to @ref on_work_finished.
177
178 @see on_work_finished, work_guard
179 */
180 BOOST_CAPY_DECL
181 void
182 on_work_started() const noexcept;
183
184 /** Notify that work has finished.
185
186 Decrements the outstanding work count. When the count
187 reaches zero after @ref thread_pool::join has been called,
188 the pool's worker threads are signaled to stop.
189
190 @pre A preceding call to @ref on_work_started was made.
191
192 @see on_work_started, work_guard
193 */
194 BOOST_CAPY_DECL
195 void
196 on_work_finished() const noexcept;
197
198 /** Dispatch a continuation for execution.
199
200 If the calling thread is a worker of this pool, returns
201 `c.h` for symmetric transfer so the caller can resume the
202 continuation inline. Otherwise, posts the continuation to
203 the pool for execution on a worker thread and returns
204 `std::noop_coroutine()`.
205
206 @param c The continuation to execute. On the post path,
207 must remain at a stable address until dequeued
208 and resumed.
209
210 @return `c.h` when the calling thread is a pool worker;
211 `std::noop_coroutine()` otherwise.
212 */
213 BOOST_CAPY_DECL
214 std::coroutine_handle<>
215 dispatch(continuation& c) const;
216
217 /** Post a continuation to the thread pool.
218
219 The continuation will be resumed on one of the pool's
220 worker threads. The continuation must remain at a stable
221 address until it is dequeued and resumed.
222
223 @param c The continuation to execute.
224 */
225 BOOST_CAPY_DECL
226 void
227 post(continuation& c) const;
228
229 /// Return true if two executors refer to the same thread pool. Compares the stored pointers, so two null executors also compare equal.
230 bool
231 13x operator==(executor_type const& other) const noexcept
232 {
233 13x return pool_ == other.pool_;
234 }
235 };
236
237 } // capy
238 } // boost
239
240 #endif
241