io-wq: make worker creation resilient against signals
[linux-2.6-microblaze.git] / fs / io-wq.c
index 12fc193..d80e4a7 100644 (file)
@@ -23,8 +23,7 @@ enum {
        IO_WORKER_F_UP          = 1,    /* up and active */
        IO_WORKER_F_RUNNING     = 2,    /* account as running */
        IO_WORKER_F_FREE        = 4,    /* worker on free list */
-       IO_WORKER_F_FIXED       = 8,    /* static idle worker */
-       IO_WORKER_F_BOUND       = 16,   /* is doing bounded work */
+       IO_WORKER_F_BOUND       = 8,    /* is doing bounded work */
 };
 
 enum {
@@ -32,7 +31,7 @@ enum {
 };
 
 enum {
-       IO_WQE_FLAG_STALLED     = 1,    /* stalled on hash */
+       IO_ACCT_STALLED_BIT     = 0,    /* stalled on hash */
 };
 
 /*
@@ -51,7 +50,14 @@ struct io_worker {
 
        struct completion ref_done;
 
-       struct rcu_head rcu;
+       unsigned long create_state;
+       struct callback_head create_work;
+       int create_index;
+
+       union {
+               struct rcu_head rcu;
+               struct work_struct work;
+       };
 };
 
 #if BITS_PER_LONG == 64
@@ -67,25 +73,24 @@ struct io_wqe_acct {
        unsigned max_workers;
        int index;
        atomic_t nr_running;
+       struct io_wq_work_list work_list;
+       unsigned long flags;
 };
 
 enum {
        IO_WQ_ACCT_BOUND,
        IO_WQ_ACCT_UNBOUND,
+       IO_WQ_ACCT_NR,
 };
 
 /*
  * Per-node worker thread pool
  */
 struct io_wqe {
-       struct {
-               raw_spinlock_t lock;
-               struct io_wq_work_list work_list;
-               unsigned flags;
-       } ____cacheline_aligned_in_smp;
+       raw_spinlock_t lock;
+       struct io_wqe_acct acct[2];
 
        int node;
-       struct io_wqe_acct acct[2];
 
        struct hlist_nulls_head free_list;
        struct list_head all_list;
@@ -129,8 +134,11 @@ struct io_cb_cancel_data {
        bool cancel_all;
 };
 
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
+static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
 static void io_wqe_dec_running(struct io_worker *worker);
+static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
+                                       struct io_wqe_acct *acct,
+                                       struct io_cb_cancel_data *match);
 
 static bool io_worker_get(struct io_worker *worker)
 {
@@ -174,7 +182,7 @@ static void io_worker_exit(struct io_worker *worker)
                complete(&worker->ref_done);
        wait_for_completion(&worker->ref_done);
 
-       raw_spin_lock_irq(&wqe->lock);
+       raw_spin_lock(&wqe->lock);
        if (worker->flags & IO_WORKER_F_FREE)
                hlist_nulls_del_rcu(&worker->nulls_node);
        list_del_rcu(&worker->all_list);
@@ -184,18 +192,17 @@ static void io_worker_exit(struct io_worker *worker)
        worker->flags = 0;
        current->flags &= ~PF_IO_WORKER;
        preempt_enable();
-       raw_spin_unlock_irq(&wqe->lock);
+       raw_spin_unlock(&wqe->lock);
 
        kfree_rcu(worker, rcu);
        io_worker_ref_put(wqe->wq);
        do_exit(0);
 }
 
-static inline bool io_wqe_run_queue(struct io_wqe *wqe)
-       __must_hold(wqe->lock)
+static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
 {
-       if (!wq_list_empty(&wqe->work_list) &&
-           !(wqe->flags & IO_WQE_FLAG_STALLED))
+       if (!wq_list_empty(&acct->work_list) &&
+           !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
                return true;
        return false;
 }
@@ -204,7 +211,8 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
  * Check head of free list for an available worker. If one isn't available,
  * caller must create one.
  */
-static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
+static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
+                                       struct io_wqe_acct *acct)
        __must_hold(RCU)
 {
        struct hlist_nulls_node *n;
@@ -218,6 +226,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
        hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
                if (!io_worker_get(worker))
                        continue;
+               if (io_wqe_get_acct(worker) != acct) {
+                       io_worker_release(worker);
+                       continue;
+               }
                if (wake_up_process(worker->task)) {
                        io_worker_release(worker);
                        return true;
@@ -232,9 +244,9 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
  * We need a worker. If we find a free one, we're good. If not, and we're
  * below the max number of workers, create one.
  */
-static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
+static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 {
-       bool ret;
+       bool do_create = false;
 
        /*
         * Most likely an attempt to queue unbounded work on an io_wq that
@@ -243,24 +255,19 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
        if (unlikely(!acct->max_workers))
                pr_warn_once("io-wq is not configured for unbound workers");
 
-       rcu_read_lock();
-       ret = io_wqe_activate_free_worker(wqe);
-       rcu_read_unlock();
-
-       if (!ret) {
-               bool do_create = false;
-
-               raw_spin_lock_irq(&wqe->lock);
-               if (acct->nr_workers < acct->max_workers) {
-                       atomic_inc(&acct->nr_running);
-                       atomic_inc(&wqe->wq->worker_refs);
-                       acct->nr_workers++;
-                       do_create = true;
-               }
-               raw_spin_unlock_irq(&wqe->lock);
-               if (do_create)
-                       create_io_worker(wqe->wq, wqe, acct->index);
+       raw_spin_lock(&wqe->lock);
+       if (acct->nr_workers < acct->max_workers) {
+               acct->nr_workers++;
+               do_create = true;
        }
+       raw_spin_unlock(&wqe->lock);
+       if (do_create) {
+               atomic_inc(&acct->nr_running);
+               atomic_inc(&wqe->wq->worker_refs);
+               return create_io_worker(wqe->wq, wqe, acct->index);
+       }
+
+       return true;
 }
 
 static void io_wqe_inc_running(struct io_worker *worker)
@@ -270,53 +277,67 @@ static void io_wqe_inc_running(struct io_worker *worker)
        atomic_inc(&acct->nr_running);
 }
 
-struct create_worker_data {
-       struct callback_head work;
-       struct io_wqe *wqe;
-       int index;
-};
-
 static void create_worker_cb(struct callback_head *cb)
 {
-       struct create_worker_data *cwd;
+       struct io_worker *worker;
        struct io_wq *wq;
        struct io_wqe *wqe;
        struct io_wqe_acct *acct;
+       bool do_create = false;
 
-       cwd = container_of(cb, struct create_worker_data, work);
-       wqe = cwd->wqe;
+       worker = container_of(cb, struct io_worker, create_work);
+       wqe = worker->wqe;
        wq = wqe->wq;
-       acct = &wqe->acct[cwd->index];
-       raw_spin_lock_irq(&wqe->lock);
-       if (acct->nr_workers < acct->max_workers)
+       acct = &wqe->acct[worker->create_index];
+       raw_spin_lock(&wqe->lock);
+       if (acct->nr_workers < acct->max_workers) {
                acct->nr_workers++;
-       raw_spin_unlock_irq(&wqe->lock);
-       create_io_worker(wq, cwd->wqe, cwd->index);
-       kfree(cwd);
+               do_create = true;
+       }
+       raw_spin_unlock(&wqe->lock);
+       if (do_create) {
+               create_io_worker(wq, wqe, worker->create_index);
+       } else {
+               atomic_dec(&acct->nr_running);
+               io_worker_ref_put(wq);
+       }
+       clear_bit_unlock(0, &worker->create_state);
+       io_worker_release(worker);
 }
 
-static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
+static bool io_queue_worker_create(struct io_worker *worker,
+                                  struct io_wqe_acct *acct,
+                                  task_work_func_t func)
 {
-       struct create_worker_data *cwd;
+       struct io_wqe *wqe = worker->wqe;
        struct io_wq *wq = wqe->wq;
 
        /* raced with exit, just ignore create call */
        if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
                goto fail;
+       if (!io_worker_get(worker))
+               goto fail;
+       /*
+        * create_state manages ownership of create_work/index. We should
+        * only need one entry per worker, as the worker going to sleep
+        * will trigger the condition, and waking will clear it once it
+        * runs the task_work.
+        */
+       if (test_bit(0, &worker->create_state) ||
+           test_and_set_bit_lock(0, &worker->create_state))
+               goto fail_release;
 
-       cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
-       if (cwd) {
-               init_task_work(&cwd->work, create_worker_cb);
-               cwd->wqe = wqe;
-               cwd->index = acct->index;
-               if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
-                       return;
-
-               kfree(cwd);
-       }
+       init_task_work(&worker->create_work, func);
+       worker->create_index = acct->index;
+       if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
+               return true;
+       clear_bit_unlock(0, &worker->create_state);
+fail_release:
+       io_worker_release(worker);
 fail:
        atomic_dec(&acct->nr_running);
        io_worker_ref_put(wq);
+       return false;
 }
 
 static void io_wqe_dec_running(struct io_worker *worker)
@@ -328,10 +349,10 @@ static void io_wqe_dec_running(struct io_worker *worker)
        if (!(worker->flags & IO_WORKER_F_UP))
                return;
 
-       if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
+       if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
                atomic_inc(&acct->nr_running);
                atomic_inc(&wqe->wq->worker_refs);
-               io_queue_worker_create(wqe, acct);
+               io_queue_worker_create(worker, acct, create_worker_cb);
        }
 }
 
@@ -343,29 +364,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
                             struct io_wq_work *work)
        __must_hold(wqe->lock)
 {
-       bool worker_bound, work_bound;
-
-       BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);
-
        if (worker->flags & IO_WORKER_F_FREE) {
                worker->flags &= ~IO_WORKER_F_FREE;
                hlist_nulls_del_init_rcu(&worker->nulls_node);
        }
