Documentation/admin-guide/cgroup-v2.rst: document why inactive_X + active_X may not...
[linux-2.6-microblaze.git] / fs / io-wq.c
index 9174007..91b85df 100644 (file)
@@ -33,6 +33,7 @@ enum {
 enum {
        IO_WQ_BIT_EXIT          = 0,    /* wq exiting */
        IO_WQ_BIT_CANCEL        = 1,    /* cancel work on list */
+       IO_WQ_BIT_ERROR         = 2,    /* error on setup */
 };
 
 enum {
@@ -56,6 +57,7 @@ struct io_worker {
 
        struct rcu_head rcu;
        struct mm_struct *mm;
+       const struct cred *creds;
        struct files_struct *restore_files;
 };
 
@@ -82,7 +84,7 @@ enum {
 struct io_wqe {
        struct {
                spinlock_t lock;
-               struct list_head work_list;
+               struct io_wq_work_list work_list;
                unsigned long hash_map;
                unsigned flags;
        } ____cacheline_aligned_in_smp;
@@ -103,13 +105,13 @@ struct io_wqe {
 struct io_wq {
        struct io_wqe **wqes;
        unsigned long state;
-       unsigned nr_wqes;
 
        get_work_fn *get_work;
        put_work_fn *put_work;
 
        struct task_struct *manager;
        struct user_struct *user;
+       struct cred *creds;
        struct mm_struct *mm;
        refcount_t refs;
        struct completion done;
@@ -135,6 +137,11 @@ static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
 {
        bool dropped_lock = false;
 
+       if (worker->creds) {
+               revert_creds(worker->creds);
+               worker->creds = NULL;
+       }
+
        if (current->files != worker->restore_files) {
                __acquire(&wqe->lock);
                spin_unlock_irq(&wqe->lock);
@@ -229,7 +236,8 @@ static void io_worker_exit(struct io_worker *worker)
 static inline bool io_wqe_run_queue(struct io_wqe *wqe)
        __must_hold(wqe->lock)
 {
-       if (!list_empty(&wqe->work_list) && !(wqe->flags & IO_WQE_FLAG_STALLED))
+       if (!wq_list_empty(&wqe->work_list) &&
+           !(wqe->flags & IO_WQE_FLAG_STALLED))
                return true;
        return false;
 }
@@ -327,9 +335,9 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
         * If worker is moving from bound to unbound (or vice versa), then
         * ensure we update the running accounting.
         */
-        worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
-        work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
-        if (worker_bound != work_bound) {
+       worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
+       work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
+       if (worker_bound != work_bound) {
                io_wqe_dec_running(wqe, worker);
                if (work_bound) {
                        worker->flags |= IO_WORKER_F_BOUND;
@@ -368,12 +376,15 @@ static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
 static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
        __must_hold(wqe->lock)
 {
+       struct io_wq_work_node *node, *prev;
        struct io_wq_work *work;
 
-       list_for_each_entry(work, &wqe->work_list, list) {
+       wq_list_for_each(node, prev, &wqe->work_list) {
+               work = container_of(node, struct io_wq_work, list);
+
                /* not hashed, can run anytime */
                if (!(work->flags & IO_WQ_WORK_HASHED)) {
-                       list_del(&work->list);
+                       wq_node_del(&wqe->work_list, node, prev);
                        return work;
                }
 
@@ -381,7 +392,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
                *hash = work->flags >> IO_WQ_HASH_SHIFT;
                if (!(wqe->hash_map & BIT_ULL(*hash))) {
                        wqe->hash_map |= BIT_ULL(*hash);
-                       list_del(&work->list);
+                       wq_node_del(&wqe->work_list, node, prev);
                        return work;
                }
        }
@@ -409,7 +420,7 @@ static void io_worker_handle_work(struct io_worker *worker)
                work = io_get_next_work(wqe, &hash);
                if (work)
                        __io_worker_busy(wqe, worker, work);
-               else if (!list_empty(&wqe->work_list))
+               else if (!wq_list_empty(&wqe->work_list))
                        wqe->flags |= IO_WQE_FLAG_STALLED;
 
                spin_unlock_irq(&wqe->lock);
@@ -426,6 +437,9 @@ next:
                worker->cur_work = work;
                spin_unlock_irq(&worker->lock);
 
+               if (work->flags & IO_WQ_WORK_CB)
+                       work->func(&work);
+
                if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
                    current->files != work->files) {
                        task_lock(current);
@@ -438,6 +452,8 @@ next:
                        set_fs(USER_DS);
                        worker->mm = wq->mm;
                }
+               if (!worker->creds)
+                       worker->creds = override_creds(wq->creds);
                if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
                        work->flags |= IO_WQ_WORK_CANCEL;
                if (worker->mm)
@@ -514,7 +530,7 @@ static int io_wqe_worker(void *data)
 
        if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
                spin_lock_irq(&wqe->lock);
-               if (!list_empty(&wqe->work_list))
+               if (!wq_list_empty(&wqe->work_list))
                        io_worker_handle_work(worker);
                else
                        spin_unlock_irq(&wqe->lock);
@@ -562,14 +578,14 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
        spin_unlock_irq(&wqe->lock);
 }
 
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 {
        struct io_wqe_acct *acct =&wqe->acct[index];
        struct io_worker *worker;
 
-       worker = kcalloc_node(1, sizeof(*worker), GFP_KERNEL, wqe->node);
+       worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
        if (!worker)
-               return;
+               return false;
 
        refcount_set(&worker->ref, 1);
        worker->nulls_node.pprev = NULL;
@@ -581,7 +597,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
                                "io_wqe_worker-%d/%d", index, wqe->node);
        if (IS_ERR(worker->task)) {
                kfree(worker);
-               return;
+               return false;
        }
 
        spin_lock_irq(&wqe->lock);
@@ -599,6 +615,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
                atomic_inc(&wq->user->processes);
 
        wake_up_process(worker->task);
+       return true;
 }
 
 static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
@@ -606,9 +623,6 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
 {
        struct io_wqe_acct *acct = &wqe->acct[index];
 
-       /* always ensure we have one bounded worker */
-       if (index == IO_WQ_ACCT_BOUND && !acct->nr_workers)
-               return true;
        /* if we have available workers or no work, no need */
        if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
                return false;
@@ -621,12 +635,22 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
 static int io_wq_manager(void *data)
 {
        struct io_wq *wq = data;
+       int workers_to_create = num_possible_nodes();
+       int node;
 
-       while (!kthread_should_stop()) {
-               int i;
+       /* create fixed workers */
+       refcount_set(&wq->refs, workers_to_create);
+       for_each_node(node) {
+               if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
+                       goto err;
+               workers_to_create--;
+       }
+
+       complete(&wq->done);
 
-               for (i = 0; i < wq->nr_wqes; i++) {
-                       struct io_wqe *wqe = wq->wqes[i];
+       while (!kthread_should_stop()) {
+               for_each_node(node) {
+                       struct io_wqe *wqe = wq->wqes[node];
                        bool fork_worker[2] = { false, false };
 
                        spin_lock_irq(&wqe->lock);
@@ -645,6 +669,12 @@ static int io_wq_manager(void *data)
        }
 
        return 0;
+err:
+       set_bit(IO_WQ_BIT_ERROR, &wq->state);
+       set_bit(IO_WQ_BIT_EXIT, &wq->state);
+       if (refcount_sub_and_test(workers_to_create, &wq->refs))
+               complete(&wq->done);
+       return 0;
 }
 
 static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
@@ -688,7 +718,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
        }
 
        spin_lock_irqsave(&wqe->lock, flags);
-       list_add_tail(&work->list, &wqe->work_list);
+       wq_list_add_tail(&work->list, &wqe->work_list);
        wqe->flags &= ~IO_WQE_FLAG_STALLED;
        spin_unlock_irqrestore(&wqe->lock, flags);
 
@@ -750,7 +780,7 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe,
 
 void io_wq_cancel_all(struct io_wq *wq)
 {
-       int i;
+       int node;
 
        set_bit(IO_WQ_BIT_CANCEL, &wq->state);
 
@@ -759,8 +789,8 @@ void io_wq_cancel_all(struct io_wq *wq)
         * to a worker and the worker putting itself on the busy_list
         */
        rcu_read_lock();
-       for (i = 0; i < wq->nr_wqes; i++) {
-               struct io_wqe *wqe = wq->wqes[i];
+       for_each_node(node) {
+               struct io_wqe *wqe = wq->wqes[node];
 
                io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
        }
@@ -803,14 +833,17 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
                .cancel = cancel,
                .caller_data = cancel_data,
        };
+       struct io_wq_work_node *node, *prev;
        struct io_wq_work *work;
        unsigned long flags;
        bool found = false;
 
        spin_lock_irqsave(&wqe->lock, flags);
-       list_for_each_entry(work, &wqe->work_list, list) {
+       wq_list_for_each(node, prev, &wqe->work_list) {
+               work = container_of(node, struct io_wq_work, list);
+
                if (cancel(work, cancel_data)) {
-                       list_del(&work->list);
+                       wq_node_del(&wqe->work_list, node, prev);
                        found = true;
                        break;
                }
@@ -833,10 +866,10 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
                                  void *data)
 {
        enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
-       int i;
+       int node;
 
-       for (i = 0; i < wq->nr_wqes; i++) {
-               struct io_wqe *wqe = wq->wqes[i];
+       for_each_node(node) {
+               struct io_wqe *wqe = wq->wqes[node];
 
                ret = io_wqe_cancel_cb_work(wqe, cancel, data);
                if (ret != IO_WQ_CANCEL_NOTFOUND)
@@ -868,6 +901,7 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
                                            struct io_wq_work *cwork)
 {
+       struct io_wq_work_node *node, *prev;
        struct io_wq_work *work;
        unsigned long flags;
        bool found = false;
@@ -880,9 +914,11 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
         * no completion will be posted for it.
         */
        spin_lock_irqsave(&wqe->lock, flags);
-       list_for_each_entry(work, &wqe->work_list, list) {
+       wq_list_for_each(node, prev, &wqe->work_list) {
+               work = container_of(node, struct io_wq_work, list);
+
                if (work == cwork) {
-                       list_del(&work->list);
+                       wq_node_del(&wqe->work_list, node, prev);
                        found = true;
                        break;
                }
@@ -910,10 +946,10 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
 enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
 {
        enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
-       int i;
+       int node;
 
-       for (i = 0; i < wq->nr_wqes; i++) {
-               struct io_wqe *wqe = wq->wqes[i];
+       for_each_node(node) {
+               struct io_wqe *wqe = wq->wqes[node];
 
                ret = io_wqe_cancel_work(wqe, cwork);
                if (ret != IO_WQ_CANCEL_NOTFOUND)
@@ -944,10 +980,10 @@ static void io_wq_flush_func(struct io_wq_work **workptr)
 void io_wq_flush(struct io_wq *wq)
 {
        struct io_wq_flush_data data;
-       int i;
+       int node;
 
-       for (i = 0; i < wq->nr_wqes; i++) {
-               struct io_wqe *wqe = wq->wqes[i];
+       for_each_node(node) {
+               struct io_wqe *wqe = wq->wqes[node];
 
                init_completion(&data.done);
                INIT_IO_WORK(&data.work, io_wq_flush_func);
@@ -957,43 +993,39 @@ void io_wq_flush(struct io_wq *wq)
        }
 }
 
-struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
-                          struct user_struct *user, get_work_fn *get_work,
-                          put_work_fn *put_work)
+struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
-       int ret = -ENOMEM, i, node;
+       int ret = -ENOMEM, node;
        struct io_wq *wq;
 
-       wq = kcalloc(1, sizeof(*wq), GFP_KERNEL);
+       wq = kzalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
                return ERR_PTR(-ENOMEM);
 
-       wq->nr_wqes = num_online_nodes();
-       wq->wqes = kcalloc(wq->nr_wqes, sizeof(struct io_wqe *), GFP_KERNEL);
+       wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
        if (!wq->wqes) {
                kfree(wq);
                return ERR_PTR(-ENOMEM);
        }
 
-       wq->get_work = get_work;
-       wq->put_work = put_work;
+       wq->get_work = data->get_work;
+       wq->put_work = data->put_work;
 
        /* caller must already hold a reference to this */
-       wq->user = user;
+       wq->user = data->user;
+       wq->creds = data->creds;
 
-       i = 0;
-       refcount_set(&wq->refs, wq->nr_wqes);
-       for_each_online_node(node) {
+       for_each_node(node) {
                struct io_wqe *wqe;
 
-               wqe = kcalloc_node(1, sizeof(struct io_wqe), GFP_KERNEL, node);
+               wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node);
                if (!wqe)
-                       break;
-               wq->wqes[i] = wqe;
+                       goto err;
+               wq->wqes[node] = wqe;
                wqe->node = node;
                wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
                atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
-               if (user) {
+               if (wq->user) {
                        wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
                                        task_rlimit(current, RLIMIT_NPROC);
                }
@@ -1001,33 +1033,36 @@ struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
                wqe->node = node;
                wqe->wq = wq;
                spin_lock_init(&wqe->lock);
-               INIT_LIST_HEAD(&wqe->work_list);
+               INIT_WQ_LIST(&wqe->work_list);
                INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
                INIT_HLIST_NULLS_HEAD(&wqe->busy_list, 1);
                INIT_LIST_HEAD(&wqe->all_list);
-
-               i++;
        }
 
        init_completion(&wq->done);
 
-       if (i != wq->nr_wqes)
-               goto err;
-
        /* caller must have already done mmgrab() on this mm */
-       wq->mm = mm;
+       wq->mm = data->mm;
 
        wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
        if (!IS_ERR(wq->manager)) {
                wake_up_process(wq->manager);
+               wait_for_completion(&wq->done);
+               if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
+                       ret = -ENOMEM;
+                       goto err;
+               }
+               reinit_completion(&wq->done);
                return wq;
        }
 
        ret = PTR_ERR(wq->manager);
-       wq->manager = NULL;
-err:
        complete(&wq->done);
-       io_wq_destroy(wq);
+err:
+       for_each_node(node)
+               kfree(wq->wqes[node]);
+       kfree(wq->wqes);
+       kfree(wq);
        return ERR_PTR(ret);
 }
 
@@ -1039,27 +1074,21 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
 
 void io_wq_destroy(struct io_wq *wq)
 {
-       int i;
+       int node;
 
-       if (wq->manager) {
-               set_bit(IO_WQ_BIT_EXIT, &wq->state);
+       set_bit(IO_WQ_BIT_EXIT, &wq->state);
+       if (wq->manager)
                kthread_stop(wq->manager);
-       }
 
        rcu_read_lock();
-       for (i = 0; i < wq->nr_wqes; i++) {
-               struct io_wqe *wqe = wq->wqes[i];
-
-               if (!wqe)
-                       continue;
-               io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
-       }
+       for_each_node(node)
+               io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
        rcu_read_unlock();
 
        wait_for_completion(&wq->done);
 
-       for (i = 0; i < wq->nr_wqes; i++)
-               kfree(wq->wqes[i]);
+       for_each_node(node)
+               kfree(wq->wqes[node]);
        kfree(wq->wqes);
        kfree(wq);
 }