Merge tag 'x86_urgent_for_v5.14_rc6' of git://git.kernel.org/pub/scm/linux/kernel...
[linux-2.6-microblaze.git] / fs / io-wq.c
index cf086b0..7d2ed8c 100644 (file)
@@ -129,7 +129,8 @@ struct io_cb_cancel_data {
        bool cancel_all;
 };
 
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first);
+static void io_wqe_dec_running(struct io_worker *worker);
 
 static bool io_worker_get(struct io_worker *worker)
 {
@@ -168,26 +169,21 @@ static void io_worker_exit(struct io_worker *worker)
 {
        struct io_wqe *wqe = worker->wqe;
        struct io_wqe_acct *acct = io_wqe_get_acct(worker);
-       unsigned flags;
 
        if (refcount_dec_and_test(&worker->ref))
                complete(&worker->ref_done);
        wait_for_completion(&worker->ref_done);
 
-       preempt_disable();
-       current->flags &= ~PF_IO_WORKER;
-       flags = worker->flags;
-       worker->flags = 0;
-       if (flags & IO_WORKER_F_RUNNING)
-               atomic_dec(&acct->nr_running);
-       worker->flags = 0;
-       preempt_enable();
-
        raw_spin_lock_irq(&wqe->lock);
-       if (flags & IO_WORKER_F_FREE)
+       if (worker->flags & IO_WORKER_F_FREE)
                hlist_nulls_del_rcu(&worker->nulls_node);
        list_del_rcu(&worker->all_list);
        acct->nr_workers--;
+       preempt_disable();
+       io_wqe_dec_running(worker);
+       worker->flags = 0;
+       current->flags &= ~PF_IO_WORKER;
+       preempt_enable();
        raw_spin_unlock_irq(&wqe->lock);
 
        kfree_rcu(worker, rcu);
@@ -214,15 +210,19 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
        struct hlist_nulls_node *n;
        struct io_worker *worker;
 
-       n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
-       if (is_a_nulls(n))
-               return false;
-
-       worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
-       if (io_worker_get(worker)) {
-               wake_up_process(worker->task);
+       /*
+        * Iterate free_list and see if we can find an idle worker to
+        * activate. If a given worker is on the free_list but in the process
+        * of exiting, keep trying.
+        */
+       hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
+               if (!io_worker_get(worker))
+                       continue;
+               if (wake_up_process(worker->task)) {
+                       io_worker_release(worker);
+                       return true;
+               }
                io_worker_release(worker);
-               return true;
        }
 
        return false;
@@ -247,10 +247,21 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
        ret = io_wqe_activate_free_worker(wqe);
        rcu_read_unlock();
 
-       if (!ret && acct->nr_workers < acct->max_workers) {
-               atomic_inc(&acct->nr_running);
-               atomic_inc(&wqe->wq->worker_refs);
-               create_io_worker(wqe->wq, wqe, acct->index);
+       if (!ret) {
+               bool do_create = false, first = false;
+
+               raw_spin_lock_irq(&wqe->lock);
+               if (acct->nr_workers < acct->max_workers) {
+                       atomic_inc(&acct->nr_running);
+                       atomic_inc(&wqe->wq->worker_refs);
+                       if (!acct->nr_workers)
+                               first = true;
+                       acct->nr_workers++;
+                       do_create = true;
+               }
+               raw_spin_unlock_irq(&wqe->lock);
+               if (do_create)
+                       create_io_worker(wqe->wq, wqe, acct->index, first);
        }
 }
 
@@ -271,10 +282,28 @@ static void create_worker_cb(struct callback_head *cb)
 {
        struct create_worker_data *cwd;
        struct io_wq *wq;
+       struct io_wqe *wqe;
+       struct io_wqe_acct *acct;
+       bool do_create = false, first = false;
 
        cwd = container_of(cb, struct create_worker_data, work);
-       wq = cwd->wqe->wq;
-       create_io_worker(wq, cwd->wqe, cwd->index);
+       wqe = cwd->wqe;
+       wq = wqe->wq;
+       acct = &wqe->acct[cwd->index];
+       raw_spin_lock_irq(&wqe->lock);
+       if (acct->nr_workers < acct->max_workers) {
+               if (!acct->nr_workers)
+                       first = true;
+               acct->nr_workers++;
+               do_create = true;
+       }
+       raw_spin_unlock_irq(&wqe->lock);
+       if (do_create) {
+               create_io_worker(wq, wqe, cwd->index, first);
+       } else {
+               atomic_dec(&acct->nr_running);
+               io_worker_ref_put(wq);
+       }
        kfree(cwd);
 }
 
@@ -612,7 +641,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
        raw_spin_unlock_irq(&worker->wqe->lock);
 }
 
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first)
 {
        struct io_wqe_acct *acct = &wqe->acct[index];
        struct io_worker *worker;
@@ -635,6 +664,9 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
                kfree(worker);
 fail:
                atomic_dec(&acct->nr_running);
+               raw_spin_lock_irq(&wqe->lock);
+               acct->nr_workers--;
+               raw_spin_unlock_irq(&wqe->lock);
                io_worker_ref_put(wq);
                return;
        }
@@ -650,9 +682,8 @@ fail:
        worker->flags |= IO_WORKER_F_FREE;
        if (index == IO_WQ_ACCT_BOUND)
                worker->flags |= IO_WORKER_F_BOUND;
-       if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
+       if (first && (worker->flags & IO_WORKER_F_BOUND))
                worker->flags |= IO_WORKER_F_FIXED;
-       acct->nr_workers++;
        raw_spin_unlock_irq(&wqe->lock);
        wake_up_new_task(tsk);
 }