Merge tag 'for-5.9/io_uring-20200802' of git://git.kernel.dk/linux-block
authorLinus Torvalds <torvalds@linux-foundation.org>
Mon, 3 Aug 2020 20:01:22 +0000 (13:01 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Mon, 3 Aug 2020 20:01:22 +0000 (13:01 -0700)
Pull io_uring updates from Jens Axboe:
 "Lots of cleanups in here, hardening the code and/or making it easier
  to read and fixing bugs, but a core feature/change too adding support
  for real async buffered reads. With the latter in place, we just need
  buffered write async support and we're done relying on kthreads for
  the fast path. In detail:

   - Cleanup how memory accounting is done on ring setup/free (Bijan)

   - sq array offset calculation fixup (Dmitry)

   - Consistently handle blocking off O_DIRECT submission path (me)

   - Support proper async buffered reads, instead of relying on kthread
     offload for that. This uses the page waitqueue to drive retries
     from task_work, like we handle poll based retry. (me)

   - IO completion optimizations (me)

   - Fix race with accounting and ring fd install (me)

   - Support EPOLLEXCLUSIVE (Jiufei)

   - Get rid of the io_kiocb unionizing, made possible by shrinking
     other bits (Pavel)

   - Completion side cleanups (Pavel)

   - Cleanup REQ_F_ flags handling, and kill off many of them (Pavel)

   - Request environment grabbing cleanups (Pavel)

   - File and socket read/write cleanups (Pavel)

   - Improve kiocb_set_rw_flags() (Pavel)

   - Tons of fixes and cleanups (Pavel)

   - IORING_SQ_NEED_WAKEUP clear fix (Xiaoguang)"

* tag 'for-5.9/io_uring-20200802' of git://git.kernel.dk/linux-block: (127 commits)
  io_uring: flip if handling after io_setup_async_rw
  fs: optimise kiocb_set_rw_flags()
  io_uring: don't touch 'ctx' after installing file descriptor
  io_uring: get rid of atomic FAA for cq_timeouts
  io_uring: consolidate *_check_overflow accounting
  io_uring: fix stalled deferred requests
  io_uring: fix racy overflow count reporting
  io_uring: deduplicate __io_complete_rw()
  io_uring: de-unionise io_kiocb
  io-wq: update hash bits
  io_uring: fix missing io_queue_linked_timeout()
  io_uring: mark ->work uninitialised after cleanup
  io_uring: deduplicate io_grab_files() calls
  io_uring: don't do opcode prep twice
  io_uring: clear IORING_SQ_NEED_WAKEUP after executing task works
  io_uring: batch put_task_struct()
  tasks: add put_task_struct_many()
  io_uring: return locked and pinned page accounting
  io_uring: don't miscount pinned memory
  io_uring: don't open-code recv kbuf managment
  ...

1  2 
block/blk-core.c
fs/block_dev.c
fs/btrfs/file.c
fs/io_uring.c
include/linux/blkdev.h
include/linux/fs.h
include/linux/pagemap.h
mm/filemap.c

@@@ -956,13 -952,30 +956,18 @@@ static inline blk_status_t blk_check_zo
        return BLK_STS_OK;
  }
  
 -static noinline_for_stack bool
 -generic_make_request_checks(struct bio *bio)
 +static noinline_for_stack bool submit_bio_checks(struct bio *bio)
  {
 -      struct request_queue *q;
 -      int nr_sectors = bio_sectors(bio);
 +      struct request_queue *q = bio->bi_disk->queue;
        blk_status_t status = BLK_STS_IOERR;
 -      char b[BDEVNAME_SIZE];
+       struct blk_plug *plug;
  
        might_sleep();
  
 -      q = bio->bi_disk->queue;
 -      if (unlikely(!q)) {
 -              printk(KERN_ERR
 -                     "generic_make_request: Trying to access "
 -                      "nonexistent block-device %s (%Lu)\n",
 -                      bio_devname(bio, b), (long long)bio->bi_iter.bi_sector);
 -              goto end_io;
 -      }
 -
+       plug = blk_mq_plug(q, bio);
+       if (plug && plug->nowait)
+               bio->bi_opf |= REQ_NOWAIT;
        /*
         * For a REQ_NOWAIT based request, return -EOPNOTSUPP
         * if queue is not a request based queue.
diff --cc fs/block_dev.c
Simple merge
diff --cc fs/btrfs/file.c
Simple merge
diff --cc fs/io_uring.c
@@@ -2593,40 -2913,122 +2913,147 @@@ static int io_setup_async_rw(struct io_
  
                io_req_map_rw(req, io_size, iovec, fast_iov, iter);
        }
-       return 0;
+       return 0;
+ }
+ static inline int io_rw_prep_async(struct io_kiocb *req, int rw,
+                                  bool force_nonblock)
+ {
+       struct io_async_ctx *io = req->io;
+       struct iov_iter iter;
+       ssize_t ret;
+       io->rw.iov = io->rw.fast_iov;
+       req->io = NULL;
+       ret = io_import_iovec(rw, req, &io->rw.iov, &iter, !force_nonblock);
+       req->io = io;
+       if (unlikely(ret < 0))
+               return ret;
+       io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
+       return 0;
+ }
+ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
+                       bool force_nonblock)
+ {
+       ssize_t ret;
+       ret = io_prep_rw(req, sqe, force_nonblock);
+       if (ret)
+               return ret;
+       if (unlikely(!(req->file->f_mode & FMODE_READ)))
+               return -EBADF;
+       /* either don't need iovec imported or already have it */
+       if (!req->io || req->flags & REQ_F_NEED_CLEANUP)
+               return 0;
+       return io_rw_prep_async(req, READ, force_nonblock);
+ }
+ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
+                            int sync, void *arg)
+ {
+       struct wait_page_queue *wpq;
+       struct io_kiocb *req = wait->private;
+       struct wait_page_key *key = arg;
+       int ret;
+       wpq = container_of(wait, struct wait_page_queue, wait);
 -      ret = wake_page_match(wpq, key);
 -      if (ret != 1)
 -              return ret;
++      if (!wake_page_match(wpq, key))
++              return 0;
++
++      /* Stop waking things up if the page is locked again */
++      if (test_bit(key->bit_nr, &key->page->flags))
++              return -1;
+       list_del_init(&wait->entry);
+       init_task_work(&req->task_work, io_req_task_submit);
+       /* submit ref gets dropped, acquire a new one */
+       refcount_inc(&req->refs);
+       ret = io_req_task_work_add(req, &req->task_work);
+       if (unlikely(ret)) {
+               struct task_struct *tsk;
+               /* queue just for cancelation */
+               init_task_work(&req->task_work, io_req_task_cancel);
+               tsk = io_wq_get_task(req->ctx->io_wq);
+               task_work_add(tsk, &req->task_work, 0);
+               wake_up_process(tsk);
+       }
+       return 1;
+ }
++static inline int kiocb_wait_page_queue_init(struct kiocb *kiocb,
++                                           struct wait_page_queue *wait,
++                                           wait_queue_func_t func,
++                                           void *data)
++{
++      /* Can't support async wakeup with polled IO */
++      if (kiocb->ki_flags & IOCB_HIPRI)
++              return -EINVAL;
++      if (kiocb->ki_filp->f_mode & FMODE_BUF_RASYNC) {
++              wait->wait.func = func;
++              wait->wait.private = data;
++              wait->wait.flags = 0;
++              INIT_LIST_HEAD(&wait->wait.entry);
++              kiocb->ki_flags |= IOCB_WAITQ;
++              kiocb->ki_waitq = wait;
++              return 0;
++      }
++
++      return -EOPNOTSUPP;
 +}
 +
