include/boost/corosio/native/detail/select/select_socket_service.hpp

Coverage: 75.6% of lines (301/398), 94.6% of functions (35/37)

//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_SELECT

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>
#include <boost/corosio/detail/socket_service.hpp>

#include <boost/corosio/native/detail/select/select_socket.hpp>
#include <boost/corosio/native/detail/select/select_scheduler.hpp>

#include <boost/corosio/detail/endpoint_convert.hpp>
#include <boost/corosio/detail/dispatch_coro.hpp>
#include <boost/corosio/detail/make_err.hpp>

#include <boost/corosio/detail/except.hpp>

#include <boost/capy/buffers.hpp>

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>

#include <memory>
#include <mutex>
#include <unordered_map>

/*
select Socket Implementation
============================

This mirrors the epoll_sockets design for behavioral consistency.
Each I/O operation follows the same pattern:
1. Try the syscall immediately (the socket is non-blocking).
2. If it succeeds or fails with a real error, post to the completion queue.
3. If EAGAIN/EWOULDBLOCK, register with the select scheduler and wait.

Cancellation
------------
See op.hpp for the completion/cancellation race handling via the
`registered` atomic. cancel() must complete pending operations (post
them with the cancelled flag set) so coroutines waiting on them can
resume. close_socket() calls cancel() first to ensure this.
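
Registration states (a sketch; the authoritative definition lives in
op.hpp next to the `registered` atomic):

    unregistered --store--> registering --CAS--> registered
    registering/registered --exchange--> unregistered

Whichever side performs the exchange away from registering/registered
(the reactor on readiness, cancel()/cancel_single_op(), or
close_socket()) claims the op: it deregisters the fd and posts the
completion.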

Impl Lifetime with shared_ptr
-----------------------------
Socket impls use enable_shared_from_this. The service owns impls via
shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
removal. When a user calls close(), we call cancel(), which posts pending
ops to the scheduler.

CRITICAL: The posted ops must keep the impl alive until they complete.
Otherwise the scheduler would process a freed op (use-after-free). The
cancel() method captures shared_from_this() into op.impl_ptr before
posting. When the op completes, impl_ptr is cleared, allowing the impl
to be destroyed if no other references exist.

Service Ownership
-----------------
select_socket_service owns all socket impls. destroy() removes the
shared_ptr from the map, but the impl may survive if ops still hold
impl_ptr refs. shutdown() closes all sockets; the map itself is released
only when the service state is destroyed (after the scheduler drains its
queues), so any in-flight ops complete and release their refs first.
*/

namespace boost::corosio::detail {

/** State for select socket service. */
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    select_scheduler& sched_;
    std::mutex mutex_;
    intrusive_list<select_socket> socket_list_;
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;
};

/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    select_socket_service(select_socket_service const&) = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    void shutdown() override;

    io_object::implementation* construct() override;
    void destroy(io_object::implementation*) override;
    void close(io_object::handle&) override;
    std::error_code open_socket(tcp_socket::implementation& impl) override;

    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }

    void post(select_op* op);
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
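
// Illustrative only (hypothetical call site, not an API defined here):
// concrete services are obtained from the execution context, mirroring
// how this file fetches the scheduler in its own constructor:
//
//     auto& svc = ctx.use_service<select_socket_service>();
//
// Because key_type is socket_service, generic code that looks up
// socket_service can be satisfied by this select-based implementation.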

// Backward compatibility alias
using select_sockets = select_socket_service;
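
// The canceller functor is installed through the op's stop_cb (presumably
// a std::stop_callback; see op.hpp), so a stop request on the awaiting
// coroutine's stop_token routes into the op's virtual cancel().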
inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
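
// Each op's cancel() prefers cancel_single_op() on its owning socket,
// which claims the op via the `registered` exchange and deregisters the
// right fd/event pair. When no socket is attached yet (socket_impl_ is
// null), all it can do is flag the request.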
inline void
select_connect_op::cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}

inline void
select_read_op::cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}

inline void
select_write_op::cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}

inline void
select_connect_op::operator()()
{
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        // Query local endpoint via getsockname (may fail, but remote is always known)
        endpoint local_ep;
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_ep = from_sockaddr_in(local_addr);
        // Always cache remote endpoint; local may be default if getsockname failed
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move ex and h to the stack before releasing impl_ptr: this op lives
    // inside the socket impl, so dropping the last reference may destroy
    // the op's own members, including ex and h.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}

inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}

inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
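
    // On a non-blocking socket, ::connect normally fails with EINPROGRESS
    // and the fd becomes writable once the handshake completes or fails.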
    if (result == 0)
    {
        // Sync success - cache endpoints immediately
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}

inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
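        // Zero-length read: complete immediately with zero bytes. The
        // empty_buffer_read flag presumably lets the completion side (see
        // op.hpp) distinguish this from a 0-byte EOF read.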
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }
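
    // Speculative attempt: the socket is non-blocking (see open_socket),
    // so readv returns immediately rather than waiting for data.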
    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (n == 0)
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}

inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
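
    // MSG_NOSIGNAL turns a write on a peer-closed connection into an EPIPE
    // error instead of raising SIGPIPE against the process.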
    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }
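
    // A zero return from sendmsg leaves errno untouched, so fall back to
    // EIO rather than reporting a stale (possibly zero) errno as success.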
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}

inline std::error_code
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
{
    int how;
    switch (what)
    {
    case tcp_socket::shutdown_receive:
        how = SHUT_RD;
        break;
    case tcp_socket::shutdown_send:
        how = SHUT_WR;
        break;
    case tcp_socket::shutdown_both:
        how = SHUT_RDWR;
        break;
    default:
        return make_err(EINVAL);
    }
    if (::shutdown(fd_, how) != 0)
        return make_err(errno);
    return {};
}

inline std::error_code
select_socket::set_no_delay(bool value) noexcept
{
    int flag = value ? 1 : 0;
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
        return make_err(errno);
    return {};
}

inline bool
select_socket::no_delay(std::error_code& ec) const noexcept
{
    int flag = 0;
    socklen_t len = sizeof(flag);
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
    {
        ec = make_err(errno);
        return false;
    }
    ec = {};
    return flag != 0;
}

inline std::error_code
select_socket::set_keep_alive(bool value) noexcept
{
    int flag = value ? 1 : 0;
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
        return make_err(errno);
    return {};
}

inline bool
select_socket::keep_alive(std::error_code& ec) const noexcept
{
    int flag = 0;
    socklen_t len = sizeof(flag);
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
    {
        ec = make_err(errno);
        return false;
    }
    ec = {};
    return flag != 0;
}

inline std::error_code
select_socket::set_receive_buffer_size(int size) noexcept
{
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
        return make_err(errno);
    return {};
}

inline int
select_socket::receive_buffer_size(std::error_code& ec) const noexcept
{
    int size = 0;
    socklen_t len = sizeof(size);
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
    {
        ec = make_err(errno);
        return 0;
    }
    ec = {};
    return size;
}

inline std::error_code
select_socket::set_send_buffer_size(int size) noexcept
{
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
        return make_err(errno);
    return {};
}

inline int
select_socket::send_buffer_size(std::error_code& ec) const noexcept
{
    int size = 0;
    socklen_t len = sizeof(size);
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
    {
        ec = make_err(errno);
        return 0;
    }
    ec = {};
    return size;
}

inline std::error_code
select_socket::set_linger(bool enabled, int timeout) noexcept
{
    if (timeout < 0)
        return make_err(EINVAL);
    struct ::linger lg;
    lg.l_onoff = enabled ? 1 : 0;
    lg.l_linger = timeout;
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
        return make_err(errno);
    return {};
}

inline tcp_socket::linger_options
select_socket::linger(std::error_code& ec) const noexcept
{
    struct ::linger lg{};
    socklen_t len = sizeof(lg);
    if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
    {
        ec = make_err(errno);
        return {};
    }
    ec = {};
    return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
}
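
// Socket-wide cancel, claiming each of the three per-socket ops. The
// weak_from_this().lock() guard presumably covers calls made once the
// last shared_ptr is gone, when shared_from_this() would be invalid.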
inline void
select_socket::cancel() noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}

inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}

inline void
select_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        auto cancel_op = [this, &self](select_op& op, int events) {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }
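
    // Remove any remaining read/write interest before closing, so the
    // reactor never selects on a dead (or recycled) descriptor.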
    if (fd_ >= 0)
    {
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}

inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
          std::make_unique<select_socket_state>(
              ctx.use_service<select_scheduler>()))
{
}

inline select_socket_service::~select_socket_service() {}

inline void
select_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}

inline io_object::implementation*
select_socket_service::construct()
{
    auto impl = std::make_shared<select_socket>(*this);
    auto* raw = impl.get();

    {
        std::lock_guard lock(state_->mutex_);
        state_->socket_list_.push_back(raw);
        state_->socket_ptrs_.emplace(raw, std::move(impl));
    }

    return raw;
}
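
// destroy() runs when the owning io_object goes away: close (which cancels
// pending ops) first, then drop the service's shared_ptr. Ops already
// posted keep the impl alive through their impl_ptr references.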
inline void
select_socket_service::destroy(io_object::implementation* impl)
{
    auto* select_impl = static_cast<select_socket*>(impl);
    select_impl->close_socket();
    std::lock_guard lock(state_->mutex_);
    state_->socket_list_.remove(select_impl);
    state_->socket_ptrs_.erase(select_impl);
}

inline std::error_code
select_socket_service::open_socket(tcp_socket::implementation& impl)
{
    auto* select_impl = static_cast<select_socket*>(&impl);
    select_impl->close_socket();

    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return make_err(errno);

    // Set non-blocking and close-on-exec
    int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }

    // Reject descriptors select() cannot watch: FD_SET on an fd that is
    // >= FD_SETSIZE is undefined behavior.
    if (fd >= FD_SETSIZE)
    {
        ::close(fd);
        return make_err(EMFILE); // Too many open files
    }

    select_impl->fd_ = fd;
    return {};
}

inline void
select_socket_service::close(io_object::handle& h)
{
    static_cast<select_socket*>(h.get())->close_socket();
}

inline void
select_socket_service::post(select_op* op)
{
    state_->sched_.post(op);
}

inline void
select_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}

inline void
select_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_SELECT

#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP