io_uring: fix task_work cap overshooting
authorJens Axboe <axboe@kernel.dk>
Tue, 26 Nov 2024 20:42:27 +0000 (13:42 -0700)
committerJens Axboe <axboe@kernel.dk>
Tue, 26 Nov 2024 20:42:27 +0000 (13:42 -0700)
A previous commit fixed task_work overrunning by a lot more than what
the user asked for, by adding a retry list. However, it didn't cap the
overall count, hence for multiple task_work runs inside the same wait
loop, it'd still overshoot the target by potentially a large amount.

Cap it generally inside the wait path. Note that this will still
overshoot the default limit of 20, but should overshoot by no more than
limit-1 in addition to the limit. That still provides a ceiling over how
much task_work will be run, rather than still having gaps where it was
uncapped essentially.

Fixes: f46b9cdb22f7 ("io_uring: limit local tw done")
Signed-off-by: Jens Axboe <axboe@kernel.dk>
io_uring/io_uring.c

index bfa9388..ae199e4 100644 (file)
@@ -1277,6 +1277,8 @@ static int __io_run_local_work_loop(struct llist_node **node,
                                    struct io_tw_state *ts,
                                    int events)
 {
+       int ret = 0;
+
        while (*node) {
                struct llist_node *next = (*node)->next;
                struct io_kiocb *req = container_of(*node, struct io_kiocb,
@@ -1285,27 +1287,27 @@ static int __io_run_local_work_loop(struct llist_node **node,
                                io_poll_task_func, io_req_rw_complete,
                                req, ts);
                *node = next;
-               if (--events <= 0)
+               if (++ret >= events)
                        break;
        }
 
-       return events;
+       return ret;
 }
 
 static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
-                              int min_events)
+                              int min_events, int max_events)
 {
        struct llist_node *node;
        unsigned int loops = 0;
-       int ret, limit;
+       int ret = 0;
 
        if (WARN_ON_ONCE(ctx->submitter_task != current))
                return -EEXIST;
        if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
                atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
-       limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
 again:
-       ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit);
+       min_events -= ret;
+       ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, max_events);
        if (ctx->retry_llist.first)
                goto retry_done;
 
@@ -1314,11 +1316,10 @@ again:
         * running the pending items.
         */
        node = llist_reverse_order(llist_del_all(&ctx->work_llist));
-       ret = __io_run_local_work_loop(&node, ts, ret);
+       ret += __io_run_local_work_loop(&node, ts, max_events - ret);
        ctx->retry_llist.first = node;
        loops++;
 
-       ret = limit - ret;
        if (io_run_local_work_continue(ctx, ret, min_events))
                goto again;
 retry_done:
@@ -1337,16 +1338,18 @@ static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
 
        if (!io_local_work_pending(ctx))
                return 0;
-       return __io_run_local_work(ctx, &ts, min_events);
+       return __io_run_local_work(ctx, &ts, min_events,
+                                       max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
 }
 
-static int io_run_local_work(struct io_ring_ctx *ctx, int min_events)
+static int io_run_local_work(struct io_ring_ctx *ctx, int min_events,
+                            int max_events)
 {
        struct io_tw_state ts = {};
        int ret;
 
        mutex_lock(&ctx->uring_lock);
-       ret = __io_run_local_work(ctx, &ts, min_events);
+       ret = __io_run_local_work(ctx, &ts, min_events, max_events);
        mutex_unlock(&ctx->uring_lock);
        return ret;
 }
@@ -2352,7 +2355,7 @@ int io_run_task_work_sig(struct io_ring_ctx *ctx)
 {
        if (io_local_work_pending(ctx)) {
                __set_current_state(TASK_RUNNING);
-               if (io_run_local_work(ctx, INT_MAX) > 0)
+               if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
                        return 0;
        }
        if (io_run_task_work() > 0)
@@ -2515,7 +2518,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
        if (!io_allowed_run_tw(ctx))
                return -EEXIST;
        if (io_local_work_pending(ctx))
-               io_run_local_work(ctx, min_events);
+               io_run_local_work(ctx, min_events,
+                                 max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
        io_run_task_work();
 
        if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
@@ -2586,7 +2590,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
                 * now rather than let the caller do another wait loop.
                 */
                if (io_local_work_pending(ctx))
-                       io_run_local_work(ctx, nr_wait);
+                       io_run_local_work(ctx, nr_wait, nr_wait);
                io_run_task_work();
 
                /*
@@ -3098,7 +3102,7 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
 
        if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
            io_allowed_defer_tw_run(ctx))
-               ret |= io_run_local_work(ctx, INT_MAX) > 0;
+               ret |= io_run_local_work(ctx, INT_MAX, INT_MAX) > 0;
        ret |= io_cancel_defer_files(ctx, tctx, cancel_all);
        mutex_lock(&ctx->uring_lock);
        ret |= io_poll_remove_all(ctx, tctx, cancel_all);