-
-       /*
-        * If worker is moving from bound to unbound (or vice versa), then
-        * ensure we update the running accounting.
-        */
-       worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
-       work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
-       if (worker_bound != work_bound) {
-               int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
-               io_wqe_dec_running(worker);
-               worker->flags ^= IO_WORKER_F_BOUND;
-               wqe->acct[index].nr_workers--;
-               wqe->acct[index ^ 1].nr_workers++;
-               io_wqe_inc_running(worker);
-        }
 }
 
 /*
@@ -393,7 +395,7 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
 {
        struct io_wq *wq = wqe->wq;
 
-       spin_lock(&wq->hash->wait.lock);
+       spin_lock_irq(&wq->hash->wait.lock);
        if (list_empty(&wqe->wait.entry)) {
                __add_wait_queue(&wq->hash->wait, &wqe->wait);
                if (!test_bit(hash, &wq->hash->map)) {
@@ -401,24 +403,26 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
                        list_del_init(&wqe->wait.entry);
                }
        }
-       spin_unlock(&wq->hash->wait.lock);
+       spin_unlock_irq(&wq->hash->wait.lock);
 }
 
-static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
+static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
+                                          struct io_worker *worker)
        __must_hold(wqe->lock)
 {
        struct io_wq_work_node *node, *prev;
        struct io_wq_work *work, *tail;
        unsigned int stall_hash = -1U;
+       struct io_wqe *wqe = worker->wqe;
 
-       wq_list_for_each(node, prev, &wqe->work_list) {
+       wq_list_for_each(node, prev, &acct->work_list) {
                unsigned int hash;
 
                work = container_of(node, struct io_wq_work, list);
 
                /* not hashed, can run anytime */
                if (!io_wq_is_hashed(work)) {
-                       wq_list_del(&wqe->work_list, node, prev);
+                       wq_list_del(&acct->work_list, node, prev);
                        return work;
                }
 
