Skip to content

Commit

Permalink
kqueue: use EVFILT_USER for async if available
Browse files Browse the repository at this point in the history
Establishes a user event for kqueue to eliminate the overhead
of the pipe and the system call read(2) per wakeup event.

---------

Signed-off-by: Andy Pan <[email protected]>

Co-authored-by: Jameson Nash <[email protected]>
  • Loading branch information
2 people authored and saghul committed Aug 13, 2024
1 parent 1eac331 commit 2713454
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 1 deletion.
85 changes: 85 additions & 0 deletions src/unix/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,34 @@
#include <sys/eventfd.h>
#endif

#if UV__KQUEUE_EVFILT_USER
static uv_once_t kqueue_runtime_detection_guard = UV_ONCE_INIT;
static int kqueue_evfilt_user_support = 1;


static void uv__kqueue_runtime_detection(void) {
int kq;
struct kevent ev[2];
struct timespec timeout = {0, 0};

/* Perform the runtime detection to ensure that kqueue with
* EVFILT_USER actually works. */
kq = kqueue();
EV_SET(ev, UV__KQUEUE_EVFILT_USER_IDENT, EVFILT_USER,
EV_ADD | EV_CLEAR, 0, 0, 0);
EV_SET(ev + 1, UV__KQUEUE_EVFILT_USER_IDENT, EVFILT_USER,
0, NOTE_TRIGGER, 0, 0);
if (kevent(kq, ev, 2, ev, 1, &timeout) < 1 ||
ev[0].filter != EVFILT_USER ||
ev[0].ident != UV__KQUEUE_EVFILT_USER_IDENT ||
ev[0].flags & EV_ERROR)
/* If we wind up here, we can assume that EVFILT_USER is defined but
* broken on the current system. */
kqueue_evfilt_user_support = 0;
uv__close(kq);
}
#endif

static void uv__async_send(uv_loop_t* loop);
static int uv__async_start(uv_loop_t* loop);
static void uv__cpu_relax(void);
Expand Down Expand Up @@ -142,7 +170,11 @@ static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
assert(w == &loop->async_io_watcher);

#ifndef __linux__
#if UV__KQUEUE_EVFILT_USER
for (;!kqueue_evfilt_user_support;) {
#else
for (;;) {
#endif
r = read(w->fd, buf, sizeof(buf));

if (r == sizeof(buf))
Expand Down Expand Up @@ -209,6 +241,19 @@ static void uv__async_send(uv_loop_t* loop) {
return;
}
#else
#if UV__KQUEUE_EVFILT_USER
struct kevent ev;

if (kqueue_evfilt_user_support) {
fd = loop->async_io_watcher.fd; /* magic number for EVFILT_USER */
EV_SET(&ev, fd, EVFILT_USER, 0, NOTE_TRIGGER, 0, 0);
r = kevent(loop->backend_fd, &ev, 1, NULL, 0, NULL);
if (r == 0)
return;
else
abort();
}
#endif

fd = loop->async_wfd; /* write end of the pipe */
do
Expand All @@ -230,6 +275,9 @@ static void uv__async_send(uv_loop_t* loop) {
static int uv__async_start(uv_loop_t* loop) {
int pipefd[2];
int err;
#if UV__KQUEUE_EVFILT_USER
struct kevent ev;
#endif

if (loop->async_io_watcher.fd != -1)
return 0;
Expand All @@ -241,6 +289,36 @@ static int uv__async_start(uv_loop_t* loop) {

pipefd[0] = err;
pipefd[1] = -1;
#elif UV__KQUEUE_EVFILT_USER
uv_once(&kqueue_runtime_detection_guard, uv__kqueue_runtime_detection);
if (kqueue_evfilt_user_support) {
/* In order not to break the generic pattern of I/O polling, a valid
* file descriptor is required to take up a room in loop->watchers,
* thus we create one for that, but this fd will not be actually used,
* it's just a placeholder and magic number which is going to be closed
* during the cleanup, as other FDs. */
err = uv__open_cloexec("/dev/null", O_RDONLY);
if (err < 0)
return err;

pipefd[0] = err;
pipefd[1] = -1;

/* When using EVFILT_USER event to wake up the kqueue, this event must be
* registered beforehand. Otherwise, calling kevent() to issue an
* unregistered EVFILT_USER event will get an ENOENT.
* Since uv__async_send() may happen before uv__io_poll() with multi-threads,
* we can't defer this registration of EVFILT_USER event as we did for other
* events, but must perform it right away. */
EV_SET(&ev, err, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, 0);
err = kevent(loop->backend_fd, &ev, 1, NULL, 0, NULL);
if (err < 0)
return UV__ERR(errno);
} else {
err = uv__make_pipe(pipefd, UV_NONBLOCK_PIPE);
if (err < 0)
return err;
}
#else
err = uv__make_pipe(pipefd, UV_NONBLOCK_PIPE);
if (err < 0)
Expand All @@ -251,6 +329,13 @@ static int uv__async_start(uv_loop_t* loop) {
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];

#if UV__KQUEUE_EVFILT_USER
/* Prevent the EVFILT_USER event from being added to kqueue redundantly
* and mistakenly later in uv__io_poll(). */
if (kqueue_evfilt_user_support)
loop->async_io_watcher.events = loop->async_io_watcher.pevents;
#endif

return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/unix/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -1678,7 +1678,7 @@ int uv_thread_setpriority(uv_thread_t tid, int priority) {
param.sched_priority = prio;
r = pthread_setschedparam(tid, policy, &param);
if (r != 0)
return UV__ERR(errno);
return UV__ERR(errno);
}

return 0;
Expand Down
22 changes: 22 additions & 0 deletions src/unix/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#if defined(__APPLE__) || defined(__DragonFly__) || \
defined(__FreeBSD__) || defined(__NetBSD__)
#include <sys/event.h>
#endif

#define uv__msan_unpoison(p, n) \
do { \
Expand Down Expand Up @@ -496,4 +500,22 @@ int uv__get_constrained_cpu(uv__cpu_constraint* constraint);
#endif
#endif

#if defined(EVFILT_USER) && defined(NOTE_TRIGGER)
/* EVFILT_USER is available since OS X 10.6, DragonFlyBSD 4.0,
* FreeBSD 8.1, and NetBSD 10.0.
*
* Note that even though EVFILT_USER is defined on the current system,
* it may still fail to work at runtime somehow. In that case, we fall
* back to pipe-based signaling.
*/
#define UV__KQUEUE_EVFILT_USER 1
/* Magic number of identifier used for EVFILT_USER during runtime detection.
* There are no Google hits for this number when I create it. That way,
* people will be directed here if this number gets printed due to some
* kqueue error and they google for help. */
#define UV__KQUEUE_EVFILT_USER_IDENT 0x1e7e7711
#else
#define UV__KQUEUE_EVFILT_USER 0
#endif

#endif /* UV_UNIX_INTERNAL_H_ */
11 changes: 11 additions & 0 deletions src/unix/kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,17 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
continue;
}

#if UV__KQUEUE_EVFILT_USER
if (ev->filter == EVFILT_USER) {
w = &loop->async_io_watcher;
assert(fd == w->fd);
uv__metrics_update_idle_time(loop);
w->cb(loop, w, w->events);
nevents++;
continue;
}
#endif

if (ev->filter == EVFILT_VNODE) {
assert(w->events == POLLIN);
assert(w->pevents == POLLIN);
Expand Down

0 comments on commit 2713454

Please sign in to comment.