[linux-2.6-microblaze.git] fs/io-wq.c
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 7d2ed8c..cd9bd09 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -51,6 +51,10 @@ struct io_worker {
 
        struct completion ref_done;
 
+       unsigned long create_state;
+       struct callback_head create_work;
+       int create_index;
+
        struct rcu_head rcu;
 };
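
These three fields replace the heap-allocated struct create_worker_data that
the rest of this patch removes: a pending "create a new worker" request now
lives inside the worker itself, so queueing one no longer requires a
GFP_ATOMIC allocation. A hedged summary of their roles (commentary on the
fields above, not a second definition):

    /*
     * create_state: bit 0 is claimed with test_and_set_bit_lock(), so each
     *               worker has at most one create request in flight.
     * create_work:  the task_work node queued to the io_wq owner task; it
     *               runs create_worker_cb() when it fires.
     * create_index: which acct class (bound vs unbound) to create for.
     */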
 
@@ -174,7 +178,7 @@ static void io_worker_exit(struct io_worker *worker)
                complete(&worker->ref_done);
        wait_for_completion(&worker->ref_done);
 
-       raw_spin_lock_irq(&wqe->lock);
+       raw_spin_lock(&wqe->lock);
        if (worker->flags & IO_WORKER_F_FREE)
                hlist_nulls_del_rcu(&worker->nulls_node);
        list_del_rcu(&worker->all_list);
@@ -184,7 +188,7 @@ static void io_worker_exit(struct io_worker *worker)
        worker->flags = 0;
        current->flags &= ~PF_IO_WORKER;
        preempt_enable();
-       raw_spin_unlock_irq(&wqe->lock);
+       raw_spin_unlock(&wqe->lock);
 
        kfree_rcu(worker, rcu);
        io_worker_ref_put(wqe->wq);
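
This hunk is the first of many below dropping the _irq/_irqsave spinlock
variants on wqe->lock and worker->lock. The conversion is safe provided these
locks are only ever taken from process context: the IRQ-disabling forms exist
solely to prevent self-deadlock against an interrupt handler grabbing the
same lock on the same CPU. A reference sketch of the distinction (helper
names are illustrative, not from this patch):

    #include <linux/spinlock.h>

    /* Lock never taken from IRQ context: the plain form suffices and
     * avoids toggling the CPU interrupt flag on every acquisition. */
    static void process_context_only(raw_spinlock_t *lock)
    {
            raw_spin_lock(lock);
            /* ... critical section ... */
            raw_spin_unlock(lock);
    }

    /* Lock also taken in hardirq context: IRQs must be masked so the
     * handler cannot preempt the holder and spin on the lock forever. */
    static void shared_with_hardirq(raw_spinlock_t *lock)
    {
            unsigned long flags;

            raw_spin_lock_irqsave(lock, flags);
            /* ... critical section ... */
            raw_spin_unlock_irqrestore(lock, flags);
    }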
@@ -250,18 +254,19 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
        if (!ret) {
                bool do_create = false, first = false;
 
-               raw_spin_lock_irq(&wqe->lock);
+               raw_spin_lock(&wqe->lock);
                if (acct->nr_workers < acct->max_workers) {
-                       atomic_inc(&acct->nr_running);
-                       atomic_inc(&wqe->wq->worker_refs);
                        if (!acct->nr_workers)
                                first = true;
                        acct->nr_workers++;
                        do_create = true;
                }
-               raw_spin_unlock_irq(&wqe->lock);
-               if (do_create)
+               raw_spin_unlock(&wqe->lock);
+               if (do_create) {
+                       atomic_inc(&acct->nr_running);
+                       atomic_inc(&wqe->wq->worker_refs);
                        create_io_worker(wqe->wq, wqe, acct->index, first);
+               }
        }
 }
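
Functionally this is the same accounting as before: nr_running and
worker_refs are still bumped only when a create is actually attempted, but
the atomic_inc() calls now run after wqe->lock is dropped. Atomics need no
spinlock protection, so hoisting them out shortens the critical section. A
runnable userspace sketch of the shape (names are illustrative):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static int nr_workers, max_workers = 4;
    static atomic_int nr_running, worker_refs;

    static bool try_reserve_worker(void)
    {
            bool do_create = false;

            pthread_mutex_lock(&lock);
            if (nr_workers < max_workers) {  /* only this needs the lock */
                    nr_workers++;
                    do_create = true;
            }
            pthread_mutex_unlock(&lock);

            if (do_create) {                 /* atomics: lock-free bumps */
                    atomic_fetch_add(&nr_running, 1);
                    atomic_fetch_add(&worker_refs, 1);
            }
            return do_create;
    }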
 
@@ -272,60 +277,63 @@ static void io_wqe_inc_running(struct io_worker *worker)
        atomic_inc(&acct->nr_running);
 }
 
-struct create_worker_data {
-       struct callback_head work;
-       struct io_wqe *wqe;
-       int index;
-};
-
 static void create_worker_cb(struct callback_head *cb)
 {
-       struct create_worker_data *cwd;
+       struct io_worker *worker;
        struct io_wq *wq;
        struct io_wqe *wqe;
        struct io_wqe_acct *acct;
        bool do_create = false, first = false;
 
-       cwd = container_of(cb, struct create_worker_data, work);
-       wqe = cwd->wqe;
+       worker = container_of(cb, struct io_worker, create_work);
+       wqe = worker->wqe;
        wq = wqe->wq;
-       acct = &wqe->acct[cwd->index];
-       raw_spin_lock_irq(&wqe->lock);
+       acct = &wqe->acct[worker->create_index];
+       raw_spin_lock(&wqe->lock);
        if (acct->nr_workers < acct->max_workers) {
                if (!acct->nr_workers)
                        first = true;
                acct->nr_workers++;
                do_create = true;
        }
-       raw_spin_unlock_irq(&wqe->lock);
+       raw_spin_unlock(&wqe->lock);
        if (do_create) {
-               create_io_worker(wq, wqe, cwd->index, first);
+               create_io_worker(wq, wqe, worker->create_index, first);
        } else {
                atomic_dec(&acct->nr_running);
                io_worker_ref_put(wq);
        }
-       kfree(cwd);
+       clear_bit_unlock(0, &worker->create_state);
+       io_worker_release(worker);
 }
 
-static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
+static void io_queue_worker_create(struct io_wqe *wqe, struct io_worker *worker,
+                                  struct io_wqe_acct *acct)
 {
-       struct create_worker_data *cwd;
        struct io_wq *wq = wqe->wq;
 
        /* raced with exit, just ignore create call */
        if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
                goto fail;
+       if (!io_worker_get(worker))
+               goto fail;
+       /*
+        * create_state manages ownership of create_work/index. We should
+        * only need one entry per worker, as the worker going to sleep
+        * will trigger the condition, and waking will clear it once it
+        * runs the task_work.
+        */
+       if (test_bit(0, &worker->create_state) ||
+           test_and_set_bit_lock(0, &worker->create_state))
+               goto fail_release;
 
-       cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
-       if (cwd) {
-               init_task_work(&cwd->work, create_worker_cb);
-               cwd->wqe = wqe;
-               cwd->index = acct->index;
-               if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
-                       return;
-
-               kfree(cwd);
-       }
+       init_task_work(&worker->create_work, create_worker_cb);
+       worker->create_index = acct->index;
+       if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
+               return;
+       clear_bit_unlock(0, &worker->create_state);
+fail_release:
+       io_worker_release(worker);
 fail:
        atomic_dec(&acct->nr_running);
        io_worker_ref_put(wq);
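
The allocation-free handoff above hinges on create_state bit 0 acting as a
one-shot lock: the bare test_bit() is a cheap fast path, the
test_and_set_bit_lock() is the acquiring read-modify-write, and
clear_bit_unlock(), in create_worker_cb() and in the exit path further down,
releases ownership, all bracketed by io_worker_get()/io_worker_release() so
the worker cannot vanish while a request is in flight. A minimal userspace
analogue using C11 atomics (names are illustrative):

    #include <stdatomic.h>
    #include <stdbool.h>

    struct worker {
            atomic_ulong create_state;  /* bit 0: create request in flight */
    };

    /* Claim the slot; true means we now own create_work/create_index. */
    static bool claim_create_slot(struct worker *w)
    {
            /* cheap relaxed read first, mirroring the bare test_bit() */
            if (atomic_load_explicit(&w->create_state,
                                     memory_order_relaxed) & 1)
                    return false;
            /* acquiring RMW, mirroring test_and_set_bit_lock() */
            return !(atomic_fetch_or_explicit(&w->create_state, 1UL,
                                              memory_order_acquire) & 1);
    }

    /* Drop ownership once the callback has run, like clear_bit_unlock(). */
    static void release_create_slot(struct worker *w)
    {
            atomic_fetch_and_explicit(&w->create_state, ~1UL,
                                      memory_order_release);
    }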
@@ -343,7 +351,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
        if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
                atomic_inc(&acct->nr_running);
                atomic_inc(&wqe->wq->worker_refs);
-               io_queue_worker_create(wqe, acct);
+               io_queue_worker_create(wqe, worker, acct);
        }
 }
 
@@ -416,7 +424,28 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
        spin_unlock(&wq->hash->wait.lock);
 }
 
-static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
+/*
+ * We can always run the work if the worker is currently the same type as
+ * the work (e.g. both are bound, or both are unbound). If they are not the
+ * same, only allow it if incrementing the worker count would be allowed.
+ */
+static bool io_worker_can_run_work(struct io_worker *worker,
+                                  struct io_wq_work *work)
+{
+       struct io_wqe_acct *acct;
+
+       if (!(worker->flags & IO_WORKER_F_BOUND) !=
+           !(work->flags & IO_WQ_WORK_UNBOUND))
+               return true;
+
+       /* not the same type, check if we'd go over the limit */
+       acct = io_work_get_acct(worker->wqe, work);
+       return acct->nr_workers < acct->max_workers;
+}
+
+static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
+                                          struct io_worker *worker,
+                                          bool *stalled)
        __must_hold(wqe->lock)
 {
        struct io_wq_work_node *node, *prev;
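
A note on the test in io_worker_can_run_work(): the "!(...) != !(...)"
construction is a logical XOR. Each "!" collapses a flag test to 0 or 1 so
that "!=" compares truth values, and since the two flags have opposite
polarity (the worker flag asserts bound, the work flag asserts unbound),
differing truth values mean worker and work are the same type. A standalone
sketch with made-up flag values:

    #include <assert.h>
    #include <stdbool.h>

    #define F_BOUND      0x1  /* worker flag: worker is bound */
    #define WORK_UNBOUND 0x2  /* work flag: work is unbound */

    static bool same_type(unsigned worker_flags, unsigned work_flags)
    {
            /* "!" normalizes to 0/1; opposite polarities make
             * "truth values differ" mean "same type" */
            return !(worker_flags & F_BOUND) != !(work_flags & WORK_UNBOUND);
    }

    int main(void)
    {
            assert(same_type(F_BOUND, 0));       /* both bound */
            assert(same_type(0, WORK_UNBOUND));  /* both unbound */
            assert(!same_type(F_BOUND, WORK_UNBOUND));
            assert(!same_type(0, 0));
            return 0;
    }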
@@ -428,6 +457,9 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
 
                work = container_of(node, struct io_wq_work, list);
 
+               if (!io_worker_can_run_work(worker, work))
+                       break;
+
                /* not hashed, can run anytime */
                if (!io_wq_is_hashed(work)) {
                        wq_list_del(&wqe->work_list, node, prev);
@@ -454,6 +486,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
                raw_spin_unlock(&wqe->lock);
                io_wait_on_hash(wqe, stall_hash);
                raw_spin_lock(&wqe->lock);
+               *stalled = true;
        }
 
        return NULL;
@@ -477,9 +510,9 @@ static void io_assign_current_work(struct io_worker *worker,
                cond_resched();
        }
 
-       spin_lock_irq(&worker->lock);
+       spin_lock(&worker->lock);
        worker->cur_work = work;
-       spin_unlock_irq(&worker->lock);
+       spin_unlock(&worker->lock);
 }
 
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
@@ -493,6 +526,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 
        do {
                struct io_wq_work *work;
+               bool stalled;
 get_next:
                /*
                 * If we got some work, mark us as busy. If we didn't, but
@@ -501,13 +535,14 @@ get_next:
                 * can't make progress, any work completion or insertion will
                 * clear the stalled flag.
                 */
-               work = io_get_next_work(wqe);
+               stalled = false;
+               work = io_get_next_work(wqe, worker, &stalled);
                if (work)
                        __io_worker_busy(wqe, worker, work);
-               else if (!wq_list_empty(&wqe->work_list))
+               else if (stalled)
                        wqe->flags |= IO_WQE_FLAG_STALLED;
 
-               raw_spin_unlock_irq(&wqe->lock);
+               raw_spin_unlock(&wqe->lock);
                if (!work)
                        break;
                io_assign_current_work(worker, work);
@@ -539,16 +574,16 @@ get_next:
                                clear_bit(hash, &wq->hash->map);
                                if (wq_has_sleeper(&wq->hash->wait))
                                        wake_up(&wq->hash->wait);
-                               raw_spin_lock_irq(&wqe->lock);
+                               raw_spin_lock(&wqe->lock);
                                wqe->flags &= ~IO_WQE_FLAG_STALLED;
                                /* skip unnecessary unlock-lock wqe->lock */
                                if (!work)
                                        goto get_next;
-                               raw_spin_unlock_irq(&wqe->lock);
+                               raw_spin_unlock(&wqe->lock);
                        }
                } while (work);
 
-               raw_spin_lock_irq(&wqe->lock);
+               raw_spin_lock(&wqe->lock);
        } while (1);
 }
 
@@ -569,13 +604,13 @@ static int io_wqe_worker(void *data)
 
                set_current_state(TASK_INTERRUPTIBLE);
 loop:
-               raw_spin_lock_irq(&wqe->lock);
+               raw_spin_lock(&wqe->lock);
                if (io_wqe_run_queue(wqe)) {
                        io_worker_handle_work(worker);
                        goto loop;
                }
                __io_worker_idle(wqe, worker);
-               raw_spin_unlock_irq(&wqe->lock);
+               raw_spin_unlock(&wqe->lock);
                if (io_flush_signals())
                        continue;
                ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
@@ -594,7 +629,7 @@ loop:
        }
 
        if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
-               raw_spin_lock_irq(&wqe->lock);
+               raw_spin_lock(&wqe->lock);
                io_worker_handle_work(worker);
        }
 
@@ -636,9 +671,9 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
 
        worker->flags &= ~IO_WORKER_F_RUNNING;
 
-       raw_spin_lock_irq(&worker->wqe->lock);
+       raw_spin_lock(&worker->wqe->lock);
        io_wqe_dec_running(worker);
-       raw_spin_unlock_irq(&worker->wqe->lock);
+       raw_spin_unlock(&worker->wqe->lock);
 }
 
 static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first)
@@ -664,9 +699,9 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first)
                kfree(worker);
 fail:
                atomic_dec(&acct->nr_running);
-               raw_spin_lock_irq(&wqe->lock);
+               raw_spin_lock(&wqe->lock);
                acct->nr_workers--;
-               raw_spin_unlock_irq(&wqe->lock);
+               raw_spin_unlock(&wqe->lock);
                io_worker_ref_put(wq);
                return;
        }
@@ -676,7 +711,7 @@ fail:
        set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
        tsk->flags |= PF_NO_SETAFFINITY;
 
-       raw_spin_lock_irq(&wqe->lock);
+       raw_spin_lock(&wqe->lock);
        hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
        list_add_tail_rcu(&worker->all_list, &wqe->all_list);
        worker->flags |= IO_WORKER_F_FREE;
@@ -684,7 +719,7 @@ fail:
                worker->flags |= IO_WORKER_F_BOUND;
        if (first && (worker->flags & IO_WORKER_F_BOUND))
                worker->flags |= IO_WORKER_F_FIXED;
-       raw_spin_unlock_irq(&wqe->lock);
+       raw_spin_unlock(&wqe->lock);
        wake_up_new_task(tsk);
 }
 
@@ -759,8 +794,7 @@ append:
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 {
        struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
-       int work_flags;
-       unsigned long flags;
+       bool do_wake;
 
        /*
         * If io-wq is exiting for this task, or if the request has explicitly
@@ -772,14 +806,14 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
                return;
        }
 
-       work_flags = work->flags;
-       raw_spin_lock_irqsave(&wqe->lock, flags);
+       raw_spin_lock(&wqe->lock);
        io_wqe_insert_work(wqe, work);
        wqe->flags &= ~IO_WQE_FLAG_STALLED;
-       raw_spin_unlock_irqrestore(&wqe->lock, flags);
+       do_wake = (work->flags & IO_WQ_WORK_CONCURRENT) ||
+                       !atomic_read(&acct->nr_running);
+       raw_spin_unlock(&wqe->lock);
 
-       if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
-           !atomic_read(&acct->nr_running))
+       if (do_wake)
                io_wqe_wake_worker(wqe, acct);
 }
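
Previously work->flags was cached into a local work_flags before the lock was
even taken; now the wake decision is computed while wqe->lock is still held.
Either way the invariant is the same: once the lock drops, another worker can
pop and free the work, so the critical section is the last place it may be
dereferenced. A pthread sketch of the shape (names are illustrative):

    #include <pthread.h>
    #include <stdbool.h>

    #define WORK_CONCURRENT 0x4

    struct work { unsigned flags; struct work *next; };

    static pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
    static struct work *queue_head;

    static bool enqueue(struct work *w, int nr_running)
    {
            bool do_wake;

            pthread_mutex_lock(&qlock);
            w->next = queue_head;
            queue_head = w;
            /* last safe access to *w: it is now visible to consumers */
            do_wake = (w->flags & WORK_CONCURRENT) || !nr_running;
            pthread_mutex_unlock(&qlock);
            return do_wake;  /* caller can wake a worker without touching w */
    }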
 
@@ -805,19 +839,18 @@ void io_wq_hash_work(struct io_wq_work *work, void *val)
 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 {
        struct io_cb_cancel_data *match = data;
-       unsigned long flags;
 
        /*
         * Hold the lock to avoid ->cur_work going out of scope, caller
         * may dereference the passed in work.
         */
-       spin_lock_irqsave(&worker->lock, flags);
+       spin_lock(&worker->lock);
        if (worker->cur_work &&
            match->fn(worker->cur_work, match->data)) {
                set_notify_signal(worker->task);
                match->nr_running++;
        }
-       spin_unlock_irqrestore(&worker->lock, flags);
+       spin_unlock(&worker->lock);
 
        return match->nr_running && !match->cancel_all;
 }
@@ -845,16 +878,15 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
 {
        struct io_wq_work_node *node, *prev;
        struct io_wq_work *work;
-       unsigned long flags;
 
 retry:
-       raw_spin_lock_irqsave(&wqe->lock, flags);
+       raw_spin_lock(&wqe->lock);
        wq_list_for_each(node, prev, &wqe->work_list) {
                work = container_of(node, struct io_wq_work, list);
                if (!match->fn(work, match->data))
                        continue;
                io_wqe_remove_pending(wqe, work, prev);
-               raw_spin_unlock_irqrestore(&wqe->lock, flags);
+               raw_spin_unlock(&wqe->lock);
                io_run_cancel(work, wqe);
                match->nr_pending++;
                if (!match->cancel_all)
@@ -863,7 +895,7 @@ retry:
                /* not safe to continue after unlock */
                goto retry;
        }
-       raw_spin_unlock_irqrestore(&wqe->lock, flags);
+       raw_spin_unlock(&wqe->lock);
 }
 
 static void io_wqe_cancel_running_work(struct io_wqe *wqe,
@@ -1004,12 +1036,12 @@ err_wq:
 
 static bool io_task_work_match(struct callback_head *cb, void *data)
 {
-       struct create_worker_data *cwd;
+       struct io_worker *worker;
 
        if (cb->func != create_worker_cb)
                return false;
-       cwd = container_of(cb, struct create_worker_data, work);
-       return cwd->wqe->wq == data;
+       worker = container_of(cb, struct io_worker, create_work);
+       return worker->wqe->wq == data;
 }
 
 void io_wq_exit_start(struct io_wq *wq)
@@ -1026,12 +1058,13 @@ static void io_wq_exit_workers(struct io_wq *wq)
                return;
 
        while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
-               struct create_worker_data *cwd;
+               struct io_worker *worker;
 
-               cwd = container_of(cb, struct create_worker_data, work);
-               atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
+               worker = container_of(cb, struct io_worker, create_work);
+               atomic_dec(&worker->wqe->acct[worker->create_index].nr_running);
                io_worker_ref_put(wq);
-               kfree(cwd);
+               clear_bit_unlock(0, &worker->create_state);
+               io_worker_release(worker);
        }
 
        rcu_read_lock();
@@ -1143,6 +1176,35 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
        return 0;
 }
 
+/*
+ * Set max number of bound/unbound workers, returning the old values. If a
+ * new_count entry is 0, leave that limit unchanged and just report it back.
+ */
+int io_wq_max_workers(struct io_wq *wq, int *new_count)
+{
+       int i, node, prev = 0;
+
+       for (i = 0; i < 2; i++) {
+               if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
+                       new_count[i] = task_rlimit(current, RLIMIT_NPROC);
+       }
+
+       rcu_read_lock();
+       for_each_node(node) {
+               struct io_wqe_acct *acct;
+
+               for (i = 0; i < 2; i++) {
+                       acct = &wq->wqes[node]->acct[i];
+                       prev = max_t(int, acct->max_workers, prev);
+                       if (new_count[i])
+                               acct->max_workers = new_count[i];
+                       new_count[i] = prev;
+               }
+       }
+       rcu_read_unlock();
+       return 0;
+}
+
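
io_wq_max_workers() is an in/out interface: each new_count[] entry is clamped
to RLIMIT_NPROC, a zero entry means "query only", and on return the slot
carries the previous limit. A runnable userspace analogue of the same
set-and-return-previous shape, assuming POSIX getrlimit() (names are
illustrative); note it hands back a per-class previous value, whereas the
kernel loop above folds both classes through a single running prev:

    #include <sys/resource.h>

    static int max_workers[2];  /* [0] = bound, [1] = unbound */

    static void set_max_workers(int new_count[2])
    {
            struct rlimit rl;

            getrlimit(RLIMIT_NPROC, &rl);
            for (int i = 0; i < 2; i++) {
                    int prev = max_workers[i];

                    if (new_count[i] > (int)rl.rlim_cur)
                            new_count[i] = (int)rl.rlim_cur;
                    if (new_count[i])        /* 0 means "query only" */
                            max_workers[i] = new_count[i];
                    new_count[i] = prev;     /* report the old limit */
            }
    }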
 static __init int io_wq_init(void)
 {
        int ret;