@@ -429,7 +433,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
                /* hashed, can run if not already running */
                if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
                        wqe->hash_tail[hash] = NULL;
-                       wq_list_cut(&wqe->work_list, &tail->list, prev);
+                       wq_list_cut(&acct->work_list, &tail->list, prev);
                        return work;
                }
                if (stall_hash == -1U)
@@ -439,6 +443,11 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
        }
 
        if (stall_hash != -1U) {
+               /*
+                * Set this before dropping the lock to avoid racing with new
+                * work being added and clearing the stalled bit.
+                */
+               set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
                raw_spin_unlock(&wqe->lock);
                io_wait_on_hash(wqe, stall_hash);
                raw_spin_lock(&wqe->lock);
@@ -465,9 +474,9 @@ static void io_assign_current_work(struct io_worker *worker,
                cond_resched();
        }
 
-       spin_lock_irq(&worker->lock);
+       spin_lock(&worker->lock);
        worker->cur_work = work;
-       spin_unlock_irq(&worker->lock);
+       spin_unlock(&worker->lock);
 }
 
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
@@ -475,6 +484,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
 static void io_worker_handle_work(struct io_worker *worker)
        __releases(wqe->lock)
 {
+       struct io_wqe_acct *acct = io_wqe_get_acct(worker);
        struct io_wqe *wqe = worker->wqe;
        struct io_wq *wq = wqe->wq;
        bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
@@ -489,13 +499,11 @@ get_next:
                 * can't make progress, any work completion or insertion will
                 * clear the stalled flag.
                 */
-               work = io_get_next_work(wqe);
+               work = io_get_next_work(acct, worker);
                if (work)
                        __io_worker_busy(wqe, worker, work);
-               else if (!wq_list_empty(&wqe->work_list))
-                       wqe->flags |= IO_WQE_FLAG_STALLED;
 
-               raw_spin_unlock_irq(&wqe->lock);
+               raw_spin_unlock(&wqe->lock);
                if (!work)
                        break;
                io_assign_current_work(worker, work);
@@ -525,26 +533,28 @@ get_next:
 
                        if (hash != -1U && !next_hashed) {
                                clear_bit(hash, &wq->hash->map);
+                               clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
                                if (wq_has_sleeper(&wq->hash->wait))
                                        wake_up(&wq->hash->wait);
-                               raw_spin_lock_irq(&wqe->lock);
-                               wqe->flags &= ~IO_WQE_FLAG_STALLED;
+                               raw_spin_lock(&wqe->lock);
                                /* skip unnecessary unlock-lock wqe->lock */
                                if (!work)
                                        goto get_next;
-                               raw_spin_unlock_irq(&wqe->lock);
+                               raw_spin_unlock(&wqe->lock);
                        }
                } while (work);
 
