ext4: fix potential htree corruption when growing large_dir directories
[linux-2.6-microblaze.git] / fs / io-wq.c
index 4eba531..843d4a7 100644 (file)
@@ -9,15 +9,12 @@
 #include <linux/init.h>
 #include <linux/errno.h>
 #include <linux/sched/signal.h>
-#include <linux/mm.h>
-#include <linux/sched/mm.h>
 #include <linux/percpu.h>
 #include <linux/slab.h>
 #include <linux/rculist_nulls.h>
 #include <linux/cpu.h>
 #include <linux/tracehook.h>
 
-#include "../kernel/sched/sched.h"
 #include "io-wq.h"
 
 #define WORKER_IDLE_TIMEOUT    (5 * HZ)
@@ -68,6 +65,7 @@ struct io_worker {
 struct io_wqe_acct {
        unsigned nr_workers;
        unsigned max_workers;
+       int index;
        atomic_t nr_running;
 };
 
@@ -96,31 +94,29 @@ struct io_wqe {
 
        struct io_wq *wq;
        struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
+
+       cpumask_var_t cpu_mask;
 };
 
 /*
  * Per io_wq state
   */
 struct io_wq {
-       struct io_wqe **wqes;
        unsigned long state;
 
        free_work_fn *free_work;
        io_wq_work_fn *do_work;
 
-       struct task_struct *manager;
-
        struct io_wq_hash *hash;
 
-       refcount_t refs;
-       struct completion exited;
-
        atomic_t worker_refs;
        struct completion worker_done;
 
        struct hlist_node cpuhp_node;
 
-       pid_t task_pid;
+       struct task_struct *task;
+
+       struct io_wqe *wqes[];
 };
 
 static enum cpuhp_state io_wq_online;
@@ -133,8 +129,7 @@ struct io_cb_cancel_data {
        bool cancel_all;
 };
 
-static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
-                                      struct io_cb_cancel_data *match);
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
 
 static bool io_worker_get(struct io_worker *worker)
 {
@@ -147,23 +142,26 @@ static void io_worker_release(struct io_worker *worker)
                complete(&worker->ref_done);
 }
 
+static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
+{
+       return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
+}
+
 static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
                                                   struct io_wq_work *work)
 {
-       if (work->flags & IO_WQ_WORK_UNBOUND)
-               return &wqe->acct[IO_WQ_ACCT_UNBOUND];
-
-       return &wqe->acct[IO_WQ_ACCT_BOUND];
+       return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
 }
 
 static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
 {
-       struct io_wqe *wqe = worker->wqe;
-
-       if (worker->flags & IO_WORKER_F_BOUND)
-               return &wqe->acct[IO_WQ_ACCT_BOUND];
+       return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
+}
 
-       return &wqe->acct[IO_WQ_ACCT_UNBOUND];
+static void io_worker_ref_put(struct io_wq *wq)
+{
+       if (atomic_dec_and_test(&wq->worker_refs))
+               complete(&wq->worker_done);
 }
 
 static void io_worker_exit(struct io_worker *worker)
@@ -193,8 +191,7 @@ static void io_worker_exit(struct io_worker *worker)
        raw_spin_unlock_irq(&wqe->lock);
 
        kfree_rcu(worker, rcu);
-       if (atomic_dec_and_test(&wqe->wq->worker_refs))
-               complete(&wqe->wq->worker_done);
+       io_worker_ref_put(wqe->wq);
        do_exit(0);
 }
 
@@ -209,7 +206,7 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
 
 /*
  * Check head of free list for an available worker. If one isn't available,
- * caller must wake up the wq manager to create one.
+ * caller must create one.
  */
 static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
        __must_hold(RCU)
@@ -233,7 +230,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 
 /*
  * We need a worker. If we find a free one, we're good. If not, and we're
- * below the max number of workers, wake up the manager to create one.
+ * below the max number of workers, create one.
  */
 static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 {
@@ -243,14 +240,18 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
         * Most likely an attempt to queue unbounded work on an io_wq that
         * wasn't setup with any unbounded workers.
         */
-       WARN_ON_ONCE(!acct->max_workers);
+       if (unlikely(!acct->max_workers))
+               pr_warn_once("io-wq is not configured for unbound workers");
 
        rcu_read_lock();
        ret = io_wqe_activate_free_worker(wqe);
        rcu_read_unlock();
 
-       if (!ret && acct->nr_workers < acct->max_workers)
-               wake_up_process(wqe->wq->manager);
+       if (!ret && acct->nr_workers < acct->max_workers) {
+               atomic_inc(&acct->nr_running);
+               atomic_inc(&wqe->wq->worker_refs);
+               create_io_worker(wqe->wq, wqe, acct->index);
+       }
 }
 
 static void io_wqe_inc_running(struct io_worker *worker)
