LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_stream_socket.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 80.1 % 276 221 55
Test Date: 2026-05-20 17:06:58 Functions: 77.1 % 96 74 22

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
      12                 : 
      13                 : #include <boost/corosio/tcp_socket.hpp>
      14                 : #include <boost/corosio/shutdown_type.hpp>
      15                 : #include <boost/corosio/wait_type.hpp>
      16                 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
      17                 : #include <boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp>
      18                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      19                 : #include <boost/capy/buffers.hpp>
      20                 : 
      21                 : #include <coroutine>
      22                 : 
      23                 : #include <errno.h>
      24                 : #include <sys/socket.h>
      25                 : #include <sys/uio.h>
      26                 : 
      27                 : namespace boost::corosio::detail {
      28                 : 
      29                 : /** CRTP base for reactor-backed stream socket implementations.
      30                 : 
      31                 :     Inherits shared data members and cancel/close/register logic
      32                 :     from reactor_basic_socket. Adds the stream-specific remote
      33                 :     endpoint, shutdown, and I/O dispatch (connect, read, write, wait).
      34                 : 
      35                 :     @tparam Derived   The concrete socket type (CRTP).
      36                 :     @tparam Service   The backend's socket service type.
      37                 :     @tparam ConnOp    The backend's connect op type.
      38                 :     @tparam ReadOp    The backend's read op type.
      39                 :     @tparam WriteOp   The backend's write op type.
      40                 :     @tparam WaitOp    The backend's wait op type.
      41                 :     @tparam DescState The backend's descriptor_state type.
      42                 :     @tparam ImplBase  The public vtable base
      43                 :                       (tcp_socket::implementation or
      44                 :                        local_stream_socket::implementation).
      45                 :     @tparam Endpoint  The endpoint type (endpoint or local_endpoint).
      46                 : */
       47                 : template<
       48                 :     class Derived,
       49                 :     class Service,
       50                 :     class ConnOp,
       51                 :     class ReadOp,
       52                 :     class WriteOp,
       53                 :     class WaitOp,
       54                 :     class DescState,
       55                 :     class ImplBase = tcp_socket::implementation,
       56                 :     class Endpoint = endpoint>
       57                 : class reactor_stream_socket
       58                 :     : public reactor_basic_socket<
       59                 :           Derived,
       60                 :           ImplBase,
       61                 :           Service,
       62                 :           DescState,
       63                 :           Endpoint>
       64                 : {
       65                 :     using base_type = reactor_basic_socket<
       66                 :         Derived,
       67                 :         ImplBase,
       68                 :         Service,
       69                 :         DescState,
       70                 :         Endpoint>;
       71                 :     using self_type = reactor_stream_socket<
       72                 :         Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp,
       73                 :         DescState, ImplBase, Endpoint>;
       74                 :     friend base_type; // lets the base reach the private CRTP callbacks at the end of this class
       75                 :     friend Derived;
       76                 : 
       77                 : protected:
       78                 :     // NOLINTNEXTLINE(bugprone-crtp-constructor-accessibility)
       79 HIT       25399 :     explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {} // only forwards svc to the reactor_basic_socket base
       80                 : 
       81                 : protected:
       82                 :     Endpoint remote_endpoint_; // peer address: written by set_endpoints and do_connect, reset by do_close_socket
       83                 : 
       84                 : public:
       85                 :     /// Pending connect operation slot.
       86                 :     ConnOp conn_;
       87                 : 
       88                 :     /// Pending read operation slot.
       89                 :     ReadOp rd_;
       90                 : 
       91                 :     /// Pending write operation slot.
       92                 :     WriteOp wr_;
       93                 : 
       94                 :     /// Pending wait-for-read operation slot.
       95                 :     WaitOp wait_rd_;
       96                 : 
       97                 :     /// Pending wait-for-write operation slot.
       98                 :     WaitOp wait_wr_;
       99                 : 
      100                 :     /// Pending wait-for-error operation slot.
      101                 :     WaitOp wait_er_;
      102                 : 
      103           25399 :     ~reactor_stream_socket() override = default;
      104                 : 
      105                 :     /// Return the cached remote endpoint (a copy of remote_endpoint_; no syscall is made).
      106              46 :     Endpoint remote_endpoint() const noexcept override
      107                 :     {
      108              46 :         return remote_endpoint_;
      109                 :     }
      110                 : 
      111                 :     // --- Virtual method overrides (satisfy ImplBase pure virtuals) ---
      112                 : 
      113            8438 :     std::coroutine_handle<> connect(
      114                 :         std::coroutine_handle<> h,
      115                 :         capy::executor_ref ex,
      116                 :         Endpoint ep,
      117                 :         std::stop_token token,
      118                 :         std::error_code* ec) override
      119                 :     {
      120            8438 :         return do_connect(h, ex, ep, token, ec); // forwards unchanged to the shared connect dispatch
      121                 :     }
      122                 : 
      123          233659 :     std::coroutine_handle<> read_some(
      124                 :         std::coroutine_handle<> h,
      125                 :         capy::executor_ref ex,
      126                 :         buffer_param param,
      127                 :         std::stop_token token,
      128                 :         std::error_code* ec,
      129                 :         std::size_t* bytes_out) override
      130                 :     {
      131          233659 :         return do_read_some(h, ex, param, token, ec, bytes_out); // forwards unchanged to the shared read dispatch
      132                 :     }
      133                 : 
      134          233368 :     std::coroutine_handle<> write_some(
      135                 :         std::coroutine_handle<> h,
      136                 :         capy::executor_ref ex,
      137                 :         buffer_param param,
      138                 :         std::stop_token token,
      139                 :         std::error_code* ec,
      140                 :         std::size_t* bytes_out) override
      141                 :     {
      142          233368 :         return do_write_some(h, ex, param, token, ec, bytes_out); // forwards unchanged to the shared write dispatch
      143                 :     }
      144                 : 
      145               8 :     std::coroutine_handle<> wait(
      146                 :         std::coroutine_handle<> h,
      147                 :         capy::executor_ref ex,
      148                 :         wait_type w,
      149                 :         std::stop_token token,
      150                 :         std::error_code* ec) override
      151                 :     {
      152               8 :         return do_wait(h, ex, w, token, ec); // forwards unchanged to the shared wait dispatch
      153                 :     }
      154                 : 
      155                 :     std::error_code
      156               6 :     shutdown(corosio::shutdown_type what) noexcept override
      157                 :     {
      158               6 :         return do_shutdown(static_cast<int>(what)); // the enum value becomes the int code that do_shutdown switches on
      159                 :     }
      160                 : 
      161             188 :     void cancel() noexcept override
      162                 :     {
      163             188 :         this->do_cancel(); // do_cancel is not declared in this class; this-> makes the lookup dependent
      164             188 :     }
      165                 : 
      166                 :     // --- End virtual overrides ---
      167                 : 
      168                 :     /// Close the socket (non-virtual, called by the service).
      169                 :     void close_socket() noexcept
      170                 :     {
      171                 :         this->do_close_socket(); // the do_close_socket declared below, which also clears remote_endpoint_
      172                 :     }
      173                 : 
      174                 :     /** Shut down part or all of the full-duplex connection.
      175                 : 
      176                 :         @param what 0 = receive, 1 = send, 2 = both. Any other value returns make_err(EINVAL).
      177                 :     */
      178               6 :     std::error_code do_shutdown(int what) noexcept
      179                 :     {
      180                 :         int how; // SHUT_* constant chosen by the switch; every path that reaches ::shutdown assigns it
      181               6 :         switch (what)
      182                 :         {
      183               2 :         case 0: // shutdown_receive
      184               2 :             how = SHUT_RD;
      185               2 :             break;
      186               2 :         case 1: // shutdown_send
      187               2 :             how = SHUT_WR;
      188               2 :             break;
      189               2 :         case 2: // shutdown_both
      190               2 :             how = SHUT_RDWR;
      191               2 :             break;
      192 MIS           0 :         default:
      193               0 :             return make_err(EINVAL); // value outside 0..2: rejected before any syscall is made
      194                 :         }
      195 HIT           6 :         if (::shutdown(this->fd_, how) != 0)
      196 MIS           0 :             return make_err(errno); // report the errno left by the failed ::shutdown
      197 HIT           6 :         return {}; // default-constructed error_code
      198                 :     }
      199                 : 
      200                 :     /// Cache local and remote endpoints.
      201           16860 :     void set_endpoints(Endpoint local, Endpoint remote) noexcept
      202                 :     {
      203           16860 :         this->local_endpoint_ = std::move(local); // local_endpoint_ is not declared here; presumably a base member
      204           16860 :         remote_endpoint_      = std::move(remote);
      205           16860 :     }
      206                 : 
      207                 :     /** Shared connect dispatch.
      208                 : 
      209                 :         Tries the connect syscall speculatively. On synchronous
      210                 :         completion, returns via inline budget or posts through queue.
      211                 :         On EINPROGRESS, registers with the reactor.
      212                 :     */
      213                 :     std::coroutine_handle<> do_connect(
      214                 :         std::coroutine_handle<>,
      215                 :         capy::executor_ref,
      216                 :         Endpoint const&,
      217                 :         std::stop_token const&,
      218                 :         std::error_code*);
      219                 : 
      220                 :     /** Shared scatter-read dispatch.
      221                 : 
      222                 :         Tries recv() (one buffer) or readv() speculatively. On success or hard error,
      223                 :         returns via inline budget or posts through queue.
      224                 :         On EAGAIN, registers with the reactor.
      225                 :     */
      226                 :     std::coroutine_handle<> do_read_some(
      227                 :         std::coroutine_handle<>,
      228                 :         capy::executor_ref,
      229                 :         buffer_param,
      230                 :         std::stop_token const&,
      231                 :         std::error_code*,
      232                 :         std::size_t*);
      233                 : 
      234                 :     /** Shared gather-write dispatch.
      235                 : 
      236                 :         Tries the write via WriteOp::write_policy speculatively.
      237                 :         On success or hard error, returns via inline budget or
      238                 :         posts through queue. On EAGAIN, registers with the reactor.
      239                 :     */
      240                 :     std::coroutine_handle<> do_write_some(
      241                 :         std::coroutine_handle<>,
      242                 :         capy::executor_ref,
      243                 :         buffer_param,
      244                 :         std::stop_token const&,
      245                 :         std::error_code*,
      246                 :         std::size_t*);
      247                 : 
      248                 :     /** Shared readiness-wait dispatch.
      249                 : 
      250                 :         Registers a wait op for the requested direction. Does not
      251                 :         perform any I/O syscall — completion is signalled when the
      252                 :         reactor delivers the matching edge event.
      253                 :     */
      254                 :     std::coroutine_handle<> do_wait(
      255                 :         std::coroutine_handle<>,
      256                 :         capy::executor_ref,
      257                 :         wait_type,
      258                 :         std::stop_token const&,
      259                 :         std::error_code*);
      260                 : 
      261                 :     /** Close the socket and cancel pending operations.
      262                 : 
      263                 :         Extends the base do_close_socket() to also reset
      264                 :         the remote endpoint.
      265                 :     */
      266           76214 :     void do_close_socket() noexcept
      267                 :     {
      268           76214 :         base_type::do_close_socket(); // qualified call: runs the base version, not this function again
      269           76214 :         remote_endpoint_ = Endpoint{}; // back to a value-initialized endpoint
      270           76214 :     }
      271                 : 
      272                 : private:
      273                 :     // CRTP callbacks for reactor_basic_socket cancel/close
      274                 : 
      275                 :     template<class Op>
      276             191 :     reactor_op_base** op_to_desc_slot(Op& op) noexcept
      277                 :     {
      278             191 :         if (&op == static_cast<void*>(&conn_)) // ops are identified by address; the void* cast allows comparing unrelated op types
      279 MIS           0 :             return &this->desc_state_.connect_op;
      280 HIT         191 :         if (&op == static_cast<void*>(&rd_))
      281             191 :             return &this->desc_state_.read_op;
      282 MIS           0 :         if (&op == static_cast<void*>(&wr_))
      283               0 :             return &this->desc_state_.write_op;
      284               0 :         if (&op == static_cast<void*>(&wait_rd_))
      285               0 :             return &this->desc_state_.wait_read_op;
      286               0 :         if (&op == static_cast<void*>(&wait_wr_))
      287               0 :             return &this->desc_state_.wait_write_op;
      288               0 :         if (&op == static_cast<void*>(&wait_er_))
      289               0 :             return &this->desc_state_.wait_error_op;
      290               0 :         return nullptr; // op is not one of the six slots owned by this socket
      291                 :     }
      292                 : 
      293                 :     template<class Op>
      294               0 :     bool* op_to_cancel_flag(Op& op) noexcept
      295                 :     {
      296               0 :         if (&op == static_cast<void*>(&conn_)) // same address-based lookup as op_to_desc_slot, returning the cancel-pending flag
      297               0 :             return &this->desc_state_.connect_cancel_pending;
      298               0 :         if (&op == static_cast<void*>(&rd_))
      299               0 :             return &this->desc_state_.read_cancel_pending;
      300               0 :         if (&op == static_cast<void*>(&wr_))
      301               0 :             return &this->desc_state_.write_cancel_pending;
      302               0 :         if (&op == static_cast<void*>(&wait_rd_))
      303               0 :             return &this->desc_state_.wait_read_cancel_pending;
      304               0 :         if (&op == static_cast<void*>(&wait_wr_))
      305               0 :             return &this->desc_state_.wait_write_cancel_pending;
      306               0 :         if (&op == static_cast<void*>(&wait_er_))
      307               0 :             return &this->desc_state_.wait_error_cancel_pending;
      308               0 :         return nullptr; // op is not one of the six slots owned by this socket
      309                 :     }
      310                 : 
      311                 :     template<class Fn>
      312 HIT       76404 :     void for_each_op(Fn fn) noexcept
      313                 :     {
      314           76404 :         fn(conn_); // fn is called once per op slot, in member declaration order
      315           76404 :         fn(rd_);
      316           76404 :         fn(wr_);
      317           76404 :         fn(wait_rd_);
      318           76404 :         fn(wait_wr_);
      319           76404 :         fn(wait_er_);
      320           76404 :     }
      321                 : 
      322                 :     template<class Fn>
      323           76404 :     void for_each_desc_entry(Fn fn) noexcept
      324                 :     {
      325           76404 :         fn(conn_, this->desc_state_.connect_op); // each op slot is passed with its matching desc_state_ entry
      326           76404 :         fn(rd_, this->desc_state_.read_op);
      327           76404 :         fn(wr_, this->desc_state_.write_op);
      328           76404 :         fn(wait_rd_, this->desc_state_.wait_read_op);
      329           76404 :         fn(wait_wr_, this->desc_state_.wait_write_op);
      330           76404 :         fn(wait_er_, this->desc_state_.wait_error_op);
      331           76404 :     }
      332                 : };
     333                 : 
      334                 : template<
      335                 :     class Derived,
      336                 :     class Service,
      337                 :     class ConnOp,
      338                 :     class ReadOp,
      339                 :     class WriteOp,
      340                 :     class WaitOp,
      341                 :     class DescState,
      342                 :     class ImplBase,
      343                 :     class Endpoint>
      344                 : std::coroutine_handle<>
      345            8438 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
      346                 :     do_connect(
      347                 :         std::coroutine_handle<> h,
      348                 :         capy::executor_ref ex,
      349                 :         Endpoint const& ep,
      350                 :         std::stop_token const& token,
      351                 :         std::error_code* ec)
      352                 : {
      353            8438 :     auto& op = conn_; // the single connect slot of this socket is reused for every connect
      354                 : 
      355            8438 :     sockaddr_storage storage{};
      356            8438 :     socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage); // fills storage from ep; the returned length is what connect receives
      357                 :     int result =
      358            8438 :         ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen); // speculative attempt before involving the reactor
      359                 : 
      360            8438 :     if (result == 0) // connected synchronously: refresh the cached endpoints
      361                 :     {
      362               6 :         sockaddr_storage local_storage{};
      363               6 :         socklen_t local_len = sizeof(local_storage);
      364               6 :         if (::getsockname(
      365                 :                 this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
      366               6 :                 &local_len) == 0)
      367 MIS           0 :             this->local_endpoint_ =
      368 HIT           6 :                 from_sockaddr_as(local_storage, local_len, Endpoint{}); // a getsockname failure is ignored and the previous local endpoint is kept
      369               6 :         remote_endpoint_ = ep; // the peer is the endpoint just connected to
      370                 :     }
      371                 : 
      372            8438 :     if (result == 0 || errno != EINPROGRESS) // finished now: success, or a failure other than EINPROGRESS
      373                 :     {
      374               6 :         int err = (result < 0) ? errno : 0; // errno is still the one from connect: getsockname above runs only when result == 0
      375               6 :         if (this->svc_.scheduler().try_consume_inline_budget())
      376                 :         {
      377 MIS           0 :             *ec = err ? make_err(err) : std::error_code{}; // inline path writes the result directly, without filling the op
      378               0 :             op.cont_op.cont.h = h;
      379               0 :             return dispatch_coro(ex, op.cont_op.cont);
      380                 :         }
      381 HIT           6 :         op.reset(); // no inline budget: fill the op and post it through the service instead
      382               6 :         op.h               = h;
      383               6 :         op.ex              = ex;
      384               6 :         op.ec_out          = ec;
      385               6 :         op.fd              = this->fd_;
      386               6 :         op.target_endpoint = ep;
      387               6 :         op.start(token, static_cast<Derived*>(this));
      388               6 :         op.impl_ptr = this->shared_from_this(); // presumably keeps the socket alive until the op has run - confirm
      389               6 :         op.complete(err, 0); // hand the already-known result to the op before it is posted
      390               6 :         this->svc_.post(&op);
      391               6 :         return std::noop_coroutine(); // nothing is resumed inline on this path
      392                 :     }
      393                 : 
      394                 :     // EINPROGRESS — register with reactor
      395            8432 :     op.reset();
      396            8432 :     op.h               = h;
      397            8432 :     op.ex              = ex;
      398            8432 :     op.ec_out          = ec;
      399            8432 :     op.fd              = this->fd_;
      400            8432 :     op.target_endpoint = ep;
      401            8432 :     op.start(token, static_cast<Derived*>(this));
      402            8432 :     op.impl_ptr = this->shared_from_this(); // presumably keeps the socket alive while the op is pending - confirm
      403                 : 
      404            8432 :     this->register_op(
      405            8432 :         op, this->desc_state_.connect_op, this->desc_state_.write_ready, // the connect op is registered against the write_ready flag
      406            8432 :         this->desc_state_.connect_cancel_pending, true); // NOTE(review): the meaning of the trailing true is defined by register_op - confirm
      407            8432 :     return std::noop_coroutine(); // nothing is resumed inline; the op is now registered via register_op
      408                 : }
     409                 : 
      410                 : template<
      411                 :     class Derived,
      412                 :     class Service,
      413                 :     class ConnOp,
      414                 :     class ReadOp,
      415                 :     class WriteOp,
      416                 :     class WaitOp,
      417                 :     class DescState,
      418                 :     class ImplBase,
      419                 :     class Endpoint>
      420                 : std::coroutine_handle<>
      421          233659 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
      422                 :     do_read_some(
      423                 :         std::coroutine_handle<> h,
      424                 :         capy::executor_ref ex,
      425                 :         buffer_param param,
      426                 :         std::stop_token const& token,
      427                 :         std::error_code* ec,
      428                 :         std::size_t* bytes_out)
      429                 : {
      430          233659 :     auto& op = rd_; // the single read slot of this socket is reused for every read
      431          233659 :     op.reset();
      432                 : 
      433          233659 :     capy::mutable_buffer bufs[ReadOp::max_buffers];
      434          233659 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers)); // copy_to is given max_buffers as its limit; its result is stored as an int count
      435                 : 
      436          233659 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0)) // zero-length request. NOTE(review): several buffers that are all empty are not caught here and would reach the eof branch below if readv returns 0 - confirm copy_to drops empty buffers
      437                 :     {
      438               2 :         op.empty_buffer_read = true; // presumably lets the op avoid reporting this 0-byte completion as eof - confirm
      439               2 :         op.h                 = h;
      440               2 :         op.ex                = ex;
      441               2 :         op.ec_out            = ec;
      442               2 :         op.bytes_out         = bytes_out;
      443               2 :         op.start(token, static_cast<Derived*>(this));
      444               2 :         op.impl_ptr = this->shared_from_this();
      445               2 :         op.complete(0, 0); // no syscall is made: completes with no error and zero bytes
      446               2 :         this->svc_.post(&op); // always posted; this path never uses the inline budget
      447               2 :         return std::noop_coroutine();
      448                 :     }
      449                 : 
      450          467314 :     for (int i = 0; i < op.iovec_count; ++i) // mirror the buffers into the op iovec array used by readv below
      451                 :     {
      452          233657 :         op.iovecs[i].iov_base = bufs[i].data();
      453          233657 :         op.iovecs[i].iov_len  = bufs[i].size();
      454                 :     }
      455                 : 
      456                 :     // Speculative read; for the single-buffer case use recv() so the
      457                 :     // kernel skips the readv iov_iter setup.
      458                 :     ssize_t n;
      459          233657 :     if (op.iovec_count == 1)
      460                 :     {
      461                 :         do
      462                 :         {
      463          233657 :             n = ::recv(this->fd_, bufs[0].data(), bufs[0].size(), 0);
      464                 :         }
      465          233657 :         while (n < 0 && errno == EINTR); // retry while the call was interrupted by a signal
      466                 :     }
      467                 :     else
      468                 :     {
      469                 :         do
      470                 :         {
      471 MIS           0 :             n = ::readv(this->fd_, op.iovecs, op.iovec_count);
      472                 :         }
      473               0 :         while (n < 0 && errno == EINTR); // retry while the call was interrupted by a signal
      474                 :     }
      475                 : 
      476 HIT      233657 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) // finished now: data, eof, or an error other than would-block
      477                 :     {
      478          233271 :         int err    = (n < 0) ? errno : 0;
      479          233271 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0); // a negative n is reported as zero bytes
      480                 : 
      481          233271 :         if (this->svc_.scheduler().try_consume_inline_budget())
      482                 :         {
      483          186649 :             if (err)
      484 MIS           0 :                 *ec = make_err(err);
      485 HIT      186649 :             else if (n == 0)
      486              10 :                 *ec = capy::error::eof; // zero bytes from a non-empty request is reported as eof
      487                 :             else
      488          186639 :                 *ec = {};
      489          186649 :             *bytes_out = bytes;
      490          186649 :             op.cont_op.cont.h = h;
      491          186649 :             return dispatch_coro(ex, op.cont_op.cont); // inline path: outputs are already written, so the op is not posted
      492                 :         }
      493           46622 :         op.h         = h; // no inline budget: fill the op and post it through the service instead
      494           46622 :         op.ex        = ex;
      495           46622 :         op.ec_out    = ec;
      496           46622 :         op.bytes_out = bytes_out;
      497           46622 :         op.start(token, static_cast<Derived*>(this));
      498           46622 :         op.impl_ptr = this->shared_from_this();
      499           46622 :         op.complete(err, bytes); // eof is not set here; presumably the op derives it from a zero-byte result - confirm
      500           46622 :         this->svc_.post(&op);
      501           46622 :         return std::noop_coroutine();
      502                 :     }
      503                 : 
      504                 :     // EAGAIN — register with reactor
      505             386 :     op.h         = h;
      506             386 :     op.ex        = ex;
      507             386 :     op.ec_out    = ec;
      508             386 :     op.bytes_out = bytes_out;
      509             386 :     op.fd        = this->fd_; // only this deferred path records the fd on the op
      510             386 :     op.start(token, static_cast<Derived*>(this));
      511             386 :     op.impl_ptr = this->shared_from_this(); // presumably keeps the socket alive while the op is pending - confirm
      512                 : 
      513             386 :     this->register_op(
      514             386 :         op, this->desc_state_.read_op, this->desc_state_.read_ready,
      515             386 :         this->desc_state_.read_cancel_pending);
      516             386 :     return std::noop_coroutine(); // nothing is resumed inline; the op is now registered via register_op
      517                 : }
     518                 : 
     519                 : template<
     520                 :     class Derived,
     521                 :     class Service,
     522                 :     class ConnOp,
     523                 :     class ReadOp,
     524                 :     class WriteOp,
     525                 :     class WaitOp,
     526                 :     class DescState,
     527                 :     class ImplBase,
     528                 :     class Endpoint>
     529                 : std::coroutine_handle<>
     530          233368 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
     531                 :     do_write_some(
     532                 :         std::coroutine_handle<> h,
     533                 :         capy::executor_ref ex,
     534                 :         buffer_param param,
     535                 :         std::stop_token const& token,
     536                 :         std::error_code* ec,
     537                 :         std::size_t* bytes_out)
     538                 : {
     539          233368 :     auto& op = wr_;
     540          233368 :     op.reset();
     541                 : 
     542          233368 :     capy::mutable_buffer bufs[WriteOp::max_buffers];
     543          233368 :     op.iovec_count =
     544          233368 :         static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
     545                 : 
     546          233368 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     547                 :     {
     548               2 :         op.h         = h;
     549               2 :         op.ex        = ex;
     550               2 :         op.ec_out    = ec;
     551               2 :         op.bytes_out = bytes_out;
     552               2 :         op.start(token, static_cast<Derived*>(this));
     553               2 :         op.impl_ptr = this->shared_from_this();
     554               2 :         op.complete(0, 0);
     555               2 :         this->svc_.post(&op);
     556               2 :         return std::noop_coroutine();
     557                 :     }
     558                 : 
     559          466732 :     for (int i = 0; i < op.iovec_count; ++i)
     560                 :     {
     561          233366 :         op.iovecs[i].iov_base = bufs[i].data();
     562          233366 :         op.iovecs[i].iov_len  = bufs[i].size();
     563                 :     }
     564                 : 
     565                 :     // Speculative write; the single-buffer case dispatches to a
     566                 :     // backend-specific fast path so the kernel skips msghdr/iov_iter
     567                 :     // setup (and so each backend can pick the right SIGPIPE strategy).
     568                 :     ssize_t n;
     569          233366 :     if (op.iovec_count == 1)
     570                 :     {
     571          466732 :         n = WriteOp::write_policy::write_one(
     572          233366 :             this->fd_, bufs[0].data(), bufs[0].size());
     573                 :     }
     574                 :     else
     575                 :     {
     576 MIS           0 :         n = WriteOp::write_policy::write(
     577               0 :             this->fd_, op.iovecs, op.iovec_count);
     578                 :     }
     579                 : 
     580 HIT      233366 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     581                 :     {
     582          233366 :         int err    = (n < 0) ? errno : 0;
     583          233366 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     584                 : 
     585          233366 :         if (this->svc_.scheduler().try_consume_inline_budget())
     586                 :         {
     587          186704 :             *ec        = err ? make_err(err) : std::error_code{};
     588          186704 :             *bytes_out = bytes;
     589          186704 :             op.cont_op.cont.h = h;
     590          186704 :             return dispatch_coro(ex, op.cont_op.cont);
     591                 :         }
     592           46662 :         op.h         = h;
     593           46662 :         op.ex        = ex;
     594           46662 :         op.ec_out    = ec;
     595           46662 :         op.bytes_out = bytes_out;
     596           46662 :         op.start(token, static_cast<Derived*>(this));
     597           46662 :         op.impl_ptr = this->shared_from_this();
     598           46662 :         op.complete(err, bytes);
     599           46662 :         this->svc_.post(&op);
     600           46662 :         return std::noop_coroutine();
     601                 :     }
     602                 : 
     603                 :     // EAGAIN — register with reactor
     604 MIS           0 :     op.h         = h;
     605               0 :     op.ex        = ex;
     606               0 :     op.ec_out    = ec;
     607               0 :     op.bytes_out = bytes_out;
     608               0 :     op.fd        = this->fd_;
     609               0 :     op.start(token, static_cast<Derived*>(this));
     610               0 :     op.impl_ptr = this->shared_from_this();
     611                 : 
     612               0 :     this->register_op(
     613               0 :         op, this->desc_state_.write_op, this->desc_state_.write_ready,
     614               0 :         this->desc_state_.write_cancel_pending, true);
     615               0 :     return std::noop_coroutine();
     616                 : }
     617                 : 
     618                 : template<
     619                 :     class Derived,
     620                 :     class Service,
     621                 :     class ConnOp,
     622                 :     class ReadOp,
     623                 :     class WriteOp,
     624                 :     class WaitOp,
     625                 :     class DescState,
     626                 :     class ImplBase,
     627                 :     class Endpoint>
     628                 : std::coroutine_handle<>
     629 HIT           8 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, DescState, ImplBase, Endpoint>::
     630                 :     do_wait(
     631                 :         std::coroutine_handle<> h,
     632                 :         capy::executor_ref ex,
     633                 :         wait_type w,
     634                 :         std::stop_token const& token,
     635                 :         std::error_code* ec)
     636                 : {
                               // Starts a wait for condition `w` on this socket on behalf of the
                               // suspended coroutine `h`. The outcome is reported through `ec`:
                               // written directly on the inline path, otherwise stored in the op
                               // as `ec_out`. Returns whatever dispatch_coro(ex, ...) yields on
                               // the inline path, and std::noop_coroutine() once an op has been
                               // posted to the service or registered via register_op.
                               //
     637                 :     // wait_type::write completes immediately on a connected socket,
     638                 :     // matching asio's behavior on IOCP. Corosio's reactor backends use
     639                 :     // edge-triggered EPOLLOUT, which would never fire on an already-
     640                 :     // writable socket; an immediate completion is also a more useful
     641                 :     // contract than parking until a non-writable -> writable transition.
     642               8 :     if (w == wait_type::write)
     643                 :     {
     644               2 :         auto& op = wait_wr_;
                                   // Inline path: when the scheduler grants inline budget, store
                                   // a default (success) error_code in *ec and hand `h` to
                                   // dispatch_coro on `ex` without posting an op.
     645               2 :         if (this->svc_.scheduler().try_consume_inline_budget())
     646                 :         {
     647 MIS           0 :             *ec               = std::error_code{};
     648               0 :             op.cont_op.cont.h = h;
     649               0 :             return dispatch_coro(ex, op.cont_op.cont);
     650                 :         }
                                   // No inline budget: fill in the write-wait op, complete it up
                                   // front with (0, 0) and post it to the service.
                                   // NOTE(review): (0, 0) presumably means "no error, zero
                                   // bytes" -- confirm against WaitOp::complete.
     651 HIT           2 :         op.reset();
     652               2 :         op.wait_event = reactor_event_write;
     653               2 :         op.h          = h;
     654               2 :         op.ex         = ex;
     655               2 :         op.ec_out     = ec;
     656               2 :         op.fd         = this->fd_;
     657               2 :         op.start(token, static_cast<Derived*>(this));
     658               2 :         op.impl_ptr = this->shared_from_this();
     659               2 :         op.complete(0, 0);
     660               2 :         this->svc_.post(&op);
     661               2 :         return std::noop_coroutine();
     662                 :     }
     663                 : 
     664                 :     // Pick refs up-front to avoid duplicating the register_op call.
     665                 :     WaitOp* op_ptr;
     666                 :     reactor_op_base** desc_slot_ptr;
     667                 :     bool* ready_flag_ptr;
     668                 :     bool* cancel_flag_ptr;
     669                 :     std::uint32_t event;
     670                 : 
     671               6 :     bool dummy_ready = false; // placeholder for error waits (no cached edge)
     672                 : 
     673               6 :     if (w == wait_type::read)
     674                 :     {
     675               6 :         op_ptr          = &wait_rd_;
     676               6 :         desc_slot_ptr   = &this->desc_state_.wait_read_op;
     677               6 :         ready_flag_ptr  = &this->desc_state_.read_ready;
     678               6 :         cancel_flag_ptr = &this->desc_state_.wait_read_cancel_pending;
     679               6 :         event           = reactor_event_read;
     680                 :     }
     681                 :     else // wait_type::error
     682                 :     {
                                   // Reached for every value other than read; write has already
                                   // returned above. The ready flag points at the local
                                   // dummy_ready rather than at descriptor state.
     683 MIS           0 :         op_ptr          = &wait_er_;
     684               0 :         desc_slot_ptr   = &this->desc_state_.wait_error_op;
     685               0 :         ready_flag_ptr  = &dummy_ready;
     686               0 :         cancel_flag_ptr = &this->desc_state_.wait_error_cancel_pending;
     687               0 :         event           = reactor_event_error;
     688                 :     }
     689                 : 
                               // Populate the selected op, then pass it to register_op together
                               // with the descriptor-state slot and the flags chosen above.
     690 HIT           6 :     auto& op = *op_ptr;
     691               6 :     op.reset();
     692               6 :     op.wait_event = event;
     693               6 :     op.h          = h;
     694               6 :     op.ex         = ex;
     695               6 :     op.ec_out     = ec;
     696               6 :     op.fd         = this->fd_;
                               // NOTE(review): start() receives the stop token and the derived
                               // socket; presumably this wires up cancellation -- confirm in WaitOp.
     697               6 :     op.start(token, static_cast<Derived*>(this));
                               // The op stores the result of shared_from_this(); presumably this
                               // keeps the socket alive until the op finishes -- confirm.
     698               6 :     op.impl_ptr = this->shared_from_this();
     699                 : 
                               // NOTE(review): the trailing `false` differs from the `true` that
                               // do_write_some passes; its meaning is defined by register_op,
                               // which is not visible here.
     700               6 :     this->register_op(op, *desc_slot_ptr, *ready_flag_ptr, *cancel_flag_ptr,
     701                 :                       false);
     702               6 :     return std::noop_coroutine();
     703                 : }
     704                 : 
     705                 : } // namespace boost::corosio::detail
     706                 : 
     707                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
        

Generated by: LCOV version 2.3