//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
23  

23  

24  
#include <boost/corosio/detail/endpoint_convert.hpp>
24  
#include <boost/corosio/detail/endpoint_convert.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
26  
#include <boost/corosio/detail/make_err.hpp>
26  
#include <boost/corosio/detail/make_err.hpp>
27  

27  

28  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/corosio/detail/except.hpp>
29  

29  

30  
#include <boost/capy/buffers.hpp>
30  
#include <boost/capy/buffers.hpp>
31  

31  

32  
#include <errno.h>
32  
#include <errno.h>
33  
#include <fcntl.h>
33  
#include <fcntl.h>
34  
#include <netinet/in.h>
34  
#include <netinet/in.h>
35  
#include <netinet/tcp.h>
35  
#include <netinet/tcp.h>
36  
#include <sys/socket.h>
36  
#include <sys/socket.h>
37  
#include <unistd.h>
37  
#include <unistd.h>
38  

38  

39  
#include <memory>
39  
#include <memory>
40  
#include <mutex>
40  
#include <mutex>
41  
#include <unordered_map>
41  
#include <unordered_map>
42  

42  

/*
    select Socket Implementation
    ============================

    This mirrors the epoll_sockets design for behavioral consistency.
    Each I/O operation follows the same pattern:
      1. Try the syscall immediately (non-blocking socket)
      2. If it succeeds or fails with a real error, post to completion queue
      3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait

    Cancellation
    ------------
    See op.hpp for the completion/cancellation race handling via the
    `registered` atomic. cancel() must complete pending operations (post
    them with cancelled flag) so coroutines waiting on them can resume.
    close_socket() calls cancel() first to ensure this.

    Impl Lifetime with shared_ptr
    -----------------------------
    Socket impls use enable_shared_from_this. The service owns impls via
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
    removal. When a user calls close(), we call cancel() which posts pending
    ops to the scheduler.

    CRITICAL: The posted ops must keep the impl alive until they complete.
    Otherwise the scheduler would process a freed op (use-after-free). The
    cancel() method captures shared_from_this() into op.impl_ptr before
    posting. When the op completes, impl_ptr is cleared, allowing the impl
    to be destroyed if no other references exist.

    Service Ownership
    -----------------
    select_socket_service owns all socket impls. destroy() removes the
    shared_ptr from the map, but the impl may survive if ops still hold
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
    in-flight ops will complete and release their refs.
*/
80  

80  