@@ -260,14 +261,61 @@ static void io_wqe_inc_running(struct io_worker *worker)
        atomic_inc(&acct->nr_running);
 }
 
+struct create_worker_data {
+       struct callback_head work;
+       struct io_wqe *wqe;
+       int index;
+};
+
+static void create_worker_cb(struct callback_head *cb)
+{
+       struct create_worker_data *cwd;
+       struct io_wq *wq;
+
+       cwd = container_of(cb, struct create_worker_data, work);
+       wq = cwd->wqe->wq;
+       create_io_worker(wq, cwd->wqe, cwd->index);
+       kfree(cwd);
+}
+
+static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
+{
+       struct create_worker_data *cwd;
+       struct io_wq *wq = wqe->wq;
+
+       /* raced with exit, just ignore create call */
+       if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
+               goto fail;
+
+       cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
+       if (cwd) {
+               init_task_work(&cwd->work, create_worker_cb);
+               cwd->wqe = wqe;
+               cwd->index = acct->index;
+               if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
+                       return;
+
+               kfree(cwd);
+       }
+fail:
+       atomic_dec(&acct->nr_running);
+       io_worker_ref_put(wq);
+}
+
 static void io_wqe_dec_running(struct io_worker *worker)
        __must_hold(wqe->lock)
 {
        struct io_wqe_acct *acct = io_wqe_get_acct(worker);
        struct io_wqe *wqe = worker->wqe;
 
-       if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
-               io_wqe_wake_worker(wqe, acct);
+       if (!(worker->flags & IO_WORKER_F_UP))
+               return;
+
+       if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
+               atomic_inc(&acct->nr_running);
+               atomic_inc(&wqe->wq->worker_refs);
+               io_queue_worker_create(wqe, acct);
+       }
 }
 
 /*
@@ -280,6 +328,8 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
 {
        bool worker_bound, work_bound;
 
+       BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);
+
        if (worker->flags & IO_WORKER_F_FREE) {
                worker->flags &= ~IO_WORKER_F_FREE;
                hlist_nulls_del_init_rcu(&worker->nulls_node);
@@ -292,16 +342,11 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
        worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
        work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
        if (worker_bound != work_bound) {
+               int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
                io_wqe_dec_running(worker);
-               if (work_bound) {
-                       worker->flags |= IO_WORKER_F_BOUND;
-                       wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
-                       wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
-               } else {
-                       worker->flags &= ~IO_WORKER_F_BOUND;
-                       wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
-                       wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
-               }
+               worker->flags ^= IO_WORKER_F_BOUND;
+               wqe->acct[index].nr_workers--;
+               wqe->acct[index ^ 1].nr_workers++;
                io_wqe_inc_running(worker);
         }
 }
@@ -486,9 +531,8 @@ static int io_wqe_worker(void *data)
        char buf[TASK_COMM_LEN];
 
        worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
-       io_wqe_inc_running(worker);
 
-       snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task_pid);
+       snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
        set_task_comm(current, buf);
 
        while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
@@ -516,17 +560,13 @@ loop:
                if (ret)
                        continue;
                /* timed out, exit unless we're the fixed worker */
-               if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
-                   !(worker->flags & IO_WORKER_F_FIXED))
+               if (!(worker->flags & IO_WORKER_F_FIXED))
                        break;
        }
 
        if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
                raw_spin_lock_irq(&wqe->lock);
-               if (!wq_list_empty(&wqe->work_list))
-                       io_worker_handle_work(worker);
-               else
-                       raw_spin_unlock_irq(&wqe->lock);
+               io_worker_handle_work(worker);
        }
 
        io_worker_exit(worker);