-               raw_spin_lock_irq(&wqe->lock);
+               raw_spin_lock(&wqe->lock);
        } while (1);
 }
 
 static int io_wqe_worker(void *data)
 {
        struct io_worker *worker = data;
+       struct io_wqe_acct *acct = io_wqe_get_acct(worker);
        struct io_wqe *wqe = worker->wqe;
        struct io_wq *wq = wqe->wq;
+       bool last_timeout = false;
        char buf[TASK_COMM_LEN];
 
        worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
@@ -557,13 +567,20 @@ static int io_wqe_worker(void *data)
 
                set_current_state(TASK_INTERRUPTIBLE);
 loop:
-               raw_spin_lock_irq(&wqe->lock);
-               if (io_wqe_run_queue(wqe)) {
+               raw_spin_lock(&wqe->lock);
+               if (io_acct_run_queue(acct)) {
                        io_worker_handle_work(worker);
                        goto loop;
                }
+               /* timed out, exit unless we're the last worker */
+               if (last_timeout && acct->nr_workers > 1) {
+                       raw_spin_unlock(&wqe->lock);
+                       __set_current_state(TASK_RUNNING);
+                       break;
+               }
+               last_timeout = false;
                __io_worker_idle(wqe, worker);
-               raw_spin_unlock_irq(&wqe->lock);
+               raw_spin_unlock(&wqe->lock);
                if (io_flush_signals())
                        continue;
                ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
@@ -572,17 +589,15 @@ loop:
 
                        if (!get_signal(&ksig))
                                continue;
-                       break;
-               }
-               if (ret)
+                       if (fatal_signal_pending(current))
+                               break;
                        continue;
-               /* timed out, exit unless we're the fixed worker */
-               if (!(worker->flags & IO_WORKER_F_FIXED))
-                       break;
+               }
+               last_timeout = !ret;
        }
 
        if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
-               raw_spin_lock_irq(&wqe->lock);
+               raw_spin_lock(&wqe->lock);
                io_worker_handle_work(worker);
        }
 
@@ -624,12 +639,96 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
 
        worker->flags &= ~IO_WORKER_F_RUNNING;
 
-       raw_spin_lock_irq(&worker->wqe->lock);
+       raw_spin_lock(&worker->wqe->lock);
        io_wqe_dec_running(worker);
