io_uring: calculate CQEs from the user visible value
authorDylan Yudaken <dylany@meta.com>
Tue, 8 Nov 2022 15:30:16 +0000 (07:30 -0800)
committerJens Axboe <axboe@kernel.dk>
Tue, 8 Nov 2022 17:36:15 +0000 (10:36 -0700)
io_cqring_wait (and it's wake function io_has_work) used cached_cq_tail in
order to calculate the number of CQEs. cached_cq_tail is set strictly
before the user visible rings->cq.tail

However as far as userspace is concerned,  if io_uring_enter(2) is called
with a minimum number of events, they will verify by checking
rings->cq.tail.

It is therefore possible for io_uring_enter(2) to return early with fewer
events visible to the user.

Instead make the wait functions read from the user visible value, so there
will be no discrepency.

This is triggered eventually by the following reproducer:

struct io_uring_sqe *sqe;
struct io_uring_cqe *cqe;
unsigned int cqe_ready;
struct io_uring ring;
int ret, i;

ret = io_uring_queue_init(N, &ring, 0);
assert(!ret);
while(true) {
for (i = 0; i < N; i++) {
sqe = io_uring_get_sqe(&ring);
io_uring_prep_nop(sqe);
sqe->flags |= IOSQE_ASYNC;
}
ret = io_uring_submit(&ring);
assert(ret == N);

do {
ret = io_uring_wait_cqes(&ring, &cqe, N, NULL, NULL);
} while(ret == -EINTR);
cqe_ready = io_uring_cq_ready(&ring);
assert(!ret);
assert(cqe_ready == N);
io_uring_cq_advance(&ring, N);
}

Fixes: ad3eb2c89fb2 ("io_uring: split overflow state into SQ and CQ side")
Signed-off-by: Dylan Yudaken <dylany@meta.com>
Link: https://lore.kernel.org/r/20221108153016.1854297-1-dylany@meta.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
io_uring/io_uring.c

index ac8c488..4a1e482 100644 (file)
@@ -176,6 +176,11 @@ static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
        return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
 }
 
+static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
+{
+       return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
+}
+
 static bool io_match_linked(struct io_kiocb *head)
 {
        struct io_kiocb *req;
@@ -2315,7 +2320,7 @@ static inline bool io_has_work(struct io_ring_ctx *ctx)
 static inline bool io_should_wake(struct io_wait_queue *iowq)
 {
        struct io_ring_ctx *ctx = iowq->ctx;
-       int dist = ctx->cached_cq_tail - (int) iowq->cq_tail;
+       int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail;
 
        /*
         * Wake up if we have enough events, or if a timeout occurred since we
@@ -2399,7 +2404,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
                        return ret;
                io_cqring_overflow_flush(ctx);
 
-               if (io_cqring_events(ctx) >= min_events)
+               /* if user messes with these they will just get an early return */
+               if (__io_cqring_events_user(ctx) >= min_events)
                        return 0;
        } while (ret > 0);