@@ -552,8 +592,7 @@ void io_wq_worker_running(struct task_struct *tsk)
 
 /*
  * Called when worker is going to sleep. If there are no workers currently
- * running and we have work pending, wake up a free one or have the manager
- * set one up.
+ * running and we have work pending, wake up a free one or create a new one.
  */
 void io_wq_worker_sleeping(struct task_struct *tsk)
 {
@@ -573,7 +612,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
        raw_spin_unlock_irq(&worker->wqe->lock);
 }
 
-static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 {
        struct io_wqe_acct *acct = &wqe->acct[index];
        struct io_worker *worker;
@@ -583,7 +622,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 
        worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
        if (!worker)
-               return false;
+               goto fail;
 
        refcount_set(&worker->ref, 1);
        worker->nulls_node.pprev = NULL;
@@ -591,19 +630,18 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
        spin_lock_init(&worker->lock);
        init_completion(&worker->ref_done);
 
-       atomic_inc(&wq->worker_refs);
-
        tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
        if (IS_ERR(tsk)) {
-               if (atomic_dec_and_test(&wq->worker_refs))
-                       complete(&wq->worker_done);
                kfree(worker);
-               return false;
+fail:
+               atomic_dec(&acct->nr_running);
+               io_worker_ref_put(wq);
+               return;
        }
 
        tsk->pf_io_worker = worker;
        worker->task = tsk;
-       set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node));
+       set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
        tsk->flags |= PF_NO_SETAFFINITY;
 
        raw_spin_lock_irq(&wqe->lock);
@@ -617,20 +655,6 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
        acct->nr_workers++;
        raw_spin_unlock_irq(&wqe->lock);
        wake_up_new_task(tsk);
-       return true;
-}
-
-static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
-       __must_hold(wqe->lock)
-{
-       struct io_wqe_acct *acct = &wqe->acct[index];
-
-       if (acct->nr_workers && test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state))
-               return false;
-       /* if we have available workers or no work, no need */
-       if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
-               return false;
-       return acct->nr_workers < acct->max_workers;
 }
 
 /*
@@ -665,93 +689,11 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
        return false;
 }
 
-static void io_wq_check_workers(struct io_wq *wq)
-{
-       int node;
-
-       for_each_node(node) {
-               struct io_wqe *wqe = wq->wqes[node];
-               bool fork_worker[2] = { false, false };
-
-               if (!node_online(node))
-                       continue;
-
-               raw_spin_lock_irq(&wqe->lock);
-               if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
-                       fork_worker[IO_WQ_ACCT_BOUND] = true;
-               if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
-                       fork_worker[IO_WQ_ACCT_UNBOUND] = true;
-               raw_spin_unlock_irq(&wqe->lock);
-               if (fork_worker[IO_WQ_ACCT_BOUND])
-                       create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
-               if (fork_worker[IO_WQ_ACCT_UNBOUND])
-                       create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
-       }
-}
-
 static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
 {
        return true;
 }
 
-static void io_wq_cancel_pending(struct io_wq *wq)
-{
-       struct io_cb_cancel_data match = {
-               .fn             = io_wq_work_match_all,
-               .cancel_all     = true,
-       };
-       int node;
-
-       for_each_node(node)
-               io_wqe_cancel_pending_work(wq->wqes[node], &match);
-}
-
-/*
- * Manager thread. Tasked with creating new workers, if we need them.
- */
-static int io_wq_manager(void *data)
-{
-       struct io_wq *wq = data;
-       char buf[TASK_COMM_LEN];
-       int node;
-
-       snprintf(buf, sizeof(buf), "iou-mgr-%d", wq->task_pid);
-       set_task_comm(current, buf);
-
-       do {
-               set_current_state(TASK_INTERRUPTIBLE);
-               io_wq_check_workers(wq);
-               schedule_timeout(HZ);
-               if (signal_pending(current)) {
-                       struct ksignal ksig;
-
-                       if (!get_signal(&ksig))
-                               continue;
-                       set_bit(IO_WQ_BIT_EXIT, &wq->state);
-               }
-       } while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
-
-       io_wq_check_workers(wq);
-
-       rcu_read_lock();
-       for_each_node(node)
-               io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
-       rcu_read_unlock();
-
-       if (atomic_dec_and_test(&wq->worker_refs))
-               complete(&wq->worker_done);
-       wait_for_completion(&wq->worker_done);
-
-       spin_lock_irq(&wq->hash->wait.lock);
-       for_each_node(node)
-               list_del_init(&wq->wqes[node]->wait.entry);
-       spin_unlock_irq(&wq->hash->wait.lock);
-
-       io_wq_cancel_pending(wq);
-       complete(&wq->exited);
-       do_exit(0);
-}
-
 static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 {
        struct io_wq *wq = wqe->wq;
@@ -783,39 +725,13 @@ append:
        wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
 }
 
