Merge branch 'linus' of git://git.kernel.org/pub/scm/linux/kernel/git/herbert/crypto-2.6
[linux-2.6-microblaze.git] / fs / io-wq.c
index cb60a42..0a5ab1a 100644 (file)
@@ -16,6 +16,7 @@
 #include <linux/slab.h>
 #include <linux/kthread.h>
 #include <linux/rculist_nulls.h>
+#include <linux/fs_struct.h>
 
 #include "io-wq.h"
 
@@ -59,6 +60,7 @@ struct io_worker {
        const struct cred *cur_creds;
        const struct cred *saved_creds;
        struct files_struct *restore_files;
+       struct fs_struct *restore_fs;
 };
 
 #if BITS_PER_LONG == 64
@@ -151,6 +153,9 @@ static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
                task_unlock(current);
        }
 
+       if (current->fs != worker->restore_fs)
+               current->fs = worker->restore_fs;
+
        /*
         * If we have an active mm, we need to drop the wq lock before unusing
         * it. If we do, return true and let the caller retry the idle loop.
@@ -311,6 +316,7 @@ static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
 
        worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
        worker->restore_files = current->files;
+       worker->restore_fs = current->fs;
        io_wqe_inc_running(wqe, worker);
 }
 
@@ -481,6 +487,8 @@ next:
                        current->files = work->files;
                        task_unlock(current);
                }
+               if (work->fs && current->fs != work->fs)
+                       current->fs = work->fs;
                if (work->mm != worker->mm)
                        io_wq_switch_mm(worker, work);
                if (worker->cur_creds != work->creds)
@@ -691,11 +699,16 @@ static int io_wq_manager(void *data)
        /* create fixed workers */
        refcount_set(&wq->refs, workers_to_create);
        for_each_node(node) {
+               if (!node_online(node))
+                       continue;
                if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
                        goto err;
                workers_to_create--;
        }
 
+       while (workers_to_create--)
+               refcount_dec(&wq->refs);
+
        complete(&wq->done);
 
        while (!kthread_should_stop()) {
@@ -703,6 +716,9 @@ static int io_wq_manager(void *data)
                        struct io_wqe *wqe = wq->wqes[node];
                        bool fork_worker[2] = { false, false };
 
+                       if (!node_online(node))
+                               continue;
+
                        spin_lock_irq(&wqe->lock);
                        if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
                                fork_worker[IO_WQ_ACCT_BOUND] = true;
@@ -821,7 +837,9 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe,
 
        list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
                if (io_worker_get(worker)) {
-                       ret = func(worker, data);
+                       /* no task if node is/was offline */
+                       if (worker->task)
+                               ret = func(worker, data);
                        io_worker_release(worker);
                        if (ret)
                                break;
@@ -929,17 +947,19 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
        return ret;
 }
 
+struct work_match {
+       bool (*fn)(struct io_wq_work *, void *data);
+       void *data;
+};
+
 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 {
-       struct io_wq_work *work = data;
+       struct work_match *match = data;
        unsigned long flags;
        bool ret = false;
 
-       if (worker->cur_work != work)
-               return false;
-
        spin_lock_irqsave(&worker->lock, flags);
-       if (worker->cur_work == work &&
+       if (match->fn(worker->cur_work, match->data) &&
            !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL)) {
                send_sig(SIGINT, worker->task, 1);
                ret = true;
@@ -950,15 +970,13 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 }
 
 static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
-                                           struct io_wq_work *cwork)
+                                           struct work_match *match)
 {
        struct io_wq_work_node *node, *prev;
        struct io_wq_work *work;
        unsigned long flags;
        bool found = false;
 
-       cwork->flags |= IO_WQ_WORK_CANCEL;
-
        /*
         * First check pending list, if we're lucky we can just remove it
         * from there. CANCEL_OK means that the work is returned as-new,
@@ -968,7 +986,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
        wq_list_for_each(node, prev, &wqe->work_list) {
                work = container_of(node, struct io_wq_work, list);
 
-               if (work == cwork) {
+               if (match->fn(work, match->data)) {
                        wq_node_del(&wqe->work_list, node, prev);
                        found = true;
                        break;
@@ -989,20 +1007,60 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
         * completion will run normally in this case.
         */
        rcu_read_lock();
-       found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork);
+       found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
        rcu_read_unlock();
        return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
 }
 
+static bool io_wq_work_match(struct io_wq_work *work, void *data)
+{
+       return work == data;
+}
+
 enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
 {
+       struct work_match match = {
+               .fn     = io_wq_work_match,
+               .data   = cwork
+       };
        enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
        int node;
 
+       cwork->flags |= IO_WQ_WORK_CANCEL;
+
        for_each_node(node) {
                struct io_wqe *wqe = wq->wqes[node];
 
-               ret = io_wqe_cancel_work(wqe, cwork);
+               ret = io_wqe_cancel_work(wqe, &match);
+               if (ret != IO_WQ_CANCEL_NOTFOUND)
+                       break;
+       }
+
+       return ret;
+}
+
+static bool io_wq_pid_match(struct io_wq_work *work, void *data)
+{
+       pid_t pid = (pid_t) (unsigned long) data;
+
+       if (work)
+               return work->task_pid == pid;
+       return false;
+}
+
+enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid)
+{
+       struct work_match match = {
+               .fn     = io_wq_pid_match,
+               .data   = (void *) (unsigned long) pid
+       };
+       enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
+       int node;
+
+       for_each_node(node) {
+               struct io_wqe *wqe = wq->wqes[node];
+
+               ret = io_wqe_cancel_work(wqe, &match);
                if (ret != IO_WQ_CANCEL_NOTFOUND)
                        break;
        }
@@ -1036,6 +1094,8 @@ void io_wq_flush(struct io_wq *wq)
        for_each_node(node) {
                struct io_wqe *wqe = wq->wqes[node];
 
+               if (!node_online(node))
+                       continue;
                init_completion(&data.done);
                INIT_IO_WORK(&data.work, io_wq_flush_func);
                data.work.flags |= IO_WQ_WORK_INTERNAL;
@@ -1067,12 +1127,15 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 
        for_each_node(node) {
                struct io_wqe *wqe;
+               int alloc_node = node;
 
-               wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node);
+               if (!node_online(alloc_node))
+                       alloc_node = NUMA_NO_NODE;
+               wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
                if (!wqe)
                        goto err;
                wq->wqes[node] = wqe;
-               wqe->node = node;
+               wqe->node = alloc_node;
                wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
                atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
                if (wq->user) {
@@ -1080,7 +1143,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
                                        task_rlimit(current, RLIMIT_NPROC);
                }
                atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
-               wqe->node = node;
                wqe->wq = wq;
                spin_lock_init(&wqe->lock);
                INIT_WQ_LIST(&wqe->work_list);