io_uring: remove submission references
authorPavel Begunkov <asml.silence@gmail.com>
Wed, 11 Aug 2021 18:28:29 +0000 (19:28 +0100)
committerJens Axboe <axboe@kernel.dk>
Mon, 23 Aug 2021 19:10:32 +0000 (13:10 -0600)
Requests are by default given with two references, submission and
completion. Completion references are straightforward, they represent
request ownership and are put when a request is completed or so.
Submission references are a bit more trickier. They're needed when
io_issue_sqe() followed deep into the submission stack (e.g. in fs,
block, drivers, etc.), request may have given away for concurrent
execution or already completed, and the code unwinding back to
io_issue_sqe() may be accessing some pieces of our requests, e.g.
file or iov.

Now, we prevent such async/in-depth completions by pushing requests
through task_work. Punting to io-wq is also done through task_works,
apart from a couple of cases with a pretty well known context. So,
there're two cases:
1) io_issue_sqe() from the task context and protected by ->uring_lock.
Either requests return back to io_uring or handed to task_work, which
won't be executed because we're currently controlling that task. So,
we can be sure that requests are staying alive all the time and we don't
need submission references to pin them.

2) io_issue_sqe() from io-wq, which doesn't hold the mutex. The role of
submission reference is played by io-wq reference, which is put by
io_wq_submit_work(). Hence, it should be fine.

Considering that, we can carefully kill the submission reference.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/6b68f1c763229a590f2a27148aee77767a8d7750.1628705069.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io_uring.c

index 9d1c188..6ad6c56 100644 (file)
@@ -1705,7 +1705,6 @@ static inline void io_req_complete(struct io_kiocb *req, long res)
 static void io_req_complete_failed(struct io_kiocb *req, long res)
 {
        req_set_fail(req);
-       io_put_req(req);
        io_req_complete_post(req, res, 0);
 }
 
@@ -1760,7 +1759,14 @@ static bool io_flush_cached_reqs(struct io_ring_ctx *ctx)
        return nr != 0;
 }
 
+/*
+ * A request might get retired back into the request caches even before opcode
+ * handlers and io_issue_sqe() are done with it, e.g. inline completion path.
+ * Because of that, io_alloc_req() should be called only under ->uring_lock
+ * and with extra caution to not get a request that is still worked on.
+ */
 static struct io_kiocb *io_alloc_req(struct io_ring_ctx *ctx)
+       __must_hold(&ctx->uring_lock)
 {
        struct io_submit_state *state = &ctx->submit_state;
        gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
@@ -1875,8 +1881,6 @@ static void io_fail_links(struct io_kiocb *req)
 
                trace_io_uring_fail_link(req, link);
                io_cqring_fill_event(link->ctx, link->user_data, -ECANCELED, 0);
-
-               io_put_req(link);
                io_put_req_deferred(link);
                link = nxt;
        }
@@ -2158,8 +2162,6 @@ static void io_submit_flush_completions(struct io_ring_ctx *ctx)
        for (i = 0; i < nr; i++) {
                struct io_kiocb *req = state->compl_reqs[i];
 
-               /* submission and completion refs */
-               io_put_req(req);
                if (req_ref_put_and_test(req))
                        io_req_free_batch(&rb, req, &ctx->submit_state);
        }
@@ -2264,7 +2266,6 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
                if (READ_ONCE(req->result) == -EAGAIN && resubmit &&
                    !(req->flags & REQ_F_DONT_REISSUE)) {
                        req->iopoll_completed = 0;
-                       req_ref_get(req);
                        io_req_task_queue_reissue(req);
                        continue;
                }
@@ -2762,7 +2763,6 @@ static void kiocb_done(struct kiocb *kiocb, ssize_t ret,
        if (check_reissue && (req->flags & REQ_F_REISSUE)) {
                req->flags &= ~REQ_F_REISSUE;
                if (io_resubmit_prep(req)) {
-                       req_ref_get(req);
                        io_req_task_queue_reissue(req);
                } else {
                        int cflags = 0;
@@ -3188,9 +3188,6 @@ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
 
        req->rw.kiocb.ki_flags &= ~IOCB_WAITQ;
        list_del_init(&wait->entry);
-
-       /* submit ref gets dropped, acquire a new one */
-       req_ref_get(req);
        io_req_task_queue(req);
        return 1;
 }
@@ -5242,10 +5239,6 @@ static bool io_poll_remove_one(struct io_kiocb *req)
                io_cqring_fill_event(req->ctx, req->user_data, -ECANCELED, 0);
                io_commit_cqring(req->ctx);
                req_set_fail(req);
-
-               /* non-poll requests have submit ref still */
-               if (req->opcode != IORING_OP_POLL_ADD)
-                       io_put_req(req);
                io_put_req_deferred(req);
        }
        return do_complete;
@@ -6280,6 +6273,9 @@ static void io_wq_submit_work(struct io_wq_work *work)
        struct io_kiocb *timeout;
        int ret = 0;
 
+       /* will be dropped by ->io_free_work() after returning to io-wq */
+       req_ref_get(req);
+
        timeout = io_prep_linked_timeout(req);
        if (timeout)
                io_queue_linked_timeout(timeout);
@@ -6302,11 +6298,8 @@ static void io_wq_submit_work(struct io_wq_work *work)
        }
 
        /* avoid locking problems by failing it from a clean context */
-       if (ret) {
-               /* io-wq is going to take one down */
-               req_ref_get(req);
+       if (ret)
                io_req_task_queue_fail(req, ret);
-       }
 }
 
 static inline struct io_fixed_file *io_fixed_file_slot(struct io_file_table *table,
@@ -6448,6 +6441,8 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
            nxt->opcode != IORING_OP_LINK_TIMEOUT)
                return NULL;
 
+       /* linked timeouts should have two refs once prep'ed */
+       req_ref_get(nxt);
        nxt->timeout.head = req;
        nxt->flags |= REQ_F_LTIMEOUT_ACTIVE;
        req->flags |= REQ_F_LINK_TIMEOUT;
@@ -6468,7 +6463,6 @@ issue_sqe:
         * doesn't support non-blocking read/write attempts
         */
        if (likely(!ret)) {
-               /* drop submission reference */
                if (req->flags & REQ_F_COMPLETE_INLINE) {
                        struct io_ring_ctx *ctx = req->ctx;
                        struct io_submit_state *state = &ctx->submit_state;
@@ -6476,8 +6470,6 @@ issue_sqe:
                        state->compl_reqs[state->compl_nr++] = req;
                        if (state->compl_nr == ARRAY_SIZE(state->compl_reqs))
                                io_submit_flush_completions(ctx);
-               } else {
-                       io_put_req(req);
                }
        } else if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
                switch (io_arm_poll_handler(req)) {
@@ -6557,8 +6549,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
        req->user_data = READ_ONCE(sqe->user_data);
        req->file = NULL;
        req->fixed_rsrc_refs = NULL;
-       /* one is dropped after submission, the other at completion */
-       atomic_set(&req->refs, 2);
+       atomic_set(&req->refs, 1);
        req->task = current;
 
        /* enforce forwards compatibility on users */