cancelable ssize_t poll1 stop_token const token int fd short event int

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
cancelable<ssize_t> poll1(stop_token const &token, int fd, short event, int timeout) {
fd_stop_token fd_token;
stop_callback guard(token, [&fd_token]{
fd_token.cancel();
});
if (token.stop_requested()) {
return make_canceled<ssize_t>();
}
pollfd fds[2] = { };
fds[0].fd = fd;
fds[0].events = event;
fds[1].fd = fd_token.get_fd();
fds[1].events = POLLIN;
auto ret = poll(fds, 2, timeout);
if (ret == 0) {
return 0;
}
if (ret < 0) { // On error, -1 is returned, and errno is set appropriately.
return -1;
}
if (fds[1].revents & POLLIN) {
return make_canceled<ssize_t>();
}
// On success, a positive number is returned; this is the number of
// structures which have nonzero revents fields (in other words, those
// descriptors with events or errors reported)
return 1;
}
template<class Function, class ...Args>
cancelable<ssize_t> blocking_sysio(Function &&f, short event, stop_token const &token, int fd, Args &&...args) {
int flags = fcntl(fd, F_GETFD);
if (flags & O_NONBLOCK) {
// если получили неблокирующий fd
// то проверяем не прервали ли нашу операцию?
if (token.stop_requested()) {
return make_canceled<ssize_t>();
}
if constexpr (std::is_invocable_v<std::decay_t<Function>>) {
return std::invoke(std::forward<Function>(f), fd, std::forward<Args>(args)...);
}
return 0;
}
// блокирующий fd -> создаем неблокирующий
int new_fd = CHECKED(dup(fd));
scope_guard {
close(new_fd);
};
CHECKED(fcntl(new_fd, F_SETFD, flags | O_NONBLOCK | O_CLOEXEC)); // старые флаги + O_NONBLOCK
int ret = 0;
do {
auto res = detail::poll1(token, new_fd, event, -1); // poll проверит запросили ли остановку.
if (res.was_canceled()) {
return res.canceled();
}
Expects(res.value() > 0);
ret = std::invoke(std::forward<Function>(f), new_fd, std::forward<Args>(args)...);
// если это EAGAIN, значит кто-то наши данные уже считал
// -> идем ждать дальше
} while (ret == -1 && errno == EAGAIN);
return ret;
}
cancelable<ssize_t> read(stop_token const &token, int fd, void *buf, size_t count) {
return detail::blocking_sysio(detail::read, POLLIN, token, fd, buf, count);
}