TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
15 :
16 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
17 :
18 : #include <atomic>
19 : #include <cstdint>
20 : #include <memory>
21 :
22 : #include <errno.h>
23 : #include <sys/socket.h>
24 :
25 : namespace boost::corosio::detail {
26 :
27 : /// Shared reactor event constants.
28 : /// These match epoll numeric values; kqueue maps its events to the same.
29 : static constexpr std::uint32_t reactor_event_read = 0x001;
30 : static constexpr std::uint32_t reactor_event_write = 0x004;
31 : static constexpr std::uint32_t reactor_event_error = 0x008;
32 :
33 : /** Per-descriptor state shared across reactor backends.
34 :
35 : Tracks pending operations for a file descriptor. The fd is registered
36 : once with the reactor and stays registered until closed. Uses deferred
37 : I/O: the reactor sets ready_events atomically, then enqueues this state.
38 : When popped by the scheduler, invoke_deferred_io() performs I/O under
39 : the mutex and queues completed ops.
40 :
41 : Non-template: uses reactor_op_base pointers so the scheduler and
42 : descriptor_state code exist as a single copy in the binary regardless
43 : of how many backends are compiled in.
44 :
45 : @par Thread Safety
46 : The mutex protects operation pointers and ready flags. ready_events_
47 : and is_enqueued_ are atomic for lock-free reactor access.
48 : */
49 : struct reactor_descriptor_state : scheduler_op
50 : {
51 : /// Protects operation pointers and ready/cancel flags.
52 : /// Becomes a no-op in single-threaded mode.
53 : conditionally_enabled_mutex mutex{true};
54 :
55 : /// Pending read operation (guarded by `mutex`).
56 : reactor_op_base* read_op = nullptr;
57 :
58 : /// Pending write operation (guarded by `mutex`).
59 : reactor_op_base* write_op = nullptr;
60 :
61 : /// Pending connect operation (guarded by `mutex`).
62 : reactor_op_base* connect_op = nullptr;
63 :
64 : /// Pending wait-for-read operation (guarded by `mutex`).
65 : reactor_op_base* wait_read_op = nullptr;
66 :
67 : /// Pending wait-for-write operation (guarded by `mutex`).
68 : reactor_op_base* wait_write_op = nullptr;
69 :
70 : /// Pending wait-for-error operation (guarded by `mutex`).
71 : reactor_op_base* wait_error_op = nullptr;
72 :
73 : /// True if a read edge event arrived before an op was registered.
74 : bool read_ready = false;
75 :
76 : /// True if a write edge event arrived before an op was registered.
77 : bool write_ready = false;
78 :
79 : /// Deferred read cancellation (IOCP-style cancel semantics).
80 : bool read_cancel_pending = false;
81 :
82 : /// Deferred write cancellation (IOCP-style cancel semantics).
83 : bool write_cancel_pending = false;
84 :
85 : /// Deferred connect cancellation (IOCP-style cancel semantics).
86 : bool connect_cancel_pending = false;
87 :
88 : /// Deferred wait-read cancellation (IOCP-style cancel semantics).
89 : bool wait_read_cancel_pending = false;
90 :
91 : /// Deferred wait-write cancellation (IOCP-style cancel semantics).
92 : bool wait_write_cancel_pending = false;
93 :
94 : /// Deferred wait-error cancellation (IOCP-style cancel semantics).
95 : bool wait_error_cancel_pending = false;
96 :
97 : /// Event mask set during registration (no mutex needed).
98 : std::uint32_t registered_events = 0;
99 :
100 : /// File descriptor this state tracks.
101 : int fd = -1;
102 :
103 : /// Accumulated ready events (set by reactor, read by scheduler).
104 : std::atomic<std::uint32_t> ready_events_{0};
105 :
106 : /// True while this state is queued in the scheduler's completed_ops.
107 : std::atomic<bool> is_enqueued_{false};
108 :
109 : /// Owning scheduler for posting completions.
110 : reactor_scheduler const* scheduler_ = nullptr;
111 :
112 : /// Prevents impl destruction while queued in the scheduler.
113 : std::shared_ptr<void> impl_ref_;
114 :
115 : /// Add ready events atomically.
116 : /// Release pairs with the consumer's acquire exchange on
117 : /// ready_events_ so the consumer sees all flags. On x86 (TSO)
118 : /// this compiles to the same LOCK OR as relaxed.
119 HIT 175142 : void add_ready_events(std::uint32_t ev) noexcept
120 : {
121 175142 : ready_events_.fetch_or(ev, std::memory_order_release);
122 175142 : }
123 :
124 : /// Invoke deferred I/O and dispatch completions.
125 175054 : void operator()() override
126 : {
127 175054 : invoke_deferred_io();
128 175054 : }
129 :
130 : /// Destroy without invoking.
131 : /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
132 : /// the self-referential cycle set by close_socket().
133 88 : void destroy() override
134 : {
135 88 : impl_ref_.reset();
136 88 : }
137 :
138 : /** Perform deferred I/O and queue completions.
139 :
140 : Performs I/O under the mutex and queues completed ops. EAGAIN
141 : ops stay parked in their slot for re-delivery on the next
142 : edge event.
143 : */
144 : void invoke_deferred_io();
145 : };
146 :
147 : inline void
148 175054 : reactor_descriptor_state::invoke_deferred_io()
149 : {
150 175054 : std::shared_ptr<void> prevent_impl_destruction;
151 175054 : op_queue local_ops;
152 :
153 : {
154 175054 : conditionally_enabled_mutex::scoped_lock lock(mutex);
155 :
156 : // Must clear is_enqueued_ and move impl_ref_ under the same
157 : // lock that processes I/O. close_socket() checks is_enqueued_
158 : // under this mutex — without atomicity between the flag store
159 : // and the ref move, close_socket() could see is_enqueued_==false,
160 : // skip setting impl_ref_, and destroy the impl under us.
161 175054 : prevent_impl_destruction = std::move(impl_ref_);
162 175054 : is_enqueued_.store(false, std::memory_order_release);
163 :
164 175054 : std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
165 175054 : if (ev == 0)
166 : {
167 : // Mutex unlocks here; compensate for work_cleanup's decrement
168 MIS 0 : scheduler_->compensating_work_started();
169 0 : return;
170 : }
171 :
172 HIT 175054 : int err = 0;
173 175054 : if (ev & reactor_event_error)
174 : {
175 6 : socklen_t len = sizeof(err);
176 6 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
177 MIS 0 : err = errno;
178 HIT 6 : if (err == 0)
179 MIS 0 : err = EIO;
180 : }
181 :
182 HIT 175054 : if (ev & reactor_event_read)
183 : {
184 144311 : if (read_op)
185 : {
186 8555 : auto* rd = read_op;
187 8555 : if (err)
188 MIS 0 : rd->complete(err, 0);
189 : else
190 HIT 8555 : rd->perform_io();
191 :
192 8555 : if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
193 : {
194 43 : rd->errn = 0;
195 : }
196 : else
197 : {
198 8512 : read_op = nullptr;
199 8512 : local_ops.push(rd);
200 : }
201 : }
202 : else
203 : {
204 135756 : read_ready = true;
205 : }
206 :
207 : // Complete any parked wait-for-read regardless of read_op presence.
208 144311 : if (wait_read_op)
209 : {
210 8 : wait_read_op->complete(err, 0);
211 8 : local_ops.push(std::exchange(wait_read_op, nullptr));
212 : }
213 : }
214 175054 : if (ev & reactor_event_write)
215 : {
216 40019 : bool had_write_op = (connect_op || write_op);
217 40019 : if (connect_op)
218 : {
219 8430 : auto* cn = connect_op;
220 8430 : if (err)
221 6 : cn->complete(err, 0);
222 : else
223 8424 : cn->perform_io();
224 8430 : connect_op = nullptr;
225 8430 : local_ops.push(cn);
226 : }
227 40019 : if (write_op)
228 : {
229 MIS 0 : auto* wr = write_op;
230 0 : if (err)
231 0 : wr->complete(err, 0);
232 : else
233 0 : wr->perform_io();
234 :
235 0 : if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
236 : {
237 0 : wr->errn = 0;
238 : }
239 : else
240 : {
241 0 : write_op = nullptr;
242 0 : local_ops.push(wr);
243 : }
244 : }
245 HIT 40019 : if (!had_write_op)
246 31589 : write_ready = true;
247 :
248 : // Complete any parked wait-for-write regardless of write_op presence.
249 40019 : if (wait_write_op)
250 : {
251 MIS 0 : wait_write_op->complete(err, 0);
252 0 : local_ops.push(std::exchange(wait_write_op, nullptr));
253 : }
254 : }
255 : // Complete a parked wait-for-error on any error condition.
256 HIT 175054 : if ((ev & reactor_event_error) || err)
257 : {
258 6 : if (wait_error_op)
259 : {
260 MIS 0 : wait_error_op->complete(err, 0);
261 0 : local_ops.push(std::exchange(wait_error_op, nullptr));
262 : }
263 : }
264 HIT 175054 : if (err)
265 : {
266 6 : if (read_op)
267 : {
268 MIS 0 : read_op->complete(err, 0);
269 0 : local_ops.push(std::exchange(read_op, nullptr));
270 : }
271 HIT 6 : if (write_op)
272 : {
273 MIS 0 : write_op->complete(err, 0);
274 0 : local_ops.push(std::exchange(write_op, nullptr));
275 : }
276 HIT 6 : if (connect_op)
277 : {
278 MIS 0 : connect_op->complete(err, 0);
279 0 : local_ops.push(std::exchange(connect_op, nullptr));
280 : }
281 HIT 6 : if (wait_read_op)
282 : {
283 MIS 0 : wait_read_op->complete(err, 0);
284 0 : local_ops.push(std::exchange(wait_read_op, nullptr));
285 : }
286 HIT 6 : if (wait_write_op)
287 : {
288 MIS 0 : wait_write_op->complete(err, 0);
289 0 : local_ops.push(std::exchange(wait_write_op, nullptr));
290 : }
291 : }
292 HIT 175054 : }
293 :
294 : // Execute first handler inline — the scheduler's work_cleanup
295 : // accounts for this as the "consumed" work item
296 175054 : scheduler_op* first = local_ops.pop();
297 175054 : if (first)
298 : {
299 16950 : scheduler_->post_deferred_completions(local_ops);
300 16950 : (*first)();
301 : }
302 : else
303 : {
304 158104 : scheduler_->compensating_work_started();
305 : }
306 175054 : }
307 :
308 : } // namespace boost::corosio::detail
309 :
310 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
|