LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op_complete.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 91.7 % 132 121 11
Test Date: 2026-05-20 17:06:58 Functions: 90.0 % 40 36 4

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      14                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      15                 : #include <boost/corosio/native/detail/make_err.hpp>
      16                 : #include <boost/corosio/io/io_object.hpp>
      17                 : 
      18                 : #include <coroutine>
      19                 : #include <mutex>
      20                 : #include <utility>
      21                 : 
      22                 : #include <netinet/in.h>
      23                 : #include <sys/socket.h>
      24                 : #include <unistd.h>
      25                 : 
      26                 : namespace boost::corosio::detail {
      27                 : 
      28                 : /** Complete a base read/write operation.
      29                 : 
      30                 :     Translates the recorded errno and cancellation state into
      31                 :     an error_code, stores the byte count, then resumes the
      32                 :     caller via symmetric transfer.
      33                 : 
      34                 :     @tparam Op The concrete operation type.
      35                 :     @param op The operation to complete.
      36                 : */
      37                 : template<typename Op>
      38                 : void
      39 HIT       93704 : complete_io_op(Op& op)
      40                 : {
      41           93704 :     op.stop_cb.reset();
      42           93704 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
      43                 : 
      44           93704 :     if (op.cancelled.load(std::memory_order_acquire))
      45             306 :         *op.ec_out = capy::error::canceled;
      46           93398 :     else if (op.errn != 0)
      47 MIS           0 :         *op.ec_out = make_err(op.errn);
      48 HIT       93398 :     else if (op.is_read_operation() && op.bytes_transferred == 0)
      49 MIS           0 :         *op.ec_out = capy::error::eof;
      50                 :     else
      51 HIT       93398 :         *op.ec_out = {};
      52                 : 
      53           93704 :     *op.bytes_out = op.bytes_transferred;
      54                 : 
      55           93704 :     op.cont_op.cont.h = op.h;
      56           93704 :     capy::executor_ref saved_ex(op.ex);
      57           93704 :     auto prevent = std::move(op.impl_ptr);
      58           93704 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
      59           93704 : }
      60                 : 
      61                 : /** Complete a datagram recv operation (connected mode).
      62                 : 
      63                 :     Like complete_io_op but does not translate zero bytes into
      64                 :     EOF. Zero-length datagrams are valid and should be reported
      65                 :     as success with 0 bytes transferred.
      66                 : 
      67                 :     @param op The operation to complete.
      68                 : */
      69                 : template<typename Op>
      70                 : void
      71                 : complete_dgram_recv_op(Op& op)
      72                 : {
      73                 :     op.stop_cb.reset();
      74                 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
      75                 : 
      76                 :     if (op.cancelled.load(std::memory_order_acquire))
      77                 :         *op.ec_out = capy::error::canceled;
      78                 :     else if (op.errn != 0)
      79                 :         *op.ec_out = make_err(op.errn);
      80                 :     else
      81                 :         *op.ec_out = {};
      82                 : 
      83                 :     *op.bytes_out = op.bytes_transferred;
      84                 : 
      85                 :     op.cont_op.cont.h = op.h;
      86                 :     capy::executor_ref saved_ex(op.ex);
      87                 :     auto prevent = std::move(op.impl_ptr);
      88                 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
      89                 : }
      90                 : 
      91                 : /** Complete a wait operation.
      92                 : 
      93                 :     Wait operations report only an error_code — no bytes_transferred,
      94                 :     no EOF translation. Used for socket and acceptor wait() awaitables;
      95                 :     uses socket_impl_ if set by start(), otherwise acceptor_impl_, to reach the scheduler.
      96                 : 
      97                 :     @tparam Op The concrete wait operation type.
      98                 :     @param op The operation to complete.
      99                 : */
     100                 : template<typename Op>
     101                 : void
     102              14 : complete_wait_op(Op& op)
     103                 : {
     104              14 :     op.stop_cb.reset();
     105              14 :     if (op.socket_impl_)
     106              12 :         op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
     107                 :     else
     108               2 :         op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
     109                 : 
     110              14 :     if (op.cancelled.load(std::memory_order_acquire))
     111               4 :         *op.ec_out = capy::error::canceled;
     112              10 :     else if (op.errn != 0)
     113 MIS           0 :         *op.ec_out = make_err(op.errn);
     114                 :     else
     115 HIT          10 :         *op.ec_out = {};
     116                 : 
     117              14 :     op.cont_op.cont.h = op.h;
     118              14 :     capy::executor_ref saved_ex(op.ex);
     119              14 :     auto prevent = std::move(op.impl_ptr);
     120              14 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     121              14 : }
     122                 : 
     123                 : /** Complete a connect operation with endpoint caching.
     124                 : 
     125                 :     On success, queries the local endpoint via getsockname and
     126                 :     caches both endpoints in the socket impl. Then resumes the
     127                 :     caller via symmetric transfer.
     128                 : 
     129                 :     @tparam Op The concrete connect operation type.
     130                 :     @param op The operation to complete.
     131                 : */
     132                 : template<typename Op>
     133                 : void
     134            8448 : complete_connect_op(Op& op)
     135                 : {
     136            8448 :     op.stop_cb.reset();
     137            8448 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
     138                 : 
     139            8448 :     bool success =
     140            8448 :         (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
     141                 : 
     142            8448 :     if (success && op.socket_impl_)
     143                 :     {
     144                 :         using ep_type = decltype(op.target_endpoint);
     145            8432 :         ep_type local_ep;
     146            8432 :         sockaddr_storage local_storage{};
     147            8432 :         socklen_t local_len = sizeof(local_storage);
     148            8432 :         if (::getsockname(
     149                 :                 op.fd, reinterpret_cast<sockaddr*>(&local_storage),
     150            8432 :                 &local_len) == 0)
     151            8426 :             local_ep =
     152            8432 :                 from_sockaddr_as(local_storage, local_len, ep_type{});
     153            8432 :         op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
     154                 :     }
     155                 : 
     156            8448 :     if (op.cancelled.load(std::memory_order_acquire))
     157               2 :         *op.ec_out = capy::error::canceled;
     158            8446 :     else if (op.errn != 0)
     159              14 :         *op.ec_out = make_err(op.errn);
     160                 :     else
     161            8432 :         *op.ec_out = {};
     162                 : 
     163            8448 :     op.cont_op.cont.h = op.h;
     164            8448 :     capy::executor_ref saved_ex(op.ex);
     165            8448 :     auto prevent = std::move(op.impl_ptr);
     166            8448 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     167            8448 : }
     168                 : 
     169                 : /** Construct and register a peer socket from an accepted fd.
     170                 : 
     171                 :     Creates a new socket impl via the acceptor's associated
     172                 :     socket service, registers it with the scheduler, and caches
     173                 :     the local and remote endpoints.
     174                 : 
     175                 :     @tparam SocketImpl The concrete socket implementation type.
     176                 :     @tparam AcceptorImpl The concrete acceptor implementation type.
     177                 :     @param acceptor_impl The acceptor that accepted the connection.
     178                 :     @param accepted_fd The accepted file descriptor (set to -1 on success).
     179                 :     @param peer_storage The peer address from accept().
     180                 :     @param impl_out Output pointer for the new socket impl.
     181                 :     @param ec_out Output pointer for any error.
     182                 :     @return True on success, false on failure.
     183                 : */
     184                 : template<typename SocketImpl, typename AcceptorImpl>
     185                 : bool
     186            8420 : setup_accepted_socket(
     187                 :     AcceptorImpl* acceptor_impl,
     188                 :     int& accepted_fd,
     189                 :     sockaddr_storage const& peer_storage,
     190                 :     socklen_t peer_addrlen,
     191                 :     io_object::implementation** impl_out,
     192                 :     std::error_code* ec_out)
     193                 : {
     194            8420 :     auto* socket_svc = acceptor_impl->service().stream_service();
     195            8420 :     if (!socket_svc)
     196                 :     {
     197 MIS           0 :         *ec_out = make_err(ENOENT);
     198               0 :         return false;
     199                 :     }
     200                 : 
     201 HIT        8420 :     auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
     202            8420 :     impl.set_socket(accepted_fd);
     203                 : 
     204            8420 :     impl.desc_state_.fd = accepted_fd;
     205                 :     {
     206            8420 :         std::lock_guard lock(impl.desc_state_.mutex);
     207            8420 :         impl.desc_state_.read_op    = nullptr;
     208            8420 :         impl.desc_state_.write_op   = nullptr;
     209            8420 :         impl.desc_state_.connect_op = nullptr;
     210            8420 :     }
     211            8420 :     socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
     212                 : 
     213                 :     using ep_type = decltype(acceptor_impl->local_endpoint());
     214            8420 :     impl.set_endpoints(
     215                 :         acceptor_impl->local_endpoint(),
     216            8420 :         from_sockaddr_as(
     217                 :             peer_storage,
     218                 :             peer_addrlen,
     219                 :             ep_type{}));
     220                 : 
     221            8420 :     if (impl_out)
     222            8420 :         *impl_out = &impl;
     223            8420 :     accepted_fd = -1;
     224            8420 :     return true;
     225                 : }
     226                 : 
     227                 : /** Complete an accept operation.
     228                 : 
     229                 :     Sets up the peer socket on success, or closes the accepted
     230                 :     fd on failure. Then resumes the caller via symmetric transfer.
     231                 : 
     232                 :     @tparam SocketImpl The concrete socket implementation type.
     233                 :     @tparam Op The concrete accept operation type.
     234                 :     @param op The operation to complete.
     235                 : */
     236                 : template<typename SocketImpl, typename Op>
     237                 : void
     238            8432 : complete_accept_op(Op& op)
     239                 : {
     240            8432 :     op.stop_cb.reset();
     241            8432 :     op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
     242                 : 
     243            8432 :     bool success =
     244            8432 :         (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
     245                 : 
     246            8432 :     if (op.cancelled.load(std::memory_order_acquire))
     247              12 :         *op.ec_out = capy::error::canceled;
     248            8420 :     else if (op.errn != 0)
     249 MIS           0 :         *op.ec_out = make_err(op.errn);
     250                 :     else
     251 HIT        8420 :         *op.ec_out = {};
     252                 : 
     253            8432 :     if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
     254                 :     {
     255            8420 :         if (!setup_accepted_socket<SocketImpl>(
     256            8420 :                 op.acceptor_impl_, op.accepted_fd, op.peer_storage,
     257                 :                 op.peer_addrlen, op.impl_out, op.ec_out))
     258 MIS           0 :             success = false;
     259                 :     }
     260                 : 
     261 HIT        8432 :     if (!success || !op.acceptor_impl_)
     262                 :     {
     263              12 :         if (op.accepted_fd >= 0)
     264                 :         {
     265 MIS           0 :             ::close(op.accepted_fd);
     266               0 :             op.accepted_fd = -1;
     267                 :         }
     268 HIT          12 :         if (op.impl_out)
     269              12 :             *op.impl_out = nullptr;
     270                 :     }
     271                 : 
     272            8432 :     op.cont_op.cont.h = op.h;
     273            8432 :     capy::executor_ref saved_ex(op.ex);
     274            8432 :     auto prevent = std::move(op.impl_ptr);
     275            8432 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     276            8432 : }
     277                 : 
     278                 : /** Complete a datagram operation (send_to or recv_from).
     279                 : 
     280                 :     This overload reports only the error code and byte count;
     281                 :     it does not write a source endpoint (see the two-argument overload).
     282                 :     Then resumes the caller via symmetric transfer.
     283                 : 
     284                 :     @tparam Op The concrete datagram operation type.
     285                 :     @param op The operation to complete.
     286                 : */
     287                 : template<typename Op>
     288                 : void
     289               6 : complete_datagram_op(Op& op)
     290                 : {
     291               6 :     op.stop_cb.reset();
     292               6 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
     293                 : 
     294               6 :     if (op.cancelled.load(std::memory_order_acquire))
     295               2 :         *op.ec_out = capy::error::canceled;
     296               4 :     else if (op.errn != 0)
     297 MIS           0 :         *op.ec_out = make_err(op.errn);
     298                 :     else
     299 HIT           4 :         *op.ec_out = {};
     300                 : 
     301               6 :     *op.bytes_out = op.bytes_transferred;
     302                 : 
     303               6 :     op.cont_op.cont.h = op.h;
     304               6 :     capy::executor_ref saved_ex(op.ex);
     305               6 :     auto prevent = std::move(op.impl_ptr);
     306               6 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     307               6 : }
     308                 : 
     309                 : /** Complete a datagram operation with source endpoint capture.
     310                 : 
     311                 :     On success (no error, not cancelled) with a non-null source_out,
     312                 :     writes the source endpoint from the recorded sockaddr_storage to it.
     313                 :     Then resumes the caller via symmetric transfer.
     314                 : 
     315                 :     @tparam Op The concrete datagram operation type.
     316                 :     @param op The operation to complete.
     317                 :     @param source_out Optional pointer to store source endpoint
     318                 :         (non-null for recv_from, null for send_to).
     319                 : */
     320                 : template<typename Op, typename Endpoint>
     321                 : void
     322              18 : complete_datagram_op(Op& op, Endpoint* source_out)
     323                 : {
     324              18 :     op.stop_cb.reset();
     325              18 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
     326                 : 
     327              18 :     if (op.cancelled.load(std::memory_order_acquire))
     328               6 :         *op.ec_out = capy::error::canceled;
     329              12 :     else if (op.errn != 0)
     330 MIS           0 :         *op.ec_out = make_err(op.errn);
     331                 :     else
     332 HIT          12 :         *op.ec_out = {};
     333                 : 
     334              18 :     *op.bytes_out = op.bytes_transferred;
     335                 : 
     336              28 :     if (source_out && !op.cancelled.load(std::memory_order_acquire) &&
     337              10 :         op.errn == 0)
     338              20 :         *source_out = from_sockaddr_as(
     339              10 :             op.source_storage,
     340                 :             op.source_addrlen,
     341                 :             Endpoint{});
     342                 : 
     343              18 :     op.cont_op.cont.h = op.h;
     344              18 :     capy::executor_ref saved_ex(op.ex);
     345              18 :     auto prevent = std::move(op.impl_ptr);
     346              18 :     dispatch_coro(saved_ex, op.cont_op.cont).resume();
     347              18 : }
     348                 : 
     349                 : } // namespace boost::corosio::detail
     350                 : 
     351                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
        

Generated by: LCOV version 2.3