TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
11 : #define BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/platform.hpp>
15 : #include <boost/corosio/detail/except.hpp>
16 : #include <boost/corosio/detail/native_handle.hpp>
17 : #include <boost/corosio/detail/op_base.hpp>
18 : #include <boost/corosio/io/io_stream.hpp>
19 : #include <boost/capy/io_result.hpp>
20 : #include <boost/corosio/detail/buffer_param.hpp>
21 : #include <boost/corosio/local_endpoint.hpp>
22 : #include <boost/corosio/local_stream.hpp>
23 : #include <boost/corosio/shutdown_type.hpp>
24 : #include <boost/corosio/wait_type.hpp>
25 : #include <boost/capy/ex/executor_ref.hpp>
26 : #include <boost/capy/ex/execution_context.hpp>
27 : #include <boost/capy/ex/io_env.hpp>
28 : #include <boost/capy/concept/executor.hpp>
29 :
30 : #include <system_error>
31 :
32 : #include <concepts>
33 : #include <coroutine>
34 : #include <cstddef>
35 : #include <stop_token>
36 : #include <type_traits>
37 :
38 : namespace boost::corosio {
39 :
40 : /** An asynchronous Unix stream socket for coroutine I/O.
41 :
42 : This class provides asynchronous Unix domain stream socket
43 : operations that return awaitable types. Each operation
44 : participates in the affine awaitable protocol, ensuring
45 : coroutines resume on the correct executor.
46 :
47 : The socket must be opened before performing I/O operations.
48 : Operations support cancellation through `std::stop_token` via
49 : the affine protocol, or explicitly through the `cancel()`
50 : member function.
51 :
52 : @par Thread Safety
53 : Distinct objects: Safe.@n
54 : Shared objects: Unsafe. A socket must not have concurrent
55 : operations of the same type (e.g., two simultaneous reads).
56 : One read and one write may be in flight simultaneously.
57 :
58 : @par Semantics
59 : Wraps the platform Unix domain socket stack. Operations
60 : dispatch to OS socket APIs via the io_context backend
61 : (epoll, kqueue, select, or IOCP). Satisfies @ref capy::Stream.
62 :
63 : @par Example
64 : @code
65 : io_context ioc;
66 : local_stream_socket s(ioc);
67 : s.open();
68 :
69 : auto [ec] = co_await s.connect(local_endpoint("/tmp/my.sock"));
70 : if (ec)
71 : co_return;
72 :
73 : char buf[1024];
74 : auto [read_ec, n] = co_await s.read_some(
75 : capy::mutable_buffer(buf, sizeof(buf)));
76 : @endcode
77 : */
78 : class BOOST_COROSIO_DECL local_stream_socket : public io_stream
79 : {
80 : public:
81 : /// The endpoint type used by this socket.
82 : using endpoint_type = corosio::local_endpoint;
83 :
84 : using shutdown_type = corosio::shutdown_type;
85 : using enum corosio::shutdown_type;
86 :
87 : /** Define backend hooks for local stream socket operations.
88 :
89 : Platform backends (epoll, kqueue, select) derive from this
90 : to implement socket I/O, connection, and option management.
91 : */
92 : struct implementation : io_stream::implementation
93 : {
94 : /** Initiate an asynchronous connect to the given endpoint.
95 :
96 : @param h Coroutine handle to resume on completion.
97 : @param ex Executor for dispatching the completion.
98 : @param ep The local endpoint (path) to connect to.
99 : @param token Stop token for cancellation.
100 : @param ec Output error code.
101 :
102 : @return Coroutine handle to resume immediately.
103 : */
104 : virtual std::coroutine_handle<> connect(
105 : std::coroutine_handle<> h,
106 : capy::executor_ref ex,
107 : corosio::local_endpoint ep,
108 : std::stop_token token,
109 : std::error_code* ec) = 0;
110 :
111 : /** Initiate an asynchronous wait for socket readiness.
112 :
113 : Completes when the socket becomes ready for the
114 : specified direction, or an error condition is
115 : reported. No bytes are transferred.
116 :
117 : @param h Coroutine handle to resume on completion.
118 : @param ex Executor for dispatching the completion.
119 : @param w The direction to wait on.
120 : @param token Stop token for cancellation.
121 : @param ec Output error code.
122 :
123 : @return Coroutine handle to resume immediately.
124 : */
125 : virtual std::coroutine_handle<> wait(
126 : std::coroutine_handle<> h,
127 : capy::executor_ref ex,
128 : wait_type w,
129 : std::stop_token token,
130 : std::error_code* ec) = 0;
131 :
132 : /** Shut down the socket for the given direction(s).
133 :
134 : @param what The shutdown direction.
135 :
136 : @return Error code on failure, empty on success.
137 : */
138 : virtual std::error_code shutdown(shutdown_type what) noexcept = 0;
139 :
140 : /// Return the platform socket descriptor.
141 : virtual native_handle_type native_handle() const noexcept = 0;
142 :
143 : /** Release ownership of the native socket handle.
144 :
145 : Deregisters the socket from the reactor without closing
146 : the descriptor. The caller takes ownership.
147 :
148 : @return The native handle.
149 : */
150 : virtual native_handle_type release_socket() noexcept = 0;
151 :
152 : /** Request cancellation of pending asynchronous operations.
153 :
154 : All outstanding operations complete with operation_canceled error.
155 : Check `ec == cond::canceled` for portable comparison.
156 : */
157 : virtual void cancel() noexcept = 0;
158 :
159 : /** Set a socket option.
160 :
161 : @param level The protocol level (e.g. `SOL_SOCKET`).
162 : @param optname The option name (e.g. `SO_KEEPALIVE`).
163 : @param data Pointer to the option value.
164 : @param size Size of the option value in bytes.
165 : @return Error code on failure, empty on success.
166 : */
167 : virtual std::error_code set_option(
168 : int level,
169 : int optname,
170 : void const* data,
171 : std::size_t size) noexcept = 0;
172 :
173 : /** Get a socket option.
174 :
175 : @param level The protocol level (e.g. `SOL_SOCKET`).
176 : @param optname The option name (e.g. `SO_KEEPALIVE`).
177 : @param data Pointer to receive the option value.
178 : @param size On entry, the size of the buffer. On exit,
179 : the size of the option value.
180 : @return Error code on failure, empty on success.
181 : */
182 : virtual std::error_code
183 : get_option(int level, int optname, void* data, std::size_t* size)
184 : const noexcept = 0;
185 :
186 : /// Return the cached local endpoint.
187 : virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
188 :
189 : /// Return the cached remote endpoint.
190 : virtual corosio::local_endpoint remote_endpoint() const noexcept = 0;
191 : };
192 :
193 : /// Represent the awaitable returned by @ref connect.
194 : struct connect_awaitable
195 : : detail::void_op_base<connect_awaitable>
196 : {
197 : local_stream_socket& s_;
198 : corosio::local_endpoint endpoint_;
199 :
200 HIT 6 : connect_awaitable(
201 : local_stream_socket& s, corosio::local_endpoint ep) noexcept
202 6 : : s_(s), endpoint_(ep) {}
203 :
204 6 : std::coroutine_handle<> dispatch(
205 : std::coroutine_handle<> h, capy::executor_ref ex) const
206 : {
207 6 : return s_.get().connect(h, ex, endpoint_, token_, &ec_);
208 : }
209 : };
210 :
211 : /// Represent the awaitable returned by @ref wait.
212 : struct wait_awaitable
213 : : detail::void_op_base<wait_awaitable>
214 : {
215 : local_stream_socket& s_;
216 : wait_type w_;
217 :
218 2 : wait_awaitable(local_stream_socket& s, wait_type w) noexcept
219 2 : : s_(s), w_(w) {}
220 :
221 2 : std::coroutine_handle<> dispatch(
222 : std::coroutine_handle<> h, capy::executor_ref ex) const
223 : {
224 2 : return s_.get().wait(h, ex, w_, token_, &ec_);
225 : }
226 : };
227 :
228 : public:
229 : /** Destructor.
230 :
231 : Closes the socket if open, cancelling any pending operations.
232 : */
233 : ~local_stream_socket() override;
234 :
235 : /** Construct a socket from an execution context.
236 :
237 : @param ctx The execution context that will own this socket.
238 : */
239 : explicit local_stream_socket(capy::execution_context& ctx);
240 :
241 : /** Construct a socket from an executor.
242 :
243 : The socket is associated with the executor's context.
244 :
245 : @param ex The executor whose context will own the socket.
246 : */
247 : template<class Ex>
248 : requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_socket>) &&
249 : capy::Executor<Ex>
250 : explicit local_stream_socket(Ex const& ex) : local_stream_socket(ex.context())
251 : {
252 : }
253 :
254 : /** Move constructor.
255 :
256 : Transfers ownership of the socket resources.
257 :
258 : @param other The socket to move from.
259 :
260 : @pre No awaitables returned by @p other's methods exist.
261 : @pre The execution context associated with @p other must
262 : outlive this socket.
263 : */
264 22 : local_stream_socket(local_stream_socket&& other) noexcept
265 22 : : io_object(std::move(other))
266 : {
267 22 : }
268 :
269 : /** Move assignment operator.
270 :
271 : Closes any existing socket and transfers ownership.
272 :
273 : @param other The socket to move from.
274 :
275 : @pre No awaitables returned by either `*this` or @p other's
276 : methods exist.
277 : @pre The execution context associated with @p other must
278 : outlive this socket.
279 :
280 : @return Reference to this socket.
281 : */
282 : local_stream_socket& operator=(local_stream_socket&& other) noexcept
283 : {
284 : if (this != &other)
285 : {
286 : close();
287 : io_object::operator=(std::move(other));
288 : }
289 : return *this;
290 : }
291 :
292 : local_stream_socket(local_stream_socket const&) = delete;
293 : local_stream_socket& operator=(local_stream_socket const&) = delete;
294 :
295 : /** Open the socket.
296 :
297 : Creates a Unix stream socket and associates it with
298 : the platform reactor.
299 :
300 : @param proto The protocol. Defaults to local_stream{}.
301 :
302 : @throws std::system_error on failure.
303 : */
304 : void open(local_stream proto = {});
305 :
306 : /** Close the socket.
307 :
308 : Releases socket resources. Any pending operations complete
309 : with `errc::operation_canceled`.
310 : */
311 : void close();
312 :
313 : /** Check if the socket is open.
314 :
315 : @return `true` if the socket is open and ready for operations.
316 : */
317 126 : bool is_open() const noexcept
318 : {
319 : #if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
320 : return h_ && get().native_handle() != ~native_handle_type(0);
321 : #else
322 126 : return h_ && get().native_handle() >= 0;
323 : #endif
324 : }
325 :
326 : /** Initiate an asynchronous connect operation.
327 :
328 : If the socket is not already open, it is opened automatically.
329 :
330 : @param ep The local endpoint (path) to connect to.
331 :
332 : @return An awaitable that completes with io_result<>.
333 :
334 : @throws std::system_error if the socket needs to be opened
335 : and the open fails.
336 : */
337 6 : auto connect(corosio::local_endpoint ep)
338 : {
339 6 : if (!is_open())
340 MIS 0 : open();
341 HIT 6 : return connect_awaitable(*this, ep);
342 : }
343 :
344 : /** Wait for the socket to become ready in a given direction.
345 :
346 : Suspends until the socket is ready for the requested
347 : direction, or an error condition is reported. No bytes
348 : are transferred.
349 :
350 : @param w The wait direction (read, write, or error).
351 :
352 : @return An awaitable that completes with `io_result<>`.
353 :
354 : @par Preconditions
355 : The socket must be open. This socket must outlive the
356 : returned awaitable.
357 : */
358 2 : [[nodiscard]] auto wait(wait_type w)
359 : {
360 2 : return wait_awaitable(*this, w);
361 : }
362 :
363 : /** Cancel any pending asynchronous operations.
364 :
365 : All outstanding operations complete with `errc::operation_canceled`.
366 : Check `ec == cond::canceled` for portable comparison.
367 : */
368 : void cancel();
369 :
370 : /** Get the native socket handle.
371 :
372 : Returns the underlying platform-specific socket descriptor.
373 : On POSIX systems this is an `int` file descriptor.
374 :
375 : @return The native socket handle, or an invalid sentinel
376 : if not open.
377 : */
378 : native_handle_type native_handle() const noexcept;
379 :
380 : /** Query the number of bytes available for reading.
381 :
382 : @return The number of bytes that can be read without blocking.
383 :
384 : @throws std::logic_error if the socket is not open.
385 : @throws std::system_error on ioctl failure.
386 : */
387 : std::size_t available() const;
388 :
389 : /** Release ownership of the native socket handle.
390 :
391 : Deregisters the socket from the backend and cancels pending
392 : operations without closing the descriptor. The caller takes
393 : ownership of the returned handle.
394 :
395 : @return The native handle.
396 :
397 : @throws std::logic_error if the socket is not open.
398 :
399 : @post is_open() == false
400 : */
401 : native_handle_type release();
402 :
403 : /** Disable sends or receives on the socket.
404 :
405 : Unix stream connections are full-duplex: each direction
406 : (send and receive) operates independently. This function
407 : allows you to close one or both directions without
408 : destroying the socket.
409 :
410 : @param what Determines what operations will no longer
411 : be allowed.
412 :
413 : @throws std::system_error on failure.
414 : */
415 : void shutdown(shutdown_type what);
416 :
417 : /** Shut down part or all of the socket (non-throwing).
418 :
419 : @param what Which direction to shut down.
420 : @param ec Set to the error code on failure.
421 : */
422 : void shutdown(shutdown_type what, std::error_code& ec) noexcept;
423 :
424 : /** Set a socket option.
425 :
426 : Applies a type-safe socket option to the underlying socket.
427 : The option type encodes the protocol level and option name.
428 :
429 : @param opt The option to set.
430 :
431 : @throws std::logic_error if the socket is not open.
432 : @throws std::system_error on failure.
433 : */
434 : template<class Option>
435 : void set_option(Option const& opt)
436 : {
437 : if (!is_open())
438 : detail::throw_logic_error("set_option: socket not open");
439 : std::error_code ec = get().set_option(
440 : Option::level(), Option::name(), opt.data(), opt.size());
441 : if (ec)
442 : detail::throw_system_error(ec, "local_stream_socket::set_option");
443 : }
444 :
445 : /** Get a socket option.
446 :
447 : Retrieves the current value of a type-safe socket option.
448 :
449 : @return The current option value.
450 :
451 : @throws std::logic_error if the socket is not open.
452 : @throws std::system_error on failure.
453 : */
454 : template<class Option>
455 : Option get_option() const
456 : {
457 : if (!is_open())
458 : detail::throw_logic_error("get_option: socket not open");
459 : Option opt{};
460 : std::size_t sz = opt.size();
461 : std::error_code ec =
462 : get().get_option(Option::level(), Option::name(), opt.data(), &sz);
463 : if (ec)
464 : detail::throw_system_error(ec, "local_stream_socket::get_option");
465 : opt.resize(sz);
466 : return opt;
467 : }
468 :
469 : /** Assign an existing file descriptor to this socket.
470 :
471 : The socket must not already be open. The fd is adopted
472 : and registered with the platform reactor. Used by
473 : make_local_stream_pair() to wrap socketpair() fds.
474 :
475 : @param fd The file descriptor to adopt. Must be a valid,
476 : open, non-blocking Unix stream socket.
477 :
478 : @throws std::system_error on failure.
479 : */
480 : void assign(native_handle_type fd);
481 :
482 : /** Get the local endpoint of the socket.
483 :
484 : Returns the local address (path) to which the socket is bound.
485 : The endpoint is cached when the connection is established.
486 :
487 : @return The local endpoint, or a default endpoint if the socket
488 : is not connected.
489 : */
490 : corosio::local_endpoint local_endpoint() const noexcept;
491 :
492 : /** Get the remote endpoint of the socket.
493 :
494 : Returns the remote address (path) to which the socket is connected.
495 : The endpoint is cached when the connection is established.
496 :
497 : @return The remote endpoint, or a default endpoint if the socket
498 : is not connected.
499 : */
500 : corosio::local_endpoint remote_endpoint() const noexcept;
501 :
502 : protected:
503 MIS 0 : local_stream_socket() noexcept = default;
504 :
505 : explicit local_stream_socket(handle h) noexcept : io_object(std::move(h)) {}
506 :
507 : private:
508 : friend class local_stream_acceptor;
509 :
510 : void open_for_family(int family, int type, int protocol);
511 :
512 HIT 116 : inline implementation& get() const noexcept
513 : {
514 116 : return *static_cast<implementation*>(h_.get());
515 : }
516 : };
517 :
518 : } // namespace boost::corosio
519 :
520 : #endif // BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
|