Merge branch 'ucount-rlimit-fixes-for-v5.16' of git://git.kernel.org/pub/scm/linux...
[linux-2.6-microblaze.git] / fs / aio.c
index 9c81cf6..f6f1cbf 100644 (file)
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -181,8 +181,9 @@ struct poll_iocb {
        struct file             *file;
        struct wait_queue_head  *head;
        __poll_t                events;
-       bool                    done;
        bool                    cancelled;
+       bool                    work_scheduled;
+       bool                    work_need_resched;
        struct wait_queue_entry wait;
        struct work_struct      work;
 };
@@ -1619,6 +1620,51 @@ static void aio_poll_put_work(struct work_struct *work)
        iocb_put(iocb);
 }
 
+/*
+ * Safely lock the waitqueue which the request is on, synchronizing with the
+ * case where the ->poll() provider decides to free its waitqueue early.
+ *
+ * Returns true on success, meaning that req->head->lock was locked, req->wait
+ * is on req->head, and an RCU read lock was taken.  Returns false if the
+ * request was already removed from its waitqueue (which might no longer exist).
+ */
+static bool poll_iocb_lock_wq(struct poll_iocb *req)
+{
+       wait_queue_head_t *head;
+
+       /*
+        * While we hold the waitqueue lock and the waitqueue is nonempty,
+        * wake_up_pollfree() will wait for us.  However, taking the waitqueue
+        * lock in the first place can race with the waitqueue being freed.
+        *
+        * We solve this as eventpoll does: by taking advantage of the fact that
+        * all users of wake_up_pollfree() will RCU-delay the actual free.  If
+        * we enter rcu_read_lock() and see that the pointer to the queue is
+        * non-NULL, we can then lock it without the memory being freed out from
+        * under us, then check whether the request is still on the queue.
+        *
+        * Keep holding rcu_read_lock() as long as we hold the queue lock, in
+        * case the caller deletes the entry from the queue, leaving it empty.
+        * In that case, only RCU prevents the queue memory from being freed.
+        */
+       rcu_read_lock();
+       head = smp_load_acquire(&req->head);
+       if (head) {
+               spin_lock(&head->lock);
+               if (!list_empty(&req->wait.entry))
+                       return true;
+               spin_unlock(&head->lock);
+       }
+       rcu_read_unlock();
+       return false;
+}
+
+static void poll_iocb_unlock_wq(struct poll_iocb *req)
+{
+       spin_unlock(&req->head->lock);
+       rcu_read_unlock();
+}
+
 static void aio_poll_complete_work(struct work_struct *work)
 {
        struct poll_iocb *req = container_of(work, struct poll_iocb, work);
@@ -1638,14 +1684,27 @@ static void aio_poll_complete_work(struct work_struct *work)
         * avoid further branches in the fast path.
         */
        spin_lock_irq(&ctx->ctx_lock);
-       if (!mask && !READ_ONCE(req->cancelled)) {
-               add_wait_queue(req->head, &req->wait);
-               spin_unlock_irq(&ctx->ctx_lock);
-               return;
-       }
+       if (poll_iocb_lock_wq(req)) {
+               if (!mask && !READ_ONCE(req->cancelled)) {
+                       /*
+                        * The request isn't actually ready to be completed yet.
+                        * Reschedule completion if another wakeup came in.
+                        */
+                       if (req->work_need_resched) {
+                               schedule_work(&req->work);
+                               req->work_need_resched = false;
+                       } else {
+                               req->work_scheduled = false;
+                       }
+                       poll_iocb_unlock_wq(req);
+                       spin_unlock_irq(&ctx->ctx_lock);
+                       return;
+               }
+               list_del_init(&req->wait.entry);
+               poll_iocb_unlock_wq(req);
+       } /* else, POLLFREE has freed the waitqueue, so we must complete */
        list_del_init(&iocb->ki_list);
        iocb->ki_res.res = mangle_poll(mask);
-       req->done = true;
        spin_unlock_irq(&ctx->ctx_lock);
 
        iocb_put(iocb);
@@ -1657,13 +1716,14 @@ static int aio_poll_cancel(struct kiocb *iocb)
        struct aio_kiocb *aiocb = container_of(iocb, struct aio_kiocb, rw);
        struct poll_iocb *req = &aiocb->poll;
 
-       spin_lock(&req->head->lock);
-       WRITE_ONCE(req->cancelled, true);
-       if (!list_empty(&req->wait.entry)) {
-               list_del_init(&req->wait.entry);
-               schedule_work(&aiocb->poll.work);
-       }
-       spin_unlock(&req->head->lock);
+       if (poll_iocb_lock_wq(req)) {
+               WRITE_ONCE(req->cancelled, true);
+               if (!req->work_scheduled) {
+                       schedule_work(&aiocb->poll.work);
+                       req->work_scheduled = true;
+               }
+               poll_iocb_unlock_wq(req);
+       } /* else, the request was force-cancelled by POLLFREE already */
 
        return 0;
 }
@@ -1680,21 +1740,27 @@ static int aio_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
        if (mask && !(mask & req->events))
                return 0;
 
