io-wq: add intermediate work step between pending list and active work
diff --git a/fs/io-wq.c b/fs/io-wq.c
index a776312..bb7f161 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -48,7 +48,8 @@ struct io_worker {
        struct io_wqe *wqe;
 
        struct io_wq_work *cur_work;
-       spinlock_t lock;
+       struct io_wq_work *next_work;
+       raw_spinlock_t lock;
 
        struct completion ref_done;
 
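
The worker lock also changes type, from spinlock_t to raw_spinlock_t: with this patch worker->lock is taken while wqe->lock, already a raw spinlock, is held, and on PREEMPT_RT a sleeping spinlock_t must not nest inside a raw spinlock. To illustrate the two-slot scheme outside the kernel, the sketches below build up a userspace model; pthread_spinlock_t stands in for raw_spinlock_t, and every fake_* name is invented here, not part of the kernel API.

/*
 * Userspace model of the two-slot io_worker. Like raw_spinlock_t,
 * a pthread spinlock busy-waits and never sleeps.
 */
#include <pthread.h>
#include <stddef.h>

struct fake_work {
        int cancelled;                  /* models IO_WQ_WORK_CANCEL */
};

struct fake_worker {
        struct fake_work *cur_work;     /* item currently executing */
        struct fake_work *next_work;    /* claimed, but not yet active */
        pthread_spinlock_t lock;        /* guards both slots */
};

static void fake_worker_init(struct fake_worker *w)
{
        w->cur_work = NULL;
        w->next_work = NULL;
        pthread_spin_init(&w->lock, PTHREAD_PROCESS_PRIVATE);
}
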
@@ -405,8 +406,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
  * Worker will start processing some work. Move it to the busy list, if
  * it's currently on the freelist
  */
-static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
-                            struct io_wq_work *work)
+static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
        __must_hold(wqe->lock)
 {
        if (worker->flags & IO_WORKER_F_FREE) {
@@ -529,9 +529,10 @@ static void io_assign_current_work(struct io_worker *worker,
                cond_resched();
        }
 
-       spin_lock(&worker->lock);
+       raw_spin_lock(&worker->lock);
        worker->cur_work = work;
-       spin_unlock(&worker->lock);
+       worker->next_work = NULL;
+       raw_spin_unlock(&worker->lock);
 }
 
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
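
io_assign_current_work() clears next_work in the same critical section that publishes cur_work, so a concurrent canceller always finds the item in exactly one slot. In the model above, the promotion step would look like this (fake_* names are illustrative):

/* Model of io_assign_current_work(): promote a claimed item. */
static void fake_assign_current(struct fake_worker *w, struct fake_work *work)
{
        pthread_spin_lock(&w->lock);
        w->cur_work = work;             /* now the active item */
        w->next_work = NULL;            /* no longer merely claimed */
        pthread_spin_unlock(&w->lock);
}
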
@@ -546,7 +547,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 
        do {
                struct io_wq_work *work;
-get_next:
+
                /*
                 * If we got some work, mark us as busy. If we didn't, but
                 * the list isn't empty, it means we stalled on hashed work.
@@ -555,9 +556,20 @@ get_next:
                 * clear the stalled flag.
                 */
                work = io_get_next_work(acct, worker);
-               if (work)
-                       __io_worker_busy(wqe, worker, work);
-
+               if (work) {
+                       __io_worker_busy(wqe, worker);
+
+                       /*
+                        * Make sure cancelation can find this, even before
+                        * it becomes the active work. That avoids a window
+                        * where the work has been removed from our general
+                        * work list, but isn't yet discoverable as the
+                        * current work item for this worker.
+                        */
+                       raw_spin_lock(&worker->lock);
+                       worker->next_work = work;
+                       raw_spin_unlock(&worker->lock);
+               }
                raw_spin_unlock(&wqe->lock);
                if (!work)
                        break;
@@ -594,11 +606,6 @@ get_next:
                                spin_unlock_irq(&wq->hash->wait.lock);
                                if (wq_has_sleeper(&wq->hash->wait))
                                        wake_up(&wq->hash->wait);
-                               raw_spin_lock(&wqe->lock);
-                               /* skip unnecessary unlock-lock wqe->lock */
-                               if (!work)
-                                       goto get_next;
-                               raw_spin_unlock(&wqe->lock);
                        }
                } while (work);
 
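
This is the heart of the change: a work item pulled off the pending list is published as next_work before wqe->lock is dropped, closing the window in which the item belonged to no list and no worker and was therefore invisible to cancellation. The get_next shortcut, which re-took wqe->lock after waking hash waiters purely to skip an unlock/lock pair, goes away since the loop can simply start over. A sketch of the claim step, with list_lock standing in for wqe->lock and get_next() for io_get_next_work():

static pthread_spinlock_t list_lock;    /* stands in for wqe->lock */

static struct fake_work *fake_claim_work(struct fake_worker *w,
                                         struct fake_work *(*get_next)(void))
{
        struct fake_work *work;

        pthread_spin_lock(&list_lock);
        work = get_next();      /* removes the item from the pending list */
        if (work) {
                /* discoverable by cancellation before list_lock drops */
                pthread_spin_lock(&w->lock);
                w->next_work = work;
                pthread_spin_unlock(&w->lock);
        }
        pthread_spin_unlock(&list_lock);
        return work;
}
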
@@ -815,7 +822,7 @@ fail:
 
        refcount_set(&worker->ref, 1);
        worker->wqe = wqe;
-       spin_lock_init(&worker->lock);
+       raw_spin_lock_init(&worker->lock);
        init_completion(&worker->ref_done);
 
        if (index == IO_WQ_ACCT_BOUND)
@@ -973,6 +980,19 @@ void io_wq_hash_work(struct io_wq_work *work, void *val)
        work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
 }
 
+static bool __io_wq_worker_cancel(struct io_worker *worker,
+                                 struct io_cb_cancel_data *match,
+                                 struct io_wq_work *work)
+{
+       if (work && match->fn(work, match->data)) {
+               work->flags |= IO_WQ_WORK_CANCEL;
+               set_notify_signal(worker->task);
+               return true;
+       }
+
+       return false;
+}
+
 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 {
        struct io_cb_cancel_data *match = data;
@@ -981,13 +1001,11 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
         * Hold the lock to avoid ->cur_work going out of scope, caller
         * may dereference the passed in work.
         */
-       spin_lock(&worker->lock);
-       if (worker->cur_work &&
-           match->fn(worker->cur_work, match->data)) {
-               set_notify_signal(worker->task);
+       raw_spin_lock(&worker->lock);
+       if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
+           __io_wq_worker_cancel(worker, match, worker->next_work))
                match->nr_running++;
-       }
-       spin_unlock(&worker->lock);
+       raw_spin_unlock(&worker->lock);
 
        return match->nr_running && !match->cancel_all;
 }
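
On the cancellation side, the old single cur_work check becomes a helper so both slots can be probed under one hold of the worker lock. The helper also sets IO_WQ_WORK_CANCEL, so a matched next_work item that has not begun executing is seen as cancelled once the worker reaches it; set_notify_signal() still interrupts an item that is already running. Modelled in the sketch, with match_fn standing in for the work_cancel_fn callback and the task poke omitted:

/* Model of __io_wq_worker_cancel(): try to cancel one slot. */
static int fake_slot_cancel(struct fake_work *work,
                            int (*match_fn)(struct fake_work *))
{
        if (work && match_fn(work)) {
                work->cancelled = 1;    /* models IO_WQ_WORK_CANCEL */
                return 1;
        }
        return 0;
}

/* Model of io_wq_worker_cancel(): probe both slots, one lock hold. */
static int fake_worker_cancel(struct fake_worker *w,
                              int (*match_fn)(struct fake_work *))
{
        int hit;

        pthread_spin_lock(&w->lock);
        hit = fake_slot_cancel(w->cur_work, match_fn) ||
              fake_slot_cancel(w->next_work, match_fn);
        pthread_spin_unlock(&w->lock);
        return hit;
}
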
@@ -1039,17 +1057,16 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
 {
        int i;
 retry:
-       raw_spin_lock(&wqe->lock);
        for (i = 0; i < IO_WQ_ACCT_NR; i++) {
                struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
 
                if (io_acct_cancel_pending_work(wqe, acct, match)) {
+                       raw_spin_lock(&wqe->lock);
                        if (match->cancel_all)
                                goto retry;
-                       return;
+                       break;
                }
        }
-       raw_spin_unlock(&wqe->lock);
 }
 
 static void io_wqe_cancel_running_work(struct io_wqe *wqe,
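
io_wqe_cancel_pending_work() changes its locking contract: the caller now enters with wqe->lock held, and because io_acct_cancel_pending_work() drops that lock whenever it removes a match, the loop re-takes it before retrying or returning, so the caller holds the lock again on exit. A toy model of the contract, using a fixed array as the pending list:

static struct fake_work *pending[8];    /* toy stand-in for the acct lists */

/* Model of io_acct_cancel_pending_work(): called with list_lock held;
 * drops it and returns nonzero when a matching item was cancelled. */
static int fake_acct_cancel_pending(int (*match_fn)(struct fake_work *))
{
        for (int i = 0; i < 8; i++) {
                if (pending[i] && match_fn(pending[i])) {
                        pending[i]->cancelled = 1;
                        pending[i] = NULL;      /* off the pending list */
                        pthread_spin_unlock(&list_lock);
                        return 1;
                }
        }
        return 0;
}

/* Model of io_wqe_cancel_pending_work(): enters and leaves with
 * list_lock held; returns the number of items cancelled. */
static int fake_cancel_pending(int cancel_all,
                               int (*match_fn)(struct fake_work *))
{
        int nr = 0;
retry:
        if (fake_acct_cancel_pending(match_fn)) {
                nr++;
                pthread_spin_lock(&list_lock);  /* helper dropped it */
                if (cancel_all)
                        goto retry;
        }
        return nr;
}
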
@@ -1074,25 +1091,27 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
         * First check pending list, if we're lucky we can just remove it
         * from there. CANCEL_OK means that the work is returned as-new,
         * no completion will be posted for it.
-        */
-       for_each_node(node) {
-               struct io_wqe *wqe = wq->wqes[node];
-
-               io_wqe_cancel_pending_work(wqe, &match);
-               if (match.nr_pending && !match.cancel_all)
-                       return IO_WQ_CANCEL_OK;
-       }
-
-       /*
-        * Now check if a free (going busy) or busy worker has the work
+        *
+        * Then check if a free (going busy) or busy worker has the work
         * currently running. If we find it there, we'll return CANCEL_RUNNING
         * as an indication that we attempt to signal cancellation. The
         * completion will run normally in this case.
+        *
+        * Do both of these while holding the wqe->lock, to ensure that
+        * we'll find a work item regardless of state.
         */
        for_each_node(node) {
                struct io_wqe *wqe = wq->wqes[node];
 
+               raw_spin_lock(&wqe->lock);
+               io_wqe_cancel_pending_work(wqe, &match);
+               if (match.nr_pending && !match.cancel_all) {
+                       raw_spin_unlock(&wqe->lock);
+                       return IO_WQ_CANCEL_OK;
+               }
+
                io_wqe_cancel_running_work(wqe, &match);
+               raw_spin_unlock(&wqe->lock);
                if (match.nr_running && !match.cancel_all)
                        return IO_WQ_CANCEL_RUNNING;
        }
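
Previously io_wq_cancel_cb() walked the nodes twice, first for pending and then for running work, leaving a window between the two passes in which an item could move from the list onto a worker and be missed by both. Both checks now happen per node under a single hold of wqe->lock, which the next_work handoff makes sufficient. A simplified single-node model, continuing the sketch:

enum { FAKE_CANCEL_OK, FAKE_CANCEL_RUNNING, FAKE_CANCEL_NOTFOUND };

static int fake_cancel_cb(struct fake_worker *w, int cancel_all,
                          int (*match_fn)(struct fake_work *))
{
        int nr_pending, nr_running;

        pthread_spin_lock(&list_lock);
        nr_pending = fake_cancel_pending(cancel_all, match_fn);
        if (nr_pending && !cancel_all) {
                pthread_spin_unlock(&list_lock);
                return FAKE_CANCEL_OK;  /* removed before it ever ran */
        }
        nr_running = fake_worker_cancel(w, match_fn);
        pthread_spin_unlock(&list_lock);

        if (nr_running)
                return FAKE_CANCEL_RUNNING;     /* completion still runs */
        return nr_pending ? FAKE_CANCEL_OK : FAKE_CANCEL_NOTFOUND;
}
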
@@ -1263,7 +1282,9 @@ static void io_wq_destroy(struct io_wq *wq)
                        .fn             = io_wq_work_match_all,
                        .cancel_all     = true,
                };
+               raw_spin_lock(&wqe->lock);
                io_wqe_cancel_pending_work(wqe, &match);
+               raw_spin_unlock(&wqe->lock);
                free_cpumask_var(wqe->cpu_mask);
                kfree(wqe);
        }
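
io_wq_destroy() keeps the new contract by wrapping its cancel-all pass in wqe->lock. To tie the sketch together, a small driver exercising the claim, cancel, and assign paths (build with cc -pthread; again, everything fake_* is invented for illustration):

#include <assert.h>

static struct fake_work the_work;

/* Stands in for io_get_next_work(): pop the one queued item. */
static struct fake_work *take_one(void)
{
        struct fake_work *w = pending[0];

        pending[0] = NULL;
        return w;
}

static int match_any(struct fake_work *w) { (void)w; return 1; }

int main(void)
{
        struct fake_worker worker;

        pthread_spin_init(&list_lock, PTHREAD_PROCESS_PRIVATE);
        fake_worker_init(&worker);

        pending[0] = &the_work;
        fake_claim_work(&worker, take_one);     /* list -> next_work */
        assert(fake_cancel_cb(&worker, 0, match_any) == FAKE_CANCEL_RUNNING);
        assert(the_work.cancelled);             /* found via next_work */

        fake_assign_current(&worker, &the_work);/* next_work -> cur_work */
        return 0;
}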