-static int io_wq_fork_manager(struct io_wq *wq)
-{
-       struct task_struct *tsk;
-
-       if (wq->manager)
-               return 0;
-
-       WARN_ON_ONCE(test_bit(IO_WQ_BIT_EXIT, &wq->state));
-
-       init_completion(&wq->worker_done);
-       atomic_set(&wq->worker_refs, 1);
-       tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE);
-       if (!IS_ERR(tsk)) {
-               wq->manager = get_task_struct(tsk);
-               wake_up_new_task(tsk);
-               return 0;
-       }
-
-       if (atomic_dec_and_test(&wq->worker_refs))
-               complete(&wq->worker_done);
-
-       return PTR_ERR(tsk);
-}
-
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 {
        struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
        int work_flags;
        unsigned long flags;
 
-       /* Can only happen if manager creation fails after exec */
-       if (io_wq_fork_manager(wqe->wq) ||
-           test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
+       if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
                io_run_cancel(work, wqe);
                return;
        }
@@ -970,39 +886,31 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
                            int sync, void *key)
 {
        struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
-       int ret;
 
        list_del_init(&wait->entry);
 
        rcu_read_lock();
-       ret = io_wqe_activate_free_worker(wqe);
+       io_wqe_activate_free_worker(wqe);
        rcu_read_unlock();
-
-       if (!ret)
-               wake_up_process(wqe->wq->manager);
-
        return 1;
 }
 
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
-       int ret = -ENOMEM, node;
+       int ret, node;
        struct io_wq *wq;
 
        if (WARN_ON_ONCE(!data->free_work || !data->do_work))
                return ERR_PTR(-EINVAL);
+       if (WARN_ON_ONCE(!bounded))
+               return ERR_PTR(-EINVAL);
 
-       wq = kzalloc(sizeof(*wq), GFP_KERNEL);
+       wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
        if (!wq)
                return ERR_PTR(-ENOMEM);
-
-       wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
-       if (!wq->wqes)
-               goto err_wq;
-
        ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
        if (ret)
-               goto err_wqes;
+               goto err_wq;
 
        refcount_inc(&data->hash->refs);
        wq->hash = data->hash;
@@ -1019,8 +927,13 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
                wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
                if (!wqe)
                        goto err;
+               if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
+                       goto err;
+               cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
                wq->wqes[node] = wqe;
                wqe->node = alloc_node;
+               wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
+               wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
                wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
                atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
                wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
@@ -1035,33 +948,73 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
                INIT_LIST_HEAD(&wqe->all_list);
        }
 
-       wq->task_pid = current->pid;
-       init_completion(&wq->exited);
-       refcount_set(&wq->refs, 1);
-
-       ret = io_wq_fork_manager(wq);
-       if (!ret)
-               return wq;
+       wq->task = get_task_struct(data->task);
+       atomic_set(&wq->worker_refs, 1);
+       init_completion(&wq->worker_done);
+       return wq;
 err:
        io_wq_put_hash(data->hash);
        cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
-       for_each_node(node)
+       for_each_node(node) {
+               if (!wq->wqes[node])
+                       continue;
+               free_cpumask_var(wq->wqes[node]->cpu_mask);
                kfree(wq->wqes[node]);
-err_wqes:
-       kfree(wq->wqes);
+       }
 err_wq:
        kfree(wq);
        return ERR_PTR(ret);
 }
 