-       list_del_init(&req->wait.entry);
-
-       if (mask && spin_trylock_irqsave(&iocb->ki_ctx->ctx_lock, flags)) {
+       /*
+        * Complete the request inline if possible.  This requires that three
+        * conditions be met:
+        *   1. An event mask must have been passed.  If a plain wakeup was done
+        *      instead, then mask == 0 and we have to call vfs_poll() to get
+        *      the events, so inline completion isn't possible.
+        *   2. The completion work must not have already been scheduled.
+        *   3. ctx_lock must not be busy.  We have to use trylock because we
+        *      already hold the waitqueue lock, so this inverts the normal
+        *      locking order.  Use irqsave/irqrestore because not all
+        *      filesystems (e.g. fuse) call this function with IRQs disabled,
+        *      yet IRQs have to be disabled before ctx_lock is obtained.
+        */
+       if (mask && !req->work_scheduled &&
+           spin_trylock_irqsave(&iocb->ki_ctx->ctx_lock, flags)) {
                struct kioctx *ctx = iocb->ki_ctx;
 
-               /*
-                * Try to complete the iocb inline if we can. Use
-                * irqsave/irqrestore because not all filesystems (e.g. fuse)
-                * call this function with IRQs disabled and because IRQs
-                * have to be disabled before ctx_lock is obtained.
-                */
+               list_del_init(&req->wait.entry);
                list_del(&iocb->ki_list);
                iocb->ki_res.res = mangle_poll(mask);
-               req->done = true;
-               if (iocb->ki_eventfd && eventfd_signal_allowed()) {
+               if (iocb->ki_eventfd && !eventfd_signal_allowed()) {
                        iocb = NULL;
                        INIT_WORK(&req->work, aio_poll_put_work);
                        schedule_work(&req->work);
@@ -1703,7 +1769,43 @@ static int aio_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
                if (iocb)
                        iocb_put(iocb);
        } else {
-               schedule_work(&req->work);
+               /*
+                * Schedule the completion work if needed.  If it was already
+                * scheduled, record that another wakeup came in.
+                *
+                * Don't remove the request from the waitqueue here, as it might
+                * not actually be complete yet (we won't know until vfs_poll()
+                * is called), and we must not miss any wakeups.  POLLFREE is an
+                * exception to this; see below.
+                */
+               if (req->work_scheduled) {
+                       req->work_need_resched = true;
+               } else {
+                       schedule_work(&req->work);
+                       req->work_scheduled = true;
+               }
+
+               /*
+                * If the waitqueue is being freed early but we can't complete
+                * the request inline, we have to tear down the request as best
+                * we can.  That means immediately removing the request from its
+                * waitqueue and preventing all further accesses to the
+                * waitqueue via the request.  We also need to schedule the
+                * completion work (done above).  Also mark the request as
+                * cancelled, to potentially skip an unneeded call to ->poll().
+                */
+               if (mask & POLLFREE) {
+                       WRITE_ONCE(req->cancelled, true);
+                       list_del_init(&req->wait.entry);
+
+                       /*
+                        * Careful: this *must* be the last step, since as soon
+                        * as req->head is NULL'ed out, the request can be
+                        * completed and freed, since aio_poll_complete_work()
+                        * will no longer need to take the waitqueue lock.
+                        */
+                       smp_store_release(&req->head, NULL);
+               }
        }
        return 1;
 }
@@ -1711,6 +1813,7 @@ static int aio_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 struct aio_poll_table {
        struct poll_table_struct        pt;
        struct aio_kiocb                *iocb;
+       bool                            queued;
        int                             error;
 };
 
@@ -1721,11 +1824,12 @@ aio_poll_queue_proc(struct file *file, struct wait_queue_head *head,
        struct aio_poll_table *pt = container_of(p, struct aio_poll_table, pt);
 
        /* multiple wait queues per file are not supported */
-       if (unlikely(pt->iocb->poll.head)) {
+       if (unlikely(pt->queued)) {
                pt->error = -EINVAL;
                return;
        }
 
+       pt->queued = true;
        pt->error = 0;
        pt->iocb->poll.head = head;
        add_wait_queue(head, &pt->iocb->poll.wait);
@@ -1750,12 +1854,14 @@ static int aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
        req->events = demangle_poll(iocb->aio_buf) | EPOLLERR | EPOLLHUP;
 
        req->head = NULL;
-       req->done = false;
        req->cancelled = false;
+       req->work_scheduled = false;
+       req->work_need_resched = false;
 
        apt.pt._qproc = aio_poll_queue_proc;
        apt.pt._key = req->events;
        apt.iocb = aiocb;
+       apt.queued = false;
        apt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
 
        /* initialized the list so that we can do list_empty checks */
@@ -1764,23 +1870,35 @@ static int aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
 
        mask = vfs_poll(req->file, &apt.pt) & req->events;
        spin_lock_irq(&ctx->ctx_lock);
-       if (likely(req->head)) {
-               spin_lock(&req->head->lock);
-               if (unlikely(list_empty(&req->wait.entry))) {
-                       if (apt.error)
+       if (likely(apt.queued)) {
+               bool on_queue = poll_iocb_lock_wq(req);
+
+               if (!on_queue || req->work_scheduled) {
+                       /*
+                        * aio_poll_wake() already either scheduled the async
+                        * completion work, or completed the request inline.
+                        */
+                       if (apt.error) /* unsupported case: multiple queues */
                                cancel = true;
                        apt.error = 0;
                        mask = 0;
                }
                if (mask || apt.error) {
+                       /* Steal to complete synchronously. */
                        list_del_init(&req->wait.entry);
                } else if (cancel) {
+                       /* Cancel if possible (may be too late though). */
                        WRITE_ONCE(req->cancelled, true);
-               } else if (!req->done) { /* actually waiting for an event */
+               } else if (on_queue) {
+                       /*
+                        * Actually waiting for an event, so add the request to
+                        * active_reqs so that it can be cancelled if needed.
+                        */
                        list_add_tail(&aiocb->ki_list, &ctx->active_reqs);
                        aiocb->ki_cancel = aio_poll_cancel;
                }
-               spin_unlock(&req->head->lock);
+               if (on_queue)
+                       poll_iocb_unlock_wq(req);
        }
        if (mask) { /* no async, we'd stolen it */
                aiocb->ki_res.res = mangle_poll(mask);