LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_socket_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 75.6 % 398 301 97
Test Date: 2026-02-17 21:42:07 Functions: 94.7 % 38 36 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/socket_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/select/select_socket.hpp>
      22                 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
      23                 : 
      24                 : #include <boost/corosio/detail/endpoint_convert.hpp>
      25                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      26                 : #include <boost/corosio/detail/make_err.hpp>
      27                 : 
      28                 : #include <boost/corosio/detail/except.hpp>
      29                 : 
      30                 : #include <boost/capy/buffers.hpp>
      31                 : 
      32                 : #include <errno.h>
      33                 : #include <fcntl.h>
      34                 : #include <netinet/in.h>
      35                 : #include <netinet/tcp.h>
      36                 : #include <sys/socket.h>
      37                 : #include <unistd.h>
      38                 : 
      39                 : #include <memory>
      40                 : #include <mutex>
      41                 : #include <unordered_map>
      42                 : 
      43                 : /*
      44                 :     select Socket Implementation
      45                 :     ============================
      46                 : 
      47                 :     This mirrors the epoll_sockets design for behavioral consistency.
      48                 :     Each I/O operation follows the same pattern:
      49                 :       1. Try the syscall immediately (non-blocking socket)
      50                 :       2. If it succeeds or fails with a real error, post to completion queue
      51                 :       3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
      52                 : 
      53                 :     Cancellation
      54                 :     ------------
      55                 :     See op.hpp for the completion/cancellation race handling via the
      56                 :     `registered` atomic. cancel() must complete pending operations (post
      57                 :     them with cancelled flag) so coroutines waiting on them can resume.
      58                 :     close_socket() calls cancel() first to ensure this.
      59                 : 
      60                 :     Impl Lifetime with shared_ptr
      61                 :     -----------------------------
      62                 :     Socket impls use enable_shared_from_this. The service owns impls via
      63                 :     shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
      64                 :     removal. When a user calls close(), we call cancel() which posts pending
      65                 :     ops to the scheduler.
      66                 : 
      67                 :     CRITICAL: The posted ops must keep the impl alive until they complete.
      68                 :     Otherwise the scheduler would process a freed op (use-after-free). The
      69                 :     cancel() method captures shared_from_this() into op.impl_ptr before
      70                 :     posting. When the op completes, impl_ptr is cleared, allowing the impl
      71                 :     to be destroyed if no other references exist.
      72                 : 
      73                 :     Service Ownership
      74                 :     -----------------
      75                 :     select_socket_service owns all socket impls. destroy() removes the
      76                 :     shared_ptr from the map, but the impl may survive if ops still hold
      77                 :     impl_ptr refs. shutdown() closes all sockets and clears the map; any
      78                 :     in-flight ops will complete and release their refs.
      79                 : */
      80                 : 
      81                 : namespace boost::corosio::detail {
      82                 : 
      83                 : /** State for select socket service. */
      84                 : class select_socket_state
      85                 : {
                               // Data bundle held by select_socket_service through a
                               // unique_ptr: the scheduler reference plus the socket registry.
      86                 : public:
      87 HIT         135 :     explicit select_socket_state(select_scheduler& sched) noexcept
      88             135 :         : sched_(sched)
      89                 :     {
      90             135 :     }
      91                 : 
      92                 :     select_scheduler& sched_; // scheduler passed to the constructor; held by reference
      93                 :     std::mutex mutex_; // NOTE(review): presumably guards the two containers below - the locking code is not in this chunk
      94                 :     intrusive_list<select_socket> socket_list_; // intrusive list of select_socket objects
      95                 :     std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
      96                 :         socket_ptrs_; // shared_ptr owners keyed by raw pointer (O(1) lookup/removal per the design notes above)
      97                 : };
      98                 : 
      99                 : /** select socket service implementation.
     100                 : 
     101                 :     Inherits from socket_service to enable runtime polymorphism.
     102                 :     Uses key_type = socket_service for service lookup.
     103                 : */
     104                 : class BOOST_COROSIO_DECL select_socket_service final : public socket_service
     105                 : {
                               // Only scheduler() is defined inline here; every other member
                               // is declared and has its definition outside this chunk.
     106                 : public:
     107                 :     explicit select_socket_service(capy::execution_context& ctx);
     108                 :     ~select_socket_service() override;
     109                 : 
                               // Non-copyable.
     110                 :     select_socket_service(select_socket_service const&)            = delete;
     111                 :     select_socket_service& operator=(select_socket_service const&) = delete;
     112                 : 
                               // Per the design notes at the top of this file: closes all
                               // sockets and clears the ownership map.
     113                 :     void shutdown() override;
     114                 : 
                               // socket_service overrides. Per the design notes, destroy()
                               // drops the service's shared_ptr, but the impl may outlive it
                               // while in-flight ops still hold impl_ptr references.
     115                 :     io_object::implementation* construct() override;
     116                 :     void destroy(io_object::implementation*) override;
     117                 :     void close(io_object::handle&) override;
     118                 :     std::error_code open_socket(tcp_socket::implementation& impl) override;
     119                 : 
                               // Returns the scheduler reference stored in the service state.
     120           10820 :     select_scheduler& scheduler() const noexcept
     121                 :     {
     122           10820 :         return state_->sched_;
     123                 :     }
                               // NOTE(review): presumably thin forwards to the scheduler
                               // (queue an op / adjust outstanding-work count) - confirm
                               // against the definitions.
     124                 :     void post(select_op* op);
     125                 :     void work_started() noexcept;
     126                 :     void work_finished() noexcept;
     127                 : 
     128                 : private:
     129                 :     std::unique_ptr<select_socket_state> state_; // scheduler reference + socket registry
     130                 : };
     131                 : 
     132                 : // Backward compatibility alias
     133                 : using select_sockets = select_socket_service;
     134                 : 
     135                 : inline void
     136              97 : select_op::canceller::operator()() const noexcept
     137                 : {
                               // Call operator of the canceller functor: forwards to the
                               // cancel() of the op it points at.
                               // NOTE(review): presumably installed as the stop_token callback
                               // by op.start() - confirm in op.hpp.
     138              97 :     op->cancel();
     139              97 : }
     140                 : 
     141                 : inline void
     142 MIS           0 : select_connect_op::cancel() noexcept
     143                 : {
                               // Cancels this connect op. With an owning socket attached the
                               // socket's cancel_single_op() handles it; otherwise only
                               // request_cancel() is called on the op itself.
     144               0 :     if (socket_impl_)
     145               0 :         socket_impl_->cancel_single_op(*this);
     146                 :     else
     147               0 :         request_cancel();
     148               0 : }
     149                 : 
     150                 : inline void
     151 HIT          97 : select_read_op::cancel() noexcept
     152                 : {
                               // Cancels this read op. With an owning socket attached the
                               // socket's cancel_single_op() handles it; otherwise only
                               // request_cancel() is called on the op itself.
     153              97 :     if (socket_impl_)
     154              97 :         socket_impl_->cancel_single_op(*this);
     155                 :     else
     156 MIS           0 :         request_cancel();
     157 HIT          97 : }
     158                 : 
     159                 : inline void
     160 MIS           0 : select_write_op::cancel() noexcept
     161                 : {
                               // Cancels this write op. With an owning socket attached the
                               // socket's cancel_single_op() handles it; otherwise only
                               // request_cancel() is called on the op itself.
     162               0 :     if (socket_impl_)
     163               0 :         socket_impl_->cancel_single_op(*this);
     164                 :     else
     165               0 :         request_cancel();
     166               0 : }
     167                 : 
     168                 : inline void
     169 HIT        3457 : select_connect_op::operator()()
     170                 : {
                               // Completion handler for a connect op: caches endpoints on
                               // success, writes the result through the caller's out-pointers,
                               // then resumes the awaiting coroutine.
     171            3457 :     stop_cb.reset();
     172                 : 
                               // Success means no errno was recorded and no cancellation was
                               // requested.
     173            3457 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
     174                 : 
     175                 :     // Cache endpoints on successful connect
     176            3457 :     if (success && socket_impl_)
     177                 :     {
     178                 :         // Query local endpoint via getsockname (may fail, but remote is always known)
     179            3456 :         endpoint local_ep;
     180            3456 :         sockaddr_in local_addr{};
     181            3456 :         socklen_t local_len = sizeof(local_addr);
     182            3456 :         if (::getsockname(
     183            3456 :                 fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     184            3456 :             local_ep = from_sockaddr_in(local_addr);
     185                 :         // Always cache remote endpoint; local may be default if getsockname failed
     186            3456 :         static_cast<select_socket*>(socket_impl_)
     187            3456 :             ->set_endpoints(local_ep, target_endpoint);
     188                 :     }
     189                 : 
                               // Error reporting: cancellation takes precedence over errn.
     190            3457 :     if (ec_out)
     191                 :     {
     192            3457 :         if (cancelled.load(std::memory_order_acquire))
     193 MIS           0 :             *ec_out = capy::error::canceled;
     194 HIT        3457 :         else if (errn != 0)
     195               1 :             *ec_out = make_err(errn);
     196                 :         else
     197            3456 :             *ec_out = {};
     198                 :     }
     199                 : 
                               // select_socket::connect() never sets bytes_out, so this stays
                               // a no-op unless another caller provides it.
     200            3457 :     if (bytes_out)
     201 MIS           0 :         *bytes_out = bytes_transferred;
     202                 : 
     203                 :     // Move to stack before destroying the frame
                               // This op is a member of the socket impl (conn_), and impl_ptr
                               // may be the last reference keeping that impl alive; after
                               // reset() no member of *this may be touched, hence the copies.
     204 HIT        3457 :     capy::executor_ref saved_ex(ex);
     205            3457 :     std::coroutine_handle<> saved_h(h);
     206            3457 :     impl_ptr.reset();
     207            3457 :     dispatch_coro(saved_ex, saved_h).resume();
     208            3457 : }
     209                 : 
     210           10388 : inline select_socket::select_socket(select_socket_service& svc) noexcept
     211           10388 :     : svc_(svc) // keeps a reference to the owning service; nothing else is initialised here
     212                 : {
     213           10388 : }
     214                 : 
     215                 : inline std::coroutine_handle<>
     216            3457 : select_socket::connect(
     217                 :     std::coroutine_handle<> h,
     218                 :     capy::executor_ref ex,
     219                 :     endpoint ep,
     220                 :     std::stop_token token,
     221                 :     std::error_code* ec)
     222                 : {
                               // Starts a connect to `ep` on fd_ using the socket's single
                               // connect op (conn_).
                               //   h / ex : coroutine to resume and the executor to resume it on
                               //   token  : stop token handed to op.start()
                               //   ec     : out-pointer the completion handler writes the result to
                               // Always returns noop_coroutine(): every outcome is delivered
                               // through the scheduler (svc_.post or the reactor), never inline.
     223            3457 :     auto& op = conn_;
     224            3457 :     op.reset();
     225            3457 :     op.h               = h;
     226            3457 :     op.ex              = ex;
     227            3457 :     op.ec_out          = ec;
     228            3457 :     op.fd              = fd_;
     229            3457 :     op.target_endpoint = ep; // Store target for endpoint caching
     230            3457 :     op.start(token, this);
     231                 : 
                               // The address is converted to sockaddr_in, i.e. IPv4 only.
     232            3457 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     233                 :     int result =
     234            3457 :         ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
     235                 : 
     236            3457 :     if (result == 0)
     237                 :     {
     238                 :         // Sync success - cache endpoints immediately
     239 MIS           0 :         sockaddr_in local_addr{};
     240               0 :         socklen_t local_len = sizeof(local_addr);
     241               0 :         if (::getsockname(
     242               0 :                 fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     243               0 :             local_endpoint_ = detail::from_sockaddr_in(local_addr);
     244               0 :         remote_endpoint_ = ep;
     245                 : 
                                   // impl_ptr keeps this socket alive until the posted op runs.
     246               0 :         op.complete(0, 0);
     247               0 :         op.impl_ptr = shared_from_this();
     248               0 :         svc_.post(&op);
     249                 :         // completion is always posted to scheduler queue, never inline.
     250               0 :         return std::noop_coroutine();
     251                 :     }
     252                 : 
                               // errno is read with no other call since ::connect(), so it
                               // still holds that call's error.
     253 HIT        3457 :     if (errno == EINPROGRESS)
     254                 :     {
                                   // work_started() here is balanced by the work_finished() in the
                                   // cancelled branch below. NOTE(review): the reactor/cancel paths
                                   // presumably balance it when they claim the op - not visible here.
     255            3457 :         svc_.work_started();
     256            3457 :         op.impl_ptr = shared_from_this();
     257                 : 
     258                 :         // Set registering BEFORE register_fd to close the race window where
     259                 :         // reactor sees an event before we set registered. The reactor treats
     260                 :         // registering the same as registered when claiming the op.
     261            3457 :         op.registered.store(
     262                 :             select_registration_state::registering, std::memory_order_release);
     263            3457 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     264                 : 
     265                 :         // Transition to registered. If this fails, reactor or cancel already
     266                 :         // claimed the op (state is now unregistered), so we're done. However,
     267                 :         // we must still deregister the fd because cancel's deregister_fd may
     268                 :         // have run before our register_fd, leaving the fd orphaned.
     269            3457 :         auto expected = select_registration_state::registering;
     270            3457 :         if (!op.registered.compare_exchange_strong(
     271                 :                 expected, select_registration_state::registered,
     272                 :                 std::memory_order_acq_rel))
     273                 :         {
     274 MIS           0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     275                 :             // completion is always posted to scheduler queue, never inline.
     276               0 :             return std::noop_coroutine();
     277                 :         }
     278                 : 
     279                 :         // If cancelled was set before we registered, handle it now.
     280 HIT        3457 :         if (op.cancelled.load(std::memory_order_acquire))
     281                 :         {
                                       // exchange() acts as the claim: only the party that observes a
                                       // state other than unregistered posts the op.
     282 MIS           0 :             auto prev = op.registered.exchange(
     283                 :                 select_registration_state::unregistered,
     284                 :                 std::memory_order_acq_rel);
     285               0 :             if (prev != select_registration_state::unregistered)
     286                 :             {
     287               0 :                 svc_.scheduler().deregister_fd(
     288                 :                     fd_, select_scheduler::event_write);
     289               0 :                 op.impl_ptr = shared_from_this(); // re-assigns the reference already taken above
     290               0 :                 svc_.post(&op);
     291               0 :                 svc_.work_finished();
     292                 :             }
     293                 :         }
     294                 :         // completion is always posted to scheduler queue, never inline.
     295 HIT        3457 :         return std::noop_coroutine();
     296                 :     }
     297                 : 
                               // Any other errno: complete with that error through the queue.
     298 MIS           0 :     op.complete(errno, 0);
     299               0 :     op.impl_ptr = shared_from_this();
     300               0 :     svc_.post(&op);
     301                 :     // completion is always posted to scheduler queue, never inline.
     302               0 :     return std::noop_coroutine();
     303                 : }
     304                 : 
     305                 : inline std::coroutine_handle<>
     306 HIT       59829 : select_socket::read_some(
     307                 :     std::coroutine_handle<> h,
     308                 :     capy::executor_ref ex,
     309                 :     io_buffer_param param,
     310                 :     std::stop_token token,
     311                 :     std::error_code* ec,
     312                 :     std::size_t* bytes_out)
     313                 : {
                               // Starts a scatter read on fd_ using the socket's single read
                               // op (rd_).
                               //   h / ex    : coroutine to resume and the executor to resume it on
                               //   param     : buffer sequence; at most max_buffers entries are used
                               //   token     : stop token handed to op.start()
                               //   ec        : out-pointer stored on the op for the error result
                               //   bytes_out : out-pointer stored on the op for the byte count
                               // Always returns noop_coroutine(): the result is delivered via
                               // the scheduler, either posted right away or after the reactor
                               // reports readiness.
     314           59829 :     auto& op = rd_;
     315           59829 :     op.reset();
     316           59829 :     op.h         = h;
     317           59829 :     op.ex        = ex;
     318           59829 :     op.ec_out    = ec;
     319           59829 :     op.bytes_out = bytes_out;
     320           59829 :     op.fd        = fd_;
     321           59829 :     op.start(token, this);
     322                 : 
     323           59829 :     capy::mutable_buffer bufs[select_read_op::max_buffers];
     324           59829 :     op.iovec_count =
     325           59829 :         static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
     326                 : 
                               // No buffers, or exactly one zero-length buffer: complete with
                               // 0 bytes without calling readv. The flag tells the completion
                               // handler this was an empty-buffer read.
     327           59829 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     328                 :     {
     329               1 :         op.empty_buffer_read = true;
     330               1 :         op.complete(0, 0);
     331               1 :         op.impl_ptr = shared_from_this();
     332               1 :         svc_.post(&op);
     333               1 :         return std::noop_coroutine();
     334                 :     }
     335                 : 
                               // The iovecs live in the op so they remain available if the
                               // read has to wait for readiness.
     336          119656 :     for (int i = 0; i < op.iovec_count; ++i)
     337                 :     {
     338           59828 :         op.iovecs[i].iov_base = bufs[i].data();
     339           59828 :         op.iovecs[i].iov_len  = bufs[i].size();
     340                 :     }
     341                 : 
                               // Speculative read (the socket is non-blocking per the design
                               // notes at the top of this file).
     342           59828 :     ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
     343                 : 
     344           59828 :     if (n > 0)
     345                 :     {
     346           59542 :         op.complete(0, static_cast<std::size_t>(n));
     347           59542 :         op.impl_ptr = shared_from_this();
     348           59542 :         svc_.post(&op);
     349           59542 :         return std::noop_coroutine();
     350                 :     }
     351                 : 
                               // readv returned 0 with non-empty buffers: completed with no
                               // error and 0 bytes. NOTE(review): how this is surfaced as
                               // end-of-stream is decided in the op's completion handler,
                               // which is outside this chunk.
     352             286 :     if (n == 0)
     353                 :     {
     354               5 :         op.complete(0, 0);
     355               5 :         op.impl_ptr = shared_from_this();
     356               5 :         svc_.post(&op);
     357               5 :         return std::noop_coroutine();
     358                 :     }
     359                 : 
                               // n < 0 from here on; errno has not been disturbed since readv.
     360             281 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     361                 :     {
     362             281 :         svc_.work_started();
     363             281 :         op.impl_ptr = shared_from_this();
     364                 : 
     365                 :         // Set registering BEFORE register_fd to close the race window where
     366                 :         // reactor sees an event before we set registered.
     367             281 :         op.registered.store(
     368                 :             select_registration_state::registering, std::memory_order_release);
     369             281 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
     370                 : 
     371                 :         // Transition to registered. If this fails, reactor or cancel already
     372                 :         // claimed the op (state is now unregistered), so we're done. However,
     373                 :         // we must still deregister the fd because cancel's deregister_fd may
     374                 :         // have run before our register_fd, leaving the fd orphaned.
     375             281 :         auto expected = select_registration_state::registering;
     376             281 :         if (!op.registered.compare_exchange_strong(
     377                 :                 expected, select_registration_state::registered,
     378                 :                 std::memory_order_acq_rel))
     379                 :         {
     380 MIS           0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     381               0 :             return std::noop_coroutine();
     382                 :         }
     383                 : 
     384                 :         // If cancelled was set before we registered, handle it now.
     385 HIT         281 :         if (op.cancelled.load(std::memory_order_acquire))
     386                 :         {
                                       // exchange() acts as the claim: only the party that observes a
                                       // state other than unregistered posts the op.
     387 MIS           0 :             auto prev = op.registered.exchange(
     388                 :                 select_registration_state::unregistered,
     389                 :                 std::memory_order_acq_rel);
     390               0 :             if (prev != select_registration_state::unregistered)
     391                 :             {
     392               0 :                 svc_.scheduler().deregister_fd(
     393                 :                     fd_, select_scheduler::event_read);
     394               0 :                 op.impl_ptr = shared_from_this(); // re-assigns the reference already taken above
     395               0 :                 svc_.post(&op);
     396               0 :                 svc_.work_finished(); // balances the work_started() above
     397                 :             }
     398                 :         }
     399 HIT         281 :         return std::noop_coroutine();
     400                 :     }
     401                 : 
                               // Any other errno: complete with that error through the queue.
     402 MIS           0 :     op.complete(errno, 0);
     403               0 :     op.impl_ptr = shared_from_this();
     404               0 :     svc_.post(&op);
     405               0 :     return std::noop_coroutine();
     406                 : }
     407                 : 
     408                 : inline std::coroutine_handle<>
     409 HIT       59667 : select_socket::write_some(
     410                 :     std::coroutine_handle<> h,
     411                 :     capy::executor_ref ex,
     412                 :     io_buffer_param param,
     413                 :     std::stop_token token,
     414                 :     std::error_code* ec,
     415                 :     std::size_t* bytes_out)
     416                 : {
                               // Starts a gather write on fd_ using the socket's single write
                               // op (wr_).
                               //   h / ex    : coroutine to resume and the executor to resume it on
                               //   param     : buffer sequence; at most max_buffers entries are used
                               //   token     : stop token handed to op.start()
                               //   ec        : out-pointer stored on the op for the error result
                               //   bytes_out : out-pointer stored on the op for the byte count
                               // Always returns noop_coroutine(): the result is delivered via
                               // the scheduler, either posted right away or after the reactor
                               // reports writability.
     417           59667 :     auto& op = wr_;
     418           59667 :     op.reset();
     419           59667 :     op.h         = h;
     420           59667 :     op.ex        = ex;
     421           59667 :     op.ec_out    = ec;
     422           59667 :     op.bytes_out = bytes_out;
     423           59667 :     op.fd        = fd_;
     424           59667 :     op.start(token, this);
     425                 : 
     426           59667 :     capy::mutable_buffer bufs[select_write_op::max_buffers];
     427           59667 :     op.iovec_count =
     428           59667 :         static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
     429                 : 
                               // No buffers, or exactly one zero-length buffer: complete with
                               // 0 bytes without calling sendmsg.
     430           59667 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     431                 :     {
     432               1 :         op.complete(0, 0);
     433               1 :         op.impl_ptr = shared_from_this();
     434               1 :         svc_.post(&op);
     435               1 :         return std::noop_coroutine();
     436                 :     }
     437                 : 
                               // The iovecs live in the op so they remain available if the
                               // write has to wait for readiness.
     438          119332 :     for (int i = 0; i < op.iovec_count; ++i)
     439                 :     {
     440           59666 :         op.iovecs[i].iov_base = bufs[i].data();
     441           59666 :         op.iovecs[i].iov_len  = bufs[i].size();
     442                 :     }
     443                 : 
     444           59666 :     msghdr msg{};
     445           59666 :     msg.msg_iov    = op.iovecs;
     446           59666 :     msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
     447                 : 
                               // Speculative send. MSG_NOSIGNAL (see send(2)) asks the kernel
                               // not to raise SIGPIPE when the peer has closed the connection.
     448           59666 :     ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
     449                 : 
     450           59666 :     if (n > 0)
     451                 :     {
     452           59665 :         op.complete(0, static_cast<std::size_t>(n));
     453           59665 :         op.impl_ptr = shared_from_this();
     454           59665 :         svc_.post(&op);
     455           59665 :         return std::noop_coroutine();
     456                 :     }
     457                 : 
                               // Reached for n <= 0. There is no separate n == 0 branch here
                               // (unlike read_some), so a zero return is classified by whatever
                               // errno currently holds.
     458               1 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     459                 :     {
     460 MIS           0 :         svc_.work_started();
     461               0 :         op.impl_ptr = shared_from_this();
     462                 : 
     463                 :         // Set registering BEFORE register_fd to close the race window where
     464                 :         // reactor sees an event before we set registered.
     465               0 :         op.registered.store(
     466                 :             select_registration_state::registering, std::memory_order_release);
     467               0 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     468                 : 
     469                 :         // Transition to registered. If this fails, reactor or cancel already
     470                 :         // claimed the op (state is now unregistered), so we're done. However,
     471                 :         // we must still deregister the fd because cancel's deregister_fd may
     472                 :         // have run before our register_fd, leaving the fd orphaned.
     473               0 :         auto expected = select_registration_state::registering;
     474               0 :         if (!op.registered.compare_exchange_strong(
     475                 :                 expected, select_registration_state::registered,
     476                 :                 std::memory_order_acq_rel))
     477                 :         {
     478               0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     479               0 :             return std::noop_coroutine();
     480                 :         }
     481                 : 
     482                 :         // If cancelled was set before we registered, handle it now.
     483               0 :         if (op.cancelled.load(std::memory_order_acquire))
     484                 :         {
                                       // exchange() acts as the claim: only the party that observes a
                                       // state other than unregistered posts the op.
     485               0 :             auto prev = op.registered.exchange(
     486                 :                 select_registration_state::unregistered,
     487                 :                 std::memory_order_acq_rel);
     488               0 :             if (prev != select_registration_state::unregistered)
     489                 :             {
     490               0 :                 svc_.scheduler().deregister_fd(
     491                 :                     fd_, select_scheduler::event_write);
     492               0 :                 op.impl_ptr = shared_from_this(); // re-assigns the reference already taken above
     493               0 :                 svc_.post(&op);
     494               0 :                 svc_.work_finished(); // balances the work_started() above
     495                 :             }
     496                 :         }
     497               0 :         return std::noop_coroutine();
     498                 :     }
     499                 : 
                               // Hard failure. The EIO fallback guarantees a non-zero error
                               // code even if errno happens to be 0 at this point.
     500 HIT           1 :     op.complete(errno ? errno : EIO, 0);
     501               1 :     op.impl_ptr = shared_from_this();
     502               1 :     svc_.post(&op);
     503               1 :     return std::noop_coroutine();
     504                 : }
     505                 : 
     506                 : inline std::error_code
     507               3 : select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
     508                 : {
                               // Translates the portable shutdown_type into the matching
                               // SHUT_* constant and calls ::shutdown on fd_.
                               // Returns an empty error_code on success, EINVAL for an
                               // unrecognised enumerator, otherwise the errno from ::shutdown.
     509                 :     int how;
     510               3 :     switch (what)
     511                 :     {
     512               1 :     case tcp_socket::shutdown_receive:
     513               1 :         how = SHUT_RD;
     514               1 :         break;
     515               1 :     case tcp_socket::shutdown_send:
     516               1 :         how = SHUT_WR;
     517               1 :         break;
     518               1 :     case tcp_socket::shutdown_both:
     519               1 :         how = SHUT_RDWR;
     520               1 :         break;
     521 MIS           0 :     default:
                                   // Returning here also means `how` is never read uninitialised.
     522               0 :         return make_err(EINVAL);
     523                 :     }
     524 HIT           3 :     if (::shutdown(fd_, how) != 0)
     525 MIS           0 :         return make_err(errno);
     526 HIT           3 :     return {};
     527                 : }
     528                 : 
     529                 : inline std::error_code
     530               5 : select_socket::set_no_delay(bool value) noexcept
     531                 : {
                               // Sets TCP_NODELAY on fd_ to 1 or 0 according to `value`.
                               // Returns an empty error_code on success, else the setsockopt errno.
     532               5 :     int flag = value ? 1 : 0;
     533               5 :     if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
     534 MIS           0 :         return make_err(errno);
     535 HIT           5 :     return {};
     536                 : }
     537                 : 
     538                 : inline bool
     539               5 : select_socket::no_delay(std::error_code& ec) const noexcept
     540                 : {
                               // Reads TCP_NODELAY from fd_.
                               // On failure sets `ec` from errno and returns false; on success
                               // clears `ec` and returns whether the option value is non-zero.
     541               5 :     int flag      = 0;
     542               5 :     socklen_t len = sizeof(flag);
     543               5 :     if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
     544                 :     {
     545 MIS           0 :         ec = make_err(errno);
     546               0 :         return false;
     547                 :     }
     548 HIT           5 :     ec = {};
     549               5 :     return flag != 0;
     550                 : }
     551                 : 
     552                 : inline std::error_code
     553               4 : select_socket::set_keep_alive(bool value) noexcept
     554                 : {
     555               4 :     int flag = value ? 1 : 0;
     556               4 :     if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
     557 MIS           0 :         return make_err(errno);
     558 HIT           4 :     return {};
     559                 : }
     560                 : 
     561                 : inline bool
     562               4 : select_socket::keep_alive(std::error_code& ec) const noexcept
     563                 : {
     564               4 :     int flag      = 0;
     565               4 :     socklen_t len = sizeof(flag);
     566               4 :     if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
     567                 :     {
     568 MIS           0 :         ec = make_err(errno);
     569               0 :         return false;
     570                 :     }
     571 HIT           4 :     ec = {};
     572               4 :     return flag != 0;
     573                 : }
     574                 : 
     575                 : inline std::error_code
     576               1 : select_socket::set_receive_buffer_size(int size) noexcept
     577                 : {
     578               1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
     579 MIS           0 :         return make_err(errno);
     580 HIT           1 :     return {};
     581                 : }
     582                 : 
     583                 : inline int
     584               3 : select_socket::receive_buffer_size(std::error_code& ec) const noexcept
     585                 : {
     586               3 :     int size      = 0;
     587               3 :     socklen_t len = sizeof(size);
     588               3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
     589                 :     {
     590 MIS           0 :         ec = make_err(errno);
     591               0 :         return 0;
     592                 :     }
     593 HIT           3 :     ec = {};
     594               3 :     return size;
     595                 : }
     596                 : 
     597                 : inline std::error_code
     598               1 : select_socket::set_send_buffer_size(int size) noexcept
     599                 : {
     600               1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
     601 MIS           0 :         return make_err(errno);
     602 HIT           1 :     return {};
     603                 : }
     604                 : 
     605                 : inline int
     606               3 : select_socket::send_buffer_size(std::error_code& ec) const noexcept
     607                 : {
     608               3 :     int size      = 0;
     609               3 :     socklen_t len = sizeof(size);
     610               3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
     611                 :     {
     612 MIS           0 :         ec = make_err(errno);
     613               0 :         return 0;
     614                 :     }
     615 HIT           3 :     ec = {};
     616               3 :     return size;
     617                 : }
     618                 : 
     619                 : inline std::error_code
     620               6 : select_socket::set_linger(bool enabled, int timeout) noexcept
     621                 : {
     622               6 :     if (timeout < 0)
     623               1 :         return make_err(EINVAL);
     624                 :     struct ::linger lg;
     625               5 :     lg.l_onoff  = enabled ? 1 : 0;
     626               5 :     lg.l_linger = timeout;
     627               5 :     if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
     628 MIS           0 :         return make_err(errno);
     629 HIT           5 :     return {};
     630                 : }
     631                 : 
     632                 : inline tcp_socket::linger_options
     633               3 : select_socket::linger(std::error_code& ec) const noexcept
     634                 : {
     635               3 :     struct ::linger lg{};
     636               3 :     socklen_t len = sizeof(lg);
     637               3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
     638                 :     {
     639 MIS           0 :         ec = make_err(errno);
     640               0 :         return {};
     641                 :     }
     642 HIT           3 :     ec = {};
     643               3 :     return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
     644                 : }
     645                 : 
     646                 : inline void
     647             177 : select_socket::cancel() noexcept
     648                 : {
     649             177 :     auto self = weak_from_this().lock();
     650             177 :     if (!self)
     651 MIS           0 :         return;
     652                 : 
     653 HIT         531 :     auto cancel_op = [this, &self](select_op& op, int events) {
     654             531 :         auto prev = op.registered.exchange(
     655                 :             select_registration_state::unregistered, std::memory_order_acq_rel);
     656             531 :         op.request_cancel();
     657             531 :         if (prev != select_registration_state::unregistered)
     658                 :         {
     659              92 :             svc_.scheduler().deregister_fd(fd_, events);
     660              92 :             op.impl_ptr = self;
     661              92 :             svc_.post(&op);
     662              92 :             svc_.work_finished();
     663                 :         }
     664             708 :     };
     665                 : 
     666             177 :     cancel_op(conn_, select_scheduler::event_write);
     667             177 :     cancel_op(rd_, select_scheduler::event_read);
     668             177 :     cancel_op(wr_, select_scheduler::event_write);
     669             177 : }
     670                 : 
     671                 : inline void
     672              97 : select_socket::cancel_single_op(select_op& op) noexcept
     673                 : {
     674              97 :     auto self = weak_from_this().lock();
     675              97 :     if (!self)
     676 MIS           0 :         return;
     677                 : 
     678                 :     // Called from stop_token callback to cancel a specific pending operation.
     679 HIT          97 :     auto prev = op.registered.exchange(
     680                 :         select_registration_state::unregistered, std::memory_order_acq_rel);
     681              97 :     op.request_cancel();
     682                 : 
     683              97 :     if (prev != select_registration_state::unregistered)
     684                 :     {
     685                 :         // Determine which event type to deregister
     686              65 :         int events = 0;
     687              65 :         if (&op == &conn_ || &op == &wr_)
     688 MIS           0 :             events = select_scheduler::event_write;
     689 HIT          65 :         else if (&op == &rd_)
     690              65 :             events = select_scheduler::event_read;
     691                 : 
     692              65 :         svc_.scheduler().deregister_fd(fd_, events);
     693                 : 
     694              65 :         op.impl_ptr = self;
     695              65 :         svc_.post(&op);
     696              65 :         svc_.work_finished();
     697                 :     }
     698              97 : }
     699                 : 
     700                 : inline void
     701           31168 : select_socket::close_socket() noexcept
     702                 : {
     703           31168 :     auto self = weak_from_this().lock();
     704           31168 :     if (self)
     705                 :     {
     706           93504 :         auto cancel_op = [this, &self](select_op& op, int events) {
     707           93504 :             auto prev = op.registered.exchange(
     708                 :                 select_registration_state::unregistered,
     709                 :                 std::memory_order_acq_rel);
     710           93504 :             op.request_cancel();
     711           93504 :             if (prev != select_registration_state::unregistered)
     712                 :             {
     713               1 :                 svc_.scheduler().deregister_fd(fd_, events);
     714               1 :                 op.impl_ptr = self;
     715               1 :                 svc_.post(&op);
     716               1 :                 svc_.work_finished();
     717                 :             }
     718          124672 :         };
     719                 : 
     720           31168 :         cancel_op(conn_, select_scheduler::event_write);
     721           31168 :         cancel_op(rd_, select_scheduler::event_read);
     722           31168 :         cancel_op(wr_, select_scheduler::event_write);
     723                 :     }
     724                 : 
     725           31168 :     if (fd_ >= 0)
     726                 :     {
     727            6924 :         svc_.scheduler().deregister_fd(
     728                 :             fd_, select_scheduler::event_read | select_scheduler::event_write);
     729            6924 :         ::close(fd_);
     730            6924 :         fd_ = -1;
     731                 :     }
     732                 : 
     733           31168 :     local_endpoint_  = endpoint{};
     734           31168 :     remote_endpoint_ = endpoint{};
     735           31168 : }
     736                 : 
     737             135 : inline select_socket_service::select_socket_service(
     738             135 :     capy::execution_context& ctx)
     739             135 :     : state_(
     740                 :           std::make_unique<select_socket_state>(
     741             135 :               ctx.use_service<select_scheduler>()))
     742                 : {
     743             135 : }
     744                 : 
     745             270 : inline select_socket_service::~select_socket_service() {}
     746                 : 
     747                 : inline void
     748             135 : select_socket_service::shutdown()
     749                 : {
     750             135 :     std::lock_guard lock(state_->mutex_);
     751                 : 
     752             135 :     while (auto* impl = state_->socket_list_.pop_front())
     753 MIS           0 :         impl->close_socket();
     754                 : 
     755                 :     // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
     756                 :     // drains completed_ops_, calling destroy() on each queued op. Letting
     757                 :     // ~state_ release the ptrs (during service destruction, after scheduler
     758                 :     // shutdown) keeps every impl alive until all ops have been drained.
     759 HIT         135 : }
     760                 : 
     761                 : inline io_object::implementation*
     762           10388 : select_socket_service::construct()
     763                 : {
     764           10388 :     auto impl = std::make_shared<select_socket>(*this);
     765           10388 :     auto* raw = impl.get();
     766                 : 
     767                 :     {
     768           10388 :         std::lock_guard lock(state_->mutex_);
     769           10388 :         state_->socket_list_.push_back(raw);
     770           10388 :         state_->socket_ptrs_.emplace(raw, std::move(impl));
     771           10388 :     }
     772                 : 
     773           10388 :     return raw;
     774           10388 : }
     775                 : 
     776                 : inline void
     777           10388 : select_socket_service::destroy(io_object::implementation* impl)
     778                 : {
     779           10388 :     auto* select_impl = static_cast<select_socket*>(impl);
     780           10388 :     select_impl->close_socket();
     781           10388 :     std::lock_guard lock(state_->mutex_);
     782           10388 :     state_->socket_list_.remove(select_impl);
     783           10388 :     state_->socket_ptrs_.erase(select_impl);
     784           10388 : }
     785                 : 
     786                 : inline std::error_code
     787            3468 : select_socket_service::open_socket(tcp_socket::implementation& impl)
     788                 : {
     789            3468 :     auto* select_impl = static_cast<select_socket*>(&impl);
     790            3468 :     select_impl->close_socket();
     791                 : 
     792            3468 :     int fd = ::socket(AF_INET, SOCK_STREAM, 0);
     793            3468 :     if (fd < 0)
     794 MIS           0 :         return make_err(errno);
     795                 : 
     796                 :     // Set non-blocking and close-on-exec
     797 HIT        3468 :     int flags = ::fcntl(fd, F_GETFL, 0);
     798            3468 :     if (flags == -1)
     799                 :     {
     800 MIS           0 :         int errn = errno;
     801               0 :         ::close(fd);
     802               0 :         return make_err(errn);
     803                 :     }
     804 HIT        3468 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     805                 :     {
     806 MIS           0 :         int errn = errno;
     807               0 :         ::close(fd);
     808               0 :         return make_err(errn);
     809                 :     }
     810 HIT        3468 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     811                 :     {
     812 MIS           0 :         int errn = errno;
     813               0 :         ::close(fd);
     814               0 :         return make_err(errn);
     815                 :     }
     816                 : 
     817                 :     // Check fd is within select() limits
     818 HIT        3468 :     if (fd >= FD_SETSIZE)
     819                 :     {
     820 MIS           0 :         ::close(fd);
     821               0 :         return make_err(EMFILE); // Too many open files
     822                 :     }
     823                 : 
     824 HIT        3468 :     select_impl->fd_ = fd;
     825            3468 :     return {};
     826                 : }
     827                 : 
     828                 : inline void
     829           17312 : select_socket_service::close(io_object::handle& h)
     830                 : {
     831           17312 :     static_cast<select_socket*>(h.get())->close_socket();
     832           17312 : }
     833                 : 
     834                 : inline void
     835          119373 : select_socket_service::post(select_op* op)
     836                 : {
     837          119373 :     state_->sched_.post(op);
     838          119373 : }
     839                 : 
     840                 : inline void
     841            3738 : select_socket_service::work_started() noexcept
     842                 : {
     843            3738 :     state_->sched_.work_started();
     844            3738 : }
     845                 : 
     846                 : inline void
     847             158 : select_socket_service::work_finished() noexcept
     848                 : {
     849             158 :     state_->sched_.work_finished();
     850             158 : }
     851                 : 
     852                 : } // namespace boost::corosio::detail
     853                 : 
     854                 : #endif // BOOST_COROSIO_HAS_SELECT
     855                 : 
     856                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
        

Generated by: LCOV version 2.3