TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/io/io_object.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/detail/continuation_op.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 :
19 : #include <atomic>
20 : #include <coroutine>
21 : #include <cstddef>
22 : #include <memory>
23 : #include <optional>
24 : #include <stop_token>
25 : #include <system_error>
26 :
27 : #include <errno.h>
28 :
29 : #include <netinet/in.h>
30 : #include <sys/socket.h>
31 : #include <sys/uio.h>
32 :
33 : namespace boost::corosio::detail {
34 :
35 : /** Base operation for reactor-based backends.
36 :
37 : Holds per-operation state that depends on the concrete backend
38 : socket/acceptor types: coroutine handle, executor, output
39 : pointers, file descriptor, stop_callback, and type-specific
40 : impl pointers.
41 :
42 : Fields shared across all backends (errn, bytes_transferred,
43 : cancelled, impl_ptr, perform_io, complete) live in
44 : reactor_op_base so the scheduler and descriptor_state can
45 : access them without template instantiation.
46 :
47 : @tparam Socket The backend socket impl type (forward-declared).
48 : @tparam Acceptor The backend acceptor impl type (forward-declared).
49 : */
50 : template<class Socket, class Acceptor>
51 : struct reactor_op : reactor_op_base
52 : {
53 : /// Stop-token callback that invokes cancel() on the target op.
54 : struct canceller
55 : {
56 : reactor_op* op;
57 HIT 199 : void operator()() const noexcept
58 : {
59 199 : op->cancel();
60 199 : }
61 : };
62 :
63 : /// Caller's coroutine handle to resume on completion.
64 : std::coroutine_handle<> h;
65 :
66 : /// Scheduler-ready continuation for executor dispatch/post (wraps h).
67 : detail::continuation_op cont_op;
68 :
69 : /// Executor for dispatching the completion.
70 : capy::executor_ref ex;
71 :
72 : /// Output pointer for the error code.
73 : std::error_code* ec_out = nullptr;
74 :
75 : /// Output pointer for bytes transferred.
76 : std::size_t* bytes_out = nullptr;
77 :
78 : /// File descriptor this operation targets.
79 : int fd = -1;
80 :
81 : /// Stop-token callback registration.
82 : std::optional<std::stop_callback<canceller>> stop_cb;
83 :
84 : /// Owning socket impl (for stop_token cancellation).
85 : Socket* socket_impl_ = nullptr;
86 :
87 : /// Owning acceptor impl (for stop_token cancellation).
88 : Acceptor* acceptor_impl_ = nullptr;
89 :
90 154182 : reactor_op() = default;
91 :
92 : /// Reset operation state for reuse.
93 484021 : void reset() noexcept
94 : {
95 484021 : fd = -1;
96 484021 : errn = 0;
97 484021 : bytes_transferred = 0;
98 484021 : cancelled.store(false, std::memory_order_relaxed);
99 484021 : impl_ptr.reset();
100 484021 : socket_impl_ = nullptr;
101 484021 : acceptor_impl_ = nullptr;
102 484021 : }
103 :
104 : /// Return true if this is a read-direction operation.
105 46686 : virtual bool is_read_operation() const noexcept
106 : {
107 46686 : return false;
108 : }
109 :
110 : /// Cancel this operation via the owning impl.
111 : virtual void cancel() noexcept = 0;
112 :
113 : /// Destroy without invoking.
114 MIS 0 : void destroy() override
115 : {
116 0 : stop_cb.reset();
117 0 : reactor_op_base::destroy();
118 0 : }
119 :
120 : /// Arm the stop-token callback for a socket operation.
121 HIT 102188 : void start(std::stop_token const& token, Socket* impl)
122 : {
123 102188 : cancelled.store(false, std::memory_order_release);
124 102188 : stop_cb.reset();
125 102188 : socket_impl_ = impl;
126 102188 : acceptor_impl_ = nullptr;
127 :
128 102188 : if (token.stop_possible())
129 197 : stop_cb.emplace(token, canceller{this});
130 102188 : }
131 :
132 : /// Arm the stop-token callback for an acceptor operation.
133 8436 : void start(std::stop_token const& token, Acceptor* impl)
134 : {
135 8436 : cancelled.store(false, std::memory_order_release);
136 8436 : stop_cb.reset();
137 8436 : socket_impl_ = nullptr;
138 8436 : acceptor_impl_ = impl;
139 :
140 8436 : if (token.stop_possible())
141 9 : stop_cb.emplace(token, canceller{this});
142 8436 : }
143 : };
144 :
145 : /** Shared connect operation.
146 :
147 : Checks SO_ERROR for connect completion status. The operator()()
148 : and cancel() are provided by the concrete backend type.
149 :
150 : @tparam Base The backend's base op type.
151 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
152 : */
153 : template<class Base, class Endpoint = endpoint>
154 : struct reactor_connect_op : Base
155 : {
156 : /// Endpoint to connect to.
157 : Endpoint target_endpoint;
158 :
159 : /// Reset operation state for reuse.
160 8448 : void reset() noexcept
161 : {
162 8448 : Base::reset();
163 8448 : target_endpoint = Endpoint{};
164 8448 : }
165 :
166 8424 : void perform_io() noexcept override
167 : {
168 8424 : int err = 0;
169 8424 : socklen_t len = sizeof(err);
170 8424 : if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
171 MIS 0 : err = errno;
172 HIT 8424 : this->complete(err, 0);
173 8424 : }
174 : };
175 :
176 : /** Readiness-only wait operation.
177 :
178 : Does not perform any I/O syscall. Completion is signalled by
179 : the reactor delivering the requested edge event; reactor_descriptor_state
180 : calls complete() directly and never invokes perform_io().
181 :
182 : @tparam Base The backend's base op type.
183 : */
184 : template<class Base>
185 : struct reactor_wait_op : Base
186 : {
187 : /* Mirror of reactor_event_read from reactor_descriptor_state.hpp.
188 : Including that header from here would create an include cycle
189 : (descriptor_state -> reactor_op_base; reactor_op -> reactor_op_base),
190 : so we carry the value locally. Both must stay in sync. */
191 : static constexpr std::uint32_t read_event = 0x001;
192 :
193 : /// Which event bit this wait targets (reactor_event_read/write/error).
194 : std::uint32_t wait_event = 0;
195 :
196 14 : void reset() noexcept
197 : {
198 14 : Base::reset();
199 14 : wait_event = 0;
200 14 : }
201 :
202 MIS 0 : bool is_read_operation() const noexcept override
203 : {
204 0 : return wait_event == read_event;
205 : }
206 :
207 : /* perform_io() should never be called for a wait op — readiness
208 : IS the completion. Overridden here to satisfy the virtual and
209 : produce a safe result if called defensively. */
210 0 : void perform_io() noexcept override
211 : {
212 0 : this->complete(0, 0);
213 0 : }
214 : };
215 :
216 : /** Shared scatter-read operation.
217 :
218 : Uses readv() with an EINTR retry loop.
219 :
220 : @tparam Base The backend's base op type.
221 : */
222 : template<class Base>
223 : struct reactor_read_op : Base
224 : {
225 : /// Maximum scatter-gather buffer count.
226 : static constexpr std::size_t max_buffers = 16;
227 :
228 : /// Scatter-gather I/O vectors.
229 : iovec iovecs[max_buffers];
230 :
231 : /// Number of active I/O vectors.
232 : int iovec_count = 0;
233 :
234 : /// True for zero-length reads (completed immediately).
235 : bool empty_buffer_read = false;
236 :
237 : /// Return true (this is a read-direction operation).
238 HIT 46712 : bool is_read_operation() const noexcept override
239 : {
240 46712 : return !empty_buffer_read;
241 : }
242 :
243 233659 : void reset() noexcept
244 : {
245 233659 : Base::reset();
246 233659 : iovec_count = 0;
247 233659 : empty_buffer_read = false;
248 233659 : }
249 :
250 323 : void perform_io() noexcept override
251 : {
252 : ssize_t n;
253 : do
254 : {
255 323 : n = ::readv(this->fd, iovecs, iovec_count);
256 : }
257 323 : while (n < 0 && errno == EINTR);
258 :
259 323 : if (n >= 0)
260 96 : this->complete(0, static_cast<std::size_t>(n));
261 : else
262 227 : this->complete(errno, 0);
263 323 : }
264 : };
265 :
266 : /** Shared gather-write operation.
267 :
268 : Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
269 : which returns ssize_t (bytes written or -1 with errno set).
270 :
271 : @tparam Base The backend's base op type.
272 : @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
273 : */
274 : template<class Base, class WritePolicy>
275 : struct reactor_write_op : Base
276 : {
277 : /// The write syscall policy type.
278 : using write_policy = WritePolicy;
279 :
280 : /// Maximum scatter-gather buffer count.
281 : static constexpr std::size_t max_buffers = 16;
282 :
283 : /// Scatter-gather I/O vectors.
284 : iovec iovecs[max_buffers];
285 :
286 : /// Number of active I/O vectors.
287 : int iovec_count = 0;
288 :
289 233368 : void reset() noexcept
290 : {
291 233368 : Base::reset();
292 233368 : iovec_count = 0;
293 233368 : }
294 :
295 MIS 0 : void perform_io() noexcept override
296 : {
297 0 : ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
298 0 : if (n >= 0)
299 0 : this->complete(0, static_cast<std::size_t>(n));
300 : else
301 0 : this->complete(errno, 0);
302 0 : }
303 : };
304 :
305 : /** Shared accept operation.
306 :
307 : Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
308 : which returns the accepted fd or -1 with errno set.
309 :
310 : @tparam Base The backend's base op type.
311 : @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
312 : */
313 : template<class Base, class AcceptPolicy>
314 : struct reactor_accept_op : Base
315 : {
316 : /// File descriptor of the accepted connection.
317 : int accepted_fd = -1;
318 :
319 : /// Pointer to the peer socket implementation.
320 : io_object::implementation* peer_impl = nullptr;
321 :
322 : /// Output pointer for the accepted implementation.
323 : io_object::implementation** impl_out = nullptr;
324 :
325 : /// Peer address storage filled by accept.
326 : sockaddr_storage peer_storage{};
327 :
328 : /// Peer address length returned by accept.
329 : socklen_t peer_addrlen = 0;
330 :
331 HIT 8434 : void reset() noexcept
332 : {
333 8434 : Base::reset();
334 8434 : accepted_fd = -1;
335 8434 : peer_impl = nullptr;
336 8434 : impl_out = nullptr;
337 8434 : peer_storage = {};
338 8434 : peer_addrlen = 0;
339 8434 : }
340 :
341 8414 : void perform_io() noexcept override
342 : {
343 8414 : int new_fd = AcceptPolicy::do_accept(
344 8414 : this->fd, peer_storage, peer_addrlen);
345 8414 : if (new_fd >= 0)
346 : {
347 8414 : accepted_fd = new_fd;
348 8414 : this->complete(0, 0);
349 : }
350 : else
351 : {
352 MIS 0 : this->complete(errno, 0);
353 : }
354 HIT 8414 : }
355 : };
356 :
357 : /** Shared connected send operation for datagram sockets.
358 :
359 : Uses sendmsg() with msg_name=nullptr (connected mode).
360 :
361 : @tparam Base The backend's base op type.
362 : */
363 : template<class Base>
364 : struct reactor_send_op : Base
365 : {
366 : /// Maximum scatter-gather buffer count.
367 : static constexpr std::size_t max_buffers = 16;
368 :
369 : /// Scatter-gather I/O vectors.
370 : iovec iovecs[max_buffers];
371 :
372 : /// Number of active I/O vectors.
373 : int iovec_count = 0;
374 :
375 : /// User-supplied message flags.
376 : int msg_flags = 0;
377 :
378 14 : void reset() noexcept
379 : {
380 14 : Base::reset();
381 14 : iovec_count = 0;
382 14 : msg_flags = 0;
383 14 : }
384 :
385 MIS 0 : void perform_io() noexcept override
386 : {
387 0 : msghdr msg{};
388 0 : msg.msg_iov = iovecs;
389 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
390 :
391 : #ifdef MSG_NOSIGNAL
392 0 : int send_flags = msg_flags | MSG_NOSIGNAL;
393 : #else
394 : int send_flags = msg_flags;
395 : #endif
396 :
397 : ssize_t n;
398 : do
399 : {
400 0 : n = ::sendmsg(this->fd, &msg, send_flags);
401 : }
402 0 : while (n < 0 && errno == EINTR);
403 :
404 0 : if (n >= 0)
405 0 : this->complete(0, static_cast<std::size_t>(n));
406 : else
407 0 : this->complete(errno, 0);
408 0 : }
409 : };
410 :
411 : /** Shared connected recv operation for datagram sockets.
412 :
413 : Uses recvmsg() with msg_name=nullptr (connected mode).
414 : Unlike reactor_read_op, does not map n==0 to EOF
415 : (zero-length datagrams are valid).
416 :
417 : @tparam Base The backend's base op type.
418 : */
419 : template<class Base>
420 : struct reactor_recv_op : Base
421 : {
422 : /// Maximum scatter-gather buffer count.
423 : static constexpr std::size_t max_buffers = 16;
424 :
425 : /// Scatter-gather I/O vectors.
426 : iovec iovecs[max_buffers];
427 :
428 : /// Number of active I/O vectors.
429 : int iovec_count = 0;
430 :
431 : /// User-supplied message flags.
432 : int msg_flags = 0;
433 :
434 : /// Return true (this is a read-direction operation).
435 0 : bool is_read_operation() const noexcept override
436 : {
437 0 : return true;
438 : }
439 :
440 HIT 14 : void reset() noexcept
441 : {
442 14 : Base::reset();
443 14 : iovec_count = 0;
444 14 : msg_flags = 0;
445 14 : }
446 :
447 MIS 0 : void perform_io() noexcept override
448 : {
449 0 : msghdr msg{};
450 0 : msg.msg_iov = iovecs;
451 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
452 :
453 : ssize_t n;
454 : do
455 : {
456 0 : n = ::recvmsg(this->fd, &msg, msg_flags);
457 : }
458 0 : while (n < 0 && errno == EINTR);
459 :
460 0 : if (n >= 0)
461 0 : this->complete(0, static_cast<std::size_t>(n));
462 : else
463 0 : this->complete(errno, 0);
464 0 : }
465 : };
466 :
467 : /** Shared send_to operation for datagram sockets.
468 :
469 : Uses sendmsg() with the destination endpoint in msg_name.
470 :
471 : @tparam Base The backend's base op type.
472 : */
473 : template<class Base>
474 : struct reactor_send_to_op : Base
475 : {
476 : /// Maximum scatter-gather buffer count.
477 : static constexpr std::size_t max_buffers = 16;
478 :
479 : /// Scatter-gather I/O vectors.
480 : iovec iovecs[max_buffers];
481 :
482 : /// Number of active I/O vectors.
483 : int iovec_count = 0;
484 :
485 : /// Destination address storage.
486 : sockaddr_storage dest_storage{};
487 :
488 : /// Destination address length.
489 : socklen_t dest_len = 0;
490 :
491 : /// User-supplied message flags.
492 : int msg_flags = 0;
493 :
494 HIT 30 : void reset() noexcept
495 : {
496 30 : Base::reset();
497 30 : iovec_count = 0;
498 30 : dest_storage = {};
499 30 : dest_len = 0;
500 30 : msg_flags = 0;
501 30 : }
502 :
503 MIS 0 : void perform_io() noexcept override
504 : {
505 0 : msghdr msg{};
506 0 : msg.msg_name = &dest_storage;
507 0 : msg.msg_namelen = dest_len;
508 0 : msg.msg_iov = iovecs;
509 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
510 :
511 : #ifdef MSG_NOSIGNAL
512 0 : int send_flags = msg_flags | MSG_NOSIGNAL;
513 : #else
514 : int send_flags = msg_flags;
515 : #endif
516 :
517 : ssize_t n;
518 : do
519 : {
520 0 : n = ::sendmsg(this->fd, &msg, send_flags);
521 : }
522 0 : while (n < 0 && errno == EINTR);
523 :
524 0 : if (n >= 0)
525 0 : this->complete(0, static_cast<std::size_t>(n));
526 : else
527 0 : this->complete(errno, 0);
528 0 : }
529 : };
530 :
531 : /** Shared recv_from operation for datagram sockets.
532 :
533 : Uses recvmsg() with msg_name to capture the source endpoint.
534 :
535 : @tparam Base The backend's base op type.
536 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
537 : */
538 : template<class Base, class Endpoint = endpoint>
539 : struct reactor_recv_from_op : Base
540 : {
541 : /// Maximum scatter-gather buffer count.
542 : static constexpr std::size_t max_buffers = 16;
543 :
544 : /// Scatter-gather I/O vectors.
545 : iovec iovecs[max_buffers];
546 :
547 : /// Number of active I/O vectors.
548 : int iovec_count = 0;
549 :
550 : /// Source address storage filled by recvmsg.
551 : sockaddr_storage source_storage{};
552 :
553 : /// Actual source address length returned by recvmsg.
554 : socklen_t source_addrlen = 0;
555 :
556 : /// Output pointer for the source endpoint (set by do_recv_from).
557 : Endpoint* source_out = nullptr;
558 :
559 : /// User-supplied message flags.
560 : int msg_flags = 0;
561 :
562 : /// Return true (this is a read-direction operation).
563 0 : bool is_read_operation() const noexcept override
564 : {
565 0 : return true;
566 : }
567 :
568 HIT 40 : void reset() noexcept
569 : {
570 40 : Base::reset();
571 40 : iovec_count = 0;
572 40 : source_storage = {};
573 40 : source_addrlen = 0;
574 40 : source_out = nullptr;
575 40 : msg_flags = 0;
576 40 : }
577 :
578 2 : void perform_io() noexcept override
579 : {
580 2 : msghdr msg{};
581 2 : msg.msg_name = &source_storage;
582 2 : msg.msg_namelen = sizeof(source_storage);
583 2 : msg.msg_iov = iovecs;
584 2 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
585 :
586 : ssize_t n;
587 : do
588 : {
589 2 : n = ::recvmsg(this->fd, &msg, msg_flags);
590 : }
591 2 : while (n < 0 && errno == EINTR);
592 :
593 2 : if (n >= 0)
594 : {
595 2 : source_addrlen = msg.msg_namelen;
596 2 : this->complete(0, static_cast<std::size_t>(n));
597 : }
598 : else
599 MIS 0 : this->complete(errno, 0);
600 HIT 2 : }
601 : };
602 :
603 : } // namespace boost::corosio::detail
604 :
605 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
|