Merge tag 'erofs-for-5.5-rc2-fixes' of git://git.kernel.org/pub/scm/linux/kernel...
diff --git a/fs/pipe.c b/fs/pipe.c
index 648ce44..04d004e 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -270,22 +270,41 @@ static bool pipe_buf_can_merge(struct pipe_buffer *buf)
        return buf->ops == &anon_pipe_buf_ops;
 }
 
+/* Done while waiting without holding the pipe lock - thus the READ_ONCE() */
+static inline bool pipe_readable(const struct pipe_inode_info *pipe)
+{
+       unsigned int head = READ_ONCE(pipe->head);
+       unsigned int tail = READ_ONCE(pipe->tail);
+       unsigned int writers = READ_ONCE(pipe->writers);
+
+       return !pipe_empty(head, tail) || !writers;
+}
+
 static ssize_t
 pipe_read(struct kiocb *iocb, struct iov_iter *to)
 {
        size_t total_len = iov_iter_count(to);
        struct file *filp = iocb->ki_filp;
        struct pipe_inode_info *pipe = filp->private_data;
-       int do_wakeup;
+       bool was_full;
        ssize_t ret;
 
        /* Null read succeeds. */
        if (unlikely(total_len == 0))
                return 0;
 
-       do_wakeup = 0;
        ret = 0;
        __pipe_lock(pipe);
+
+       /*
+        * We only wake up writers if the pipe was full when we started
+        * reading in order to avoid unnecessary wakeups.
+        *
+        * But when we do wake up writers, we do so using a sync wakeup
+        * (WF_SYNC), because we want them to get going and generate more
+        * data for us.
+        */
+       was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
        for (;;) {
                unsigned int head = pipe->head;
                unsigned int tail = pipe->tail;
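pipe_readable() above runs from inside wait_event_interruptible() with the pipe lock dropped, so head, tail and writers are loaded with READ_ONCE() to get single, untorn reads; a stale answer at worst costs a spurious wakeup or one extra loop iteration, never a lost wakeup, because the loop re-tests under the lock. A minimal userspace analogue of that lockless-condition pattern, with C11 atomics standing in for READ_ONCE() (the struct ring and its field names are illustrative, not the kernel's):

#include <stdatomic.h>
#include <stdbool.h>

/* Illustrative stand-in for struct pipe_inode_info. */
struct ring {
	atomic_uint head;	/* next slot to fill */
	atomic_uint tail;	/* next slot to drain */
	atomic_uint writers;	/* open write ends */
};

/*
 * Lockless readiness test, like pipe_readable(): it may race with
 * concurrent updates, which is harmless because the waiter takes the
 * lock and re-checks the real state after it is woken.
 */
static bool ring_readable(struct ring *r)
{
	unsigned int head = atomic_load_explicit(&r->head, memory_order_relaxed);
	unsigned int tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
	unsigned int writers = atomic_load_explicit(&r->writers, memory_order_relaxed);

	return head != tail || writers == 0;	/* data queued, or EOF */
}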
@@ -324,19 +343,11 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
                        }
 
                        if (!buf->len) {
-                               bool wake;
                                pipe_buf_release(pipe, buf);
                                spin_lock_irq(&pipe->wait.lock);
                                tail++;
                                pipe->tail = tail;
-                               do_wakeup = 1;
-                               wake = head - (tail - 1) == pipe->max_usage / 2;
-                               if (wake)
-                                       wake_up_locked_poll(
-                                               &pipe->wait, EPOLLOUT | EPOLLWRNORM);
                                spin_unlock_irq(&pipe->wait.lock);
-                               if (wake)
-                                       kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
                        }
                        total_len -= chars;
                        if (!total_len)
@@ -347,31 +358,52 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 
                if (!pipe->writers)
                        break;
-               if (!pipe->waiting_writers) {
-                       /* syscall merging: Usually we must not sleep
-                        * if O_NONBLOCK is set, or if we got some data.
-                        * But if a writer sleeps in kernel space, then
-                        * we can wait for that data without violating POSIX.
-                        */
-                       if (ret)
-                               break;
-                       if (filp->f_flags & O_NONBLOCK) {
-                               ret = -EAGAIN;
-                               break;
-                       }
-               }
-               if (signal_pending(current)) {
-                       if (!ret)
-                               ret = -ERESTARTSYS;
+               if (ret)
+                       break;
+               if (filp->f_flags & O_NONBLOCK) {
+                       ret = -EAGAIN;
                        break;
                }
-               pipe_wait(pipe);
+               __pipe_unlock(pipe);
+
+               /*
+                * We only get here if we didn't actually read anything.
+                *
+                * However, we could have seen (and removed) a zero-sized
+                * pipe buffer, and might have made space in the buffers
+                * that way.
+                *
+                * You can't make zero-sized pipe buffers by doing an empty
+                * write (not even in packet mode), but they can happen if
+                * the writer gets an EFAULT when trying to fill a buffer
+                * that already got allocated and inserted in the buffer
+                * array.
+                *
+                * So we still need to wake up any pending writers in the
+                * _very_ unlikely case that the pipe was full, but we got
+                * no data.
+                */
+               if (unlikely(was_full)) {
+                       wake_up_interruptible_sync_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
+                       kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
+               }
+
+               /*
+                * But because we didn't read anything, at this point we can
+                * just return directly with -ERESTARTSYS if we're interrupted,
+                * since we've done any required wakeups and there's no need
+                * to mark anything accessed. And we've dropped the lock.
+                */
+               if (wait_event_interruptible(pipe->wait, pipe_readable(pipe)) < 0)
+                       return -ERESTARTSYS;
+
+               __pipe_lock(pipe);
+               was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
        }
        __pipe_unlock(pipe);
 
-       /* Signal writers asynchronously that there is more room. */
-       if (do_wakeup) {
-               wake_up_interruptible_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
+       if (was_full) {
+               wake_up_interruptible_sync_poll(&pipe->wait, EPOLLOUT | EPOLLWRNORM);
                kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
        }
        if (ret > 0)
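The read loop now follows a drop-the-lock, wake-if-needed, sleep, retake-the-lock shape, recomputing was_full each time around. A compact pthread sketch of the same consumer-side control flow; chan, chan_read_one and CHAN_CAP are hypothetical names, and pthread_cond_wait's atomic unlock-and-sleep plays the role of the __pipe_unlock()/wait_event_interruptible()/__pipe_lock() sequence:

#include <pthread.h>
#include <stdbool.h>

#define CHAN_CAP 64u	/* must stay a power of two, like the pipe ring */

struct chan {
	pthread_mutex_t lock;
	pthread_cond_t rd_wait;		/* readers sleep here */
	pthread_cond_t wr_wait;		/* writers sleep here */
	unsigned int head, tail;	/* free-running, like pipe->head/tail */
	unsigned int writers;		/* open producer count */
	int slots[CHAN_CAP];
};

static struct chan ch = {
	.lock = PTHREAD_MUTEX_INITIALIZER,
	.rd_wait = PTHREAD_COND_INITIALIZER,
	.wr_wait = PTHREAD_COND_INITIALIZER,
	.writers = 1,
};

/* Consume one item; returns 1 on success, 0 on EOF. */
static int chan_read_one(struct chan *c, int *out)
{
	pthread_mutex_lock(&c->lock);
	/* Like pipe_read(): remember fullness before consuming anything. */
	bool was_full = c->head - c->tail == CHAN_CAP;

	while (c->head == c->tail) {		/* empty */
		if (!c->writers) {		/* no producers left: EOF */
			pthread_mutex_unlock(&c->lock);
			return 0;
		}
		pthread_cond_wait(&c->rd_wait, &c->lock);
		was_full = c->head - c->tail == CHAN_CAP;
	}
	*out = c->slots[c->tail++ % CHAN_CAP];
	pthread_mutex_unlock(&c->lock);

	/* Only writers blocked on a full ring need the wakeup. */
	if (was_full)
		pthread_cond_signal(&c->wr_wait);
	return 1;
}

As in pipe_read(), the producer-side signal is only sent when the ring was observed full, so steady-state reads from a half-empty ring take no wakeup path at all.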
@@ -384,16 +416,27 @@ static inline int is_packetized(struct file *file)
        return (file->f_flags & O_DIRECT) != 0;
 }
 
+/* Done while waiting without holding the pipe lock - thus the READ_ONCE() */
+static inline bool pipe_writable(const struct pipe_inode_info *pipe)
+{
+       unsigned int head = READ_ONCE(pipe->head);
+       unsigned int tail = READ_ONCE(pipe->tail);
+       unsigned int max_usage = READ_ONCE(pipe->max_usage);
+
+       return !pipe_full(head, tail, max_usage) ||
+               !READ_ONCE(pipe->readers);
+}
+
 static ssize_t
 pipe_write(struct kiocb *iocb, struct iov_iter *from)
 {
        struct file *filp = iocb->ki_filp;
        struct pipe_inode_info *pipe = filp->private_data;
-       unsigned int head, max_usage, mask;
+       unsigned int head;
        ssize_t ret = 0;
-       int do_wakeup = 0;
        size_t total_len = iov_iter_count(from);
        ssize_t chars;
+       bool was_empty = false;
 
        /* Null write succeeds. */
        if (unlikely(total_len == 0))
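On is_packetized() above: O_DIRECT on a pipe selects packet mode (Linux 3.4 and later), where every write() produces one packet and every read() consumes at most one. A quick demonstration, error handling elided for brevity:

#define _GNU_SOURCE		/* for O_DIRECT */
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int fd[2];
	char buf[64];

	pipe(fd);
	fcntl(fd[1], F_SETFL, O_DIRECT);	/* switch to packet mode */

	write(fd[1], "abc", 3);			/* packet 1 */
	write(fd[1], "defgh", 5);		/* packet 2 */

	/* Packets are never merged: the reads return 3, then 5. */
	printf("%zd\n", read(fd[0], buf, sizeof(buf)));
	printf("%zd\n", read(fd[0], buf, sizeof(buf)));
	return 0;
}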
@@ -407,13 +450,22 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
                goto out;
        }
 
+       /*
+        * Only wake up if the pipe started out empty, since
+        * otherwise there should be no readers waiting.
+        *
+        * If it wasn't empty we try to merge new data into
+        * the last buffer.
+        *
+        * That naturally merges small writes, but it also
+        * page-aligns the rest of the writes for large writes

+        * spanning multiple pages.
+        */
        head = pipe->head;
-       max_usage = pipe->max_usage;
-       mask = pipe->ring_size - 1;
-
-       /* We try to merge small writes */
-       chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */
-       if (!pipe_empty(head, pipe->tail) && chars != 0) {
+       was_empty = pipe_empty(head, pipe->tail);
+       chars = total_len & (PAGE_SIZE-1);
+       if (chars && !was_empty) {
+               unsigned int mask = pipe->ring_size - 1;
                struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask];
                int offset = buf->offset + buf->len;
 
@@ -427,7 +479,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
                                ret = -EFAULT;
                                goto out;
                        }
-                       do_wakeup = 1;
+
                        buf->len += ret;
                        if (!iov_iter_count(from))
                                goto out;
@@ -443,7 +495,8 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
                }
 
                head = pipe->head;
-               if (!pipe_full(head, pipe->tail, max_usage)) {
+               if (!pipe_full(head, pipe->tail, pipe->max_usage)) {
+                       unsigned int mask = pipe->ring_size - 1;
                        struct pipe_buffer *buf = &pipe->bufs[head & mask];
                        struct page *page = pipe->tmp_page;
                        int copied;
@@ -465,23 +518,13 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
                        spin_lock_irq(&pipe->wait.lock);
 
                        head = pipe->head;
-                       if (pipe_full(head, pipe->tail, max_usage)) {
+                       if (pipe_full(head, pipe->tail, pipe->max_usage)) {
                                spin_unlock_irq(&pipe->wait.lock);
                                continue;
                        }
 
                        pipe->head = head + 1;
-
-                       /* Always wake up, even if the copy fails. Otherwise
-                        * we lock up (O_NONBLOCK-)readers that sleep due to
-                        * syscall merging.
-                        * FIXME! Is this really true?
-                        */
-                       wake_up_locked_poll(
-                               &pipe->wait, EPOLLIN | EPOLLRDNORM);
-
                        spin_unlock_irq(&pipe->wait.lock);
-                       kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
 
                        /* Insert it into the buffer array */
                        buf = &pipe->bufs[head & mask];
@@ -510,7 +553,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
                                break;
                }
 
-               if (!pipe_full(head, pipe->tail, max_usage))
+               if (!pipe_full(head, pipe->tail, pipe->max_usage))
                        continue;
 
                /* Wait for buffer space to become available. */
@@ -524,14 +567,36 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
                                ret = -ERESTARTSYS;
                        break;
                }
-               pipe->waiting_writers++;
-               pipe_wait(pipe);
-               pipe->waiting_writers--;
+
+               /*
+                * We're going to release the pipe lock and wait for more
+                * space. We wake up any readers if necessary, and then
+                * after waiting we need to re-check whether the pipe
+        * became empty while we dropped the lock.
+                */
+               __pipe_unlock(pipe);
+               if (was_empty) {
+                       wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
+                       kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
+               }
+               wait_event_interruptible(pipe->wait, pipe_writable(pipe));
+               __pipe_lock(pipe);
+               was_empty = pipe_empty(pipe->head, pipe->tail);
        }
 out:
        __pipe_unlock(pipe);
-       if (do_wakeup) {
-               wake_up_interruptible_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
+
+       /*
+        * If we do a wakeup event, we do a 'sync' wakeup, because we
+        * want the reader to start processing things asap, rather than
+        * leave the data pending.
+        *
+        * This is particularly important for small writes, because of
+        * how (for example) the GNU make jobserver uses small writes to
+        * wake up pending jobs
+        */
+       if (was_empty) {
+               wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM);
                kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
        }
        if (ret > 0 && sb_start_write_trylock(file_inode(filp)->i_sb)) {
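On the GNU make reference: the jobserver hands out job slots as single bytes in a shared pipe, so a worker blocks in pipe_read() on a 1-byte read to take a token and returns it with a 1-byte write; the sync wakeup on a small write into an empty pipe is exactly what gets the next blocked worker running promptly. A stripped-down sketch of that token protocol (not make's actual implementation):

#include <unistd.h>

int main(void)
{
	int job_fd[2];
	char token = '+';

	pipe(job_fd);
	write(job_fd[1], &token, 1);	/* seed one job slot */

	/* Acquire a slot: blocks in pipe_read() until a token arrives. */
	read(job_fd[0], &token, 1);

	/* ... run the job ... */

	/* Release the slot: a small write into the now-empty pipe does a
	 * sync wakeup of any worker sleeping on the read above. */
	write(job_fd[1], &token, 1);
	return 0;
}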
@@ -574,14 +639,24 @@ pipe_poll(struct file *filp, poll_table *wait)
 {
        __poll_t mask;
        struct pipe_inode_info *pipe = filp->private_data;
-       unsigned int head = READ_ONCE(pipe->head);
-       unsigned int tail = READ_ONCE(pipe->tail);
+       unsigned int head, tail;
 
+       /*
+        * Reading only -- no need for acquiring the semaphore.
+        *
+        * But because this is racy, the code has to add the
+        * entry to the poll table _first_ ..
+        */
        poll_wait(filp, &pipe->wait, wait);
 
-       BUG_ON(pipe_occupancy(head, tail) > pipe->ring_size);
+       /*
+        * .. and only then can you do the racy tests. That way,
+        * if something changes and you got it wrong, the poll
+        * table entry will wake you up and fix it.
+        */
+       head = READ_ONCE(pipe->head);
+       tail = READ_ONCE(pipe->tail);
 
-       /* Reading only -- no need for acquiring the semaphore.  */
        mask = 0;
        if (filp->f_mode & FMODE_READ) {
                if (!pipe_empty(head, tail))
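Those head/tail snapshots feed straight into the readiness bits userspace observes; assuming an ordinary blocking pipe, poll(2) shows the mapping:

#include <poll.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int fd[2];
	struct pollfd pfd[2];

	pipe(fd);
	pfd[0] = (struct pollfd){ .fd = fd[0], .events = POLLIN };
	pfd[1] = (struct pollfd){ .fd = fd[1], .events = POLLOUT };

	poll(pfd, 2, 0);
	/* Empty pipe: read end not ready, write end ready (not pipe_full). */
	printf("empty: rd=%#x wr=%#x\n", pfd[0].revents, pfd[1].revents);

	write(fd[1], "x", 1);
	poll(pfd, 2, 0);
	/* Non-empty pipe: POLLIN is now set on the read end. */
	printf("after write: rd=%#x wr=%#x\n", pfd[0].revents, pfd[1].revents);
	return 0;
}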
@@ -1176,6 +1251,7 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
        pipe->max_usage = nr_slots;
        pipe->tail = tail;
        pipe->head = head;
+       wake_up_interruptible_all(&pipe->wait);
        return pipe->max_usage * PAGE_SIZE;
 
 out_revert_acct:
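The added wake_up_interruptible_all() is needed because a resize changes pipe->max_usage underneath sleepers whose pipe_writable()/pipe_readable() answer depends on it; without the wakeup, a writer blocked on a full pipe could sleep through the pipe growing. From userspace the resize is plain fcntl; a short example with an arbitrary 1 MiB size:

#define _GNU_SOURCE		/* for F_SETPIPE_SZ / F_GETPIPE_SZ */
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int fd[2];

	pipe(fd);
	printf("default: %d bytes\n", fcntl(fd[0], F_GETPIPE_SZ));

	/* Grow the ring; the kernel rounds up to a power-of-two number of
	 * pages and, after this patch, wakes everyone waiting on the pipe
	 * so they re-evaluate pipe_full()/pipe_empty() at the new size. */
	if (fcntl(fd[0], F_SETPIPE_SZ, 1 << 20) < 0)
		perror("F_SETPIPE_SZ");
	printf("resized: %d bytes\n", fcntl(fd[0], F_GETPIPE_SZ));
	return 0;
}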