Merge tag 'libnvdimm-for-5.15' of git://git.kernel.org/pub/scm/linux/kernel/git/nvdim...
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 8cba77a..d80e4a7 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -23,8 +23,7 @@ enum {
        IO_WORKER_F_UP          = 1,    /* up and active */
        IO_WORKER_F_RUNNING     = 2,    /* account as running */
        IO_WORKER_F_FREE        = 4,    /* worker on free list */
-       IO_WORKER_F_FIXED       = 8,    /* static idle worker */
-       IO_WORKER_F_BOUND       = 16,   /* is doing bounded work */
+       IO_WORKER_F_BOUND       = 8,    /* is doing bounded work */
 };
 
 enum {
@@ -55,7 +54,10 @@ struct io_worker {
        struct callback_head create_work;
        int create_index;
 
-       struct rcu_head rcu;
+       union {
+               struct rcu_head rcu;
+               struct work_struct work;
+       };
 };
 
 #if BITS_PER_LONG == 64
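Note on the union above: the two members are never live at the same time. The
work_struct is used only while thread creation is being retried, before the
worker has been published on any RCU-visible list; the rcu_head is used only
at teardown, after the worker has been unpublished. A minimal sketch of the
pattern (condensed, kernel headers assumed, not the full io_worker layout):

	#include <linux/rcupdate.h>
	#include <linux/workqueue.h>

	struct worker_stub {
		union {
			struct rcu_head rcu;		/* teardown only, e.g. kfree_rcu() */
			struct work_struct work;	/* creation-retry only */
		};
	};

Since the retry phase strictly precedes publication and the RCU free strictly
follows unpublication, the two uses cannot overlap and the union costs nothing.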
@@ -132,8 +134,11 @@ struct io_cb_cancel_data {
        bool cancel_all;
 };
 
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first);
+static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
 static void io_wqe_dec_running(struct io_worker *worker);
+static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
+                                       struct io_wqe_acct *acct,
+                                       struct io_cb_cancel_data *match);
 
 static bool io_worker_get(struct io_worker *worker)
 {
@@ -239,9 +244,9 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
  * We need a worker. If we find a free one, we're good. If not, and we're
  * below the max number of workers, create one.
  */
-static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
+static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 {
-       bool do_create = false, first = false;
+       bool do_create = false;
 
        /*
         * Most likely an attempt to queue unbounded work on an io_wq that
@@ -252,8 +257,6 @@ static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 
        raw_spin_lock(&wqe->lock);
        if (acct->nr_workers < acct->max_workers) {
-               if (!acct->nr_workers)
-                       first = true;
                acct->nr_workers++;
                do_create = true;
        }
@@ -261,8 +264,10 @@ static void io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
        if (do_create) {
                atomic_inc(&acct->nr_running);
                atomic_inc(&wqe->wq->worker_refs);
-               create_io_worker(wqe->wq, wqe, acct->index, first);
+               return create_io_worker(wqe->wq, wqe, acct->index);
        }
+
+       return true;
 }
 
 static void io_wqe_inc_running(struct io_worker *worker)
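io_wqe_create_worker() now reports success or failure to its caller. The
accounting contract is worth spelling out: nr_workers is bumped under
wqe->lock, and nr_running plus the wq worker refcount are bumped before
create_io_worker() runs, so every failure path must unwind all three. A hedged
sketch of that unwind as a hypothetical helper (the patch open-codes it in
each fail path):

	static void create_failed_unwind(struct io_wq *wq, struct io_wqe *wqe,
					 struct io_wqe_acct *acct)
	{
		atomic_dec(&acct->nr_running);	/* pairs with the pre-create inc */
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;		/* pairs with nr_workers++ under lock */
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);		/* pairs with the worker_refs inc */
	}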
@@ -278,7 +283,7 @@ static void create_worker_cb(struct callback_head *cb)
        struct io_wq *wq;
        struct io_wqe *wqe;
        struct io_wqe_acct *acct;
-       bool do_create = false, first = false;
+       bool do_create = false;
 
        worker = container_of(cb, struct io_worker, create_work);
        wqe = worker->wqe;
@@ -286,14 +291,12 @@ static void create_worker_cb(struct callback_head *cb)
        acct = &wqe->acct[worker->create_index];
        raw_spin_lock(&wqe->lock);
        if (acct->nr_workers < acct->max_workers) {
-               if (!acct->nr_workers)
-                       first = true;
                acct->nr_workers++;
                do_create = true;
        }
        raw_spin_unlock(&wqe->lock);
        if (do_create) {
-               create_io_worker(wq, wqe, worker->create_index, first);
+               create_io_worker(wq, wqe, worker->create_index);
        } else {
                atomic_dec(&acct->nr_running);
                io_worker_ref_put(wq);
@@ -302,9 +305,11 @@ static void create_worker_cb(struct callback_head *cb)
        io_worker_release(worker);
 }
 
-static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker,
-                                  struct io_wqe_acct *acct)
+static bool io_queue_worker_create(struct io_worker *worker,
+                                  struct io_wqe_acct *acct,
+                                  task_work_func_t func)
 {
+       struct io_wqe *wqe = worker->wqe;
        struct io_wq *wq = wqe->wq;
 
        /* raced with exit, just ignore create call */
@@ -322,16 +327,17 @@ static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker,
            test_and_set_bit_lock(0, &worker->create_state))
                goto fail_release;
 
-       init_task_work(&worker->create_work, create_worker_cb);
+       init_task_work(&worker->create_work, func);
        worker->create_index = acct->index;
        if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
-               return;
+               return true;
        clear_bit_unlock(0, &worker->create_state);
 fail_release:
        io_worker_release(worker);
 fail:
        atomic_dec(&acct->nr_running);
        io_worker_ref_put(wq);
+       return false;
 }
 
 static void io_wqe_dec_running(struct io_worker *worker)
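io_queue_worker_create() is generalized to take the task_work callback as a
parameter, so the normal path (create_worker_cb) and the retry path
(create_worker_cont, below) share the queueing logic. Bit 0 of
worker->create_state serializes it; a condensed illustration of the bit-lock
handshake (assumes the surrounding function's context):

	if (test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;		/* a create is already queued */
	init_task_work(&worker->create_work, func);
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
		return true;			/* the callback clears the bit */
	clear_bit_unlock(0, &worker->create_state);	/* add failed: release now */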
@@ -346,7 +352,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
        if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
                atomic_inc(&acct->nr_running);
                atomic_inc(&wqe->wq->worker_refs);
-               io_queue_worker_create(wqe, worker, acct);
+               io_queue_worker_create(worker, acct, create_worker_cb);
        }
 }
 
@@ -548,6 +554,7 @@ static int io_wqe_worker(void *data)
        struct io_wqe_acct *acct = io_wqe_get_acct(worker);
        struct io_wqe *wqe = worker->wqe;
        struct io_wq *wq = wqe->wq;
+       bool last_timeout = false;
        char buf[TASK_COMM_LEN];
 
        worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
@@ -565,6 +572,13 @@ loop:
                        io_worker_handle_work(worker);
                        goto loop;
                }
+               /* timed out, exit unless we're the last worker */
+               if (last_timeout && acct->nr_workers > 1) {
+                       raw_spin_unlock(&wqe->lock);
+                       __set_current_state(TASK_RUNNING);
+                       break;
+               }
+               last_timeout = false;
                __io_worker_idle(wqe, worker);
                raw_spin_unlock(&wqe->lock);
                if (io_flush_signals())
@@ -575,13 +589,11 @@ loop:
 
                        if (!get_signal(&ksig))
                                continue;
-                       break;
-               }
-               if (ret)
+                       if (fatal_signal_pending(current))
+                               break;
                        continue;
-               /* timed out, exit unless we're the fixed worker */
-               if (!(worker->flags & IO_WORKER_F_FIXED))
-                       break;
+               }
+               last_timeout = !ret;
        }
 
        if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
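With IO_WORKER_F_FIXED gone, no single worker is exempt from the idle timeout;
instead, a worker that times out exits on its next pass through the loop only
if at least one other worker remains, so the pool can shrink to one worker but
never to zero while work may still arrive. A condensed skeleton of the timeout
bookkeeping (assumes the io_wqe_worker() context and WORKER_IDLE_TIMEOUT as
defined in this file):

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
		/* ... run pending work; if last_timeout && nr_workers > 1, exit ... */
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		/* ret == 0 means the full timeout elapsed with no wakeup */
		last_timeout = !ret;
	}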
@@ -632,7 +644,91 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
        raw_spin_unlock(&worker->wqe->lock);
 }
 
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first)
+static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
+                              struct task_struct *tsk)
+{
+       tsk->pf_io_worker = worker;
+       worker->task = tsk;
+       set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
+       tsk->flags |= PF_NO_SETAFFINITY;
+
+       raw_spin_lock(&wqe->lock);
+       hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
+       list_add_tail_rcu(&worker->all_list, &wqe->all_list);
+       worker->flags |= IO_WORKER_F_FREE;
+       raw_spin_unlock(&wqe->lock);
+       wake_up_new_task(tsk);
+}
+
+static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
+{
+       return true;
+}
+
+static inline bool io_should_retry_thread(long err)
+{
+       switch (err) {
+       case -EAGAIN:
+       case -ERESTARTSYS:
+       case -ERESTARTNOINTR:
+       case -ERESTARTNOHAND:
+               return true;
+       default:
+               return false;
+       }
+}
+
+static void create_worker_cont(struct callback_head *cb)
+{
+       struct io_worker *worker;
+       struct task_struct *tsk;
+       struct io_wqe *wqe;
+
+       worker = container_of(cb, struct io_worker, create_work);
+       clear_bit_unlock(0, &worker->create_state);
+       wqe = worker->wqe;
+       tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+       if (!IS_ERR(tsk)) {
+               io_init_new_worker(wqe, worker, tsk);
+               io_worker_release(worker);
+               return;
+       } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
+               struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+
+               atomic_dec(&acct->nr_running);
+               raw_spin_lock(&wqe->lock);
+               acct->nr_workers--;
+               if (!acct->nr_workers) {
+                       struct io_cb_cancel_data match = {
+                               .fn             = io_wq_work_match_all,
+                               .cancel_all     = true,
+                       };
+
+                       while (io_acct_cancel_pending_work(wqe, acct, &match))
+                               raw_spin_lock(&wqe->lock);
+               }
+               raw_spin_unlock(&wqe->lock);
+               io_worker_ref_put(wqe->wq);
+               return;
+       }
+
+       /* re-create attempts grab a new worker ref, drop the existing one */
+       io_worker_release(worker);
+       schedule_work(&worker->work);
+}
+
+static void io_workqueue_create(struct work_struct *work)
+{
+       struct io_worker *worker = container_of(work, struct io_worker, work);
+       struct io_wqe_acct *acct = io_wqe_get_acct(worker);
+
+       if (!io_queue_worker_create(worker, acct, create_worker_cont)) {
+               clear_bit_unlock(0, &worker->create_state);
+               io_worker_release(worker);
+       }
+}
+
+static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 {
        struct io_wqe_acct *acct = &wqe->acct[index];
        struct io_worker *worker;
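The block above adds the retry machinery: io_should_retry_thread() treats
-EAGAIN and the -ERESTART* family as transient (resource pressure, or a signal
interrupting the fork), create_worker_cont() re-attempts creation from
task_work, and io_workqueue_create() re-queues the attempt from a workqueue.
The fallback chain, condensed:

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (IS_ERR(tsk) && io_should_retry_thread(PTR_ERR(tsk))) {
		/* transient failure: bounce to a workqueue and try again
		 * from there, instead of giving up on the worker */
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

A hard failure instead unwinds the accounting and, if that would leave the
account with zero workers, cancels all pending work so nothing is stranded.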
@@ -641,42 +737,35 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bo
        __set_current_state(TASK_RUNNING);
 
        worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
-       if (!worker)
-               goto fail;
-
-       refcount_set(&worker->ref, 1);
-       worker->nulls_node.pprev = NULL;
-       worker->wqe = wqe;
-       spin_lock_init(&worker->lock);
-       init_completion(&worker->ref_done);
-
-       tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
-       if (IS_ERR(tsk)) {
-               kfree(worker);
+       if (!worker) {
 fail:
                atomic_dec(&acct->nr_running);
                raw_spin_lock(&wqe->lock);
                acct->nr_workers--;
                raw_spin_unlock(&wqe->lock);
                io_worker_ref_put(wq);
-               return;
+               return false;
        }
 
-       tsk->pf_io_worker = worker;
-       worker->task = tsk;
-       set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
-       tsk->flags |= PF_NO_SETAFFINITY;
+       refcount_set(&worker->ref, 1);
+       worker->wqe = wqe;
+       spin_lock_init(&worker->lock);
+       init_completion(&worker->ref_done);
 
-       raw_spin_lock(&wqe->lock);
-       hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
-       list_add_tail_rcu(&worker->all_list, &wqe->all_list);
-       worker->flags |= IO_WORKER_F_FREE;
        if (index == IO_WQ_ACCT_BOUND)
                worker->flags |= IO_WORKER_F_BOUND;
-       if (first && (worker->flags & IO_WORKER_F_BOUND))
-               worker->flags |= IO_WORKER_F_FIXED;
-       raw_spin_unlock(&wqe->lock);
-       wake_up_new_task(tsk);
+
+       tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+       if (!IS_ERR(tsk)) {
+               io_init_new_worker(wqe, worker, tsk);
+       } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
+               goto fail;
+       } else {
+               INIT_WORK(&worker->work, io_workqueue_create);
+               schedule_work(&worker->work);
+       }
+
+       return true;
 }
 
 /*
@@ -711,11 +800,6 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
        return false;
 }
 
-static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
-{
-       return true;
-}
-
 static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 {
        struct io_wq *wq = wqe->wq;
@@ -760,6 +844,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
         */
        if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
            (work->flags & IO_WQ_WORK_CANCEL)) {
+run_cancel:
                io_run_cancel(work, wqe);
                return;
        }
@@ -775,8 +860,20 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
        raw_spin_unlock(&wqe->lock);
 
        if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
-           !atomic_read(&acct->nr_running)))
-               io_wqe_create_worker(wqe, acct);
+           !atomic_read(&acct->nr_running))) {
+               bool did_create;
+
+               did_create = io_wqe_create_worker(wqe, acct);
+               if (unlikely(!did_create)) {
+                       raw_spin_lock(&wqe->lock);
+                       /* fatal condition, failed to create the first worker */
+                       if (!acct->nr_workers) {
+                               raw_spin_unlock(&wqe->lock);
+                               goto run_cancel;
+                       }
+                       raw_spin_unlock(&wqe->lock);
+               }
+       }
 }
 
 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
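The new run_cancel label handles the one fatal case: creating the very first
worker failed, so nothing would ever execute the just-queued work. Rather than
strand it, the work is pushed through the cancel path so the submitter sees a
failure instead of a hang. Condensed:

	if (!io_wqe_create_worker(wqe, acct)) {
		raw_spin_lock(&wqe->lock);
		if (!acct->nr_workers) {	/* nobody left to run this work */
			raw_spin_unlock(&wqe->lock);
			io_run_cancel(work, wqe);	/* fail it, don't hang */
			return;
		}
		raw_spin_unlock(&wqe->lock);	/* an existing worker picks it up */
	}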
@@ -836,31 +933,42 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
        wq_list_del(&acct->work_list, &work->list, prev);
 }
 
-static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
-                                      struct io_cb_cancel_data *match)
+static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
+                                       struct io_wqe_acct *acct,
+                                       struct io_cb_cancel_data *match)
+       __releases(wqe->lock)
 {
        struct io_wq_work_node *node, *prev;
        struct io_wq_work *work;
-       int i;
 
+       wq_list_for_each(node, prev, &acct->work_list) {
+               work = container_of(node, struct io_wq_work, list);
+               if (!match->fn(work, match->data))
+                       continue;
+               io_wqe_remove_pending(wqe, work, prev);
+               raw_spin_unlock(&wqe->lock);
+               io_run_cancel(work, wqe);
+               match->nr_pending++;
+               /* not safe to continue after unlock */
+               return true;
+       }
+
+       return false;
+}
+
+static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
+                                      struct io_cb_cancel_data *match)
+{
+       int i;
 retry:
        raw_spin_lock(&wqe->lock);
        for (i = 0; i < IO_WQ_ACCT_NR; i++) {
                struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
 
-               wq_list_for_each(node, prev, &acct->work_list) {
-                       work = container_of(node, struct io_wq_work, list);
-                       if (!match->fn(work, match->data))
-                               continue;
-                       io_wqe_remove_pending(wqe, work, prev);
-                       raw_spin_unlock(&wqe->lock);
-                       io_run_cancel(work, wqe);
-                       match->nr_pending++;
-                       if (!match->cancel_all)
-                               return;
-
-                       /* not safe to continue after unlock */
-                       goto retry;
+               if (io_acct_cancel_pending_work(wqe, acct, match)) {
+                       if (match->cancel_all)
+                               goto retry;
+                       return;
                }
        }
        raw_spin_unlock(&wqe->lock);
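The split into io_acct_cancel_pending_work() makes the locking contract
explicit: the helper returns with wqe->lock dropped whenever it cancelled
something (the list state is stale after the unlock), so callers that want
cancel-all retake the lock and rescan. The caller-side loop, condensed:

	raw_spin_lock(&wqe->lock);
	while (io_acct_cancel_pending_work(wqe, acct, &match))	/* returns unlocked */
		raw_spin_lock(&wqe->lock);			/* retake, rescan */
	raw_spin_unlock(&wqe->lock);				/* nothing left to match */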
@@ -1014,7 +1122,7 @@ static bool io_task_work_match(struct callback_head *cb, void *data)
 {
        struct io_worker *worker;
 
-       if (cb->func != create_worker_cb)
+       if (cb->func != create_worker_cb && cb->func != create_worker_cont)
                return false;
        worker = container_of(cb, struct io_worker, create_work);
        return worker->wqe->wq == data;
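io_task_work_match() must now recognize both creation callbacks so wq teardown
can flush either kind. A hedged sketch of how such a matcher is typically
consumed (assumes the task_work_cancel_match() API), draining queued create
requests before the wq exits:

	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq))) {
		struct io_worker *worker =
			container_of(cb, struct io_worker, create_work);
		/* ... undo the queued create's accounting and drop the
		 * worker reference it was holding ... */
	}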