include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp

81.0% Lines (337/416) 93.3% Functions (28/30)
include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23
24 #include <boost/corosio/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/except.hpp>
28 #include <boost/capy/buffers.hpp>
29
30 #include <coroutine>
31 #include <mutex>
32 #include <unordered_map>
33 #include <utility>
34
35 #include <errno.h>
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #include <sys/epoll.h>
39 #include <sys/socket.h>
40 #include <unistd.h>
41
42 /*
43 epoll Socket Implementation
44 ===========================
45
46 Each I/O operation follows the same pattern:
47 1. Try the syscall immediately (non-blocking socket)
48 2. If it succeeds or fails with a real error, post to completion queue
49 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50
51 This "try first" approach avoids unnecessary epoll round-trips for
52 operations that can complete immediately (common for small reads/writes
53 on fast local connections).
54
55 One-Shot Registration
56 ---------------------
57 We use one-shot epoll registration: each operation registers, waits for
58 one event, then unregisters. This simplifies the state machine since we
59 don't need to track whether an fd is currently registered or handle
60 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 simplicity is worth it.
62
63 Cancellation
64 ------------
65 See op.hpp for the completion/cancellation race handling via the
66 `registered` atomic. cancel() must complete pending operations (post
67 them with cancelled flag) so coroutines waiting on them can resume.
68 close_socket() calls cancel() first to ensure this.
69
70 Impl Lifetime with shared_ptr
71 -----------------------------
72 Socket impls use enable_shared_from_this. The service owns impls via
73 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 removal. When a user calls close(), we call cancel() which posts pending
75 ops to the scheduler.
76
77 CRITICAL: The posted ops must keep the impl alive until they complete.
78 Otherwise the scheduler would process a freed op (use-after-free). The
79 cancel() method captures shared_from_this() into op.impl_ptr before
80 posting. When the op completes, impl_ptr is cleared, allowing the impl
81 to be destroyed if no other references exist.
82
83 Service Ownership
84 -----------------
85 epoll_socket_service owns all socket impls. destroy_impl() removes the
86 shared_ptr from the map, but the impl may survive if ops still hold
87 impl_ptr refs. shutdown() closes all sockets and clears the map; any
88 in-flight ops will complete and release their refs.
89 */
90
91 namespace boost::corosio::detail {
92
/** State for epoll socket service.

    Bundles the scheduler reference with the socket bookkeeping containers
    so the service can own everything through one unique_ptr.
*/
class epoll_socket_state
{
public:
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
    {
    }

    // Scheduler used for posting completions and work accounting.
    epoll_scheduler& sched_;
    // Guards socket_list_ and socket_ptrs_.
    std::mutex mutex_;
    // Intrusive list of live socket impls; drained by shutdown().
    intrusive_list<epoll_socket> socket_list_;
    // Owning refs keyed by raw pointer for O(1) lookup and removal.
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
        socket_ptrs_;
};
107
/** epoll socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
{
public:
    explicit epoll_socket_service(capy::execution_context& ctx);
    ~epoll_socket_service() override;

    epoll_socket_service(epoll_socket_service const&) = delete;
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;

    /// Close every socket; owning refs are released at destruction (see definition).
    void shutdown() override;

    /// Create a socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Close an impl and drop the service's owning ref.
    void destroy(io_object::implementation*) override;
    /// Close the descriptor behind a live handle.
    void close(io_object::handle&) override;
    /// Open a non-blocking socket and register it with the reactor.
    std::error_code
    open_socket(tcp_socket::implementation& impl,
        int family, int type, int protocol) override;

    /// Scheduler owning the epoll reactor and completion queue.
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Queue a completed op for invocation on the scheduler.
    void post(epoll_op* op);
    /// Work-count accounting, forwarded to the scheduler.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<epoll_socket_state> state_;
};
142
143 //--------------------------------------------------------------------------
144 //
145 // Implementation
146 //
147 //--------------------------------------------------------------------------
148
// Register an op with the reactor, handling cached edge events.
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
//
// desc_slot   — descriptor-state slot to park the op in (read/write/connect)
// ready_flag  — edge readiness that arrived while no op was parked
// cancel_flag — cancel request that raced an in-flight registration
inline void
epoll_socket::register_op(
    epoll_op& op,
    epoll_op*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    // Balanced by work_finished() below (immediate completion) or by the
    // reactor when the parked op is eventually posted.
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    if (ready_flag)
    {
        // An edge event was cached while no op was waiting. Edge-triggered
        // epoll won't re-report it, so consume it and retry the I/O now.
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            op.errn = 0; // still would-block: clear errno and park below
    }

    if (cancel_flag)
    {
        // A cancel raced this registration; latch it onto the op.
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        // Completed (or cancelled) without waiting: post directly and
        // retire the work item started above.
        svc_.post(&op);
        svc_.work_finished();
    }
    else
    {
        // Park: the reactor posts the op when the next event fires.
        desc_slot = &op;
    }
}
187
// Stop-token callback: forwards cancellation to the owning op.
inline void
epoll_op::canceller::operator()() const noexcept
{
    op->cancel();
}
193
194 inline void
195 epoll_connect_op::cancel() noexcept
196 {
197 if (socket_impl_)
198 socket_impl_->cancel_single_op(*this);
199 else
200 request_cancel();
201 }
202
203 inline void
204 99 epoll_read_op::cancel() noexcept
205 {
206 99 if (socket_impl_)
207 99 socket_impl_->cancel_single_op(*this);
208 else
209 request_cancel();
210 99 }
211
212 inline void
213 epoll_write_op::cancel() noexcept
214 {
215 if (socket_impl_)
216 socket_impl_->cancel_single_op(*this);
217 else
218 request_cancel();
219 }
220
// Completion handler: translate the op's outcome into the caller's
// error_code/bytes outputs, then resume the waiting coroutine.
inline void
epoll_op::operator()()
{
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    // Precedence: cancellation wins over errno, which wins over the
    // zero-byte-read EOF mapping.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else if (is_read_operation() && bytes_transferred == 0)
        *ec_out = capy::error::eof;
    else
        *ec_out = {};

    *bytes_out = bytes_transferred;

    // Move to stack before resuming coroutine. The coroutine might close
    // the socket, releasing the last wrapper ref. If impl_ptr were the
    // last ref and we destroyed it while still in operator(), we'd have
    // use-after-free. Moving to local ensures destruction happens at
    // function exit, after all member accesses are complete.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
249
// Completion handler for connect: cache endpoints on success, set the
// caller's error_code, then resume the waiting coroutine.
inline void
epoll_connect_op::operator()()
{
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    // NOTE(review): socket_impl_ was already dereferenced above, so the
    // null check here looks redundant — confirm whether it guards a path
    // where socket_impl_ can be cleared.
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_ep = from_sockaddr(local_storage);
        static_cast<epoll_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Cancellation takes precedence over any errno value.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
286
// Impl holds only a back-reference to its owning service; descriptor
// state stays default-initialized until open_socket() assigns an fd.
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
    : svc_(svc)
{
}

// fd lifetime is managed by close_socket(), not the destructor.
inline epoll_socket::~epoll_socket() = default;
293
// Start an asynchronous connect to `ep`.
//
// Tries ::connect() immediately on the non-blocking fd. Anything other
// than EINPROGRESS completes right away (inline if budget allows,
// otherwise via the completion queue); EINPROGRESS parks the op with the
// reactor and waits for writability.
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Immediate success (e.g. loopback): cache endpoints now.
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;
    }

    if (result == 0 || errno != EINPROGRESS)
    {
        // Completed immediately — success or a real error.
        int err = (result < 0) ? errno : 0;
        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume the caller inline, skipping the completion queue.
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: post through the scheduler to bound recursion.
        op.reset();
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.fd = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        op.impl_ptr = shared_from_this(); // keep impl alive until op runs
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — register with reactor
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
357
// Start an asynchronous scatter read.
//
// Copies the caller's buffer list into iovecs, tries readv() speculatively,
// and only registers with the reactor on EAGAIN/EWOULDBLOCK. A completed
// read resumes inline when the scheduler's inline budget allows, otherwise
// through the completion queue.
inline std::coroutine_handle<>
epoll_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();

    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));

    // Empty buffer: complete immediately with 0 bytes and no error
    // (flagged so completion doesn't map 0 bytes to EOF).
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative read
    ssize_t n;
    do
    {
        n = ::readv(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed immediately — data, EOF (n == 0), or a real error.
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            if (err)
                *ec = make_err(err);
            else if (n == 0)
                *ec = capy::error::eof;
            else
                *ec = {};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: post through the scheduler to bound recursion.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this(); // keep impl alive until op runs
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.read_op, desc_state_.read_ready,
        desc_state_.read_cancel_pending);
    return std::noop_coroutine();
}
443
// Start an asynchronous gather write.
//
// Mirrors read_some(): copy buffers to iovecs, try sendmsg() speculatively
// (MSG_NOSIGNAL suppresses SIGPIPE), register with the reactor only on
// EAGAIN/EWOULDBLOCK.
inline std::coroutine_handle<>
epoll_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();

    // NOTE(review): mutable_buffer for a write path — presumably
    // io_buffer_param::copy_to requires it; confirm a const view isn't
    // available.
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));

    // Empty buffer: complete immediately with 0 bytes and no error.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative write
    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n;
    do
    {
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed immediately — possibly a short write; caller handles it.
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            *ec = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: post through the scheduler to bound recursion.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this(); // keep impl alive until op runs
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
527
528 inline std::error_code
529 3 epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530 {
531 int how;
532 3 switch (what)
533 {
534 1 case tcp_socket::shutdown_receive:
535 1 how = SHUT_RD;
536 1 break;
537 1 case tcp_socket::shutdown_send:
538 1 how = SHUT_WR;
539 1 break;
540 1 case tcp_socket::shutdown_both:
541 1 how = SHUT_RDWR;
542 1 break;
543 default:
544 return make_err(EINVAL);
545 }
546 3 if (::shutdown(fd_, how) != 0)
547 return make_err(errno);
548 3 return {};
549 }
550
551 inline std::error_code
552 32 epoll_socket::set_option(
553 int level, int optname,
554 void const* data, std::size_t size) noexcept
555 {
556 32 if (::setsockopt(fd_, level, optname, data,
557 32 static_cast<socklen_t>(size)) != 0)
558 return make_err(errno);
559 32 return {};
560 }
561
562 inline std::error_code
563 31 epoll_socket::get_option(
564 int level, int optname,
565 void* data, std::size_t* size) const noexcept
566 {
567 31 socklen_t len = static_cast<socklen_t>(*size);
568 31 if (::getsockopt(fd_, level, optname, data, &len) != 0)
569 return make_err(errno);
570 31 *size = static_cast<std::size_t>(len);
571 31 return {};
572 }
573
// Cancel all pending operations on this socket.
//
// Ops parked in the descriptor slots are "claimed" under the lock and
// posted with the cancelled flag so their coroutines resume; ops not yet
// parked get a pending-cancel flag that register_op() will observe.
inline void
epoll_socket::cancel() noexcept
{
    // If no shared owner exists the impl is being torn down; nothing to do.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed = nullptr;
    epoll_op* wr_claimed = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        // Claim each parked op, or mark cancel pending if it isn't parked
        // (e.g. it's mid-registration or already completing).
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post claimed ops outside the lock. impl_ptr keeps the impl alive
    // until the op runs; work_finished() retires the reactor's work item.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
623
// Cancel exactly one pending operation (invoked from the op's stop-token
// callback). Uses the same claim-under-lock protocol as cancel().
inline void
epoll_socket::cancel_single_op(epoll_op& op) noexcept
{
    // If no shared owner exists the impl is being torn down; nothing to do.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Map the op to its descriptor slot by identity.
    epoll_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        epoll_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Claim the parked op, or mark cancel pending so a racing
            // register_op() sees the request.
            if (*desc_op_ptr == &op)
                claimed = std::exchange(*desc_op_ptr, nullptr);
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        if (claimed)
        {
            // Post outside the lock; impl_ptr keeps the impl alive until
            // the op runs, work_finished() retires the reactor work item.
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
663
// Close the socket: cancel everything parked on it, deregister from
// epoll, close the fd, and reset descriptor state and cached endpoints.
// Safe to call repeatedly (fd_ < 0 skips the close path).
inline void
epoll_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed = nullptr;
        epoll_op* wr_claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Unconditionally claim all slots and clear cached readiness /
            // pending-cancel flags; the fd is going away.
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post claimed ops (with cancelled flag set) so their coroutines
        // resume; impl_ptr keeps this impl alive until each op runs.
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // If the descriptor state is still queued in the scheduler, pin
        // the impl until that queue entry drains.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
726
// Acquire (or lazily create) the context's epoll scheduler and allocate
// the service's shared state around it.
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(
          std::make_unique<epoll_socket_state>(
              ctx.use_service<epoll_scheduler>()))
{
}
733
734 458 inline epoll_socket_service::~epoll_socket_service() {}
735
// Service shutdown: close every live socket. Owning shared_ptrs are
// deliberately retained (see comment below) until the service itself
// is destroyed.
inline void
epoll_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    // pop_front drains the intrusive list; close_socket cancels and
    // posts any parked ops.
    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. If we
    // released our shared_ptrs now, an epoll_op::destroy() could free the
    // last ref to an impl whose embedded descriptor_state is still linked
    // in the queue — use-after-free on the next pop(). Letting ~state_
    // release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
752
753 inline io_object::implementation*
754 12969 epoll_socket_service::construct()
755 {
756 12969 auto impl = std::make_shared<epoll_socket>(*this);
757 12969 auto* raw = impl.get();
758
759 {
760 12969 std::lock_guard lock(state_->mutex_);
761 12969 state_->socket_list_.push_back(raw);
762 12969 state_->socket_ptrs_.emplace(raw, std::move(impl));
763 12969 }
764
765 12969 return raw;
766 12969 }
767
768 inline void
769 12969 epoll_socket_service::destroy(io_object::implementation* impl)
770 {
771 12969 auto* epoll_impl = static_cast<epoll_socket*>(impl);
772 12969 epoll_impl->close_socket();
773 12969 std::lock_guard lock(state_->mutex_);
774 12969 state_->socket_list_.remove(epoll_impl);
775 12969 state_->socket_ptrs_.erase(epoll_impl);
776 12969 }
777
// Open a non-blocking, close-on-exec socket for the impl and register it
// with the epoll reactor (edge-triggered). Any previous fd is closed
// first, making reopen safe.
inline std::error_code
epoll_socket_service::open_socket(
    tcp_socket::implementation& impl,
    int family, int type, int protocol)
{
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
    epoll_impl->close_socket();

    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
    if (fd < 0)
        return make_err(errno);

    if (family == AF_INET6)
    {
        // NOTE(review): forces IPV6_V6ONLY on, disabling dual-stack;
        // best-effort (result ignored) — presumably intentional for
        // deterministic cross-platform behavior, confirm.
        int one = 1;
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
    }

    epoll_impl->fd_ = fd;

    // Register fd with epoll (edge-triggered mode)
    epoll_impl->desc_state_.fd = fd;
    {
        // Clear any stale parked-op pointers before the fd becomes live.
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
        epoll_impl->desc_state_.read_op = nullptr;
        epoll_impl->desc_state_.write_op = nullptr;
        epoll_impl->desc_state_.connect_op = nullptr;
    }
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);

    return {};
}
810
811 inline void
812 21590 epoll_socket_service::close(io_object::handle& h)
813 {
814 21590 static_cast<epoll_socket*>(h.get())->close_socket();
815 21590 }
816
817 inline void
818 77724 epoll_socket_service::post(epoll_op* op)
819 {
820 77724 state_->sched_.post(op);
821 77724 }
822
823 inline void
824 4506 epoll_socket_service::work_started() noexcept
825 {
826 4506 state_->sched_.work_started();
827 4506 }
828
829 inline void
830 198 epoll_socket_service::work_finished() noexcept
831 {
832 198 state_->sched_.work_finished();
833 198 }
834
835 } // namespace boost::corosio::detail
836
837 #endif // BOOST_COROSIO_HAS_EPOLL
838
839 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
840