io_uring: optimise io_get_cqe()
authorPavel Begunkov <asml.silence@gmail.com>
Tue, 12 Apr 2022 14:09:51 +0000 (15:09 +0100)
committerJens Axboe <axboe@kernel.dk>
Mon, 25 Apr 2022 00:02:46 +0000 (18:02 -0600)
io_get_cqe() is expensive because of a bunch of loads, masking, etc.
However, most of the time we should have enough of entries in the CQ,
so we can cache two pointers representing a range of contiguous CQE
memory we can use. When the range is exhausted we'll go through a slower
path to set up a new range. When there are no CQEs avaliable, pointers
will naturally point to the same address.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/487eeef00f3146537b3d9c1a9cef2fc0b9a86f81.1649771823.git.asml.silence@gmail.com
[axboe: santinel -> sentinel]
Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io_uring.c

index c3cadb3..fc47f8f 100644 (file)
@@ -416,6 +416,13 @@ struct io_ring_ctx {
        unsigned long           check_cq_overflow;
 
        struct {
+               /*
+                * We cache a range of free CQEs we can use, once exhausted it
+                * should go through a slower range setup, see __io_get_cqe()
+                */
+               struct io_uring_cqe     *cqe_cached;
+               struct io_uring_cqe     *cqe_sentinel;
+
                unsigned                cached_cq_tail;
                unsigned                cq_entries;
                struct io_ev_fd __rcu   *io_ev_fd;
@@ -1844,21 +1851,38 @@ static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
        return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
 }
 
-static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
+/*
+ * writes to the cq entry need to come after reading head; the
+ * control dependency is enough as we're using WRITE_ONCE to
+ * fill the cq entry
+ */
+static noinline struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx)
 {
        struct io_rings *rings = ctx->rings;
-       unsigned tail, mask = ctx->cq_entries - 1;
-
-       /*
-        * writes to the cq entry need to come after reading head; the
-        * control dependency is enough as we're using WRITE_ONCE to
-        * fill the cq entry
-        */
-       if (__io_cqring_events(ctx) == ctx->cq_entries)
+       unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
+       unsigned int free, queued, len;
+
+       /* userspace may cheat modifying the tail, be safe and do min */
+       queued = min(__io_cqring_events(ctx), ctx->cq_entries);
+       free = ctx->cq_entries - queued;
+       /* we need a contiguous range, limit based on the current array offset */
+       len = min(free, ctx->cq_entries - off);
+       if (!len)
                return NULL;
 
-       tail = ctx->cached_cq_tail++;
-       return &rings->cqes[tail & mask];
+       ctx->cached_cq_tail++;
+       ctx->cqe_cached = &rings->cqes[off];
+       ctx->cqe_sentinel = ctx->cqe_cached + len;
+       return ctx->cqe_cached++;
+}
+
+static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
+{
+       if (likely(ctx->cqe_cached < ctx->cqe_sentinel)) {
+               ctx->cached_cq_tail++;
+               return ctx->cqe_cached++;
+       }
+       return __io_get_cqe(ctx);
 }
 
 static void io_eventfd_signal(struct io_ring_ctx *ctx)