io_uring: remove REQ_F_IO_DRAINED
[linux-2.6-microblaze.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/kthread.h>
60 #include <linux/blkdev.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
63 #include <net/sock.h>
64 #include <net/af_unix.h>
65 #include <net/scm.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72 #include <linux/highmem.h>
73 #include <linux/namei.h>
74 #include <linux/fsnotify.h>
75 #include <linux/fadvise.h>
76
77 #define CREATE_TRACE_POINTS
78 #include <trace/events/io_uring.h>
79
80 #include <uapi/linux/io_uring.h>
81
82 #include "internal.h"
83 #include "io-wq.h"
84
85 #define IORING_MAX_ENTRIES      32768
86 #define IORING_MAX_CQ_ENTRIES   (2 * IORING_MAX_ENTRIES)
87
88 /*
89  * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
90  */
91 #define IORING_FILE_TABLE_SHIFT 9
92 #define IORING_MAX_FILES_TABLE  (1U << IORING_FILE_TABLE_SHIFT)
93 #define IORING_FILE_TABLE_MASK  (IORING_MAX_FILES_TABLE - 1)
94 #define IORING_MAX_FIXED_FILES  (64 * IORING_MAX_FILES_TABLE)
95
96 struct io_uring {
97         u32 head ____cacheline_aligned_in_smp;
98         u32 tail ____cacheline_aligned_in_smp;
99 };
100
101 /*
102  * This data is shared with the application through the mmap at offsets
103  * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
104  *
105  * The offsets to the member fields are published through struct
106  * io_sqring_offsets when calling io_uring_setup.
107  */
108 struct io_rings {
109         /*
110          * Head and tail offsets into the ring; the offsets need to be
111          * masked to get valid indices.
112          *
113          * The kernel controls head of the sq ring and the tail of the cq ring,
114          * and the application controls tail of the sq ring and the head of the
115          * cq ring.
116          */
117         struct io_uring         sq, cq;
118         /*
119          * Bitmasks to apply to head and tail offsets (constant, equals
120          * ring_entries - 1)
121          */
122         u32                     sq_ring_mask, cq_ring_mask;
123         /* Ring sizes (constant, power of 2) */
124         u32                     sq_ring_entries, cq_ring_entries;
125         /*
126          * Number of invalid entries dropped by the kernel due to
127          * invalid index stored in array
128          *
129          * Written by the kernel, shouldn't be modified by the
130          * application (i.e. get number of "new events" by comparing to
131          * cached value).
132          *
133          * After a new SQ head value was read by the application this
134          * counter includes all submissions that were dropped reaching
135          * the new SQ head (and possibly more).
136          */
137         u32                     sq_dropped;
138         /*
139          * Runtime flags
140          *
141          * Written by the kernel, shouldn't be modified by the
142          * application.
143          *
144          * The application needs a full memory barrier before checking
145          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
146          */
147         u32                     sq_flags;
148         /*
149          * Number of completion events lost because the queue was full;
150          * this should be avoided by the application by making sure
151          * there are not more requests pending than there is space in
152          * the completion queue.
153          *
154          * Written by the kernel, shouldn't be modified by the
155          * application (i.e. get number of "new events" by comparing to
156          * cached value).
157          *
158          * As completion events come in out of order this counter is not
159          * ordered with any other data.
160          */
161         u32                     cq_overflow;
162         /*
163          * Ring buffer of completion events.
164          *
165          * The kernel writes completion events fresh every time they are
166          * produced, so the application is allowed to modify pending
167          * entries.
168          */
169         struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
170 };
171
172 struct io_mapped_ubuf {
173         u64             ubuf;
174         size_t          len;
175         struct          bio_vec *bvec;
176         unsigned int    nr_bvecs;
177 };
178
179 struct fixed_file_table {
180         struct file             **files;
181 };
182
183 enum {
184         FFD_F_ATOMIC,
185 };
186
187 struct fixed_file_data {
188         struct fixed_file_table         *table;
189         struct io_ring_ctx              *ctx;
190
191         struct percpu_ref               refs;
192         struct llist_head               put_llist;
193         unsigned long                   state;
194         struct work_struct              ref_work;
195         struct completion               done;
196 };
197
198 struct io_ring_ctx {
199         struct {
200                 struct percpu_ref       refs;
201         } ____cacheline_aligned_in_smp;
202
203         struct {
204                 unsigned int            flags;
205                 int                     compat: 1;
206                 int                     account_mem: 1;
207                 int                     cq_overflow_flushed: 1;
208                 int                     drain_next: 1;
209                 int                     eventfd_async: 1;
210
211                 /*
212                  * Ring buffer of indices into array of io_uring_sqe, which is
213                  * mmapped by the application using the IORING_OFF_SQES offset.
214                  *
215                  * This indirection could e.g. be used to assign fixed
216                  * io_uring_sqe entries to operations and only submit them to
217                  * the queue when needed.
218                  *
219                  * The kernel modifies neither the indices array nor the entries
220                  * array.
221                  */
222                 u32                     *sq_array;
223                 unsigned                cached_sq_head;
224                 unsigned                sq_entries;
225                 unsigned                sq_mask;
226                 unsigned                sq_thread_idle;
227                 unsigned                cached_sq_dropped;
228                 atomic_t                cached_cq_overflow;
229                 unsigned long           sq_check_overflow;
230
231                 struct list_head        defer_list;
232                 struct list_head        timeout_list;
233                 struct list_head        cq_overflow_list;
234
235                 wait_queue_head_t       inflight_wait;
236                 struct io_uring_sqe     *sq_sqes;
237         } ____cacheline_aligned_in_smp;
238
239         struct io_rings *rings;
240
241         /* IO offload */
242         struct io_wq            *io_wq;
243         struct task_struct      *sqo_thread;    /* if using sq thread polling */
244         struct mm_struct        *sqo_mm;
245         wait_queue_head_t       sqo_wait;
246
247         /*
248          * If used, fixed file set. Writers must ensure that ->refs is dead,
249          * readers must ensure that ->refs is alive as long as the file* is
250          * used. Only updated through io_uring_register(2).
251          */
252         struct fixed_file_data  *file_data;
253         unsigned                nr_user_files;
254         int                     ring_fd;
255         struct file             *ring_file;
256
257         /* if used, fixed mapped user buffers */
258         unsigned                nr_user_bufs;
259         struct io_mapped_ubuf   *user_bufs;
260
261         struct user_struct      *user;
262
263         const struct cred       *creds;
264
265         /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
266         struct completion       *completions;
267
268         /* if all else fails... */
269         struct io_kiocb         *fallback_req;
270
271 #if defined(CONFIG_UNIX)
272         struct socket           *ring_sock;
273 #endif
274
275         struct {
276                 unsigned                cached_cq_tail;
277                 unsigned                cq_entries;
278                 unsigned                cq_mask;
279                 atomic_t                cq_timeouts;
280                 unsigned long           cq_check_overflow;
281                 struct wait_queue_head  cq_wait;
282                 struct fasync_struct    *cq_fasync;
283                 struct eventfd_ctx      *cq_ev_fd;
284         } ____cacheline_aligned_in_smp;
285
286         struct {
287                 struct mutex            uring_lock;
288                 wait_queue_head_t       wait;
289         } ____cacheline_aligned_in_smp;
290
291         struct {
292                 spinlock_t              completion_lock;
293                 struct llist_head       poll_llist;
294
295                 /*
296                  * ->poll_list is protected by the ctx->uring_lock for
297                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
298                  * For SQPOLL, only the single threaded io_sq_thread() will
299                  * manipulate the list, hence no extra locking is needed there.
300                  */
301                 struct list_head        poll_list;
302                 struct hlist_head       *cancel_hash;
303                 unsigned                cancel_hash_bits;
304                 bool                    poll_multi_file;
305
306                 spinlock_t              inflight_lock;
307                 struct list_head        inflight_list;
308         } ____cacheline_aligned_in_smp;
309 };
310
311 /*
312  * First field must be the file pointer in all the
313  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
314  */
315 struct io_poll_iocb {
316         struct file                     *file;
317         union {
318                 struct wait_queue_head  *head;
319                 u64                     addr;
320         };
321         __poll_t                        events;
322         bool                            done;
323         bool                            canceled;
324         struct wait_queue_entry         wait;
325 };
326
327 struct io_close {
328         struct file                     *file;
329         struct file                     *put_file;
330         int                             fd;
331 };
332
333 struct io_timeout_data {
334         struct io_kiocb                 *req;
335         struct hrtimer                  timer;
336         struct timespec64               ts;
337         enum hrtimer_mode               mode;
338         u32                             seq_offset;
339 };
340
341 struct io_accept {
342         struct file                     *file;
343         struct sockaddr __user          *addr;
344         int __user                      *addr_len;
345         int                             flags;
346 };
347
348 struct io_sync {
349         struct file                     *file;
350         loff_t                          len;
351         loff_t                          off;
352         int                             flags;
353         int                             mode;
354 };
355
356 struct io_cancel {
357         struct file                     *file;
358         u64                             addr;
359 };
360
361 struct io_timeout {
362         struct file                     *file;
363         u64                             addr;
364         int                             flags;
365         unsigned                        count;
366 };
367
368 struct io_rw {
369         /* NOTE: kiocb has the file as the first member, so don't do it here */
370         struct kiocb                    kiocb;
371         u64                             addr;
372         u64                             len;
373 };
374
375 struct io_connect {
376         struct file                     *file;
377         struct sockaddr __user          *addr;
378         int                             addr_len;
379 };
380
381 struct io_sr_msg {
382         struct file                     *file;
383         union {
384                 struct user_msghdr __user *msg;
385                 void __user             *buf;
386         };
387         int                             msg_flags;
388         size_t                          len;
389 };
390
391 struct io_open {
392         struct file                     *file;
393         int                             dfd;
394         union {
395                 unsigned                mask;
396         };
397         struct filename                 *filename;
398         struct statx __user             *buffer;
399         struct open_how                 how;
400 };
401
402 struct io_files_update {
403         struct file                     *file;
404         u64                             arg;
405         u32                             nr_args;
406         u32                             offset;
407 };
408
409 struct io_fadvise {
410         struct file                     *file;
411         u64                             offset;
412         u32                             len;
413         u32                             advice;
414 };
415
416 struct io_madvise {
417         struct file                     *file;
418         u64                             addr;
419         u32                             len;
420         u32                             advice;
421 };
422
423 struct io_async_connect {
424         struct sockaddr_storage         address;
425 };
426
427 struct io_async_msghdr {
428         struct iovec                    fast_iov[UIO_FASTIOV];
429         struct iovec                    *iov;
430         struct sockaddr __user          *uaddr;
431         struct msghdr                   msg;
432 };
433
434 struct io_async_rw {
435         struct iovec                    fast_iov[UIO_FASTIOV];
436         struct iovec                    *iov;
437         ssize_t                         nr_segs;
438         ssize_t                         size;
439 };
440
441 struct io_async_open {
442         struct filename                 *filename;
443 };
444
445 struct io_async_ctx {
446         union {
447                 struct io_async_rw      rw;
448                 struct io_async_msghdr  msg;
449                 struct io_async_connect connect;
450                 struct io_timeout_data  timeout;
451                 struct io_async_open    open;
452         };
453 };
454
455 /*
456  * NOTE! Each of the iocb union members has the file pointer
457  * as the first entry in their struct definition. So you can
458  * access the file pointer through any of the sub-structs,
459  * or directly as just 'ki_filp' in this struct.
460  */
461 struct io_kiocb {
462         union {
463                 struct file             *file;
464                 struct io_rw            rw;
465                 struct io_poll_iocb     poll;
466                 struct io_accept        accept;
467                 struct io_sync          sync;
468                 struct io_cancel        cancel;
469                 struct io_timeout       timeout;
470                 struct io_connect       connect;
471                 struct io_sr_msg        sr_msg;
472                 struct io_open          open;
473                 struct io_close         close;
474                 struct io_files_update  files_update;
475                 struct io_fadvise       fadvise;
476                 struct io_madvise       madvise;
477         };
478
479         struct io_async_ctx             *io;
480         /*
481          * llist_node is only used for poll deferred completions
482          */
483         struct llist_node               llist_node;
484         bool                            has_user;
485         bool                            in_async;
486         bool                            needs_fixed_file;
487         u8                              opcode;
488
489         struct io_ring_ctx      *ctx;
490         union {
491                 struct list_head        list;
492                 struct hlist_node       hash_node;
493         };
494         struct list_head        link_list;
495         unsigned int            flags;
496         refcount_t              refs;
497 #define REQ_F_NOWAIT            1       /* must not punt to workers */
498 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
499 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
500 #define REQ_F_LINK_NEXT         8       /* already grabbed next link */
501 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
502 #define REQ_F_LINK              64      /* linked sqes */
503 #define REQ_F_LINK_TIMEOUT      128     /* has linked timeout */
504 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
505 #define REQ_F_TIMEOUT           1024    /* timeout request */
506 #define REQ_F_ISREG             2048    /* regular file */
507 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
508 #define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
509 #define REQ_F_INFLIGHT          16384   /* on inflight list */
510 #define REQ_F_COMP_LOCKED       32768   /* completion under lock */
511 #define REQ_F_HARDLINK          65536   /* doesn't sever on completion < 0 */
512 #define REQ_F_FORCE_ASYNC       131072  /* IOSQE_ASYNC */
513 #define REQ_F_CUR_POS           262144  /* read/write uses file position */
514         u64                     user_data;
515         u32                     result;
516         u32                     sequence;
517
518         struct list_head        inflight_entry;
519
520         struct io_wq_work       work;
521 };
522
523 #define IO_PLUG_THRESHOLD               2
524 #define IO_IOPOLL_BATCH                 8
525
526 struct io_submit_state {
527         struct blk_plug         plug;
528
529         /*
530          * io_kiocb alloc cache
531          */
532         void                    *reqs[IO_IOPOLL_BATCH];
533         unsigned                int free_reqs;
534         unsigned                int cur_req;
535
536         /*
537          * File reference cache
538          */
539         struct file             *file;
540         unsigned int            fd;
541         unsigned int            has_refs;
542         unsigned int            used_refs;
543         unsigned int            ios_left;
544 };
545
546 struct io_op_def {
547         /* needs req->io allocated for deferral/async */
548         unsigned                async_ctx : 1;
549         /* needs current->mm setup, does mm access */
550         unsigned                needs_mm : 1;
551         /* needs req->file assigned */
552         unsigned                needs_file : 1;
553         /* needs req->file assigned IFF fd is >= 0 */
554         unsigned                fd_non_neg : 1;
555         /* hash wq insertion if file is a regular file */
556         unsigned                hash_reg_file : 1;
557         /* unbound wq insertion if file is a non-regular file */
558         unsigned                unbound_nonreg_file : 1;
559         /* opcode is not supported by this kernel */
560         unsigned                not_supported : 1;
561 };
562
563 static const struct io_op_def io_op_defs[] = {
564         {
565                 /* IORING_OP_NOP */
566         },
567         {
568                 /* IORING_OP_READV */
569                 .async_ctx              = 1,
570                 .needs_mm               = 1,
571                 .needs_file             = 1,
572                 .unbound_nonreg_file    = 1,
573         },
574         {
575                 /* IORING_OP_WRITEV */
576                 .async_ctx              = 1,
577                 .needs_mm               = 1,
578                 .needs_file             = 1,
579                 .hash_reg_file          = 1,
580                 .unbound_nonreg_file    = 1,
581         },
582         {
583                 /* IORING_OP_FSYNC */
584                 .needs_file             = 1,
585         },
586         {
587                 /* IORING_OP_READ_FIXED */
588                 .needs_file             = 1,
589                 .unbound_nonreg_file    = 1,
590         },
591         {
592                 /* IORING_OP_WRITE_FIXED */
593                 .needs_file             = 1,
594                 .hash_reg_file          = 1,
595                 .unbound_nonreg_file    = 1,
596         },
597         {
598                 /* IORING_OP_POLL_ADD */
599                 .needs_file             = 1,
600                 .unbound_nonreg_file    = 1,
601         },
602         {
603                 /* IORING_OP_POLL_REMOVE */
604         },
605         {
606                 /* IORING_OP_SYNC_FILE_RANGE */
607                 .needs_file             = 1,
608         },
609         {
610                 /* IORING_OP_SENDMSG */
611                 .async_ctx              = 1,
612                 .needs_mm               = 1,
613                 .needs_file             = 1,
614                 .unbound_nonreg_file    = 1,
615         },
616         {
617                 /* IORING_OP_RECVMSG */
618                 .async_ctx              = 1,
619                 .needs_mm               = 1,
620                 .needs_file             = 1,
621                 .unbound_nonreg_file    = 1,
622         },
623         {
624                 /* IORING_OP_TIMEOUT */
625                 .async_ctx              = 1,
626                 .needs_mm               = 1,
627         },
628         {
629                 /* IORING_OP_TIMEOUT_REMOVE */
630         },
631         {
632                 /* IORING_OP_ACCEPT */
633                 .needs_mm               = 1,
634                 .needs_file             = 1,
635                 .unbound_nonreg_file    = 1,
636         },
637         {
638                 /* IORING_OP_ASYNC_CANCEL */
639         },
640         {
641                 /* IORING_OP_LINK_TIMEOUT */
642                 .async_ctx              = 1,
643                 .needs_mm               = 1,
644         },
645         {
646                 /* IORING_OP_CONNECT */
647                 .async_ctx              = 1,
648                 .needs_mm               = 1,
649                 .needs_file             = 1,
650                 .unbound_nonreg_file    = 1,
651         },
652         {
653                 /* IORING_OP_FALLOCATE */
654                 .needs_file             = 1,
655         },
656         {
657                 /* IORING_OP_OPENAT */
658                 .needs_file             = 1,
659                 .fd_non_neg             = 1,
660         },
661         {
662                 /* IORING_OP_CLOSE */
663                 .needs_file             = 1,
664         },
665         {
666                 /* IORING_OP_FILES_UPDATE */
667                 .needs_mm               = 1,
668         },
669         {
670                 /* IORING_OP_STATX */
671                 .needs_mm               = 1,
672                 .needs_file             = 1,
673                 .fd_non_neg             = 1,
674         },
675         {
676                 /* IORING_OP_READ */
677                 .needs_mm               = 1,
678                 .needs_file             = 1,
679                 .unbound_nonreg_file    = 1,
680         },
681         {
682                 /* IORING_OP_WRITE */
683                 .needs_mm               = 1,
684                 .needs_file             = 1,
685                 .unbound_nonreg_file    = 1,
686         },
687         {
688                 /* IORING_OP_FADVISE */
689                 .needs_file             = 1,
690         },
691         {
692                 /* IORING_OP_MADVISE */
693                 .needs_mm               = 1,
694         },
695         {
696                 /* IORING_OP_SEND */
697                 .needs_mm               = 1,
698                 .needs_file             = 1,
699                 .unbound_nonreg_file    = 1,
700         },
701         {
702                 /* IORING_OP_RECV */
703                 .needs_mm               = 1,
704                 .needs_file             = 1,
705                 .unbound_nonreg_file    = 1,
706         },
707         {
708                 /* IORING_OP_OPENAT2 */
709                 .needs_file             = 1,
710                 .fd_non_neg             = 1,
711         },
712 };
713
714 static void io_wq_submit_work(struct io_wq_work **workptr);
715 static void io_cqring_fill_event(struct io_kiocb *req, long res);
716 static void io_put_req(struct io_kiocb *req);
717 static void __io_double_put_req(struct io_kiocb *req);
718 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
719 static void io_queue_linked_timeout(struct io_kiocb *req);
720 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
721                                  struct io_uring_files_update *ip,
722                                  unsigned nr_args);
723
724 static struct kmem_cache *req_cachep;
725
726 static const struct file_operations io_uring_fops;
727
728 struct sock *io_uring_get_socket(struct file *file)
729 {
730 #if defined(CONFIG_UNIX)
731         if (file->f_op == &io_uring_fops) {
732                 struct io_ring_ctx *ctx = file->private_data;
733
734                 return ctx->ring_sock->sk;
735         }
736 #endif
737         return NULL;
738 }
739 EXPORT_SYMBOL(io_uring_get_socket);
740
741 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
742 {
743         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
744
745         complete(&ctx->completions[0]);
746 }
747
748 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
749 {
750         struct io_ring_ctx *ctx;
751         int hash_bits;
752
753         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
754         if (!ctx)
755                 return NULL;
756
757         ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL);
758         if (!ctx->fallback_req)
759                 goto err;
760
761         ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL);
762         if (!ctx->completions)
763                 goto err;
764
765         /*
766          * Use 5 bits less than the max cq entries, that should give us around
767          * 32 entries per hash list if totally full and uniformly spread.
768          */
769         hash_bits = ilog2(p->cq_entries);
770         hash_bits -= 5;
771         if (hash_bits <= 0)
772                 hash_bits = 1;
773         ctx->cancel_hash_bits = hash_bits;
774         ctx->cancel_hash = kmalloc((1U << hash_bits) * sizeof(struct hlist_head),
775                                         GFP_KERNEL);
776         if (!ctx->cancel_hash)
777                 goto err;
778         __hash_init(ctx->cancel_hash, 1U << hash_bits);
779
780         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
781                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
782                 goto err;
783
784         ctx->flags = p->flags;
785         init_waitqueue_head(&ctx->cq_wait);
786         INIT_LIST_HEAD(&ctx->cq_overflow_list);
787         init_completion(&ctx->completions[0]);
788         init_completion(&ctx->completions[1]);
789         mutex_init(&ctx->uring_lock);
790         init_waitqueue_head(&ctx->wait);
791         spin_lock_init(&ctx->completion_lock);
792         init_llist_head(&ctx->poll_llist);
793         INIT_LIST_HEAD(&ctx->poll_list);
794         INIT_LIST_HEAD(&ctx->defer_list);
795         INIT_LIST_HEAD(&ctx->timeout_list);
796         init_waitqueue_head(&ctx->inflight_wait);
797         spin_lock_init(&ctx->inflight_lock);
798         INIT_LIST_HEAD(&ctx->inflight_list);
799         return ctx;
800 err:
801         if (ctx->fallback_req)
802                 kmem_cache_free(req_cachep, ctx->fallback_req);
803         kfree(ctx->completions);
804         kfree(ctx->cancel_hash);
805         kfree(ctx);
806         return NULL;
807 }
808
809 static inline bool __req_need_defer(struct io_kiocb *req)
810 {
811         struct io_ring_ctx *ctx = req->ctx;
812
813         return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
814                                         + atomic_read(&ctx->cached_cq_overflow);
815 }
816
817 static inline bool req_need_defer(struct io_kiocb *req)
818 {
819         if (unlikely(req->flags & REQ_F_IO_DRAIN))
820                 return __req_need_defer(req);
821
822         return false;
823 }
824
825 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
826 {
827         struct io_kiocb *req;
828
829         req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
830         if (req && !req_need_defer(req)) {
831                 list_del_init(&req->list);
832                 return req;
833         }
834
835         return NULL;
836 }
837
838 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
839 {
840         struct io_kiocb *req;
841
842         req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
843         if (req) {
844                 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
845                         return NULL;
846                 if (!__req_need_defer(req)) {
847                         list_del_init(&req->list);
848                         return req;
849                 }
850         }
851
852         return NULL;
853 }
854
855 static void __io_commit_cqring(struct io_ring_ctx *ctx)
856 {
857         struct io_rings *rings = ctx->rings;
858
859         /* order cqe stores with ring update */
860         smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
861
862         if (wq_has_sleeper(&ctx->cq_wait)) {
863                 wake_up_interruptible(&ctx->cq_wait);
864                 kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
865         }
866 }
867
868 static inline bool io_prep_async_work(struct io_kiocb *req,
869                                       struct io_kiocb **link)
870 {
871         const struct io_op_def *def = &io_op_defs[req->opcode];
872         bool do_hashed = false;
873
874         if (req->flags & REQ_F_ISREG) {
875                 if (def->hash_reg_file)
876                         do_hashed = true;
877         } else {
878                 if (def->unbound_nonreg_file)
879                         req->work.flags |= IO_WQ_WORK_UNBOUND;
880         }
881         if (def->needs_mm)
882                 req->work.flags |= IO_WQ_WORK_NEEDS_USER;
883
884         *link = io_prep_linked_timeout(req);
885         return do_hashed;
886 }
887
888 static inline void io_queue_async_work(struct io_kiocb *req)
889 {
890         struct io_ring_ctx *ctx = req->ctx;
891         struct io_kiocb *link;
892         bool do_hashed;
893
894         do_hashed = io_prep_async_work(req, &link);
895
896         trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
897                                         req->flags);
898         if (!do_hashed) {
899                 io_wq_enqueue(ctx->io_wq, &req->work);
900         } else {
901                 io_wq_enqueue_hashed(ctx->io_wq, &req->work,
902                                         file_inode(req->file));
903         }
904
905         if (link)
906                 io_queue_linked_timeout(link);
907 }
908
909 static void io_kill_timeout(struct io_kiocb *req)
910 {
911         int ret;
912
913         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
914         if (ret != -1) {
915                 atomic_inc(&req->ctx->cq_timeouts);
916                 list_del_init(&req->list);
917                 io_cqring_fill_event(req, 0);
918                 io_put_req(req);
919         }
920 }
921
922 static void io_kill_timeouts(struct io_ring_ctx *ctx)
923 {
924         struct io_kiocb *req, *tmp;
925
926         spin_lock_irq(&ctx->completion_lock);
927         list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
928                 io_kill_timeout(req);
929         spin_unlock_irq(&ctx->completion_lock);
930 }
931
932 static void io_commit_cqring(struct io_ring_ctx *ctx)
933 {
934         struct io_kiocb *req;
935
936         while ((req = io_get_timeout_req(ctx)) != NULL)
937                 io_kill_timeout(req);
938
939         __io_commit_cqring(ctx);
940
941         while ((req = io_get_deferred_req(ctx)) != NULL)
942                 io_queue_async_work(req);
943 }
944
945 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
946 {
947         struct io_rings *rings = ctx->rings;
948         unsigned tail;
949
950         tail = ctx->cached_cq_tail;
951         /*
952          * writes to the cq entry need to come after reading head; the
953          * control dependency is enough as we're using WRITE_ONCE to
954          * fill the cq entry
955          */
956         if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
957                 return NULL;
958
959         ctx->cached_cq_tail++;
960         return &rings->cqes[tail & ctx->cq_mask];
961 }
962
963 static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx)
964 {
965         if (!ctx->eventfd_async)
966                 return true;
967         return io_wq_current_is_worker() || in_interrupt();
968 }
969
970 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
971 {
972         if (waitqueue_active(&ctx->wait))
973                 wake_up(&ctx->wait);
974         if (waitqueue_active(&ctx->sqo_wait))
975                 wake_up(&ctx->sqo_wait);
976         if (ctx->cq_ev_fd && io_should_trigger_evfd(ctx))
977                 eventfd_signal(ctx->cq_ev_fd, 1);
978 }
979
980 /* Returns true if there are no backlogged entries after the flush */
981 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
982 {
983         struct io_rings *rings = ctx->rings;
984         struct io_uring_cqe *cqe;
985         struct io_kiocb *req;
986         unsigned long flags;
987         LIST_HEAD(list);
988
989         if (!force) {
990                 if (list_empty_careful(&ctx->cq_overflow_list))
991                         return true;
992                 if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
993                     rings->cq_ring_entries))
994                         return false;
995         }
996
997         spin_lock_irqsave(&ctx->completion_lock, flags);
998
999         /* if force is set, the ring is going away. always drop after that */
1000         if (force)
1001                 ctx->cq_overflow_flushed = 1;
1002
1003         cqe = NULL;
1004         while (!list_empty(&ctx->cq_overflow_list)) {
1005                 cqe = io_get_cqring(ctx);
1006                 if (!cqe && !force)
1007                         break;
1008
1009                 req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
1010                                                 list);
1011                 list_move(&req->list, &list);
1012                 if (cqe) {
1013                         WRITE_ONCE(cqe->user_data, req->user_data);
1014                         WRITE_ONCE(cqe->res, req->result);
1015                         WRITE_ONCE(cqe->flags, 0);
1016                 } else {
1017                         WRITE_ONCE(ctx->rings->cq_overflow,
1018                                 atomic_inc_return(&ctx->cached_cq_overflow));
1019                 }
1020         }
1021
1022         io_commit_cqring(ctx);
1023         if (cqe) {
1024                 clear_bit(0, &ctx->sq_check_overflow);
1025                 clear_bit(0, &ctx->cq_check_overflow);
1026         }
1027         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1028         io_cqring_ev_posted(ctx);
1029
1030         while (!list_empty(&list)) {
1031                 req = list_first_entry(&list, struct io_kiocb, list);
1032                 list_del(&req->list);
1033                 io_put_req(req);
1034         }
1035
1036         return cqe != NULL;
1037 }
1038
1039 static void io_cqring_fill_event(struct io_kiocb *req, long res)
1040 {
1041         struct io_ring_ctx *ctx = req->ctx;
1042         struct io_uring_cqe *cqe;
1043
1044         trace_io_uring_complete(ctx, req->user_data, res);
1045
1046         /*
1047          * If we can't get a cq entry, userspace overflowed the
1048          * submission (by quite a lot). Increment the overflow count in
1049          * the ring.
1050          */
1051         cqe = io_get_cqring(ctx);
1052         if (likely(cqe)) {
1053                 WRITE_ONCE(cqe->user_data, req->user_data);
1054                 WRITE_ONCE(cqe->res, res);
1055                 WRITE_ONCE(cqe->flags, 0);
1056         } else if (ctx->cq_overflow_flushed) {
1057                 WRITE_ONCE(ctx->rings->cq_overflow,
1058                                 atomic_inc_return(&ctx->cached_cq_overflow));
1059         } else {
1060                 if (list_empty(&ctx->cq_overflow_list)) {
1061                         set_bit(0, &ctx->sq_check_overflow);
1062                         set_bit(0, &ctx->cq_check_overflow);
1063                 }
1064                 refcount_inc(&req->refs);
1065                 req->result = res;
1066                 list_add_tail(&req->list, &ctx->cq_overflow_list);
1067         }
1068 }
1069
1070 static void io_cqring_add_event(struct io_kiocb *req, long res)
1071 {
1072         struct io_ring_ctx *ctx = req->ctx;
1073         unsigned long flags;
1074
1075         spin_lock_irqsave(&ctx->completion_lock, flags);
1076         io_cqring_fill_event(req, res);
1077         io_commit_cqring(ctx);
1078         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1079
1080         io_cqring_ev_posted(ctx);
1081 }
1082
1083 static inline bool io_is_fallback_req(struct io_kiocb *req)
1084 {
1085         return req == (struct io_kiocb *)
1086                         ((unsigned long) req->ctx->fallback_req & ~1UL);
1087 }
1088
1089 static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx)
1090 {
1091         struct io_kiocb *req;
1092
1093         req = ctx->fallback_req;
1094         if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req))
1095                 return req;
1096
1097         return NULL;
1098 }
1099
1100 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
1101                                    struct io_submit_state *state)
1102 {
1103         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
1104         struct io_kiocb *req;
1105
1106         if (!state) {
1107                 req = kmem_cache_alloc(req_cachep, gfp);
1108                 if (unlikely(!req))
1109                         goto fallback;
1110         } else if (!state->free_reqs) {
1111                 size_t sz;
1112                 int ret;
1113
1114                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
1115                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
1116
1117                 /*
1118                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
1119                  * retry single alloc to be on the safe side.
1120                  */
1121                 if (unlikely(ret <= 0)) {
1122                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
1123                         if (!state->reqs[0])
1124                                 goto fallback;
1125                         ret = 1;
1126                 }
1127                 state->free_reqs = ret - 1;
1128                 state->cur_req = 1;
1129                 req = state->reqs[0];
1130         } else {
1131                 req = state->reqs[state->cur_req];
1132                 state->free_reqs--;
1133                 state->cur_req++;
1134         }
1135
1136 got_it:
1137         req->io = NULL;
1138         req->file = NULL;
1139         req->ctx = ctx;
1140         req->flags = 0;
1141         /* one is dropped after submission, the other at completion */
1142         refcount_set(&req->refs, 2);
1143         req->result = 0;
1144         INIT_IO_WORK(&req->work, io_wq_submit_work);
1145         return req;
1146 fallback:
1147         req = io_get_fallback_req(ctx);
1148         if (req)
1149                 goto got_it;
1150         percpu_ref_put(&ctx->refs);
1151         return NULL;
1152 }
1153
1154 static void __io_req_do_free(struct io_kiocb *req)
1155 {
1156         if (likely(!io_is_fallback_req(req)))
1157                 kmem_cache_free(req_cachep, req);
1158         else
1159                 clear_bit_unlock(0, (unsigned long *) req->ctx->fallback_req);
1160 }
1161
1162 static void __io_req_aux_free(struct io_kiocb *req)
1163 {
1164         struct io_ring_ctx *ctx = req->ctx;
1165
1166         kfree(req->io);
1167         if (req->file) {
1168                 if (req->flags & REQ_F_FIXED_FILE)
1169                         percpu_ref_put(&ctx->file_data->refs);
1170                 else
1171                         fput(req->file);
1172         }
1173 }
1174
1175 static void __io_free_req(struct io_kiocb *req)
1176 {
1177         __io_req_aux_free(req);
1178
1179         if (req->flags & REQ_F_INFLIGHT) {
1180                 struct io_ring_ctx *ctx = req->ctx;
1181                 unsigned long flags;
1182
1183                 spin_lock_irqsave(&ctx->inflight_lock, flags);
1184                 list_del(&req->inflight_entry);
1185                 if (waitqueue_active(&ctx->inflight_wait))
1186                         wake_up(&ctx->inflight_wait);
1187                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
1188         }
1189
1190         percpu_ref_put(&req->ctx->refs);
1191         __io_req_do_free(req);
1192 }
1193
1194 struct req_batch {
1195         void *reqs[IO_IOPOLL_BATCH];
1196         int to_free;
1197         int need_iter;
1198 };
1199
1200 static void io_free_req_many(struct io_ring_ctx *ctx, struct req_batch *rb)
1201 {
1202         int fixed_refs = rb->to_free;
1203
1204         if (!rb->to_free)
1205                 return;
1206         if (rb->need_iter) {
1207                 int i, inflight = 0;
1208                 unsigned long flags;
1209
1210                 fixed_refs = 0;
1211                 for (i = 0; i < rb->to_free; i++) {
1212                         struct io_kiocb *req = rb->reqs[i];
1213
1214                         if (req->flags & REQ_F_FIXED_FILE) {
1215                                 req->file = NULL;
1216                                 fixed_refs++;
1217                         }
1218                         if (req->flags & REQ_F_INFLIGHT)
1219                                 inflight++;
1220                         __io_req_aux_free(req);
1221                 }
1222                 if (!inflight)
1223                         goto do_free;
1224
1225                 spin_lock_irqsave(&ctx->inflight_lock, flags);
1226                 for (i = 0; i < rb->to_free; i++) {
1227                         struct io_kiocb *req = rb->reqs[i];
1228
1229                         if (req->flags & REQ_F_INFLIGHT) {
1230                                 list_del(&req->inflight_entry);
1231                                 if (!--inflight)
1232                                         break;
1233                         }
1234                 }
1235                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
1236
1237                 if (waitqueue_active(&ctx->inflight_wait))
1238                         wake_up(&ctx->inflight_wait);
1239         }
1240 do_free:
1241         kmem_cache_free_bulk(req_cachep, rb->to_free, rb->reqs);
1242         if (fixed_refs)
1243                 percpu_ref_put_many(&ctx->file_data->refs, fixed_refs);
1244         percpu_ref_put_many(&ctx->refs, rb->to_free);
1245         rb->to_free = rb->need_iter = 0;
1246 }
1247
1248 static bool io_link_cancel_timeout(struct io_kiocb *req)
1249 {
1250         struct io_ring_ctx *ctx = req->ctx;
1251         int ret;
1252
1253         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
1254         if (ret != -1) {
1255                 io_cqring_fill_event(req, -ECANCELED);
1256                 io_commit_cqring(ctx);
1257                 req->flags &= ~REQ_F_LINK;
1258                 io_put_req(req);
1259                 return true;
1260         }
1261
1262         return false;
1263 }
1264
1265 static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1266 {
1267         struct io_ring_ctx *ctx = req->ctx;
1268         bool wake_ev = false;
1269
1270         /* Already got next link */
1271         if (req->flags & REQ_F_LINK_NEXT)
1272                 return;
1273
1274         /*
1275          * The list should never be empty when we are called here. But could
1276          * potentially happen if the chain is messed up, check to be on the
1277          * safe side.
1278          */
1279         while (!list_empty(&req->link_list)) {
1280                 struct io_kiocb *nxt = list_first_entry(&req->link_list,
1281                                                 struct io_kiocb, link_list);
1282
1283                 if (unlikely((req->flags & REQ_F_LINK_TIMEOUT) &&
1284                              (nxt->flags & REQ_F_TIMEOUT))) {
1285                         list_del_init(&nxt->link_list);
1286                         wake_ev |= io_link_cancel_timeout(nxt);
1287                         req->flags &= ~REQ_F_LINK_TIMEOUT;
1288                         continue;
1289                 }
1290
1291                 list_del_init(&req->link_list);
1292                 if (!list_empty(&nxt->link_list))
1293                         nxt->flags |= REQ_F_LINK;
1294                 *nxtptr = nxt;
1295                 break;
1296         }
1297
1298         req->flags |= REQ_F_LINK_NEXT;
1299         if (wake_ev)
1300                 io_cqring_ev_posted(ctx);
1301 }
1302
1303 /*
1304  * Called if REQ_F_LINK is set, and we fail the head request
1305  */
1306 static void io_fail_links(struct io_kiocb *req)
1307 {
1308         struct io_ring_ctx *ctx = req->ctx;
1309         unsigned long flags;
1310
1311         spin_lock_irqsave(&ctx->completion_lock, flags);
1312
1313         while (!list_empty(&req->link_list)) {
1314                 struct io_kiocb *link = list_first_entry(&req->link_list,
1315                                                 struct io_kiocb, link_list);
1316
1317                 list_del_init(&link->link_list);
1318                 trace_io_uring_fail_link(req, link);
1319
1320                 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
1321                     link->opcode == IORING_OP_LINK_TIMEOUT) {
1322                         io_link_cancel_timeout(link);
1323                 } else {
1324                         io_cqring_fill_event(link, -ECANCELED);
1325                         __io_double_put_req(link);
1326                 }
1327                 req->flags &= ~REQ_F_LINK_TIMEOUT;
1328         }
1329
1330         io_commit_cqring(ctx);
1331         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1332         io_cqring_ev_posted(ctx);
1333 }
1334
1335 static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
1336 {
1337         if (likely(!(req->flags & REQ_F_LINK)))
1338                 return;
1339
1340         /*
1341          * If LINK is set, we have dependent requests in this chain. If we
1342          * didn't fail this request, queue the first one up, moving any other
1343          * dependencies to the next request. In case of failure, fail the rest
1344          * of the chain.
1345          */
1346         if (req->flags & REQ_F_FAIL_LINK) {
1347                 io_fail_links(req);
1348         } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
1349                         REQ_F_LINK_TIMEOUT) {
1350                 struct io_ring_ctx *ctx = req->ctx;
1351                 unsigned long flags;
1352
1353                 /*
1354                  * If this is a timeout link, we could be racing with the
1355                  * timeout timer. Grab the completion lock for this case to
1356                  * protect against that.
1357                  */
1358                 spin_lock_irqsave(&ctx->completion_lock, flags);
1359                 io_req_link_next(req, nxt);
1360                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1361         } else {
1362                 io_req_link_next(req, nxt);
1363         }
1364 }
1365
1366 static void io_free_req(struct io_kiocb *req)
1367 {
1368         struct io_kiocb *nxt = NULL;
1369
1370         io_req_find_next(req, &nxt);
1371         __io_free_req(req);
1372
1373         if (nxt)
1374                 io_queue_async_work(nxt);
1375 }
1376
1377 /*
1378  * Drop reference to request, return next in chain (if there is one) if this
1379  * was the last reference to this request.
1380  */
1381 __attribute__((nonnull))
1382 static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1383 {
1384         io_req_find_next(req, nxtptr);
1385
1386         if (refcount_dec_and_test(&req->refs))
1387                 __io_free_req(req);
1388 }
1389
1390 static void io_put_req(struct io_kiocb *req)
1391 {
1392         if (refcount_dec_and_test(&req->refs))
1393                 io_free_req(req);
1394 }
1395
1396 /*
1397  * Must only be used if we don't need to care about links, usually from
1398  * within the completion handling itself.
1399  */
1400 static void __io_double_put_req(struct io_kiocb *req)
1401 {
1402         /* drop both submit and complete references */
1403         if (refcount_sub_and_test(2, &req->refs))
1404                 __io_free_req(req);
1405 }
1406
1407 static void io_double_put_req(struct io_kiocb *req)
1408 {
1409         /* drop both submit and complete references */
1410         if (refcount_sub_and_test(2, &req->refs))
1411                 io_free_req(req);
1412 }
1413
1414 static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
1415 {
1416         struct io_rings *rings = ctx->rings;
1417
1418         if (test_bit(0, &ctx->cq_check_overflow)) {
1419                 /*
1420                  * noflush == true is from the waitqueue handler, just ensure
1421                  * we wake up the task, and the next invocation will flush the
1422                  * entries. We cannot safely to it from here.
1423                  */
1424                 if (noflush && !list_empty(&ctx->cq_overflow_list))
1425                         return -1U;
1426
1427                 io_cqring_overflow_flush(ctx, false);
1428         }
1429
1430         /* See comment at the top of this file */
1431         smp_rmb();
1432         return ctx->cached_cq_tail - READ_ONCE(rings->cq.head);
1433 }
1434
1435 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1436 {
1437         struct io_rings *rings = ctx->rings;
1438
1439         /* make sure SQ entry isn't read before tail */
1440         return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
1441 }
1442
1443 static inline bool io_req_multi_free(struct req_batch *rb, struct io_kiocb *req)
1444 {
1445         if ((req->flags & REQ_F_LINK) || io_is_fallback_req(req))
1446                 return false;
1447
1448         if (!(req->flags & REQ_F_FIXED_FILE) || req->io)
1449                 rb->need_iter++;
1450
1451         rb->reqs[rb->to_free++] = req;
1452         if (unlikely(rb->to_free == ARRAY_SIZE(rb->reqs)))
1453                 io_free_req_many(req->ctx, rb);
1454         return true;
1455 }
1456
1457 /*
1458  * Find and free completed poll iocbs
1459  */
1460 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
1461                                struct list_head *done)
1462 {
1463         struct req_batch rb;
1464         struct io_kiocb *req;
1465
1466         rb.to_free = rb.need_iter = 0;
1467         while (!list_empty(done)) {
1468                 req = list_first_entry(done, struct io_kiocb, list);
1469                 list_del(&req->list);
1470
1471                 io_cqring_fill_event(req, req->result);
1472                 (*nr_events)++;
1473
1474                 if (refcount_dec_and_test(&req->refs) &&
1475                     !io_req_multi_free(&rb, req))
1476                         io_free_req(req);
1477         }
1478
1479         io_commit_cqring(ctx);
1480         io_free_req_many(ctx, &rb);
1481 }
1482
1483 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
1484                         long min)
1485 {
1486         struct io_kiocb *req, *tmp;
1487         LIST_HEAD(done);
1488         bool spin;
1489         int ret;
1490
1491         /*
1492          * Only spin for completions if we don't have multiple devices hanging
1493          * off our complete list, and we're under the requested amount.
1494          */
1495         spin = !ctx->poll_multi_file && *nr_events < min;
1496
1497         ret = 0;
1498         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
1499                 struct kiocb *kiocb = &req->rw.kiocb;
1500
1501                 /*
1502                  * Move completed entries to our local list. If we find a
1503                  * request that requires polling, break out and complete
1504                  * the done list first, if we have entries there.
1505                  */
1506                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
1507                         list_move_tail(&req->list, &done);
1508                         continue;
1509                 }
1510                 if (!list_empty(&done))
1511                         break;
1512
1513                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
1514                 if (ret < 0)
1515                         break;
1516
1517                 if (ret && spin)
1518                         spin = false;
1519                 ret = 0;
1520         }
1521
1522         if (!list_empty(&done))
1523                 io_iopoll_complete(ctx, nr_events, &done);
1524
1525         return ret;
1526 }
1527
1528 /*
1529  * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
1530  * non-spinning poll check - we'll still enter the driver poll loop, but only
1531  * as a non-spinning completion check.
1532  */
1533 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
1534                                 long min)
1535 {
1536         while (!list_empty(&ctx->poll_list) && !need_resched()) {
1537                 int ret;
1538
1539                 ret = io_do_iopoll(ctx, nr_events, min);
1540                 if (ret < 0)
1541                         return ret;
1542                 if (!min || *nr_events >= min)
1543                         return 0;
1544         }
1545
1546         return 1;
1547 }
1548
1549 /*
1550  * We can't just wait for polled events to come to us, we have to actively
1551  * find and complete them.
1552  */
1553 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
1554 {
1555         if (!(ctx->flags & IORING_SETUP_IOPOLL))
1556                 return;
1557
1558         mutex_lock(&ctx->uring_lock);
1559         while (!list_empty(&ctx->poll_list)) {
1560                 unsigned int nr_events = 0;
1561
1562                 io_iopoll_getevents(ctx, &nr_events, 1);
1563
1564                 /*
1565                  * Ensure we allow local-to-the-cpu processing to take place,
1566                  * in this case we need to ensure that we reap all events.
1567                  */
1568                 cond_resched();
1569         }
1570         mutex_unlock(&ctx->uring_lock);
1571 }
1572
1573 static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1574                             long min)
1575 {
1576         int iters = 0, ret = 0;
1577
1578         do {
1579                 int tmin = 0;
1580
1581                 /*
1582                  * Don't enter poll loop if we already have events pending.
1583                  * If we do, we can potentially be spinning for commands that
1584                  * already triggered a CQE (eg in error).
1585                  */
1586                 if (io_cqring_events(ctx, false))
1587                         break;
1588
1589                 /*
1590                  * If a submit got punted to a workqueue, we can have the
1591                  * application entering polling for a command before it gets
1592                  * issued. That app will hold the uring_lock for the duration
1593                  * of the poll right here, so we need to take a breather every
1594                  * now and then to ensure that the issue has a chance to add
1595                  * the poll to the issued list. Otherwise we can spin here
1596                  * forever, while the workqueue is stuck trying to acquire the
1597                  * very same mutex.
1598                  */
1599                 if (!(++iters & 7)) {
1600                         mutex_unlock(&ctx->uring_lock);
1601                         mutex_lock(&ctx->uring_lock);
1602                 }
1603
1604                 if (*nr_events < min)
1605                         tmin = min - *nr_events;
1606
1607                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
1608                 if (ret <= 0)
1609                         break;
1610                 ret = 0;
1611         } while (min && !*nr_events && !need_resched());
1612
1613         return ret;
1614 }
1615
1616 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1617                            long min)
1618 {
1619         int ret;
1620
1621         /*
1622          * We disallow the app entering submit/complete with polling, but we
1623          * still need to lock the ring to prevent racing with polled issue
1624          * that got punted to a workqueue.
1625          */
1626         mutex_lock(&ctx->uring_lock);
1627         ret = __io_iopoll_check(ctx, nr_events, min);
1628         mutex_unlock(&ctx->uring_lock);
1629         return ret;
1630 }
1631
1632 static void kiocb_end_write(struct io_kiocb *req)
1633 {
1634         /*
1635          * Tell lockdep we inherited freeze protection from submission
1636          * thread.
1637          */
1638         if (req->flags & REQ_F_ISREG) {
1639                 struct inode *inode = file_inode(req->file);
1640
1641                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
1642         }
1643         file_end_write(req->file);
1644 }
1645
1646 static inline void req_set_fail_links(struct io_kiocb *req)
1647 {
1648         if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
1649                 req->flags |= REQ_F_FAIL_LINK;
1650 }
1651
1652 static void io_complete_rw_common(struct kiocb *kiocb, long res)
1653 {
1654         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1655
1656         if (kiocb->ki_flags & IOCB_WRITE)
1657                 kiocb_end_write(req);
1658
1659         if (res != req->result)
1660                 req_set_fail_links(req);
1661         io_cqring_add_event(req, res);
1662 }
1663
1664 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
1665 {
1666         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1667
1668         io_complete_rw_common(kiocb, res);
1669         io_put_req(req);
1670 }
1671
1672 static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
1673 {
1674         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1675         struct io_kiocb *nxt = NULL;
1676
1677         io_complete_rw_common(kiocb, res);
1678         io_put_req_find_next(req, &nxt);
1679
1680         return nxt;
1681 }
1682
1683 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1684 {
1685         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1686
1687         if (kiocb->ki_flags & IOCB_WRITE)
1688                 kiocb_end_write(req);
1689
1690         if (res != req->result)
1691                 req_set_fail_links(req);
1692         req->result = res;
1693         if (res != -EAGAIN)
1694                 req->flags |= REQ_F_IOPOLL_COMPLETED;
1695 }
1696
1697 /*
1698  * After the iocb has been issued, it's safe to be found on the poll list.
1699  * Adding the kiocb to the list AFTER submission ensures that we don't
1700  * find it from a io_iopoll_getevents() thread before the issuer is done
1701  * accessing the kiocb cookie.
1702  */
1703 static void io_iopoll_req_issued(struct io_kiocb *req)
1704 {
1705         struct io_ring_ctx *ctx = req->ctx;
1706
1707         /*
1708          * Track whether we have multiple files in our lists. This will impact
1709          * how we do polling eventually, not spinning if we're on potentially
1710          * different devices.
1711          */
1712         if (list_empty(&ctx->poll_list)) {
1713                 ctx->poll_multi_file = false;
1714         } else if (!ctx->poll_multi_file) {
1715                 struct io_kiocb *list_req;
1716
1717                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1718                                                 list);
1719                 if (list_req->file != req->file)
1720                         ctx->poll_multi_file = true;
1721         }
1722
1723         /*
1724          * For fast devices, IO may have already completed. If it has, add
1725          * it to the front so we find it first.
1726          */
1727         if (req->flags & REQ_F_IOPOLL_COMPLETED)
1728                 list_add(&req->list, &ctx->poll_list);
1729         else
1730                 list_add_tail(&req->list, &ctx->poll_list);
1731 }
1732
1733 static void io_file_put(struct io_submit_state *state)
1734 {
1735         if (state->file) {
1736                 int diff = state->has_refs - state->used_refs;
1737
1738                 if (diff)
1739                         fput_many(state->file, diff);
1740                 state->file = NULL;
1741         }
1742 }
1743
1744 /*
1745  * Get as many references to a file as we have IOs left in this submission,
1746  * assuming most submissions are for one file, or at least that each file
1747  * has more than one submission.
1748  */
1749 static struct file *io_file_get(struct io_submit_state *state, int fd)
1750 {
1751         if (!state)
1752                 return fget(fd);
1753
1754         if (state->file) {
1755                 if (state->fd == fd) {
1756                         state->used_refs++;
1757                         state->ios_left--;
1758                         return state->file;
1759                 }
1760                 io_file_put(state);
1761         }
1762         state->file = fget_many(fd, state->ios_left);
1763         if (!state->file)
1764                 return NULL;
1765
1766         state->fd = fd;
1767         state->has_refs = state->ios_left;
1768         state->used_refs = 1;
1769         state->ios_left--;
1770         return state->file;
1771 }
1772
1773 /*
1774  * If we tracked the file through the SCM inflight mechanism, we could support
1775  * any file. For now, just ensure that anything potentially problematic is done
1776  * inline.
1777  */
1778 static bool io_file_supports_async(struct file *file)
1779 {
1780         umode_t mode = file_inode(file)->i_mode;
1781
1782         if (S_ISBLK(mode) || S_ISCHR(mode) || S_ISSOCK(mode))
1783                 return true;
1784         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1785                 return true;
1786
1787         return false;
1788 }
1789
1790 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1791                       bool force_nonblock)
1792 {
1793         struct io_ring_ctx *ctx = req->ctx;
1794         struct kiocb *kiocb = &req->rw.kiocb;
1795         unsigned ioprio;
1796         int ret;
1797
1798         if (!req->file)
1799                 return -EBADF;
1800
1801         if (S_ISREG(file_inode(req->file)->i_mode))
1802                 req->flags |= REQ_F_ISREG;
1803
1804         kiocb->ki_pos = READ_ONCE(sqe->off);
1805         if (kiocb->ki_pos == -1 && !(req->file->f_mode & FMODE_STREAM)) {
1806                 req->flags |= REQ_F_CUR_POS;
1807                 kiocb->ki_pos = req->file->f_pos;
1808         }
1809         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1810         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1811
1812         ioprio = READ_ONCE(sqe->ioprio);
1813         if (ioprio) {
1814                 ret = ioprio_check_cap(ioprio);
1815                 if (ret)
1816                         return ret;
1817
1818                 kiocb->ki_ioprio = ioprio;
1819         } else
1820                 kiocb->ki_ioprio = get_current_ioprio();
1821
1822         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1823         if (unlikely(ret))
1824                 return ret;
1825
1826         /* don't allow async punt if RWF_NOWAIT was requested */
1827         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1828             (req->file->f_flags & O_NONBLOCK))
1829                 req->flags |= REQ_F_NOWAIT;
1830
1831         if (force_nonblock)
1832                 kiocb->ki_flags |= IOCB_NOWAIT;
1833
1834         if (ctx->flags & IORING_SETUP_IOPOLL) {
1835                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1836                     !kiocb->ki_filp->f_op->iopoll)
1837                         return -EOPNOTSUPP;
1838
1839                 kiocb->ki_flags |= IOCB_HIPRI;
1840                 kiocb->ki_complete = io_complete_rw_iopoll;
1841                 req->result = 0;
1842         } else {
1843                 if (kiocb->ki_flags & IOCB_HIPRI)
1844                         return -EINVAL;
1845                 kiocb->ki_complete = io_complete_rw;
1846         }
1847
1848         req->rw.addr = READ_ONCE(sqe->addr);
1849         req->rw.len = READ_ONCE(sqe->len);
1850         /* we own ->private, reuse it for the buffer index */
1851         req->rw.kiocb.private = (void *) (unsigned long)
1852                                         READ_ONCE(sqe->buf_index);
1853         return 0;
1854 }
1855
1856 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1857 {
1858         switch (ret) {
1859         case -EIOCBQUEUED:
1860                 break;
1861         case -ERESTARTSYS:
1862         case -ERESTARTNOINTR:
1863         case -ERESTARTNOHAND:
1864         case -ERESTART_RESTARTBLOCK:
1865                 /*
1866                  * We can't just restart the syscall, since previously
1867                  * submitted sqes may already be in progress. Just fail this
1868                  * IO with EINTR.
1869                  */
1870                 ret = -EINTR;
1871                 /* fall through */
1872         default:
1873                 kiocb->ki_complete(kiocb, ret, 0);
1874         }
1875 }
1876
1877 static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
1878                        bool in_async)
1879 {
1880         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1881
1882         if (req->flags & REQ_F_CUR_POS)
1883                 req->file->f_pos = kiocb->ki_pos;
1884         if (in_async && ret >= 0 && kiocb->ki_complete == io_complete_rw)
1885                 *nxt = __io_complete_rw(kiocb, ret);
1886         else
1887                 io_rw_done(kiocb, ret);
1888 }
1889
1890 static ssize_t io_import_fixed(struct io_kiocb *req, int rw,
1891                                struct iov_iter *iter)
1892 {
1893         struct io_ring_ctx *ctx = req->ctx;
1894         size_t len = req->rw.len;
1895         struct io_mapped_ubuf *imu;
1896         unsigned index, buf_index;
1897         size_t offset;
1898         u64 buf_addr;
1899
1900         /* attempt to use fixed buffers without having provided iovecs */
1901         if (unlikely(!ctx->user_bufs))
1902                 return -EFAULT;
1903
1904         buf_index = (unsigned long) req->rw.kiocb.private;
1905         if (unlikely(buf_index >= ctx->nr_user_bufs))
1906                 return -EFAULT;
1907
1908         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1909         imu = &ctx->user_bufs[index];
1910         buf_addr = req->rw.addr;
1911
1912         /* overflow */
1913         if (buf_addr + len < buf_addr)
1914                 return -EFAULT;
1915         /* not inside the mapped region */
1916         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1917                 return -EFAULT;
1918
1919         /*
1920          * May not be a start of buffer, set size appropriately
1921          * and advance us to the beginning.
1922          */
1923         offset = buf_addr - imu->ubuf;
1924         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1925
1926         if (offset) {
1927                 /*
1928                  * Don't use iov_iter_advance() here, as it's really slow for
1929                  * using the latter parts of a big fixed buffer - it iterates
1930                  * over each segment manually. We can cheat a bit here, because
1931                  * we know that:
1932                  *
1933                  * 1) it's a BVEC iter, we set it up
1934                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1935                  *    first and last bvec
1936                  *
1937                  * So just find our index, and adjust the iterator afterwards.
1938                  * If the offset is within the first bvec (or the whole first
1939                  * bvec, just use iov_iter_advance(). This makes it easier
1940                  * since we can just skip the first segment, which may not
1941                  * be PAGE_SIZE aligned.
1942                  */
1943                 const struct bio_vec *bvec = imu->bvec;
1944
1945                 if (offset <= bvec->bv_len) {
1946                         iov_iter_advance(iter, offset);
1947                 } else {
1948                         unsigned long seg_skip;
1949
1950                         /* skip first vec */
1951                         offset -= bvec->bv_len;
1952                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1953
1954                         iter->bvec = bvec + seg_skip;
1955                         iter->nr_segs -= seg_skip;
1956                         iter->count -= bvec->bv_len + offset;
1957                         iter->iov_offset = offset & ~PAGE_MASK;
1958                 }
1959         }
1960
1961         return len;
1962 }
1963
1964 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
1965                                struct iovec **iovec, struct iov_iter *iter)
1966 {
1967         void __user *buf = u64_to_user_ptr(req->rw.addr);
1968         size_t sqe_len = req->rw.len;
1969         u8 opcode;
1970
1971         opcode = req->opcode;
1972         if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
1973                 *iovec = NULL;
1974                 return io_import_fixed(req, rw, iter);
1975         }
1976
1977         /* buffer index only valid with fixed read/write */
1978         if (req->rw.kiocb.private)
1979                 return -EINVAL;
1980
1981         if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
1982                 ssize_t ret;
1983                 ret = import_single_range(rw, buf, sqe_len, *iovec, iter);
1984                 *iovec = NULL;
1985                 return ret;
1986         }
1987
1988         if (req->io) {
1989                 struct io_async_rw *iorw = &req->io->rw;
1990
1991                 *iovec = iorw->iov;
1992                 iov_iter_init(iter, rw, *iovec, iorw->nr_segs, iorw->size);
1993                 if (iorw->iov == iorw->fast_iov)
1994                         *iovec = NULL;
1995                 return iorw->size;
1996         }
1997
1998         if (!req->has_user)
1999                 return -EFAULT;
2000
2001 #ifdef CONFIG_COMPAT
2002         if (req->ctx->compat)
2003                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
2004                                                 iovec, iter);
2005 #endif
2006
2007         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
2008 }
2009
2010 /*
2011  * For files that don't have ->read_iter() and ->write_iter(), handle them
2012  * by looping over ->read() or ->write() manually.
2013  */
2014 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
2015                            struct iov_iter *iter)
2016 {
2017         ssize_t ret = 0;
2018
2019         /*
2020          * Don't support polled IO through this interface, and we can't
2021          * support non-blocking either. For the latter, this just causes
2022          * the kiocb to be handled from an async context.
2023          */
2024         if (kiocb->ki_flags & IOCB_HIPRI)
2025                 return -EOPNOTSUPP;
2026         if (kiocb->ki_flags & IOCB_NOWAIT)
2027                 return -EAGAIN;
2028
2029         while (iov_iter_count(iter)) {
2030                 struct iovec iovec;
2031                 ssize_t nr;
2032
2033                 if (!iov_iter_is_bvec(iter)) {
2034                         iovec = iov_iter_iovec(iter);
2035                 } else {
2036                         /* fixed buffers import bvec */
2037                         iovec.iov_base = kmap(iter->bvec->bv_page)
2038                                                 + iter->iov_offset;
2039                         iovec.iov_len = min(iter->count,
2040                                         iter->bvec->bv_len - iter->iov_offset);
2041                 }
2042
2043                 if (rw == READ) {
2044                         nr = file->f_op->read(file, iovec.iov_base,
2045                                               iovec.iov_len, &kiocb->ki_pos);
2046                 } else {
2047                         nr = file->f_op->write(file, iovec.iov_base,
2048                                                iovec.iov_len, &kiocb->ki_pos);
2049                 }
2050
2051                 if (iov_iter_is_bvec(iter))
2052                         kunmap(iter->bvec->bv_page);
2053
2054                 if (nr < 0) {
2055                         if (!ret)
2056                                 ret = nr;
2057                         break;
2058                 }
2059                 ret += nr;
2060                 if (nr != iovec.iov_len)
2061                         break;
2062                 iov_iter_advance(iter, nr);
2063         }
2064
2065         return ret;
2066 }
2067
2068 static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size,
2069                           struct iovec *iovec, struct iovec *fast_iov,
2070                           struct iov_iter *iter)
2071 {
2072         req->io->rw.nr_segs = iter->nr_segs;
2073         req->io->rw.size = io_size;
2074         req->io->rw.iov = iovec;
2075         if (!req->io->rw.iov) {
2076                 req->io->rw.iov = req->io->rw.fast_iov;
2077                 memcpy(req->io->rw.iov, fast_iov,
2078                         sizeof(struct iovec) * iter->nr_segs);
2079         }
2080 }
2081
2082 static int io_alloc_async_ctx(struct io_kiocb *req)
2083 {
2084         if (!io_op_defs[req->opcode].async_ctx)
2085                 return 0;
2086         req->io = kmalloc(sizeof(*req->io), GFP_KERNEL);
2087         return req->io == NULL;
2088 }
2089
2090 static void io_rw_async(struct io_wq_work **workptr)
2091 {
2092         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2093         struct iovec *iov = NULL;
2094
2095         if (req->io->rw.iov != req->io->rw.fast_iov)
2096                 iov = req->io->rw.iov;
2097         io_wq_submit_work(workptr);
2098         kfree(iov);
2099 }
2100
2101 static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
2102                              struct iovec *iovec, struct iovec *fast_iov,
2103                              struct iov_iter *iter)
2104 {
2105         if (req->opcode == IORING_OP_READ_FIXED ||
2106             req->opcode == IORING_OP_WRITE_FIXED)
2107                 return 0;
2108         if (!req->io && io_alloc_async_ctx(req))
2109                 return -ENOMEM;
2110
2111         io_req_map_rw(req, io_size, iovec, fast_iov, iter);
2112         req->work.func = io_rw_async;
2113         return 0;
2114 }
2115
2116 static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2117                         bool force_nonblock)
2118 {
2119         struct io_async_ctx *io;
2120         struct iov_iter iter;
2121         ssize_t ret;
2122
2123         ret = io_prep_rw(req, sqe, force_nonblock);
2124         if (ret)
2125                 return ret;
2126
2127         if (unlikely(!(req->file->f_mode & FMODE_READ)))
2128                 return -EBADF;
2129
2130         if (!req->io)
2131                 return 0;
2132
2133         io = req->io;
2134         io->rw.iov = io->rw.fast_iov;
2135         req->io = NULL;
2136         ret = io_import_iovec(READ, req, &io->rw.iov, &iter);
2137         req->io = io;
2138         if (ret < 0)
2139                 return ret;
2140
2141         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
2142         return 0;
2143 }
2144
2145 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
2146                    bool force_nonblock)
2147 {
2148         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2149         struct kiocb *kiocb = &req->rw.kiocb;
2150         struct iov_iter iter;
2151         size_t iov_count;
2152         ssize_t io_size, ret;
2153
2154         ret = io_import_iovec(READ, req, &iovec, &iter);
2155         if (ret < 0)
2156                 return ret;
2157
2158         /* Ensure we clear previously set non-block flag */
2159         if (!force_nonblock)
2160                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
2161
2162         req->result = 0;
2163         io_size = ret;
2164         if (req->flags & REQ_F_LINK)
2165                 req->result = io_size;
2166
2167         /*
2168          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
2169          * we know to async punt it even if it was opened O_NONBLOCK
2170          */
2171         if (force_nonblock && !io_file_supports_async(req->file)) {
2172                 req->flags |= REQ_F_MUST_PUNT;
2173                 goto copy_iov;
2174         }
2175
2176         iov_count = iov_iter_count(&iter);
2177         ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count);
2178         if (!ret) {
2179                 ssize_t ret2;
2180
2181                 if (req->file->f_op->read_iter)
2182                         ret2 = call_read_iter(req->file, kiocb, &iter);
2183                 else
2184                         ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
2185
2186                 /* Catch -EAGAIN return for forced non-blocking submission */
2187                 if (!force_nonblock || ret2 != -EAGAIN) {
2188                         kiocb_done(kiocb, ret2, nxt, req->in_async);
2189                 } else {
2190 copy_iov:
2191                         ret = io_setup_async_rw(req, io_size, iovec,
2192                                                 inline_vecs, &iter);
2193                         if (ret)
2194                                 goto out_free;
2195                         return -EAGAIN;
2196                 }
2197         }
2198 out_free:
2199         if (!io_wq_current_is_worker())
2200                 kfree(iovec);
2201         return ret;
2202 }
2203
2204 static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2205                          bool force_nonblock)
2206 {
2207         struct io_async_ctx *io;
2208         struct iov_iter iter;
2209         ssize_t ret;
2210
2211         ret = io_prep_rw(req, sqe, force_nonblock);
2212         if (ret)
2213                 return ret;
2214
2215         if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
2216                 return -EBADF;
2217
2218         if (!req->io)
2219                 return 0;
2220
2221         io = req->io;
2222         io->rw.iov = io->rw.fast_iov;
2223         req->io = NULL;
2224         ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter);
2225         req->io = io;
2226         if (ret < 0)
2227                 return ret;
2228
2229         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
2230         return 0;
2231 }
2232
2233 static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
2234                     bool force_nonblock)
2235 {
2236         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2237         struct kiocb *kiocb = &req->rw.kiocb;
2238         struct iov_iter iter;
2239         size_t iov_count;
2240         ssize_t ret, io_size;
2241
2242         ret = io_import_iovec(WRITE, req, &iovec, &iter);
2243         if (ret < 0)
2244                 return ret;
2245
2246         /* Ensure we clear previously set non-block flag */
2247         if (!force_nonblock)
2248                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
2249
2250         req->result = 0;
2251         io_size = ret;
2252         if (req->flags & REQ_F_LINK)
2253                 req->result = io_size;
2254
2255         /*
2256          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
2257          * we know to async punt it even if it was opened O_NONBLOCK
2258          */
2259         if (force_nonblock && !io_file_supports_async(req->file)) {
2260                 req->flags |= REQ_F_MUST_PUNT;
2261                 goto copy_iov;
2262         }
2263
2264         /* file path doesn't support NOWAIT for non-direct_IO */
2265         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
2266             (req->flags & REQ_F_ISREG))
2267                 goto copy_iov;
2268
2269         iov_count = iov_iter_count(&iter);
2270         ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count);
2271         if (!ret) {
2272                 ssize_t ret2;
2273
2274                 /*
2275                  * Open-code file_start_write here to grab freeze protection,
2276                  * which will be released by another thread in
2277                  * io_complete_rw().  Fool lockdep by telling it the lock got
2278                  * released so that it doesn't complain about the held lock when
2279                  * we return to userspace.
2280                  */
2281                 if (req->flags & REQ_F_ISREG) {
2282                         __sb_start_write(file_inode(req->file)->i_sb,
2283                                                 SB_FREEZE_WRITE, true);
2284                         __sb_writers_release(file_inode(req->file)->i_sb,
2285                                                 SB_FREEZE_WRITE);
2286                 }
2287                 kiocb->ki_flags |= IOCB_WRITE;
2288
2289                 if (req->file->f_op->write_iter)
2290                         ret2 = call_write_iter(req->file, kiocb, &iter);
2291                 else
2292                         ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter);
2293                 if (!force_nonblock || ret2 != -EAGAIN) {
2294                         kiocb_done(kiocb, ret2, nxt, req->in_async);
2295                 } else {
2296 copy_iov:
2297                         ret = io_setup_async_rw(req, io_size, iovec,
2298                                                 inline_vecs, &iter);
2299                         if (ret)
2300                                 goto out_free;
2301                         return -EAGAIN;
2302                 }
2303         }
2304 out_free:
2305         if (!io_wq_current_is_worker())
2306                 kfree(iovec);
2307         return ret;
2308 }
2309
2310 /*
2311  * IORING_OP_NOP just posts a completion event, nothing else.
2312  */
2313 static int io_nop(struct io_kiocb *req)
2314 {
2315         struct io_ring_ctx *ctx = req->ctx;
2316
2317         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2318                 return -EINVAL;
2319
2320         io_cqring_add_event(req, 0);
2321         io_put_req(req);
2322         return 0;
2323 }
2324
2325 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2326 {
2327         struct io_ring_ctx *ctx = req->ctx;
2328
2329         if (!req->file)
2330                 return -EBADF;
2331
2332         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2333                 return -EINVAL;
2334         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2335                 return -EINVAL;
2336
2337         req->sync.flags = READ_ONCE(sqe->fsync_flags);
2338         if (unlikely(req->sync.flags & ~IORING_FSYNC_DATASYNC))
2339                 return -EINVAL;
2340
2341         req->sync.off = READ_ONCE(sqe->off);
2342         req->sync.len = READ_ONCE(sqe->len);
2343         return 0;
2344 }
2345
2346 static bool io_req_cancelled(struct io_kiocb *req)
2347 {
2348         if (req->work.flags & IO_WQ_WORK_CANCEL) {
2349                 req_set_fail_links(req);
2350                 io_cqring_add_event(req, -ECANCELED);
2351                 io_put_req(req);
2352                 return true;
2353         }
2354
2355         return false;
2356 }
2357
2358 static void io_link_work_cb(struct io_wq_work **workptr)
2359 {
2360         struct io_wq_work *work = *workptr;
2361         struct io_kiocb *link = work->data;
2362
2363         io_queue_linked_timeout(link);
2364         work->func = io_wq_submit_work;
2365 }
2366
2367 static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt)
2368 {
2369         struct io_kiocb *link;
2370
2371         io_prep_async_work(nxt, &link);
2372         *workptr = &nxt->work;
2373         if (link) {
2374                 nxt->work.flags |= IO_WQ_WORK_CB;
2375                 nxt->work.func = io_link_work_cb;
2376                 nxt->work.data = link;
2377         }
2378 }
2379
2380 static void io_fsync_finish(struct io_wq_work **workptr)
2381 {
2382         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2383         loff_t end = req->sync.off + req->sync.len;
2384         struct io_kiocb *nxt = NULL;
2385         int ret;
2386
2387         if (io_req_cancelled(req))
2388                 return;
2389
2390         ret = vfs_fsync_range(req->file, req->sync.off,
2391                                 end > 0 ? end : LLONG_MAX,
2392                                 req->sync.flags & IORING_FSYNC_DATASYNC);
2393         if (ret < 0)
2394                 req_set_fail_links(req);
2395         io_cqring_add_event(req, ret);
2396         io_put_req_find_next(req, &nxt);
2397         if (nxt)
2398                 io_wq_assign_next(workptr, nxt);
2399 }
2400
2401 static int io_fsync(struct io_kiocb *req, struct io_kiocb **nxt,
2402                     bool force_nonblock)
2403 {
2404         struct io_wq_work *work, *old_work;
2405
2406         /* fsync always requires a blocking context */
2407         if (force_nonblock) {
2408                 io_put_req(req);
2409                 req->work.func = io_fsync_finish;
2410                 return -EAGAIN;
2411         }
2412
2413         work = old_work = &req->work;
2414         io_fsync_finish(&work);
2415         if (work && work != old_work)
2416                 *nxt = container_of(work, struct io_kiocb, work);
2417         return 0;
2418 }
2419
2420 static void io_fallocate_finish(struct io_wq_work **workptr)
2421 {
2422         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2423         struct io_kiocb *nxt = NULL;
2424         int ret;
2425
2426         ret = vfs_fallocate(req->file, req->sync.mode, req->sync.off,
2427                                 req->sync.len);
2428         if (ret < 0)
2429                 req_set_fail_links(req);
2430         io_cqring_add_event(req, ret);
2431         io_put_req_find_next(req, &nxt);
2432         if (nxt)
2433                 io_wq_assign_next(workptr, nxt);
2434 }
2435
2436 static int io_fallocate_prep(struct io_kiocb *req,
2437                              const struct io_uring_sqe *sqe)
2438 {
2439         if (sqe->ioprio || sqe->buf_index || sqe->rw_flags)
2440                 return -EINVAL;
2441
2442         req->sync.off = READ_ONCE(sqe->off);
2443         req->sync.len = READ_ONCE(sqe->addr);
2444         req->sync.mode = READ_ONCE(sqe->len);
2445         return 0;
2446 }
2447
2448 static int io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt,
2449                         bool force_nonblock)
2450 {
2451         struct io_wq_work *work, *old_work;
2452
2453         /* fallocate always requiring blocking context */
2454         if (force_nonblock) {
2455                 io_put_req(req);
2456                 req->work.func = io_fallocate_finish;
2457                 return -EAGAIN;
2458         }
2459
2460         work = old_work = &req->work;
2461         io_fallocate_finish(&work);
2462         if (work && work != old_work)
2463                 *nxt = container_of(work, struct io_kiocb, work);
2464
2465         return 0;
2466 }
2467
2468 static int io_openat_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2469 {
2470         const char __user *fname;
2471         int ret;
2472
2473         if (sqe->ioprio || sqe->buf_index)
2474                 return -EINVAL;
2475
2476         req->open.dfd = READ_ONCE(sqe->fd);
2477         req->open.how.mode = READ_ONCE(sqe->len);
2478         fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2479         req->open.how.flags = READ_ONCE(sqe->open_flags);
2480
2481         req->open.filename = getname(fname);
2482         if (IS_ERR(req->open.filename)) {
2483                 ret = PTR_ERR(req->open.filename);
2484                 req->open.filename = NULL;
2485                 return ret;
2486         }
2487
2488         return 0;
2489 }
2490
2491 static int io_openat2_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2492 {
2493         struct open_how __user *how;
2494         const char __user *fname;
2495         size_t len;
2496         int ret;
2497
2498         if (sqe->ioprio || sqe->buf_index)
2499                 return -EINVAL;
2500
2501         req->open.dfd = READ_ONCE(sqe->fd);
2502         fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2503         how = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2504         len = READ_ONCE(sqe->len);
2505
2506         if (len < OPEN_HOW_SIZE_VER0)
2507                 return -EINVAL;
2508
2509         ret = copy_struct_from_user(&req->open.how, sizeof(req->open.how), how,
2510                                         len);
2511         if (ret)
2512                 return ret;
2513
2514         if (!(req->open.how.flags & O_PATH) && force_o_largefile())
2515                 req->open.how.flags |= O_LARGEFILE;
2516
2517         req->open.filename = getname(fname);
2518         if (IS_ERR(req->open.filename)) {
2519                 ret = PTR_ERR(req->open.filename);
2520                 req->open.filename = NULL;
2521                 return ret;
2522         }
2523
2524         return 0;
2525 }
2526
2527 static int io_openat2(struct io_kiocb *req, struct io_kiocb **nxt,
2528                       bool force_nonblock)
2529 {
2530         struct open_flags op;
2531         struct file *file;
2532         int ret;
2533
2534         if (force_nonblock) {
2535                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2536                 return -EAGAIN;
2537         }
2538
2539         ret = build_open_flags(&req->open.how, &op);
2540         if (ret)
2541                 goto err;
2542
2543         ret = get_unused_fd_flags(req->open.how.flags);
2544         if (ret < 0)
2545                 goto err;
2546
2547         file = do_filp_open(req->open.dfd, req->open.filename, &op);
2548         if (IS_ERR(file)) {
2549                 put_unused_fd(ret);
2550                 ret = PTR_ERR(file);
2551         } else {
2552                 fsnotify_open(file);
2553                 fd_install(ret, file);
2554         }
2555 err:
2556         putname(req->open.filename);
2557         if (ret < 0)
2558                 req_set_fail_links(req);
2559         io_cqring_add_event(req, ret);
2560         io_put_req_find_next(req, nxt);
2561         return 0;
2562 }
2563
2564 static int io_openat(struct io_kiocb *req, struct io_kiocb **nxt,
2565                      bool force_nonblock)
2566 {
2567         req->open.how = build_open_how(req->open.how.flags, req->open.how.mode);
2568         return io_openat2(req, nxt, force_nonblock);
2569 }
2570
2571 static int io_madvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2572 {
2573 #if defined(CONFIG_ADVISE_SYSCALLS) && defined(CONFIG_MMU)
2574         if (sqe->ioprio || sqe->buf_index || sqe->off)
2575                 return -EINVAL;
2576
2577         req->madvise.addr = READ_ONCE(sqe->addr);
2578         req->madvise.len = READ_ONCE(sqe->len);
2579         req->madvise.advice = READ_ONCE(sqe->fadvise_advice);
2580         return 0;
2581 #else
2582         return -EOPNOTSUPP;
2583 #endif
2584 }
2585
2586 static int io_madvise(struct io_kiocb *req, struct io_kiocb **nxt,
2587                       bool force_nonblock)
2588 {
2589 #if defined(CONFIG_ADVISE_SYSCALLS) && defined(CONFIG_MMU)
2590         struct io_madvise *ma = &req->madvise;
2591         int ret;
2592
2593         if (force_nonblock)
2594                 return -EAGAIN;
2595
2596         ret = do_madvise(ma->addr, ma->len, ma->advice);
2597         if (ret < 0)
2598                 req_set_fail_links(req);
2599         io_cqring_add_event(req, ret);
2600         io_put_req_find_next(req, nxt);
2601         return 0;
2602 #else
2603         return -EOPNOTSUPP;
2604 #endif
2605 }
2606
2607 static int io_fadvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2608 {
2609         if (sqe->ioprio || sqe->buf_index || sqe->addr)
2610                 return -EINVAL;
2611
2612         req->fadvise.offset = READ_ONCE(sqe->off);
2613         req->fadvise.len = READ_ONCE(sqe->len);
2614         req->fadvise.advice = READ_ONCE(sqe->fadvise_advice);
2615         return 0;
2616 }
2617
2618 static int io_fadvise(struct io_kiocb *req, struct io_kiocb **nxt,
2619                       bool force_nonblock)
2620 {
2621         struct io_fadvise *fa = &req->fadvise;
2622         int ret;
2623
2624         /* DONTNEED may block, others _should_ not */
2625         if (fa->advice == POSIX_FADV_DONTNEED && force_nonblock)
2626                 return -EAGAIN;
2627
2628         ret = vfs_fadvise(req->file, fa->offset, fa->len, fa->advice);
2629         if (ret < 0)
2630                 req_set_fail_links(req);
2631         io_cqring_add_event(req, ret);
2632         io_put_req_find_next(req, nxt);
2633         return 0;
2634 }
2635
2636 static int io_statx_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2637 {
2638         const char __user *fname;
2639         unsigned lookup_flags;
2640         int ret;
2641
2642         if (sqe->ioprio || sqe->buf_index)
2643                 return -EINVAL;
2644
2645         req->open.dfd = READ_ONCE(sqe->fd);
2646         req->open.mask = READ_ONCE(sqe->len);
2647         fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2648         req->open.buffer = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2649         req->open.how.flags = READ_ONCE(sqe->statx_flags);
2650
2651         if (vfs_stat_set_lookup_flags(&lookup_flags, req->open.how.flags))
2652                 return -EINVAL;
2653
2654         req->open.filename = getname_flags(fname, lookup_flags, NULL);
2655         if (IS_ERR(req->open.filename)) {
2656                 ret = PTR_ERR(req->open.filename);
2657                 req->open.filename = NULL;
2658                 return ret;
2659         }
2660
2661         return 0;
2662 }
2663
2664 static int io_statx(struct io_kiocb *req, struct io_kiocb **nxt,
2665                     bool force_nonblock)
2666 {
2667         struct io_open *ctx = &req->open;
2668         unsigned lookup_flags;
2669         struct path path;
2670         struct kstat stat;
2671         int ret;
2672
2673         if (force_nonblock)
2674                 return -EAGAIN;
2675
2676         if (vfs_stat_set_lookup_flags(&lookup_flags, ctx->how.flags))
2677                 return -EINVAL;
2678
2679 retry:
2680         /* filename_lookup() drops it, keep a reference */
2681         ctx->filename->refcnt++;
2682
2683         ret = filename_lookup(ctx->dfd, ctx->filename, lookup_flags, &path,
2684                                 NULL);
2685         if (ret)
2686                 goto err;
2687
2688         ret = vfs_getattr(&path, &stat, ctx->mask, ctx->how.flags);
2689         path_put(&path);
2690         if (retry_estale(ret, lookup_flags)) {
2691                 lookup_flags |= LOOKUP_REVAL;
2692                 goto retry;
2693         }
2694         if (!ret)
2695                 ret = cp_statx(&stat, ctx->buffer);
2696 err:
2697         putname(ctx->filename);
2698         if (ret < 0)
2699                 req_set_fail_links(req);
2700         io_cqring_add_event(req, ret);
2701         io_put_req_find_next(req, nxt);
2702         return 0;
2703 }
2704
2705 static int io_close_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2706 {
2707         /*
2708          * If we queue this for async, it must not be cancellable. That would
2709          * leave the 'file' in an undeterminate state.
2710          */
2711         req->work.flags |= IO_WQ_WORK_NO_CANCEL;
2712
2713         if (sqe->ioprio || sqe->off || sqe->addr || sqe->len ||
2714             sqe->rw_flags || sqe->buf_index)
2715                 return -EINVAL;
2716         if (sqe->flags & IOSQE_FIXED_FILE)
2717                 return -EINVAL;
2718
2719         req->close.fd = READ_ONCE(sqe->fd);
2720         if (req->file->f_op == &io_uring_fops ||
2721             req->close.fd == req->ctx->ring_fd)
2722                 return -EBADF;
2723
2724         return 0;
2725 }
2726
2727 static void io_close_finish(struct io_wq_work **workptr)
2728 {
2729         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2730         struct io_kiocb *nxt = NULL;
2731
2732         /* Invoked with files, we need to do the close */
2733         if (req->work.files) {
2734                 int ret;
2735
2736                 ret = filp_close(req->close.put_file, req->work.files);
2737                 if (ret < 0) {
2738                         req_set_fail_links(req);
2739                 }
2740                 io_cqring_add_event(req, ret);
2741         }
2742
2743         fput(req->close.put_file);
2744
2745         /* we bypassed the re-issue, drop the submission reference */
2746         io_put_req(req);
2747         io_put_req_find_next(req, &nxt);
2748         if (nxt)
2749                 io_wq_assign_next(workptr, nxt);
2750 }
2751
2752 static int io_close(struct io_kiocb *req, struct io_kiocb **nxt,
2753                     bool force_nonblock)
2754 {
2755         int ret;
2756
2757         req->close.put_file = NULL;
2758         ret = __close_fd_get_file(req->close.fd, &req->close.put_file);
2759         if (ret < 0)
2760                 return ret;
2761
2762         /* if the file has a flush method, be safe and punt to async */
2763         if (req->close.put_file->f_op->flush && !io_wq_current_is_worker()) {
2764                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2765                 goto eagain;
2766         }
2767
2768         /*
2769          * No ->flush(), safely close from here and just punt the
2770          * fput() to async context.
2771          */
2772         ret = filp_close(req->close.put_file, current->files);
2773
2774         if (ret < 0)
2775                 req_set_fail_links(req);
2776         io_cqring_add_event(req, ret);
2777
2778         if (io_wq_current_is_worker()) {
2779                 struct io_wq_work *old_work, *work;
2780
2781                 old_work = work = &req->work;
2782                 io_close_finish(&work);
2783                 if (work && work != old_work)
2784                         *nxt = container_of(work, struct io_kiocb, work);
2785                 return 0;
2786         }
2787
2788 eagain:
2789         req->work.func = io_close_finish;
2790         return -EAGAIN;
2791 }
2792
2793 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2794 {
2795         struct io_ring_ctx *ctx = req->ctx;
2796
2797         if (!req->file)
2798                 return -EBADF;
2799
2800         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2801                 return -EINVAL;
2802         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2803                 return -EINVAL;
2804
2805         req->sync.off = READ_ONCE(sqe->off);
2806         req->sync.len = READ_ONCE(sqe->len);
2807         req->sync.flags = READ_ONCE(sqe->sync_range_flags);
2808         return 0;
2809 }
2810
2811 static void io_sync_file_range_finish(struct io_wq_work **workptr)
2812 {
2813         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2814         struct io_kiocb *nxt = NULL;
2815         int ret;
2816
2817         if (io_req_cancelled(req))
2818                 return;
2819
2820         ret = sync_file_range(req->file, req->sync.off, req->sync.len,
2821                                 req->sync.flags);
2822         if (ret < 0)
2823                 req_set_fail_links(req);
2824         io_cqring_add_event(req, ret);
2825         io_put_req_find_next(req, &nxt);
2826         if (nxt)
2827                 io_wq_assign_next(workptr, nxt);
2828 }
2829
2830 static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt,
2831                               bool force_nonblock)
2832 {
2833         struct io_wq_work *work, *old_work;
2834
2835         /* sync_file_range always requires a blocking context */
2836         if (force_nonblock) {
2837                 io_put_req(req);
2838                 req->work.func = io_sync_file_range_finish;
2839                 return -EAGAIN;
2840         }
2841
2842         work = old_work = &req->work;
2843         io_sync_file_range_finish(&work);
2844         if (work && work != old_work)
2845                 *nxt = container_of(work, struct io_kiocb, work);
2846         return 0;
2847 }
2848
2849 #if defined(CONFIG_NET)
2850 static void io_sendrecv_async(struct io_wq_work **workptr)
2851 {
2852         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2853         struct iovec *iov = NULL;
2854
2855         if (req->io->rw.iov != req->io->rw.fast_iov)
2856                 iov = req->io->msg.iov;
2857         io_wq_submit_work(workptr);
2858         kfree(iov);
2859 }
2860 #endif
2861
2862 static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2863 {
2864 #if defined(CONFIG_NET)
2865         struct io_sr_msg *sr = &req->sr_msg;
2866         struct io_async_ctx *io = req->io;
2867
2868         sr->msg_flags = READ_ONCE(sqe->msg_flags);
2869         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2870         sr->len = READ_ONCE(sqe->len);
2871
2872         if (!io || req->opcode == IORING_OP_SEND)
2873                 return 0;
2874
2875         io->msg.iov = io->msg.fast_iov;
2876         return sendmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2877                                         &io->msg.iov);
2878 #else
2879         return -EOPNOTSUPP;
2880 #endif
2881 }
2882
2883 static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2884                       bool force_nonblock)
2885 {
2886 #if defined(CONFIG_NET)
2887         struct io_async_msghdr *kmsg = NULL;
2888         struct socket *sock;
2889         int ret;
2890
2891         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2892                 return -EINVAL;
2893
2894         sock = sock_from_file(req->file, &ret);
2895         if (sock) {
2896                 struct io_async_ctx io;
2897                 struct sockaddr_storage addr;
2898                 unsigned flags;
2899
2900                 if (req->io) {
2901                         kmsg = &req->io->msg;
2902                         kmsg->msg.msg_name = &addr;
2903                         /* if iov is set, it's allocated already */
2904                         if (!kmsg->iov)
2905                                 kmsg->iov = kmsg->fast_iov;
2906                         kmsg->msg.msg_iter.iov = kmsg->iov;
2907                 } else {
2908                         struct io_sr_msg *sr = &req->sr_msg;
2909
2910                         kmsg = &io.msg;
2911                         kmsg->msg.msg_name = &addr;
2912
2913                         io.msg.iov = io.msg.fast_iov;
2914                         ret = sendmsg_copy_msghdr(&io.msg.msg, sr->msg,
2915                                         sr->msg_flags, &io.msg.iov);
2916                         if (ret)
2917                                 return ret;
2918                 }
2919
2920                 flags = req->sr_msg.msg_flags;
2921                 if (flags & MSG_DONTWAIT)
2922                         req->flags |= REQ_F_NOWAIT;
2923                 else if (force_nonblock)
2924                         flags |= MSG_DONTWAIT;
2925
2926                 ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags);
2927                 if (force_nonblock && ret == -EAGAIN) {
2928                         if (req->io)
2929                                 return -EAGAIN;
2930                         if (io_alloc_async_ctx(req))
2931                                 return -ENOMEM;
2932                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2933                         req->work.func = io_sendrecv_async;
2934                         return -EAGAIN;
2935                 }
2936                 if (ret == -ERESTARTSYS)
2937                         ret = -EINTR;
2938         }
2939
2940         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
2941                 kfree(kmsg->iov);
2942         io_cqring_add_event(req, ret);
2943         if (ret < 0)
2944                 req_set_fail_links(req);
2945         io_put_req_find_next(req, nxt);
2946         return 0;
2947 #else
2948         return -EOPNOTSUPP;
2949 #endif
2950 }
2951
2952 static int io_send(struct io_kiocb *req, struct io_kiocb **nxt,
2953                    bool force_nonblock)
2954 {
2955 #if defined(CONFIG_NET)
2956         struct socket *sock;
2957         int ret;
2958
2959         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2960                 return -EINVAL;
2961
2962         sock = sock_from_file(req->file, &ret);
2963         if (sock) {
2964                 struct io_sr_msg *sr = &req->sr_msg;
2965                 struct msghdr msg;
2966                 struct iovec iov;
2967                 unsigned flags;
2968
2969                 ret = import_single_range(WRITE, sr->buf, sr->len, &iov,
2970                                                 &msg.msg_iter);
2971                 if (ret)
2972                         return ret;
2973
2974                 msg.msg_name = NULL;
2975                 msg.msg_control = NULL;
2976                 msg.msg_controllen = 0;
2977                 msg.msg_namelen = 0;
2978
2979                 flags = req->sr_msg.msg_flags;
2980                 if (flags & MSG_DONTWAIT)
2981                         req->flags |= REQ_F_NOWAIT;
2982                 else if (force_nonblock)
2983                         flags |= MSG_DONTWAIT;
2984
2985                 ret = __sys_sendmsg_sock(sock, &msg, flags);
2986                 if (force_nonblock && ret == -EAGAIN)
2987                         return -EAGAIN;
2988                 if (ret == -ERESTARTSYS)
2989                         ret = -EINTR;
2990         }
2991
2992         io_cqring_add_event(req, ret);
2993         if (ret < 0)
2994                 req_set_fail_links(req);
2995         io_put_req_find_next(req, nxt);
2996         return 0;
2997 #else
2998         return -EOPNOTSUPP;
2999 #endif
3000 }
3001
3002 static int io_recvmsg_prep(struct io_kiocb *req,
3003                            const struct io_uring_sqe *sqe)
3004 {
3005 #if defined(CONFIG_NET)
3006         struct io_sr_msg *sr = &req->sr_msg;
3007         struct io_async_ctx *io = req->io;
3008
3009         sr->msg_flags = READ_ONCE(sqe->msg_flags);
3010         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
3011
3012         if (!io || req->opcode == IORING_OP_RECV)
3013                 return 0;
3014
3015         io->msg.iov = io->msg.fast_iov;
3016         return recvmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
3017                                         &io->msg.uaddr, &io->msg.iov);
3018 #else
3019         return -EOPNOTSUPP;
3020 #endif
3021 }
3022
3023 static int io_recvmsg(struct io_kiocb *req, struct io_kiocb **nxt,
3024                       bool force_nonblock)
3025 {
3026 #if defined(CONFIG_NET)
3027         struct io_async_msghdr *kmsg = NULL;
3028         struct socket *sock;
3029         int ret;
3030
3031         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3032                 return -EINVAL;
3033
3034         sock = sock_from_file(req->file, &ret);
3035         if (sock) {
3036                 struct io_async_ctx io;
3037                 struct sockaddr_storage addr;
3038                 unsigned flags;
3039
3040                 if (req->io) {
3041                         kmsg = &req->io->msg;
3042                         kmsg->msg.msg_name = &addr;
3043                         /* if iov is set, it's allocated already */
3044                         if (!kmsg->iov)
3045                                 kmsg->iov = kmsg->fast_iov;
3046                         kmsg->msg.msg_iter.iov = kmsg->iov;
3047                 } else {
3048                         struct io_sr_msg *sr = &req->sr_msg;
3049
3050                         kmsg = &io.msg;
3051                         kmsg->msg.msg_name = &addr;
3052
3053                         io.msg.iov = io.msg.fast_iov;
3054                         ret = recvmsg_copy_msghdr(&io.msg.msg, sr->msg,
3055                                         sr->msg_flags, &io.msg.uaddr,
3056                                         &io.msg.iov);
3057                         if (ret)
3058                                 return ret;
3059                 }
3060
3061                 flags = req->sr_msg.msg_flags;
3062                 if (flags & MSG_DONTWAIT)
3063                         req->flags |= REQ_F_NOWAIT;
3064                 else if (force_nonblock)
3065                         flags |= MSG_DONTWAIT;
3066
3067                 ret = __sys_recvmsg_sock(sock, &kmsg->msg, req->sr_msg.msg,
3068                                                 kmsg->uaddr, flags);
3069                 if (force_nonblock && ret == -EAGAIN) {
3070                         if (req->io)
3071                                 return -EAGAIN;
3072                         if (io_alloc_async_ctx(req))
3073                                 return -ENOMEM;
3074                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
3075                         req->work.func = io_sendrecv_async;
3076                         return -EAGAIN;
3077                 }
3078                 if (ret == -ERESTARTSYS)
3079                         ret = -EINTR;
3080         }
3081
3082         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
3083                 kfree(kmsg->iov);
3084         io_cqring_add_event(req, ret);
3085         if (ret < 0)
3086                 req_set_fail_links(req);
3087         io_put_req_find_next(req, nxt);
3088         return 0;
3089 #else
3090         return -EOPNOTSUPP;
3091 #endif
3092 }
3093
3094 static int io_recv(struct io_kiocb *req, struct io_kiocb **nxt,
3095                    bool force_nonblock)
3096 {
3097 #if defined(CONFIG_NET)
3098         struct socket *sock;
3099         int ret;
3100
3101         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3102                 return -EINVAL;
3103
3104         sock = sock_from_file(req->file, &ret);
3105         if (sock) {
3106                 struct io_sr_msg *sr = &req->sr_msg;
3107                 struct msghdr msg;
3108                 struct iovec iov;
3109                 unsigned flags;
3110
3111                 ret = import_single_range(READ, sr->buf, sr->len, &iov,
3112                                                 &msg.msg_iter);
3113                 if (ret)
3114                         return ret;
3115
3116                 msg.msg_name = NULL;
3117                 msg.msg_control = NULL;
3118                 msg.msg_controllen = 0;
3119                 msg.msg_namelen = 0;
3120                 msg.msg_iocb = NULL;
3121                 msg.msg_flags = 0;
3122
3123                 flags = req->sr_msg.msg_flags;
3124                 if (flags & MSG_DONTWAIT)
3125                         req->flags |= REQ_F_NOWAIT;
3126                 else if (force_nonblock)
3127                         flags |= MSG_DONTWAIT;
3128
3129                 ret = __sys_recvmsg_sock(sock, &msg, NULL, NULL, flags);
3130                 if (force_nonblock && ret == -EAGAIN)
3131                         return -EAGAIN;
3132                 if (ret == -ERESTARTSYS)
3133                         ret = -EINTR;
3134         }
3135
3136         io_cqring_add_event(req, ret);
3137         if (ret < 0)
3138                 req_set_fail_links(req);
3139         io_put_req_find_next(req, nxt);
3140         return 0;
3141 #else
3142         return -EOPNOTSUPP;
3143 #endif
3144 }
3145
3146
3147 static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3148 {
3149 #if defined(CONFIG_NET)
3150         struct io_accept *accept = &req->accept;
3151
3152         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
3153                 return -EINVAL;
3154         if (sqe->ioprio || sqe->len || sqe->buf_index)
3155                 return -EINVAL;
3156
3157         accept->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
3158         accept->addr_len = u64_to_user_ptr(READ_ONCE(sqe->addr2));
3159         accept->flags = READ_ONCE(sqe->accept_flags);
3160         return 0;
3161 #else
3162         return -EOPNOTSUPP;
3163 #endif
3164 }
3165
3166 #if defined(CONFIG_NET)
3167 static int __io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
3168                        bool force_nonblock)
3169 {
3170         struct io_accept *accept = &req->accept;
3171         unsigned file_flags;
3172         int ret;
3173
3174         file_flags = force_nonblock ? O_NONBLOCK : 0;
3175         ret = __sys_accept4_file(req->file, file_flags, accept->addr,
3176                                         accept->addr_len, accept->flags);
3177         if (ret == -EAGAIN && force_nonblock)
3178                 return -EAGAIN;
3179         if (ret == -ERESTARTSYS)
3180                 ret = -EINTR;
3181         if (ret < 0)
3182                 req_set_fail_links(req);
3183         io_cqring_add_event(req, ret);
3184         io_put_req_find_next(req, nxt);
3185         return 0;
3186 }
3187
3188 static void io_accept_finish(struct io_wq_work **workptr)
3189 {
3190         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
3191         struct io_kiocb *nxt = NULL;
3192
3193         if (io_req_cancelled(req))
3194                 return;
3195         __io_accept(req, &nxt, false);
3196         if (nxt)
3197                 io_wq_assign_next(workptr, nxt);
3198 }
3199 #endif
3200
3201 static int io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
3202                      bool force_nonblock)
3203 {
3204 #if defined(CONFIG_NET)
3205         int ret;
3206
3207         ret = __io_accept(req, nxt, force_nonblock);
3208         if (ret == -EAGAIN && force_nonblock) {
3209                 req->work.func = io_accept_finish;
3210                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
3211                 io_put_req(req);
3212                 return -EAGAIN;
3213         }
3214         return 0;
3215 #else
3216         return -EOPNOTSUPP;
3217 #endif
3218 }
3219
3220 static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3221 {
3222 #if defined(CONFIG_NET)
3223         struct io_connect *conn = &req->connect;
3224         struct io_async_ctx *io = req->io;
3225
3226         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
3227                 return -EINVAL;
3228         if (sqe->ioprio || sqe->len || sqe->buf_index || sqe->rw_flags)
3229                 return -EINVAL;
3230
3231         conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
3232         conn->addr_len =  READ_ONCE(sqe->addr2);
3233
3234         if (!io)
3235                 return 0;
3236
3237         return move_addr_to_kernel(conn->addr, conn->addr_len,
3238                                         &io->connect.address);
3239 #else
3240         return -EOPNOTSUPP;
3241 #endif
3242 }
3243
3244 static int io_connect(struct io_kiocb *req, struct io_kiocb **nxt,
3245                       bool force_nonblock)
3246 {
3247 #if defined(CONFIG_NET)
3248         struct io_async_ctx __io, *io;
3249         unsigned file_flags;
3250         int ret;
3251
3252         if (req->io) {
3253                 io = req->io;
3254         } else {
3255                 ret = move_addr_to_kernel(req->connect.addr,
3256                                                 req->connect.addr_len,
3257                                                 &__io.connect.address);
3258                 if (ret)
3259                         goto out;
3260                 io = &__io;
3261         }
3262
3263         file_flags = force_nonblock ? O_NONBLOCK : 0;
3264
3265         ret = __sys_connect_file(req->file, &io->connect.address,
3266                                         req->connect.addr_len, file_flags);
3267         if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) {
3268                 if (req->io)
3269                         return -EAGAIN;
3270                 if (io_alloc_async_ctx(req)) {
3271                         ret = -ENOMEM;
3272                         goto out;
3273                 }
3274                 memcpy(&req->io->connect, &__io.connect, sizeof(__io.connect));
3275                 return -EAGAIN;
3276         }
3277         if (ret == -ERESTARTSYS)
3278                 ret = -EINTR;
3279 out:
3280         if (ret < 0)
3281                 req_set_fail_links(req);
3282         io_cqring_add_event(req, ret);
3283         io_put_req_find_next(req, nxt);
3284         return 0;
3285 #else
3286         return -EOPNOTSUPP;
3287 #endif
3288 }
3289
3290 static void io_poll_remove_one(struct io_kiocb *req)
3291 {
3292         struct io_poll_iocb *poll = &req->poll;
3293
3294         spin_lock(&poll->head->lock);
3295         WRITE_ONCE(poll->canceled, true);
3296         if (!list_empty(&poll->wait.entry)) {
3297                 list_del_init(&poll->wait.entry);
3298                 io_queue_async_work(req);
3299         }
3300         spin_unlock(&poll->head->lock);
3301         hash_del(&req->hash_node);
3302 }
3303
3304 static void io_poll_remove_all(struct io_ring_ctx *ctx)
3305 {
3306         struct hlist_node *tmp;
3307         struct io_kiocb *req;
3308         int i;
3309
3310         spin_lock_irq(&ctx->completion_lock);
3311         for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
3312                 struct hlist_head *list;
3313
3314                 list = &ctx->cancel_hash[i];
3315                 hlist_for_each_entry_safe(req, tmp, list, hash_node)
3316                         io_poll_remove_one(req);
3317         }
3318         spin_unlock_irq(&ctx->completion_lock);
3319 }
3320
3321 static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
3322 {
3323         struct hlist_head *list;
3324         struct io_kiocb *req;
3325
3326         list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)];
3327         hlist_for_each_entry(req, list, hash_node) {
3328                 if (sqe_addr == req->user_data) {
3329                         io_poll_remove_one(req);
3330                         return 0;
3331                 }
3332         }
3333
3334         return -ENOENT;
3335 }
3336
3337 static int io_poll_remove_prep(struct io_kiocb *req,
3338                                const struct io_uring_sqe *sqe)
3339 {
3340         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3341                 return -EINVAL;
3342         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
3343             sqe->poll_events)
3344                 return -EINVAL;
3345
3346         req->poll.addr = READ_ONCE(sqe->addr);
3347         return 0;
3348 }
3349
3350 /*
3351  * Find a running poll command that matches one specified in sqe->addr,
3352  * and remove it if found.
3353  */
3354 static int io_poll_remove(struct io_kiocb *req)
3355 {
3356         struct io_ring_ctx *ctx = req->ctx;
3357         u64 addr;
3358         int ret;
3359
3360         addr = req->poll.addr;
3361         spin_lock_irq(&ctx->completion_lock);
3362         ret = io_poll_cancel(ctx, addr);
3363         spin_unlock_irq(&ctx->completion_lock);
3364
3365         io_cqring_add_event(req, ret);
3366         if (ret < 0)
3367                 req_set_fail_links(req);
3368         io_put_req(req);
3369         return 0;
3370 }
3371
3372 static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
3373 {
3374         struct io_ring_ctx *ctx = req->ctx;
3375
3376         req->poll.done = true;
3377         if (error)
3378                 io_cqring_fill_event(req, error);
3379         else
3380                 io_cqring_fill_event(req, mangle_poll(mask));
3381         io_commit_cqring(ctx);
3382 }
3383
3384 static void io_poll_complete_work(struct io_wq_work **workptr)
3385 {
3386         struct io_wq_work *work = *workptr;
3387         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3388         struct io_poll_iocb *poll = &req->poll;
3389         struct poll_table_struct pt = { ._key = poll->events };
3390         struct io_ring_ctx *ctx = req->ctx;
3391         struct io_kiocb *nxt = NULL;
3392         __poll_t mask = 0;
3393         int ret = 0;
3394
3395         if (work->flags & IO_WQ_WORK_CANCEL) {
3396                 WRITE_ONCE(poll->canceled, true);
3397                 ret = -ECANCELED;
3398         } else if (READ_ONCE(poll->canceled)) {
3399                 ret = -ECANCELED;
3400         }
3401
3402         if (ret != -ECANCELED)
3403                 mask = vfs_poll(poll->file, &pt) & poll->events;
3404
3405         /*
3406          * Note that ->ki_cancel callers also delete iocb from active_reqs after
3407          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
3408          * synchronize with them.  In the cancellation case the list_del_init
3409          * itself is not actually needed, but harmless so we keep it in to
3410          * avoid further branches in the fast path.
3411          */
3412         spin_lock_irq(&ctx->completion_lock);
3413         if (!mask && ret != -ECANCELED) {
3414                 add_wait_queue(poll->head, &poll->wait);
3415                 spin_unlock_irq(&ctx->completion_lock);
3416                 return;
3417         }
3418         hash_del(&req->hash_node);
3419         io_poll_complete(req, mask, ret);
3420         spin_unlock_irq(&ctx->completion_lock);
3421
3422         io_cqring_ev_posted(ctx);
3423
3424         if (ret < 0)
3425                 req_set_fail_links(req);
3426         io_put_req_find_next(req, &nxt);
3427         if (nxt)
3428                 io_wq_assign_next(workptr, nxt);
3429 }
3430
3431 static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes)
3432 {
3433         struct io_kiocb *req, *tmp;
3434         struct req_batch rb;
3435
3436         rb.to_free = rb.need_iter = 0;
3437         spin_lock_irq(&ctx->completion_lock);
3438         llist_for_each_entry_safe(req, tmp, nodes, llist_node) {
3439                 hash_del(&req->hash_node);
3440                 io_poll_complete(req, req->result, 0);
3441
3442                 if (refcount_dec_and_test(&req->refs) &&
3443                     !io_req_multi_free(&rb, req)) {
3444                         req->flags |= REQ_F_COMP_LOCKED;
3445                         io_free_req(req);
3446                 }
3447         }
3448         spin_unlock_irq(&ctx->completion_lock);
3449
3450         io_cqring_ev_posted(ctx);
3451         io_free_req_many(ctx, &rb);
3452 }
3453
3454 static void io_poll_flush(struct io_wq_work **workptr)
3455 {
3456         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
3457         struct llist_node *nodes;
3458
3459         nodes = llist_del_all(&req->ctx->poll_llist);
3460         if (nodes)
3461                 __io_poll_flush(req->ctx, nodes);
3462 }
3463
3464 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
3465                         void *key)
3466 {
3467         struct io_poll_iocb *poll = wait->private;
3468         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
3469         struct io_ring_ctx *ctx = req->ctx;
3470         __poll_t mask = key_to_poll(key);
3471
3472         /* for instances that support it check for an event match first: */
3473         if (mask && !(mask & poll->events))
3474                 return 0;
3475
3476         list_del_init(&poll->wait.entry);
3477
3478         /*
3479          * Run completion inline if we can. We're using trylock here because
3480          * we are violating the completion_lock -> poll wq lock ordering.
3481          * If we have a link timeout we're going to need the completion_lock
3482          * for finalizing the request, mark us as having grabbed that already.
3483          */
3484         if (mask) {
3485                 unsigned long flags;
3486
3487                 if (llist_empty(&ctx->poll_llist) &&
3488                     spin_trylock_irqsave(&ctx->completion_lock, flags)) {
3489                         hash_del(&req->hash_node);
3490                         io_poll_complete(req, mask, 0);
3491                         req->flags |= REQ_F_COMP_LOCKED;
3492                         io_put_req(req);
3493                         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3494
3495                         io_cqring_ev_posted(ctx);
3496                         req = NULL;
3497                 } else {
3498                         req->result = mask;
3499                         req->llist_node.next = NULL;
3500                         /* if the list wasn't empty, we're done */
3501                         if (!llist_add(&req->llist_node, &ctx->poll_llist))
3502                                 req = NULL;
3503                         else
3504                                 req->work.func = io_poll_flush;
3505                 }
3506         }
3507         if (req)
3508                 io_queue_async_work(req);
3509
3510         return 1;
3511 }
3512
3513 struct io_poll_table {
3514         struct poll_table_struct pt;
3515         struct io_kiocb *req;
3516         int error;
3517 };
3518
3519 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
3520                                struct poll_table_struct *p)
3521 {
3522         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
3523
3524         if (unlikely(pt->req->poll.head)) {
3525                 pt->error = -EINVAL;
3526                 return;
3527         }
3528
3529         pt->error = 0;
3530         pt->req->poll.head = head;
3531         add_wait_queue(head, &pt->req->poll.wait);
3532 }
3533
3534 static void io_poll_req_insert(struct io_kiocb *req)
3535 {
3536         struct io_ring_ctx *ctx = req->ctx;
3537         struct hlist_head *list;
3538
3539         list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)];
3540         hlist_add_head(&req->hash_node, list);
3541 }
3542
3543 static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3544 {
3545         struct io_poll_iocb *poll = &req->poll;
3546         u16 events;
3547
3548         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3549                 return -EINVAL;
3550         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
3551                 return -EINVAL;
3552         if (!poll->file)
3553                 return -EBADF;
3554
3555         events = READ_ONCE(sqe->poll_events);
3556         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
3557         return 0;
3558 }
3559
3560 static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt)
3561 {
3562         struct io_poll_iocb *poll = &req->poll;
3563         struct io_ring_ctx *ctx = req->ctx;
3564         struct io_poll_table ipt;
3565         bool cancel = false;
3566         __poll_t mask;
3567
3568         INIT_IO_WORK(&req->work, io_poll_complete_work);
3569         INIT_HLIST_NODE(&req->hash_node);
3570
3571         poll->head = NULL;
3572         poll->done = false;
3573         poll->canceled = false;
3574
3575         ipt.pt._qproc = io_poll_queue_proc;
3576         ipt.pt._key = poll->events;
3577         ipt.req = req;
3578         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
3579
3580         /* initialized the list so that we can do list_empty checks */
3581         INIT_LIST_HEAD(&poll->wait.entry);
3582         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
3583         poll->wait.private = poll;
3584
3585         INIT_LIST_HEAD(&req->list);
3586
3587         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
3588
3589         spin_lock_irq(&ctx->completion_lock);
3590         if (likely(poll->head)) {
3591                 spin_lock(&poll->head->lock);
3592                 if (unlikely(list_empty(&poll->wait.entry))) {
3593                         if (ipt.error)
3594                                 cancel = true;
3595                         ipt.error = 0;
3596                         mask = 0;
3597                 }
3598                 if (mask || ipt.error)
3599                         list_del_init(&poll->wait.entry);
3600                 else if (cancel)
3601                         WRITE_ONCE(poll->canceled, true);
3602                 else if (!poll->done) /* actually waiting for an event */
3603                         io_poll_req_insert(req);
3604                 spin_unlock(&poll->head->lock);
3605         }
3606         if (mask) { /* no async, we'd stolen it */
3607                 ipt.error = 0;
3608                 io_poll_complete(req, mask, 0);
3609         }
3610         spin_unlock_irq(&ctx->completion_lock);
3611
3612         if (mask) {
3613                 io_cqring_ev_posted(ctx);
3614                 io_put_req_find_next(req, nxt);
3615         }
3616         return ipt.error;
3617 }
3618
3619 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
3620 {
3621         struct io_timeout_data *data = container_of(timer,
3622                                                 struct io_timeout_data, timer);
3623         struct io_kiocb *req = data->req;
3624         struct io_ring_ctx *ctx = req->ctx;
3625         unsigned long flags;
3626
3627         atomic_inc(&ctx->cq_timeouts);
3628
3629         spin_lock_irqsave(&ctx->completion_lock, flags);
3630         /*
3631          * We could be racing with timeout deletion. If the list is empty,
3632          * then timeout lookup already found it and will be handling it.
3633          */
3634         if (!list_empty(&req->list)) {
3635                 struct io_kiocb *prev;
3636
3637                 /*
3638                  * Adjust the reqs sequence before the current one because it
3639                  * will consume a slot in the cq_ring and the cq_tail
3640                  * pointer will be increased, otherwise other timeout reqs may
3641                  * return in advance without waiting for enough wait_nr.
3642                  */
3643                 prev = req;
3644                 list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
3645                         prev->sequence++;
3646                 list_del_init(&req->list);
3647         }
3648
3649         io_cqring_fill_event(req, -ETIME);
3650         io_commit_cqring(ctx);
3651         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3652
3653         io_cqring_ev_posted(ctx);
3654         req_set_fail_links(req);
3655         io_put_req(req);
3656         return HRTIMER_NORESTART;
3657 }
3658
3659 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
3660 {
3661         struct io_kiocb *req;
3662         int ret = -ENOENT;
3663
3664         list_for_each_entry(req, &ctx->timeout_list, list) {
3665                 if (user_data == req->user_data) {
3666                         list_del_init(&req->list);
3667                         ret = 0;
3668                         break;
3669                 }
3670         }
3671
3672         if (ret == -ENOENT)
3673                 return ret;
3674
3675         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
3676         if (ret == -1)
3677                 return -EALREADY;
3678
3679         req_set_fail_links(req);
3680         io_cqring_fill_event(req, -ECANCELED);
3681         io_put_req(req);
3682         return 0;
3683 }
3684
3685 static int io_timeout_remove_prep(struct io_kiocb *req,
3686                                   const struct io_uring_sqe *sqe)
3687 {
3688         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3689                 return -EINVAL;
3690         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)
3691                 return -EINVAL;
3692
3693         req->timeout.addr = READ_ONCE(sqe->addr);
3694         req->timeout.flags = READ_ONCE(sqe->timeout_flags);
3695         if (req->timeout.flags)
3696                 return -EINVAL;
3697
3698         return 0;
3699 }
3700
3701 /*
3702  * Remove or update an existing timeout command
3703  */
3704 static int io_timeout_remove(struct io_kiocb *req)
3705 {
3706         struct io_ring_ctx *ctx = req->ctx;
3707         int ret;
3708
3709         spin_lock_irq(&ctx->completion_lock);
3710         ret = io_timeout_cancel(ctx, req->timeout.addr);
3711
3712         io_cqring_fill_event(req, ret);
3713         io_commit_cqring(ctx);
3714         spin_unlock_irq(&ctx->completion_lock);
3715         io_cqring_ev_posted(ctx);
3716         if (ret < 0)
3717                 req_set_fail_links(req);
3718         io_put_req(req);
3719         return 0;
3720 }
3721
3722 static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
3723                            bool is_timeout_link)
3724 {
3725         struct io_timeout_data *data;
3726         unsigned flags;
3727
3728         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3729                 return -EINVAL;
3730         if (sqe->ioprio || sqe->buf_index || sqe->len != 1)
3731                 return -EINVAL;
3732         if (sqe->off && is_timeout_link)
3733                 return -EINVAL;
3734         flags = READ_ONCE(sqe->timeout_flags);
3735         if (flags & ~IORING_TIMEOUT_ABS)
3736                 return -EINVAL;
3737
3738         req->timeout.count = READ_ONCE(sqe->off);
3739
3740         if (!req->io && io_alloc_async_ctx(req))
3741                 return -ENOMEM;
3742
3743         data = &req->io->timeout;
3744         data->req = req;
3745         req->flags |= REQ_F_TIMEOUT;
3746
3747         if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))
3748                 return -EFAULT;
3749
3750         if (flags & IORING_TIMEOUT_ABS)
3751                 data->mode = HRTIMER_MODE_ABS;
3752         else
3753                 data->mode = HRTIMER_MODE_REL;
3754
3755         hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
3756         return 0;
3757 }
3758
3759 static int io_timeout(struct io_kiocb *req)
3760 {
3761         unsigned count;
3762         struct io_ring_ctx *ctx = req->ctx;
3763         struct io_timeout_data *data;
3764         struct list_head *entry;
3765         unsigned span = 0;
3766
3767         data = &req->io->timeout;
3768
3769         /*
3770          * sqe->off holds how many events that need to occur for this
3771          * timeout event to be satisfied. If it isn't set, then this is
3772          * a pure timeout request, sequence isn't used.
3773          */
3774         count = req->timeout.count;
3775         if (!count) {
3776                 req->flags |= REQ_F_TIMEOUT_NOSEQ;
3777                 spin_lock_irq(&ctx->completion_lock);
3778                 entry = ctx->timeout_list.prev;
3779                 goto add;
3780         }
3781
3782         req->sequence = ctx->cached_sq_head + count - 1;
3783         data->seq_offset = count;
3784
3785         /*
3786          * Insertion sort, ensuring the first entry in the list is always
3787          * the one we need first.
3788          */
3789         spin_lock_irq(&ctx->completion_lock);
3790         list_for_each_prev(entry, &ctx->timeout_list) {
3791                 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
3792                 unsigned nxt_sq_head;
3793                 long long tmp, tmp_nxt;
3794                 u32 nxt_offset = nxt->io->timeout.seq_offset;
3795
3796                 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
3797                         continue;
3798
3799                 /*
3800                  * Since cached_sq_head + count - 1 can overflow, use type long
3801                  * long to store it.
3802                  */
3803                 tmp = (long long)ctx->cached_sq_head + count - 1;
3804                 nxt_sq_head = nxt->sequence - nxt_offset + 1;
3805                 tmp_nxt = (long long)nxt_sq_head + nxt_offset - 1;
3806
3807                 /*
3808                  * cached_sq_head may overflow, and it will never overflow twice
3809                  * once there is some timeout req still be valid.
3810                  */
3811                 if (ctx->cached_sq_head < nxt_sq_head)
3812                         tmp += UINT_MAX;
3813
3814                 if (tmp > tmp_nxt)
3815                         break;
3816
3817                 /*
3818                  * Sequence of reqs after the insert one and itself should
3819                  * be adjusted because each timeout req consumes a slot.
3820                  */
3821                 span++;
3822                 nxt->sequence++;
3823         }
3824         req->sequence -= span;
3825 add:
3826         list_add(&req->list, entry);
3827         data->timer.function = io_timeout_fn;
3828         hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
3829         spin_unlock_irq(&ctx->completion_lock);
3830         return 0;
3831 }
3832
3833 static bool io_cancel_cb(struct io_wq_work *work, void *data)
3834 {
3835         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3836
3837         return req->user_data == (unsigned long) data;
3838 }
3839
3840 static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
3841 {
3842         enum io_wq_cancel cancel_ret;
3843         int ret = 0;
3844
3845         cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
3846         switch (cancel_ret) {
3847         case IO_WQ_CANCEL_OK:
3848                 ret = 0;
3849                 break;
3850         case IO_WQ_CANCEL_RUNNING:
3851                 ret = -EALREADY;
3852                 break;
3853         case IO_WQ_CANCEL_NOTFOUND:
3854                 ret = -ENOENT;
3855                 break;
3856         }
3857
3858         return ret;
3859 }
3860
3861 static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
3862                                      struct io_kiocb *req, __u64 sqe_addr,
3863                                      struct io_kiocb **nxt, int success_ret)
3864 {
3865         unsigned long flags;
3866         int ret;
3867
3868         ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
3869         if (ret != -ENOENT) {
3870                 spin_lock_irqsave(&ctx->completion_lock, flags);
3871                 goto done;
3872         }
3873
3874         spin_lock_irqsave(&ctx->completion_lock, flags);
3875         ret = io_timeout_cancel(ctx, sqe_addr);
3876         if (ret != -ENOENT)
3877                 goto done;
3878         ret = io_poll_cancel(ctx, sqe_addr);
3879 done:
3880         if (!ret)
3881                 ret = success_ret;
3882         io_cqring_fill_event(req, ret);
3883         io_commit_cqring(ctx);
3884         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3885         io_cqring_ev_posted(ctx);
3886
3887         if (ret < 0)
3888                 req_set_fail_links(req);
3889         io_put_req_find_next(req, nxt);
3890 }
3891
3892 static int io_async_cancel_prep(struct io_kiocb *req,
3893                                 const struct io_uring_sqe *sqe)
3894 {
3895         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3896                 return -EINVAL;
3897         if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||
3898             sqe->cancel_flags)
3899                 return -EINVAL;
3900
3901         req->cancel.addr = READ_ONCE(sqe->addr);
3902         return 0;
3903 }
3904
3905 static int io_async_cancel(struct io_kiocb *req, struct io_kiocb **nxt)
3906 {
3907         struct io_ring_ctx *ctx = req->ctx;
3908
3909         io_async_find_and_cancel(ctx, req, req->cancel.addr, nxt, 0);
3910         return 0;
3911 }
3912
3913 static int io_files_update_prep(struct io_kiocb *req,
3914                                 const struct io_uring_sqe *sqe)
3915 {
3916         if (sqe->flags || sqe->ioprio || sqe->rw_flags)
3917                 return -EINVAL;
3918
3919         req->files_update.offset = READ_ONCE(sqe->off);
3920         req->files_update.nr_args = READ_ONCE(sqe->len);
3921         if (!req->files_update.nr_args)
3922                 return -EINVAL;
3923         req->files_update.arg = READ_ONCE(sqe->addr);
3924         return 0;
3925 }
3926
3927 static int io_files_update(struct io_kiocb *req, bool force_nonblock)
3928 {
3929         struct io_ring_ctx *ctx = req->ctx;
3930         struct io_uring_files_update up;
3931         int ret;
3932
3933         if (force_nonblock) {
3934                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
3935                 return -EAGAIN;
3936         }
3937
3938         up.offset = req->files_update.offset;
3939         up.fds = req->files_update.arg;
3940
3941         mutex_lock(&ctx->uring_lock);
3942         ret = __io_sqe_files_update(ctx, &up, req->files_update.nr_args);
3943         mutex_unlock(&ctx->uring_lock);
3944
3945         if (ret < 0)
3946                 req_set_fail_links(req);
3947         io_cqring_add_event(req, ret);
3948         io_put_req(req);
3949         return 0;
3950 }
3951
3952 static int io_req_defer_prep(struct io_kiocb *req,
3953                              const struct io_uring_sqe *sqe)
3954 {
3955         ssize_t ret = 0;
3956
3957         switch (req->opcode) {
3958         case IORING_OP_NOP:
3959                 break;
3960         case IORING_OP_READV:
3961         case IORING_OP_READ_FIXED:
3962         case IORING_OP_READ:
3963                 ret = io_read_prep(req, sqe, true);
3964                 break;
3965         case IORING_OP_WRITEV:
3966         case IORING_OP_WRITE_FIXED:
3967         case IORING_OP_WRITE:
3968                 ret = io_write_prep(req, sqe, true);
3969                 break;
3970         case IORING_OP_POLL_ADD:
3971                 ret = io_poll_add_prep(req, sqe);
3972                 break;
3973         case IORING_OP_POLL_REMOVE:
3974                 ret = io_poll_remove_prep(req, sqe);
3975                 break;
3976         case IORING_OP_FSYNC:
3977                 ret = io_prep_fsync(req, sqe);
3978                 break;
3979         case IORING_OP_SYNC_FILE_RANGE:
3980                 ret = io_prep_sfr(req, sqe);
3981                 break;
3982         case IORING_OP_SENDMSG:
3983         case IORING_OP_SEND:
3984                 ret = io_sendmsg_prep(req, sqe);
3985                 break;
3986         case IORING_OP_RECVMSG:
3987         case IORING_OP_RECV:
3988                 ret = io_recvmsg_prep(req, sqe);
3989                 break;
3990         case IORING_OP_CONNECT:
3991                 ret = io_connect_prep(req, sqe);
3992                 break;
3993         case IORING_OP_TIMEOUT:
3994                 ret = io_timeout_prep(req, sqe, false);
3995                 break;
3996         case IORING_OP_TIMEOUT_REMOVE:
3997                 ret = io_timeout_remove_prep(req, sqe);
3998                 break;
3999         case IORING_OP_ASYNC_CANCEL:
4000                 ret = io_async_cancel_prep(req, sqe);
4001                 break;
4002         case IORING_OP_LINK_TIMEOUT:
4003                 ret = io_timeout_prep(req, sqe, true);
4004                 break;
4005         case IORING_OP_ACCEPT:
4006                 ret = io_accept_prep(req, sqe);
4007                 break;
4008         case IORING_OP_FALLOCATE:
4009                 ret = io_fallocate_prep(req, sqe);
4010                 break;
4011         case IORING_OP_OPENAT:
4012                 ret = io_openat_prep(req, sqe);
4013                 break;
4014         case IORING_OP_CLOSE:
4015                 ret = io_close_prep(req, sqe);
4016                 break;
4017         case IORING_OP_FILES_UPDATE:
4018                 ret = io_files_update_prep(req, sqe);
4019                 break;
4020         case IORING_OP_STATX:
4021                 ret = io_statx_prep(req, sqe);
4022                 break;
4023         case IORING_OP_FADVISE:
4024                 ret = io_fadvise_prep(req, sqe);
4025                 break;
4026         case IORING_OP_MADVISE:
4027                 ret = io_madvise_prep(req, sqe);
4028                 break;
4029         case IORING_OP_OPENAT2:
4030                 ret = io_openat2_prep(req, sqe);
4031                 break;
4032         default:
4033                 printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n",
4034                                 req->opcode);
4035                 ret = -EINVAL;
4036                 break;
4037         }
4038
4039         return ret;
4040 }
4041
4042 static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4043 {
4044         struct io_ring_ctx *ctx = req->ctx;
4045         int ret;
4046
4047         /* Still need defer if there is pending req in defer list. */
4048         if (!req_need_defer(req) && list_empty(&ctx->defer_list))
4049                 return 0;
4050
4051         if (!req->io && io_alloc_async_ctx(req))
4052                 return -EAGAIN;
4053
4054         ret = io_req_defer_prep(req, sqe);
4055         if (ret < 0)
4056                 return ret;
4057
4058         spin_lock_irq(&ctx->completion_lock);
4059         if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
4060                 spin_unlock_irq(&ctx->completion_lock);
4061                 return 0;
4062         }
4063
4064         trace_io_uring_defer(ctx, req, req->user_data);
4065         list_add_tail(&req->list, &ctx->defer_list);
4066         spin_unlock_irq(&ctx->completion_lock);
4067         return -EIOCBQUEUED;
4068 }
4069
4070 static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
4071                         struct io_kiocb **nxt, bool force_nonblock)
4072 {
4073         struct io_ring_ctx *ctx = req->ctx;
4074         int ret;
4075
4076         switch (req->opcode) {
4077         case IORING_OP_NOP:
4078                 ret = io_nop(req);
4079                 break;
4080         case IORING_OP_READV:
4081         case IORING_OP_READ_FIXED:
4082         case IORING_OP_READ:
4083                 if (sqe) {
4084                         ret = io_read_prep(req, sqe, force_nonblock);
4085                         if (ret < 0)
4086                                 break;
4087                 }
4088                 ret = io_read(req, nxt, force_nonblock);
4089                 break;
4090         case IORING_OP_WRITEV:
4091         case IORING_OP_WRITE_FIXED:
4092         case IORING_OP_WRITE:
4093                 if (sqe) {
4094                         ret = io_write_prep(req, sqe, force_nonblock);
4095                         if (ret < 0)
4096                                 break;
4097                 }
4098                 ret = io_write(req, nxt, force_nonblock);
4099                 break;
4100         case IORING_OP_FSYNC:
4101                 if (sqe) {
4102                         ret = io_prep_fsync(req, sqe);
4103                         if (ret < 0)
4104                                 break;
4105                 }
4106                 ret = io_fsync(req, nxt, force_nonblock);
4107                 break;
4108         case IORING_OP_POLL_ADD:
4109                 if (sqe) {
4110                         ret = io_poll_add_prep(req, sqe);
4111                         if (ret)
4112                                 break;
4113                 }
4114                 ret = io_poll_add(req, nxt);
4115                 break;
4116         case IORING_OP_POLL_REMOVE:
4117                 if (sqe) {
4118                         ret = io_poll_remove_prep(req, sqe);
4119                         if (ret < 0)
4120                                 break;
4121                 }
4122                 ret = io_poll_remove(req);
4123                 break;
4124         case IORING_OP_SYNC_FILE_RANGE:
4125                 if (sqe) {
4126                         ret = io_prep_sfr(req, sqe);
4127                         if (ret < 0)
4128                                 break;
4129                 }
4130                 ret = io_sync_file_range(req, nxt, force_nonblock);
4131                 break;
4132         case IORING_OP_SENDMSG:
4133         case IORING_OP_SEND:
4134                 if (sqe) {
4135                         ret = io_sendmsg_prep(req, sqe);
4136                         if (ret < 0)
4137                                 break;
4138                 }
4139                 if (req->opcode == IORING_OP_SENDMSG)
4140                         ret = io_sendmsg(req, nxt, force_nonblock);
4141                 else
4142                         ret = io_send(req, nxt, force_nonblock);
4143                 break;
4144         case IORING_OP_RECVMSG:
4145         case IORING_OP_RECV:
4146                 if (sqe) {
4147                         ret = io_recvmsg_prep(req, sqe);
4148                         if (ret)
4149                                 break;
4150                 }
4151                 if (req->opcode == IORING_OP_RECVMSG)
4152                         ret = io_recvmsg(req, nxt, force_nonblock);
4153                 else
4154                         ret = io_recv(req, nxt, force_nonblock);
4155                 break;
4156         case IORING_OP_TIMEOUT:
4157                 if (sqe) {
4158                         ret = io_timeout_prep(req, sqe, false);
4159                         if (ret)
4160                                 break;
4161                 }
4162                 ret = io_timeout(req);
4163                 break;
4164         case IORING_OP_TIMEOUT_REMOVE:
4165                 if (sqe) {
4166                         ret = io_timeout_remove_prep(req, sqe);
4167                         if (ret)
4168                                 break;
4169                 }
4170                 ret = io_timeout_remove(req);
4171                 break;
4172         case IORING_OP_ACCEPT:
4173                 if (sqe) {
4174                         ret = io_accept_prep(req, sqe);
4175                         if (ret)
4176                                 break;
4177                 }
4178                 ret = io_accept(req, nxt, force_nonblock);
4179                 break;
4180         case IORING_OP_CONNECT:
4181                 if (sqe) {
4182                         ret = io_connect_prep(req, sqe);
4183                         if (ret)
4184                                 break;
4185                 }
4186                 ret = io_connect(req, nxt, force_nonblock);
4187                 break;
4188         case IORING_OP_ASYNC_CANCEL:
4189                 if (sqe) {
4190                         ret = io_async_cancel_prep(req, sqe);
4191                         if (ret)
4192                                 break;
4193                 }
4194                 ret = io_async_cancel(req, nxt);
4195                 break;
4196         case IORING_OP_FALLOCATE:
4197                 if (sqe) {
4198                         ret = io_fallocate_prep(req, sqe);
4199                         if (ret)
4200                                 break;
4201                 }
4202                 ret = io_fallocate(req, nxt, force_nonblock);
4203                 break;
4204         case IORING_OP_OPENAT:
4205                 if (sqe) {
4206                         ret = io_openat_prep(req, sqe);
4207                         if (ret)
4208                                 break;
4209                 }
4210                 ret = io_openat(req, nxt, force_nonblock);
4211                 break;
4212         case IORING_OP_CLOSE:
4213                 if (sqe) {
4214                         ret = io_close_prep(req, sqe);
4215                         if (ret)
4216                                 break;
4217                 }
4218                 ret = io_close(req, nxt, force_nonblock);
4219                 break;
4220         case IORING_OP_FILES_UPDATE:
4221                 if (sqe) {
4222                         ret = io_files_update_prep(req, sqe);
4223                         if (ret)
4224                                 break;
4225                 }
4226                 ret = io_files_update(req, force_nonblock);
4227                 break;
4228         case IORING_OP_STATX:
4229                 if (sqe) {
4230                         ret = io_statx_prep(req, sqe);
4231                         if (ret)
4232                                 break;
4233                 }
4234                 ret = io_statx(req, nxt, force_nonblock);
4235                 break;
4236         case IORING_OP_FADVISE:
4237                 if (sqe) {
4238                         ret = io_fadvise_prep(req, sqe);
4239                         if (ret)
4240                                 break;
4241                 }
4242                 ret = io_fadvise(req, nxt, force_nonblock);
4243                 break;
4244         case IORING_OP_MADVISE:
4245                 if (sqe) {
4246                         ret = io_madvise_prep(req, sqe);
4247                         if (ret)
4248                                 break;
4249                 }
4250                 ret = io_madvise(req, nxt, force_nonblock);
4251                 break;
4252         case IORING_OP_OPENAT2:
4253                 if (sqe) {
4254                         ret = io_openat2_prep(req, sqe);
4255                         if (ret)
4256                                 break;
4257                 }
4258                 ret = io_openat2(req, nxt, force_nonblock);
4259                 break;
4260         default:
4261                 ret = -EINVAL;
4262                 break;
4263         }
4264
4265         if (ret)
4266                 return ret;
4267
4268         if (ctx->flags & IORING_SETUP_IOPOLL) {
4269                 const bool in_async = io_wq_current_is_worker();
4270
4271                 if (req->result == -EAGAIN)
4272                         return -EAGAIN;
4273
4274                 /* workqueue context doesn't hold uring_lock, grab it now */
4275                 if (in_async)
4276                         mutex_lock(&ctx->uring_lock);
4277
4278                 io_iopoll_req_issued(req);
4279
4280                 if (in_async)
4281                         mutex_unlock(&ctx->uring_lock);
4282         }
4283
4284         return 0;
4285 }
4286
4287 static void io_wq_submit_work(struct io_wq_work **workptr)
4288 {
4289         struct io_wq_work *work = *workptr;
4290         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4291         struct io_kiocb *nxt = NULL;
4292         int ret = 0;
4293
4294         /* if NO_CANCEL is set, we must still run the work */
4295         if ((work->flags & (IO_WQ_WORK_CANCEL|IO_WQ_WORK_NO_CANCEL)) ==
4296                                 IO_WQ_WORK_CANCEL) {
4297                 ret = -ECANCELED;
4298         }
4299
4300         if (!ret) {
4301                 req->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
4302                 req->in_async = true;
4303                 do {
4304                         ret = io_issue_sqe(req, NULL, &nxt, false);
4305                         /*
4306                          * We can get EAGAIN for polled IO even though we're
4307                          * forcing a sync submission from here, since we can't
4308                          * wait for request slots on the block side.
4309                          */
4310                         if (ret != -EAGAIN)
4311                                 break;
4312                         cond_resched();
4313                 } while (1);
4314         }
4315
4316         /* drop submission reference */
4317         io_put_req(req);
4318
4319         if (ret) {
4320                 req_set_fail_links(req);
4321                 io_cqring_add_event(req, ret);
4322                 io_put_req(req);
4323         }
4324
4325         /* if a dependent link is ready, pass it back */
4326         if (!ret && nxt)
4327                 io_wq_assign_next(workptr, nxt);
4328 }
4329
4330 static int io_req_needs_file(struct io_kiocb *req, int fd)
4331 {
4332         if (!io_op_defs[req->opcode].needs_file)
4333                 return 0;
4334         if (fd == -1 && io_op_defs[req->opcode].fd_non_neg)
4335                 return 0;
4336         return 1;
4337 }
4338
4339 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
4340                                               int index)
4341 {
4342         struct fixed_file_table *table;
4343
4344         table = &ctx->file_data->table[index >> IORING_FILE_TABLE_SHIFT];
4345         return table->files[index & IORING_FILE_TABLE_MASK];;
4346 }
4347
4348 static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req,
4349                            const struct io_uring_sqe *sqe)
4350 {
4351         struct io_ring_ctx *ctx = req->ctx;
4352         unsigned flags;
4353         int fd;
4354
4355         flags = READ_ONCE(sqe->flags);
4356         fd = READ_ONCE(sqe->fd);
4357
4358         if (flags & IOSQE_IO_DRAIN)
4359                 req->flags |= REQ_F_IO_DRAIN;
4360
4361         if (!io_req_needs_file(req, fd))
4362                 return 0;
4363
4364         if (flags & IOSQE_FIXED_FILE) {
4365                 if (unlikely(!ctx->file_data ||
4366                     (unsigned) fd >= ctx->nr_user_files))
4367                         return -EBADF;
4368                 fd = array_index_nospec(fd, ctx->nr_user_files);
4369                 req->file = io_file_from_index(ctx, fd);
4370                 if (!req->file)
4371                         return -EBADF;
4372                 req->flags |= REQ_F_FIXED_FILE;
4373                 percpu_ref_get(&ctx->file_data->refs);
4374         } else {
4375                 if (req->needs_fixed_file)
4376                         return -EBADF;
4377                 trace_io_uring_file_get(ctx, fd);
4378                 req->file = io_file_get(state, fd);
4379                 if (unlikely(!req->file))
4380                         return -EBADF;
4381         }
4382
4383         return 0;
4384 }
4385
4386 static int io_grab_files(struct io_kiocb *req)
4387 {
4388         int ret = -EBADF;
4389         struct io_ring_ctx *ctx = req->ctx;
4390
4391         if (!ctx->ring_file)
4392                 return -EBADF;
4393
4394         rcu_read_lock();
4395         spin_lock_irq(&ctx->inflight_lock);
4396         /*
4397          * We use the f_ops->flush() handler to ensure that we can flush
4398          * out work accessing these files if the fd is closed. Check if
4399          * the fd has changed since we started down this path, and disallow
4400          * this operation if it has.
4401          */
4402         if (fcheck(ctx->ring_fd) == ctx->ring_file) {
4403                 list_add(&req->inflight_entry, &ctx->inflight_list);
4404                 req->flags |= REQ_F_INFLIGHT;
4405                 req->work.files = current->files;
4406                 ret = 0;
4407         }
4408         spin_unlock_irq(&ctx->inflight_lock);
4409         rcu_read_unlock();
4410
4411         return ret;
4412 }
4413
4414 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
4415 {
4416         struct io_timeout_data *data = container_of(timer,
4417                                                 struct io_timeout_data, timer);
4418         struct io_kiocb *req = data->req;
4419         struct io_ring_ctx *ctx = req->ctx;
4420         struct io_kiocb *prev = NULL;
4421         unsigned long flags;
4422
4423         spin_lock_irqsave(&ctx->completion_lock, flags);
4424
4425         /*
4426          * We don't expect the list to be empty, that will only happen if we
4427          * race with the completion of the linked work.
4428          */
4429         if (!list_empty(&req->link_list)) {
4430                 prev = list_entry(req->link_list.prev, struct io_kiocb,
4431                                   link_list);
4432                 if (refcount_inc_not_zero(&prev->refs)) {
4433                         list_del_init(&req->link_list);
4434                         prev->flags &= ~REQ_F_LINK_TIMEOUT;
4435                 } else
4436                         prev = NULL;
4437         }
4438
4439         spin_unlock_irqrestore(&ctx->completion_lock, flags);
4440
4441         if (prev) {
4442                 req_set_fail_links(prev);
4443                 io_async_find_and_cancel(ctx, req, prev->user_data, NULL,
4444                                                 -ETIME);
4445                 io_put_req(prev);
4446         } else {
4447                 io_cqring_add_event(req, -ETIME);
4448                 io_put_req(req);
4449         }
4450         return HRTIMER_NORESTART;
4451 }
4452
4453 static void io_queue_linked_timeout(struct io_kiocb *req)
4454 {
4455         struct io_ring_ctx *ctx = req->ctx;
4456
4457         /*
4458          * If the list is now empty, then our linked request finished before
4459          * we got a chance to setup the timer
4460          */
4461         spin_lock_irq(&ctx->completion_lock);
4462         if (!list_empty(&req->link_list)) {
4463                 struct io_timeout_data *data = &req->io->timeout;
4464
4465                 data->timer.function = io_link_timeout_fn;
4466                 hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
4467                                 data->mode);
4468         }
4469         spin_unlock_irq(&ctx->completion_lock);
4470
4471         /* drop submission reference */
4472         io_put_req(req);
4473 }
4474
4475 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
4476 {
4477         struct io_kiocb *nxt;
4478
4479         if (!(req->flags & REQ_F_LINK))
4480                 return NULL;
4481
4482         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb,
4483                                         link_list);
4484         if (!nxt || nxt->opcode != IORING_OP_LINK_TIMEOUT)
4485                 return NULL;
4486
4487         req->flags |= REQ_F_LINK_TIMEOUT;
4488         return nxt;
4489 }
4490
4491 static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4492 {
4493         struct io_kiocb *linked_timeout;
4494         struct io_kiocb *nxt = NULL;
4495         int ret;
4496
4497 again:
4498         linked_timeout = io_prep_linked_timeout(req);
4499
4500         ret = io_issue_sqe(req, sqe, &nxt, true);
4501
4502         /*
4503          * We async punt it if the file wasn't marked NOWAIT, or if the file
4504          * doesn't support non-blocking read/write attempts
4505          */
4506         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
4507             (req->flags & REQ_F_MUST_PUNT))) {
4508                 if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
4509                         ret = io_grab_files(req);
4510                         if (ret)
4511                                 goto err;
4512                 }
4513
4514                 /*
4515                  * Queued up for async execution, worker will release
4516                  * submit reference when the iocb is actually submitted.
4517                  */
4518                 io_queue_async_work(req);
4519                 goto done_req;
4520         }
4521
4522 err:
4523         /* drop submission reference */
4524         io_put_req(req);
4525
4526         if (linked_timeout) {
4527                 if (!ret)
4528                         io_queue_linked_timeout(linked_timeout);
4529                 else
4530                         io_put_req(linked_timeout);
4531         }
4532
4533         /* and drop final reference, if we failed */
4534         if (ret) {
4535                 io_cqring_add_event(req, ret);
4536                 req_set_fail_links(req);
4537                 io_put_req(req);
4538         }
4539 done_req:
4540         if (nxt) {
4541                 req = nxt;
4542                 nxt = NULL;
4543                 goto again;
4544         }
4545 }
4546
4547 static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4548 {
4549         int ret;
4550
4551         ret = io_req_defer(req, sqe);
4552         if (ret) {
4553                 if (ret != -EIOCBQUEUED) {
4554                         io_cqring_add_event(req, ret);
4555                         req_set_fail_links(req);
4556                         io_double_put_req(req);
4557                 }
4558         } else if (req->flags & REQ_F_FORCE_ASYNC) {
4559                 /*
4560                  * Never try inline submit of IOSQE_ASYNC is set, go straight
4561                  * to async execution.
4562                  */
4563                 req->work.flags |= IO_WQ_WORK_CONCURRENT;
4564                 io_queue_async_work(req);
4565         } else {
4566                 __io_queue_sqe(req, sqe);
4567         }
4568 }
4569
4570 static inline void io_queue_link_head(struct io_kiocb *req)
4571 {
4572         if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
4573                 io_cqring_add_event(req, -ECANCELED);
4574                 io_double_put_req(req);
4575         } else
4576                 io_queue_sqe(req, NULL);
4577 }
4578
4579 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
4580                                 IOSQE_IO_HARDLINK | IOSQE_ASYNC)
4581
4582 static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
4583                           struct io_submit_state *state, struct io_kiocb **link)
4584 {
4585         struct io_ring_ctx *ctx = req->ctx;
4586         unsigned int sqe_flags;
4587         int ret;
4588
4589         sqe_flags = READ_ONCE(sqe->flags);
4590
4591         /* enforce forwards compatibility on users */
4592         if (unlikely(sqe_flags & ~SQE_VALID_FLAGS)) {
4593                 ret = -EINVAL;
4594                 goto err_req;
4595         }
4596         if (sqe_flags & IOSQE_ASYNC)
4597                 req->flags |= REQ_F_FORCE_ASYNC;
4598
4599         ret = io_req_set_file(state, req, sqe);
4600         if (unlikely(ret)) {
4601 err_req:
4602                 io_cqring_add_event(req, ret);
4603                 io_double_put_req(req);
4604                 return false;
4605         }
4606
4607         /*
4608          * If we already have a head request, queue this one for async
4609          * submittal once the head completes. If we don't have a head but
4610          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
4611          * submitted sync once the chain is complete. If none of those
4612          * conditions are true (normal request), then just queue it.
4613          */
4614         if (*link) {
4615                 struct io_kiocb *head = *link;
4616
4617                 if (sqe_flags & IOSQE_IO_DRAIN) {
4618                         head->flags |= REQ_F_IO_DRAIN;
4619                         ctx->drain_next = 1;
4620                 }
4621
4622                 if (sqe_flags & IOSQE_IO_HARDLINK)
4623                         req->flags |= REQ_F_HARDLINK;
4624
4625                 if (io_alloc_async_ctx(req)) {
4626                         ret = -EAGAIN;
4627                         goto err_req;
4628                 }
4629
4630                 ret = io_req_defer_prep(req, sqe);
4631                 if (ret) {
4632                         /* fail even hard links since we don't submit */
4633                         head->flags |= REQ_F_FAIL_LINK;
4634                         goto err_req;
4635                 }
4636                 trace_io_uring_link(ctx, req, head);
4637                 list_add_tail(&req->link_list, &head->link_list);
4638
4639                 /* last request of a link, enqueue the link */
4640                 if (!(sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK))) {
4641                         io_queue_link_head(head);
4642                         *link = NULL;
4643                 }
4644         } else {
4645                 if (unlikely(ctx->drain_next)) {
4646                         req->flags |= REQ_F_IO_DRAIN;
4647                         req->ctx->drain_next = 0;
4648                 }
4649                 if (sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) {
4650                         req->flags |= REQ_F_LINK;
4651                         if (sqe_flags & IOSQE_IO_HARDLINK)
4652                                 req->flags |= REQ_F_HARDLINK;
4653
4654                         INIT_LIST_HEAD(&req->link_list);
4655                         ret = io_req_defer_prep(req, sqe);
4656                         if (ret)
4657                                 req->flags |= REQ_F_FAIL_LINK;
4658                         *link = req;
4659                 } else {
4660                         io_queue_sqe(req, sqe);
4661                 }
4662         }
4663
4664         return true;
4665 }
4666
4667 /*
4668  * Batched submission is done, ensure local IO is flushed out.
4669  */
4670 static void io_submit_state_end(struct io_submit_state *state)
4671 {
4672         blk_finish_plug(&state->plug);
4673         io_file_put(state);
4674         if (state->free_reqs)
4675                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
4676                                         &state->reqs[state->cur_req]);
4677 }
4678
4679 /*
4680  * Start submission side cache.
4681  */
4682 static void io_submit_state_start(struct io_submit_state *state,
4683                                   unsigned int max_ios)
4684 {
4685         blk_start_plug(&state->plug);
4686         state->free_reqs = 0;
4687         state->file = NULL;
4688         state->ios_left = max_ios;
4689 }
4690
4691 static void io_commit_sqring(struct io_ring_ctx *ctx)
4692 {
4693         struct io_rings *rings = ctx->rings;
4694
4695         /*
4696          * Ensure any loads from the SQEs are done at this point,
4697          * since once we write the new head, the application could
4698          * write new data to them.
4699          */
4700         smp_store_release(&rings->sq.head, ctx->cached_sq_head);
4701 }
4702
4703 /*
4704  * Fetch an sqe, if one is available. Note that sqe_ptr will point to memory
4705  * that is mapped by userspace. This means that care needs to be taken to
4706  * ensure that reads are stable, as we cannot rely on userspace always
4707  * being a good citizen. If members of the sqe are validated and then later
4708  * used, it's important that those reads are done through READ_ONCE() to
4709  * prevent a re-load down the line.
4710  */
4711 static bool io_get_sqring(struct io_ring_ctx *ctx, struct io_kiocb *req,
4712                           const struct io_uring_sqe **sqe_ptr)
4713 {
4714         u32 *sq_array = ctx->sq_array;
4715         unsigned head;
4716
4717         /*
4718          * The cached sq head (or cq tail) serves two purposes:
4719          *
4720          * 1) allows us to batch the cost of updating the user visible
4721          *    head updates.
4722          * 2) allows the kernel side to track the head on its own, even
4723          *    though the application is the one updating it.
4724          */
4725         head = READ_ONCE(sq_array[ctx->cached_sq_head & ctx->sq_mask]);
4726         if (likely(head < ctx->sq_entries)) {
4727                 /*
4728                  * All io need record the previous position, if LINK vs DARIN,
4729                  * it can be used to mark the position of the first IO in the
4730                  * link list.
4731                  */
4732                 req->sequence = ctx->cached_sq_head;
4733                 *sqe_ptr = &ctx->sq_sqes[head];
4734                 req->opcode = READ_ONCE((*sqe_ptr)->opcode);
4735                 req->user_data = READ_ONCE((*sqe_ptr)->user_data);
4736                 ctx->cached_sq_head++;
4737                 return true;
4738         }
4739
4740         /* drop invalid entries */
4741         ctx->cached_sq_head++;
4742         ctx->cached_sq_dropped++;
4743         WRITE_ONCE(ctx->rings->sq_dropped, ctx->cached_sq_dropped);
4744         return false;
4745 }
4746
4747 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
4748                           struct file *ring_file, int ring_fd,
4749                           struct mm_struct **mm, bool async)
4750 {
4751         struct io_submit_state state, *statep = NULL;
4752         struct io_kiocb *link = NULL;
4753         int i, submitted = 0;
4754         bool mm_fault = false;
4755
4756         /* if we have a backlog and couldn't flush it all, return BUSY */
4757         if (test_bit(0, &ctx->sq_check_overflow)) {
4758                 if (!list_empty(&ctx->cq_overflow_list) &&
4759                     !io_cqring_overflow_flush(ctx, false))
4760                         return -EBUSY;
4761         }
4762
4763         /* make sure SQ entry isn't read before tail */
4764         nr = min3(nr, ctx->sq_entries, io_sqring_entries(ctx));
4765
4766         if (!percpu_ref_tryget_many(&ctx->refs, nr))
4767                 return -EAGAIN;
4768
4769         if (nr > IO_PLUG_THRESHOLD) {
4770                 io_submit_state_start(&state, nr);
4771                 statep = &state;
4772         }
4773
4774         ctx->ring_fd = ring_fd;
4775         ctx->ring_file = ring_file;
4776
4777         for (i = 0; i < nr; i++) {
4778                 const struct io_uring_sqe *sqe;
4779                 struct io_kiocb *req;
4780
4781                 req = io_get_req(ctx, statep);
4782                 if (unlikely(!req)) {
4783                         if (!submitted)
4784                                 submitted = -EAGAIN;
4785                         break;
4786                 }
4787                 if (!io_get_sqring(ctx, req, &sqe)) {
4788                         __io_req_do_free(req);
4789                         break;
4790                 }
4791
4792                 /* will complete beyond this point, count as submitted */
4793                 submitted++;
4794
4795                 if (unlikely(req->opcode >= IORING_OP_LAST)) {
4796                         io_cqring_add_event(req, -EINVAL);
4797                         io_double_put_req(req);
4798                         break;
4799                 }
4800
4801                 if (io_op_defs[req->opcode].needs_mm && !*mm) {
4802                         mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
4803                         if (!mm_fault) {
4804                                 use_mm(ctx->sqo_mm);
4805                                 *mm = ctx->sqo_mm;
4806                         }
4807                 }
4808
4809                 req->has_user = *mm != NULL;
4810                 req->in_async = async;
4811                 req->needs_fixed_file = async;
4812                 trace_io_uring_submit_sqe(ctx, req->opcode, req->user_data,
4813                                                 true, async);
4814                 if (!io_submit_sqe(req, sqe, statep, &link))
4815                         break;
4816         }
4817
4818         if (submitted != nr)
4819                 percpu_ref_put_many(&ctx->refs, nr - submitted);
4820         if (link)
4821                 io_queue_link_head(link);
4822         if (statep)
4823                 io_submit_state_end(&state);
4824
4825          /* Commit SQ ring head once we've consumed and submitted all SQEs */
4826         io_commit_sqring(ctx);
4827
4828         return submitted;
4829 }
4830
4831 static int io_sq_thread(void *data)
4832 {
4833         struct io_ring_ctx *ctx = data;
4834         struct mm_struct *cur_mm = NULL;
4835         const struct cred *old_cred;
4836         mm_segment_t old_fs;
4837         DEFINE_WAIT(wait);
4838         unsigned inflight;
4839         unsigned long timeout;
4840         int ret;
4841
4842         complete(&ctx->completions[1]);
4843
4844         old_fs = get_fs();
4845         set_fs(USER_DS);
4846         old_cred = override_creds(ctx->creds);
4847
4848         ret = timeout = inflight = 0;
4849         while (!kthread_should_park()) {
4850                 unsigned int to_submit;
4851
4852                 if (inflight) {
4853                         unsigned nr_events = 0;
4854
4855                         if (ctx->flags & IORING_SETUP_IOPOLL) {
4856                                 /*
4857                                  * inflight is the count of the maximum possible
4858                                  * entries we submitted, but it can be smaller
4859                                  * if we dropped some of them. If we don't have
4860                                  * poll entries available, then we know that we
4861                                  * have nothing left to poll for. Reset the
4862                                  * inflight count to zero in that case.
4863                                  */
4864                                 mutex_lock(&ctx->uring_lock);
4865                                 if (!list_empty(&ctx->poll_list))
4866                                         __io_iopoll_check(ctx, &nr_events, 0);
4867                                 else
4868                                         inflight = 0;
4869                                 mutex_unlock(&ctx->uring_lock);
4870                         } else {
4871                                 /*
4872                                  * Normal IO, just pretend everything completed.
4873                                  * We don't have to poll completions for that.
4874                                  */
4875                                 nr_events = inflight;
4876                         }
4877
4878                         inflight -= nr_events;
4879                         if (!inflight)
4880                                 timeout = jiffies + ctx->sq_thread_idle;
4881                 }
4882
4883                 to_submit = io_sqring_entries(ctx);
4884
4885                 /*
4886                  * If submit got -EBUSY, flag us as needing the application
4887                  * to enter the kernel to reap and flush events.
4888                  */
4889                 if (!to_submit || ret == -EBUSY) {
4890                         /*
4891                          * We're polling. If we're within the defined idle
4892                          * period, then let us spin without work before going
4893                          * to sleep. The exception is if we got EBUSY doing
4894                          * more IO, we should wait for the application to
4895                          * reap events and wake us up.
4896                          */
4897                         if (inflight ||
4898                             (!time_after(jiffies, timeout) && ret != -EBUSY)) {
4899                                 cond_resched();
4900                                 continue;
4901                         }
4902
4903                         /*
4904                          * Drop cur_mm before scheduling, we can't hold it for
4905                          * long periods (or over schedule()). Do this before
4906                          * adding ourselves to the waitqueue, as the unuse/drop
4907                          * may sleep.
4908                          */
4909                         if (cur_mm) {
4910                                 unuse_mm(cur_mm);
4911                                 mmput(cur_mm);
4912                                 cur_mm = NULL;
4913                         }
4914
4915                         prepare_to_wait(&ctx->sqo_wait, &wait,
4916                                                 TASK_INTERRUPTIBLE);
4917
4918                         /* Tell userspace we may need a wakeup call */
4919                         ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
4920                         /* make sure to read SQ tail after writing flags */
4921                         smp_mb();
4922
4923                         to_submit = io_sqring_entries(ctx);
4924                         if (!to_submit || ret == -EBUSY) {
4925                                 if (kthread_should_park()) {
4926                                         finish_wait(&ctx->sqo_wait, &wait);
4927                                         break;
4928                                 }
4929                                 if (signal_pending(current))
4930                                         flush_signals(current);
4931                                 schedule();
4932                                 finish_wait(&ctx->sqo_wait, &wait);
4933
4934                                 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4935                                 continue;
4936                         }
4937                         finish_wait(&ctx->sqo_wait, &wait);
4938
4939                         ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4940                 }
4941
4942                 mutex_lock(&ctx->uring_lock);
4943                 ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
4944                 mutex_unlock(&ctx->uring_lock);
4945                 if (ret > 0)
4946                         inflight += ret;
4947         }
4948
4949         set_fs(old_fs);
4950         if (cur_mm) {
4951                 unuse_mm(cur_mm);
4952                 mmput(cur_mm);
4953         }
4954         revert_creds(old_cred);
4955
4956         kthread_parkme();
4957
4958         return 0;
4959 }
4960
4961 struct io_wait_queue {
4962         struct wait_queue_entry wq;
4963         struct io_ring_ctx *ctx;
4964         unsigned to_wait;
4965         unsigned nr_timeouts;
4966 };
4967
4968 static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
4969 {
4970         struct io_ring_ctx *ctx = iowq->ctx;
4971
4972         /*
4973          * Wake up if we have enough events, or if a timeout occurred since we
4974          * started waiting. For timeouts, we always want to return to userspace,
4975          * regardless of event count.
4976          */
4977         return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
4978                         atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
4979 }
4980
4981 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
4982                             int wake_flags, void *key)
4983 {
4984         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
4985                                                         wq);
4986
4987         /* use noflush == true, as we can't safely rely on locking context */
4988         if (!io_should_wake(iowq, true))
4989                 return -1;
4990
4991         return autoremove_wake_function(curr, mode, wake_flags, key);
4992 }
4993
4994 /*
4995  * Wait until events become available, if we don't already have some. The
4996  * application must reap them itself, as they reside on the shared cq ring.
4997  */
4998 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
4999                           const sigset_t __user *sig, size_t sigsz)
5000 {
5001         struct io_wait_queue iowq = {
5002                 .wq = {
5003                         .private        = current,
5004                         .func           = io_wake_function,
5005                         .entry          = LIST_HEAD_INIT(iowq.wq.entry),
5006                 },
5007                 .ctx            = ctx,
5008                 .to_wait        = min_events,
5009         };
5010         struct io_rings *rings = ctx->rings;
5011         int ret = 0;
5012
5013         if (io_cqring_events(ctx, false) >= min_events)
5014                 return 0;
5015
5016         if (sig) {
5017 #ifdef CONFIG_COMPAT
5018                 if (in_compat_syscall())
5019                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
5020                                                       sigsz);
5021                 else
5022 #endif
5023                         ret = set_user_sigmask(sig, sigsz);
5024
5025                 if (ret)
5026                         return ret;
5027         }
5028
5029         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
5030         trace_io_uring_cqring_wait(ctx, min_events);
5031         do {
5032                 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
5033                                                 TASK_INTERRUPTIBLE);
5034                 if (io_should_wake(&iowq, false))
5035                         break;
5036                 schedule();
5037                 if (signal_pending(current)) {
5038                         ret = -EINTR;
5039                         break;
5040                 }
5041         } while (1);
5042         finish_wait(&ctx->wait, &iowq.wq);
5043
5044         restore_saved_sigmask_unless(ret == -EINTR);
5045
5046         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
5047 }
5048
5049 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
5050 {
5051 #if defined(CONFIG_UNIX)
5052         if (ctx->ring_sock) {
5053                 struct sock *sock = ctx->ring_sock->sk;
5054                 struct sk_buff *skb;
5055
5056                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
5057                         kfree_skb(skb);
5058         }
5059 #else
5060         int i;
5061
5062         for (i = 0; i < ctx->nr_user_files; i++) {
5063                 struct file *file;
5064
5065                 file = io_file_from_index(ctx, i);
5066                 if (file)
5067                         fput(file);
5068         }
5069 #endif
5070 }
5071
5072 static void io_file_ref_kill(struct percpu_ref *ref)
5073 {
5074         struct fixed_file_data *data;
5075
5076         data = container_of(ref, struct fixed_file_data, refs);
5077         complete(&data->done);
5078 }
5079
5080 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
5081 {
5082         struct fixed_file_data *data = ctx->file_data;
5083         unsigned nr_tables, i;
5084
5085         if (!data)
5086                 return -ENXIO;
5087
5088         /* protect against inflight atomic switch, which drops the ref */
5089         percpu_ref_get(&data->refs);
5090         /* wait for existing switches */
5091         flush_work(&data->ref_work);
5092         percpu_ref_kill_and_confirm(&data->refs, io_file_ref_kill);
5093         wait_for_completion(&data->done);
5094         percpu_ref_put(&data->refs);
5095         /* flush potential new switch */
5096         flush_work(&data->ref_work);
5097         percpu_ref_exit(&data->refs);
5098
5099         __io_sqe_files_unregister(ctx);
5100         nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
5101         for (i = 0; i < nr_tables; i++)
5102                 kfree(data->table[i].files);
5103         kfree(data->table);
5104         kfree(data);
5105         ctx->file_data = NULL;
5106         ctx->nr_user_files = 0;
5107         return 0;
5108 }
5109
5110 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
5111 {
5112         if (ctx->sqo_thread) {
5113                 wait_for_completion(&ctx->completions[1]);
5114                 /*
5115                  * The park is a bit of a work-around, without it we get
5116                  * warning spews on shutdown with SQPOLL set and affinity
5117                  * set to a single CPU.
5118                  */
5119                 kthread_park(ctx->sqo_thread);
5120                 kthread_stop(ctx->sqo_thread);
5121                 ctx->sqo_thread = NULL;
5122         }
5123 }
5124
5125 static void io_finish_async(struct io_ring_ctx *ctx)
5126 {
5127         io_sq_thread_stop(ctx);
5128
5129         if (ctx->io_wq) {
5130                 io_wq_destroy(ctx->io_wq);
5131                 ctx->io_wq = NULL;
5132         }
5133 }
5134
5135 #if defined(CONFIG_UNIX)
5136 /*
5137  * Ensure the UNIX gc is aware of our file set, so we are certain that
5138  * the io_uring can be safely unregistered on process exit, even if we have
5139  * loops in the file referencing.
5140  */
5141 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
5142 {
5143         struct sock *sk = ctx->ring_sock->sk;
5144         struct scm_fp_list *fpl;
5145         struct sk_buff *skb;
5146         int i, nr_files;
5147
5148         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
5149                 unsigned long inflight = ctx->user->unix_inflight + nr;
5150
5151                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
5152                         return -EMFILE;
5153         }
5154
5155         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
5156         if (!fpl)
5157                 return -ENOMEM;
5158
5159         skb = alloc_skb(0, GFP_KERNEL);
5160         if (!skb) {
5161                 kfree(fpl);
5162                 return -ENOMEM;
5163         }
5164
5165         skb->sk = sk;
5166
5167         nr_files = 0;
5168         fpl->user = get_uid(ctx->user);
5169         for (i = 0; i < nr; i++) {
5170                 struct file *file = io_file_from_index(ctx, i + offset);
5171
5172                 if (!file)
5173                         continue;
5174                 fpl->fp[nr_files] = get_file(file);
5175                 unix_inflight(fpl->user, fpl->fp[nr_files]);
5176                 nr_files++;
5177         }
5178
5179         if (nr_files) {
5180                 fpl->max = SCM_MAX_FD;
5181                 fpl->count = nr_files;
5182                 UNIXCB(skb).fp = fpl;
5183                 skb->destructor = unix_destruct_scm;
5184                 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
5185                 skb_queue_head(&sk->sk_receive_queue, skb);
5186
5187                 for (i = 0; i < nr_files; i++)
5188                         fput(fpl->fp[i]);
5189         } else {
5190                 kfree_skb(skb);
5191                 kfree(fpl);
5192         }
5193
5194         return 0;
5195 }
5196
5197 /*
5198  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
5199  * causes regular reference counting to break down. We rely on the UNIX
5200  * garbage collection to take care of this problem for us.
5201  */
5202 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
5203 {
5204         unsigned left, total;
5205         int ret = 0;
5206
5207         total = 0;
5208         left = ctx->nr_user_files;
5209         while (left) {
5210                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
5211
5212                 ret = __io_sqe_files_scm(ctx, this_files, total);
5213                 if (ret)
5214                         break;
5215                 left -= this_files;
5216                 total += this_files;
5217         }
5218
5219         if (!ret)
5220                 return 0;
5221
5222         while (total < ctx->nr_user_files) {
5223                 struct file *file = io_file_from_index(ctx, total);
5224
5225                 if (file)
5226                         fput(file);
5227                 total++;
5228         }
5229
5230         return ret;
5231 }
5232 #else
5233 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
5234 {
5235         return 0;
5236 }
5237 #endif
5238
5239 static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
5240                                     unsigned nr_files)
5241 {
5242         int i;
5243
5244         for (i = 0; i < nr_tables; i++) {
5245                 struct fixed_file_table *table = &ctx->file_data->table[i];
5246                 unsigned this_files;
5247
5248                 this_files = min(nr_files, IORING_MAX_FILES_TABLE);
5249                 table->files = kcalloc(this_files, sizeof(struct file *),
5250                                         GFP_KERNEL);
5251                 if (!table->files)
5252                         break;
5253                 nr_files -= this_files;
5254         }
5255
5256         if (i == nr_tables)
5257                 return 0;
5258
5259         for (i = 0; i < nr_tables; i++) {
5260                 struct fixed_file_table *table = &ctx->file_data->table[i];
5261                 kfree(table->files);
5262         }
5263         return 1;
5264 }
5265
5266 static void io_ring_file_put(struct io_ring_ctx *ctx, struct file *file)
5267 {
5268 #if defined(CONFIG_UNIX)
5269         struct sock *sock = ctx->ring_sock->sk;
5270         struct sk_buff_head list, *head = &sock->sk_receive_queue;
5271         struct sk_buff *skb;
5272         int i;
5273
5274         __skb_queue_head_init(&list);
5275
5276         /*
5277          * Find the skb that holds this file in its SCM_RIGHTS. When found,
5278          * remove this entry and rearrange the file array.
5279          */
5280         skb = skb_dequeue(head);
5281         while (skb) {
5282                 struct scm_fp_list *fp;
5283
5284                 fp = UNIXCB(skb).fp;
5285                 for (i = 0; i < fp->count; i++) {
5286                         int left;
5287
5288                         if (fp->fp[i] != file)
5289                                 continue;
5290
5291                         unix_notinflight(fp->user, fp->fp[i]);
5292                         left = fp->count - 1 - i;
5293                         if (left) {
5294                                 memmove(&fp->fp[i], &fp->fp[i + 1],
5295                                                 left * sizeof(struct file *));
5296                         }
5297                         fp->count--;
5298                         if (!fp->count) {
5299                                 kfree_skb(skb);
5300                                 skb = NULL;
5301                         } else {
5302                                 __skb_queue_tail(&list, skb);
5303                         }
5304                         fput(file);
5305                         file = NULL;
5306                         break;
5307                 }
5308
5309                 if (!file)
5310                         break;
5311
5312                 __skb_queue_tail(&list, skb);
5313
5314                 skb = skb_dequeue(head);
5315         }
5316
5317         if (skb_peek(&list)) {
5318                 spin_lock_irq(&head->lock);
5319                 while ((skb = __skb_dequeue(&list)) != NULL)
5320                         __skb_queue_tail(head, skb);
5321                 spin_unlock_irq(&head->lock);
5322         }
5323 #else
5324         fput(file);
5325 #endif
5326 }
5327
5328 struct io_file_put {
5329         struct llist_node llist;
5330         struct file *file;
5331         struct completion *done;
5332 };
5333
5334 static void io_ring_file_ref_switch(struct work_struct *work)
5335 {
5336         struct io_file_put *pfile, *tmp;
5337         struct fixed_file_data *data;
5338         struct llist_node *node;
5339
5340         data = container_of(work, struct fixed_file_data, ref_work);
5341
5342         while ((node = llist_del_all(&data->put_llist)) != NULL) {
5343                 llist_for_each_entry_safe(pfile, tmp, node, llist) {
5344                         io_ring_file_put(data->ctx, pfile->file);
5345                         if (pfile->done)
5346                                 complete(pfile->done);
5347                         else
5348                                 kfree(pfile);
5349                 }
5350         }
5351
5352         percpu_ref_get(&data->refs);
5353         percpu_ref_switch_to_percpu(&data->refs);
5354 }
5355
5356 static void io_file_data_ref_zero(struct percpu_ref *ref)
5357 {
5358         struct fixed_file_data *data;
5359
5360         data = container_of(ref, struct fixed_file_data, refs);
5361
5362         /* we can't safely switch from inside this context, punt to wq */
5363         queue_work(system_wq, &data->ref_work);
5364 }
5365
5366 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
5367                                  unsigned nr_args)
5368 {
5369         __s32 __user *fds = (__s32 __user *) arg;
5370         unsigned nr_tables;
5371         struct file *file;
5372         int fd, ret = 0;
5373         unsigned i;
5374
5375         if (ctx->file_data)
5376                 return -EBUSY;
5377         if (!nr_args)
5378                 return -EINVAL;
5379         if (nr_args > IORING_MAX_FIXED_FILES)
5380                 return -EMFILE;
5381
5382         ctx->file_data = kzalloc(sizeof(*ctx->file_data), GFP_KERNEL);
5383         if (!ctx->file_data)
5384                 return -ENOMEM;
5385         ctx->file_data->ctx = ctx;
5386         init_completion(&ctx->file_data->done);
5387
5388         nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
5389         ctx->file_data->table = kcalloc(nr_tables,
5390                                         sizeof(struct fixed_file_table),
5391                                         GFP_KERNEL);
5392         if (!ctx->file_data->table) {
5393                 kfree(ctx->file_data);
5394                 ctx->file_data = NULL;
5395                 return -ENOMEM;
5396         }
5397
5398         if (percpu_ref_init(&ctx->file_data->refs, io_file_data_ref_zero,
5399                                 PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
5400                 kfree(ctx->file_data->table);
5401                 kfree(ctx->file_data);
5402                 ctx->file_data = NULL;
5403                 return -ENOMEM;
5404         }
5405         ctx->file_data->put_llist.first = NULL;
5406         INIT_WORK(&ctx->file_data->ref_work, io_ring_file_ref_switch);
5407
5408         if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
5409                 percpu_ref_exit(&ctx->file_data->refs);
5410                 kfree(ctx->file_data->table);
5411                 kfree(ctx->file_data);
5412                 ctx->file_data = NULL;
5413                 return -ENOMEM;
5414         }
5415
5416         for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
5417                 struct fixed_file_table *table;
5418                 unsigned index;
5419
5420                 ret = -EFAULT;
5421                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
5422                         break;
5423                 /* allow sparse sets */
5424                 if (fd == -1) {
5425                         ret = 0;
5426                         continue;
5427                 }
5428
5429                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
5430                 index = i & IORING_FILE_TABLE_MASK;
5431                 file = fget(fd);
5432
5433                 ret = -EBADF;
5434                 if (!file)
5435                         break;
5436
5437                 /*
5438                  * Don't allow io_uring instances to be registered. If UNIX
5439                  * isn't enabled, then this causes a reference cycle and this
5440                  * instance can never get freed. If UNIX is enabled we'll
5441                  * handle it just fine, but there's still no point in allowing
5442                  * a ring fd as it doesn't support regular read/write anyway.
5443                  */
5444                 if (file->f_op == &io_uring_fops) {
5445                         fput(file);
5446                         break;
5447                 }
5448                 ret = 0;
5449                 table->files[index] = file;
5450         }
5451
5452         if (ret) {
5453                 for (i = 0; i < ctx->nr_user_files; i++) {
5454                         file = io_file_from_index(ctx, i);
5455                         if (file)
5456                                 fput(file);
5457                 }
5458                 for (i = 0; i < nr_tables; i++)
5459                         kfree(ctx->file_data->table[i].files);
5460
5461                 kfree(ctx->file_data->table);
5462                 kfree(ctx->file_data);
5463                 ctx->file_data = NULL;
5464                 ctx->nr_user_files = 0;
5465                 return ret;
5466         }
5467
5468         ret = io_sqe_files_scm(ctx);
5469         if (ret)
5470                 io_sqe_files_unregister(ctx);
5471
5472         return ret;
5473 }
5474
5475 static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
5476                                 int index)
5477 {
5478 #if defined(CONFIG_UNIX)
5479         struct sock *sock = ctx->ring_sock->sk;
5480         struct sk_buff_head *head = &sock->sk_receive_queue;
5481         struct sk_buff *skb;
5482
5483         /*
5484          * See if we can merge this file into an existing skb SCM_RIGHTS
5485          * file set. If there's no room, fall back to allocating a new skb
5486          * and filling it in.
5487          */
5488         spin_lock_irq(&head->lock);
5489         skb = skb_peek(head);
5490         if (skb) {
5491                 struct scm_fp_list *fpl = UNIXCB(skb).fp;
5492
5493                 if (fpl->count < SCM_MAX_FD) {
5494                         __skb_unlink(skb, head);
5495                         spin_unlock_irq(&head->lock);
5496                         fpl->fp[fpl->count] = get_file(file);
5497                         unix_inflight(fpl->user, fpl->fp[fpl->count]);
5498                         fpl->count++;
5499                         spin_lock_irq(&head->lock);
5500                         __skb_queue_head(head, skb);
5501                 } else {
5502                         skb = NULL;
5503                 }
5504         }
5505         spin_unlock_irq(&head->lock);
5506
5507         if (skb) {
5508                 fput(file);
5509                 return 0;
5510         }
5511
5512         return __io_sqe_files_scm(ctx, 1, index);
5513 #else
5514         return 0;
5515 #endif
5516 }
5517
5518 static void io_atomic_switch(struct percpu_ref *ref)
5519 {
5520         struct fixed_file_data *data;
5521
5522         data = container_of(ref, struct fixed_file_data, refs);
5523         clear_bit(FFD_F_ATOMIC, &data->state);
5524 }
5525
5526 static bool io_queue_file_removal(struct fixed_file_data *data,
5527                                   struct file *file)
5528 {
5529         struct io_file_put *pfile, pfile_stack;
5530         DECLARE_COMPLETION_ONSTACK(done);
5531
5532         /*
5533          * If we fail allocating the struct we need for doing async reomval
5534          * of this file, just punt to sync and wait for it.
5535          */
5536         pfile = kzalloc(sizeof(*pfile), GFP_KERNEL);
5537         if (!pfile) {
5538                 pfile = &pfile_stack;
5539                 pfile->done = &done;
5540         }
5541
5542         pfile->file = file;
5543         llist_add(&pfile->llist, &data->put_llist);
5544
5545         if (pfile == &pfile_stack) {
5546                 if (!test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
5547                         percpu_ref_put(&data->refs);
5548                         percpu_ref_switch_to_atomic(&data->refs,
5549                                                         io_atomic_switch);
5550                 }
5551                 wait_for_completion(&done);
5552                 flush_work(&data->ref_work);
5553                 return false;
5554         }
5555
5556         return true;
5557 }
5558
5559 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
5560                                  struct io_uring_files_update *up,
5561                                  unsigned nr_args)
5562 {
5563         struct fixed_file_data *data = ctx->file_data;
5564         bool ref_switch = false;
5565         struct file *file;
5566         __s32 __user *fds;
5567         int fd, i, err;
5568         __u32 done;
5569
5570         if (check_add_overflow(up->offset, nr_args, &done))
5571                 return -EOVERFLOW;
5572         if (done > ctx->nr_user_files)
5573                 return -EINVAL;
5574
5575         done = 0;
5576         fds = u64_to_user_ptr(up->fds);
5577         while (nr_args) {
5578                 struct fixed_file_table *table;
5579                 unsigned index;
5580
5581                 err = 0;
5582                 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
5583                         err = -EFAULT;
5584                         break;
5585                 }
5586                 i = array_index_nospec(up->offset, ctx->nr_user_files);
5587                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
5588                 index = i & IORING_FILE_TABLE_MASK;
5589                 if (table->files[index]) {
5590                         file = io_file_from_index(ctx, index);
5591                         table->files[index] = NULL;
5592                         if (io_queue_file_removal(data, file))
5593                                 ref_switch = true;
5594                 }
5595                 if (fd != -1) {
5596                         file = fget(fd);
5597                         if (!file) {
5598                                 err = -EBADF;
5599                                 break;
5600                         }
5601                         /*
5602                          * Don't allow io_uring instances to be registered. If
5603                          * UNIX isn't enabled, then this causes a reference
5604                          * cycle and this instance can never get freed. If UNIX
5605                          * is enabled we'll handle it just fine, but there's
5606                          * still no point in allowing a ring fd as it doesn't
5607                          * support regular read/write anyway.
5608                          */
5609                         if (file->f_op == &io_uring_fops) {
5610                                 fput(file);
5611                                 err = -EBADF;
5612                                 break;
5613                         }
5614                         table->files[index] = file;
5615                         err = io_sqe_file_register(ctx, file, i);
5616                         if (err)
5617                                 break;
5618                 }
5619                 nr_args--;
5620                 done++;
5621                 up->offset++;
5622         }
5623
5624         if (ref_switch && !test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
5625                 percpu_ref_put(&data->refs);
5626                 percpu_ref_switch_to_atomic(&data->refs, io_atomic_switch);
5627         }
5628
5629         return done ? done : err;
5630 }
5631 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
5632                                unsigned nr_args)
5633 {
5634         struct io_uring_files_update up;
5635
5636         if (!ctx->file_data)
5637                 return -ENXIO;
5638         if (!nr_args)
5639                 return -EINVAL;
5640         if (copy_from_user(&up, arg, sizeof(up)))
5641                 return -EFAULT;
5642         if (up.resv)
5643                 return -EINVAL;
5644
5645         return __io_sqe_files_update(ctx, &up, nr_args);
5646 }
5647
5648 static void io_put_work(struct io_wq_work *work)
5649 {
5650         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5651
5652         io_put_req(req);
5653 }
5654
5655 static void io_get_work(struct io_wq_work *work)
5656 {
5657         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5658
5659         refcount_inc(&req->refs);
5660 }
5661
5662 static int io_sq_offload_start(struct io_ring_ctx *ctx,
5663                                struct io_uring_params *p)
5664 {
5665         struct io_wq_data data;
5666         unsigned concurrency;
5667         int ret;
5668
5669         init_waitqueue_head(&ctx->sqo_wait);
5670         mmgrab(current->mm);
5671         ctx->sqo_mm = current->mm;
5672
5673         if (ctx->flags & IORING_SETUP_SQPOLL) {
5674                 ret = -EPERM;
5675                 if (!capable(CAP_SYS_ADMIN))
5676                         goto err;
5677
5678                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
5679                 if (!ctx->sq_thread_idle)
5680                         ctx->sq_thread_idle = HZ;
5681
5682                 if (p->flags & IORING_SETUP_SQ_AFF) {
5683                         int cpu = p->sq_thread_cpu;
5684
5685                         ret = -EINVAL;
5686                         if (cpu >= nr_cpu_ids)
5687                                 goto err;
5688                         if (!cpu_online(cpu))
5689                                 goto err;
5690
5691                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
5692                                                         ctx, cpu,
5693                                                         "io_uring-sq");
5694                 } else {
5695                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
5696                                                         "io_uring-sq");
5697                 }
5698                 if (IS_ERR(ctx->sqo_thread)) {
5699                         ret = PTR_ERR(ctx->sqo_thread);
5700                         ctx->sqo_thread = NULL;
5701                         goto err;
5702                 }
5703                 wake_up_process(ctx->sqo_thread);
5704         } else if (p->flags & IORING_SETUP_SQ_AFF) {
5705                 /* Can't have SQ_AFF without SQPOLL */
5706                 ret = -EINVAL;
5707                 goto err;
5708         }
5709
5710         data.mm = ctx->sqo_mm;
5711         data.user = ctx->user;
5712         data.creds = ctx->creds;
5713         data.get_work = io_get_work;
5714         data.put_work = io_put_work;
5715
5716         /* Do QD, or 4 * CPUS, whatever is smallest */
5717         concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
5718         ctx->io_wq = io_wq_create(concurrency, &data);
5719         if (IS_ERR(ctx->io_wq)) {
5720                 ret = PTR_ERR(ctx->io_wq);
5721                 ctx->io_wq = NULL;
5722                 goto err;
5723         }
5724
5725         return 0;
5726 err:
5727         io_finish_async(ctx);
5728         mmdrop(ctx->sqo_mm);
5729         ctx->sqo_mm = NULL;
5730         return ret;
5731 }
5732
5733 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
5734 {
5735         atomic_long_sub(nr_pages, &user->locked_vm);
5736 }
5737
5738 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
5739 {
5740         unsigned long page_limit, cur_pages, new_pages;
5741
5742         /* Don't allow more pages than we can safely lock */
5743         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
5744
5745         do {
5746                 cur_pages = atomic_long_read(&user->locked_vm);
5747                 new_pages = cur_pages + nr_pages;
5748                 if (new_pages > page_limit)
5749                         return -ENOMEM;
5750         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
5751                                         new_pages) != cur_pages);
5752
5753         return 0;
5754 }
5755
5756 static void io_mem_free(void *ptr)
5757 {
5758         struct page *page;
5759
5760         if (!ptr)
5761                 return;
5762
5763         page = virt_to_head_page(ptr);
5764         if (put_page_testzero(page))
5765                 free_compound_page(page);
5766 }
5767
5768 static void *io_mem_alloc(size_t size)
5769 {
5770         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
5771                                 __GFP_NORETRY;
5772
5773         return (void *) __get_free_pages(gfp_flags, get_order(size));
5774 }
5775
5776 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
5777                                 size_t *sq_offset)
5778 {
5779         struct io_rings *rings;
5780         size_t off, sq_array_size;
5781
5782         off = struct_size(rings, cqes, cq_entries);
5783         if (off == SIZE_MAX)
5784                 return SIZE_MAX;
5785
5786 #ifdef CONFIG_SMP
5787         off = ALIGN(off, SMP_CACHE_BYTES);
5788         if (off == 0)
5789                 return SIZE_MAX;
5790 #endif
5791
5792         sq_array_size = array_size(sizeof(u32), sq_entries);
5793         if (sq_array_size == SIZE_MAX)
5794                 return SIZE_MAX;
5795
5796         if (check_add_overflow(off, sq_array_size, &off))
5797                 return SIZE_MAX;
5798
5799         if (sq_offset)
5800                 *sq_offset = off;
5801
5802         return off;
5803 }
5804
5805 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
5806 {
5807         size_t pages;
5808
5809         pages = (size_t)1 << get_order(
5810                 rings_size(sq_entries, cq_entries, NULL));
5811         pages += (size_t)1 << get_order(
5812                 array_size(sizeof(struct io_uring_sqe), sq_entries));
5813
5814         return pages;
5815 }
5816
5817 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
5818 {
5819         int i, j;
5820
5821         if (!ctx->user_bufs)
5822                 return -ENXIO;
5823
5824         for (i = 0; i < ctx->nr_user_bufs; i++) {
5825                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5826
5827                 for (j = 0; j < imu->nr_bvecs; j++)
5828                         put_user_page(imu->bvec[j].bv_page);
5829
5830                 if (ctx->account_mem)
5831                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
5832                 kvfree(imu->bvec);
5833                 imu->nr_bvecs = 0;
5834         }
5835
5836         kfree(ctx->user_bufs);
5837         ctx->user_bufs = NULL;
5838         ctx->nr_user_bufs = 0;
5839         return 0;
5840 }
5841
5842 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
5843                        void __user *arg, unsigned index)
5844 {
5845         struct iovec __user *src;
5846
5847 #ifdef CONFIG_COMPAT
5848         if (ctx->compat) {
5849                 struct compat_iovec __user *ciovs;
5850                 struct compat_iovec ciov;
5851
5852                 ciovs = (struct compat_iovec __user *) arg;
5853                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
5854                         return -EFAULT;
5855
5856                 dst->iov_base = u64_to_user_ptr((u64)ciov.iov_base);
5857                 dst->iov_len = ciov.iov_len;
5858                 return 0;
5859         }
5860 #endif
5861         src = (struct iovec __user *) arg;
5862         if (copy_from_user(dst, &src[index], sizeof(*dst)))
5863                 return -EFAULT;
5864         return 0;
5865 }
5866
5867 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
5868                                   unsigned nr_args)
5869 {
5870         struct vm_area_struct **vmas = NULL;
5871         struct page **pages = NULL;
5872         int i, j, got_pages = 0;
5873         int ret = -EINVAL;
5874
5875         if (ctx->user_bufs)
5876                 return -EBUSY;
5877         if (!nr_args || nr_args > UIO_MAXIOV)
5878                 return -EINVAL;
5879
5880         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
5881                                         GFP_KERNEL);
5882         if (!ctx->user_bufs)
5883                 return -ENOMEM;
5884
5885         for (i = 0; i < nr_args; i++) {
5886                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5887                 unsigned long off, start, end, ubuf;
5888                 int pret, nr_pages;
5889                 struct iovec iov;
5890                 size_t size;
5891
5892                 ret = io_copy_iov(ctx, &iov, arg, i);
5893                 if (ret)
5894                         goto err;
5895
5896                 /*
5897                  * Don't impose further limits on the size and buffer
5898                  * constraints here, we'll -EINVAL later when IO is
5899                  * submitted if they are wrong.
5900                  */
5901                 ret = -EFAULT;
5902                 if (!iov.iov_base || !iov.iov_len)
5903                         goto err;
5904
5905                 /* arbitrary limit, but we need something */
5906                 if (iov.iov_len > SZ_1G)
5907                         goto err;
5908
5909                 ubuf = (unsigned long) iov.iov_base;
5910                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
5911                 start = ubuf >> PAGE_SHIFT;
5912                 nr_pages = end - start;
5913
5914                 if (ctx->account_mem) {
5915                         ret = io_account_mem(ctx->user, nr_pages);
5916                         if (ret)
5917                                 goto err;
5918                 }
5919
5920                 ret = 0;
5921                 if (!pages || nr_pages > got_pages) {
5922                         kfree(vmas);
5923                         kfree(pages);
5924                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
5925                                                 GFP_KERNEL);
5926                         vmas = kvmalloc_array(nr_pages,
5927                                         sizeof(struct vm_area_struct *),
5928                                         GFP_KERNEL);
5929                         if (!pages || !vmas) {
5930                                 ret = -ENOMEM;
5931                                 if (ctx->account_mem)
5932                                         io_unaccount_mem(ctx->user, nr_pages);
5933                                 goto err;
5934                         }
5935                         got_pages = nr_pages;
5936                 }
5937
5938                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
5939                                                 GFP_KERNEL);
5940                 ret = -ENOMEM;
5941                 if (!imu->bvec) {
5942                         if (ctx->account_mem)
5943                                 io_unaccount_mem(ctx->user, nr_pages);
5944                         goto err;
5945                 }
5946
5947                 ret = 0;
5948                 down_read(&current->mm->mmap_sem);
5949                 pret = get_user_pages(ubuf, nr_pages,
5950                                       FOLL_WRITE | FOLL_LONGTERM,
5951                                       pages, vmas);
5952                 if (pret == nr_pages) {
5953                         /* don't support file backed memory */
5954                         for (j = 0; j < nr_pages; j++) {
5955                                 struct vm_area_struct *vma = vmas[j];
5956
5957                                 if (vma->vm_file &&
5958                                     !is_file_hugepages(vma->vm_file)) {
5959                                         ret = -EOPNOTSUPP;
5960                                         break;
5961                                 }
5962                         }
5963                 } else {
5964                         ret = pret < 0 ? pret : -EFAULT;
5965                 }
5966                 up_read(&current->mm->mmap_sem);
5967                 if (ret) {
5968                         /*
5969                          * if we did partial map, or found file backed vmas,
5970                          * release any pages we did get
5971                          */
5972                         if (pret > 0)
5973                                 put_user_pages(pages, pret);
5974                         if (ctx->account_mem)
5975                                 io_unaccount_mem(ctx->user, nr_pages);
5976                         kvfree(imu->bvec);
5977                         goto err;
5978                 }
5979
5980                 off = ubuf & ~PAGE_MASK;
5981                 size = iov.iov_len;
5982                 for (j = 0; j < nr_pages; j++) {
5983                         size_t vec_len;
5984
5985                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
5986                         imu->bvec[j].bv_page = pages[j];
5987                         imu->bvec[j].bv_len = vec_len;
5988                         imu->bvec[j].bv_offset = off;
5989                         off = 0;
5990                         size -= vec_len;
5991                 }
5992                 /* store original address for later verification */
5993                 imu->ubuf = ubuf;
5994                 imu->len = iov.iov_len;
5995                 imu->nr_bvecs = nr_pages;
5996
5997                 ctx->nr_user_bufs++;
5998         }
5999         kvfree(pages);
6000         kvfree(vmas);
6001         return 0;
6002 err:
6003         kvfree(pages);
6004         kvfree(vmas);
6005         io_sqe_buffer_unregister(ctx);
6006         return ret;
6007 }
6008
6009 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
6010 {
6011         __s32 __user *fds = arg;
6012         int fd;
6013
6014         if (ctx->cq_ev_fd)
6015                 return -EBUSY;
6016
6017         if (copy_from_user(&fd, fds, sizeof(*fds)))
6018                 return -EFAULT;
6019
6020         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
6021         if (IS_ERR(ctx->cq_ev_fd)) {
6022                 int ret = PTR_ERR(ctx->cq_ev_fd);
6023                 ctx->cq_ev_fd = NULL;
6024                 return ret;
6025         }
6026
6027         return 0;
6028 }
6029
6030 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
6031 {
6032         if (ctx->cq_ev_fd) {
6033                 eventfd_ctx_put(ctx->cq_ev_fd);
6034                 ctx->cq_ev_fd = NULL;
6035                 return 0;
6036         }
6037
6038         return -ENXIO;
6039 }
6040
6041 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
6042 {
6043         io_finish_async(ctx);
6044         if (ctx->sqo_mm)
6045                 mmdrop(ctx->sqo_mm);
6046
6047         io_iopoll_reap_events(ctx);
6048         io_sqe_buffer_unregister(ctx);
6049         io_sqe_files_unregister(ctx);
6050         io_eventfd_unregister(ctx);
6051
6052 #if defined(CONFIG_UNIX)
6053         if (ctx->ring_sock) {
6054                 ctx->ring_sock->file = NULL; /* so that iput() is called */
6055                 sock_release(ctx->ring_sock);
6056         }
6057 #endif
6058
6059         io_mem_free(ctx->rings);
6060         io_mem_free(ctx->sq_sqes);
6061
6062         percpu_ref_exit(&ctx->refs);
6063         if (ctx->account_mem)
6064                 io_unaccount_mem(ctx->user,
6065                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
6066         free_uid(ctx->user);
6067         put_cred(ctx->creds);
6068         kfree(ctx->completions);
6069         kfree(ctx->cancel_hash);
6070         kmem_cache_free(req_cachep, ctx->fallback_req);
6071         kfree(ctx);
6072 }
6073
6074 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
6075 {
6076         struct io_ring_ctx *ctx = file->private_data;
6077         __poll_t mask = 0;
6078
6079         poll_wait(file, &ctx->cq_wait, wait);
6080         /*
6081          * synchronizes with barrier from wq_has_sleeper call in
6082          * io_commit_cqring
6083          */
6084         smp_rmb();
6085         if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
6086             ctx->rings->sq_ring_entries)
6087                 mask |= EPOLLOUT | EPOLLWRNORM;
6088         if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
6089                 mask |= EPOLLIN | EPOLLRDNORM;
6090
6091         return mask;
6092 }
6093
6094 static int io_uring_fasync(int fd, struct file *file, int on)
6095 {
6096         struct io_ring_ctx *ctx = file->private_data;
6097
6098         return fasync_helper(fd, file, on, &ctx->cq_fasync);
6099 }
6100
6101 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
6102 {
6103         mutex_lock(&ctx->uring_lock);
6104         percpu_ref_kill(&ctx->refs);
6105         mutex_unlock(&ctx->uring_lock);
6106
6107         io_kill_timeouts(ctx);
6108         io_poll_remove_all(ctx);
6109
6110         if (ctx->io_wq)
6111                 io_wq_cancel_all(ctx->io_wq);
6112
6113         io_iopoll_reap_events(ctx);
6114         /* if we failed setting up the ctx, we might not have any rings */
6115         if (ctx->rings)
6116                 io_cqring_overflow_flush(ctx, true);
6117         wait_for_completion(&ctx->completions[0]);
6118         io_ring_ctx_free(ctx);
6119 }
6120
6121 static int io_uring_release(struct inode *inode, struct file *file)
6122 {
6123         struct io_ring_ctx *ctx = file->private_data;
6124
6125         file->private_data = NULL;
6126         io_ring_ctx_wait_and_kill(ctx);
6127         return 0;
6128 }
6129
6130 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
6131                                   struct files_struct *files)
6132 {
6133         struct io_kiocb *req;
6134         DEFINE_WAIT(wait);
6135
6136         while (!list_empty_careful(&ctx->inflight_list)) {
6137                 struct io_kiocb *cancel_req = NULL;
6138
6139                 spin_lock_irq(&ctx->inflight_lock);
6140                 list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
6141                         if (req->work.files != files)
6142                                 continue;
6143                         /* req is being completed, ignore */
6144                         if (!refcount_inc_not_zero(&req->refs))
6145                                 continue;
6146                         cancel_req = req;
6147                         break;
6148                 }
6149                 if (cancel_req)
6150                         prepare_to_wait(&ctx->inflight_wait, &wait,
6151                                                 TASK_UNINTERRUPTIBLE);
6152                 spin_unlock_irq(&ctx->inflight_lock);
6153
6154                 /* We need to keep going until we don't find a matching req */
6155                 if (!cancel_req)
6156                         break;
6157
6158                 io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
6159                 io_put_req(cancel_req);
6160                 schedule();
6161         }
6162         finish_wait(&ctx->inflight_wait, &wait);
6163 }
6164
6165 static int io_uring_flush(struct file *file, void *data)
6166 {
6167         struct io_ring_ctx *ctx = file->private_data;
6168
6169         io_uring_cancel_files(ctx, data);
6170         if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
6171                 io_cqring_overflow_flush(ctx, true);
6172                 io_wq_cancel_all(ctx->io_wq);
6173         }
6174         return 0;
6175 }
6176
6177 static void *io_uring_validate_mmap_request(struct file *file,
6178                                             loff_t pgoff, size_t sz)
6179 {
6180         struct io_ring_ctx *ctx = file->private_data;
6181         loff_t offset = pgoff << PAGE_SHIFT;
6182         struct page *page;
6183         void *ptr;
6184
6185         switch (offset) {
6186         case IORING_OFF_SQ_RING:
6187         case IORING_OFF_CQ_RING:
6188                 ptr = ctx->rings;
6189                 break;
6190         case IORING_OFF_SQES:
6191                 ptr = ctx->sq_sqes;
6192                 break;
6193         default:
6194                 return ERR_PTR(-EINVAL);
6195         }
6196
6197         page = virt_to_head_page(ptr);
6198         if (sz > page_size(page))
6199                 return ERR_PTR(-EINVAL);
6200
6201         return ptr;
6202 }
6203
6204 #ifdef CONFIG_MMU
6205
6206 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
6207 {
6208         size_t sz = vma->vm_end - vma->vm_start;
6209         unsigned long pfn;
6210         void *ptr;
6211
6212         ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
6213         if (IS_ERR(ptr))
6214                 return PTR_ERR(ptr);
6215
6216         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
6217         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
6218 }
6219
6220 #else /* !CONFIG_MMU */
6221
6222 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
6223 {
6224         return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
6225 }
6226
6227 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
6228 {
6229         return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
6230 }
6231
6232 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
6233         unsigned long addr, unsigned long len,
6234         unsigned long pgoff, unsigned long flags)
6235 {
6236         void *ptr;
6237
6238         ptr = io_uring_validate_mmap_request(file, pgoff, len);
6239         if (IS_ERR(ptr))
6240                 return PTR_ERR(ptr);
6241
6242         return (unsigned long) ptr;
6243 }
6244
6245 #endif /* !CONFIG_MMU */
6246
6247 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
6248                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
6249                 size_t, sigsz)
6250 {
6251         struct io_ring_ctx *ctx;
6252         long ret = -EBADF;
6253         int submitted = 0;
6254         struct fd f;
6255
6256         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
6257                 return -EINVAL;
6258
6259         f = fdget(fd);
6260         if (!f.file)
6261                 return -EBADF;
6262
6263         ret = -EOPNOTSUPP;
6264         if (f.file->f_op != &io_uring_fops)
6265                 goto out_fput;
6266
6267         ret = -ENXIO;
6268         ctx = f.file->private_data;
6269         if (!percpu_ref_tryget(&ctx->refs))
6270                 goto out_fput;
6271
6272         /*
6273          * For SQ polling, the thread will do all submissions and completions.
6274          * Just return the requested submit count, and wake the thread if
6275          * we were asked to.
6276          */
6277         ret = 0;
6278         if (ctx->flags & IORING_SETUP_SQPOLL) {
6279                 if (!list_empty_careful(&ctx->cq_overflow_list))
6280                         io_cqring_overflow_flush(ctx, false);
6281                 if (flags & IORING_ENTER_SQ_WAKEUP)
6282                         wake_up(&ctx->sqo_wait);
6283                 submitted = to_submit;
6284         } else if (to_submit) {
6285                 struct mm_struct *cur_mm;
6286
6287                 if (current->mm != ctx->sqo_mm ||
6288                     current_cred() != ctx->creds) {
6289                         ret = -EPERM;
6290                         goto out;
6291                 }
6292
6293                 mutex_lock(&ctx->uring_lock);
6294                 /* already have mm, so io_submit_sqes() won't try to grab it */
6295                 cur_mm = ctx->sqo_mm;
6296                 submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
6297                                            &cur_mm, false);
6298                 mutex_unlock(&ctx->uring_lock);
6299
6300                 if (submitted != to_submit)
6301                         goto out;
6302         }
6303         if (flags & IORING_ENTER_GETEVENTS) {
6304                 unsigned nr_events = 0;
6305
6306                 min_complete = min(min_complete, ctx->cq_entries);
6307
6308                 if (ctx->flags & IORING_SETUP_IOPOLL) {
6309                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
6310                 } else {
6311                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
6312                 }
6313         }
6314
6315 out:
6316         percpu_ref_put(&ctx->refs);
6317 out_fput:
6318         fdput(f);
6319         return submitted ? submitted : ret;
6320 }
6321
6322 static const struct file_operations io_uring_fops = {
6323         .release        = io_uring_release,
6324         .flush          = io_uring_flush,
6325         .mmap           = io_uring_mmap,
6326 #ifndef CONFIG_MMU
6327         .get_unmapped_area = io_uring_nommu_get_unmapped_area,
6328         .mmap_capabilities = io_uring_nommu_mmap_capabilities,
6329 #endif
6330         .poll           = io_uring_poll,
6331         .fasync         = io_uring_fasync,
6332 };
6333
6334 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
6335                                   struct io_uring_params *p)
6336 {
6337         struct io_rings *rings;
6338         size_t size, sq_array_offset;
6339
6340         size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
6341         if (size == SIZE_MAX)
6342                 return -EOVERFLOW;
6343
6344         rings = io_mem_alloc(size);
6345         if (!rings)
6346                 return -ENOMEM;
6347
6348         ctx->rings = rings;
6349         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
6350         rings->sq_ring_mask = p->sq_entries - 1;
6351         rings->cq_ring_mask = p->cq_entries - 1;
6352         rings->sq_ring_entries = p->sq_entries;
6353         rings->cq_ring_entries = p->cq_entries;
6354         ctx->sq_mask = rings->sq_ring_mask;
6355         ctx->cq_mask = rings->cq_ring_mask;
6356         ctx->sq_entries = rings->sq_ring_entries;
6357         ctx->cq_entries = rings->cq_ring_entries;
6358
6359         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
6360         if (size == SIZE_MAX) {
6361                 io_mem_free(ctx->rings);
6362                 ctx->rings = NULL;
6363                 return -EOVERFLOW;
6364         }
6365
6366         ctx->sq_sqes = io_mem_alloc(size);
6367         if (!ctx->sq_sqes) {
6368                 io_mem_free(ctx->rings);
6369                 ctx->rings = NULL;
6370                 return -ENOMEM;
6371         }
6372
6373         return 0;
6374 }
6375
6376 /*
6377  * Allocate an anonymous fd, this is what constitutes the application
6378  * visible backing of an io_uring instance. The application mmaps this
6379  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
6380  * we have to tie this fd to a socket for file garbage collection purposes.
6381  */
6382 static int io_uring_get_fd(struct io_ring_ctx *ctx)
6383 {
6384         struct file *file;
6385         int ret;
6386
6387 #if defined(CONFIG_UNIX)
6388         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
6389                                 &ctx->ring_sock);
6390         if (ret)
6391                 return ret;
6392 #endif
6393
6394         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
6395         if (ret < 0)
6396                 goto err;
6397
6398         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
6399                                         O_RDWR | O_CLOEXEC);
6400         if (IS_ERR(file)) {
6401                 put_unused_fd(ret);
6402                 ret = PTR_ERR(file);
6403                 goto err;
6404         }
6405
6406 #if defined(CONFIG_UNIX)
6407         ctx->ring_sock->file = file;
6408 #endif
6409         fd_install(ret, file);
6410         return ret;
6411 err:
6412 #if defined(CONFIG_UNIX)
6413         sock_release(ctx->ring_sock);
6414         ctx->ring_sock = NULL;
6415 #endif
6416         return ret;
6417 }
6418
6419 static int io_uring_create(unsigned entries, struct io_uring_params *p)
6420 {
6421         struct user_struct *user = NULL;
6422         struct io_ring_ctx *ctx;
6423         bool account_mem;
6424         int ret;
6425
6426         if (!entries)
6427                 return -EINVAL;
6428         if (entries > IORING_MAX_ENTRIES) {
6429                 if (!(p->flags & IORING_SETUP_CLAMP))
6430                         return -EINVAL;
6431                 entries = IORING_MAX_ENTRIES;
6432         }
6433
6434         /*
6435          * Use twice as many entries for the CQ ring. It's possible for the
6436          * application to drive a higher depth than the size of the SQ ring,
6437          * since the sqes are only used at submission time. This allows for
6438          * some flexibility in overcommitting a bit. If the application has
6439          * set IORING_SETUP_CQSIZE, it will have passed in the desired number
6440          * of CQ ring entries manually.
6441          */
6442         p->sq_entries = roundup_pow_of_two(entries);
6443         if (p->flags & IORING_SETUP_CQSIZE) {
6444                 /*
6445                  * If IORING_SETUP_CQSIZE is set, we do the same roundup
6446                  * to a power-of-two, if it isn't already. We do NOT impose
6447                  * any cq vs sq ring sizing.
6448                  */
6449                 if (p->cq_entries < p->sq_entries)
6450                         return -EINVAL;
6451                 if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
6452                         if (!(p->flags & IORING_SETUP_CLAMP))
6453                                 return -EINVAL;
6454                         p->cq_entries = IORING_MAX_CQ_ENTRIES;
6455                 }
6456                 p->cq_entries = roundup_pow_of_two(p->cq_entries);
6457         } else {
6458                 p->cq_entries = 2 * p->sq_entries;
6459         }
6460
6461         user = get_uid(current_user());
6462         account_mem = !capable(CAP_IPC_LOCK);
6463
6464         if (account_mem) {
6465                 ret = io_account_mem(user,
6466                                 ring_pages(p->sq_entries, p->cq_entries));
6467                 if (ret) {
6468                         free_uid(user);
6469                         return ret;
6470                 }
6471         }
6472
6473         ctx = io_ring_ctx_alloc(p);
6474         if (!ctx) {
6475                 if (account_mem)
6476                         io_unaccount_mem(user, ring_pages(p->sq_entries,
6477                                                                 p->cq_entries));
6478                 free_uid(user);
6479                 return -ENOMEM;
6480         }
6481         ctx->compat = in_compat_syscall();
6482         ctx->account_mem = account_mem;
6483         ctx->user = user;
6484         ctx->creds = get_current_cred();
6485
6486         ret = io_allocate_scq_urings(ctx, p);
6487         if (ret)
6488                 goto err;
6489
6490         ret = io_sq_offload_start(ctx, p);
6491         if (ret)
6492                 goto err;
6493
6494         memset(&p->sq_off, 0, sizeof(p->sq_off));
6495         p->sq_off.head = offsetof(struct io_rings, sq.head);
6496         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
6497         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
6498         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
6499         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
6500         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
6501         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
6502
6503         memset(&p->cq_off, 0, sizeof(p->cq_off));
6504         p->cq_off.head = offsetof(struct io_rings, cq.head);
6505         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
6506         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
6507         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
6508         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
6509         p->cq_off.cqes = offsetof(struct io_rings, cqes);
6510
6511         /*
6512          * Install ring fd as the very last thing, so we don't risk someone
6513          * having closed it before we finish setup
6514          */
6515         ret = io_uring_get_fd(ctx);
6516         if (ret < 0)
6517                 goto err;
6518
6519         p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
6520                         IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS;
6521         trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
6522         return ret;
6523 err:
6524         io_ring_ctx_wait_and_kill(ctx);
6525         return ret;
6526 }
6527
6528 /*
6529  * Sets up an aio uring context, and returns the fd. Applications asks for a
6530  * ring size, we return the actual sq/cq ring sizes (among other things) in the
6531  * params structure passed in.
6532  */
6533 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
6534 {
6535         struct io_uring_params p;
6536         long ret;
6537         int i;
6538
6539         if (copy_from_user(&p, params, sizeof(p)))
6540                 return -EFAULT;
6541         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
6542                 if (p.resv[i])
6543                         return -EINVAL;
6544         }
6545
6546         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
6547                         IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
6548                         IORING_SETUP_CLAMP))
6549                 return -EINVAL;
6550
6551         ret = io_uring_create(entries, &p);
6552         if (ret < 0)
6553                 return ret;
6554
6555         if (copy_to_user(params, &p, sizeof(p)))
6556                 return -EFAULT;
6557
6558         return ret;
6559 }
6560
6561 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
6562                 struct io_uring_params __user *, params)
6563 {
6564         return io_uring_setup(entries, params);
6565 }
6566
6567 static int io_probe(struct io_ring_ctx *ctx, void __user *arg, unsigned nr_args)
6568 {
6569         struct io_uring_probe *p;
6570         size_t size;
6571         int i, ret;
6572
6573         size = struct_size(p, ops, nr_args);
6574         if (size == SIZE_MAX)
6575                 return -EOVERFLOW;
6576         p = kzalloc(size, GFP_KERNEL);
6577         if (!p)
6578                 return -ENOMEM;
6579
6580         ret = -EFAULT;
6581         if (copy_from_user(p, arg, size))
6582                 goto out;
6583         ret = -EINVAL;
6584         if (memchr_inv(p, 0, size))
6585                 goto out;
6586
6587         p->last_op = IORING_OP_LAST - 1;
6588         if (nr_args > IORING_OP_LAST)
6589                 nr_args = IORING_OP_LAST;
6590
6591         for (i = 0; i < nr_args; i++) {
6592                 p->ops[i].op = i;
6593                 if (!io_op_defs[i].not_supported)
6594                         p->ops[i].flags = IO_URING_OP_SUPPORTED;
6595         }
6596         p->ops_len = i;
6597
6598         ret = 0;
6599         if (copy_to_user(arg, p, size))
6600                 ret = -EFAULT;
6601 out:
6602         kfree(p);
6603         return ret;
6604 }
6605
6606 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
6607                                void __user *arg, unsigned nr_args)
6608         __releases(ctx->uring_lock)
6609         __acquires(ctx->uring_lock)
6610 {
6611         int ret;
6612
6613         /*
6614          * We're inside the ring mutex, if the ref is already dying, then
6615          * someone else killed the ctx or is already going through
6616          * io_uring_register().
6617          */
6618         if (percpu_ref_is_dying(&ctx->refs))
6619                 return -ENXIO;
6620
6621         if (opcode != IORING_UNREGISTER_FILES &&
6622             opcode != IORING_REGISTER_FILES_UPDATE &&
6623             opcode != IORING_REGISTER_PROBE) {
6624                 percpu_ref_kill(&ctx->refs);
6625
6626                 /*
6627                  * Drop uring mutex before waiting for references to exit. If
6628                  * another thread is currently inside io_uring_enter() it might
6629                  * need to grab the uring_lock to make progress. If we hold it
6630                  * here across the drain wait, then we can deadlock. It's safe
6631                  * to drop the mutex here, since no new references will come in
6632                  * after we've killed the percpu ref.
6633                  */
6634                 mutex_unlock(&ctx->uring_lock);
6635                 ret = wait_for_completion_interruptible(&ctx->completions[0]);
6636                 mutex_lock(&ctx->uring_lock);
6637                 if (ret) {
6638                         percpu_ref_resurrect(&ctx->refs);
6639                         ret = -EINTR;
6640                         goto out;
6641                 }
6642         }
6643
6644         switch (opcode) {
6645         case IORING_REGISTER_BUFFERS:
6646                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
6647                 break;
6648         case IORING_UNREGISTER_BUFFERS:
6649                 ret = -EINVAL;
6650                 if (arg || nr_args)
6651                         break;
6652                 ret = io_sqe_buffer_unregister(ctx);
6653                 break;
6654         case IORING_REGISTER_FILES:
6655                 ret = io_sqe_files_register(ctx, arg, nr_args);
6656                 break;
6657         case IORING_UNREGISTER_FILES:
6658                 ret = -EINVAL;
6659                 if (arg || nr_args)
6660                         break;
6661                 ret = io_sqe_files_unregister(ctx);
6662                 break;
6663         case IORING_REGISTER_FILES_UPDATE:
6664                 ret = io_sqe_files_update(ctx, arg, nr_args);
6665                 break;
6666         case IORING_REGISTER_EVENTFD:
6667         case IORING_REGISTER_EVENTFD_ASYNC:
6668                 ret = -EINVAL;
6669                 if (nr_args != 1)
6670                         break;
6671                 ret = io_eventfd_register(ctx, arg);
6672                 if (ret)
6673                         break;
6674                 if (opcode == IORING_REGISTER_EVENTFD_ASYNC)
6675                         ctx->eventfd_async = 1;
6676                 else
6677                         ctx->eventfd_async = 0;
6678                 break;
6679         case IORING_UNREGISTER_EVENTFD:
6680                 ret = -EINVAL;
6681                 if (arg || nr_args)
6682                         break;
6683                 ret = io_eventfd_unregister(ctx);
6684                 break;
6685         case IORING_REGISTER_PROBE:
6686                 ret = -EINVAL;
6687                 if (!arg || nr_args > 256)
6688                         break;
6689                 ret = io_probe(ctx, arg, nr_args);
6690                 break;
6691         default:
6692                 ret = -EINVAL;
6693                 break;
6694         }
6695
6696
6697         if (opcode != IORING_UNREGISTER_FILES &&
6698             opcode != IORING_REGISTER_FILES_UPDATE &&
6699             opcode != IORING_REGISTER_PROBE) {
6700                 /* bring the ctx back to life */
6701                 percpu_ref_reinit(&ctx->refs);
6702 out:
6703                 reinit_completion(&ctx->completions[0]);
6704         }
6705         return ret;
6706 }
6707
6708 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
6709                 void __user *, arg, unsigned int, nr_args)
6710 {
6711         struct io_ring_ctx *ctx;
6712         long ret = -EBADF;
6713         struct fd f;
6714
6715         f = fdget(fd);
6716         if (!f.file)
6717                 return -EBADF;
6718
6719         ret = -EOPNOTSUPP;
6720         if (f.file->f_op != &io_uring_fops)
6721                 goto out_fput;
6722
6723         ctx = f.file->private_data;
6724
6725         mutex_lock(&ctx->uring_lock);
6726         ret = __io_uring_register(ctx, opcode, arg, nr_args);
6727         mutex_unlock(&ctx->uring_lock);
6728         trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
6729                                                         ctx->cq_ev_fd != NULL, ret);
6730 out_fput:
6731         fdput(f);
6732         return ret;
6733 }
6734
6735 static int __init io_uring_init(void)
6736 {
6737         BUILD_BUG_ON(ARRAY_SIZE(io_op_defs) != IORING_OP_LAST);
6738         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
6739         return 0;
6740 };
6741 __initcall(io_uring_init);