TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/socket_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23 :
24 : #include <boost/corosio/detail/endpoint_convert.hpp>
25 : #include <boost/corosio/detail/make_err.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/except.hpp>
28 : #include <boost/capy/buffers.hpp>
29 :
30 : #include <coroutine>
31 : #include <mutex>
32 : #include <unordered_map>
33 : #include <utility>
34 :
35 : #include <errno.h>
36 : #include <netinet/in.h>
37 : #include <netinet/tcp.h>
38 : #include <sys/epoll.h>
39 : #include <sys/socket.h>
40 : #include <unistd.h>
41 :
42 : /*
43 : epoll Socket Implementation
44 : ===========================
45 :
46 : Each I/O operation follows the same pattern:
47 : 1. Try the syscall immediately (non-blocking socket)
48 : 2. If it succeeds or fails with a real error, post to completion queue
49 : 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50 :
51 : This "try first" approach avoids unnecessary epoll round-trips for
52 : operations that can complete immediately (common for small reads/writes
53 : on fast local connections).
54 :
55 : One-Shot Registration
56 : ---------------------
57 : We use one-shot epoll registration: each operation registers, waits for
58 : one event, then unregisters. This simplifies the state machine since we
59 : don't need to track whether an fd is currently registered or handle
60 : re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 : simplicity is worth it.
62 :
63 : Cancellation
64 : ------------
65 : See op.hpp for the completion/cancellation race handling via the
66 : `registered` atomic. cancel() must complete pending operations (post
67 : them with cancelled flag) so coroutines waiting on them can resume.
68 : close_socket() calls cancel() first to ensure this.
69 :
70 : Impl Lifetime with shared_ptr
71 : -----------------------------
72 : Socket impls use enable_shared_from_this. The service owns impls via
73 : shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 : removal. When a user calls close(), we call cancel() which posts pending
75 : ops to the scheduler.
76 :
77 : CRITICAL: The posted ops must keep the impl alive until they complete.
78 : Otherwise the scheduler would process a freed op (use-after-free). The
79 : cancel() method captures shared_from_this() into op.impl_ptr before
80 : posting. When the op completes, impl_ptr is cleared, allowing the impl
81 : to be destroyed if no other references exist.
82 :
83 : Service Ownership
84 : -----------------
85 : epoll_socket_service owns all socket impls. destroy() removes the
86 : shared_ptr from the map, but the impl may survive if ops still hold
87 : impl_ptr refs. shutdown() closes all sockets but leaves the map intact
88 : until the service is destroyed; in-flight ops release their refs as they complete.
89 : */
90 :
91 : namespace boost::corosio::detail {
92 :
93 : /** State for epoll socket service. */
94 : class epoll_socket_state
95 : {
96 : public:
97 HIT 205 : explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98 : {
99 205 : }
100 :
101 : epoll_scheduler& sched_;
102 : std::mutex mutex_;
103 : intrusive_list<epoll_socket> socket_list_;
104 : std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105 : socket_ptrs_;
106 : };
107 :
108 : /** epoll socket service implementation.
109 :
110 : Inherits from socket_service to enable runtime polymorphism.
111 : Uses key_type = socket_service for service lookup.
112 : */
113 : class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
114 : {
115 : public:
116 : explicit epoll_socket_service(capy::execution_context& ctx);
117 : ~epoll_socket_service() override;
118 :
119 : epoll_socket_service(epoll_socket_service const&) = delete;
120 : epoll_socket_service& operator=(epoll_socket_service const&) = delete;
121 :
122 : void shutdown() override;
123 :
124 : io_object::implementation* construct() override;
125 : void destroy(io_object::implementation*) override;
126 : void close(io_object::handle&) override;
127 : std::error_code open_socket(tcp_socket::implementation& impl) override;
128 :
129 359355 : epoll_scheduler& scheduler() const noexcept
130 : {
131 359355 : return state_->sched_;
132 : }
133 : void post(epoll_op* op);
134 : void work_started() noexcept;
135 : void work_finished() noexcept;
136 :
137 : private:
138 : std::unique_ptr<epoll_socket_state> state_;
139 : };
140 :
141 : //--------------------------------------------------------------------------
142 : //
143 : // Implementation
144 : //
145 : //--------------------------------------------------------------------------
146 :
147 : // Register an op with the reactor, handling cached edge events.
148 : // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
149 : inline void
150 4935 : epoll_socket::register_op(
151 : epoll_op& op,
152 : epoll_op*& desc_slot,
153 : bool& ready_flag,
154 : bool& cancel_flag) noexcept
155 : {
156 4935 : svc_.work_started();
157 :
158 4935 : std::lock_guard lock(desc_state_.mutex);
159 4935 : bool io_done = false;
160 4935 : if (ready_flag)
161 : {
162 142 : ready_flag = false;
163 142 : op.perform_io();
164 142 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
165 142 : if (!io_done)
166 142 : op.errn = 0;
167 : }
168 :
169 4935 : if (cancel_flag)
170 : {
171 95 : cancel_flag = false;
172 95 : op.cancelled.store(true, std::memory_order_relaxed);
173 : }
174 :
175 4935 : if (io_done || op.cancelled.load(std::memory_order_acquire))
176 : {
177 95 : svc_.post(&op);
178 95 : svc_.work_finished();
179 : }
180 : else
181 : {
182 4840 : desc_slot = &op;
183 : }
184 4935 : }
185 :
186 : inline void
187 104 : epoll_op::canceller::operator()() const noexcept
188 : {
189 104 : op->cancel();
190 104 : }
191 :
192 : inline void
193 MIS 0 : epoll_connect_op::cancel() noexcept
194 : {
195 0 : if (socket_impl_)
196 0 : socket_impl_->cancel_single_op(*this);
197 : else
198 0 : request_cancel();
199 0 : }
200 :
201 : inline void
202 HIT 98 : epoll_read_op::cancel() noexcept
203 : {
204 98 : if (socket_impl_)
205 98 : socket_impl_->cancel_single_op(*this);
206 : else
207 MIS 0 : request_cancel();
208 HIT 98 : }
209 :
210 : inline void
211 MIS 0 : epoll_write_op::cancel() noexcept
212 : {
213 0 : if (socket_impl_)
214 0 : socket_impl_->cancel_single_op(*this);
215 : else
216 0 : request_cancel();
217 0 : }
218 :
219 : inline void
220 HIT 56050 : epoll_op::operator()()
221 : {
222 56050 : stop_cb.reset();
223 :
224 56050 : socket_impl_->svc_.scheduler().reset_inline_budget();
225 :
226 56050 : if (cancelled.load(std::memory_order_acquire))
227 205 : *ec_out = capy::error::canceled;
228 55845 : else if (errn != 0)
229 MIS 0 : *ec_out = make_err(errn);
230 HIT 55845 : else if (is_read_operation() && bytes_transferred == 0)
231 MIS 0 : *ec_out = capy::error::eof;
232 : else
233 HIT 55845 : *ec_out = {};
234 :
235 56050 : *bytes_out = bytes_transferred;
236 :
237 : // Move to stack before resuming coroutine. The coroutine might close
238 : // the socket, releasing the last wrapper ref. If impl_ptr were the
239 : // last ref and we destroyed it while still in operator(), we'd have
240 : // use-after-free. Moving to local ensures destruction happens at
241 : // function exit, after all member accesses are complete.
242 56050 : capy::executor_ref saved_ex(ex);
243 56050 : std::coroutine_handle<> saved_h(h);
244 56050 : auto prevent_premature_destruction = std::move(impl_ptr);
245 56050 : dispatch_coro(saved_ex, saved_h).resume();
246 56050 : }
247 :
248 : inline void
249 4734 : epoll_connect_op::operator()()
250 : {
251 4734 : stop_cb.reset();
252 :
253 4734 : socket_impl_->svc_.scheduler().reset_inline_budget();
254 :
255 4734 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
256 :
257 : // Cache endpoints on successful connect
258 4734 : if (success && socket_impl_)
259 : {
260 : // Query local endpoint via getsockname (may fail, but remote is always known)
261 4732 : endpoint local_ep;
262 4732 : sockaddr_in local_addr{};
263 4732 : socklen_t local_len = sizeof(local_addr);
264 4732 : if (::getsockname(
265 4732 : fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
266 4732 : local_ep = from_sockaddr_in(local_addr);
267 : // Always cache remote endpoint; local may be default if getsockname failed
268 4732 : static_cast<epoll_socket*>(socket_impl_)
269 4732 : ->set_endpoints(local_ep, target_endpoint);
270 : }
271 :
272 4734 : if (cancelled.load(std::memory_order_acquire))
273 MIS 0 : *ec_out = capy::error::canceled;
274 HIT 4734 : else if (errn != 0)
275 2 : *ec_out = make_err(errn);
276 : else
277 4732 : *ec_out = {};
278 :
279 : // Move to stack before resuming. See epoll_op::operator()() for rationale.
280 4734 : capy::executor_ref saved_ex(ex);
281 4734 : std::coroutine_handle<> saved_h(h);
282 4734 : auto prevent_premature_destruction = std::move(impl_ptr);
283 4734 : dispatch_coro(saved_ex, saved_h).resume();
284 4734 : }
285 :
286 14254 : inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
287 14254 : : svc_(svc)
288 : {
289 14254 : }
290 :
291 14254 : inline epoll_socket::~epoll_socket() = default;
292 :
293 : inline std::coroutine_handle<>
294 4734 : epoll_socket::connect(
295 : std::coroutine_handle<> h,
296 : capy::executor_ref ex,
297 : endpoint ep,
298 : std::stop_token token,
299 : std::error_code* ec)
300 : {
301 4734 : auto& op = conn_;
302 :
303 4734 : sockaddr_in addr = detail::to_sockaddr_in(ep);
304 : int result =
305 4734 : ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
306 :
307 4734 : if (result == 0)
308 : {
309 MIS 0 : sockaddr_in local_addr{};
310 0 : socklen_t local_len = sizeof(local_addr);
311 0 : if (::getsockname(
312 0 : fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
313 0 : local_endpoint_ = detail::from_sockaddr_in(local_addr);
314 0 : remote_endpoint_ = ep;
315 : }
316 :
317 HIT 4734 : if (result == 0 || errno != EINPROGRESS)
318 : {
319 MIS 0 : int err = (result < 0) ? errno : 0;
320 0 : if (svc_.scheduler().try_consume_inline_budget())
321 : {
322 0 : *ec = err ? make_err(err) : std::error_code{};
323 0 : return dispatch_coro(ex, h);
324 : }
325 0 : op.reset();
326 0 : op.h = h;
327 0 : op.ex = ex;
328 0 : op.ec_out = ec;
329 0 : op.fd = fd_;
330 0 : op.target_endpoint = ep;
331 0 : op.start(token, this);
332 0 : op.impl_ptr = shared_from_this();
333 0 : op.complete(err, 0);
334 0 : svc_.post(&op);
335 0 : return std::noop_coroutine();
336 : }
337 :
338 : // EINPROGRESS — register with reactor
339 HIT 4734 : op.reset();
340 4734 : op.h = h;
341 4734 : op.ex = ex;
342 4734 : op.ec_out = ec;
343 4734 : op.fd = fd_;
344 4734 : op.target_endpoint = ep;
345 4734 : op.start(token, this);
346 4734 : op.impl_ptr = shared_from_this();
347 :
348 4734 : register_op(
349 4734 : op, desc_state_.connect_op, desc_state_.write_ready,
350 4734 : desc_state_.connect_cancel_pending);
351 4734 : return std::noop_coroutine();
352 : }
353 :
354 : inline std::coroutine_handle<>
355 140010 : epoll_socket::read_some(
356 : std::coroutine_handle<> h,
357 : capy::executor_ref ex,
358 : io_buffer_param param,
359 : std::stop_token token,
360 : std::error_code* ec,
361 : std::size_t* bytes_out)
362 : {
363 140010 : auto& op = rd_;
364 140010 : op.reset();
365 :
366 140010 : capy::mutable_buffer bufs[epoll_read_op::max_buffers];
367 140010 : op.iovec_count =
368 140010 : static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
369 :
370 140010 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
371 : {
372 1 : op.empty_buffer_read = true;
373 1 : op.h = h;
374 1 : op.ex = ex;
375 1 : op.ec_out = ec;
376 1 : op.bytes_out = bytes_out;
377 1 : op.start(token, this);
378 1 : op.impl_ptr = shared_from_this();
379 1 : op.complete(0, 0);
380 1 : svc_.post(&op);
381 1 : return std::noop_coroutine();
382 : }
383 :
384 280018 : for (int i = 0; i < op.iovec_count; ++i)
385 : {
386 140009 : op.iovecs[i].iov_base = bufs[i].data();
387 140009 : op.iovecs[i].iov_len = bufs[i].size();
388 : }
389 :
390 : // Speculative read
391 : ssize_t n;
392 : do
393 : {
394 140009 : n = ::readv(fd_, op.iovecs, op.iovec_count);
395 : }
396 140009 : while (n < 0 && errno == EINTR);
397 :
398 140009 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
399 : {
400 139808 : int err = (n < 0) ? errno : 0;
401 139808 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
402 :
403 139808 : if (svc_.scheduler().try_consume_inline_budget())
404 : {
405 111894 : if (err)
406 MIS 0 : *ec = make_err(err);
407 HIT 111894 : else if (n == 0)
408 5 : *ec = capy::error::eof;
409 : else
410 111889 : *ec = {};
411 111894 : *bytes_out = bytes;
412 111894 : return dispatch_coro(ex, h);
413 : }
414 27914 : op.h = h;
415 27914 : op.ex = ex;
416 27914 : op.ec_out = ec;
417 27914 : op.bytes_out = bytes_out;
418 27914 : op.start(token, this);
419 27914 : op.impl_ptr = shared_from_this();
420 27914 : op.complete(err, bytes);
421 27914 : svc_.post(&op);
422 27914 : return std::noop_coroutine();
423 : }
424 :
425 : // EAGAIN — register with reactor
426 201 : op.h = h;
427 201 : op.ex = ex;
428 201 : op.ec_out = ec;
429 201 : op.bytes_out = bytes_out;
430 201 : op.fd = fd_;
431 201 : op.start(token, this);
432 201 : op.impl_ptr = shared_from_this();
433 :
434 201 : register_op(
435 201 : op, desc_state_.read_op, desc_state_.read_ready,
436 201 : desc_state_.read_cancel_pending);
437 201 : return std::noop_coroutine();
438 : }
439 :
440 : inline std::coroutine_handle<>
441 139810 : epoll_socket::write_some(
442 : std::coroutine_handle<> h,
443 : capy::executor_ref ex,
444 : io_buffer_param param,
445 : std::stop_token token,
446 : std::error_code* ec,
447 : std::size_t* bytes_out)
448 : {
449 139810 : auto& op = wr_;
450 139810 : op.reset();
451 :
452 139810 : capy::mutable_buffer bufs[epoll_write_op::max_buffers];
453 139810 : op.iovec_count =
454 139810 : static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
455 :
456 139810 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
457 : {
458 1 : op.h = h;
459 1 : op.ex = ex;
460 1 : op.ec_out = ec;
461 1 : op.bytes_out = bytes_out;
462 1 : op.start(token, this);
463 1 : op.impl_ptr = shared_from_this();
464 1 : op.complete(0, 0);
465 1 : svc_.post(&op);
466 1 : return std::noop_coroutine();
467 : }
468 :
469 279618 : for (int i = 0; i < op.iovec_count; ++i)
470 : {
471 139809 : op.iovecs[i].iov_base = bufs[i].data();
472 139809 : op.iovecs[i].iov_len = bufs[i].size();
473 : }
474 :
475 : // Speculative write
476 139809 : msghdr msg{};
477 139809 : msg.msg_iov = op.iovecs;
478 139809 : msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
479 :
480 : ssize_t n;
481 : do
482 : {
483 139809 : n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
484 : }
485 139809 : while (n < 0 && errno == EINTR);
486 :
487 139809 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
488 : {
489 139809 : int err = (n < 0) ? errno : 0;
490 139809 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
491 :
492 139809 : if (svc_.scheduler().try_consume_inline_budget())
493 : {
494 111876 : *ec = err ? make_err(err) : std::error_code{};
495 111876 : *bytes_out = bytes;
496 111876 : return dispatch_coro(ex, h);
497 : }
498 27933 : op.h = h;
499 27933 : op.ex = ex;
500 27933 : op.ec_out = ec;
501 27933 : op.bytes_out = bytes_out;
502 27933 : op.start(token, this);
503 27933 : op.impl_ptr = shared_from_this();
504 27933 : op.complete(err, bytes);
505 27933 : svc_.post(&op);
506 27933 : return std::noop_coroutine();
507 : }
508 :
509 : // EAGAIN — register with reactor
510 MIS 0 : op.h = h;
511 0 : op.ex = ex;
512 0 : op.ec_out = ec;
513 0 : op.bytes_out = bytes_out;
514 0 : op.fd = fd_;
515 0 : op.start(token, this);
516 0 : op.impl_ptr = shared_from_this();
517 :
518 0 : register_op(
519 0 : op, desc_state_.write_op, desc_state_.write_ready,
520 0 : desc_state_.write_cancel_pending);
521 0 : return std::noop_coroutine();
522 : }
523 :
524 : inline std::error_code
525 HIT 3 : epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
526 : {
527 : int how;
528 3 : switch (what)
529 : {
530 1 : case tcp_socket::shutdown_receive:
531 1 : how = SHUT_RD;
532 1 : break;
533 1 : case tcp_socket::shutdown_send:
534 1 : how = SHUT_WR;
535 1 : break;
536 1 : case tcp_socket::shutdown_both:
537 1 : how = SHUT_RDWR;
538 1 : break;
539 MIS 0 : default:
540 0 : return make_err(EINVAL);
541 : }
542 HIT 3 : if (::shutdown(fd_, how) != 0)
543 MIS 0 : return make_err(errno);
544 HIT 3 : return {};
545 : }
546 :
547 : inline std::error_code
548 5 : epoll_socket::set_no_delay(bool value) noexcept
549 : {
550 5 : int flag = value ? 1 : 0;
551 5 : if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
552 MIS 0 : return make_err(errno);
553 HIT 5 : return {};
554 : }
555 :
556 : inline bool
557 5 : epoll_socket::no_delay(std::error_code& ec) const noexcept
558 : {
559 5 : int flag = 0;
560 5 : socklen_t len = sizeof(flag);
561 5 : if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
562 : {
563 MIS 0 : ec = make_err(errno);
564 0 : return false;
565 : }
566 HIT 5 : ec = {};
567 5 : return flag != 0;
568 : }
569 :
570 : inline std::error_code
571 4 : epoll_socket::set_keep_alive(bool value) noexcept
572 : {
573 4 : int flag = value ? 1 : 0;
574 4 : if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
575 MIS 0 : return make_err(errno);
576 HIT 4 : return {};
577 : }
578 :
579 : inline bool
580 4 : epoll_socket::keep_alive(std::error_code& ec) const noexcept
581 : {
582 4 : int flag = 0;
583 4 : socklen_t len = sizeof(flag);
584 4 : if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
585 : {
586 MIS 0 : ec = make_err(errno);
587 0 : return false;
588 : }
589 HIT 4 : ec = {};
590 4 : return flag != 0;
591 : }
592 :
593 : inline std::error_code
594 1 : epoll_socket::set_receive_buffer_size(int size) noexcept
595 : {
596 1 : if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
597 MIS 0 : return make_err(errno);
598 HIT 1 : return {};
599 : }
600 :
601 : inline int
602 3 : epoll_socket::receive_buffer_size(std::error_code& ec) const noexcept
603 : {
604 3 : int size = 0;
605 3 : socklen_t len = sizeof(size);
606 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
607 : {
608 MIS 0 : ec = make_err(errno);
609 0 : return 0;
610 : }
611 HIT 3 : ec = {};
612 3 : return size;
613 : }
614 :
615 : inline std::error_code
616 1 : epoll_socket::set_send_buffer_size(int size) noexcept
617 : {
618 1 : if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
619 MIS 0 : return make_err(errno);
620 HIT 1 : return {};
621 : }
622 :
623 : inline int
624 3 : epoll_socket::send_buffer_size(std::error_code& ec) const noexcept
625 : {
626 3 : int size = 0;
627 3 : socklen_t len = sizeof(size);
628 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
629 : {
630 MIS 0 : ec = make_err(errno);
631 0 : return 0;
632 : }
633 HIT 3 : ec = {};
634 3 : return size;
635 : }
636 :
637 : inline std::error_code
638 10 : epoll_socket::set_linger(bool enabled, int timeout) noexcept
639 : {
640 10 : if (timeout < 0)
641 1 : return make_err(EINVAL);
642 : struct ::linger lg;
643 9 : lg.l_onoff = enabled ? 1 : 0;
644 9 : lg.l_linger = timeout;
645 9 : if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
646 MIS 0 : return make_err(errno);
647 HIT 9 : return {};
648 : }
649 :
650 : inline tcp_socket::linger_options
651 3 : epoll_socket::linger(std::error_code& ec) const noexcept
652 : {
653 3 : struct ::linger lg{};
654 3 : socklen_t len = sizeof(lg);
655 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
656 : {
657 MIS 0 : ec = make_err(errno);
658 0 : return {};
659 : }
660 HIT 3 : ec = {};
661 3 : return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
662 : }
663 :
664 : inline void
665 187 : epoll_socket::cancel() noexcept
666 : {
667 187 : auto self = weak_from_this().lock();
668 187 : if (!self)
669 MIS 0 : return;
670 :
671 HIT 187 : conn_.request_cancel();
672 187 : rd_.request_cancel();
673 187 : wr_.request_cancel();
674 :
675 187 : epoll_op* conn_claimed = nullptr;
676 187 : epoll_op* rd_claimed = nullptr;
677 187 : epoll_op* wr_claimed = nullptr;
678 : {
679 187 : std::lock_guard lock(desc_state_.mutex);
680 187 : if (desc_state_.connect_op == &conn_)
681 MIS 0 : conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
682 : else
683 HIT 187 : desc_state_.connect_cancel_pending = true;
684 187 : if (desc_state_.read_op == &rd_)
685 3 : rd_claimed = std::exchange(desc_state_.read_op, nullptr);
686 : else
687 184 : desc_state_.read_cancel_pending = true;
688 187 : if (desc_state_.write_op == &wr_)
689 MIS 0 : wr_claimed = std::exchange(desc_state_.write_op, nullptr);
690 : else
691 HIT 187 : desc_state_.write_cancel_pending = true;
692 187 : }
693 :
694 187 : if (conn_claimed)
695 : {
696 MIS 0 : conn_.impl_ptr = self;
697 0 : svc_.post(&conn_);
698 0 : svc_.work_finished();
699 : }
700 HIT 187 : if (rd_claimed)
701 : {
702 3 : rd_.impl_ptr = self;
703 3 : svc_.post(&rd_);
704 3 : svc_.work_finished();
705 : }
706 187 : if (wr_claimed)
707 : {
708 MIS 0 : wr_.impl_ptr = self;
709 0 : svc_.post(&wr_);
710 0 : svc_.work_finished();
711 : }
712 HIT 187 : }
713 :
714 : inline void
715 98 : epoll_socket::cancel_single_op(epoll_op& op) noexcept
716 : {
717 98 : auto self = weak_from_this().lock();
718 98 : if (!self)
719 MIS 0 : return;
720 :
721 HIT 98 : op.request_cancel();
722 :
723 98 : epoll_op** desc_op_ptr = nullptr;
724 98 : if (&op == &conn_)
725 MIS 0 : desc_op_ptr = &desc_state_.connect_op;
726 HIT 98 : else if (&op == &rd_)
727 98 : desc_op_ptr = &desc_state_.read_op;
728 MIS 0 : else if (&op == &wr_)
729 0 : desc_op_ptr = &desc_state_.write_op;
730 :
731 HIT 98 : if (desc_op_ptr)
732 : {
733 98 : epoll_op* claimed = nullptr;
734 : {
735 98 : std::lock_guard lock(desc_state_.mutex);
736 98 : if (*desc_op_ptr == &op)
737 98 : claimed = std::exchange(*desc_op_ptr, nullptr);
738 MIS 0 : else if (&op == &conn_)
739 0 : desc_state_.connect_cancel_pending = true;
740 0 : else if (&op == &rd_)
741 0 : desc_state_.read_cancel_pending = true;
742 0 : else if (&op == &wr_)
743 0 : desc_state_.write_cancel_pending = true;
744 HIT 98 : }
745 98 : if (claimed)
746 : {
747 98 : op.impl_ptr = self;
748 98 : svc_.post(&op);
749 98 : svc_.work_finished();
750 : }
751 : }
752 98 : }
753 :
754 : inline void
755 42730 : epoll_socket::close_socket() noexcept
756 : {
757 42730 : auto self = weak_from_this().lock();
758 42730 : if (self)
759 : {
760 42730 : conn_.request_cancel();
761 42730 : rd_.request_cancel();
762 42730 : wr_.request_cancel();
763 :
764 42730 : epoll_op* conn_claimed = nullptr;
765 42730 : epoll_op* rd_claimed = nullptr;
766 42730 : epoll_op* wr_claimed = nullptr;
767 : {
768 42730 : std::lock_guard lock(desc_state_.mutex);
769 42730 : conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
770 42730 : rd_claimed = std::exchange(desc_state_.read_op, nullptr);
771 42730 : wr_claimed = std::exchange(desc_state_.write_op, nullptr);
772 42730 : desc_state_.read_ready = false;
773 42730 : desc_state_.write_ready = false;
774 42730 : desc_state_.read_cancel_pending = false;
775 42730 : desc_state_.write_cancel_pending = false;
776 42730 : desc_state_.connect_cancel_pending = false;
777 42730 : }
778 :
779 42730 : if (conn_claimed)
780 : {
781 MIS 0 : conn_.impl_ptr = self;
782 0 : svc_.post(&conn_);
783 0 : svc_.work_finished();
784 : }
785 HIT 42730 : if (rd_claimed)
786 : {
787 1 : rd_.impl_ptr = self;
788 1 : svc_.post(&rd_);
789 1 : svc_.work_finished();
790 : }
791 42730 : if (wr_claimed)
792 : {
793 MIS 0 : wr_.impl_ptr = self;
794 0 : svc_.post(&wr_);
795 0 : svc_.work_finished();
796 : }
797 :
798 HIT 42730 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
799 94 : desc_state_.impl_ref_ = self;
800 : }
801 :
802 42730 : if (fd_ >= 0)
803 : {
804 9477 : if (desc_state_.registered_events != 0)
805 9477 : svc_.scheduler().deregister_descriptor(fd_);
806 9477 : ::close(fd_);
807 9477 : fd_ = -1;
808 : }
809 :
810 42730 : desc_state_.fd = -1;
811 42730 : desc_state_.registered_events = 0;
812 :
813 42730 : local_endpoint_ = endpoint{};
814 42730 : remote_endpoint_ = endpoint{};
815 42730 : }
816 :
817 205 : inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
818 205 : : state_(
819 : std::make_unique<epoll_socket_state>(
820 205 : ctx.use_service<epoll_scheduler>()))
821 : {
822 205 : }
823 :
824 410 : inline epoll_socket_service::~epoll_socket_service() {}
825 :
826 : inline void
827 205 : epoll_socket_service::shutdown()
828 : {
829 205 : std::lock_guard lock(state_->mutex_);
830 :
831 205 : while (auto* impl = state_->socket_list_.pop_front())
832 MIS 0 : impl->close_socket();
833 :
834 : // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
835 : // drains completed_ops_, calling destroy() on each queued op. If we
836 : // released our shared_ptrs now, an epoll_op::destroy() could free the
837 : // last ref to an impl whose embedded descriptor_state is still linked
838 : // in the queue — use-after-free on the next pop(). Letting ~state_
839 : // release the ptrs (during service destruction, after scheduler
840 : // shutdown) keeps every impl alive until all ops have been drained.
841 HIT 205 : }
842 :
843 : inline io_object::implementation*
844 14254 : epoll_socket_service::construct()
845 : {
846 14254 : auto impl = std::make_shared<epoll_socket>(*this);
847 14254 : auto* raw = impl.get();
848 :
849 : {
850 14254 : std::lock_guard lock(state_->mutex_);
851 14254 : state_->socket_list_.push_back(raw);
852 14254 : state_->socket_ptrs_.emplace(raw, std::move(impl));
853 14254 : }
854 :
855 14254 : return raw;
856 14254 : }
857 :
858 : inline void
859 14254 : epoll_socket_service::destroy(io_object::implementation* impl)
860 : {
861 14254 : auto* epoll_impl = static_cast<epoll_socket*>(impl);
862 14254 : epoll_impl->close_socket();
863 14254 : std::lock_guard lock(state_->mutex_);
864 14254 : state_->socket_list_.remove(epoll_impl);
865 14254 : state_->socket_ptrs_.erase(epoll_impl);
866 14254 : }
867 :
868 : inline std::error_code
869 4745 : epoll_socket_service::open_socket(tcp_socket::implementation& impl)
870 : {
871 4745 : auto* epoll_impl = static_cast<epoll_socket*>(&impl);
872 4745 : epoll_impl->close_socket();
873 :
874 4745 : int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
875 4745 : if (fd < 0)
876 MIS 0 : return make_err(errno);
877 :
878 HIT 4745 : epoll_impl->fd_ = fd;
879 :
880 : // Register fd with epoll (edge-triggered mode)
881 4745 : epoll_impl->desc_state_.fd = fd;
882 : {
883 4745 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
884 4745 : epoll_impl->desc_state_.read_op = nullptr;
885 4745 : epoll_impl->desc_state_.write_op = nullptr;
886 4745 : epoll_impl->desc_state_.connect_op = nullptr;
887 4745 : }
888 4745 : scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
889 :
890 4745 : return {};
891 : }
892 :
893 : inline void
894 23731 : epoll_socket_service::close(io_object::handle& h)
895 : {
896 23731 : static_cast<epoll_socket*>(h.get())->close_socket();
897 23731 : }
898 :
899 : inline void
900 56046 : epoll_socket_service::post(epoll_op* op)
901 : {
902 56046 : state_->sched_.post(op);
903 56046 : }
904 :
905 : inline void
906 4935 : epoll_socket_service::work_started() noexcept
907 : {
908 4935 : state_->sched_.work_started();
909 4935 : }
910 :
911 : inline void
912 197 : epoll_socket_service::work_finished() noexcept
913 : {
914 197 : state_->sched_.work_finished();
915 197 : }
916 :
917 : } // namespace boost::corosio::detail
918 :
919 : #endif // BOOST_COROSIO_HAS_EPOLL
920 :
921 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
|