- static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
-                       bool force_nonblock)
++
+ static bool io_rw_should_retry(struct io_kiocb *req)
  {
-       struct io_async_ctx *io;
-       struct iov_iter iter;
-       ssize_t ret;
+       struct kiocb *kiocb = &req->rw.kiocb;
+       int ret;
  
-       ret = io_prep_rw(req, sqe, force_nonblock);
-       if (ret)
-               return ret;
+       /* never retry for NOWAIT, we just complete with -EAGAIN */
+       if (req->flags & REQ_F_NOWAIT)
+               return false;
  
-       if (unlikely(!(req->file->f_mode & FMODE_READ)))
-               return -EBADF;
+       /* already tried, or we're doing O_DIRECT */
+       if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ))
+               return false;
+       /*
+        * just use poll if we can, and don't attempt if the fs doesn't
+        * support callback based unlocks
+        */
+       if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
+               return false;
  
-       /* either don't need iovec imported or already have it */
-       if (!req->io || req->flags & REQ_F_NEED_CLEANUP)
-               return 0;
+       /*
+        * If request type doesn't require req->io to defer in general,
+        * we need to allocate it here
+        */
+       if (!req->io && __io_alloc_async_ctx(req))
+               return false;
  
-       io = req->io;
-       io->rw.iov = io->rw.fast_iov;
-       req->io = NULL;
-       ret = io_import_iovec(READ, req, &io->rw.iov, &iter, !force_nonblock);
-       req->io = io;
-       if (ret < 0)
-               return ret;
+       ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq,
+                                               io_async_buf_func, req);
+       if (!ret) {
+               io_get_req_task(req);
+               return true;
+       }
  