-       raw_spin_unlock_irq(&worker->wqe->lock);
+       raw_spin_unlock(&worker->wqe->lock);
+}
+
+static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
+                              struct task_struct *tsk)
+{
+       tsk->pf_io_worker = worker;
+       worker->task = tsk;
+       set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
+       tsk->flags |= PF_NO_SETAFFINITY;
+
+       raw_spin_lock(&wqe->lock);
+       hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
+       list_add_tail_rcu(&worker->all_list, &wqe->all_list);
+       worker->flags |= IO_WORKER_F_FREE;
+       raw_spin_unlock(&wqe->lock);
+       wake_up_new_task(tsk);
 }
 
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
+{
+       return true;
+}
+
+static inline bool io_should_retry_thread(long err)
+{
+       switch (err) {
+       case -EAGAIN:
+       case -ERESTARTSYS:
+       case -ERESTARTNOINTR:
+       case -ERESTARTNOHAND:
+               return true;
+       default:
+               return false;
+       }
+}
+
+static void create_worker_cont(struct callback_head *cb)
+{
+       struct io_worker *worker;
+       struct task_struct *tsk;
+       struct io_wqe *wqe;
+
+       worker = container_of(cb, struct io_worker, create_work);
+       clear_bit_unlock(0, &worker->create_state);
+       wqe = worker->wqe;
+       tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+       if (!IS_ERR(tsk)) {
+               io_init_new_worker(wqe, worker, tsk);
+               io_worker_release(worker);
+               return;
+       } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
+               struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+
+               atomic_dec(&acct->nr_running);
+               raw_spin_lock(&wqe->lock);
+               acct->nr_workers--;
+               if (!acct->nr_workers) {
+                       struct io_cb_cancel_data match = {
+                               .fn             = io_wq_work_match_all,
+                               .cancel_all     = true,
+                       };
+
+                       while (io_acct_cancel_pending_work(wqe, acct, &match))
+                               raw_spin_lock(&wqe->lock);
+               }
+               raw_spin_unlock(&wqe->lock);
+               io_worker_ref_put(wqe->wq);
+               return;
+       }
+
+       /* re-create attempts grab a new worker ref, drop the existing one */
+       io_worker_release(worker);
+       schedule_work(&worker->work);
+}
+
+static void io_workqueue_create(struct work_struct *work)
+{
+       struct io_worker *worker = container_of(work, struct io_worker, work);
+       struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+
+       if (!io_queue_worker_create(worker, acct, create_worker_cont)) {
+               clear_bit_unlock(0, &worker->create_state);
+               io_worker_release(worker);
+       }
+}
+
+static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 {
        struct io_wqe_acct *acct = &wqe->acct[index];
        struct io_worker *worker;
@@ -638,42 +737,35 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
        __set_current_state(TASK_RUNNING);
 
        worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
-       if (!worker)
-               goto fail;
-
-       refcount_set(&worker->ref, 1);
-       worker->nulls_node.pprev = NULL;
-       worker->wqe = wqe;
-       spin_lock_init(&worker->lock);
-       init_completion(&worker->ref_done);
-
-       tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
-       if (IS_ERR(tsk)) {
-               kfree(worker);
+       if (!worker) {
 fail:
                atomic_dec(&acct->nr_running);
-               raw_spin_lock_irq(&wqe->lock);
+               raw_spin_lock(&wqe->lock);
                acct->nr_workers--;
-               raw_spin_unlock_irq(&wqe->lock);
+               raw_spin_unlock(&wqe->lock);
                io_worker_ref_put(wq);
-               return;
+               return false;
        }
 
-       tsk->pf_io_worker = worker;
-       worker->task = tsk;
-       set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
-       tsk->flags |= PF_NO_SETAFFINITY;
+       refcount_set(&worker->ref, 1);
+       worker->wqe = wqe;
+       spin_lock_init(&worker->lock);
+       init_completion(&worker->ref_done);
 
-       raw_spin_lock_irq(&wqe->lock);
-       hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
-       list_add_tail_rcu(&worker->all_list, &wqe->all_list);
-       worker->flags |= IO_WORKER_F_FREE;
        if (index == IO_WQ_ACCT_BOUND)
                worker->flags |= IO_WORKER_F_BOUND;
-       if ((acct->nr_workers == 1) && (worker->flags & IO_WORKER_F_BOUND))
-               worker->flags |= IO_WORKER_F_FIXED;
-       raw_spin_unlock_irq(&wqe->lock);
-       wake_up_new_task(tsk);
+
+       tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+       if (!IS_ERR(tsk)) {
+               io_init_new_worker(wqe, worker, tsk);
+       } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
+               goto fail;
+       } else {
+               INIT_WORK(&worker->work, io_workqueue_create);
+               schedule_work(&worker->work);
+       }
+
+       return true;
 }
 
 /*
@@ -708,11 +800,6 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
        return false;
 }
 
-static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
-{
-       return true;
-}
-
 static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 {
        struct io_wq *wq = wqe->wq;
@@ -726,12 +813,13 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 
 static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
 {
+       struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
        unsigned int hash;
        struct io_wq_work *tail;
 
        if (!io_wq_is_hashed(work)) {
 append:
-               wq_list_add_tail(&work->list, &wqe->work_list);
+               wq_list_add_tail(&work->list, &acct->work_list);
                return;
        }
 
@@ -741,14 +829,14 @@ append:
        if (!tail)
                goto append;
 
-       wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
+       wq_list_add_after(&work->list, &tail->list, &acct->work_list);
 }
 
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 {
        struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
-       int work_flags;
-       unsigned long flags;
+       unsigned work_flags = work->flags;
+       bool do_create;
 
        /*
         * If io-wq is exiting for this task, or if the request has explicitly
@@ -756,19 +844,36 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
         */
        if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
            (work->flags & IO_WQ_WORK_CANCEL)) {
+run_cancel:
                io_run_cancel(work, wqe);
                return;
        }
 
