LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_descriptor_state.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 70.0 % 90 63 27
Test Date: 2026-05-20 17:06:58 Functions: 100.0 % 4 4

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
      12                 : 
      13                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      14                 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
      15                 : 
      16                 : #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
      17                 : 
      18                 : #include <atomic>
      19                 : #include <cstdint>
      20                 : #include <memory>
      21                 : 
      22                 : #include <errno.h>
      23                 : #include <sys/socket.h>
      24                 : 
      25                 : namespace boost::corosio::detail {
      26                 : 
      27                 : /// Shared reactor event constants.
      28                 : /// These match epoll numeric values; kqueue maps its events to the same.
      29                 : static constexpr std::uint32_t reactor_event_read  = 0x001;
      30                 : static constexpr std::uint32_t reactor_event_write = 0x004;
      31                 : static constexpr std::uint32_t reactor_event_error = 0x008;
      32                 : 
      33                 : /** Per-descriptor state shared across reactor backends.
      34                 : 
      35                 :     Tracks pending operations for a file descriptor. The fd is registered
      36                 :     once with the reactor and stays registered until closed. Uses deferred
      37                 :     I/O: the reactor sets ready_events atomically, then enqueues this state.
      38                 :     When popped by the scheduler, invoke_deferred_io() performs I/O under
      39                 :     the mutex and queues completed ops.
      40                 : 
      41                 :     Non-template: uses reactor_op_base pointers so the scheduler and
      42                 :     descriptor_state code exist as a single copy in the binary regardless
      43                 :     of how many backends are compiled in.
      44                 : 
      45                 :     @par Thread Safety
      46                 :     The mutex protects operation pointers and ready flags. ready_events_
      47                 :     and is_enqueued_ are atomic for lock-free reactor access.
      48                 : */
      49                 : struct reactor_descriptor_state : scheduler_op
      50                 : {
      51                 :     /// Protects operation pointers and ready/cancel flags.
      52                 :     /// Becomes a no-op in single-threaded mode.
      53                 :     conditionally_enabled_mutex mutex{true};
      54                 : 
      55                 :     /// Pending read operation (guarded by `mutex`).
      56                 :     reactor_op_base* read_op = nullptr;
      57                 : 
      58                 :     /// Pending write operation (guarded by `mutex`).
      59                 :     reactor_op_base* write_op = nullptr;
      60                 : 
      61                 :     /// Pending connect operation (guarded by `mutex`).
      62                 :     reactor_op_base* connect_op = nullptr;
      63                 : 
      64                 :     /// Pending wait-for-read operation (guarded by `mutex`).
      65                 :     reactor_op_base* wait_read_op = nullptr;
      66                 : 
      67                 :     /// Pending wait-for-write operation (guarded by `mutex`).
      68                 :     reactor_op_base* wait_write_op = nullptr;
      69                 : 
      70                 :     /// Pending wait-for-error operation (guarded by `mutex`).
      71                 :     reactor_op_base* wait_error_op = nullptr;
      72                 : 
      73                 :     /// True if a read edge event arrived before an op was registered.
      74                 :     bool read_ready = false;
      75                 : 
      76                 :     /// True if a write edge event arrived before an op was registered.
      77                 :     bool write_ready = false;
      78                 : 
      79                 :     /// Deferred read cancellation (IOCP-style cancel semantics).
      80                 :     bool read_cancel_pending = false;
      81                 : 
      82                 :     /// Deferred write cancellation (IOCP-style cancel semantics).
      83                 :     bool write_cancel_pending = false;
      84                 : 
      85                 :     /// Deferred connect cancellation (IOCP-style cancel semantics).
      86                 :     bool connect_cancel_pending = false;
      87                 : 
      88                 :     /// Deferred wait-read cancellation (IOCP-style cancel semantics).
      89                 :     bool wait_read_cancel_pending = false;
      90                 : 
      91                 :     /// Deferred wait-write cancellation (IOCP-style cancel semantics).
      92                 :     bool wait_write_cancel_pending = false;
      93                 : 
      94                 :     /// Deferred wait-error cancellation (IOCP-style cancel semantics).
      95                 :     bool wait_error_cancel_pending = false;
      96                 : 
      97                 :     /// Event mask set during registration (no mutex needed).
      98                 :     std::uint32_t registered_events = 0;
      99                 : 
     100                 :     /// File descriptor this state tracks.
     101                 :     int fd = -1;
     102                 : 
     103                 :     /// Accumulated ready events (set by reactor, read by scheduler).
     104                 :     std::atomic<std::uint32_t> ready_events_{0};
     105                 : 
     106                 :     /// True while this state is queued in the scheduler's completed_ops.
     107                 :     std::atomic<bool> is_enqueued_{false};
     108                 : 
     109                 :     /// Owning scheduler for posting completions.
     110                 :     reactor_scheduler const* scheduler_ = nullptr;
     111                 : 
     112                 :     /// Prevents impl destruction while queued in the scheduler.
     113                 :     std::shared_ptr<void> impl_ref_;
     114                 : 
     115                 :     /// Add ready events atomically.
     116                 :     /// Release pairs with the consumer's acquire exchange on
     117                 :     /// ready_events_ so the consumer sees all flags. On x86 (TSO)
     118                 :     /// this compiles to the same LOCK OR as relaxed.
     119 HIT      175142 :     void add_ready_events(std::uint32_t ev) noexcept
     120                 :     {
     121          175142 :         ready_events_.fetch_or(ev, std::memory_order_release);
     122          175142 :     }
     123                 : 
     124                 :     /// Invoke deferred I/O and dispatch completions.
     125          175054 :     void operator()() override
     126                 :     {
     127          175054 :         invoke_deferred_io();
     128          175054 :     }
     129                 : 
     130                 :     /// Destroy without invoking.
     131                 :     /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
     132                 :     /// the self-referential cycle set by close_socket().
     133              88 :     void destroy() override
     134                 :     {
     135              88 :         impl_ref_.reset();
     136              88 :     }
     137                 : 
     138                 :     /** Perform deferred I/O and queue completions.
     139                 : 
     140                 :         Performs I/O under the mutex and queues completed ops. EAGAIN
     141                 :         ops stay parked in their slot for re-delivery on the next
     142                 :         edge event.
     143                 :     */
     144                 :     void invoke_deferred_io();
     145                 : };
     146                 : 
     147                 : inline void
     148          175054 : reactor_descriptor_state::invoke_deferred_io()
     149                 : {
     150          175054 :     std::shared_ptr<void> prevent_impl_destruction;
     151          175054 :     op_queue local_ops;
     152                 : 
     153                 :     {
     154          175054 :         conditionally_enabled_mutex::scoped_lock lock(mutex);
     155                 : 
     156                 :         // Must clear is_enqueued_ and move impl_ref_ under the same
     157                 :         // lock that processes I/O. close_socket() checks is_enqueued_
     158                 :         // under this mutex — without atomicity between the flag store
     159                 :         // and the ref move, close_socket() could see is_enqueued_==false,
     160                 :         // skip setting impl_ref_, and destroy the impl under us.
     161          175054 :         prevent_impl_destruction = std::move(impl_ref_);
     162          175054 :         is_enqueued_.store(false, std::memory_order_release);
     163                 : 
     164          175054 :         std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
     165          175054 :         if (ev == 0)
     166                 :         {
     167                 :             // Mutex unlocks here; compensate for work_cleanup's decrement
     168 MIS           0 :             scheduler_->compensating_work_started();
     169               0 :             return;
     170                 :         }
     171                 : 
     172 HIT      175054 :         int err = 0;
     173          175054 :         if (ev & reactor_event_error)
     174                 :         {
     175               6 :             socklen_t len = sizeof(err);
     176               6 :             if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     177 MIS           0 :                 err = errno;
     178 HIT           6 :             if (err == 0)
     179 MIS           0 :                 err = EIO;
     180                 :         }
     181                 : 
     182 HIT      175054 :         if (ev & reactor_event_read)
     183                 :         {
     184          144311 :             if (read_op)
     185                 :             {
     186            8555 :                 auto* rd = read_op;
     187            8555 :                 if (err)
     188 MIS           0 :                     rd->complete(err, 0);
     189                 :                 else
     190 HIT        8555 :                     rd->perform_io();
     191                 : 
     192            8555 :                 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
     193                 :                 {
     194              43 :                     rd->errn = 0;
     195                 :                 }
     196                 :                 else
     197                 :                 {
     198            8512 :                     read_op = nullptr;
     199            8512 :                     local_ops.push(rd);
     200                 :                 }
     201                 :             }
     202                 :             else
     203                 :             {
     204          135756 :                 read_ready = true;
     205                 :             }
     206                 : 
     207                 :             // Complete any parked wait-for-read regardless of read_op presence.
     208          144311 :             if (wait_read_op)
     209                 :             {
     210               8 :                 wait_read_op->complete(err, 0);
     211               8 :                 local_ops.push(std::exchange(wait_read_op, nullptr));
     212                 :             }
     213                 :         }
     214          175054 :         if (ev & reactor_event_write)
     215                 :         {
     216           40019 :             bool had_write_op = (connect_op || write_op);
     217           40019 :             if (connect_op)
     218                 :             {
     219            8430 :                 auto* cn = connect_op;
     220            8430 :                 if (err)
     221               6 :                     cn->complete(err, 0);
     222                 :                 else
     223            8424 :                     cn->perform_io();
     224            8430 :                 connect_op = nullptr;
     225            8430 :                 local_ops.push(cn);
     226                 :             }
     227           40019 :             if (write_op)
     228                 :             {
     229 MIS           0 :                 auto* wr = write_op;
     230               0 :                 if (err)
     231               0 :                     wr->complete(err, 0);
     232                 :                 else
     233               0 :                     wr->perform_io();
     234                 : 
     235               0 :                 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
     236                 :                 {
     237               0 :                     wr->errn = 0;
     238                 :                 }
     239                 :                 else
     240                 :                 {
     241               0 :                     write_op = nullptr;
     242               0 :                     local_ops.push(wr);
     243                 :                 }
     244                 :             }
     245 HIT       40019 :             if (!had_write_op)
     246           31589 :                 write_ready = true;
     247                 : 
     248                 :             // Complete any parked wait-for-write regardless of write_op presence.
     249           40019 :             if (wait_write_op)
     250                 :             {
     251 MIS           0 :                 wait_write_op->complete(err, 0);
     252               0 :                 local_ops.push(std::exchange(wait_write_op, nullptr));
     253                 :             }
     254                 :         }
     255                 :         // Complete a parked wait-for-error on any error condition.
     256 HIT      175054 :         if ((ev & reactor_event_error) || err)
     257                 :         {
     258               6 :             if (wait_error_op)
     259                 :             {
     260 MIS           0 :                 wait_error_op->complete(err, 0);
     261               0 :                 local_ops.push(std::exchange(wait_error_op, nullptr));
     262                 :             }
     263                 :         }
     264 HIT      175054 :         if (err)
     265                 :         {
     266               6 :             if (read_op)
     267                 :             {
     268 MIS           0 :                 read_op->complete(err, 0);
     269               0 :                 local_ops.push(std::exchange(read_op, nullptr));
     270                 :             }
     271 HIT           6 :             if (write_op)
     272                 :             {
     273 MIS           0 :                 write_op->complete(err, 0);
     274               0 :                 local_ops.push(std::exchange(write_op, nullptr));
     275                 :             }
     276 HIT           6 :             if (connect_op)
     277                 :             {
     278 MIS           0 :                 connect_op->complete(err, 0);
     279               0 :                 local_ops.push(std::exchange(connect_op, nullptr));
     280                 :             }
     281 HIT           6 :             if (wait_read_op)
     282                 :             {
     283 MIS           0 :                 wait_read_op->complete(err, 0);
     284               0 :                 local_ops.push(std::exchange(wait_read_op, nullptr));
     285                 :             }
     286 HIT           6 :             if (wait_write_op)
     287                 :             {
     288 MIS           0 :                 wait_write_op->complete(err, 0);
     289               0 :                 local_ops.push(std::exchange(wait_write_op, nullptr));
     290                 :             }
     291                 :         }
     292 HIT      175054 :     }
     293                 : 
     294                 :     // Execute first handler inline — the scheduler's work_cleanup
     295                 :     // accounts for this as the "consumed" work item
     296          175054 :     scheduler_op* first = local_ops.pop();
     297          175054 :     if (first)
     298                 :     {
     299           16950 :         scheduler_->post_deferred_completions(local_ops);
     300           16950 :         (*first)();
     301                 :     }
     302                 :     else
     303                 :     {
     304          158104 :         scheduler_->compensating_work_started();
     305                 :     }
     306          175054 : }
     307                 : 
     308                 : } // namespace boost::corosio::detail
     309                 : 
     310                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
        

Generated by: LCOV version 2.3