io-wq: wait for manager exit on wq destroy
[linux-2.6-microblaze.git] / fs / io-wq.c
index 44e2024..7a1d51c 100644 (file)
@@ -56,6 +56,7 @@ struct io_worker {
        const struct cred *saved_creds;
 
        struct completion ref_done;
+       struct completion started;
 
        struct rcu_head rcu;
 };
@@ -117,7 +118,11 @@ struct io_wq {
        struct io_wq_hash *hash;
 
        refcount_t refs;
-       struct completion done;
+       struct completion started;
+       struct completion exited;
+
+       atomic_t worker_refs;
+       struct completion worker_done;
 
        struct hlist_node cpuhp_node;
 
@@ -188,7 +193,8 @@ static void io_worker_exit(struct io_worker *worker)
        raw_spin_unlock_irq(&wqe->lock);
 
        kfree_rcu(worker, rcu);
-       io_wq_put(wqe->wq);
+       if (atomic_dec_and_test(&wqe->wq->worker_refs))
+               complete(&wqe->wq->worker_done);
 }
 
 static inline bool io_wqe_run_queue(struct io_wqe *wqe)
@@ -267,6 +273,7 @@ static void io_worker_start(struct io_worker *worker)
 {
        worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
        io_wqe_inc_running(worker);
+       complete(&worker->started);
 }
 
 /*
@@ -644,18 +651,21 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
        worker->wqe = wqe;
        spin_lock_init(&worker->lock);
        init_completion(&worker->ref_done);
+       init_completion(&worker->started);
 
-       refcount_inc(&wq->refs);
+       atomic_inc(&wq->worker_refs);
 
        if (index == IO_WQ_ACCT_BOUND)
                pid = io_wq_fork_thread(task_thread_bound, worker);
        else
                pid = io_wq_fork_thread(task_thread_unbound, worker);
        if (pid < 0) {
-               io_wq_put(wq);
+               if (atomic_dec_and_test(&wq->worker_refs))
+                       complete(&wq->worker_done);
                kfree(worker);
                return false;
        }
+       wait_for_completion(&worker->started);
        return true;
 }
 
@@ -664,6 +674,8 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
 {
        struct io_wqe_acct *acct = &wqe->acct[index];
 
+       if (acct->nr_workers && test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state))
+               return false;
        /* if we have available workers or no work, no need */
        if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
                return false;
@@ -732,13 +744,14 @@ static int io_wq_manager(void *data)
 {
        struct io_wq *wq = data;
        char buf[TASK_COMM_LEN];
+       int node;
 
        sprintf(buf, "iou-mgr-%d", wq->task_pid);
        set_task_comm(current, buf);
        current->flags |= PF_IO_WORKER;
        wq->manager = current;
 
-       complete(&wq->done);
+       complete(&wq->started);
 
        do {
                set_current_state(TASK_INTERRUPTIBLE);
@@ -749,7 +762,17 @@ static int io_wq_manager(void *data)
        } while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
 
        io_wq_check_workers(wq);
+
+       rcu_read_lock();
+       for_each_node(node)
+               io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
+       rcu_read_unlock();
+
+       /* we might not ever have created any workers */
+       if (atomic_read(&wq->worker_refs))
+               wait_for_completion(&wq->worker_done);
        wq->manager = NULL;
+       complete(&wq->exited);
        io_wq_put(wq);
        do_exit(0);
 }
@@ -792,13 +815,14 @@ static int io_wq_fork_manager(struct io_wq *wq)
        if (wq->manager)
                return 0;
 
+       reinit_completion(&wq->worker_done);
        clear_bit(IO_WQ_BIT_EXIT, &wq->state);
        refcount_inc(&wq->refs);
        current->flags |= PF_IO_WORKER;
        ret = io_wq_fork_thread(io_wq_manager, wq);
        current->flags &= ~PF_IO_WORKER;
        if (ret >= 0) {
-               wait_for_completion(&wq->done);
+               wait_for_completion(&wq->started);
                return 0;
        }
 
@@ -1043,9 +1067,13 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
        }
 
        wq->task_pid = current->pid;
-       init_completion(&wq->done);
+       init_completion(&wq->started);
+       init_completion(&wq->exited);
        refcount_set(&wq->refs, 1);
 
+       init_completion(&wq->worker_done);
+       atomic_set(&wq->worker_refs, 0);
+
        ret = io_wq_fork_manager(wq);
        if (!ret)
                return wq;
@@ -1070,13 +1098,10 @@ static void io_wq_destroy(struct io_wq *wq)
        cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 
        set_bit(IO_WQ_BIT_EXIT, &wq->state);
-       if (wq->manager)
+       if (wq->manager) {
                wake_up_process(wq->manager);
-
-       rcu_read_lock();
-       for_each_node(node)
-               io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
-       rcu_read_unlock();
+               wait_for_completion(&wq->exited);
+       }
 
        spin_lock_irq(&wq->hash->wait.lock);
        for_each_node(node) {