include/boost/corosio/local_stream_socket.hpp

90.9% Lines (20/22) 90.0% List of functions (9/10)
local_stream_socket.hpp
f(x) Functions (10)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
11 #define BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/platform.hpp>
15 #include <boost/corosio/detail/except.hpp>
16 #include <boost/corosio/detail/native_handle.hpp>
17 #include <boost/corosio/detail/op_base.hpp>
18 #include <boost/corosio/io/io_stream.hpp>
19 #include <boost/capy/io_result.hpp>
20 #include <boost/corosio/detail/buffer_param.hpp>
21 #include <boost/corosio/local_endpoint.hpp>
22 #include <boost/corosio/local_stream.hpp>
23 #include <boost/corosio/shutdown_type.hpp>
24 #include <boost/corosio/wait_type.hpp>
25 #include <boost/capy/ex/executor_ref.hpp>
26 #include <boost/capy/ex/execution_context.hpp>
27 #include <boost/capy/ex/io_env.hpp>
28 #include <boost/capy/concept/executor.hpp>
29
30 #include <system_error>
31
32 #include <concepts>
33 #include <coroutine>
34 #include <cstddef>
35 #include <stop_token>
36 #include <type_traits>
37
38 namespace boost::corosio {
39
40 /** An asynchronous Unix stream socket for coroutine I/O.
41
42 This class provides asynchronous Unix domain stream socket
43 operations that return awaitable types. Each operation
44 participates in the affine awaitable protocol, ensuring
45 coroutines resume on the correct executor.
46
47 The socket must be opened before performing I/O operations.
48 Operations support cancellation through `std::stop_token` via
49 the affine protocol, or explicitly through the `cancel()`
50 member function.
51
52 @par Thread Safety
53 Distinct objects: Safe.@n
54 Shared objects: Unsafe. A socket must not have concurrent
55 operations of the same type (e.g., two simultaneous reads).
56 One read and one write may be in flight simultaneously.
57
58 @par Semantics
59 Wraps the platform Unix domain socket stack. Operations
60 dispatch to OS socket APIs via the io_context backend
61 (epoll, kqueue, select, or IOCP). Satisfies @ref capy::Stream.
62
63 @par Example
64 @code
65 io_context ioc;
66 local_stream_socket s(ioc);
67 s.open();
68
69 auto [ec] = co_await s.connect(local_endpoint("/tmp/my.sock"));
70 if (ec)
71 co_return;
72
73 char buf[1024];
74 auto [read_ec, n] = co_await s.read_some(
75 capy::mutable_buffer(buf, sizeof(buf)));
76 @endcode
77 */
78 class BOOST_COROSIO_DECL local_stream_socket : public io_stream
79 {
80 public:
81 /// The endpoint type used by this socket.
82 using endpoint_type = corosio::local_endpoint;
83
84 using shutdown_type = corosio::shutdown_type;
85 using enum corosio::shutdown_type;
86
87 /** Define backend hooks for local stream socket operations.
88
89 Platform backends (epoll, kqueue, select) derive from this
90 to implement socket I/O, connection, and option management.
91 */
92 struct implementation : io_stream::implementation
93 {
94 /** Initiate an asynchronous connect to the given endpoint.
95
96 @param h Coroutine handle to resume on completion.
97 @param ex Executor for dispatching the completion.
98 @param ep The local endpoint (path) to connect to.
99 @param token Stop token for cancellation.
100 @param ec Output error code.
101
102 @return Coroutine handle to resume immediately.
103 */
104 virtual std::coroutine_handle<> connect(
105 std::coroutine_handle<> h,
106 capy::executor_ref ex,
107 corosio::local_endpoint ep,
108 std::stop_token token,
109 std::error_code* ec) = 0;
110
111 /** Initiate an asynchronous wait for socket readiness.
112
113 Completes when the socket becomes ready for the
114 specified direction, or an error condition is
115 reported. No bytes are transferred.
116
117 @param h Coroutine handle to resume on completion.
118 @param ex Executor for dispatching the completion.
119 @param w The direction to wait on.
120 @param token Stop token for cancellation.
121 @param ec Output error code.
122
123 @return Coroutine handle to resume immediately.
124 */
125 virtual std::coroutine_handle<> wait(
126 std::coroutine_handle<> h,
127 capy::executor_ref ex,
128 wait_type w,
129 std::stop_token token,
130 std::error_code* ec) = 0;
131
132 /** Shut down the socket for the given direction(s).
133
134 @param what The shutdown direction.
135
136 @return Error code on failure, empty on success.
137 */
138 virtual std::error_code shutdown(shutdown_type what) noexcept = 0;
139
140 /// Return the platform socket descriptor.
141 virtual native_handle_type native_handle() const noexcept = 0;
142
143 /** Release ownership of the native socket handle.
144
145 Deregisters the socket from the reactor without closing
146 the descriptor. The caller takes ownership.
147
148 @return The native handle.
149 */
150 virtual native_handle_type release_socket() noexcept = 0;
151
152 /** Request cancellation of pending asynchronous operations.
153
154 All outstanding operations complete with operation_canceled error.
155 Check `ec == cond::canceled` for portable comparison.
156 */
157 virtual void cancel() noexcept = 0;
158
159 /** Set a socket option.
160
161 @param level The protocol level (e.g. `SOL_SOCKET`).
162 @param optname The option name (e.g. `SO_KEEPALIVE`).
163 @param data Pointer to the option value.
164 @param size Size of the option value in bytes.
165 @return Error code on failure, empty on success.
166 */
167 virtual std::error_code set_option(
168 int level,
169 int optname,
170 void const* data,
171 std::size_t size) noexcept = 0;
172
173 /** Get a socket option.
174
175 @param level The protocol level (e.g. `SOL_SOCKET`).
176 @param optname The option name (e.g. `SO_KEEPALIVE`).
177 @param data Pointer to receive the option value.
178 @param size On entry, the size of the buffer. On exit,
179 the size of the option value.
180 @return Error code on failure, empty on success.
181 */
182 virtual std::error_code
183 get_option(int level, int optname, void* data, std::size_t* size)
184 const noexcept = 0;
185
186 /// Return the cached local endpoint.
187 virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
188
189 /// Return the cached remote endpoint.
190 virtual corosio::local_endpoint remote_endpoint() const noexcept = 0;
191 };
192
193 /// Represent the awaitable returned by @ref connect.
194 struct connect_awaitable
195 : detail::void_op_base<connect_awaitable>
196 {
197 local_stream_socket& s_;
198 corosio::local_endpoint endpoint_;
199
200 6x connect_awaitable(
201 local_stream_socket& s, corosio::local_endpoint ep) noexcept
202 6x : s_(s), endpoint_(ep) {}
203
204 6x std::coroutine_handle<> dispatch(
205 std::coroutine_handle<> h, capy::executor_ref ex) const
206 {
207 6x return s_.get().connect(h, ex, endpoint_, token_, &ec_);
208 }
209 };
210
211 /// Represent the awaitable returned by @ref wait.
212 struct wait_awaitable
213 : detail::void_op_base<wait_awaitable>
214 {
215 local_stream_socket& s_;
216 wait_type w_;
217
218 2x wait_awaitable(local_stream_socket& s, wait_type w) noexcept
219 2x : s_(s), w_(w) {}
220
221 2x std::coroutine_handle<> dispatch(
222 std::coroutine_handle<> h, capy::executor_ref ex) const
223 {
224 2x return s_.get().wait(h, ex, w_, token_, &ec_);
225 }
226 };
227
228 public:
229 /** Destructor.
230
231 Closes the socket if open, cancelling any pending operations.
232 */
233 ~local_stream_socket() override;
234
235 /** Construct a socket from an execution context.
236
237 @param ctx The execution context that will own this socket.
238 */
239 explicit local_stream_socket(capy::execution_context& ctx);
240
241 /** Construct a socket from an executor.
242
243 The socket is associated with the executor's context.
244
245 @param ex The executor whose context will own the socket.
246 */
247 template<class Ex>
248 requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_socket>) &&
249 capy::Executor<Ex>
250 explicit local_stream_socket(Ex const& ex) : local_stream_socket(ex.context())
251 {
252 }
253
254 /** Move constructor.
255
256 Transfers ownership of the socket resources.
257
258 @param other The socket to move from.
259
260 @pre No awaitables returned by @p other's methods exist.
261 @pre The execution context associated with @p other must
262 outlive this socket.
263 */
264 22x local_stream_socket(local_stream_socket&& other) noexcept
265 22x : io_object(std::move(other))
266 {
267 22x }
268
269 /** Move assignment operator.
270
271 Closes any existing socket and transfers ownership.
272
273 @param other The socket to move from.
274
275 @pre No awaitables returned by either `*this` or @p other's
276 methods exist.
277 @pre The execution context associated with @p other must
278 outlive this socket.
279
280 @return Reference to this socket.
281 */
282 local_stream_socket& operator=(local_stream_socket&& other) noexcept
283 {
284 if (this != &other)
285 {
286 close();
287 io_object::operator=(std::move(other));
288 }
289 return *this;
290 }
291
292 local_stream_socket(local_stream_socket const&) = delete;
293 local_stream_socket& operator=(local_stream_socket const&) = delete;
294
295 /** Open the socket.
296
297 Creates a Unix stream socket and associates it with
298 the platform reactor.
299
300 @param proto The protocol. Defaults to local_stream{}.
301
302 @throws std::system_error on failure.
303 */
304 void open(local_stream proto = {});
305
306 /** Close the socket.
307
308 Releases socket resources. Any pending operations complete
309 with `errc::operation_canceled`.
310 */
311 void close();
312
313 /** Check if the socket is open.
314
315 @return `true` if the socket is open and ready for operations.
316 */
317 126x bool is_open() const noexcept
318 {
319 #if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
320 return h_ && get().native_handle() != ~native_handle_type(0);
321 #else
322 126x return h_ && get().native_handle() >= 0;
323 #endif
324 }
325
326 /** Initiate an asynchronous connect operation.
327
328 If the socket is not already open, it is opened automatically.
329
330 @param ep The local endpoint (path) to connect to.
331
332 @return An awaitable that completes with io_result<>.
333
334 @throws std::system_error if the socket needs to be opened
335 and the open fails.
336 */
337 6x auto connect(corosio::local_endpoint ep)
338 {
339 6x if (!is_open())
340 open();
341 6x return connect_awaitable(*this, ep);
342 }
343
344 /** Wait for the socket to become ready in a given direction.
345
346 Suspends until the socket is ready for the requested
347 direction, or an error condition is reported. No bytes
348 are transferred.
349
350 @param w The wait direction (read, write, or error).
351
352 @return An awaitable that completes with `io_result<>`.
353
354 @par Preconditions
355 The socket must be open. This socket must outlive the
356 returned awaitable.
357 */
358 2x [[nodiscard]] auto wait(wait_type w)
359 {
360 2x return wait_awaitable(*this, w);
361 }
362
363 /** Cancel any pending asynchronous operations.
364
365 All outstanding operations complete with `errc::operation_canceled`.
366 Check `ec == cond::canceled` for portable comparison.
367 */
368 void cancel();
369
370 /** Get the native socket handle.
371
372 Returns the underlying platform-specific socket descriptor.
373 On POSIX systems this is an `int` file descriptor.
374
375 @return The native socket handle, or an invalid sentinel
376 if not open.
377 */
378 native_handle_type native_handle() const noexcept;
379
380 /** Query the number of bytes available for reading.
381
382 @return The number of bytes that can be read without blocking.
383
384 @throws std::logic_error if the socket is not open.
385 @throws std::system_error on ioctl failure.
386 */
387 std::size_t available() const;
388
389 /** Release ownership of the native socket handle.
390
391 Deregisters the socket from the backend and cancels pending
392 operations without closing the descriptor. The caller takes
393 ownership of the returned handle.
394
395 @return The native handle.
396
397 @throws std::logic_error if the socket is not open.
398
399 @post is_open() == false
400 */
401 native_handle_type release();
402
403 /** Disable sends or receives on the socket.
404
405 Unix stream connections are full-duplex: each direction
406 (send and receive) operates independently. This function
407 allows you to close one or both directions without
408 destroying the socket.
409
410 @param what Determines what operations will no longer
411 be allowed.
412
413 @throws std::system_error on failure.
414 */
415 void shutdown(shutdown_type what);
416
417 /** Shut down part or all of the socket (non-throwing).
418
419 @param what Which direction to shut down.
420 @param ec Set to the error code on failure.
421 */
422 void shutdown(shutdown_type what, std::error_code& ec) noexcept;
423
424 /** Set a socket option.
425
426 Applies a type-safe socket option to the underlying socket.
427 The option type encodes the protocol level and option name.
428
429 @param opt The option to set.
430
431 @throws std::logic_error if the socket is not open.
432 @throws std::system_error on failure.
433 */
434 template<class Option>
435 void set_option(Option const& opt)
436 {
437 if (!is_open())
438 detail::throw_logic_error("set_option: socket not open");
439 std::error_code ec = get().set_option(
440 Option::level(), Option::name(), opt.data(), opt.size());
441 if (ec)
442 detail::throw_system_error(ec, "local_stream_socket::set_option");
443 }
444
445 /** Get a socket option.
446
447 Retrieves the current value of a type-safe socket option.
448
449 @return The current option value.
450
451 @throws std::logic_error if the socket is not open.
452 @throws std::system_error on failure.
453 */
454 template<class Option>
455 Option get_option() const
456 {
457 if (!is_open())
458 detail::throw_logic_error("get_option: socket not open");
459 Option opt{};
460 std::size_t sz = opt.size();
461 std::error_code ec =
462 get().get_option(Option::level(), Option::name(), opt.data(), &sz);
463 if (ec)
464 detail::throw_system_error(ec, "local_stream_socket::get_option");
465 opt.resize(sz);
466 return opt;
467 }
468
469 /** Assign an existing file descriptor to this socket.
470
471 The socket must not already be open. The fd is adopted
472 and registered with the platform reactor. Used by
473 make_local_stream_pair() to wrap socketpair() fds.
474
475 @param fd The file descriptor to adopt. Must be a valid,
476 open, non-blocking Unix stream socket.
477
478 @throws std::system_error on failure.
479 */
480 void assign(native_handle_type fd);
481
482 /** Get the local endpoint of the socket.
483
484 Returns the local address (path) to which the socket is bound.
485 The endpoint is cached when the connection is established.
486
487 @return The local endpoint, or a default endpoint if the socket
488 is not connected.
489 */
490 corosio::local_endpoint local_endpoint() const noexcept;
491
492 /** Get the remote endpoint of the socket.
493
494 Returns the remote address (path) to which the socket is connected.
495 The endpoint is cached when the connection is established.
496
497 @return The remote endpoint, or a default endpoint if the socket
498 is not connected.
499 */
500 corosio::local_endpoint remote_endpoint() const noexcept;
501
502 protected:
503 local_stream_socket() noexcept = default;
504
505 explicit local_stream_socket(handle h) noexcept : io_object(std::move(h)) {}
506
507 private:
508 friend class local_stream_acceptor;
509
510 void open_for_family(int family, int type, int protocol);
511
512 116x inline implementation& get() const noexcept
513 {
514 116x return *static_cast<implementation*>(h_.get());
515 }
516 };
517
518 } // namespace boost::corosio
519
520 #endif // BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
521