LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_acceptor_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 81.4 % 237 193 44
Test Date: 2026-02-17 21:42:07 Functions: 100.0 % 21 21

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/acceptor_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
      22                 : #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
      23                 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
      24                 : 
      25                 : #include <boost/corosio/detail/endpoint_convert.hpp>
      26                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      27                 : #include <boost/corosio/detail/make_err.hpp>
      28                 : 
      29                 : #include <memory>
      30                 : #include <mutex>
      31                 : #include <unordered_map>
      32                 : #include <utility>
      33                 : 
      34                 : #include <errno.h>
      35                 : #include <netinet/in.h>
      36                 : #include <sys/epoll.h>
      37                 : #include <sys/socket.h>
      38                 : #include <unistd.h>
      39                 : 
      40                 : namespace boost::corosio::detail {
      41                 : 
      42                 : /** State for epoll acceptor service. */
      43                 : class epoll_acceptor_state
      44                 : {
      45                 : public:
      46 HIT         205 :     explicit epoll_acceptor_state(epoll_scheduler& sched) noexcept
      47             205 :         : sched_(sched)
      48                 :     {
      49             205 :     }
      50                 : 
      51                 :     epoll_scheduler& sched_;
      52                 :     std::mutex mutex_;
      53                 :     intrusive_list<epoll_acceptor> acceptor_list_;
      54                 :     std::unordered_map<epoll_acceptor*, std::shared_ptr<epoll_acceptor>>
      55                 :         acceptor_ptrs_;
      56                 : };
      57                 : 
      58                 : /** epoll acceptor service implementation.
      59                 : 
      60                 :     Inherits from acceptor_service to enable runtime polymorphism.
      61                 :     Uses key_type = acceptor_service for service lookup.
      62                 : */
      63                 : class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
      64                 : {
      65                 : public:
      66                 :     explicit epoll_acceptor_service(capy::execution_context& ctx);
      67                 :     ~epoll_acceptor_service() override;
      68                 : 
      69                 :     epoll_acceptor_service(epoll_acceptor_service const&)            = delete;
      70                 :     epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;
      71                 : 
      72                 :     void shutdown() override;
      73                 : 
      74                 :     io_object::implementation* construct() override;
      75                 :     void destroy(io_object::implementation*) override;
      76                 :     void close(io_object::handle&) override;
      77                 :     std::error_code open_acceptor(
      78                 :         tcp_acceptor::implementation& impl, endpoint ep, int backlog) override;
      79                 : 
      80            4871 :     epoll_scheduler& scheduler() const noexcept
      81                 :     {
      82            4871 :         return state_->sched_;
      83                 :     }
      84                 :     void post(epoll_op* op);
      85                 :     void work_started() noexcept;
      86                 :     void work_finished() noexcept;
      87                 : 
      88                 :     /** Get the socket service for creating peer sockets during accept. */
      89                 :     epoll_socket_service* socket_service() const noexcept;
      90                 : 
      91                 : private:
      92                 :     capy::execution_context& ctx_;
      93                 :     std::unique_ptr<epoll_acceptor_state> state_;
      94                 : };
      95                 : 
      96                 : //--------------------------------------------------------------------------
      97                 : //
      98                 : // Implementation
      99                 : //
     100                 : //--------------------------------------------------------------------------
     101                 : 
     102                 : inline void
     103               6 : epoll_accept_op::cancel() noexcept
     104                 : {
     105               6 :     if (acceptor_impl_)
     106               6 :         acceptor_impl_->cancel_single_op(*this);
     107                 :     else
     108 MIS           0 :         request_cancel();
     109 HIT           6 : }
     110                 : 
     111                 : inline void
     112            4741 : epoll_accept_op::operator()()
     113                 : {
     114            4741 :     stop_cb.reset();
     115                 : 
     116            4741 :     static_cast<epoll_acceptor*>(acceptor_impl_)
     117            4741 :         ->service()
     118            4741 :         .scheduler()
     119            4741 :         .reset_inline_budget();
     120                 : 
     121            4741 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
     122                 : 
     123            4741 :     if (cancelled.load(std::memory_order_acquire))
     124               9 :         *ec_out = capy::error::canceled;
     125            4732 :     else if (errn != 0)
     126 MIS           0 :         *ec_out = make_err(errn);
     127                 :     else
     128 HIT        4732 :         *ec_out = {};
     129                 : 
     130                 :     // Set up the peer socket on success
     131            4741 :     if (success && accepted_fd >= 0 && acceptor_impl_)
     132                 :     {
     133            4732 :         auto* socket_svc = static_cast<epoll_acceptor*>(acceptor_impl_)
     134            4732 :                                ->service()
     135            4732 :                                .socket_service();
     136            4732 :         if (socket_svc)
     137                 :         {
     138            4732 :             auto& impl = static_cast<epoll_socket&>(*socket_svc->construct());
     139            4732 :             impl.set_socket(accepted_fd);
     140                 : 
     141            4732 :             impl.desc_state_.fd = accepted_fd;
     142                 :             {
     143            4732 :                 std::lock_guard lock(impl.desc_state_.mutex);
     144            4732 :                 impl.desc_state_.read_op    = nullptr;
     145            4732 :                 impl.desc_state_.write_op   = nullptr;
     146            4732 :                 impl.desc_state_.connect_op = nullptr;
     147            4732 :             }
     148            4732 :             socket_svc->scheduler().register_descriptor(
     149                 :                 accepted_fd, &impl.desc_state_);
     150                 : 
     151            4732 :             impl.set_endpoints(
     152            4732 :                 static_cast<epoll_acceptor*>(acceptor_impl_)->local_endpoint(),
     153            4732 :                 from_sockaddr_in(peer_addr));
     154                 : 
     155            4732 :             if (impl_out)
     156            4732 :                 *impl_out = &impl;
     157            4732 :             accepted_fd = -1;
     158                 :         }
     159                 :         else
     160                 :         {
     161                 :             // No socket service — treat as error
     162 MIS           0 :             *ec_out = make_err(ENOENT);
     163               0 :             success = false;
     164                 :         }
     165                 :     }
     166                 : 
     167 HIT        4741 :     if (!success || !acceptor_impl_)
     168                 :     {
     169               9 :         if (accepted_fd >= 0)
     170                 :         {
     171 MIS           0 :             ::close(accepted_fd);
     172               0 :             accepted_fd = -1;
     173                 :         }
     174 HIT           9 :         if (impl_out)
     175               9 :             *impl_out = nullptr;
     176                 :     }
     177                 : 
     178                 :     // Move to stack before resuming. See epoll_op::operator()() for rationale.
     179            4741 :     capy::executor_ref saved_ex(ex);
     180            4741 :     std::coroutine_handle<> saved_h(h);
     181            4741 :     auto prevent_premature_destruction = std::move(impl_ptr);
     182            4741 :     dispatch_coro(saved_ex, saved_h).resume();
     183            4741 : }
     184                 : 
     185              67 : inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
     186              67 :     : svc_(svc)
     187                 : {
     188              67 : }
     189                 : 
     190                 : inline std::coroutine_handle<>
     191            4741 : epoll_acceptor::accept(
     192                 :     std::coroutine_handle<> h,
     193                 :     capy::executor_ref ex,
     194                 :     std::stop_token token,
     195                 :     std::error_code* ec,
     196                 :     io_object::implementation** impl_out)
     197                 : {
     198            4741 :     auto& op = acc_;
     199            4741 :     op.reset();
     200            4741 :     op.h        = h;
     201            4741 :     op.ex       = ex;
     202            4741 :     op.ec_out   = ec;
     203            4741 :     op.impl_out = impl_out;
     204            4741 :     op.fd       = fd_;
     205            4741 :     op.start(token, this);
     206                 : 
     207            4741 :     sockaddr_in addr{};
     208            4741 :     socklen_t addrlen = sizeof(addr);
     209                 :     int accepted;
     210                 :     do
     211                 :     {
     212            4741 :         accepted = ::accept4(
     213                 :             fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen,
     214                 :             SOCK_NONBLOCK | SOCK_CLOEXEC);
     215                 :     }
     216            4741 :     while (accepted < 0 && errno == EINTR);
     217                 : 
     218            4741 :     if (accepted >= 0)
     219                 :     {
     220                 :         {
     221               2 :             std::lock_guard lock(desc_state_.mutex);
     222               2 :             desc_state_.read_ready = false;
     223               2 :         }
     224                 : 
     225               2 :         if (svc_.scheduler().try_consume_inline_budget())
     226                 :         {
     227 MIS           0 :             auto* socket_svc = svc_.socket_service();
     228               0 :             if (socket_svc)
     229                 :             {
     230                 :                 auto& impl =
     231               0 :                     static_cast<epoll_socket&>(*socket_svc->construct());
     232               0 :                 impl.set_socket(accepted);
     233                 : 
     234               0 :                 impl.desc_state_.fd = accepted;
     235                 :                 {
     236               0 :                     std::lock_guard lock(impl.desc_state_.mutex);
     237               0 :                     impl.desc_state_.read_op    = nullptr;
     238               0 :                     impl.desc_state_.write_op   = nullptr;
     239               0 :                     impl.desc_state_.connect_op = nullptr;
     240               0 :                 }
     241               0 :                 socket_svc->scheduler().register_descriptor(
     242                 :                     accepted, &impl.desc_state_);
     243                 : 
     244               0 :                 impl.set_endpoints(local_endpoint_, from_sockaddr_in(addr));
     245                 : 
     246               0 :                 *ec = {};
     247               0 :                 if (impl_out)
     248               0 :                     *impl_out = &impl;
     249                 :             }
     250                 :             else
     251                 :             {
     252               0 :                 ::close(accepted);
     253               0 :                 *ec = make_err(ENOENT);
     254               0 :                 if (impl_out)
     255               0 :                     *impl_out = nullptr;
     256                 :             }
     257               0 :             return dispatch_coro(ex, h);
     258                 :         }
     259                 : 
     260 HIT           2 :         op.accepted_fd = accepted;
     261               2 :         op.peer_addr   = addr;
     262               2 :         op.complete(0, 0);
     263               2 :         op.impl_ptr = shared_from_this();
     264               2 :         svc_.post(&op);
     265               2 :         return std::noop_coroutine();
     266                 :     }
     267                 : 
     268            4739 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     269                 :     {
     270            4739 :         op.impl_ptr = shared_from_this();
     271            4739 :         svc_.work_started();
     272                 : 
     273            4739 :         std::lock_guard lock(desc_state_.mutex);
     274            4739 :         bool io_done = false;
     275            4739 :         if (desc_state_.read_ready)
     276                 :         {
     277 MIS           0 :             desc_state_.read_ready = false;
     278               0 :             op.perform_io();
     279               0 :             io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     280               0 :             if (!io_done)
     281               0 :                 op.errn = 0;
     282                 :         }
     283                 : 
     284 HIT        4739 :         if (io_done || op.cancelled.load(std::memory_order_acquire))
     285                 :         {
     286 MIS           0 :             svc_.post(&op);
     287               0 :             svc_.work_finished();
     288                 :         }
     289                 :         else
     290                 :         {
     291 HIT        4739 :             desc_state_.read_op = &op;
     292                 :         }
     293            4739 :         return std::noop_coroutine();
     294            4739 :     }
     295                 : 
     296 MIS           0 :     op.complete(errno, 0);
     297               0 :     op.impl_ptr = shared_from_this();
     298               0 :     svc_.post(&op);
     299                 :     // completion is always posted to scheduler queue, never inline.
     300               0 :     return std::noop_coroutine();
     301                 : }
     302                 : 
     303                 : inline void
     304 HIT           1 : epoll_acceptor::cancel() noexcept
     305                 : {
     306               1 :     cancel_single_op(acc_);
     307               1 : }
     308                 : 
     309                 : inline void
     310               7 : epoll_acceptor::cancel_single_op(epoll_op& op) noexcept
     311                 : {
     312               7 :     auto self = weak_from_this().lock();
     313               7 :     if (!self)
     314 MIS           0 :         return;
     315                 : 
     316 HIT           7 :     op.request_cancel();
     317                 : 
     318               7 :     epoll_op* claimed = nullptr;
     319                 :     {
     320               7 :         std::lock_guard lock(desc_state_.mutex);
     321               7 :         if (desc_state_.read_op == &op)
     322               7 :             claimed = std::exchange(desc_state_.read_op, nullptr);
     323               7 :     }
     324               7 :     if (claimed)
     325                 :     {
     326               7 :         op.impl_ptr = self;
     327               7 :         svc_.post(&op);
     328               7 :         svc_.work_finished();
     329                 :     }
     330               7 : }
     331                 : 
     332                 : inline void
     333             264 : epoll_acceptor::close_socket() noexcept
     334                 : {
     335             264 :     auto self = weak_from_this().lock();
     336             264 :     if (self)
     337                 :     {
     338             264 :         acc_.request_cancel();
     339                 : 
     340             264 :         epoll_op* claimed = nullptr;
     341                 :         {
     342             264 :             std::lock_guard lock(desc_state_.mutex);
     343             264 :             claimed = std::exchange(desc_state_.read_op, nullptr);
     344             264 :             desc_state_.read_ready  = false;
     345             264 :             desc_state_.write_ready = false;
     346             264 :         }
     347                 : 
     348             264 :         if (claimed)
     349                 :         {
     350               2 :             acc_.impl_ptr = self;
     351               2 :             svc_.post(&acc_);
     352               2 :             svc_.work_finished();
     353                 :         }
     354                 : 
     355             264 :         if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     356 MIS           0 :             desc_state_.impl_ref_ = self;
     357                 :     }
     358                 : 
     359 HIT         264 :     if (fd_ >= 0)
     360                 :     {
     361              64 :         if (desc_state_.registered_events != 0)
     362              64 :             svc_.scheduler().deregister_descriptor(fd_);
     363              64 :         ::close(fd_);
     364              64 :         fd_ = -1;
     365                 :     }
     366                 : 
     367             264 :     desc_state_.fd                = -1;
     368             264 :     desc_state_.registered_events = 0;
     369                 : 
     370             264 :     local_endpoint_ = endpoint{};
     371             264 : }
     372                 : 
     373             205 : inline epoll_acceptor_service::epoll_acceptor_service(
     374             205 :     capy::execution_context& ctx)
     375             205 :     : ctx_(ctx)
     376             205 :     , state_(
     377                 :           std::make_unique<epoll_acceptor_state>(
     378             205 :               ctx.use_service<epoll_scheduler>()))
     379                 : {
     380             205 : }
     381                 : 
     382             410 : inline epoll_acceptor_service::~epoll_acceptor_service() {}
     383                 : 
     384                 : inline void
     385             205 : epoll_acceptor_service::shutdown()
     386                 : {
     387             205 :     std::lock_guard lock(state_->mutex_);
     388                 : 
     389             205 :     while (auto* impl = state_->acceptor_list_.pop_front())
     390 MIS           0 :         impl->close_socket();
     391                 : 
     392                 :     // Don't clear acceptor_ptrs_ here — same rationale as
     393                 :     // epoll_socket_service::shutdown(). Let ~state_ release ptrs
     394                 :     // after scheduler shutdown has drained all queued ops.
     395 HIT         205 : }
     396                 : 
     397                 : inline io_object::implementation*
     398              67 : epoll_acceptor_service::construct()
     399                 : {
     400              67 :     auto impl = std::make_shared<epoll_acceptor>(*this);
     401              67 :     auto* raw = impl.get();
     402                 : 
     403              67 :     std::lock_guard lock(state_->mutex_);
     404              67 :     state_->acceptor_list_.push_back(raw);
     405              67 :     state_->acceptor_ptrs_.emplace(raw, std::move(impl));
     406                 : 
     407              67 :     return raw;
     408              67 : }
     409                 : 
     410                 : inline void
     411              67 : epoll_acceptor_service::destroy(io_object::implementation* impl)
     412                 : {
     413              67 :     auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
     414              67 :     epoll_impl->close_socket();
     415              67 :     std::lock_guard lock(state_->mutex_);
     416              67 :     state_->acceptor_list_.remove(epoll_impl);
     417              67 :     state_->acceptor_ptrs_.erase(epoll_impl);
     418              67 : }
     419                 : 
     420                 : inline void
     421             131 : epoll_acceptor_service::close(io_object::handle& h)
     422                 : {
     423             131 :     static_cast<epoll_acceptor*>(h.get())->close_socket();
     424             131 : }
     425                 : 
     426                 : inline std::error_code
     427              66 : epoll_acceptor_service::open_acceptor(
     428                 :     tcp_acceptor::implementation& impl, endpoint ep, int backlog)
     429                 : {
     430              66 :     auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
     431              66 :     epoll_impl->close_socket();
     432                 : 
     433              66 :     int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
     434              66 :     if (fd < 0)
     435 MIS           0 :         return make_err(errno);
     436                 : 
     437 HIT          66 :     int reuse = 1;
     438              66 :     ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
     439                 : 
     440              66 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     441              66 :     if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
     442                 :     {
     443               2 :         int errn = errno;
     444               2 :         ::close(fd);
     445               2 :         return make_err(errn);
     446                 :     }
     447                 : 
     448              64 :     if (::listen(fd, backlog) < 0)
     449                 :     {
     450 MIS           0 :         int errn = errno;
     451               0 :         ::close(fd);
     452               0 :         return make_err(errn);
     453                 :     }
     454                 : 
     455 HIT          64 :     epoll_impl->fd_ = fd;
     456                 : 
     457                 :     // Register fd with epoll (edge-triggered mode)
     458              64 :     epoll_impl->desc_state_.fd = fd;
     459                 :     {
     460              64 :         std::lock_guard lock(epoll_impl->desc_state_.mutex);
     461              64 :         epoll_impl->desc_state_.read_op = nullptr;
     462              64 :     }
     463              64 :     scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
     464                 : 
     465                 :     // Cache the local endpoint (queries OS for ephemeral port if port was 0)
     466              64 :     sockaddr_in local_addr{};
     467              64 :     socklen_t local_len = sizeof(local_addr);
     468              64 :     if (::getsockname(
     469              64 :             fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     470              64 :         epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
     471                 : 
     472              64 :     return {};
     473                 : }
     474                 : 
     475                 : inline void
     476              11 : epoll_acceptor_service::post(epoll_op* op)
     477                 : {
     478              11 :     state_->sched_.post(op);
     479              11 : }
     480                 : 
     481                 : inline void
     482            4739 : epoll_acceptor_service::work_started() noexcept
     483                 : {
     484            4739 :     state_->sched_.work_started();
     485            4739 : }
     486                 : 
     487                 : inline void
     488               9 : epoll_acceptor_service::work_finished() noexcept
     489                 : {
     490               9 :     state_->sched_.work_finished();
     491               9 : }
     492                 : 
     493                 : inline epoll_socket_service*
     494            4732 : epoll_acceptor_service::socket_service() const noexcept
     495                 : {
     496            4732 :     auto* svc = ctx_.find_service<detail::socket_service>();
     497            4732 :     return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
     498                 : }
     499                 : 
     500                 : } // namespace boost::corosio::detail
     501                 : 
     502                 : #endif // BOOST_COROSIO_HAS_EPOLL
     503                 : 
     504                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
        

Generated by: LCOV version 2.3