-       io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
-       return 0;
+       return false;
+ }
+ static int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter)
+ {
+       if (req->file->f_op->read_iter)
+               return call_read_iter(req->file, &req->rw.kiocb, iter);
+       return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter);
  }
  
- static int io_read(struct io_kiocb *req, bool force_nonblock)
+ static int io_read(struct io_kiocb *req, bool force_nonblock,
+                  struct io_comp_state *cs)
  {
        struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
        struct kiocb *kiocb = &req->rw.kiocb;
Simple merge
@@@ -315,7 -318,8 +318,9 @@@ enum rw_hint 
  #define IOCB_SYNC             (1 << 5)
  #define IOCB_WRITE            (1 << 6)
  #define IOCB_NOWAIT           (1 << 7)
+ /* iocb->ki_waitq is valid */
+ #define IOCB_WAITQ            (1 << 8)
 +#define IOCB_NOIO             (1 << 9)
  
  struct kiocb {
        struct file             *ki_filp;
@@@ -496,8 -496,67 +496,35 @@@ static inline pgoff_t linear_page_index
        return pgoff;
  }
  
 -static inline int wake_page_match(struct wait_page_queue *wait_page,
+ /* This has the same layout as wait_bit_key - see fs/cachefiles/rdwr.c */
+ struct wait_page_key {
+       struct page *page;
+       int bit_nr;
+       int page_match;
+ };
+ struct wait_page_queue {
+       struct page *page;
+       int bit_nr;
+       wait_queue_entry_t wait;
+ };
 -             return 0;
++static inline bool wake_page_match(struct wait_page_queue *wait_page,
+                                 struct wait_page_key *key)
+ {
+       if (wait_page->page != key->page)
 -              return 0;
 -
 -      /*
 -       * Stop walking if it's locked.
 -       * Is this safe if put_and_wait_on_page_locked() is in use?
 -       * Yes: the waker must hold a reference to this page, and if PG_locked
 -       * has now already been set by another task, that task must also hold
 -       * a reference to the *same usage* of this page; so there is no need
 -       * to walk on to wake even the put_and_wait_on_page_locked() callers.
 -       */
 -      if (test_bit(key->bit_nr, &key->page->flags))
 -              return -1;
 -
 -      return 1;
 -}
 -
 -static inline int kiocb_wait_page_queue_init(struct kiocb *kiocb,
 -                                           struct wait_page_queue *wait,
 -                                           wait_queue_func_t func,
 -                                           void *data)
 -{
 -      /* Can't support async wakeup with polled IO */
 -      if (kiocb->ki_flags & IOCB_HIPRI)
 -              return -EINVAL;
 -      if (kiocb->ki_filp->f_mode & FMODE_BUF_RASYNC) {
 -              wait->wait.func = func;
 -              wait->wait.private = data;
 -              wait->wait.flags = 0;
 -              INIT_LIST_HEAD(&wait->wait.entry);
 -              kiocb->ki_flags |= IOCB_WAITQ;
 -              kiocb->ki_waitq = wait;
 -              return 0;
 -      }
++             return false;
+       key->page_match = 1;
+       if (wait_page->bit_nr != key->bit_nr)
 -      return -EOPNOTSUPP;
++              return false;
++      return true;
+ }
  extern void __lock_page(struct page *page);
  extern int __lock_page_killable(struct page *page);
+ extern int __lock_page_async(struct page *page, struct wait_page_queue *wait);
  extern int __lock_page_or_retry(struct page *page, struct mm_struct *mm,
                                unsigned int flags);
  extern void unlock_page(struct page *page);
diff --cc mm/filemap.c
@@@ -987,63 -987,17 +987,46 @@@ void __init pagecache_init(void
        page_writeback_init();
  }
  
- /* This has the same layout as wait_bit_key - see fs/cachefiles/rdwr.c */
- struct wait_page_key {
-       struct page *page;
-       int bit_nr;
-       int page_match;
- };
- struct wait_page_queue {
-       struct page *page;
-       int bit_nr;
-       wait_queue_entry_t wait;
- };
  static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync, void *arg)
  {
 +      int ret;
        struct wait_page_key *key = arg;
        struct wait_page_queue *wait_page
                = container_of(wait, struct wait_page_queue, wait);
 -      int ret;
  
-       if (wait_page->page != key->page)
-              return 0;
-       key->page_match = 1;
-       if (wait_page->bit_nr != key->bit_nr)
 -      ret = wake_page_match(wait_page, key);
 -      if (ret != 1)
 -              return ret;
 -      return autoremove_wake_function(wait, mode, sync, key);
++      if (!wake_page_match(wait_page, key))
 +              return 0;
 +
 +      /*
 +       * If it's an exclusive wait, we get the bit for it, and
 +       * stop walking if we can't.
 +       *
 +       * If it's a non-exclusive wait, then the fact that this
 +       * wake function was called means that the bit already
 +       * was cleared, and we don't care if somebody then
 +       * re-took it.
 +       */
 +      ret = 0;
 +      if (wait->flags & WQ_FLAG_EXCLUSIVE) {
 +              if (test_and_set_bit(key->bit_nr, &key->page->flags))
 +                      return -1;
 +              ret = 1;
 +      }
 +      wait->flags |= WQ_FLAG_WOKEN;
 +
 +      wake_up_state(wait->private, mode);
 +
 +      /*
 +       * Ok, we have successfully done what we're waiting for,
 +       * and we can unconditionally remove the wait entry.
 +       *
 +       * Note that this has to be the absolute last thing we do,
 +       * since after list_del_init(&wait->entry) the wait entry
 +       * might be de-allocated and the process might even have
 +       * exited.
 +       */
 +      list_del_init_careful(&wait->entry);
 +      return ret;
  }
  
  static void wake_up_page_bit(struct page *page, int bit_nr)
@@@ -2061,8 -2044,6 +2087,8 @@@ find_page
  
                page = find_get_page(mapping, index);
                if (!page) {
-                       if (iocb->ki_flags & (IOCB_NOWAIT | IOCB_NOIO))
++                      if (iocb->ki_flags & IOCB_NOIO)
 +                              goto would_block;
                        page_cache_sync_readahead(mapping,
                                        ra, filp,
                                        index, last_index - index);
@@@ -2197,7 -2185,7 +2234,7 @@@ page_not_up_to_date_locked
                }
  
  readpage:
-               if (iocb->ki_flags & IOCB_NOIO) {
 -              if (iocb->ki_flags & IOCB_NOWAIT) {
++              if (iocb->ki_flags & (IOCB_NOIO | IOCB_NOWAIT)) {
                        unlock_page(page);
                        put_page(page);
                        goto would_block;