Instead of having to wait separately on workers and manager, just have
the manager wait on the workers. We use an atomic_t for the reference
here, as we need to start at 0 and allow increments from that. Since the
number of workers is naturally capped by the allowed number of processes,
and that uses an int, there is no risk of overflow.
Signed-off-by: Jens Axboe <axboe@kernel.dk>
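
For reference, the pattern the message describes (a per-worker count that
starts at zero, plus a completion that the last exiting worker fires, which
the manager only waits on if any workers were ever created) can be sketched
outside the kernel. The snippet below is an illustrative userspace analogue
using C11 atomics and pthreads, not the io-wq code itself; the names
worker_refs and workers_done are hypothetical stand-ins for wq->worker_refs
and wq->worker_done.

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>

    static atomic_int worker_refs;           /* stand-in for wq->worker_refs, starts at 0 */
    static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t done_cond = PTHREAD_COND_INITIALIZER;
    static int workers_done;                  /* stand-in for wq->worker_done */

    static void *worker(void *arg)
    {
            (void)arg;
            /* ... worker does its work here ... */

            /* last worker out signals the manager, like complete(&wq->worker_done) */
            if (atomic_fetch_sub(&worker_refs, 1) == 1) {
                    pthread_mutex_lock(&done_lock);
                    workers_done = 1;
                    pthread_cond_signal(&done_cond);
                    pthread_mutex_unlock(&done_lock);
            }
            return NULL;
    }

    int main(void)
    {
            pthread_t threads[4];
            int i;

            for (i = 0; i < 4; i++) {
                    /* like atomic_inc(&wq->worker_refs) before forking a worker */
                    atomic_fetch_add(&worker_refs, 1);
                    pthread_create(&threads[i], NULL, worker, NULL);
            }

            /* manager side: we might not ever have created any workers */
            if (atomic_load(&worker_refs)) {
                    pthread_mutex_lock(&done_lock);
                    while (!workers_done)
                            pthread_cond_wait(&done_cond, &done_lock);
                    pthread_mutex_unlock(&done_lock);
            }

            for (i = 0; i < 4; i++)
                    pthread_join(threads[i], NULL);
            printf("all workers have exited\n");
            return 0;
    }

Starting the count at 0 (rather than using a refcount_t, which warns on
increments from zero) is what allows the "skip the wait if no workers were
ever forked" check to work, matching the reasoning in the message above.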
refcount_t refs;
struct completion done;
+ atomic_t worker_refs;
+ struct completion worker_done;
+
struct hlist_node cpuhp_node;
pid_t task_pid;
raw_spin_unlock_irq(&wqe->lock);
kfree_rcu(worker, rcu);
+ if (atomic_dec_and_test(&wqe->wq->worker_refs))
+ complete(&wqe->wq->worker_done);
}
static inline bool io_wqe_run_queue(struct io_wqe *wqe)
init_completion(&worker->ref_done);
init_completion(&worker->started);
- refcount_inc(&wq->refs);
+ atomic_inc(&wq->worker_refs);
if (index == IO_WQ_ACCT_BOUND)
pid = io_wq_fork_thread(task_thread_bound, worker);
else
pid = io_wq_fork_thread(task_thread_unbound, worker);
if (pid < 0) {
+ if (atomic_dec_and_test(&wq->worker_refs))
+ complete(&wq->worker_done);
kfree(worker);
return false;
}
{
struct io_wq *wq = data;
char buf[TASK_COMM_LEN];
sprintf(buf, "iou-mgr-%d", wq->task_pid);
set_task_comm(current, buf);
} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
io_wq_check_workers(wq);
+
+ rcu_read_lock();
+ for_each_node(node)
+ io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
+ rcu_read_unlock();
+
+ /* we might not ever have created any workers */
+ if (atomic_read(&wq->worker_refs))
+ wait_for_completion(&wq->worker_done);
wq->manager = NULL;
io_wq_put(wq);
do_exit(0);
if (wq->manager)
return 0;
+ reinit_completion(&wq->worker_done);
clear_bit(IO_WQ_BIT_EXIT, &wq->state);
refcount_inc(&wq->refs);
current->flags |= PF_IO_WORKER;
init_completion(&wq->done);
refcount_set(&wq->refs, 1);
+ init_completion(&wq->worker_done);
+ atomic_set(&wq->worker_refs, 0);
+
ret = io_wq_fork_manager(wq);
if (!ret)
return wq;
if (wq->manager)
wake_up_process(wq->manager);
- rcu_read_lock();
- for_each_node(node)
- io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
- rcu_read_unlock();
-
spin_lock_irq(&wq->hash->wait.lock);
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];