81  
namespace boost::corosio::detail {
81  
namespace boost::corosio::detail {
82  

82  

83  
/** State for select socket service. */
83  
/** State for select socket service. */
84  
class select_socket_state
84  
class select_socket_state
85  
{
85  
{
86  
public:
86  
public:
87  
    explicit select_socket_state(select_scheduler& sched) noexcept
87  
    explicit select_socket_state(select_scheduler& sched) noexcept
88  
        : sched_(sched)
88  
        : sched_(sched)
89  
    {
89  
    {
90  
    }
90  
    }
91  

91  

92  
    select_scheduler& sched_;
92  
    select_scheduler& sched_;
93  
    std::mutex mutex_;
93  
    std::mutex mutex_;
94  
    intrusive_list<select_socket> socket_list_;
94  
    intrusive_list<select_socket> socket_list_;
95  
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
95  
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
96  
        socket_ptrs_;
96  
        socket_ptrs_;
97  
};
97  
};
98  

98  

99  
/** select socket service implementation.
99  
/** select socket service implementation.
100  

100  

101  
    Inherits from socket_service to enable runtime polymorphism.
101  
    Inherits from socket_service to enable runtime polymorphism.
102  
    Uses key_type = socket_service for service lookup.
102  
    Uses key_type = socket_service for service lookup.
103  
*/
103  
*/
104  
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
104  
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
105  
{
105  
{
106  
public:
106  
public:
107  
    explicit select_socket_service(capy::execution_context& ctx);
107  
    explicit select_socket_service(capy::execution_context& ctx);
108  
    ~select_socket_service() override;
108  
    ~select_socket_service() override;
109  

109  

110  
    select_socket_service(select_socket_service const&)            = delete;
110  
    select_socket_service(select_socket_service const&)            = delete;
111  
    select_socket_service& operator=(select_socket_service const&) = delete;
111  
    select_socket_service& operator=(select_socket_service const&) = delete;
112  

112  

113  
    void shutdown() override;
113  
    void shutdown() override;
114  

114  

115  
    io_object::implementation* construct() override;
115  
    io_object::implementation* construct() override;
116  
    void destroy(io_object::implementation*) override;
116  
    void destroy(io_object::implementation*) override;
117  
    void close(io_object::handle&) override;
117  
    void close(io_object::handle&) override;
118  
    std::error_code open_socket(tcp_socket::implementation& impl) override;
118  
    std::error_code open_socket(tcp_socket::implementation& impl) override;
119  

119  

120  
    select_scheduler& scheduler() const noexcept
120  
    select_scheduler& scheduler() const noexcept
121  
    {
121  
    {
122  
        return state_->sched_;
122  
        return state_->sched_;
123  
    }
123  
    }
124  
    void post(select_op* op);
124  
    void post(select_op* op);
125  
    void work_started() noexcept;
125  
    void work_started() noexcept;
126  
    void work_finished() noexcept;
126  
    void work_finished() noexcept;
127  

127  

128  
private:
128  
private:
129  
    std::unique_ptr<select_socket_state> state_;
129  
    std::unique_ptr<select_socket_state> state_;
130  
};
130  
};
131  

131  

132  
// Backward compatibility alias
132  
// Backward compatibility alias
133  
using select_sockets = select_socket_service;
133  
using select_sockets = select_socket_service;
134  

134  

135  
inline void
135  
inline void
136  
select_op::canceller::operator()() const noexcept
136  
select_op::canceller::operator()() const noexcept
137  
{
137  
{
138  
    op->cancel();
138  
    op->cancel();
139  
}
139  
}
140  

140  

141  
inline void
141  
inline void
142  
select_connect_op::cancel() noexcept
142  
select_connect_op::cancel() noexcept
143  
{
143  
{
144  
    if (socket_impl_)
144  
    if (socket_impl_)
145  
        socket_impl_->cancel_single_op(*this);
145  
        socket_impl_->cancel_single_op(*this);
146  
    else
146  
    else
147  
        request_cancel();
147  
        request_cancel();
148  
}
148  
}
149  

149  

150  
inline void
150  
inline void
151  
select_read_op::cancel() noexcept
151  
select_read_op::cancel() noexcept
152  
{
152  
{
153  
    if (socket_impl_)
153  
    if (socket_impl_)
154  
        socket_impl_->cancel_single_op(*this);
154  
        socket_impl_->cancel_single_op(*this);
155  
    else
155  
    else
156  
        request_cancel();
156  
        request_cancel();
157  
}
157  
}
158  

158  

159  
inline void
159  
inline void
160  
select_write_op::cancel() noexcept
160  
select_write_op::cancel() noexcept
161  
{
161  
{
162  
    if (socket_impl_)
162  
    if (socket_impl_)
163  
        socket_impl_->cancel_single_op(*this);
163  
        socket_impl_->cancel_single_op(*this);
164  
    else
164  
    else
165  
        request_cancel();
165  
        request_cancel();
166  
}
166  
}
167  

167  

168  
inline void
168  
inline void
169  
select_connect_op::operator()()
169  
select_connect_op::operator()()
170  
{
170  
{
171  
    stop_cb.reset();
171  
    stop_cb.reset();
172  

172  

173  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
173  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
174  

174  

175  
    // Cache endpoints on successful connect
175  
    // Cache endpoints on successful connect
176  
    if (success && socket_impl_)
176  
    if (success && socket_impl_)
177  
    {
177  
    {
178  
        // Query local endpoint via getsockname (may fail, but remote is always known)
178  
        // Query local endpoint via getsockname (may fail, but remote is always known)
179  
        endpoint local_ep;
179  
        endpoint local_ep;
180  
        sockaddr_in local_addr{};
180  
        sockaddr_in local_addr{};
181  
        socklen_t local_len = sizeof(local_addr);
181  
        socklen_t local_len = sizeof(local_addr);
182  
        if (::getsockname(
182  
        if (::getsockname(
183  
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
183  
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
184  
            local_ep = from_sockaddr_in(local_addr);
184  
            local_ep = from_sockaddr_in(local_addr);
185  
        // Always cache remote endpoint; local may be default if getsockname failed
185  
        // Always cache remote endpoint; local may be default if getsockname failed
186  
        static_cast<select_socket*>(socket_impl_)
186  
        static_cast<select_socket*>(socket_impl_)
187  
            ->set_endpoints(local_ep, target_endpoint);
187  
            ->set_endpoints(local_ep, target_endpoint);
188  
    }
188  
    }
189  

189  

190  
    if (ec_out)
190  
    if (ec_out)
191  
    {
191  
    {
192  
        if (cancelled.load(std::memory_order_acquire))
192  
        if (cancelled.load(std::memory_order_acquire))
193  
            *ec_out = capy::error::canceled;
193  
            *ec_out = capy::error::canceled;
194  
        else if (errn != 0)
194  
        else if (errn != 0)
195  
            *ec_out = make_err(errn);
195  
            *ec_out = make_err(errn);
196  
        else
196  
        else
197  
            *ec_out = {};
197  
            *ec_out = {};
198  
    }
198  
    }
199  

199  

200  
    if (bytes_out)
200  
    if (bytes_out)
201  
        *bytes_out = bytes_transferred;
201  
        *bytes_out = bytes_transferred;
202  

202  

203  
    // Move to stack before destroying the frame
203  
    // Move to stack before destroying the frame
204  
    capy::executor_ref saved_ex(ex);
204  
    capy::executor_ref saved_ex(ex);
205  
    std::coroutine_handle<> saved_h(h);
205  
    std::coroutine_handle<> saved_h(h);
206  
    impl_ptr.reset();
206  
    impl_ptr.reset();
207  
    dispatch_coro(saved_ex, saved_h).resume();
207  
    dispatch_coro(saved_ex, saved_h).resume();
208  
}
208  
}
209  

209  

210  
inline select_socket::select_socket(select_socket_service& svc) noexcept
210  
inline select_socket::select_socket(select_socket_service& svc) noexcept
211  
    : svc_(svc)
211  
    : svc_(svc)
212  
{
212  
{
213  
}
213  
}
214  

214  

215  
inline std::coroutine_handle<>
215  
inline std::coroutine_handle<>
216  
select_socket::connect(
216  
select_socket::connect(
217  
    std::coroutine_handle<> h,
217  
    std::coroutine_handle<> h,
218  
    capy::executor_ref ex,
218  
    capy::executor_ref ex,
219  
    endpoint ep,
219  
    endpoint ep,
220  
    std::stop_token token,
220  
    std::stop_token token,
221  
    std::error_code* ec)
221  
    std::error_code* ec)
222  
{
222  
{
223  
    auto& op = conn_;
223  
    auto& op = conn_;
224  
    op.reset();
224  
    op.reset();
225  
    op.h               = h;
225  
    op.h               = h;
226  
    op.ex              = ex;
226  
    op.ex              = ex;
227  
    op.ec_out          = ec;
227  
    op.ec_out          = ec;
228  
    op.fd              = fd_;
228  
    op.fd              = fd_;
229  
    op.target_endpoint = ep; // Store target for endpoint caching
229  
    op.target_endpoint = ep; // Store target for endpoint caching
230  
    op.start(token, this);
230  
    op.start(token, this);
231  

231  

232  
    sockaddr_in addr = detail::to_sockaddr_in(ep);
232  
    sockaddr_in addr = detail::to_sockaddr_in(ep);
233  
    int result =
233  
    int result =
234  
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
234  
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
235  

235  

236  
    if (result == 0)
236  
    if (result == 0)
237  
    {
237  
    {
238  
        // Sync success - cache endpoints immediately
238  
        // Sync success - cache endpoints immediately
239  
        sockaddr_in local_addr{};
239  
        sockaddr_in local_addr{};
240  
        socklen_t local_len = sizeof(local_addr);
240  
        socklen_t local_len = sizeof(local_addr);
241  
        if (::getsockname(
241  
        if (::getsockname(
242  
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
242  
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
243  
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
243  
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
244  
        remote_endpoint_ = ep;
244  
        remote_endpoint_ = ep;
245  

245  

246  
        op.complete(0, 0);
246  
        op.complete(0, 0);
247  
        op.impl_ptr = shared_from_this();
247  
        op.impl_ptr = shared_from_this();
248  
        svc_.post(&op);
248  
        svc_.post(&op);
249  
        // completion is always posted to scheduler queue, never inline.
249  
        // completion is always posted to scheduler queue, never inline.
250  
        return std::noop_coroutine();
250  
        return std::noop_coroutine();
251  
    }
251  
    }
252  

252  

253  
    if (errno == EINPROGRESS)
253  
    if (errno == EINPROGRESS)
254  
    {
254  
    {
255  
        svc_.work_started();
255  
        svc_.work_started();
256  
        op.impl_ptr = shared_from_this();
256  
        op.impl_ptr = shared_from_this();
257  

257  

258  
        // Set registering BEFORE register_fd to close the race window where
258  
        // Set registering BEFORE register_fd to close the race window where
259  
        // reactor sees an event before we set registered. The reactor treats
259  
        // reactor sees an event before we set registered. The reactor treats
260  
        // registering the same as registered when claiming the op.
260  
        // registering the same as registered when claiming the op.
261  
        op.registered.store(
261  
        op.registered.store(
262  
            select_registration_state::registering, std::memory_order_release);
262  
            select_registration_state::registering, std::memory_order_release);
263  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
263  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
264  

264  

265  
        // Transition to registered. If this fails, reactor or cancel already
265  
        // Transition to registered. If this fails, reactor or cancel already
266  
        // claimed the op (state is now unregistered), so we're done. However,
266  
        // claimed the op (state is now unregistered), so we're done. However,
267  
        // we must still deregister the fd because cancel's deregister_fd may
267  
        // we must still deregister the fd because cancel's deregister_fd may
268  
        // have run before our register_fd, leaving the fd orphaned.
268  
        // have run before our register_fd, leaving the fd orphaned.
269  
        auto expected = select_registration_state::registering;
269  
        auto expected = select_registration_state::registering;
270  
        if (!op.registered.compare_exchange_strong(
270  
        if (!op.registered.compare_exchange_strong(
271  
                expected, select_registration_state::registered,
271  
                expected, select_registration_state::registered,
272  
                std::memory_order_acq_rel))
272  
                std::memory_order_acq_rel))
273  
        {
273  
        {
274  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
274  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
275  
            // completion is always posted to scheduler queue, never inline.
275  
            // completion is always posted to scheduler queue, never inline.
276  
            return std::noop_coroutine();
276  
            return std::noop_coroutine();
277  
        }
277  
        }
278  

278  

279  
        // If cancelled was set before we registered, handle it now.
279  
        // If cancelled was set before we registered, handle it now.
280  
        if (op.cancelled.load(std::memory_order_acquire))
280  
        if (op.cancelled.load(std::memory_order_acquire))
281  
        {
281  
        {
282  
            auto prev = op.registered.exchange(
282  
            auto prev = op.registered.exchange(
283  
                select_registration_state::unregistered,
283  
                select_registration_state::unregistered,
284  
                std::memory_order_acq_rel);
284  
                std::memory_order_acq_rel);
285  
            if (prev != select_registration_state::unregistered)
285  
            if (prev != select_registration_state::unregistered)
286  
            {
286  
            {
287  
                svc_.scheduler().deregister_fd(
287  
                svc_.scheduler().deregister_fd(
288  
                    fd_, select_scheduler::event_write);
288  
                    fd_, select_scheduler::event_write);
289  
                op.impl_ptr = shared_from_this();
289  
                op.impl_ptr = shared_from_this();
290  
                svc_.post(&op);
290  
                svc_.post(&op);
291  
                svc_.work_finished();
291  
                svc_.work_finished();
292  
            }
292  
            }
293  
        }
293  
        }
294  
        // completion is always posted to scheduler queue, never inline.
294  
        // completion is always posted to scheduler queue, never inline.
295  
        return std::noop_coroutine();
295  
        return std::noop_coroutine();
296  
    }
296  
    }
297  

297  

298  
    op.complete(errno, 0);
298  
    op.complete(errno, 0);
299  
    op.impl_ptr = shared_from_this();
299  
    op.impl_ptr = shared_from_this();
300  
    svc_.post(&op);
300  
    svc_.post(&op);
301  
    // completion is always posted to scheduler queue, never inline.
301  
    // completion is always posted to scheduler queue, never inline.
302  
    return std::noop_coroutine();
302  
    return std::noop_coroutine();
303  
}
303  
}
304  

304  

305  
inline std::coroutine_handle<>
305  
inline std::coroutine_handle<>
306  
select_socket::read_some(
306  
select_socket::read_some(
307  
    std::coroutine_handle<> h,
307  
    std::coroutine_handle<> h,
308  
    capy::executor_ref ex,
308  
    capy::executor_ref ex,
309  
    io_buffer_param param,
309  
    io_buffer_param param,
310  
    std::stop_token token,
310  
    std::stop_token token,
311  
    std::error_code* ec,
311  
    std::error_code* ec,
312  
    std::size_t* bytes_out)
312  
    std::size_t* bytes_out)
313  
{
313  
{
314  
    auto& op = rd_;
314  
    auto& op = rd_;
315  
    op.reset();
315  
    op.reset();
316  
    op.h         = h;
316  
    op.h         = h;
317  
    op.ex        = ex;
317  
    op.ex        = ex;
318  
    op.ec_out    = ec;
318  
    op.ec_out    = ec;
319  
    op.bytes_out = bytes_out;
319  
    op.bytes_out = bytes_out;
320  
    op.fd        = fd_;
320  
    op.fd        = fd_;
321  
    op.start(token, this);
321  
    op.start(token, this);
322  

322  

323  
    capy::mutable_buffer bufs[select_read_op::max_buffers];
323  
    capy::mutable_buffer bufs[select_read_op::max_buffers];
324  
    op.iovec_count =
324  
    op.iovec_count =
325  
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
325  
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
326  

326  

327  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
327  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
328  
    {
328  
    {
329  
        op.empty_buffer_read = true;
329  
        op.empty_buffer_read = true;
330  
        op.complete(0, 0);
330  
        op.complete(0, 0);
331  
        op.impl_ptr = shared_from_this();
331  
        op.impl_ptr = shared_from_this();
332  
        svc_.post(&op);
332  
        svc_.post(&op);
333  
        return std::noop_coroutine();
333  
        return std::noop_coroutine();
334  
    }
334  
    }
335  

335  

336  
    for (int i = 0; i < op.iovec_count; ++i)
336  
    for (int i = 0; i < op.iovec_count; ++i)
337  
    {
337  
    {
338  
        op.iovecs[i].iov_base = bufs[i].data();
338  
        op.iovecs[i].iov_base = bufs[i].data();
339  
        op.iovecs[i].iov_len  = bufs[i].size();
339  
        op.iovecs[i].iov_len  = bufs[i].size();
340  
    }
340  
    }
341  

341  

342  
    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
342  
    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
343  

343  

344  
    if (n > 0)
344  
    if (n > 0)
345  
    {
345  
    {
346  
        op.complete(0, static_cast<std::size_t>(n));
346  
        op.complete(0, static_cast<std::size_t>(n));
347  
        op.impl_ptr = shared_from_this();
347  
        op.impl_ptr = shared_from_this();
348  
        svc_.post(&op);
348  
        svc_.post(&op);
349  
        return std::noop_coroutine();
349  
        return std::noop_coroutine();
350  
    }
350  
    }
351  

351  

352  
    if (n == 0)
352  
    if (n == 0)
353  
    {
353  
    {
354  
        op.complete(0, 0);
354  
        op.complete(0, 0);
355  
        op.impl_ptr = shared_from_this();
355  
        op.impl_ptr = shared_from_this();
356  
        svc_.post(&op);
356  
        svc_.post(&op);
357  
        return std::noop_coroutine();
357  
        return std::noop_coroutine();
358  
    }
358  
    }
359  

359  

360  
    if (errno == EAGAIN || errno == EWOULDBLOCK)
360  
    if (errno == EAGAIN || errno == EWOULDBLOCK)
361  
    {
361  
    {
362  
        svc_.work_started();
362  
        svc_.work_started();
363  
        op.impl_ptr = shared_from_this();
363  
        op.impl_ptr = shared_from_this();
364  

364  

365  
        // Set registering BEFORE register_fd to close the race window where
365  
        // Set registering BEFORE register_fd to close the race window where
366  
        // reactor sees an event before we set registered.
366  
        // reactor sees an event before we set registered.
367  
        op.registered.store(
367  
        op.registered.store(
368  
            select_registration_state::registering, std::memory_order_release);
368  
            select_registration_state::registering, std::memory_order_release);
369  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
369  
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
370  

370  

371  
        // Transition to registered. If this fails, reactor or cancel already
371  
        // Transition to registered. If this fails, reactor or cancel already
372  
        // claimed the op (state is now unregistered), so we're done. However,
372  
        // claimed the op (state is now unregistered), so we're done. However,
373  
        // we must still deregister the fd because cancel's deregister_fd may
373  
        // we must still deregister the fd because cancel's deregister_fd may
374  
        // have run before our register_fd, leaving the fd orphaned.
374  
        // have run before our register_fd, leaving the fd orphaned.
375  
        auto expected = select_registration_state::registering;
375  
        auto expected = select_registration_state::registering;
376  
        if (!op.registered.compare_exchange_strong(
376  
        if (!op.registered.compare_exchange_strong(
377  
                expected, select_registration_state::registered,
377  
                expected, select_registration_state::registered,
378  
                std::memory_order_acq_rel))
378  
                std::memory_order_acq_rel))
379  
        {
379  
        {
380  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
380  
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
381  
            return std::noop_coroutine();
381  
            return std::noop_coroutine();
382  
        }
382  
        }
383  

383  

384  
        // If cancelled was set before we registered, handle it now.
384  
        // If cancelled was set before we registered, handle it now.
385  
        if (op.cancelled.load(std::memory_order_acquire))
385  
        if (op.cancelled.load(std::memory_order_acquire))
386  
        {
386  
        {
387  
            auto prev = op.registered.exchange(
387  
            auto prev = op.registered.exchange(
388  
                select_registration_state::unregistered,
388  
                select_registration_state::unregistered,
389  
                std::memory_order_acq_rel);
389  
                std::memory_order_acq_rel);
390  
            if (prev != select_registration_state::unregistered)
390  
            if (prev != select_registration_state::unregistered)
391  
            {
391  
            {
392  
                svc_.scheduler().deregister_fd(
392  
                svc_.scheduler().deregister_fd(
393  
                    fd_, select_scheduler::event_read);
393  
                    fd_, select_scheduler::event_read);
394  
                op.impl_ptr = shared_from_this();
394  
                op.impl_ptr = shared_from_this();
395  
                svc_.post(&op);
395  
                svc_.post(&op);
396  
                svc_.work_finished();
396  
                svc_.work_finished();
397  
            }
397  
            }
398  
        }
398  
        }
399  
        return std::noop_coroutine();
399  
        return std::noop_coroutine();
400  
    }
400  
    }
401  

401  

402  
    op.complete(errno, 0);
402  
    op.complete(errno, 0);
403  
    op.impl_ptr = shared_from_this();
403  
    op.impl_ptr = shared_from_this();
404  
    svc_.post(&op);
404  
    svc_.post(&op);
405  
    return std::noop_coroutine();
405  
    return std::noop_coroutine();
406  
}
406  
}
407  

407  

// Initiates an asynchronous scatter/gather write on the socket.
//
// Attempts an immediate ::sendmsg first; if the socket is not yet writable
// (EAGAIN/EWOULDBLOCK) the operation is parked in the select reactor and
// completed when the fd signals writability. The function always returns
// std::noop_coroutine(): the awaiting coroutine `h` is resumed later via
// the scheduler's completion queue, never inline.
//
// h         : coroutine to resume on completion.
// ex        : executor used to dispatch the completion.
// param     : caller-supplied buffer sequence to send from.
// token     : stop token wired to cancellation of this op.
// ec        : out-param receiving the resulting error code.
// bytes_out : out-param receiving the number of bytes written.
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    // One in-flight write op per socket: re-arm the persistent wr_ op.
    auto& op = wr_;
    op.reset();
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this); // hooks the stop_token cancellation callback

    // Copy the caller's buffer descriptors; capped at max_buffers.
    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Zero-length write: complete immediately with success, but still via
    // the scheduler queue so the coroutine is never resumed inline.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this(); // keep impl alive until op runs
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // Translate buffers into the op's iovec array for sendmsg.
    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov    = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    // Optimistic synchronous attempt; MSG_NOSIGNAL suppresses SIGPIPE on
    // a peer-closed connection so the error surfaces as EPIPE instead.
    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        // Wrote some bytes without blocking: post the completion.
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Socket not writable yet: hand the op to the reactor. The
        // work_started() here is balanced by work_finished() on the
        // cancellation paths (or by the reactor's completion path).
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Only the party that wins this exchange (moving the state away
            // from unregistered) may deregister and post the op, so the op
            // completes exactly once even if the reactor races with us.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard failure: report errno, falling back to EIO if errno is
    // unexpectedly zero so callers never see a "success" error code.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
505  

505  

506  
inline std::error_code
506  
inline std::error_code
507  
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
507  
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
508  
{
508  
{
509  
    int how;
509  
    int how;
510  
    switch (what)
510  
    switch (what)
511  
    {
511  
    {
512  
    case tcp_socket::shutdown_receive:
512  
    case tcp_socket::shutdown_receive:
513  
        how = SHUT_RD;
513  
        how = SHUT_RD;
514  
        break;
514  
        break;
515  
    case tcp_socket::shutdown_send:
515  
    case tcp_socket::shutdown_send:
516  
        how = SHUT_WR;
516  
        how = SHUT_WR;
517  
        break;
517  
        break;
518  
    case tcp_socket::shutdown_both:
518  
    case tcp_socket::shutdown_both:
519  
        how = SHUT_RDWR;
519  
        how = SHUT_RDWR;
520  
        break;
520  
        break;
521  
    default:
521  
    default:
522  
        return make_err(EINVAL);
522  
        return make_err(EINVAL);
523  
    }
523  
    }
524  
    if (::shutdown(fd_, how) != 0)
524  
    if (::shutdown(fd_, how) != 0)
525  
        return make_err(errno);
525  
        return make_err(errno);
526  
    return {};
526  
    return {};
527  
}
527  
}
528  

528  

529  
inline std::error_code
529  
inline std::error_code
530  
select_socket::set_no_delay(bool value) noexcept
530  
select_socket::set_no_delay(bool value) noexcept
531  
{
531  
{
532  
    int flag = value ? 1 : 0;
532  
    int flag = value ? 1 : 0;
533  
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
533  
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
534  
        return make_err(errno);
534  
        return make_err(errno);
535  
    return {};
535  
    return {};
536  
}
536  
}
537  

537  

538  
inline bool
538  
inline bool
539  
select_socket::no_delay(std::error_code& ec) const noexcept
539  
select_socket::no_delay(std::error_code& ec) const noexcept
540  
{
540  
{
541  
    int flag      = 0;
541  
    int flag      = 0;
542  
    socklen_t len = sizeof(flag);
542  
    socklen_t len = sizeof(flag);
543  
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
543  
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
544  
    {
544  
    {
545  
        ec = make_err(errno);
545  
        ec = make_err(errno);
546  
        return false;
546  
        return false;
547  
    }
547  
    }
548  
    ec = {};
548  
    ec = {};
549  
    return flag != 0;
549  
    return flag != 0;
550  
}
550  
}
551  

551  

552  
inline std::error_code
552  
inline std::error_code
553  
select_socket::set_keep_alive(bool value) noexcept
553  
select_socket::set_keep_alive(bool value) noexcept
554  
{
554  
{
555  
    int flag = value ? 1 : 0;
555  
    int flag = value ? 1 : 0;
556  
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
556  
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
557  
        return make_err(errno);
557  
        return make_err(errno);
558  
    return {};
558  
    return {};
559  
}
559  
}
560  

560  

561  
inline bool
561  
inline bool
562  
select_socket::keep_alive(std::error_code& ec) const noexcept
562  
select_socket::keep_alive(std::error_code& ec) const noexcept
563  
{
563  
{
564  
    int flag      = 0;
564  
    int flag      = 0;
565  
    socklen_t len = sizeof(flag);
565  
    socklen_t len = sizeof(flag);
566  
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
566  
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
567  
    {
567  
    {
568  
        ec = make_err(errno);
568  
        ec = make_err(errno);
569  
        return false;
569  
        return false;
570  
    }
570  
    }
571  
    ec = {};
571  
    ec = {};
572  
    return flag != 0;
572  
    return flag != 0;
573  
}
573  
}
574  

574  

575  
inline std::error_code
575  
inline std::error_code
576  
select_socket::set_receive_buffer_size(int size) noexcept
576  
select_socket::set_receive_buffer_size(int size) noexcept
577  
{
577  
{
578  
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
578  
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
579  
        return make_err(errno);
579  
        return make_err(errno);
580  
    return {};
580  
    return {};
581  
}
581  
}
582  

582  

583  
inline int
583  
inline int
584  
select_socket::receive_buffer_size(std::error_code& ec) const noexcept
584  
select_socket::receive_buffer_size(std::error_code& ec) const noexcept
585  
{
585  
{
586  
    int size      = 0;
586  
    int size      = 0;
587  
    socklen_t len = sizeof(size);
587  
    socklen_t len = sizeof(size);
588  
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
588  
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
589  
    {
589  
    {
590  
        ec = make_err(errno);
590  
        ec = make_err(errno);
591  
        return 0;
591  
        return 0;
592  
    }
592  
    }
593  
    ec = {};
593  
    ec = {};
594  
    return size;
594  
    return size;
595  
}
595  
}
596  

596  

597  
inline std::error_code
597  
inline std::error_code
598  
select_socket::set_send_buffer_size(int size) noexcept
598  
select_socket::set_send_buffer_size(int size) noexcept
599  
{
599  
{
600  
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
600  
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
601  
        return make_err(errno);
601  
        return make_err(errno);
602  
    return {};
602  
    return {};
603  
}
603  
}
604  

604  

605  
inline int
605  
inline int
606  
select_socket::send_buffer_size(std::error_code& ec) const noexcept
606  
select_socket::send_buffer_size(std::error_code& ec) const noexcept
607  
{
607  
{
608  
    int size      = 0;
608  
    int size      = 0;
609  
    socklen_t len = sizeof(size);
609  
    socklen_t len = sizeof(size);
610  
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
610  
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
611  
    {
611  
    {
612  
        ec = make_err(errno);
612  
        ec = make_err(errno);
613  
        return 0;
613  
        return 0;
614  
    }
614  
    }
615  
    ec = {};
615  
    ec = {};
616  
    return size;
616  
    return size;
617  
}
617  
}
618  

618  

619  
inline std::error_code
619  
inline std::error_code
620  
select_socket::set_linger(bool enabled, int timeout) noexcept
620  
select_socket::set_linger(bool enabled, int timeout) noexcept
621  
{
621  
{
622  
    if (timeout < 0)
622  
    if (timeout < 0)
623  
        return make_err(EINVAL);
623  
        return make_err(EINVAL);
624  
    struct ::linger lg;
624  
    struct ::linger lg;
625  
    lg.l_onoff  = enabled ? 1 : 0;
625  
    lg.l_onoff  = enabled ? 1 : 0;
626  
    lg.l_linger = timeout;
626  
    lg.l_linger = timeout;
627  
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
627  
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
628  
        return make_err(errno);
628  
        return make_err(errno);
629  
    return {};
629  
    return {};
630  
}
630  
}
631  

631  

632  
inline tcp_socket::linger_options
632  
inline tcp_socket::linger_options
633  
select_socket::linger(std::error_code& ec) const noexcept
633  
select_socket::linger(std::error_code& ec) const noexcept
634  
{
634  
{
635  
    struct ::linger lg{};
635  
    struct ::linger lg{};
636  
    socklen_t len = sizeof(lg);
636  
    socklen_t len = sizeof(lg);
637  
    if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
637  
    if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
638  
    {
638  
    {
639  
        ec = make_err(errno);
639  
        ec = make_err(errno);
640  
        return {};
640  
        return {};
641  
    }
641  
    }
642  
    ec = {};
642  
    ec = {};
643  
    return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
643  
    return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
644  
}
644  
}
645  

645  

// Cancels every outstanding operation on this socket (connect, read,
// write). Safe to race with the reactor: only the party that wins the
// registered-state exchange may deregister the fd and post the op, so
// each operation completes exactly once.
inline void
select_socket::cancel() noexcept
{
    // If the control block is already gone the socket is being torn down;
    // nothing further to do here.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        // Atomically claim the op: moving the state away from
        // `unregistered` makes us responsible for completing it.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            // We won the claim: remove the fd from the reactor, queue the
            // cancelled op (keeping the impl alive via impl_ptr), and
            // balance the work_started() performed when the op was armed.
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    // Connect and write ops wait for writability; reads for readability.
    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
670  

670  

// Cancels one specific pending operation. Mirrors cancel() but targets a
// single op rather than all three.
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    // If the control block is already gone the socket is being torn down;
    // nothing further to do here.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    // Atomically claim the op: only the winner of this exchange may
    // deregister the fd and post the completion.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        // (connect/write wait on writability, read on readability).
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep the impl alive until the queued op has run, then post the
        // cancelled completion and balance the earlier work_started().
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
699  

699  

// Closes the native socket: cancels in-flight operations, removes the fd
// from the reactor, closes the descriptor, and clears the cached
// endpoints. Idempotent: a second call finds fd_ == -1 and only resets
// the endpoints again.
inline void
select_socket::close_socket() noexcept
{
    // weak_from_this().lock() can fail (e.g. during destruction); in that
    // case skip the cancellation pass, since ops cannot be posted with an
    // owning reference anymore.
    auto self = weak_from_this().lock();
    if (self)
    {
        // Same claim-then-complete protocol as cancel(): the winner of the
        // registered-state exchange deregisters the fd and posts the op.
        auto cancel_op = [this, &self](select_op& op, int events) {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Drop any remaining reactor registration for both event types
        // before closing, so select() never sees a dangling fd.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_  = endpoint{};
    remote_endpoint_ = endpoint{};
}
736  

736  

// Constructs the service: pins the per-context select scheduler and
// allocates the shared state (socket list, owning pointers, mutex) that
// outlives any individual socket implementation.
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
          std::make_unique<select_socket_state>(
              ctx.use_service<select_scheduler>()))
{
}
744  

744  

745  
inline select_socket_service::~select_socket_service() {}
745  
inline select_socket_service::~select_socket_service() {}
746  

746  

// Service shutdown hook: force-closes every socket still on the intrusive
// list, under the state mutex.
inline void
select_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    // Drain the intrusive list; close_socket() cancels pending ops and
    // releases each descriptor.
    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
760  

760  

761  
inline io_object::implementation*
761  
inline io_object::implementation*
762  
select_socket_service::construct()
762  
select_socket_service::construct()
763  
{
763  
{
764  
    auto impl = std::make_shared<select_socket>(*this);
764  
    auto impl = std::make_shared<select_socket>(*this);
765  
    auto* raw = impl.get();
765  
    auto* raw = impl.get();
766  

766  

767  
    {
767  
    {
768  
        std::lock_guard lock(state_->mutex_);
768  
        std::lock_guard lock(state_->mutex_);
769  
        state_->socket_list_.push_back(raw);
769  
        state_->socket_list_.push_back(raw);
770  
        state_->socket_ptrs_.emplace(raw, std::move(impl));
770  
        state_->socket_ptrs_.emplace(raw, std::move(impl));
771  
    }
771  
    }
772  

772  

773  
    return raw;
773  
    return raw;
774  
}
774  
}
775  

775  

776  
inline void
776  
inline void
777  
select_socket_service::destroy(io_object::implementation* impl)
777  
select_socket_service::destroy(io_object::implementation* impl)
778  
{
778  
{
779  
    auto* select_impl = static_cast<select_socket*>(impl);
779  
    auto* select_impl = static_cast<select_socket*>(impl);
780  
    select_impl->close_socket();
780  
    select_impl->close_socket();
781  
    std::lock_guard lock(state_->mutex_);
781  
    std::lock_guard lock(state_->mutex_);
782  
    state_->socket_list_.remove(select_impl);
782  
    state_->socket_list_.remove(select_impl);
783  
    state_->socket_ptrs_.erase(select_impl);
783  
    state_->socket_ptrs_.erase(select_impl);
784  
}
784  
}
785  

785  

786  
inline std::error_code
786  
inline std::error_code
787  
select_socket_service::open_socket(tcp_socket::implementation& impl)
787  
select_socket_service::open_socket(tcp_socket::implementation& impl)
788  
{
788  
{
789  
    auto* select_impl = static_cast<select_socket*>(&impl);
789  
    auto* select_impl = static_cast<select_socket*>(&impl);
790  
    select_impl->close_socket();
790  
    select_impl->close_socket();
791  

791  

792  
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
792  
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
793  
    if (fd < 0)
793  
    if (fd < 0)
794  
        return make_err(errno);
794  
        return make_err(errno);
795  

795  

796  
    // Set non-blocking and close-on-exec
796  
    // Set non-blocking and close-on-exec
797  
    int flags = ::fcntl(fd, F_GETFL, 0);
797  
    int flags = ::fcntl(fd, F_GETFL, 0);
798  
    if (flags == -1)
798  
    if (flags == -1)
799  
    {
799  
    {
800  
        int errn = errno;
800  
        int errn = errno;
801  
        ::close(fd);
801  
        ::close(fd);
802  
        return make_err(errn);
802  
        return make_err(errn);
803  
    }
803  
    }
804  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
804  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
805  
    {
805  
    {
806  
        int errn = errno;
806  
        int errn = errno;
807  
        ::close(fd);
807  
        ::close(fd);
808  
        return make_err(errn);
808  
        return make_err(errn);
809  
    }
809  
    }
810  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
810  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
811  
    {
811  
    {
812  
        int errn = errno;
812  
        int errn = errno;
813  
        ::close(fd);
813  
        ::close(fd);
814  
        return make_err(errn);
814  
        return make_err(errn);
815  
    }
815  
    }
816  

816  

817  
    // Check fd is within select() limits
817  
    // Check fd is within select() limits
818  
    if (fd >= FD_SETSIZE)
818  
    if (fd >= FD_SETSIZE)
819  
    {
819  
    {
820  
        ::close(fd);
820  
        ::close(fd);
821  
        return make_err(EMFILE); // Too many open files
821  
        return make_err(EMFILE); // Too many open files
822  
    }
822  
    }
823  

823  

824  
    select_impl->fd_ = fd;
824  
    select_impl->fd_ = fd;
825  
    return {};
825  
    return {};
826  
}
826  
}
827  

827  

828  
inline void
828  
inline void
829  
select_socket_service::close(io_object::handle& h)
829  
select_socket_service::close(io_object::handle& h)
830  
{
830  
{
831  
    static_cast<select_socket*>(h.get())->close_socket();
831  
    static_cast<select_socket*>(h.get())->close_socket();
832  
}
832  
}
833  

833  

834  
inline void
834  
inline void
835  
select_socket_service::post(select_op* op)
835  
select_socket_service::post(select_op* op)
836  
{
836  
{
837  
    state_->sched_.post(op);
837  
    state_->sched_.post(op);
838  
}
838  
}
839  

839  

840  
inline void
840  
inline void
841  
select_socket_service::work_started() noexcept
841  
select_socket_service::work_started() noexcept
842  
{
842  
{
843  
    state_->sched_.work_started();
843  
    state_->sched_.work_started();
844  
}
844  
}
845  

845  

846  
inline void
846  
inline void
847  
select_socket_service::work_finished() noexcept
847  
select_socket_service::work_finished() noexcept
848  
{
848  
{
849  
    state_->sched_.work_finished();
849  
    state_->sched_.work_finished();
850  
}
850  
}
851  

851  

852  
} // namespace boost::corosio::detail
852  
} // namespace boost::corosio::detail
853  

853  

854  
#endif // BOOST_COROSIO_HAS_SELECT
854  
#endif // BOOST_COROSIO_HAS_SELECT
855  

855  

856  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
856  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP