TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/intrusive.hpp>
14 : #include <boost/corosio/detail/native_handle.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
17 : #include <boost/corosio/native/detail/make_err.hpp>
18 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
19 :
20 : #include <memory>
21 : #include <mutex>
22 : #include <utility>
23 :
24 : #include <errno.h>
25 : #include <netinet/in.h>
26 : #include <sys/socket.h>
27 : #include <unistd.h>
28 :
29 : namespace boost::corosio::detail {
30 :
31 : /** CRTP base for reactor-backed socket implementations.
32 :
33 : Extracts the shared data members, virtual overrides, and
34 : cancel/close/register logic that is identical across TCP
35 : (reactor_stream_socket) and UDP (reactor_datagram_socket).
36 :
37 : Derived classes provide CRTP callbacks that enumerate their
38 : specific op slots so cancel/close can iterate them generically.
39 :
40 : @tparam Derived The concrete socket type (CRTP).
41 : @tparam ImplBase The public vtable base (tcp_socket::implementation
42 : or udp_socket::implementation).
43 : @tparam Service The backend's service type.
44 : @tparam DescState The backend's descriptor_state type.
45 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
46 : */
47 : template<
48 : class Derived,
49 : class ImplBase,
50 : class Service,
51 : class DescState,
52 : class Endpoint = endpoint>
53 : class reactor_basic_socket
54 : : public ImplBase
55 : , public std::enable_shared_from_this<Derived>
56 : , public intrusive_list<Derived>::node
57 : {
58 : friend Derived;
59 :
60 : template<class, class, class, class, class, class, class, class, class>
61 : friend class reactor_stream_socket;
62 :
63 : template<class, class, class, class, class, class, class, class, class, class, class>
64 : friend class reactor_datagram_socket;
65 :
66 HIT 25525 : explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {}
67 :
68 : protected:
69 : Service& svc_;
70 : int fd_ = -1;
71 : Endpoint local_endpoint_;
72 :
73 : public:
74 : /// Per-descriptor state for persistent reactor registration.
75 : DescState desc_state_;
76 :
77 25525 : ~reactor_basic_socket() override = default;
78 :
79 : /// Return the underlying file descriptor.
80 77266 : native_handle_type native_handle() const noexcept override
81 : {
82 77266 : return fd_;
83 : }
84 :
85 : /// Return the cached local endpoint.
86 82 : Endpoint local_endpoint() const noexcept override
87 : {
88 82 : return local_endpoint_;
89 : }
90 :
91 : /// Return true if the socket has an open file descriptor.
92 : bool is_open() const noexcept
93 : {
94 : return fd_ >= 0;
95 : }
96 :
97 : /// Set a socket option.
98 20 : std::error_code set_option(
99 : int level,
100 : int optname,
101 : void const* data,
102 : std::size_t size) noexcept override
103 : {
104 20 : if (::setsockopt(
105 20 : fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
106 MIS 0 : return make_err(errno);
107 HIT 20 : return {};
108 : }
109 :
110 : /// Get a socket option.
111 : std::error_code
112 78 : get_option(int level, int optname, void* data, std::size_t* size)
113 : const noexcept override
114 : {
115 78 : socklen_t len = static_cast<socklen_t>(*size);
116 78 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
117 MIS 0 : return make_err(errno);
118 HIT 78 : *size = static_cast<std::size_t>(len);
119 78 : return {};
120 : }
121 :
122 : /// Assign the file descriptor.
123 8422 : void set_socket(int fd) noexcept
124 : {
125 8422 : fd_ = fd;
126 8422 : }
127 :
128 : /// Cache the local endpoint.
129 : void set_local_endpoint(Endpoint ep) noexcept
130 : {
131 : local_endpoint_ = ep;
132 : }
133 :
134 : /** Bind the socket to a local endpoint.
135 :
136 : Calls ::bind() and caches the resulting local endpoint
137 : via getsockname().
138 :
139 : @param ep The endpoint to bind to.
140 : @return Error code on failure, empty on success.
141 : */
142 80 : std::error_code do_bind(Endpoint const& ep) noexcept
143 : {
144 80 : sockaddr_storage storage{};
145 80 : socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
146 80 : if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) != 0)
147 10 : return make_err(errno);
148 :
149 70 : sockaddr_storage local_storage{};
150 70 : socklen_t local_len = sizeof(local_storage);
151 70 : if (::getsockname(
152 70 : fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
153 : 0)
154 56 : local_endpoint_ =
155 70 : from_sockaddr_as(local_storage, local_len, Endpoint{});
156 :
157 70 : return {};
158 : }
159 :
160 : /// Assign the fd, initialize descriptor state, and register with the reactor.
161 8616 : void init_and_register(int fd) noexcept
162 : {
163 8616 : fd_ = fd;
164 8616 : desc_state_.fd = fd;
165 : {
166 8616 : std::lock_guard lock(desc_state_.mutex);
167 8616 : desc_state_.read_op = nullptr;
168 8616 : desc_state_.write_op = nullptr;
169 8616 : desc_state_.connect_op = nullptr;
170 8616 : }
171 8616 : svc_.scheduler().register_descriptor(fd, &desc_state_);
172 8616 : }
173 :
174 : /** Register an op with the reactor.
175 :
176 : Handles cached edge events and deferred cancellation.
177 : Called on the EAGAIN/EINPROGRESS path when speculative
178 : I/O failed.
179 : */
180 : template<class Op>
181 : void register_op(
182 : Op& op,
183 : reactor_op_base*& desc_slot,
184 : bool& ready_flag,
185 : bool& cancel_flag,
186 : bool is_write_direction = false) noexcept;
187 :
188 : /** Cancel a single pending operation.
189 :
190 : Claims the operation from its descriptor_state slot under
191 : the mutex and posts it to the scheduler as cancelled.
192 : Derived must implement:
193 : op_to_desc_slot(Op&) -> reactor_op_base**
194 : op_to_cancel_flag(Op&) -> bool*
195 : */
196 : template<class Op>
197 : void cancel_single_op(Op& op) noexcept;
198 :
199 : /** Cancel all pending operations.
200 :
201 : Invoked by the derived class's cancel() override.
202 : Derived must implement:
203 : for_each_op(auto fn)
204 : for_each_desc_entry(auto fn)
205 : */
206 : void do_cancel() noexcept;
207 :
208 : /** Close the socket and cancel pending operations.
209 :
210 : Invoked by the derived class's close_socket(). The
211 : derived class may add backend-specific cleanup after
212 : calling this method.
213 : Derived must implement:
214 : for_each_op(auto fn)
215 : for_each_desc_entry(auto fn)
216 : */
217 : void do_close_socket() noexcept;
218 :
219 : /** Release the socket without closing the fd.
220 :
221 : Like do_close_socket() but does not call ::close().
222 : Returns the fd so the caller can take ownership.
223 : */
224 : native_handle_type do_release_socket() noexcept;
225 : };
226 :
227 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
228 : template<class Op>
229 : void
230 8838 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::register_op(
231 : Op& op,
232 : reactor_op_base*& desc_slot,
233 : bool& ready_flag,
234 : bool& cancel_flag,
235 : bool is_write_direction) noexcept
236 : {
237 8838 : svc_.work_started();
238 :
239 8838 : std::lock_guard lock(desc_state_.mutex);
240 8838 : bool io_done = false;
241 8838 : if (ready_flag)
242 : {
243 184 : ready_flag = false;
244 184 : op.perform_io();
245 184 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
246 184 : if (!io_done)
247 184 : op.errn = 0;
248 : }
249 :
250 8838 : if (cancel_flag)
251 : {
252 MIS 0 : cancel_flag = false;
253 0 : op.cancelled.store(true, std::memory_order_relaxed);
254 : }
255 :
256 HIT 8838 : if (io_done || op.cancelled.load(std::memory_order_acquire))
257 : {
258 MIS 0 : svc_.post(&op);
259 0 : svc_.work_finished();
260 : }
261 : else
262 : {
263 HIT 8838 : desc_slot = &op;
264 :
265 : // Select must rebuild its fd_sets when a write-direction op
266 : // is parked, so select() watches for writability. Compiled
267 : // away to nothing for epoll and kqueue.
268 : if constexpr (requires { Service::needs_write_notification; })
269 : {
270 : if constexpr (Service::needs_write_notification)
271 : {
272 3776 : if (is_write_direction)
273 3580 : svc_.scheduler().notify_reactor();
274 : }
275 : }
276 : }
277 8838 : }
278 :
279 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
280 : template<class Op>
281 : void
282 193 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::cancel_single_op(
283 : Op& op) noexcept
284 : {
285 193 : auto self = this->weak_from_this().lock();
286 193 : if (!self)
287 MIS 0 : return;
288 :
289 HIT 193 : op.request_cancel();
290 :
291 193 : auto* d = static_cast<Derived*>(this);
292 193 : reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op);
293 :
294 193 : if (desc_op_ptr)
295 : {
296 193 : reactor_op_base* claimed = nullptr;
297 : {
298 193 : std::lock_guard lock(desc_state_.mutex);
299 193 : if (*desc_op_ptr == &op)
300 193 : claimed = std::exchange(*desc_op_ptr, nullptr);
301 : else
302 : {
303 MIS 0 : bool* cflag = d->op_to_cancel_flag(op);
304 0 : if (cflag)
305 0 : *cflag = true;
306 : }
307 HIT 193 : }
308 193 : if (claimed)
309 : {
310 193 : op.impl_ptr = self;
311 193 : svc_.post(&op);
312 193 : svc_.work_finished();
313 : }
314 : }
315 193 : }
316 :
317 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
318 : void
319 194 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
320 : do_cancel() noexcept
321 : {
322 194 : auto self = this->weak_from_this().lock();
323 194 : if (!self)
324 MIS 0 : return;
325 :
326 HIT 194 : auto* d = static_cast<Derived*>(this);
327 :
328 1370 : d->for_each_op([](auto& op) { op.request_cancel(); });
329 :
330 : // Claim ops under a single lock acquisition
331 : struct claimed_entry
332 : {
333 : reactor_op_base* op = nullptr;
334 : reactor_op_base* base = nullptr;
335 : };
336 : // Max 8 ops: conn, rd, wr, wait_rd, wait_wr, wait_er, recv_rd, send_wr
337 194 : claimed_entry claimed[8];
338 194 : int count = 0;
339 :
340 : {
341 194 : std::lock_guard lock(desc_state_.mutex);
342 2546 : d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
343 1176 : if (desc_slot == &op)
344 : {
345 107 : claimed[count].op = std::exchange(desc_slot, nullptr);
346 107 : claimed[count].base = &op;
347 107 : ++count;
348 : }
349 : });
350 194 : }
351 :
352 301 : for (int i = 0; i < count; ++i)
353 : {
354 107 : claimed[i].base->impl_ptr = self;
355 107 : svc_.post(claimed[i].base);
356 107 : svc_.work_finished();
357 : }
358 194 : }
359 :
360 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
361 : void
362 76702 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
363 : do_close_socket() noexcept
364 : {
365 76702 : auto self = this->weak_from_this().lock();
366 76702 : if (self)
367 : {
368 76702 : auto* d = static_cast<Derived*>(this);
369 :
370 537890 : d->for_each_op([](auto& op) { op.request_cancel(); });
371 :
372 : struct claimed_entry
373 : {
374 : reactor_op_base* base = nullptr;
375 : };
376 76702 : claimed_entry claimed[8];
377 76702 : int count = 0;
378 :
379 : {
380 76702 : std::lock_guard lock(desc_state_.mutex);
381 76702 : d->for_each_desc_entry(
382 922376 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
383 461188 : auto* c = std::exchange(desc_slot, nullptr);
384 461188 : if (c)
385 : {
386 4 : claimed[count].base = c;
387 4 : ++count;
388 : }
389 : });
390 76702 : desc_state_.read_ready = false;
391 76702 : desc_state_.write_ready = false;
392 76702 : desc_state_.read_cancel_pending = false;
393 76702 : desc_state_.write_cancel_pending = false;
394 76702 : desc_state_.connect_cancel_pending = false;
395 76702 : desc_state_.wait_read_cancel_pending = false;
396 76702 : desc_state_.wait_write_cancel_pending = false;
397 76702 : desc_state_.wait_error_cancel_pending = false;
398 :
399 76702 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
400 271 : desc_state_.impl_ref_ = self;
401 76702 : }
402 :
403 76706 : for (int i = 0; i < count; ++i)
404 : {
405 4 : claimed[i].base->impl_ptr = self;
406 4 : svc_.post(claimed[i].base);
407 4 : svc_.work_finished();
408 : }
409 : }
410 :
411 76702 : if (fd_ >= 0)
412 : {
413 17036 : if (desc_state_.registered_events != 0)
414 17036 : svc_.scheduler().deregister_descriptor(fd_);
415 17036 : ::close(fd_);
416 17036 : fd_ = -1;
417 : }
418 :
419 76702 : desc_state_.fd = -1;
420 76702 : desc_state_.registered_events = 0;
421 :
422 76702 : local_endpoint_ = Endpoint{};
423 76702 : }
424 :
425 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
426 : native_handle_type
427 2 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
428 : do_release_socket() noexcept
429 : {
430 : // Cancel pending ops (same as do_close_socket)
431 2 : auto self = this->weak_from_this().lock();
432 2 : if (self)
433 : {
434 2 : auto* d = static_cast<Derived*>(this);
435 :
436 14 : d->for_each_op([](auto& op) { op.request_cancel(); });
437 :
438 : struct claimed_entry
439 : {
440 : reactor_op_base* base = nullptr;
441 : };
442 2 : claimed_entry claimed[8];
443 2 : int count = 0;
444 :
445 : {
446 2 : std::lock_guard lock(desc_state_.mutex);
447 2 : d->for_each_desc_entry(
448 24 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
449 12 : auto* c = std::exchange(desc_slot, nullptr);
450 12 : if (c)
451 : {
452 MIS 0 : claimed[count].base = c;
453 0 : ++count;
454 : }
455 : });
456 HIT 2 : desc_state_.read_ready = false;
457 2 : desc_state_.write_ready = false;
458 2 : desc_state_.read_cancel_pending = false;
459 2 : desc_state_.write_cancel_pending = false;
460 2 : desc_state_.connect_cancel_pending = false;
461 2 : desc_state_.wait_read_cancel_pending = false;
462 2 : desc_state_.wait_write_cancel_pending = false;
463 2 : desc_state_.wait_error_cancel_pending = false;
464 :
465 2 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
466 MIS 0 : desc_state_.impl_ref_ = self;
467 HIT 2 : }
468 :
469 2 : for (int i = 0; i < count; ++i)
470 : {
471 MIS 0 : claimed[i].base->impl_ptr = self;
472 0 : svc_.post(claimed[i].base);
473 0 : svc_.work_finished();
474 : }
475 : }
476 :
477 HIT 2 : native_handle_type released = fd_;
478 :
479 2 : if (fd_ >= 0)
480 : {
481 2 : if (desc_state_.registered_events != 0)
482 2 : svc_.scheduler().deregister_descriptor(fd_);
483 : // Do NOT close -- caller takes ownership
484 2 : fd_ = -1;
485 : }
486 :
487 2 : desc_state_.fd = -1;
488 2 : desc_state_.registered_events = 0;
489 :
490 2 : local_endpoint_ = Endpoint{};
491 :
492 4 : return released;
493 2 : }
494 :
495 : } // namespace boost::corosio::detail
496 :
497 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
|