-       work_flags = work->flags;
-       raw_spin_lock_irqsave(&wqe->lock, flags);
+       raw_spin_lock(&wqe->lock);
        io_wqe_insert_work(wqe, work);
-       wqe->flags &= ~IO_WQE_FLAG_STALLED;
-       raw_spin_unlock_irqrestore(&wqe->lock, flags);
+       clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
+
+       rcu_read_lock();
+       do_create = !io_wqe_activate_free_worker(wqe, acct);
+       rcu_read_unlock();
+
+       raw_spin_unlock(&wqe->lock);
+
+       if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
+           !atomic_read(&acct->nr_running))) {
+               bool did_create;
 
-       if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
-           !atomic_read(&acct->nr_running))
-               io_wqe_wake_worker(wqe, acct);
+               did_create = io_wqe_create_worker(wqe, acct);
+               if (unlikely(!did_create)) {
+                       raw_spin_lock(&wqe->lock);
+                       /* fatal condition, failed to create the first worker */
+                       if (!acct->nr_workers) {
+                               raw_spin_unlock(&wqe->lock);
+                               goto run_cancel;
+                       }
+                       raw_spin_unlock(&wqe->lock);
+               }
+       }
 }
 
 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
@@ -793,19 +898,18 @@ void io_wq_hash_work(struct io_wq_work *work, void *val)
 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 {
        struct io_cb_cancel_data *match = data;
-       unsigned long flags;
 
        /*
         * Hold the lock to avoid ->cur_work going out of scope, caller
         * may dereference the passed in work.
         */
-       spin_lock_irqsave(&worker->lock, flags);
+       spin_lock(&worker->lock);
        if (worker->cur_work &&
            match->fn(worker->cur_work, match->data)) {
                set_notify_signal(worker->task);
                match->nr_running++;
        }
-       spin_unlock_irqrestore(&worker->lock, flags);
+       spin_unlock(&worker->lock);
 
        return match->nr_running && !match->cancel_all;
 }
@@ -814,6 +918,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
                                         struct io_wq_work *work,
                                         struct io_wq_work_node *prev)
 {
+       struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
        unsigned int hash = io_get_work_hash(work);
        struct io_wq_work *prev_work = NULL;
 
@@ -825,33 +930,48 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
                else
                        wqe->hash_tail[hash] = NULL;
        }
-       wq_list_del(&wqe->work_list, &work->list, prev);
+       wq_list_del(&acct->work_list, &work->list, prev);
 }
 
-static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
-                                      struct io_cb_cancel_data *match)
+static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
+                                       struct io_wqe_acct *acct,
+                                       struct io_cb_cancel_data *match)
+       __releases(wqe->lock)
 {
        struct io_wq_work_node *node, *prev;
        struct io_wq_work *work;
-       unsigned long flags;
 
-retry:
-       raw_spin_lock_irqsave(&wqe->lock, flags);
-       wq_list_for_each(node, prev, &wqe->work_list) {
+       wq_list_for_each(node, prev, &acct->work_list) {
                work = container_of(node, struct io_wq_work, list);
                if (!match->fn(work, match->data))
                        continue;
                io_wqe_remove_pending(wqe, work, prev);
-               raw_spin_unlock_irqrestore(&wqe->lock, flags);
+               raw_spin_unlock(&wqe->lock);
                io_run_cancel(work, wqe);
                match->nr_pending++;
-               if (!match->cancel_all)
-                       return;
-
                /* not safe to continue after unlock */
-               goto retry;
+               return true;
+       }
+
+       return false;
+}
+
+static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
+                                      struct io_cb_cancel_data *match)
+{
+       int i;
+retry:
+       raw_spin_lock(&wqe->lock);
+       for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+               struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
+
+               if (io_acct_cancel_pending_work(wqe, acct, match)) {
+                       if (match->cancel_all)
+                               goto retry;
+                       return;
+               }
        }