-static void io_wq_destroy_manager(struct io_wq *wq)
+static bool io_task_work_match(struct callback_head *cb, void *data)
+{
+       struct create_worker_data *cwd;
+
+       if (cb->func != create_worker_cb)
+               return false;
+       cwd = container_of(cb, struct create_worker_data, work);
+       return cwd->wqe->wq == data;
+}
+
+void io_wq_exit_start(struct io_wq *wq)
+{
+       set_bit(IO_WQ_BIT_EXIT, &wq->state);
+}
+
+static void io_wq_exit_workers(struct io_wq *wq)
 {
-       if (wq->manager) {
-               wake_up_process(wq->manager);
-               wait_for_completion(&wq->exited);
-               put_task_struct(wq->manager);
-               wq->manager = NULL;
+       struct callback_head *cb;
+       int node;
+
+       if (!wq->task)
+               return;
+
+       while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
+               struct create_worker_data *cwd;
+
+               cwd = container_of(cb, struct create_worker_data, work);
+               atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
+               io_worker_ref_put(wq);
+               kfree(cwd);
        }
+
+       rcu_read_lock();
+       for_each_node(node) {
+               struct io_wqe *wqe = wq->wqes[node];
+
+               io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
+       }
+       rcu_read_unlock();
+       io_worker_ref_put(wq);
+       wait_for_completion(&wq->worker_done);
+
+       for_each_node(node) {
+               spin_lock_irq(&wq->hash->wait.lock);
+               list_del_init(&wq->wqes[node]->wait.entry);
+               spin_unlock_irq(&wq->hash->wait.lock);
+       }
+       put_task_struct(wq->task);
+       wq->task = NULL;
 }
 
 static void io_wq_destroy(struct io_wq *wq)
@@ -1070,9 +1023,6 @@ static void io_wq_destroy(struct io_wq *wq)
 
        cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 
-       set_bit(IO_WQ_BIT_EXIT, &wq->state);
-       io_wq_destroy_manager(wq);
-
        for_each_node(node) {
                struct io_wqe *wqe = wq->wqes[node];
                struct io_cb_cancel_data match = {
@@ -1080,47 +1030,79 @@ static void io_wq_destroy(struct io_wq *wq)
                        .cancel_all     = true,
                };
                io_wqe_cancel_pending_work(wqe, &match);
+               free_cpumask_var(wqe->cpu_mask);
                kfree(wqe);
        }
        io_wq_put_hash(wq->hash);
-       kfree(wq->wqes);
        kfree(wq);
 }
 
-void io_wq_put(struct io_wq *wq)
-{
-       if (refcount_dec_and_test(&wq->refs))
-               io_wq_destroy(wq);
-}
-
 void io_wq_put_and_exit(struct io_wq *wq)
 {
-       set_bit(IO_WQ_BIT_EXIT, &wq->state);
-       io_wq_destroy_manager(wq);
-       io_wq_put(wq);
+       WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));
+
+       io_wq_exit_workers(wq);
+       io_wq_destroy(wq);
 }
 
+struct online_data {
+       unsigned int cpu;
+       bool online;
+};
+
 static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
 {
-       struct task_struct *task = worker->task;
-       struct rq_flags rf;
-       struct rq *rq;
-
-       rq = task_rq_lock(task, &rf);
-       do_set_cpus_allowed(task, cpumask_of_node(worker->wqe->node));
-       task->flags |= PF_NO_SETAFFINITY;
-       task_rq_unlock(rq, task, &rf);
+       struct online_data *od = data;
+
+       if (od->online)
+               cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
+       else
+               cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
        return false;
 }
 
+static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
+{
+       struct online_data od = {
+               .cpu = cpu,
+               .online = online
+       };
+       int i;
+
+       rcu_read_lock();
+       for_each_node(i)
+               io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
+       rcu_read_unlock();
+       return 0;
+}
+
 static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
 {
        struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
+
+       return __io_wq_cpu_online(wq, cpu, true);
+}
+
+static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
+{
+       struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
+
+       return __io_wq_cpu_online(wq, cpu, false);
+}
+
+int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
+{
        int i;
 
        rcu_read_lock();
-       for_each_node(i)
-               io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL);
+       for_each_node(i) {
+               struct io_wqe *wqe = wq->wqes[i];
+
+               if (mask)
+                       cpumask_copy(wqe->cpu_mask, mask);
+               else
+                       cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
+       }
        rcu_read_unlock();
        return 0;
 }
@@ -1130,7 +1112,7 @@ static __init int io_wq_init(void)
        int ret;
 
        ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
-                                       io_wq_cpu_online, NULL);
+                                       io_wq_cpu_online, io_wq_cpu_offline);
        if (ret < 0)
                return ret;
        io_wq_online = ret;