io-wq: add an option to cancel all matched reqs
authorPavel Begunkov <asml.silence@gmail.com>
Mon, 15 Jun 2020 07:24:03 +0000 (10:24 +0300)
committerJens Axboe <axboe@kernel.dk>
Mon, 15 Jun 2020 14:51:34 +0000 (08:51 -0600)
This adds support for cancelling all io-wq works matching a predicate.
It isn't used yet, so no change in observable behaviour.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io-wq.c
fs/io-wq.h
fs/io_uring.c

index 03c7e37..e290202 100644 (file)
@@ -903,13 +903,15 @@ void io_wq_cancel_all(struct io_wq *wq)
 struct io_cb_cancel_data {
        work_cancel_fn *fn;
        void *data;
+       int nr_running;
+       int nr_pending;
+       bool cancel_all;
 };
 
 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 {
        struct io_cb_cancel_data *match = data;
        unsigned long flags;
-       bool ret = false;
 
        /*
         * Hold the lock to avoid ->cur_work going out of scope, caller
@@ -920,55 +922,55 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
            !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
            match->fn(worker->cur_work, match->data)) {
                send_sig(SIGINT, worker->task, 1);
-               ret = true;
+               match->nr_running++;
        }
        spin_unlock_irqrestore(&worker->lock, flags);
 
-       return ret;
+       return match->nr_running && !match->cancel_all;
 }
 
-static bool io_wqe_cancel_pending_work(struct io_wqe *wqe,
+static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
                                       struct io_cb_cancel_data *match)
 {
        struct io_wq_work_node *node, *prev;
        struct io_wq_work *work;
        unsigned long flags;
-       bool found = false;
 
+retry:
        spin_lock_irqsave(&wqe->lock, flags);
        wq_list_for_each(node, prev, &wqe->work_list) {
                work = container_of(node, struct io_wq_work, list);
+               if (!match->fn(work, match->data))
+                       continue;
 
-               if (match->fn(work, match->data)) {
-                       wq_list_del(&wqe->work_list, node, prev);
-                       found = true;
-                       break;
-               }
+               wq_list_del(&wqe->work_list, node, prev);
+               spin_unlock_irqrestore(&wqe->lock, flags);
+               io_run_cancel(work, wqe);
+               match->nr_pending++;
+               if (!match->cancel_all)
+                       return;
+
+               /* not safe to continue after unlock */
+               goto retry;
        }
        spin_unlock_irqrestore(&wqe->lock, flags);
-
-       if (found)
-               io_run_cancel(work, wqe);
-       return found;
 }
 
-static bool io_wqe_cancel_running_work(struct io_wqe *wqe,
+static void io_wqe_cancel_running_work(struct io_wqe *wqe,
                                       struct io_cb_cancel_data *match)
 {
-       bool found;
-
        rcu_read_lock();
-       found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
+       io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
        rcu_read_unlock();
-       return found;
 }
 
 enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
-                                 void *data)
+                                 void *data, bool cancel_all)
 {
        struct io_cb_cancel_data match = {
-               .fn     = cancel,
-               .data   = data,
+               .fn             = cancel,
+               .data           = data,
+               .cancel_all     = cancel_all,
        };
        int node;
 
@@ -980,7 +982,8 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
        for_each_node(node) {
                struct io_wqe *wqe = wq->wqes[node];
 
-               if (io_wqe_cancel_pending_work(wqe, &match))
+               io_wqe_cancel_pending_work(wqe, &match);
+               if (match.nr_pending && !match.cancel_all)
                        return IO_WQ_CANCEL_OK;
        }
 
@@ -993,10 +996,15 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
        for_each_node(node) {
                struct io_wqe *wqe = wq->wqes[node];
 
-               if (io_wqe_cancel_running_work(wqe, &match))
+               io_wqe_cancel_running_work(wqe, &match);
+               if (match.nr_running && !match.cancel_all)
                        return IO_WQ_CANCEL_RUNNING;
        }
 
+       if (match.nr_running)
+               return IO_WQ_CANCEL_RUNNING;
+       if (match.nr_pending)
+               return IO_WQ_CANCEL_OK;
        return IO_WQ_CANCEL_NOTFOUND;
 }
 
@@ -1007,7 +1015,7 @@ static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data)
 
 enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
 {
-       return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork);
+       return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork, false);
 }
 
 static bool io_wq_pid_match(struct io_wq_work *work, void *data)
@@ -1021,7 +1029,7 @@ enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid)
 {
        void *data = (void *) (unsigned long) pid;
 
-       return io_wq_cancel_cb(wq, io_wq_pid_match, data);
+       return io_wq_cancel_cb(wq, io_wq_pid_match, data, false);
 }
 
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
index 8e138fa..7d5bd43 100644 (file)
@@ -130,7 +130,7 @@ enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid);
 typedef bool (work_cancel_fn)(struct io_wq_work *, void *);
 
 enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
-                                       void *data);
+                                       void *data, bool cancel_all);
 
 struct task_struct *io_wq_get_task(struct io_wq *wq);
 
index c04b20b..94bd885 100644 (file)
@@ -4773,7 +4773,7 @@ static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
        enum io_wq_cancel cancel_ret;
        int ret = 0;
 
-       cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
+       cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr, false);
        switch (cancel_ret) {
        case IO_WQ_CANCEL_OK:
                ret = 0;