-       raw_spin_unlock_irqrestore(&wqe->lock, flags);
+       raw_spin_unlock(&wqe->lock);
 }
 
 static void io_wqe_cancel_running_work(struct io_wqe *wqe,
@@ -910,18 +1030,24 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
                            int sync, void *key)
 {
        struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
+       int i;
 
        list_del_init(&wait->entry);
 
        rcu_read_lock();
-       io_wqe_activate_free_worker(wqe);
+       for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+               struct io_wqe_acct *acct = &wqe->acct[i];
+
+               if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
+                       io_wqe_activate_free_worker(wqe, acct);
+       }
        rcu_read_unlock();
        return 1;
 }
 
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
-       int ret, node;
+       int ret, node, i;
        struct io_wq *wq;
 
        if (WARN_ON_ONCE(!data->free_work || !data->do_work))
@@ -956,18 +1082,20 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
                cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
                wq->wqes[node] = wqe;
                wqe->node = alloc_node;
-               wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
-               wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
                wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
-               atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
                wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
                                        task_rlimit(current, RLIMIT_NPROC);
-               atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
-               wqe->wait.func = io_wqe_hash_wake;
                INIT_LIST_HEAD(&wqe->wait.entry);
+               wqe->wait.func = io_wqe_hash_wake;
+               for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+                       struct io_wqe_acct *acct = &wqe->acct[i];
+
+                       acct->index = i;
+                       atomic_set(&acct->nr_running, 0);
+                       INIT_WQ_LIST(&acct->work_list);
+               }
                wqe->wq = wq;
                raw_spin_lock_init(&wqe->lock);
-               INIT_WQ_LIST(&wqe->work_list);
                INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
                INIT_LIST_HEAD(&wqe->all_list);
        }
@@ -992,12 +1120,12 @@ err_wq:
 
 static bool io_task_work_match(struct callback_head *cb, void *data)
 {
-       struct create_worker_data *cwd;
+       struct io_worker *worker;
 
-       if (cb->func != create_worker_cb)
+       if (cb->func != create_worker_cb || cb->func != create_worker_cont)
                return false;
-       cwd = container_of(cb, struct create_worker_data, work);
-       return cwd->wqe->wq == data;
+       worker = container_of(cb, struct io_worker, create_work);
+       return worker->wqe->wq == data;
 }
 
 void io_wq_exit_start(struct io_wq *wq)
@@ -1014,12 +1142,13 @@ static void io_wq_exit_workers(struct io_wq *wq)
                return;
 
        while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
-               struct create_worker_data *cwd;
+               struct io_worker *worker;
 
-               cwd = container_of(cb, struct create_worker_data, work);
-               atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
+               worker = container_of(cb, struct io_worker, create_work);
+               atomic_dec(&worker->wqe->acct[worker->create_index].nr_running);
                io_worker_ref_put(wq);
-               kfree(cwd);
+               clear_bit_unlock(0, &worker->create_state);
+               io_worker_release(worker);
        }
 
        rcu_read_lock();
@@ -1131,6 +1260,35 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
        return 0;
 }
 
+/*
+ * Set max number of unbounded workers, returns old value. If new_count is 0,
+ * then just return the old value.
+ */
+int io_wq_max_workers(struct io_wq *wq, int *new_count)
+{
+       int i, node, prev = 0;
+
+       for (i = 0; i < 2; i++) {
+               if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
+                       new_count[i] = task_rlimit(current, RLIMIT_NPROC);
+       }
+
+       rcu_read_lock();
+       for_each_node(node) {
+               struct io_wqe_acct *acct;
+
+               for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+                       acct = &wq->wqes[node]->acct[i];
+                       prev = max_t(int, acct->max_workers, prev);
+                       if (new_count[i])
+                               acct->max_workers = new_count[i];
+                       new_count[i] = prev;
+               }
+       }
+       rcu_read_unlock();
+       return 0;
+}
+
 static __init int io_wq_init(void)
 {
        int ret;