io_uring: remove extra check in __io_commit_cqring
[linux-2.6-microblaze.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/kthread.h>
60 #include <linux/blkdev.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
63 #include <net/sock.h>
64 #include <net/af_unix.h>
65 #include <net/scm.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72 #include <linux/highmem.h>
73 #include <linux/namei.h>
74 #include <linux/fsnotify.h>
75 #include <linux/fadvise.h>
76
77 #define CREATE_TRACE_POINTS
78 #include <trace/events/io_uring.h>
79
80 #include <uapi/linux/io_uring.h>
81
82 #include "internal.h"
83 #include "io-wq.h"
84
85 #define IORING_MAX_ENTRIES      32768
86 #define IORING_MAX_CQ_ENTRIES   (2 * IORING_MAX_ENTRIES)
87
88 /*
89  * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
90  */
91 #define IORING_FILE_TABLE_SHIFT 9
92 #define IORING_MAX_FILES_TABLE  (1U << IORING_FILE_TABLE_SHIFT)
93 #define IORING_FILE_TABLE_MASK  (IORING_MAX_FILES_TABLE - 1)
94 #define IORING_MAX_FIXED_FILES  (64 * IORING_MAX_FILES_TABLE)
95
96 struct io_uring {
97         u32 head ____cacheline_aligned_in_smp;
98         u32 tail ____cacheline_aligned_in_smp;
99 };
100
101 /*
102  * This data is shared with the application through the mmap at offsets
103  * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
104  *
105  * The offsets to the member fields are published through struct
106  * io_sqring_offsets when calling io_uring_setup.
107  */
108 struct io_rings {
109         /*
110          * Head and tail offsets into the ring; the offsets need to be
111          * masked to get valid indices.
112          *
113          * The kernel controls head of the sq ring and the tail of the cq ring,
114          * and the application controls tail of the sq ring and the head of the
115          * cq ring.
116          */
117         struct io_uring         sq, cq;
118         /*
119          * Bitmasks to apply to head and tail offsets (constant, equals
120          * ring_entries - 1)
121          */
122         u32                     sq_ring_mask, cq_ring_mask;
123         /* Ring sizes (constant, power of 2) */
124         u32                     sq_ring_entries, cq_ring_entries;
125         /*
126          * Number of invalid entries dropped by the kernel due to
127          * invalid index stored in array
128          *
129          * Written by the kernel, shouldn't be modified by the
130          * application (i.e. get number of "new events" by comparing to
131          * cached value).
132          *
133          * After a new SQ head value was read by the application this
134          * counter includes all submissions that were dropped reaching
135          * the new SQ head (and possibly more).
136          */
137         u32                     sq_dropped;
138         /*
139          * Runtime flags
140          *
141          * Written by the kernel, shouldn't be modified by the
142          * application.
143          *
144          * The application needs a full memory barrier before checking
145          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
146          */
147         u32                     sq_flags;
148         /*
149          * Number of completion events lost because the queue was full;
150          * this should be avoided by the application by making sure
151          * there are not more requests pending than there is space in
152          * the completion queue.
153          *
154          * Written by the kernel, shouldn't be modified by the
155          * application (i.e. get number of "new events" by comparing to
156          * cached value).
157          *
158          * As completion events come in out of order this counter is not
159          * ordered with any other data.
160          */
161         u32                     cq_overflow;
162         /*
163          * Ring buffer of completion events.
164          *
165          * The kernel writes completion events fresh every time they are
166          * produced, so the application is allowed to modify pending
167          * entries.
168          */
169         struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
170 };
171
172 struct io_mapped_ubuf {
173         u64             ubuf;
174         size_t          len;
175         struct          bio_vec *bvec;
176         unsigned int    nr_bvecs;
177 };
178
179 struct fixed_file_table {
180         struct file             **files;
181 };
182
183 enum {
184         FFD_F_ATOMIC,
185 };
186
187 struct fixed_file_data {
188         struct fixed_file_table         *table;
189         struct io_ring_ctx              *ctx;
190
191         struct percpu_ref               refs;
192         struct llist_head               put_llist;
193         unsigned long                   state;
194         struct work_struct              ref_work;
195         struct completion               done;
196 };
197
198 struct io_ring_ctx {
199         struct {
200                 struct percpu_ref       refs;
201         } ____cacheline_aligned_in_smp;
202
203         struct {
204                 unsigned int            flags;
205                 int                     compat: 1;
206                 int                     account_mem: 1;
207                 int                     cq_overflow_flushed: 1;
208                 int                     drain_next: 1;
209                 int                     eventfd_async: 1;
210
211                 /*
212                  * Ring buffer of indices into array of io_uring_sqe, which is
213                  * mmapped by the application using the IORING_OFF_SQES offset.
214                  *
215                  * This indirection could e.g. be used to assign fixed
216                  * io_uring_sqe entries to operations and only submit them to
217                  * the queue when needed.
218                  *
219                  * The kernel modifies neither the indices array nor the entries
220                  * array.
221                  */
222                 u32                     *sq_array;
223                 unsigned                cached_sq_head;
224                 unsigned                sq_entries;
225                 unsigned                sq_mask;
226                 unsigned                sq_thread_idle;
227                 unsigned                cached_sq_dropped;
228                 atomic_t                cached_cq_overflow;
229                 unsigned long           sq_check_overflow;
230
231                 struct list_head        defer_list;
232                 struct list_head        timeout_list;
233                 struct list_head        cq_overflow_list;
234
235                 wait_queue_head_t       inflight_wait;
236                 struct io_uring_sqe     *sq_sqes;
237         } ____cacheline_aligned_in_smp;
238
239         struct io_rings *rings;
240
241         /* IO offload */
242         struct io_wq            *io_wq;
243         struct task_struct      *sqo_thread;    /* if using sq thread polling */
244         struct mm_struct        *sqo_mm;
245         wait_queue_head_t       sqo_wait;
246
247         /*
248          * If used, fixed file set. Writers must ensure that ->refs is dead,
249          * readers must ensure that ->refs is alive as long as the file* is
250          * used. Only updated through io_uring_register(2).
251          */
252         struct fixed_file_data  *file_data;
253         unsigned                nr_user_files;
254
255         /* if used, fixed mapped user buffers */
256         unsigned                nr_user_bufs;
257         struct io_mapped_ubuf   *user_bufs;
258
259         struct user_struct      *user;
260
261         const struct cred       *creds;
262
263         /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
264         struct completion       *completions;
265
266         /* if all else fails... */
267         struct io_kiocb         *fallback_req;
268
269 #if defined(CONFIG_UNIX)
270         struct socket           *ring_sock;
271 #endif
272
273         struct {
274                 unsigned                cached_cq_tail;
275                 unsigned                cq_entries;
276                 unsigned                cq_mask;
277                 atomic_t                cq_timeouts;
278                 unsigned long           cq_check_overflow;
279                 struct wait_queue_head  cq_wait;
280                 struct fasync_struct    *cq_fasync;
281                 struct eventfd_ctx      *cq_ev_fd;
282         } ____cacheline_aligned_in_smp;
283
284         struct {
285                 struct mutex            uring_lock;
286                 wait_queue_head_t       wait;
287         } ____cacheline_aligned_in_smp;
288
289         struct {
290                 spinlock_t              completion_lock;
291                 struct llist_head       poll_llist;
292
293                 /*
294                  * ->poll_list is protected by the ctx->uring_lock for
295                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
296                  * For SQPOLL, only the single threaded io_sq_thread() will
297                  * manipulate the list, hence no extra locking is needed there.
298                  */
299                 struct list_head        poll_list;
300                 struct hlist_head       *cancel_hash;
301                 unsigned                cancel_hash_bits;
302                 bool                    poll_multi_file;
303
304                 spinlock_t              inflight_lock;
305                 struct list_head        inflight_list;
306         } ____cacheline_aligned_in_smp;
307 };
308
309 /*
310  * First field must be the file pointer in all the
311  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
312  */
313 struct io_poll_iocb {
314         struct file                     *file;
315         union {
316                 struct wait_queue_head  *head;
317                 u64                     addr;
318         };
319         __poll_t                        events;
320         bool                            done;
321         bool                            canceled;
322         struct wait_queue_entry         wait;
323 };
324
325 struct io_close {
326         struct file                     *file;
327         struct file                     *put_file;
328         int                             fd;
329 };
330
331 struct io_timeout_data {
332         struct io_kiocb                 *req;
333         struct hrtimer                  timer;
334         struct timespec64               ts;
335         enum hrtimer_mode               mode;
336         u32                             seq_offset;
337 };
338
339 struct io_accept {
340         struct file                     *file;
341         struct sockaddr __user          *addr;
342         int __user                      *addr_len;
343         int                             flags;
344 };
345
346 struct io_sync {
347         struct file                     *file;
348         loff_t                          len;
349         loff_t                          off;
350         int                             flags;
351         int                             mode;
352 };
353
354 struct io_cancel {
355         struct file                     *file;
356         u64                             addr;
357 };
358
359 struct io_timeout {
360         struct file                     *file;
361         u64                             addr;
362         int                             flags;
363         unsigned                        count;
364 };
365
366 struct io_rw {
367         /* NOTE: kiocb has the file as the first member, so don't do it here */
368         struct kiocb                    kiocb;
369         u64                             addr;
370         u64                             len;
371 };
372
373 struct io_connect {
374         struct file                     *file;
375         struct sockaddr __user          *addr;
376         int                             addr_len;
377 };
378
379 struct io_sr_msg {
380         struct file                     *file;
381         union {
382                 struct user_msghdr __user *msg;
383                 void __user             *buf;
384         };
385         int                             msg_flags;
386         size_t                          len;
387 };
388
389 struct io_open {
390         struct file                     *file;
391         int                             dfd;
392         union {
393                 unsigned                mask;
394         };
395         struct filename                 *filename;
396         struct statx __user             *buffer;
397         struct open_how                 how;
398 };
399
400 struct io_files_update {
401         struct file                     *file;
402         u64                             arg;
403         u32                             nr_args;
404         u32                             offset;
405 };
406
407 struct io_fadvise {
408         struct file                     *file;
409         u64                             offset;
410         u32                             len;
411         u32                             advice;
412 };
413
414 struct io_madvise {
415         struct file                     *file;
416         u64                             addr;
417         u32                             len;
418         u32                             advice;
419 };
420
421 struct io_async_connect {
422         struct sockaddr_storage         address;
423 };
424
425 struct io_async_msghdr {
426         struct iovec                    fast_iov[UIO_FASTIOV];
427         struct iovec                    *iov;
428         struct sockaddr __user          *uaddr;
429         struct msghdr                   msg;
430 };
431
432 struct io_async_rw {
433         struct iovec                    fast_iov[UIO_FASTIOV];
434         struct iovec                    *iov;
435         ssize_t                         nr_segs;
436         ssize_t                         size;
437 };
438
439 struct io_async_open {
440         struct filename                 *filename;
441 };
442
443 struct io_async_ctx {
444         union {
445                 struct io_async_rw      rw;
446                 struct io_async_msghdr  msg;
447                 struct io_async_connect connect;
448                 struct io_timeout_data  timeout;
449                 struct io_async_open    open;
450         };
451 };
452
453 /*
454  * NOTE! Each of the iocb union members has the file pointer
455  * as the first entry in their struct definition. So you can
456  * access the file pointer through any of the sub-structs,
457  * or directly as just 'ki_filp' in this struct.
458  */
459 struct io_kiocb {
460         union {
461                 struct file             *file;
462                 struct io_rw            rw;
463                 struct io_poll_iocb     poll;
464                 struct io_accept        accept;
465                 struct io_sync          sync;
466                 struct io_cancel        cancel;
467                 struct io_timeout       timeout;
468                 struct io_connect       connect;
469                 struct io_sr_msg        sr_msg;
470                 struct io_open          open;
471                 struct io_close         close;
472                 struct io_files_update  files_update;
473                 struct io_fadvise       fadvise;
474                 struct io_madvise       madvise;
475         };
476
477         struct io_async_ctx             *io;
478         union {
479                 /*
480                  * ring_file is only used in the submission path, and
481                  * llist_node is only used for poll deferred completions
482                  */
483                 struct file             *ring_file;
484                 struct llist_node       llist_node;
485         };
486         int                             ring_fd;
487         bool                            has_user;
488         bool                            in_async;
489         bool                            needs_fixed_file;
490         u8                              opcode;
491
492         struct io_ring_ctx      *ctx;
493         union {
494                 struct list_head        list;
495                 struct hlist_node       hash_node;
496         };
497         struct list_head        link_list;
498         unsigned int            flags;
499         refcount_t              refs;
500 #define REQ_F_NOWAIT            1       /* must not punt to workers */
501 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
502 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
503 #define REQ_F_LINK_NEXT         8       /* already grabbed next link */
504 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
505 #define REQ_F_IO_DRAINED        32      /* drain done */
506 #define REQ_F_LINK              64      /* linked sqes */
507 #define REQ_F_LINK_TIMEOUT      128     /* has linked timeout */
508 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
509 #define REQ_F_TIMEOUT           1024    /* timeout request */
510 #define REQ_F_ISREG             2048    /* regular file */
511 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
512 #define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
513 #define REQ_F_INFLIGHT          16384   /* on inflight list */
514 #define REQ_F_COMP_LOCKED       32768   /* completion under lock */
515 #define REQ_F_HARDLINK          65536   /* doesn't sever on completion < 0 */
516 #define REQ_F_FORCE_ASYNC       131072  /* IOSQE_ASYNC */
517 #define REQ_F_CUR_POS           262144  /* read/write uses file position */
518         u64                     user_data;
519         u32                     result;
520         u32                     sequence;
521
522         struct list_head        inflight_entry;
523
524         struct io_wq_work       work;
525 };
526
527 #define IO_PLUG_THRESHOLD               2
528 #define IO_IOPOLL_BATCH                 8
529
530 struct io_submit_state {
531         struct blk_plug         plug;
532
533         /*
534          * io_kiocb alloc cache
535          */
536         void                    *reqs[IO_IOPOLL_BATCH];
537         unsigned                int free_reqs;
538         unsigned                int cur_req;
539
540         /*
541          * File reference cache
542          */
543         struct file             *file;
544         unsigned int            fd;
545         unsigned int            has_refs;
546         unsigned int            used_refs;
547         unsigned int            ios_left;
548 };
549
550 struct io_op_def {
551         /* needs req->io allocated for deferral/async */
552         unsigned                async_ctx : 1;
553         /* needs current->mm setup, does mm access */
554         unsigned                needs_mm : 1;
555         /* needs req->file assigned */
556         unsigned                needs_file : 1;
557         /* needs req->file assigned IFF fd is >= 0 */
558         unsigned                fd_non_neg : 1;
559         /* hash wq insertion if file is a regular file */
560         unsigned                hash_reg_file : 1;
561         /* unbound wq insertion if file is a non-regular file */
562         unsigned                unbound_nonreg_file : 1;
563         /* opcode is not supported by this kernel */
564         unsigned                not_supported : 1;
565 };
566
567 static const struct io_op_def io_op_defs[] = {
568         {
569                 /* IORING_OP_NOP */
570         },
571         {
572                 /* IORING_OP_READV */
573                 .async_ctx              = 1,
574                 .needs_mm               = 1,
575                 .needs_file             = 1,
576                 .unbound_nonreg_file    = 1,
577         },
578         {
579                 /* IORING_OP_WRITEV */
580                 .async_ctx              = 1,
581                 .needs_mm               = 1,
582                 .needs_file             = 1,
583                 .hash_reg_file          = 1,
584                 .unbound_nonreg_file    = 1,
585         },
586         {
587                 /* IORING_OP_FSYNC */
588                 .needs_file             = 1,
589         },
590         {
591                 /* IORING_OP_READ_FIXED */
592                 .needs_file             = 1,
593                 .unbound_nonreg_file    = 1,
594         },
595         {
596                 /* IORING_OP_WRITE_FIXED */
597                 .needs_file             = 1,
598                 .hash_reg_file          = 1,
599                 .unbound_nonreg_file    = 1,
600         },
601         {
602                 /* IORING_OP_POLL_ADD */
603                 .needs_file             = 1,
604                 .unbound_nonreg_file    = 1,
605         },
606         {
607                 /* IORING_OP_POLL_REMOVE */
608         },
609         {
610                 /* IORING_OP_SYNC_FILE_RANGE */
611                 .needs_file             = 1,
612         },
613         {
614                 /* IORING_OP_SENDMSG */
615                 .async_ctx              = 1,
616                 .needs_mm               = 1,
617                 .needs_file             = 1,
618                 .unbound_nonreg_file    = 1,
619         },
620         {
621                 /* IORING_OP_RECVMSG */
622                 .async_ctx              = 1,
623                 .needs_mm               = 1,
624                 .needs_file             = 1,
625                 .unbound_nonreg_file    = 1,
626         },
627         {
628                 /* IORING_OP_TIMEOUT */
629                 .async_ctx              = 1,
630                 .needs_mm               = 1,
631         },
632         {
633                 /* IORING_OP_TIMEOUT_REMOVE */
634         },
635         {
636                 /* IORING_OP_ACCEPT */
637                 .needs_mm               = 1,
638                 .needs_file             = 1,
639                 .unbound_nonreg_file    = 1,
640         },
641         {
642                 /* IORING_OP_ASYNC_CANCEL */
643         },
644         {
645                 /* IORING_OP_LINK_TIMEOUT */
646                 .async_ctx              = 1,
647                 .needs_mm               = 1,
648         },
649         {
650                 /* IORING_OP_CONNECT */
651                 .async_ctx              = 1,
652                 .needs_mm               = 1,
653                 .needs_file             = 1,
654                 .unbound_nonreg_file    = 1,
655         },
656         {
657                 /* IORING_OP_FALLOCATE */
658                 .needs_file             = 1,
659         },
660         {
661                 /* IORING_OP_OPENAT */
662                 .needs_file             = 1,
663                 .fd_non_neg             = 1,
664         },
665         {
666                 /* IORING_OP_CLOSE */
667                 .needs_file             = 1,
668         },
669         {
670                 /* IORING_OP_FILES_UPDATE */
671                 .needs_mm               = 1,
672         },
673         {
674                 /* IORING_OP_STATX */
675                 .needs_mm               = 1,
676                 .needs_file             = 1,
677                 .fd_non_neg             = 1,
678         },
679         {
680                 /* IORING_OP_READ */
681                 .needs_mm               = 1,
682                 .needs_file             = 1,
683                 .unbound_nonreg_file    = 1,
684         },
685         {
686                 /* IORING_OP_WRITE */
687                 .needs_mm               = 1,
688                 .needs_file             = 1,
689                 .unbound_nonreg_file    = 1,
690         },
691         {
692                 /* IORING_OP_FADVISE */
693                 .needs_file             = 1,
694         },
695         {
696                 /* IORING_OP_MADVISE */
697                 .needs_mm               = 1,
698         },
699         {
700                 /* IORING_OP_SEND */
701                 .needs_mm               = 1,
702                 .needs_file             = 1,
703                 .unbound_nonreg_file    = 1,
704         },
705         {
706                 /* IORING_OP_RECV */
707                 .needs_mm               = 1,
708                 .needs_file             = 1,
709                 .unbound_nonreg_file    = 1,
710         },
711         {
712                 /* IORING_OP_OPENAT2 */
713                 .needs_file             = 1,
714                 .fd_non_neg             = 1,
715         },
716 };
717
718 static void io_wq_submit_work(struct io_wq_work **workptr);
719 static void io_cqring_fill_event(struct io_kiocb *req, long res);
720 static void io_put_req(struct io_kiocb *req);
721 static void __io_double_put_req(struct io_kiocb *req);
722 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
723 static void io_queue_linked_timeout(struct io_kiocb *req);
724 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
725                                  struct io_uring_files_update *ip,
726                                  unsigned nr_args);
727
728 static struct kmem_cache *req_cachep;
729
730 static const struct file_operations io_uring_fops;
731
732 struct sock *io_uring_get_socket(struct file *file)
733 {
734 #if defined(CONFIG_UNIX)
735         if (file->f_op == &io_uring_fops) {
736                 struct io_ring_ctx *ctx = file->private_data;
737
738                 return ctx->ring_sock->sk;
739         }
740 #endif
741         return NULL;
742 }
743 EXPORT_SYMBOL(io_uring_get_socket);
744
745 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
746 {
747         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
748
749         complete(&ctx->completions[0]);
750 }
751
752 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
753 {
754         struct io_ring_ctx *ctx;
755         int hash_bits;
756
757         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
758         if (!ctx)
759                 return NULL;
760
761         ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL);
762         if (!ctx->fallback_req)
763                 goto err;
764
765         ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL);
766         if (!ctx->completions)
767                 goto err;
768
769         /*
770          * Use 5 bits less than the max cq entries, that should give us around
771          * 32 entries per hash list if totally full and uniformly spread.
772          */
773         hash_bits = ilog2(p->cq_entries);
774         hash_bits -= 5;
775         if (hash_bits <= 0)
776                 hash_bits = 1;
777         ctx->cancel_hash_bits = hash_bits;
778         ctx->cancel_hash = kmalloc((1U << hash_bits) * sizeof(struct hlist_head),
779                                         GFP_KERNEL);
780         if (!ctx->cancel_hash)
781                 goto err;
782         __hash_init(ctx->cancel_hash, 1U << hash_bits);
783
784         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
785                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
786                 goto err;
787
788         ctx->flags = p->flags;
789         init_waitqueue_head(&ctx->cq_wait);
790         INIT_LIST_HEAD(&ctx->cq_overflow_list);
791         init_completion(&ctx->completions[0]);
792         init_completion(&ctx->completions[1]);
793         mutex_init(&ctx->uring_lock);
794         init_waitqueue_head(&ctx->wait);
795         spin_lock_init(&ctx->completion_lock);
796         init_llist_head(&ctx->poll_llist);
797         INIT_LIST_HEAD(&ctx->poll_list);
798         INIT_LIST_HEAD(&ctx->defer_list);
799         INIT_LIST_HEAD(&ctx->timeout_list);
800         init_waitqueue_head(&ctx->inflight_wait);
801         spin_lock_init(&ctx->inflight_lock);
802         INIT_LIST_HEAD(&ctx->inflight_list);
803         return ctx;
804 err:
805         if (ctx->fallback_req)
806                 kmem_cache_free(req_cachep, ctx->fallback_req);
807         kfree(ctx->completions);
808         kfree(ctx->cancel_hash);
809         kfree(ctx);
810         return NULL;
811 }
812
813 static inline bool __req_need_defer(struct io_kiocb *req)
814 {
815         struct io_ring_ctx *ctx = req->ctx;
816
817         return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
818                                         + atomic_read(&ctx->cached_cq_overflow);
819 }
820
821 static inline bool req_need_defer(struct io_kiocb *req)
822 {
823         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) == REQ_F_IO_DRAIN)
824                 return __req_need_defer(req);
825
826         return false;
827 }
828
829 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
830 {
831         struct io_kiocb *req;
832
833         req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
834         if (req && !req_need_defer(req)) {
835                 list_del_init(&req->list);
836                 return req;
837         }
838
839         return NULL;
840 }
841
842 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
843 {
844         struct io_kiocb *req;
845
846         req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
847         if (req) {
848                 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
849                         return NULL;
850                 if (!__req_need_defer(req)) {
851                         list_del_init(&req->list);
852                         return req;
853                 }
854         }
855
856         return NULL;
857 }
858
859 static void __io_commit_cqring(struct io_ring_ctx *ctx)
860 {
861         struct io_rings *rings = ctx->rings;
862
863         /* order cqe stores with ring update */
864         smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
865
866         if (wq_has_sleeper(&ctx->cq_wait)) {
867                 wake_up_interruptible(&ctx->cq_wait);
868                 kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
869         }
870 }
871
872 static inline bool io_prep_async_work(struct io_kiocb *req,
873                                       struct io_kiocb **link)
874 {
875         const struct io_op_def *def = &io_op_defs[req->opcode];
876         bool do_hashed = false;
877
878         if (req->flags & REQ_F_ISREG) {
879                 if (def->hash_reg_file)
880                         do_hashed = true;
881         } else {
882                 if (def->unbound_nonreg_file)
883                         req->work.flags |= IO_WQ_WORK_UNBOUND;
884         }
885         if (def->needs_mm)
886                 req->work.flags |= IO_WQ_WORK_NEEDS_USER;
887
888         *link = io_prep_linked_timeout(req);
889         return do_hashed;
890 }
891
892 static inline void io_queue_async_work(struct io_kiocb *req)
893 {
894         struct io_ring_ctx *ctx = req->ctx;
895         struct io_kiocb *link;
896         bool do_hashed;
897
898         do_hashed = io_prep_async_work(req, &link);
899
900         trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
901                                         req->flags);
902         if (!do_hashed) {
903                 io_wq_enqueue(ctx->io_wq, &req->work);
904         } else {
905                 io_wq_enqueue_hashed(ctx->io_wq, &req->work,
906                                         file_inode(req->file));
907         }
908
909         if (link)
910                 io_queue_linked_timeout(link);
911 }
912
913 static void io_kill_timeout(struct io_kiocb *req)
914 {
915         int ret;
916
917         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
918         if (ret != -1) {
919                 atomic_inc(&req->ctx->cq_timeouts);
920                 list_del_init(&req->list);
921                 io_cqring_fill_event(req, 0);
922                 io_put_req(req);
923         }
924 }
925
926 static void io_kill_timeouts(struct io_ring_ctx *ctx)
927 {
928         struct io_kiocb *req, *tmp;
929
930         spin_lock_irq(&ctx->completion_lock);
931         list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
932                 io_kill_timeout(req);
933         spin_unlock_irq(&ctx->completion_lock);
934 }
935
936 static void io_commit_cqring(struct io_ring_ctx *ctx)
937 {
938         struct io_kiocb *req;
939
940         while ((req = io_get_timeout_req(ctx)) != NULL)
941                 io_kill_timeout(req);
942
943         __io_commit_cqring(ctx);
944
945         while ((req = io_get_deferred_req(ctx)) != NULL) {
946                 req->flags |= REQ_F_IO_DRAINED;
947                 io_queue_async_work(req);
948         }
949 }
950
951 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
952 {
953         struct io_rings *rings = ctx->rings;
954         unsigned tail;
955
956         tail = ctx->cached_cq_tail;
957         /*
958          * writes to the cq entry need to come after reading head; the
959          * control dependency is enough as we're using WRITE_ONCE to
960          * fill the cq entry
961          */
962         if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
963                 return NULL;
964
965         ctx->cached_cq_tail++;
966         return &rings->cqes[tail & ctx->cq_mask];
967 }
968
969 static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx)
970 {
971         if (!ctx->eventfd_async)
972                 return true;
973         return io_wq_current_is_worker() || in_interrupt();
974 }
975
976 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
977 {
978         if (waitqueue_active(&ctx->wait))
979                 wake_up(&ctx->wait);
980         if (waitqueue_active(&ctx->sqo_wait))
981                 wake_up(&ctx->sqo_wait);
982         if (ctx->cq_ev_fd && io_should_trigger_evfd(ctx))
983                 eventfd_signal(ctx->cq_ev_fd, 1);
984 }
985
986 /* Returns true if there are no backlogged entries after the flush */
987 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
988 {
989         struct io_rings *rings = ctx->rings;
990         struct io_uring_cqe *cqe;
991         struct io_kiocb *req;
992         unsigned long flags;
993         LIST_HEAD(list);
994
995         if (!force) {
996                 if (list_empty_careful(&ctx->cq_overflow_list))
997                         return true;
998                 if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
999                     rings->cq_ring_entries))
1000                         return false;
1001         }
1002
1003         spin_lock_irqsave(&ctx->completion_lock, flags);
1004
1005         /* if force is set, the ring is going away. always drop after that */
1006         if (force)
1007                 ctx->cq_overflow_flushed = 1;
1008
1009         cqe = NULL;
1010         while (!list_empty(&ctx->cq_overflow_list)) {
1011                 cqe = io_get_cqring(ctx);
1012                 if (!cqe && !force)
1013                         break;
1014
1015                 req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
1016                                                 list);
1017                 list_move(&req->list, &list);
1018                 if (cqe) {
1019                         WRITE_ONCE(cqe->user_data, req->user_data);
1020                         WRITE_ONCE(cqe->res, req->result);
1021                         WRITE_ONCE(cqe->flags, 0);
1022                 } else {
1023                         WRITE_ONCE(ctx->rings->cq_overflow,
1024                                 atomic_inc_return(&ctx->cached_cq_overflow));
1025                 }
1026         }
1027
1028         io_commit_cqring(ctx);
1029         if (cqe) {
1030                 clear_bit(0, &ctx->sq_check_overflow);
1031                 clear_bit(0, &ctx->cq_check_overflow);
1032         }
1033         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1034         io_cqring_ev_posted(ctx);
1035
1036         while (!list_empty(&list)) {
1037                 req = list_first_entry(&list, struct io_kiocb, list);
1038                 list_del(&req->list);
1039                 io_put_req(req);
1040         }
1041
1042         return cqe != NULL;
1043 }
1044
1045 static void io_cqring_fill_event(struct io_kiocb *req, long res)
1046 {
1047         struct io_ring_ctx *ctx = req->ctx;
1048         struct io_uring_cqe *cqe;
1049
1050         trace_io_uring_complete(ctx, req->user_data, res);
1051
1052         /*
1053          * If we can't get a cq entry, userspace overflowed the
1054          * submission (by quite a lot). Increment the overflow count in
1055          * the ring.
1056          */
1057         cqe = io_get_cqring(ctx);
1058         if (likely(cqe)) {
1059                 WRITE_ONCE(cqe->user_data, req->user_data);
1060                 WRITE_ONCE(cqe->res, res);
1061                 WRITE_ONCE(cqe->flags, 0);
1062         } else if (ctx->cq_overflow_flushed) {
1063                 WRITE_ONCE(ctx->rings->cq_overflow,
1064                                 atomic_inc_return(&ctx->cached_cq_overflow));
1065         } else {
1066                 if (list_empty(&ctx->cq_overflow_list)) {
1067                         set_bit(0, &ctx->sq_check_overflow);
1068                         set_bit(0, &ctx->cq_check_overflow);
1069                 }
1070                 refcount_inc(&req->refs);
1071                 req->result = res;
1072                 list_add_tail(&req->list, &ctx->cq_overflow_list);
1073         }
1074 }
1075
1076 static void io_cqring_add_event(struct io_kiocb *req, long res)
1077 {
1078         struct io_ring_ctx *ctx = req->ctx;
1079         unsigned long flags;
1080
1081         spin_lock_irqsave(&ctx->completion_lock, flags);
1082         io_cqring_fill_event(req, res);
1083         io_commit_cqring(ctx);
1084         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1085
1086         io_cqring_ev_posted(ctx);
1087 }
1088
1089 static inline bool io_is_fallback_req(struct io_kiocb *req)
1090 {
1091         return req == (struct io_kiocb *)
1092                         ((unsigned long) req->ctx->fallback_req & ~1UL);
1093 }
1094
1095 static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx)
1096 {
1097         struct io_kiocb *req;
1098
1099         req = ctx->fallback_req;
1100         if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req))
1101                 return req;
1102
1103         return NULL;
1104 }
1105
1106 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
1107                                    struct io_submit_state *state)
1108 {
1109         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
1110         struct io_kiocb *req;
1111
1112         if (!state) {
1113                 req = kmem_cache_alloc(req_cachep, gfp);
1114                 if (unlikely(!req))
1115                         goto fallback;
1116         } else if (!state->free_reqs) {
1117                 size_t sz;
1118                 int ret;
1119
1120                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
1121                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
1122
1123                 /*
1124                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
1125                  * retry single alloc to be on the safe side.
1126                  */
1127                 if (unlikely(ret <= 0)) {
1128                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
1129                         if (!state->reqs[0])
1130                                 goto fallback;
1131                         ret = 1;
1132                 }
1133                 state->free_reqs = ret - 1;
1134                 state->cur_req = 1;
1135                 req = state->reqs[0];
1136         } else {
1137                 req = state->reqs[state->cur_req];
1138                 state->free_reqs--;
1139                 state->cur_req++;
1140         }
1141
1142 got_it:
1143         req->io = NULL;
1144         req->ring_file = NULL;
1145         req->file = NULL;
1146         req->ctx = ctx;
1147         req->flags = 0;
1148         /* one is dropped after submission, the other at completion */
1149         refcount_set(&req->refs, 2);
1150         req->result = 0;
1151         INIT_IO_WORK(&req->work, io_wq_submit_work);
1152         return req;
1153 fallback:
1154         req = io_get_fallback_req(ctx);
1155         if (req)
1156                 goto got_it;
1157         percpu_ref_put(&ctx->refs);
1158         return NULL;
1159 }
1160
1161 static void __io_req_do_free(struct io_kiocb *req)
1162 {
1163         if (likely(!io_is_fallback_req(req)))
1164                 kmem_cache_free(req_cachep, req);
1165         else
1166                 clear_bit_unlock(0, (unsigned long *) req->ctx->fallback_req);
1167 }
1168
1169 static void __io_req_aux_free(struct io_kiocb *req)
1170 {
1171         struct io_ring_ctx *ctx = req->ctx;
1172
1173         kfree(req->io);
1174         if (req->file) {
1175                 if (req->flags & REQ_F_FIXED_FILE)
1176                         percpu_ref_put(&ctx->file_data->refs);
1177                 else
1178                         fput(req->file);
1179         }
1180 }
1181
1182 static void __io_free_req(struct io_kiocb *req)
1183 {
1184         __io_req_aux_free(req);
1185
1186         if (req->flags & REQ_F_INFLIGHT) {
1187                 struct io_ring_ctx *ctx = req->ctx;
1188                 unsigned long flags;
1189
1190                 spin_lock_irqsave(&ctx->inflight_lock, flags);
1191                 list_del(&req->inflight_entry);
1192                 if (waitqueue_active(&ctx->inflight_wait))
1193                         wake_up(&ctx->inflight_wait);
1194                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
1195         }
1196
1197         percpu_ref_put(&req->ctx->refs);
1198         __io_req_do_free(req);
1199 }
1200
1201 struct req_batch {
1202         void *reqs[IO_IOPOLL_BATCH];
1203         int to_free;
1204         int need_iter;
1205 };
1206
1207 static void io_free_req_many(struct io_ring_ctx *ctx, struct req_batch *rb)
1208 {
1209         int fixed_refs = rb->to_free;
1210
1211         if (!rb->to_free)
1212                 return;
1213         if (rb->need_iter) {
1214                 int i, inflight = 0;
1215                 unsigned long flags;
1216
1217                 fixed_refs = 0;
1218                 for (i = 0; i < rb->to_free; i++) {
1219                         struct io_kiocb *req = rb->reqs[i];
1220
1221                         if (req->flags & REQ_F_FIXED_FILE) {
1222                                 req->file = NULL;
1223                                 fixed_refs++;
1224                         }
1225                         if (req->flags & REQ_F_INFLIGHT)
1226                                 inflight++;
1227                         __io_req_aux_free(req);
1228                 }
1229                 if (!inflight)
1230                         goto do_free;
1231
1232                 spin_lock_irqsave(&ctx->inflight_lock, flags);
1233                 for (i = 0; i < rb->to_free; i++) {
1234                         struct io_kiocb *req = rb->reqs[i];
1235
1236                         if (req->flags & REQ_F_INFLIGHT) {
1237                                 list_del(&req->inflight_entry);
1238                                 if (!--inflight)
1239                                         break;
1240                         }
1241                 }
1242                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
1243
1244                 if (waitqueue_active(&ctx->inflight_wait))
1245                         wake_up(&ctx->inflight_wait);
1246         }
1247 do_free:
1248         kmem_cache_free_bulk(req_cachep, rb->to_free, rb->reqs);
1249         if (fixed_refs)
1250                 percpu_ref_put_many(&ctx->file_data->refs, fixed_refs);
1251         percpu_ref_put_many(&ctx->refs, rb->to_free);
1252         rb->to_free = rb->need_iter = 0;
1253 }
1254
1255 static bool io_link_cancel_timeout(struct io_kiocb *req)
1256 {
1257         struct io_ring_ctx *ctx = req->ctx;
1258         int ret;
1259
1260         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
1261         if (ret != -1) {
1262                 io_cqring_fill_event(req, -ECANCELED);
1263                 io_commit_cqring(ctx);
1264                 req->flags &= ~REQ_F_LINK;
1265                 io_put_req(req);
1266                 return true;
1267         }
1268
1269         return false;
1270 }
1271
1272 static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1273 {
1274         struct io_ring_ctx *ctx = req->ctx;
1275         bool wake_ev = false;
1276
1277         /* Already got next link */
1278         if (req->flags & REQ_F_LINK_NEXT)
1279                 return;
1280
1281         /*
1282          * The list should never be empty when we are called here. But could
1283          * potentially happen if the chain is messed up, check to be on the
1284          * safe side.
1285          */
1286         while (!list_empty(&req->link_list)) {
1287                 struct io_kiocb *nxt = list_first_entry(&req->link_list,
1288                                                 struct io_kiocb, link_list);
1289
1290                 if (unlikely((req->flags & REQ_F_LINK_TIMEOUT) &&
1291                              (nxt->flags & REQ_F_TIMEOUT))) {
1292                         list_del_init(&nxt->link_list);
1293                         wake_ev |= io_link_cancel_timeout(nxt);
1294                         req->flags &= ~REQ_F_LINK_TIMEOUT;
1295                         continue;
1296                 }
1297
1298                 list_del_init(&req->link_list);
1299                 if (!list_empty(&nxt->link_list))
1300                         nxt->flags |= REQ_F_LINK;
1301                 *nxtptr = nxt;
1302                 break;
1303         }
1304
1305         req->flags |= REQ_F_LINK_NEXT;
1306         if (wake_ev)
1307                 io_cqring_ev_posted(ctx);
1308 }
1309
1310 /*
1311  * Called if REQ_F_LINK is set, and we fail the head request
1312  */
1313 static void io_fail_links(struct io_kiocb *req)
1314 {
1315         struct io_ring_ctx *ctx = req->ctx;
1316         unsigned long flags;
1317
1318         spin_lock_irqsave(&ctx->completion_lock, flags);
1319
1320         while (!list_empty(&req->link_list)) {
1321                 struct io_kiocb *link = list_first_entry(&req->link_list,
1322                                                 struct io_kiocb, link_list);
1323
1324                 list_del_init(&link->link_list);
1325                 trace_io_uring_fail_link(req, link);
1326
1327                 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
1328                     link->opcode == IORING_OP_LINK_TIMEOUT) {
1329                         io_link_cancel_timeout(link);
1330                 } else {
1331                         io_cqring_fill_event(link, -ECANCELED);
1332                         __io_double_put_req(link);
1333                 }
1334                 req->flags &= ~REQ_F_LINK_TIMEOUT;
1335         }
1336
1337         io_commit_cqring(ctx);
1338         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1339         io_cqring_ev_posted(ctx);
1340 }
1341
1342 static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
1343 {
1344         if (likely(!(req->flags & REQ_F_LINK)))
1345                 return;
1346
1347         /*
1348          * If LINK is set, we have dependent requests in this chain. If we
1349          * didn't fail this request, queue the first one up, moving any other
1350          * dependencies to the next request. In case of failure, fail the rest
1351          * of the chain.
1352          */
1353         if (req->flags & REQ_F_FAIL_LINK) {
1354                 io_fail_links(req);
1355         } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
1356                         REQ_F_LINK_TIMEOUT) {
1357                 struct io_ring_ctx *ctx = req->ctx;
1358                 unsigned long flags;
1359
1360                 /*
1361                  * If this is a timeout link, we could be racing with the
1362                  * timeout timer. Grab the completion lock for this case to
1363                  * protect against that.
1364                  */
1365                 spin_lock_irqsave(&ctx->completion_lock, flags);
1366                 io_req_link_next(req, nxt);
1367                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1368         } else {
1369                 io_req_link_next(req, nxt);
1370         }
1371 }
1372
1373 static void io_free_req(struct io_kiocb *req)
1374 {
1375         struct io_kiocb *nxt = NULL;
1376
1377         io_req_find_next(req, &nxt);
1378         __io_free_req(req);
1379
1380         if (nxt)
1381                 io_queue_async_work(nxt);
1382 }
1383
1384 /*
1385  * Drop reference to request, return next in chain (if there is one) if this
1386  * was the last reference to this request.
1387  */
1388 __attribute__((nonnull))
1389 static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1390 {
1391         io_req_find_next(req, nxtptr);
1392
1393         if (refcount_dec_and_test(&req->refs))
1394                 __io_free_req(req);
1395 }
1396
1397 static void io_put_req(struct io_kiocb *req)
1398 {
1399         if (refcount_dec_and_test(&req->refs))
1400                 io_free_req(req);
1401 }
1402
1403 /*
1404  * Must only be used if we don't need to care about links, usually from
1405  * within the completion handling itself.
1406  */
1407 static void __io_double_put_req(struct io_kiocb *req)
1408 {
1409         /* drop both submit and complete references */
1410         if (refcount_sub_and_test(2, &req->refs))
1411                 __io_free_req(req);
1412 }
1413
1414 static void io_double_put_req(struct io_kiocb *req)
1415 {
1416         /* drop both submit and complete references */
1417         if (refcount_sub_and_test(2, &req->refs))
1418                 io_free_req(req);
1419 }
1420
1421 static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
1422 {
1423         struct io_rings *rings = ctx->rings;
1424
1425         if (test_bit(0, &ctx->cq_check_overflow)) {
1426                 /*
1427                  * noflush == true is from the waitqueue handler, just ensure
1428                  * we wake up the task, and the next invocation will flush the
1429                  * entries. We cannot safely to it from here.
1430                  */
1431                 if (noflush && !list_empty(&ctx->cq_overflow_list))
1432                         return -1U;
1433
1434                 io_cqring_overflow_flush(ctx, false);
1435         }
1436
1437         /* See comment at the top of this file */
1438         smp_rmb();
1439         return ctx->cached_cq_tail - READ_ONCE(rings->cq.head);
1440 }
1441
1442 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1443 {
1444         struct io_rings *rings = ctx->rings;
1445
1446         /* make sure SQ entry isn't read before tail */
1447         return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
1448 }
1449
1450 static inline bool io_req_multi_free(struct req_batch *rb, struct io_kiocb *req)
1451 {
1452         if ((req->flags & REQ_F_LINK) || io_is_fallback_req(req))
1453                 return false;
1454
1455         if (!(req->flags & REQ_F_FIXED_FILE) || req->io)
1456                 rb->need_iter++;
1457
1458         rb->reqs[rb->to_free++] = req;
1459         if (unlikely(rb->to_free == ARRAY_SIZE(rb->reqs)))
1460                 io_free_req_many(req->ctx, rb);
1461         return true;
1462 }
1463
1464 /*
1465  * Find and free completed poll iocbs
1466  */
1467 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
1468                                struct list_head *done)
1469 {
1470         struct req_batch rb;
1471         struct io_kiocb *req;
1472
1473         rb.to_free = rb.need_iter = 0;
1474         while (!list_empty(done)) {
1475                 req = list_first_entry(done, struct io_kiocb, list);
1476                 list_del(&req->list);
1477
1478                 io_cqring_fill_event(req, req->result);
1479                 (*nr_events)++;
1480
1481                 if (refcount_dec_and_test(&req->refs) &&
1482                     !io_req_multi_free(&rb, req))
1483                         io_free_req(req);
1484         }
1485
1486         io_commit_cqring(ctx);
1487         io_free_req_many(ctx, &rb);
1488 }
1489
1490 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
1491                         long min)
1492 {
1493         struct io_kiocb *req, *tmp;
1494         LIST_HEAD(done);
1495         bool spin;
1496         int ret;
1497
1498         /*
1499          * Only spin for completions if we don't have multiple devices hanging
1500          * off our complete list, and we're under the requested amount.
1501          */
1502         spin = !ctx->poll_multi_file && *nr_events < min;
1503
1504         ret = 0;
1505         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
1506                 struct kiocb *kiocb = &req->rw.kiocb;
1507
1508                 /*
1509                  * Move completed entries to our local list. If we find a
1510                  * request that requires polling, break out and complete
1511                  * the done list first, if we have entries there.
1512                  */
1513                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
1514                         list_move_tail(&req->list, &done);
1515                         continue;
1516                 }
1517                 if (!list_empty(&done))
1518                         break;
1519
1520                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
1521                 if (ret < 0)
1522                         break;
1523
1524                 if (ret && spin)
1525                         spin = false;
1526                 ret = 0;
1527         }
1528
1529         if (!list_empty(&done))
1530                 io_iopoll_complete(ctx, nr_events, &done);
1531
1532         return ret;
1533 }
1534
1535 /*
1536  * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
1537  * non-spinning poll check - we'll still enter the driver poll loop, but only
1538  * as a non-spinning completion check.
1539  */
1540 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
1541                                 long min)
1542 {
1543         while (!list_empty(&ctx->poll_list) && !need_resched()) {
1544                 int ret;
1545
1546                 ret = io_do_iopoll(ctx, nr_events, min);
1547                 if (ret < 0)
1548                         return ret;
1549                 if (!min || *nr_events >= min)
1550                         return 0;
1551         }
1552
1553         return 1;
1554 }
1555
1556 /*
1557  * We can't just wait for polled events to come to us, we have to actively
1558  * find and complete them.
1559  */
1560 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
1561 {
1562         if (!(ctx->flags & IORING_SETUP_IOPOLL))
1563                 return;
1564
1565         mutex_lock(&ctx->uring_lock);
1566         while (!list_empty(&ctx->poll_list)) {
1567                 unsigned int nr_events = 0;
1568
1569                 io_iopoll_getevents(ctx, &nr_events, 1);
1570
1571                 /*
1572                  * Ensure we allow local-to-the-cpu processing to take place,
1573                  * in this case we need to ensure that we reap all events.
1574                  */
1575                 cond_resched();
1576         }
1577         mutex_unlock(&ctx->uring_lock);
1578 }
1579
1580 static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1581                             long min)
1582 {
1583         int iters = 0, ret = 0;
1584
1585         do {
1586                 int tmin = 0;
1587
1588                 /*
1589                  * Don't enter poll loop if we already have events pending.
1590                  * If we do, we can potentially be spinning for commands that
1591                  * already triggered a CQE (eg in error).
1592                  */
1593                 if (io_cqring_events(ctx, false))
1594                         break;
1595
1596                 /*
1597                  * If a submit got punted to a workqueue, we can have the
1598                  * application entering polling for a command before it gets
1599                  * issued. That app will hold the uring_lock for the duration
1600                  * of the poll right here, so we need to take a breather every
1601                  * now and then to ensure that the issue has a chance to add
1602                  * the poll to the issued list. Otherwise we can spin here
1603                  * forever, while the workqueue is stuck trying to acquire the
1604                  * very same mutex.
1605                  */
1606                 if (!(++iters & 7)) {
1607                         mutex_unlock(&ctx->uring_lock);
1608                         mutex_lock(&ctx->uring_lock);
1609                 }
1610
1611                 if (*nr_events < min)
1612                         tmin = min - *nr_events;
1613
1614                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
1615                 if (ret <= 0)
1616                         break;
1617                 ret = 0;
1618         } while (min && !*nr_events && !need_resched());
1619
1620         return ret;
1621 }
1622
1623 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1624                            long min)
1625 {
1626         int ret;
1627
1628         /*
1629          * We disallow the app entering submit/complete with polling, but we
1630          * still need to lock the ring to prevent racing with polled issue
1631          * that got punted to a workqueue.
1632          */
1633         mutex_lock(&ctx->uring_lock);
1634         ret = __io_iopoll_check(ctx, nr_events, min);
1635         mutex_unlock(&ctx->uring_lock);
1636         return ret;
1637 }
1638
1639 static void kiocb_end_write(struct io_kiocb *req)
1640 {
1641         /*
1642          * Tell lockdep we inherited freeze protection from submission
1643          * thread.
1644          */
1645         if (req->flags & REQ_F_ISREG) {
1646                 struct inode *inode = file_inode(req->file);
1647
1648                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
1649         }
1650         file_end_write(req->file);
1651 }
1652
1653 static inline void req_set_fail_links(struct io_kiocb *req)
1654 {
1655         if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
1656                 req->flags |= REQ_F_FAIL_LINK;
1657 }
1658
1659 static void io_complete_rw_common(struct kiocb *kiocb, long res)
1660 {
1661         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1662
1663         if (kiocb->ki_flags & IOCB_WRITE)
1664                 kiocb_end_write(req);
1665
1666         if (res != req->result)
1667                 req_set_fail_links(req);
1668         io_cqring_add_event(req, res);
1669 }
1670
1671 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
1672 {
1673         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1674
1675         io_complete_rw_common(kiocb, res);
1676         io_put_req(req);
1677 }
1678
1679 static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
1680 {
1681         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1682         struct io_kiocb *nxt = NULL;
1683
1684         io_complete_rw_common(kiocb, res);
1685         io_put_req_find_next(req, &nxt);
1686
1687         return nxt;
1688 }
1689
1690 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1691 {
1692         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1693
1694         if (kiocb->ki_flags & IOCB_WRITE)
1695                 kiocb_end_write(req);
1696
1697         if (res != req->result)
1698                 req_set_fail_links(req);
1699         req->result = res;
1700         if (res != -EAGAIN)
1701                 req->flags |= REQ_F_IOPOLL_COMPLETED;
1702 }
1703
1704 /*
1705  * After the iocb has been issued, it's safe to be found on the poll list.
1706  * Adding the kiocb to the list AFTER submission ensures that we don't
1707  * find it from a io_iopoll_getevents() thread before the issuer is done
1708  * accessing the kiocb cookie.
1709  */
1710 static void io_iopoll_req_issued(struct io_kiocb *req)
1711 {
1712         struct io_ring_ctx *ctx = req->ctx;
1713
1714         /*
1715          * Track whether we have multiple files in our lists. This will impact
1716          * how we do polling eventually, not spinning if we're on potentially
1717          * different devices.
1718          */
1719         if (list_empty(&ctx->poll_list)) {
1720                 ctx->poll_multi_file = false;
1721         } else if (!ctx->poll_multi_file) {
1722                 struct io_kiocb *list_req;
1723
1724                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1725                                                 list);
1726                 if (list_req->file != req->file)
1727                         ctx->poll_multi_file = true;
1728         }
1729
1730         /*
1731          * For fast devices, IO may have already completed. If it has, add
1732          * it to the front so we find it first.
1733          */
1734         if (req->flags & REQ_F_IOPOLL_COMPLETED)
1735                 list_add(&req->list, &ctx->poll_list);
1736         else
1737                 list_add_tail(&req->list, &ctx->poll_list);
1738 }
1739
1740 static void io_file_put(struct io_submit_state *state)
1741 {
1742         if (state->file) {
1743                 int diff = state->has_refs - state->used_refs;
1744
1745                 if (diff)
1746                         fput_many(state->file, diff);
1747                 state->file = NULL;
1748         }
1749 }
1750
1751 /*
1752  * Get as many references to a file as we have IOs left in this submission,
1753  * assuming most submissions are for one file, or at least that each file
1754  * has more than one submission.
1755  */
1756 static struct file *io_file_get(struct io_submit_state *state, int fd)
1757 {
1758         if (!state)
1759                 return fget(fd);
1760
1761         if (state->file) {
1762                 if (state->fd == fd) {
1763                         state->used_refs++;
1764                         state->ios_left--;
1765                         return state->file;
1766                 }
1767                 io_file_put(state);
1768         }
1769         state->file = fget_many(fd, state->ios_left);
1770         if (!state->file)
1771                 return NULL;
1772
1773         state->fd = fd;
1774         state->has_refs = state->ios_left;
1775         state->used_refs = 1;
1776         state->ios_left--;
1777         return state->file;
1778 }
1779
1780 /*
1781  * If we tracked the file through the SCM inflight mechanism, we could support
1782  * any file. For now, just ensure that anything potentially problematic is done
1783  * inline.
1784  */
1785 static bool io_file_supports_async(struct file *file)
1786 {
1787         umode_t mode = file_inode(file)->i_mode;
1788
1789         if (S_ISBLK(mode) || S_ISCHR(mode) || S_ISSOCK(mode))
1790                 return true;
1791         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1792                 return true;
1793
1794         return false;
1795 }
1796
1797 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1798                       bool force_nonblock)
1799 {
1800         struct io_ring_ctx *ctx = req->ctx;
1801         struct kiocb *kiocb = &req->rw.kiocb;
1802         unsigned ioprio;
1803         int ret;
1804
1805         if (!req->file)
1806                 return -EBADF;
1807
1808         if (S_ISREG(file_inode(req->file)->i_mode))
1809                 req->flags |= REQ_F_ISREG;
1810
1811         kiocb->ki_pos = READ_ONCE(sqe->off);
1812         if (kiocb->ki_pos == -1 && !(req->file->f_mode & FMODE_STREAM)) {
1813                 req->flags |= REQ_F_CUR_POS;
1814                 kiocb->ki_pos = req->file->f_pos;
1815         }
1816         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1817         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1818
1819         ioprio = READ_ONCE(sqe->ioprio);
1820         if (ioprio) {
1821                 ret = ioprio_check_cap(ioprio);
1822                 if (ret)
1823                         return ret;
1824
1825                 kiocb->ki_ioprio = ioprio;
1826         } else
1827                 kiocb->ki_ioprio = get_current_ioprio();
1828
1829         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1830         if (unlikely(ret))
1831                 return ret;
1832
1833         /* don't allow async punt if RWF_NOWAIT was requested */
1834         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1835             (req->file->f_flags & O_NONBLOCK))
1836                 req->flags |= REQ_F_NOWAIT;
1837
1838         if (force_nonblock)
1839                 kiocb->ki_flags |= IOCB_NOWAIT;
1840
1841         if (ctx->flags & IORING_SETUP_IOPOLL) {
1842                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1843                     !kiocb->ki_filp->f_op->iopoll)
1844                         return -EOPNOTSUPP;
1845
1846                 kiocb->ki_flags |= IOCB_HIPRI;
1847                 kiocb->ki_complete = io_complete_rw_iopoll;
1848                 req->result = 0;
1849         } else {
1850                 if (kiocb->ki_flags & IOCB_HIPRI)
1851                         return -EINVAL;
1852                 kiocb->ki_complete = io_complete_rw;
1853         }
1854
1855         req->rw.addr = READ_ONCE(sqe->addr);
1856         req->rw.len = READ_ONCE(sqe->len);
1857         /* we own ->private, reuse it for the buffer index */
1858         req->rw.kiocb.private = (void *) (unsigned long)
1859                                         READ_ONCE(sqe->buf_index);
1860         return 0;
1861 }
1862
1863 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1864 {
1865         switch (ret) {
1866         case -EIOCBQUEUED:
1867                 break;
1868         case -ERESTARTSYS:
1869         case -ERESTARTNOINTR:
1870         case -ERESTARTNOHAND:
1871         case -ERESTART_RESTARTBLOCK:
1872                 /*
1873                  * We can't just restart the syscall, since previously
1874                  * submitted sqes may already be in progress. Just fail this
1875                  * IO with EINTR.
1876                  */
1877                 ret = -EINTR;
1878                 /* fall through */
1879         default:
1880                 kiocb->ki_complete(kiocb, ret, 0);
1881         }
1882 }
1883
1884 static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
1885                        bool in_async)
1886 {
1887         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1888
1889         if (req->flags & REQ_F_CUR_POS)
1890                 req->file->f_pos = kiocb->ki_pos;
1891         if (in_async && ret >= 0 && kiocb->ki_complete == io_complete_rw)
1892                 *nxt = __io_complete_rw(kiocb, ret);
1893         else
1894                 io_rw_done(kiocb, ret);
1895 }
1896
1897 static ssize_t io_import_fixed(struct io_kiocb *req, int rw,
1898                                struct iov_iter *iter)
1899 {
1900         struct io_ring_ctx *ctx = req->ctx;
1901         size_t len = req->rw.len;
1902         struct io_mapped_ubuf *imu;
1903         unsigned index, buf_index;
1904         size_t offset;
1905         u64 buf_addr;
1906
1907         /* attempt to use fixed buffers without having provided iovecs */
1908         if (unlikely(!ctx->user_bufs))
1909                 return -EFAULT;
1910
1911         buf_index = (unsigned long) req->rw.kiocb.private;
1912         if (unlikely(buf_index >= ctx->nr_user_bufs))
1913                 return -EFAULT;
1914
1915         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1916         imu = &ctx->user_bufs[index];
1917         buf_addr = req->rw.addr;
1918
1919         /* overflow */
1920         if (buf_addr + len < buf_addr)
1921                 return -EFAULT;
1922         /* not inside the mapped region */
1923         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1924                 return -EFAULT;
1925
1926         /*
1927          * May not be a start of buffer, set size appropriately
1928          * and advance us to the beginning.
1929          */
1930         offset = buf_addr - imu->ubuf;
1931         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1932
1933         if (offset) {
1934                 /*
1935                  * Don't use iov_iter_advance() here, as it's really slow for
1936                  * using the latter parts of a big fixed buffer - it iterates
1937                  * over each segment manually. We can cheat a bit here, because
1938                  * we know that:
1939                  *
1940                  * 1) it's a BVEC iter, we set it up
1941                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1942                  *    first and last bvec
1943                  *
1944                  * So just find our index, and adjust the iterator afterwards.
1945                  * If the offset is within the first bvec (or the whole first
1946                  * bvec, just use iov_iter_advance(). This makes it easier
1947                  * since we can just skip the first segment, which may not
1948                  * be PAGE_SIZE aligned.
1949                  */
1950                 const struct bio_vec *bvec = imu->bvec;
1951
1952                 if (offset <= bvec->bv_len) {
1953                         iov_iter_advance(iter, offset);
1954                 } else {
1955                         unsigned long seg_skip;
1956
1957                         /* skip first vec */
1958                         offset -= bvec->bv_len;
1959                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1960
1961                         iter->bvec = bvec + seg_skip;
1962                         iter->nr_segs -= seg_skip;
1963                         iter->count -= bvec->bv_len + offset;
1964                         iter->iov_offset = offset & ~PAGE_MASK;
1965                 }
1966         }
1967
1968         return len;
1969 }
1970
1971 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
1972                                struct iovec **iovec, struct iov_iter *iter)
1973 {
1974         void __user *buf = u64_to_user_ptr(req->rw.addr);
1975         size_t sqe_len = req->rw.len;
1976         u8 opcode;
1977
1978         opcode = req->opcode;
1979         if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
1980                 *iovec = NULL;
1981                 return io_import_fixed(req, rw, iter);
1982         }
1983
1984         /* buffer index only valid with fixed read/write */
1985         if (req->rw.kiocb.private)
1986                 return -EINVAL;
1987
1988         if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
1989                 ssize_t ret;
1990                 ret = import_single_range(rw, buf, sqe_len, *iovec, iter);
1991                 *iovec = NULL;
1992                 return ret;
1993         }
1994
1995         if (req->io) {
1996                 struct io_async_rw *iorw = &req->io->rw;
1997
1998                 *iovec = iorw->iov;
1999                 iov_iter_init(iter, rw, *iovec, iorw->nr_segs, iorw->size);
2000                 if (iorw->iov == iorw->fast_iov)
2001                         *iovec = NULL;
2002                 return iorw->size;
2003         }
2004
2005         if (!req->has_user)
2006                 return -EFAULT;
2007
2008 #ifdef CONFIG_COMPAT
2009         if (req->ctx->compat)
2010                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
2011                                                 iovec, iter);
2012 #endif
2013
2014         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
2015 }
2016
2017 /*
2018  * For files that don't have ->read_iter() and ->write_iter(), handle them
2019  * by looping over ->read() or ->write() manually.
2020  */
2021 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
2022                            struct iov_iter *iter)
2023 {
2024         ssize_t ret = 0;
2025
2026         /*
2027          * Don't support polled IO through this interface, and we can't
2028          * support non-blocking either. For the latter, this just causes
2029          * the kiocb to be handled from an async context.
2030          */
2031         if (kiocb->ki_flags & IOCB_HIPRI)
2032                 return -EOPNOTSUPP;
2033         if (kiocb->ki_flags & IOCB_NOWAIT)
2034                 return -EAGAIN;
2035
2036         while (iov_iter_count(iter)) {
2037                 struct iovec iovec;
2038                 ssize_t nr;
2039
2040                 if (!iov_iter_is_bvec(iter)) {
2041                         iovec = iov_iter_iovec(iter);
2042                 } else {
2043                         /* fixed buffers import bvec */
2044                         iovec.iov_base = kmap(iter->bvec->bv_page)
2045                                                 + iter->iov_offset;
2046                         iovec.iov_len = min(iter->count,
2047                                         iter->bvec->bv_len - iter->iov_offset);
2048                 }
2049
2050                 if (rw == READ) {
2051                         nr = file->f_op->read(file, iovec.iov_base,
2052                                               iovec.iov_len, &kiocb->ki_pos);
2053                 } else {
2054                         nr = file->f_op->write(file, iovec.iov_base,
2055                                                iovec.iov_len, &kiocb->ki_pos);
2056                 }
2057
2058                 if (iov_iter_is_bvec(iter))
2059                         kunmap(iter->bvec->bv_page);
2060
2061                 if (nr < 0) {
2062                         if (!ret)
2063                                 ret = nr;
2064                         break;
2065                 }
2066                 ret += nr;
2067                 if (nr != iovec.iov_len)
2068                         break;
2069                 iov_iter_advance(iter, nr);
2070         }
2071
2072         return ret;
2073 }
2074
2075 static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size,
2076                           struct iovec *iovec, struct iovec *fast_iov,
2077                           struct iov_iter *iter)
2078 {
2079         req->io->rw.nr_segs = iter->nr_segs;
2080         req->io->rw.size = io_size;
2081         req->io->rw.iov = iovec;
2082         if (!req->io->rw.iov) {
2083                 req->io->rw.iov = req->io->rw.fast_iov;
2084                 memcpy(req->io->rw.iov, fast_iov,
2085                         sizeof(struct iovec) * iter->nr_segs);
2086         }
2087 }
2088
2089 static int io_alloc_async_ctx(struct io_kiocb *req)
2090 {
2091         if (!io_op_defs[req->opcode].async_ctx)
2092                 return 0;
2093         req->io = kmalloc(sizeof(*req->io), GFP_KERNEL);
2094         return req->io == NULL;
2095 }
2096
2097 static void io_rw_async(struct io_wq_work **workptr)
2098 {
2099         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2100         struct iovec *iov = NULL;
2101
2102         if (req->io->rw.iov != req->io->rw.fast_iov)
2103                 iov = req->io->rw.iov;
2104         io_wq_submit_work(workptr);
2105         kfree(iov);
2106 }
2107
2108 static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
2109                              struct iovec *iovec, struct iovec *fast_iov,
2110                              struct iov_iter *iter)
2111 {
2112         if (req->opcode == IORING_OP_READ_FIXED ||
2113             req->opcode == IORING_OP_WRITE_FIXED)
2114                 return 0;
2115         if (!req->io && io_alloc_async_ctx(req))
2116                 return -ENOMEM;
2117
2118         io_req_map_rw(req, io_size, iovec, fast_iov, iter);
2119         req->work.func = io_rw_async;
2120         return 0;
2121 }
2122
2123 static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2124                         bool force_nonblock)
2125 {
2126         struct io_async_ctx *io;
2127         struct iov_iter iter;
2128         ssize_t ret;
2129
2130         ret = io_prep_rw(req, sqe, force_nonblock);
2131         if (ret)
2132                 return ret;
2133
2134         if (unlikely(!(req->file->f_mode & FMODE_READ)))
2135                 return -EBADF;
2136
2137         if (!req->io)
2138                 return 0;
2139
2140         io = req->io;
2141         io->rw.iov = io->rw.fast_iov;
2142         req->io = NULL;
2143         ret = io_import_iovec(READ, req, &io->rw.iov, &iter);
2144         req->io = io;
2145         if (ret < 0)
2146                 return ret;
2147
2148         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
2149         return 0;
2150 }
2151
2152 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
2153                    bool force_nonblock)
2154 {
2155         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2156         struct kiocb *kiocb = &req->rw.kiocb;
2157         struct iov_iter iter;
2158         size_t iov_count;
2159         ssize_t io_size, ret;
2160
2161         ret = io_import_iovec(READ, req, &iovec, &iter);
2162         if (ret < 0)
2163                 return ret;
2164
2165         /* Ensure we clear previously set non-block flag */
2166         if (!force_nonblock)
2167                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
2168
2169         req->result = 0;
2170         io_size = ret;
2171         if (req->flags & REQ_F_LINK)
2172                 req->result = io_size;
2173
2174         /*
2175          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
2176          * we know to async punt it even if it was opened O_NONBLOCK
2177          */
2178         if (force_nonblock && !io_file_supports_async(req->file)) {
2179                 req->flags |= REQ_F_MUST_PUNT;
2180                 goto copy_iov;
2181         }
2182
2183         iov_count = iov_iter_count(&iter);
2184         ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count);
2185         if (!ret) {
2186                 ssize_t ret2;
2187
2188                 if (req->file->f_op->read_iter)
2189                         ret2 = call_read_iter(req->file, kiocb, &iter);
2190                 else
2191                         ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
2192
2193                 /* Catch -EAGAIN return for forced non-blocking submission */
2194                 if (!force_nonblock || ret2 != -EAGAIN) {
2195                         kiocb_done(kiocb, ret2, nxt, req->in_async);
2196                 } else {
2197 copy_iov:
2198                         ret = io_setup_async_rw(req, io_size, iovec,
2199                                                 inline_vecs, &iter);
2200                         if (ret)
2201                                 goto out_free;
2202                         return -EAGAIN;
2203                 }
2204         }
2205 out_free:
2206         if (!io_wq_current_is_worker())
2207                 kfree(iovec);
2208         return ret;
2209 }
2210
2211 static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2212                          bool force_nonblock)
2213 {
2214         struct io_async_ctx *io;
2215         struct iov_iter iter;
2216         ssize_t ret;
2217
2218         ret = io_prep_rw(req, sqe, force_nonblock);
2219         if (ret)
2220                 return ret;
2221
2222         if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
2223                 return -EBADF;
2224
2225         if (!req->io)
2226                 return 0;
2227
2228         io = req->io;
2229         io->rw.iov = io->rw.fast_iov;
2230         req->io = NULL;
2231         ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter);
2232         req->io = io;
2233         if (ret < 0)
2234                 return ret;
2235
2236         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
2237         return 0;
2238 }
2239
2240 static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
2241                     bool force_nonblock)
2242 {
2243         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2244         struct kiocb *kiocb = &req->rw.kiocb;
2245         struct iov_iter iter;
2246         size_t iov_count;
2247         ssize_t ret, io_size;
2248
2249         ret = io_import_iovec(WRITE, req, &iovec, &iter);
2250         if (ret < 0)
2251                 return ret;
2252
2253         /* Ensure we clear previously set non-block flag */
2254         if (!force_nonblock)
2255                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
2256
2257         req->result = 0;
2258         io_size = ret;
2259         if (req->flags & REQ_F_LINK)
2260                 req->result = io_size;
2261
2262         /*
2263          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
2264          * we know to async punt it even if it was opened O_NONBLOCK
2265          */
2266         if (force_nonblock && !io_file_supports_async(req->file)) {
2267                 req->flags |= REQ_F_MUST_PUNT;
2268                 goto copy_iov;
2269         }
2270
2271         /* file path doesn't support NOWAIT for non-direct_IO */
2272         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
2273             (req->flags & REQ_F_ISREG))
2274                 goto copy_iov;
2275
2276         iov_count = iov_iter_count(&iter);
2277         ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count);
2278         if (!ret) {
2279                 ssize_t ret2;
2280
2281                 /*
2282                  * Open-code file_start_write here to grab freeze protection,
2283                  * which will be released by another thread in
2284                  * io_complete_rw().  Fool lockdep by telling it the lock got
2285                  * released so that it doesn't complain about the held lock when
2286                  * we return to userspace.
2287                  */
2288                 if (req->flags & REQ_F_ISREG) {
2289                         __sb_start_write(file_inode(req->file)->i_sb,
2290                                                 SB_FREEZE_WRITE, true);
2291                         __sb_writers_release(file_inode(req->file)->i_sb,
2292                                                 SB_FREEZE_WRITE);
2293                 }
2294                 kiocb->ki_flags |= IOCB_WRITE;
2295
2296                 if (req->file->f_op->write_iter)
2297                         ret2 = call_write_iter(req->file, kiocb, &iter);
2298                 else
2299                         ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter);
2300                 if (!force_nonblock || ret2 != -EAGAIN) {
2301                         kiocb_done(kiocb, ret2, nxt, req->in_async);
2302                 } else {
2303 copy_iov:
2304                         ret = io_setup_async_rw(req, io_size, iovec,
2305                                                 inline_vecs, &iter);
2306                         if (ret)
2307                                 goto out_free;
2308                         return -EAGAIN;
2309                 }
2310         }
2311 out_free:
2312         if (!io_wq_current_is_worker())
2313                 kfree(iovec);
2314         return ret;
2315 }
2316
2317 /*
2318  * IORING_OP_NOP just posts a completion event, nothing else.
2319  */
2320 static int io_nop(struct io_kiocb *req)
2321 {
2322         struct io_ring_ctx *ctx = req->ctx;
2323
2324         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2325                 return -EINVAL;
2326
2327         io_cqring_add_event(req, 0);
2328         io_put_req(req);
2329         return 0;
2330 }
2331
2332 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2333 {
2334         struct io_ring_ctx *ctx = req->ctx;
2335
2336         if (!req->file)
2337                 return -EBADF;
2338
2339         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2340                 return -EINVAL;
2341         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2342                 return -EINVAL;
2343
2344         req->sync.flags = READ_ONCE(sqe->fsync_flags);
2345         if (unlikely(req->sync.flags & ~IORING_FSYNC_DATASYNC))
2346                 return -EINVAL;
2347
2348         req->sync.off = READ_ONCE(sqe->off);
2349         req->sync.len = READ_ONCE(sqe->len);
2350         return 0;
2351 }
2352
2353 static bool io_req_cancelled(struct io_kiocb *req)
2354 {
2355         if (req->work.flags & IO_WQ_WORK_CANCEL) {
2356                 req_set_fail_links(req);
2357                 io_cqring_add_event(req, -ECANCELED);
2358                 io_put_req(req);
2359                 return true;
2360         }
2361
2362         return false;
2363 }
2364
2365 static void io_link_work_cb(struct io_wq_work **workptr)
2366 {
2367         struct io_wq_work *work = *workptr;
2368         struct io_kiocb *link = work->data;
2369
2370         io_queue_linked_timeout(link);
2371         work->func = io_wq_submit_work;
2372 }
2373
2374 static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt)
2375 {
2376         struct io_kiocb *link;
2377
2378         io_prep_async_work(nxt, &link);
2379         *workptr = &nxt->work;
2380         if (link) {
2381                 nxt->work.flags |= IO_WQ_WORK_CB;
2382                 nxt->work.func = io_link_work_cb;
2383                 nxt->work.data = link;
2384         }
2385 }
2386
2387 static void io_fsync_finish(struct io_wq_work **workptr)
2388 {
2389         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2390         loff_t end = req->sync.off + req->sync.len;
2391         struct io_kiocb *nxt = NULL;
2392         int ret;
2393
2394         if (io_req_cancelled(req))
2395                 return;
2396
2397         ret = vfs_fsync_range(req->file, req->sync.off,
2398                                 end > 0 ? end : LLONG_MAX,
2399                                 req->sync.flags & IORING_FSYNC_DATASYNC);
2400         if (ret < 0)
2401                 req_set_fail_links(req);
2402         io_cqring_add_event(req, ret);
2403         io_put_req_find_next(req, &nxt);
2404         if (nxt)
2405                 io_wq_assign_next(workptr, nxt);
2406 }
2407
2408 static int io_fsync(struct io_kiocb *req, struct io_kiocb **nxt,
2409                     bool force_nonblock)
2410 {
2411         struct io_wq_work *work, *old_work;
2412
2413         /* fsync always requires a blocking context */
2414         if (force_nonblock) {
2415                 io_put_req(req);
2416                 req->work.func = io_fsync_finish;
2417                 return -EAGAIN;
2418         }
2419
2420         work = old_work = &req->work;
2421         io_fsync_finish(&work);
2422         if (work && work != old_work)
2423                 *nxt = container_of(work, struct io_kiocb, work);
2424         return 0;
2425 }
2426
2427 static void io_fallocate_finish(struct io_wq_work **workptr)
2428 {
2429         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2430         struct io_kiocb *nxt = NULL;
2431         int ret;
2432
2433         ret = vfs_fallocate(req->file, req->sync.mode, req->sync.off,
2434                                 req->sync.len);
2435         if (ret < 0)
2436                 req_set_fail_links(req);
2437         io_cqring_add_event(req, ret);
2438         io_put_req_find_next(req, &nxt);
2439         if (nxt)
2440                 io_wq_assign_next(workptr, nxt);
2441 }
2442
2443 static int io_fallocate_prep(struct io_kiocb *req,
2444                              const struct io_uring_sqe *sqe)
2445 {
2446         if (sqe->ioprio || sqe->buf_index || sqe->rw_flags)
2447                 return -EINVAL;
2448
2449         req->sync.off = READ_ONCE(sqe->off);
2450         req->sync.len = READ_ONCE(sqe->addr);
2451         req->sync.mode = READ_ONCE(sqe->len);
2452         return 0;
2453 }
2454
2455 static int io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt,
2456                         bool force_nonblock)
2457 {
2458         struct io_wq_work *work, *old_work;
2459
2460         /* fallocate always requiring blocking context */
2461         if (force_nonblock) {
2462                 io_put_req(req);
2463                 req->work.func = io_fallocate_finish;
2464                 return -EAGAIN;
2465         }
2466
2467         work = old_work = &req->work;
2468         io_fallocate_finish(&work);
2469         if (work && work != old_work)
2470                 *nxt = container_of(work, struct io_kiocb, work);
2471
2472         return 0;
2473 }
2474
2475 static int io_openat_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2476 {
2477         const char __user *fname;
2478         int ret;
2479
2480         if (sqe->ioprio || sqe->buf_index)
2481                 return -EINVAL;
2482
2483         req->open.dfd = READ_ONCE(sqe->fd);
2484         req->open.how.mode = READ_ONCE(sqe->len);
2485         fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2486         req->open.how.flags = READ_ONCE(sqe->open_flags);
2487
2488         req->open.filename = getname(fname);
2489         if (IS_ERR(req->open.filename)) {
2490                 ret = PTR_ERR(req->open.filename);
2491                 req->open.filename = NULL;
2492                 return ret;
2493         }
2494
2495         return 0;
2496 }
2497
2498 static int io_openat2_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2499 {
2500         struct open_how __user *how;
2501         const char __user *fname;
2502         size_t len;
2503         int ret;
2504
2505         if (sqe->ioprio || sqe->buf_index)
2506                 return -EINVAL;
2507
2508         req->open.dfd = READ_ONCE(sqe->fd);
2509         fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2510         how = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2511         len = READ_ONCE(sqe->len);
2512
2513         if (len < OPEN_HOW_SIZE_VER0)
2514                 return -EINVAL;
2515
2516         ret = copy_struct_from_user(&req->open.how, sizeof(req->open.how), how,
2517                                         len);
2518         if (ret)
2519                 return ret;
2520
2521         if (!(req->open.how.flags & O_PATH) && force_o_largefile())
2522                 req->open.how.flags |= O_LARGEFILE;
2523
2524         req->open.filename = getname(fname);
2525         if (IS_ERR(req->open.filename)) {
2526                 ret = PTR_ERR(req->open.filename);
2527                 req->open.filename = NULL;
2528                 return ret;
2529         }
2530
2531         return 0;
2532 }
2533
2534 static int io_openat2(struct io_kiocb *req, struct io_kiocb **nxt,
2535                       bool force_nonblock)
2536 {
2537         struct open_flags op;
2538         struct file *file;
2539         int ret;
2540
2541         if (force_nonblock) {
2542                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2543                 return -EAGAIN;
2544         }
2545
2546         ret = build_open_flags(&req->open.how, &op);
2547         if (ret)
2548                 goto err;
2549
2550         ret = get_unused_fd_flags(req->open.how.flags);
2551         if (ret < 0)
2552                 goto err;
2553
2554         file = do_filp_open(req->open.dfd, req->open.filename, &op);
2555         if (IS_ERR(file)) {
2556                 put_unused_fd(ret);
2557                 ret = PTR_ERR(file);
2558         } else {
2559                 fsnotify_open(file);
2560                 fd_install(ret, file);
2561         }
2562 err:
2563         putname(req->open.filename);
2564         if (ret < 0)
2565                 req_set_fail_links(req);
2566         io_cqring_add_event(req, ret);
2567         io_put_req_find_next(req, nxt);
2568         return 0;
2569 }
2570
2571 static int io_openat(struct io_kiocb *req, struct io_kiocb **nxt,
2572                      bool force_nonblock)
2573 {
2574         req->open.how = build_open_how(req->open.how.flags, req->open.how.mode);
2575         return io_openat2(req, nxt, force_nonblock);
2576 }
2577
2578 static int io_madvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2579 {
2580 #if defined(CONFIG_ADVISE_SYSCALLS) && defined(CONFIG_MMU)
2581         if (sqe->ioprio || sqe->buf_index || sqe->off)
2582                 return -EINVAL;
2583
2584         req->madvise.addr = READ_ONCE(sqe->addr);
2585         req->madvise.len = READ_ONCE(sqe->len);
2586         req->madvise.advice = READ_ONCE(sqe->fadvise_advice);
2587         return 0;
2588 #else
2589         return -EOPNOTSUPP;
2590 #endif
2591 }
2592
2593 static int io_madvise(struct io_kiocb *req, struct io_kiocb **nxt,
2594                       bool force_nonblock)
2595 {
2596 #if defined(CONFIG_ADVISE_SYSCALLS) && defined(CONFIG_MMU)
2597         struct io_madvise *ma = &req->madvise;
2598         int ret;
2599
2600         if (force_nonblock)
2601                 return -EAGAIN;
2602
2603         ret = do_madvise(ma->addr, ma->len, ma->advice);
2604         if (ret < 0)
2605                 req_set_fail_links(req);
2606         io_cqring_add_event(req, ret);
2607         io_put_req_find_next(req, nxt);
2608         return 0;
2609 #else
2610         return -EOPNOTSUPP;
2611 #endif
2612 }
2613
2614 static int io_fadvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2615 {
2616         if (sqe->ioprio || sqe->buf_index || sqe->addr)
2617                 return -EINVAL;
2618
2619         req->fadvise.offset = READ_ONCE(sqe->off);
2620         req->fadvise.len = READ_ONCE(sqe->len);
2621         req->fadvise.advice = READ_ONCE(sqe->fadvise_advice);
2622         return 0;
2623 }
2624
2625 static int io_fadvise(struct io_kiocb *req, struct io_kiocb **nxt,
2626                       bool force_nonblock)
2627 {
2628         struct io_fadvise *fa = &req->fadvise;
2629         int ret;
2630
2631         /* DONTNEED may block, others _should_ not */
2632         if (fa->advice == POSIX_FADV_DONTNEED && force_nonblock)
2633                 return -EAGAIN;
2634
2635         ret = vfs_fadvise(req->file, fa->offset, fa->len, fa->advice);
2636         if (ret < 0)
2637                 req_set_fail_links(req);
2638         io_cqring_add_event(req, ret);
2639         io_put_req_find_next(req, nxt);
2640         return 0;
2641 }
2642
2643 static int io_statx_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2644 {
2645         const char __user *fname;
2646         unsigned lookup_flags;
2647         int ret;
2648
2649         if (sqe->ioprio || sqe->buf_index)
2650                 return -EINVAL;
2651
2652         req->open.dfd = READ_ONCE(sqe->fd);
2653         req->open.mask = READ_ONCE(sqe->len);
2654         fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2655         req->open.buffer = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2656         req->open.how.flags = READ_ONCE(sqe->statx_flags);
2657
2658         if (vfs_stat_set_lookup_flags(&lookup_flags, req->open.how.flags))
2659                 return -EINVAL;
2660
2661         req->open.filename = getname_flags(fname, lookup_flags, NULL);
2662         if (IS_ERR(req->open.filename)) {
2663                 ret = PTR_ERR(req->open.filename);
2664                 req->open.filename = NULL;
2665                 return ret;
2666         }
2667
2668         return 0;
2669 }
2670
2671 static int io_statx(struct io_kiocb *req, struct io_kiocb **nxt,
2672                     bool force_nonblock)
2673 {
2674         struct io_open *ctx = &req->open;
2675         unsigned lookup_flags;
2676         struct path path;
2677         struct kstat stat;
2678         int ret;
2679
2680         if (force_nonblock)
2681                 return -EAGAIN;
2682
2683         if (vfs_stat_set_lookup_flags(&lookup_flags, ctx->how.flags))
2684                 return -EINVAL;
2685
2686 retry:
2687         /* filename_lookup() drops it, keep a reference */
2688         ctx->filename->refcnt++;
2689
2690         ret = filename_lookup(ctx->dfd, ctx->filename, lookup_flags, &path,
2691                                 NULL);
2692         if (ret)
2693                 goto err;
2694
2695         ret = vfs_getattr(&path, &stat, ctx->mask, ctx->how.flags);
2696         path_put(&path);
2697         if (retry_estale(ret, lookup_flags)) {
2698                 lookup_flags |= LOOKUP_REVAL;
2699                 goto retry;
2700         }
2701         if (!ret)
2702                 ret = cp_statx(&stat, ctx->buffer);
2703 err:
2704         putname(ctx->filename);
2705         if (ret < 0)
2706                 req_set_fail_links(req);
2707         io_cqring_add_event(req, ret);
2708         io_put_req_find_next(req, nxt);
2709         return 0;
2710 }
2711
2712 static int io_close_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2713 {
2714         /*
2715          * If we queue this for async, it must not be cancellable. That would
2716          * leave the 'file' in an undeterminate state.
2717          */
2718         req->work.flags |= IO_WQ_WORK_NO_CANCEL;
2719
2720         if (sqe->ioprio || sqe->off || sqe->addr || sqe->len ||
2721             sqe->rw_flags || sqe->buf_index)
2722                 return -EINVAL;
2723         if (sqe->flags & IOSQE_FIXED_FILE)
2724                 return -EINVAL;
2725
2726         req->close.fd = READ_ONCE(sqe->fd);
2727         if (req->file->f_op == &io_uring_fops ||
2728             req->close.fd == req->ring_fd)
2729                 return -EBADF;
2730
2731         return 0;
2732 }
2733
2734 static void io_close_finish(struct io_wq_work **workptr)
2735 {
2736         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2737         struct io_kiocb *nxt = NULL;
2738
2739         /* Invoked with files, we need to do the close */
2740         if (req->work.files) {
2741                 int ret;
2742
2743                 ret = filp_close(req->close.put_file, req->work.files);
2744                 if (ret < 0) {
2745                         req_set_fail_links(req);
2746                 }
2747                 io_cqring_add_event(req, ret);
2748         }
2749
2750         fput(req->close.put_file);
2751
2752         /* we bypassed the re-issue, drop the submission reference */
2753         io_put_req(req);
2754         io_put_req_find_next(req, &nxt);
2755         if (nxt)
2756                 io_wq_assign_next(workptr, nxt);
2757 }
2758
2759 static int io_close(struct io_kiocb *req, struct io_kiocb **nxt,
2760                     bool force_nonblock)
2761 {
2762         int ret;
2763
2764         req->close.put_file = NULL;
2765         ret = __close_fd_get_file(req->close.fd, &req->close.put_file);
2766         if (ret < 0)
2767                 return ret;
2768
2769         /* if the file has a flush method, be safe and punt to async */
2770         if (req->close.put_file->f_op->flush && !io_wq_current_is_worker()) {
2771                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2772                 goto eagain;
2773         }
2774
2775         /*
2776          * No ->flush(), safely close from here and just punt the
2777          * fput() to async context.
2778          */
2779         ret = filp_close(req->close.put_file, current->files);
2780
2781         if (ret < 0)
2782                 req_set_fail_links(req);
2783         io_cqring_add_event(req, ret);
2784
2785         if (io_wq_current_is_worker()) {
2786                 struct io_wq_work *old_work, *work;
2787
2788                 old_work = work = &req->work;
2789                 io_close_finish(&work);
2790                 if (work && work != old_work)
2791                         *nxt = container_of(work, struct io_kiocb, work);
2792                 return 0;
2793         }
2794
2795 eagain:
2796         req->work.func = io_close_finish;
2797         return -EAGAIN;
2798 }
2799
2800 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2801 {
2802         struct io_ring_ctx *ctx = req->ctx;
2803
2804         if (!req->file)
2805                 return -EBADF;
2806
2807         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2808                 return -EINVAL;
2809         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2810                 return -EINVAL;
2811
2812         req->sync.off = READ_ONCE(sqe->off);
2813         req->sync.len = READ_ONCE(sqe->len);
2814         req->sync.flags = READ_ONCE(sqe->sync_range_flags);
2815         return 0;
2816 }
2817
2818 static void io_sync_file_range_finish(struct io_wq_work **workptr)
2819 {
2820         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2821         struct io_kiocb *nxt = NULL;
2822         int ret;
2823
2824         if (io_req_cancelled(req))
2825                 return;
2826
2827         ret = sync_file_range(req->file, req->sync.off, req->sync.len,
2828                                 req->sync.flags);
2829         if (ret < 0)
2830                 req_set_fail_links(req);
2831         io_cqring_add_event(req, ret);
2832         io_put_req_find_next(req, &nxt);
2833         if (nxt)
2834                 io_wq_assign_next(workptr, nxt);
2835 }
2836
2837 static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt,
2838                               bool force_nonblock)
2839 {
2840         struct io_wq_work *work, *old_work;
2841
2842         /* sync_file_range always requires a blocking context */
2843         if (force_nonblock) {
2844                 io_put_req(req);
2845                 req->work.func = io_sync_file_range_finish;
2846                 return -EAGAIN;
2847         }
2848
2849         work = old_work = &req->work;
2850         io_sync_file_range_finish(&work);
2851         if (work && work != old_work)
2852                 *nxt = container_of(work, struct io_kiocb, work);
2853         return 0;
2854 }
2855
2856 #if defined(CONFIG_NET)
2857 static void io_sendrecv_async(struct io_wq_work **workptr)
2858 {
2859         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2860         struct iovec *iov = NULL;
2861
2862         if (req->io->rw.iov != req->io->rw.fast_iov)
2863                 iov = req->io->msg.iov;
2864         io_wq_submit_work(workptr);
2865         kfree(iov);
2866 }
2867 #endif
2868
2869 static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2870 {
2871 #if defined(CONFIG_NET)
2872         struct io_sr_msg *sr = &req->sr_msg;
2873         struct io_async_ctx *io = req->io;
2874
2875         sr->msg_flags = READ_ONCE(sqe->msg_flags);
2876         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2877         sr->len = READ_ONCE(sqe->len);
2878
2879         if (!io || req->opcode == IORING_OP_SEND)
2880                 return 0;
2881
2882         io->msg.iov = io->msg.fast_iov;
2883         return sendmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2884                                         &io->msg.iov);
2885 #else
2886         return -EOPNOTSUPP;
2887 #endif
2888 }
2889
2890 static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2891                       bool force_nonblock)
2892 {
2893 #if defined(CONFIG_NET)
2894         struct io_async_msghdr *kmsg = NULL;
2895         struct socket *sock;
2896         int ret;
2897
2898         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2899                 return -EINVAL;
2900
2901         sock = sock_from_file(req->file, &ret);
2902         if (sock) {
2903                 struct io_async_ctx io;
2904                 struct sockaddr_storage addr;
2905                 unsigned flags;
2906
2907                 if (req->io) {
2908                         kmsg = &req->io->msg;
2909                         kmsg->msg.msg_name = &addr;
2910                         /* if iov is set, it's allocated already */
2911                         if (!kmsg->iov)
2912                                 kmsg->iov = kmsg->fast_iov;
2913                         kmsg->msg.msg_iter.iov = kmsg->iov;
2914                 } else {
2915                         struct io_sr_msg *sr = &req->sr_msg;
2916
2917                         kmsg = &io.msg;
2918                         kmsg->msg.msg_name = &addr;
2919
2920                         io.msg.iov = io.msg.fast_iov;
2921                         ret = sendmsg_copy_msghdr(&io.msg.msg, sr->msg,
2922                                         sr->msg_flags, &io.msg.iov);
2923                         if (ret)
2924                                 return ret;
2925                 }
2926
2927                 flags = req->sr_msg.msg_flags;
2928                 if (flags & MSG_DONTWAIT)
2929                         req->flags |= REQ_F_NOWAIT;
2930                 else if (force_nonblock)
2931                         flags |= MSG_DONTWAIT;
2932
2933                 ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags);
2934                 if (force_nonblock && ret == -EAGAIN) {
2935                         if (req->io)
2936                                 return -EAGAIN;
2937                         if (io_alloc_async_ctx(req))
2938                                 return -ENOMEM;
2939                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2940                         req->work.func = io_sendrecv_async;
2941                         return -EAGAIN;
2942                 }
2943                 if (ret == -ERESTARTSYS)
2944                         ret = -EINTR;
2945         }
2946
2947         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
2948                 kfree(kmsg->iov);
2949         io_cqring_add_event(req, ret);
2950         if (ret < 0)
2951                 req_set_fail_links(req);
2952         io_put_req_find_next(req, nxt);
2953         return 0;
2954 #else
2955         return -EOPNOTSUPP;
2956 #endif
2957 }
2958
2959 static int io_send(struct io_kiocb *req, struct io_kiocb **nxt,
2960                    bool force_nonblock)
2961 {
2962 #if defined(CONFIG_NET)
2963         struct socket *sock;
2964         int ret;
2965
2966         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2967                 return -EINVAL;
2968
2969         sock = sock_from_file(req->file, &ret);
2970         if (sock) {
2971                 struct io_sr_msg *sr = &req->sr_msg;
2972                 struct msghdr msg;
2973                 struct iovec iov;
2974                 unsigned flags;
2975
2976                 ret = import_single_range(WRITE, sr->buf, sr->len, &iov,
2977                                                 &msg.msg_iter);
2978                 if (ret)
2979                         return ret;
2980
2981                 msg.msg_name = NULL;
2982                 msg.msg_control = NULL;
2983                 msg.msg_controllen = 0;
2984                 msg.msg_namelen = 0;
2985
2986                 flags = req->sr_msg.msg_flags;
2987                 if (flags & MSG_DONTWAIT)
2988                         req->flags |= REQ_F_NOWAIT;
2989                 else if (force_nonblock)
2990                         flags |= MSG_DONTWAIT;
2991
2992                 ret = __sys_sendmsg_sock(sock, &msg, flags);
2993                 if (force_nonblock && ret == -EAGAIN)
2994                         return -EAGAIN;
2995                 if (ret == -ERESTARTSYS)
2996                         ret = -EINTR;
2997         }
2998
2999         io_cqring_add_event(req, ret);
3000         if (ret < 0)
3001                 req_set_fail_links(req);
3002         io_put_req_find_next(req, nxt);
3003         return 0;
3004 #else
3005         return -EOPNOTSUPP;
3006 #endif
3007 }
3008
3009 static int io_recvmsg_prep(struct io_kiocb *req,
3010                            const struct io_uring_sqe *sqe)
3011 {
3012 #if defined(CONFIG_NET)
3013         struct io_sr_msg *sr = &req->sr_msg;
3014         struct io_async_ctx *io = req->io;
3015
3016         sr->msg_flags = READ_ONCE(sqe->msg_flags);
3017         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
3018
3019         if (!io || req->opcode == IORING_OP_RECV)
3020                 return 0;
3021
3022         io->msg.iov = io->msg.fast_iov;
3023         return recvmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
3024                                         &io->msg.uaddr, &io->msg.iov);
3025 #else
3026         return -EOPNOTSUPP;
3027 #endif
3028 }
3029
3030 static int io_recvmsg(struct io_kiocb *req, struct io_kiocb **nxt,
3031                       bool force_nonblock)
3032 {
3033 #if defined(CONFIG_NET)
3034         struct io_async_msghdr *kmsg = NULL;
3035         struct socket *sock;
3036         int ret;
3037
3038         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3039                 return -EINVAL;
3040
3041         sock = sock_from_file(req->file, &ret);
3042         if (sock) {
3043                 struct io_async_ctx io;
3044                 struct sockaddr_storage addr;
3045                 unsigned flags;
3046
3047                 if (req->io) {
3048                         kmsg = &req->io->msg;
3049                         kmsg->msg.msg_name = &addr;
3050                         /* if iov is set, it's allocated already */
3051                         if (!kmsg->iov)
3052                                 kmsg->iov = kmsg->fast_iov;
3053                         kmsg->msg.msg_iter.iov = kmsg->iov;
3054                 } else {
3055                         struct io_sr_msg *sr = &req->sr_msg;
3056
3057                         kmsg = &io.msg;
3058                         kmsg->msg.msg_name = &addr;
3059
3060                         io.msg.iov = io.msg.fast_iov;
3061                         ret = recvmsg_copy_msghdr(&io.msg.msg, sr->msg,
3062                                         sr->msg_flags, &io.msg.uaddr,
3063                                         &io.msg.iov);
3064                         if (ret)
3065                                 return ret;
3066                 }
3067
3068                 flags = req->sr_msg.msg_flags;
3069                 if (flags & MSG_DONTWAIT)
3070                         req->flags |= REQ_F_NOWAIT;
3071                 else if (force_nonblock)
3072                         flags |= MSG_DONTWAIT;
3073
3074                 ret = __sys_recvmsg_sock(sock, &kmsg->msg, req->sr_msg.msg,
3075                                                 kmsg->uaddr, flags);
3076                 if (force_nonblock && ret == -EAGAIN) {
3077                         if (req->io)
3078                                 return -EAGAIN;
3079                         if (io_alloc_async_ctx(req))
3080                                 return -ENOMEM;
3081                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
3082                         req->work.func = io_sendrecv_async;
3083                         return -EAGAIN;
3084                 }
3085                 if (ret == -ERESTARTSYS)
3086                         ret = -EINTR;
3087         }
3088
3089         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
3090                 kfree(kmsg->iov);
3091         io_cqring_add_event(req, ret);
3092         if (ret < 0)
3093                 req_set_fail_links(req);
3094         io_put_req_find_next(req, nxt);
3095         return 0;
3096 #else
3097         return -EOPNOTSUPP;
3098 #endif
3099 }
3100
3101 static int io_recv(struct io_kiocb *req, struct io_kiocb **nxt,
3102                    bool force_nonblock)
3103 {
3104 #if defined(CONFIG_NET)
3105         struct socket *sock;
3106         int ret;
3107
3108         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3109                 return -EINVAL;
3110
3111         sock = sock_from_file(req->file, &ret);
3112         if (sock) {
3113                 struct io_sr_msg *sr = &req->sr_msg;
3114                 struct msghdr msg;
3115                 struct iovec iov;
3116                 unsigned flags;
3117
3118                 ret = import_single_range(READ, sr->buf, sr->len, &iov,
3119                                                 &msg.msg_iter);
3120                 if (ret)
3121                         return ret;
3122
3123                 msg.msg_name = NULL;
3124                 msg.msg_control = NULL;
3125                 msg.msg_controllen = 0;
3126                 msg.msg_namelen = 0;
3127                 msg.msg_iocb = NULL;
3128                 msg.msg_flags = 0;
3129
3130                 flags = req->sr_msg.msg_flags;
3131                 if (flags & MSG_DONTWAIT)
3132                         req->flags |= REQ_F_NOWAIT;
3133                 else if (force_nonblock)
3134                         flags |= MSG_DONTWAIT;
3135
3136                 ret = __sys_recvmsg_sock(sock, &msg, NULL, NULL, flags);
3137                 if (force_nonblock && ret == -EAGAIN)
3138                         return -EAGAIN;
3139                 if (ret == -ERESTARTSYS)
3140                         ret = -EINTR;
3141         }
3142
3143         io_cqring_add_event(req, ret);
3144         if (ret < 0)
3145                 req_set_fail_links(req);
3146         io_put_req_find_next(req, nxt);
3147         return 0;
3148 #else
3149         return -EOPNOTSUPP;
3150 #endif
3151 }
3152
3153
3154 static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3155 {
3156 #if defined(CONFIG_NET)
3157         struct io_accept *accept = &req->accept;
3158
3159         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
3160                 return -EINVAL;
3161         if (sqe->ioprio || sqe->len || sqe->buf_index)
3162                 return -EINVAL;
3163
3164         accept->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
3165         accept->addr_len = u64_to_user_ptr(READ_ONCE(sqe->addr2));
3166         accept->flags = READ_ONCE(sqe->accept_flags);
3167         return 0;
3168 #else
3169         return -EOPNOTSUPP;
3170 #endif
3171 }
3172
3173 #if defined(CONFIG_NET)
3174 static int __io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
3175                        bool force_nonblock)
3176 {
3177         struct io_accept *accept = &req->accept;
3178         unsigned file_flags;
3179         int ret;
3180
3181         file_flags = force_nonblock ? O_NONBLOCK : 0;
3182         ret = __sys_accept4_file(req->file, file_flags, accept->addr,
3183                                         accept->addr_len, accept->flags);
3184         if (ret == -EAGAIN && force_nonblock)
3185                 return -EAGAIN;
3186         if (ret == -ERESTARTSYS)
3187                 ret = -EINTR;
3188         if (ret < 0)
3189                 req_set_fail_links(req);
3190         io_cqring_add_event(req, ret);
3191         io_put_req_find_next(req, nxt);
3192         return 0;
3193 }
3194
3195 static void io_accept_finish(struct io_wq_work **workptr)
3196 {
3197         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
3198         struct io_kiocb *nxt = NULL;
3199
3200         if (io_req_cancelled(req))
3201                 return;
3202         __io_accept(req, &nxt, false);
3203         if (nxt)
3204                 io_wq_assign_next(workptr, nxt);
3205 }
3206 #endif
3207
3208 static int io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
3209                      bool force_nonblock)
3210 {
3211 #if defined(CONFIG_NET)
3212         int ret;
3213
3214         ret = __io_accept(req, nxt, force_nonblock);
3215         if (ret == -EAGAIN && force_nonblock) {
3216                 req->work.func = io_accept_finish;
3217                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
3218                 io_put_req(req);
3219                 return -EAGAIN;
3220         }
3221         return 0;
3222 #else
3223         return -EOPNOTSUPP;
3224 #endif
3225 }
3226
3227 static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3228 {
3229 #if defined(CONFIG_NET)
3230         struct io_connect *conn = &req->connect;
3231         struct io_async_ctx *io = req->io;
3232
3233         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
3234                 return -EINVAL;
3235         if (sqe->ioprio || sqe->len || sqe->buf_index || sqe->rw_flags)
3236                 return -EINVAL;
3237
3238         conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
3239         conn->addr_len =  READ_ONCE(sqe->addr2);
3240
3241         if (!io)
3242                 return 0;
3243
3244         return move_addr_to_kernel(conn->addr, conn->addr_len,
3245                                         &io->connect.address);
3246 #else
3247         return -EOPNOTSUPP;
3248 #endif
3249 }
3250
3251 static int io_connect(struct io_kiocb *req, struct io_kiocb **nxt,
3252                       bool force_nonblock)
3253 {
3254 #if defined(CONFIG_NET)
3255         struct io_async_ctx __io, *io;
3256         unsigned file_flags;
3257         int ret;
3258
3259         if (req->io) {
3260                 io = req->io;
3261         } else {
3262                 ret = move_addr_to_kernel(req->connect.addr,
3263                                                 req->connect.addr_len,
3264                                                 &__io.connect.address);
3265                 if (ret)
3266                         goto out;
3267                 io = &__io;
3268         }
3269
3270         file_flags = force_nonblock ? O_NONBLOCK : 0;
3271
3272         ret = __sys_connect_file(req->file, &io->connect.address,
3273                                         req->connect.addr_len, file_flags);
3274         if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) {
3275                 if (req->io)
3276                         return -EAGAIN;
3277                 if (io_alloc_async_ctx(req)) {
3278                         ret = -ENOMEM;
3279                         goto out;
3280                 }
3281                 memcpy(&req->io->connect, &__io.connect, sizeof(__io.connect));
3282                 return -EAGAIN;
3283         }
3284         if (ret == -ERESTARTSYS)
3285                 ret = -EINTR;
3286 out:
3287         if (ret < 0)
3288                 req_set_fail_links(req);
3289         io_cqring_add_event(req, ret);
3290         io_put_req_find_next(req, nxt);
3291         return 0;
3292 #else
3293         return -EOPNOTSUPP;
3294 #endif
3295 }
3296
3297 static void io_poll_remove_one(struct io_kiocb *req)
3298 {
3299         struct io_poll_iocb *poll = &req->poll;
3300
3301         spin_lock(&poll->head->lock);
3302         WRITE_ONCE(poll->canceled, true);
3303         if (!list_empty(&poll->wait.entry)) {
3304                 list_del_init(&poll->wait.entry);
3305                 io_queue_async_work(req);
3306         }
3307         spin_unlock(&poll->head->lock);
3308         hash_del(&req->hash_node);
3309 }
3310
3311 static void io_poll_remove_all(struct io_ring_ctx *ctx)
3312 {
3313         struct hlist_node *tmp;
3314         struct io_kiocb *req;
3315         int i;
3316
3317         spin_lock_irq(&ctx->completion_lock);
3318         for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
3319                 struct hlist_head *list;
3320
3321                 list = &ctx->cancel_hash[i];
3322                 hlist_for_each_entry_safe(req, tmp, list, hash_node)
3323                         io_poll_remove_one(req);
3324         }
3325         spin_unlock_irq(&ctx->completion_lock);
3326 }
3327
3328 static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
3329 {
3330         struct hlist_head *list;
3331         struct io_kiocb *req;
3332
3333         list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)];
3334         hlist_for_each_entry(req, list, hash_node) {
3335                 if (sqe_addr == req->user_data) {
3336                         io_poll_remove_one(req);
3337                         return 0;
3338                 }
3339         }
3340
3341         return -ENOENT;
3342 }
3343
3344 static int io_poll_remove_prep(struct io_kiocb *req,
3345                                const struct io_uring_sqe *sqe)
3346 {
3347         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3348                 return -EINVAL;
3349         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
3350             sqe->poll_events)
3351                 return -EINVAL;
3352
3353         req->poll.addr = READ_ONCE(sqe->addr);
3354         return 0;
3355 }
3356
3357 /*
3358  * Find a running poll command that matches one specified in sqe->addr,
3359  * and remove it if found.
3360  */
3361 static int io_poll_remove(struct io_kiocb *req)
3362 {
3363         struct io_ring_ctx *ctx = req->ctx;
3364         u64 addr;
3365         int ret;
3366
3367         addr = req->poll.addr;
3368         spin_lock_irq(&ctx->completion_lock);
3369         ret = io_poll_cancel(ctx, addr);
3370         spin_unlock_irq(&ctx->completion_lock);
3371
3372         io_cqring_add_event(req, ret);
3373         if (ret < 0)
3374                 req_set_fail_links(req);
3375         io_put_req(req);
3376         return 0;
3377 }
3378
3379 static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
3380 {
3381         struct io_ring_ctx *ctx = req->ctx;
3382
3383         req->poll.done = true;
3384         if (error)
3385                 io_cqring_fill_event(req, error);
3386         else
3387                 io_cqring_fill_event(req, mangle_poll(mask));
3388         io_commit_cqring(ctx);
3389 }
3390
3391 static void io_poll_complete_work(struct io_wq_work **workptr)
3392 {
3393         struct io_wq_work *work = *workptr;
3394         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3395         struct io_poll_iocb *poll = &req->poll;
3396         struct poll_table_struct pt = { ._key = poll->events };
3397         struct io_ring_ctx *ctx = req->ctx;
3398         struct io_kiocb *nxt = NULL;
3399         __poll_t mask = 0;
3400         int ret = 0;
3401
3402         if (work->flags & IO_WQ_WORK_CANCEL) {
3403                 WRITE_ONCE(poll->canceled, true);
3404                 ret = -ECANCELED;
3405         } else if (READ_ONCE(poll->canceled)) {
3406                 ret = -ECANCELED;
3407         }
3408
3409         if (ret != -ECANCELED)
3410                 mask = vfs_poll(poll->file, &pt) & poll->events;
3411
3412         /*
3413          * Note that ->ki_cancel callers also delete iocb from active_reqs after
3414          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
3415          * synchronize with them.  In the cancellation case the list_del_init
3416          * itself is not actually needed, but harmless so we keep it in to
3417          * avoid further branches in the fast path.
3418          */
3419         spin_lock_irq(&ctx->completion_lock);
3420         if (!mask && ret != -ECANCELED) {
3421                 add_wait_queue(poll->head, &poll->wait);
3422                 spin_unlock_irq(&ctx->completion_lock);
3423                 return;
3424         }
3425         hash_del(&req->hash_node);
3426         io_poll_complete(req, mask, ret);
3427         spin_unlock_irq(&ctx->completion_lock);
3428
3429         io_cqring_ev_posted(ctx);
3430
3431         if (ret < 0)
3432                 req_set_fail_links(req);
3433         io_put_req_find_next(req, &nxt);
3434         if (nxt)
3435                 io_wq_assign_next(workptr, nxt);
3436 }
3437
3438 static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes)
3439 {
3440         struct io_kiocb *req, *tmp;
3441         struct req_batch rb;
3442
3443         rb.to_free = rb.need_iter = 0;
3444         spin_lock_irq(&ctx->completion_lock);
3445         llist_for_each_entry_safe(req, tmp, nodes, llist_node) {
3446                 hash_del(&req->hash_node);
3447                 io_poll_complete(req, req->result, 0);
3448
3449                 if (refcount_dec_and_test(&req->refs) &&
3450                     !io_req_multi_free(&rb, req)) {
3451                         req->flags |= REQ_F_COMP_LOCKED;
3452                         io_free_req(req);
3453                 }
3454         }
3455         spin_unlock_irq(&ctx->completion_lock);
3456
3457         io_cqring_ev_posted(ctx);
3458         io_free_req_many(ctx, &rb);
3459 }
3460
3461 static void io_poll_flush(struct io_wq_work **workptr)
3462 {
3463         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
3464         struct llist_node *nodes;
3465
3466         nodes = llist_del_all(&req->ctx->poll_llist);
3467         if (nodes)
3468                 __io_poll_flush(req->ctx, nodes);
3469 }
3470
3471 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
3472                         void *key)
3473 {
3474         struct io_poll_iocb *poll = wait->private;
3475         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
3476         struct io_ring_ctx *ctx = req->ctx;
3477         __poll_t mask = key_to_poll(key);
3478
3479         /* for instances that support it check for an event match first: */
3480         if (mask && !(mask & poll->events))
3481                 return 0;
3482
3483         list_del_init(&poll->wait.entry);
3484
3485         /*
3486          * Run completion inline if we can. We're using trylock here because
3487          * we are violating the completion_lock -> poll wq lock ordering.
3488          * If we have a link timeout we're going to need the completion_lock
3489          * for finalizing the request, mark us as having grabbed that already.
3490          */
3491         if (mask) {
3492                 unsigned long flags;
3493
3494                 if (llist_empty(&ctx->poll_llist) &&
3495                     spin_trylock_irqsave(&ctx->completion_lock, flags)) {
3496                         hash_del(&req->hash_node);
3497                         io_poll_complete(req, mask, 0);
3498                         req->flags |= REQ_F_COMP_LOCKED;
3499                         io_put_req(req);
3500                         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3501
3502                         io_cqring_ev_posted(ctx);
3503                         req = NULL;
3504                 } else {
3505                         req->result = mask;
3506                         req->llist_node.next = NULL;
3507                         /* if the list wasn't empty, we're done */
3508                         if (!llist_add(&req->llist_node, &ctx->poll_llist))
3509                                 req = NULL;
3510                         else
3511                                 req->work.func = io_poll_flush;
3512                 }
3513         }
3514         if (req)
3515                 io_queue_async_work(req);
3516
3517         return 1;
3518 }
3519
3520 struct io_poll_table {
3521         struct poll_table_struct pt;
3522         struct io_kiocb *req;
3523         int error;
3524 };
3525
3526 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
3527                                struct poll_table_struct *p)
3528 {
3529         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
3530
3531         if (unlikely(pt->req->poll.head)) {
3532                 pt->error = -EINVAL;
3533                 return;
3534         }
3535
3536         pt->error = 0;
3537         pt->req->poll.head = head;
3538         add_wait_queue(head, &pt->req->poll.wait);
3539 }
3540
3541 static void io_poll_req_insert(struct io_kiocb *req)
3542 {
3543         struct io_ring_ctx *ctx = req->ctx;
3544         struct hlist_head *list;
3545
3546         list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)];
3547         hlist_add_head(&req->hash_node, list);
3548 }
3549
3550 static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3551 {
3552         struct io_poll_iocb *poll = &req->poll;
3553         u16 events;
3554
3555         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3556                 return -EINVAL;
3557         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
3558                 return -EINVAL;
3559         if (!poll->file)
3560                 return -EBADF;
3561
3562         events = READ_ONCE(sqe->poll_events);
3563         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
3564         return 0;
3565 }
3566
3567 static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt)
3568 {
3569         struct io_poll_iocb *poll = &req->poll;
3570         struct io_ring_ctx *ctx = req->ctx;
3571         struct io_poll_table ipt;
3572         bool cancel = false;
3573         __poll_t mask;
3574
3575         INIT_IO_WORK(&req->work, io_poll_complete_work);
3576         INIT_HLIST_NODE(&req->hash_node);
3577
3578         poll->head = NULL;
3579         poll->done = false;
3580         poll->canceled = false;
3581
3582         ipt.pt._qproc = io_poll_queue_proc;
3583         ipt.pt._key = poll->events;
3584         ipt.req = req;
3585         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
3586
3587         /* initialized the list so that we can do list_empty checks */
3588         INIT_LIST_HEAD(&poll->wait.entry);
3589         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
3590         poll->wait.private = poll;
3591
3592         INIT_LIST_HEAD(&req->list);
3593
3594         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
3595
3596         spin_lock_irq(&ctx->completion_lock);
3597         if (likely(poll->head)) {
3598                 spin_lock(&poll->head->lock);
3599                 if (unlikely(list_empty(&poll->wait.entry))) {
3600                         if (ipt.error)
3601                                 cancel = true;
3602                         ipt.error = 0;
3603                         mask = 0;
3604                 }
3605                 if (mask || ipt.error)
3606                         list_del_init(&poll->wait.entry);
3607                 else if (cancel)
3608                         WRITE_ONCE(poll->canceled, true);
3609                 else if (!poll->done) /* actually waiting for an event */
3610                         io_poll_req_insert(req);
3611                 spin_unlock(&poll->head->lock);
3612         }
3613         if (mask) { /* no async, we'd stolen it */
3614                 ipt.error = 0;
3615                 io_poll_complete(req, mask, 0);
3616         }
3617         spin_unlock_irq(&ctx->completion_lock);
3618
3619         if (mask) {
3620                 io_cqring_ev_posted(ctx);
3621                 io_put_req_find_next(req, nxt);
3622         }
3623         return ipt.error;
3624 }
3625
3626 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
3627 {
3628         struct io_timeout_data *data = container_of(timer,
3629                                                 struct io_timeout_data, timer);
3630         struct io_kiocb *req = data->req;
3631         struct io_ring_ctx *ctx = req->ctx;
3632         unsigned long flags;
3633
3634         atomic_inc(&ctx->cq_timeouts);
3635
3636         spin_lock_irqsave(&ctx->completion_lock, flags);
3637         /*
3638          * We could be racing with timeout deletion. If the list is empty,
3639          * then timeout lookup already found it and will be handling it.
3640          */
3641         if (!list_empty(&req->list)) {
3642                 struct io_kiocb *prev;
3643
3644                 /*
3645                  * Adjust the reqs sequence before the current one because it
3646                  * will consume a slot in the cq_ring and the cq_tail
3647                  * pointer will be increased, otherwise other timeout reqs may
3648                  * return in advance without waiting for enough wait_nr.
3649                  */
3650                 prev = req;
3651                 list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
3652                         prev->sequence++;
3653                 list_del_init(&req->list);
3654         }
3655
3656         io_cqring_fill_event(req, -ETIME);
3657         io_commit_cqring(ctx);
3658         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3659
3660         io_cqring_ev_posted(ctx);
3661         req_set_fail_links(req);
3662         io_put_req(req);
3663         return HRTIMER_NORESTART;
3664 }
3665
3666 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
3667 {
3668         struct io_kiocb *req;
3669         int ret = -ENOENT;
3670
3671         list_for_each_entry(req, &ctx->timeout_list, list) {
3672                 if (user_data == req->user_data) {
3673                         list_del_init(&req->list);
3674                         ret = 0;
3675                         break;
3676                 }
3677         }
3678
3679         if (ret == -ENOENT)
3680                 return ret;
3681
3682         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
3683         if (ret == -1)
3684                 return -EALREADY;
3685
3686         req_set_fail_links(req);
3687         io_cqring_fill_event(req, -ECANCELED);
3688         io_put_req(req);
3689         return 0;
3690 }
3691
3692 static int io_timeout_remove_prep(struct io_kiocb *req,
3693                                   const struct io_uring_sqe *sqe)
3694 {
3695         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3696                 return -EINVAL;
3697         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)
3698                 return -EINVAL;
3699
3700         req->timeout.addr = READ_ONCE(sqe->addr);
3701         req->timeout.flags = READ_ONCE(sqe->timeout_flags);
3702         if (req->timeout.flags)
3703                 return -EINVAL;
3704
3705         return 0;
3706 }
3707
3708 /*
3709  * Remove or update an existing timeout command
3710  */
3711 static int io_timeout_remove(struct io_kiocb *req)
3712 {
3713         struct io_ring_ctx *ctx = req->ctx;
3714         int ret;
3715
3716         spin_lock_irq(&ctx->completion_lock);
3717         ret = io_timeout_cancel(ctx, req->timeout.addr);
3718
3719         io_cqring_fill_event(req, ret);
3720         io_commit_cqring(ctx);
3721         spin_unlock_irq(&ctx->completion_lock);
3722         io_cqring_ev_posted(ctx);
3723         if (ret < 0)
3724                 req_set_fail_links(req);
3725         io_put_req(req);
3726         return 0;
3727 }
3728
3729 static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
3730                            bool is_timeout_link)
3731 {
3732         struct io_timeout_data *data;
3733         unsigned flags;
3734
3735         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3736                 return -EINVAL;
3737         if (sqe->ioprio || sqe->buf_index || sqe->len != 1)
3738                 return -EINVAL;
3739         if (sqe->off && is_timeout_link)
3740                 return -EINVAL;
3741         flags = READ_ONCE(sqe->timeout_flags);
3742         if (flags & ~IORING_TIMEOUT_ABS)
3743                 return -EINVAL;
3744
3745         req->timeout.count = READ_ONCE(sqe->off);
3746
3747         if (!req->io && io_alloc_async_ctx(req))
3748                 return -ENOMEM;
3749
3750         data = &req->io->timeout;
3751         data->req = req;
3752         req->flags |= REQ_F_TIMEOUT;
3753
3754         if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))
3755                 return -EFAULT;
3756
3757         if (flags & IORING_TIMEOUT_ABS)
3758                 data->mode = HRTIMER_MODE_ABS;
3759         else
3760                 data->mode = HRTIMER_MODE_REL;
3761
3762         hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
3763         return 0;
3764 }
3765
3766 static int io_timeout(struct io_kiocb *req)
3767 {
3768         unsigned count;
3769         struct io_ring_ctx *ctx = req->ctx;
3770         struct io_timeout_data *data;
3771         struct list_head *entry;
3772         unsigned span = 0;
3773
3774         data = &req->io->timeout;
3775
3776         /*
3777          * sqe->off holds how many events that need to occur for this
3778          * timeout event to be satisfied. If it isn't set, then this is
3779          * a pure timeout request, sequence isn't used.
3780          */
3781         count = req->timeout.count;
3782         if (!count) {
3783                 req->flags |= REQ_F_TIMEOUT_NOSEQ;
3784                 spin_lock_irq(&ctx->completion_lock);
3785                 entry = ctx->timeout_list.prev;
3786                 goto add;
3787         }
3788
3789         req->sequence = ctx->cached_sq_head + count - 1;
3790         data->seq_offset = count;
3791
3792         /*
3793          * Insertion sort, ensuring the first entry in the list is always
3794          * the one we need first.
3795          */
3796         spin_lock_irq(&ctx->completion_lock);
3797         list_for_each_prev(entry, &ctx->timeout_list) {
3798                 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
3799                 unsigned nxt_sq_head;
3800                 long long tmp, tmp_nxt;
3801                 u32 nxt_offset = nxt->io->timeout.seq_offset;
3802
3803                 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
3804                         continue;
3805
3806                 /*
3807                  * Since cached_sq_head + count - 1 can overflow, use type long
3808                  * long to store it.
3809                  */
3810                 tmp = (long long)ctx->cached_sq_head + count - 1;
3811                 nxt_sq_head = nxt->sequence - nxt_offset + 1;
3812                 tmp_nxt = (long long)nxt_sq_head + nxt_offset - 1;
3813
3814                 /*
3815                  * cached_sq_head may overflow, and it will never overflow twice
3816                  * once there is some timeout req still be valid.
3817                  */
3818                 if (ctx->cached_sq_head < nxt_sq_head)
3819                         tmp += UINT_MAX;
3820
3821                 if (tmp > tmp_nxt)
3822                         break;
3823
3824                 /*
3825                  * Sequence of reqs after the insert one and itself should
3826                  * be adjusted because each timeout req consumes a slot.
3827                  */
3828                 span++;
3829                 nxt->sequence++;
3830         }
3831         req->sequence -= span;
3832 add:
3833         list_add(&req->list, entry);
3834         data->timer.function = io_timeout_fn;
3835         hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
3836         spin_unlock_irq(&ctx->completion_lock);
3837         return 0;
3838 }
3839
3840 static bool io_cancel_cb(struct io_wq_work *work, void *data)
3841 {
3842         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3843
3844         return req->user_data == (unsigned long) data;
3845 }
3846
3847 static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
3848 {
3849         enum io_wq_cancel cancel_ret;
3850         int ret = 0;
3851
3852         cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
3853         switch (cancel_ret) {
3854         case IO_WQ_CANCEL_OK:
3855                 ret = 0;
3856                 break;
3857         case IO_WQ_CANCEL_RUNNING:
3858                 ret = -EALREADY;
3859                 break;
3860         case IO_WQ_CANCEL_NOTFOUND:
3861                 ret = -ENOENT;
3862                 break;
3863         }
3864
3865         return ret;
3866 }
3867
3868 static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
3869                                      struct io_kiocb *req, __u64 sqe_addr,
3870                                      struct io_kiocb **nxt, int success_ret)
3871 {
3872         unsigned long flags;
3873         int ret;
3874
3875         ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
3876         if (ret != -ENOENT) {
3877                 spin_lock_irqsave(&ctx->completion_lock, flags);
3878                 goto done;
3879         }
3880
3881         spin_lock_irqsave(&ctx->completion_lock, flags);
3882         ret = io_timeout_cancel(ctx, sqe_addr);
3883         if (ret != -ENOENT)
3884                 goto done;
3885         ret = io_poll_cancel(ctx, sqe_addr);
3886 done:
3887         if (!ret)
3888                 ret = success_ret;
3889         io_cqring_fill_event(req, ret);
3890         io_commit_cqring(ctx);
3891         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3892         io_cqring_ev_posted(ctx);
3893
3894         if (ret < 0)
3895                 req_set_fail_links(req);
3896         io_put_req_find_next(req, nxt);
3897 }
3898
3899 static int io_async_cancel_prep(struct io_kiocb *req,
3900                                 const struct io_uring_sqe *sqe)
3901 {
3902         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3903                 return -EINVAL;
3904         if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||
3905             sqe->cancel_flags)
3906                 return -EINVAL;
3907
3908         req->cancel.addr = READ_ONCE(sqe->addr);
3909         return 0;
3910 }
3911
3912 static int io_async_cancel(struct io_kiocb *req, struct io_kiocb **nxt)
3913 {
3914         struct io_ring_ctx *ctx = req->ctx;
3915
3916         io_async_find_and_cancel(ctx, req, req->cancel.addr, nxt, 0);
3917         return 0;
3918 }
3919
3920 static int io_files_update_prep(struct io_kiocb *req,
3921                                 const struct io_uring_sqe *sqe)
3922 {
3923         if (sqe->flags || sqe->ioprio || sqe->rw_flags)
3924                 return -EINVAL;
3925
3926         req->files_update.offset = READ_ONCE(sqe->off);
3927         req->files_update.nr_args = READ_ONCE(sqe->len);
3928         if (!req->files_update.nr_args)
3929                 return -EINVAL;
3930         req->files_update.arg = READ_ONCE(sqe->addr);
3931         return 0;
3932 }
3933
3934 static int io_files_update(struct io_kiocb *req, bool force_nonblock)
3935 {
3936         struct io_ring_ctx *ctx = req->ctx;
3937         struct io_uring_files_update up;
3938         int ret;
3939
3940         if (force_nonblock) {
3941                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
3942                 return -EAGAIN;
3943         }
3944
3945         up.offset = req->files_update.offset;
3946         up.fds = req->files_update.arg;
3947
3948         mutex_lock(&ctx->uring_lock);
3949         ret = __io_sqe_files_update(ctx, &up, req->files_update.nr_args);
3950         mutex_unlock(&ctx->uring_lock);
3951
3952         if (ret < 0)
3953                 req_set_fail_links(req);
3954         io_cqring_add_event(req, ret);
3955         io_put_req(req);
3956         return 0;
3957 }
3958
3959 static int io_req_defer_prep(struct io_kiocb *req,
3960                              const struct io_uring_sqe *sqe)
3961 {
3962         ssize_t ret = 0;
3963
3964         switch (req->opcode) {
3965         case IORING_OP_NOP:
3966                 break;
3967         case IORING_OP_READV:
3968         case IORING_OP_READ_FIXED:
3969         case IORING_OP_READ:
3970                 ret = io_read_prep(req, sqe, true);
3971                 break;
3972         case IORING_OP_WRITEV:
3973         case IORING_OP_WRITE_FIXED:
3974         case IORING_OP_WRITE:
3975                 ret = io_write_prep(req, sqe, true);
3976                 break;
3977         case IORING_OP_POLL_ADD:
3978                 ret = io_poll_add_prep(req, sqe);
3979                 break;
3980         case IORING_OP_POLL_REMOVE:
3981                 ret = io_poll_remove_prep(req, sqe);
3982                 break;
3983         case IORING_OP_FSYNC:
3984                 ret = io_prep_fsync(req, sqe);
3985                 break;
3986         case IORING_OP_SYNC_FILE_RANGE:
3987                 ret = io_prep_sfr(req, sqe);
3988                 break;
3989         case IORING_OP_SENDMSG:
3990         case IORING_OP_SEND:
3991                 ret = io_sendmsg_prep(req, sqe);
3992                 break;
3993         case IORING_OP_RECVMSG:
3994         case IORING_OP_RECV:
3995                 ret = io_recvmsg_prep(req, sqe);
3996                 break;
3997         case IORING_OP_CONNECT:
3998                 ret = io_connect_prep(req, sqe);
3999                 break;
4000         case IORING_OP_TIMEOUT:
4001                 ret = io_timeout_prep(req, sqe, false);
4002                 break;
4003         case IORING_OP_TIMEOUT_REMOVE:
4004                 ret = io_timeout_remove_prep(req, sqe);
4005                 break;
4006         case IORING_OP_ASYNC_CANCEL:
4007                 ret = io_async_cancel_prep(req, sqe);
4008                 break;
4009         case IORING_OP_LINK_TIMEOUT:
4010                 ret = io_timeout_prep(req, sqe, true);
4011                 break;
4012         case IORING_OP_ACCEPT:
4013                 ret = io_accept_prep(req, sqe);
4014                 break;
4015         case IORING_OP_FALLOCATE:
4016                 ret = io_fallocate_prep(req, sqe);
4017                 break;
4018         case IORING_OP_OPENAT:
4019                 ret = io_openat_prep(req, sqe);
4020                 break;
4021         case IORING_OP_CLOSE:
4022                 ret = io_close_prep(req, sqe);
4023                 break;
4024         case IORING_OP_FILES_UPDATE:
4025                 ret = io_files_update_prep(req, sqe);
4026                 break;
4027         case IORING_OP_STATX:
4028                 ret = io_statx_prep(req, sqe);
4029                 break;
4030         case IORING_OP_FADVISE:
4031                 ret = io_fadvise_prep(req, sqe);
4032                 break;
4033         case IORING_OP_MADVISE:
4034                 ret = io_madvise_prep(req, sqe);
4035                 break;
4036         case IORING_OP_OPENAT2:
4037                 ret = io_openat2_prep(req, sqe);
4038                 break;
4039         default:
4040                 printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n",
4041                                 req->opcode);
4042                 ret = -EINVAL;
4043                 break;
4044         }
4045
4046         return ret;
4047 }
4048
4049 static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4050 {
4051         struct io_ring_ctx *ctx = req->ctx;
4052         int ret;
4053
4054         /* Still need defer if there is pending req in defer list. */
4055         if (!req_need_defer(req) && list_empty(&ctx->defer_list))
4056                 return 0;
4057
4058         if (!req->io && io_alloc_async_ctx(req))
4059                 return -EAGAIN;
4060
4061         ret = io_req_defer_prep(req, sqe);
4062         if (ret < 0)
4063                 return ret;
4064
4065         spin_lock_irq(&ctx->completion_lock);
4066         if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
4067                 spin_unlock_irq(&ctx->completion_lock);
4068                 return 0;
4069         }
4070
4071         trace_io_uring_defer(ctx, req, req->user_data);
4072         list_add_tail(&req->list, &ctx->defer_list);
4073         spin_unlock_irq(&ctx->completion_lock);
4074         return -EIOCBQUEUED;
4075 }
4076
4077 static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
4078                         struct io_kiocb **nxt, bool force_nonblock)
4079 {
4080         struct io_ring_ctx *ctx = req->ctx;
4081         int ret;
4082
4083         switch (req->opcode) {
4084         case IORING_OP_NOP:
4085                 ret = io_nop(req);
4086                 break;
4087         case IORING_OP_READV:
4088         case IORING_OP_READ_FIXED:
4089         case IORING_OP_READ:
4090                 if (sqe) {
4091                         ret = io_read_prep(req, sqe, force_nonblock);
4092                         if (ret < 0)
4093                                 break;
4094                 }
4095                 ret = io_read(req, nxt, force_nonblock);
4096                 break;
4097         case IORING_OP_WRITEV:
4098         case IORING_OP_WRITE_FIXED:
4099         case IORING_OP_WRITE:
4100                 if (sqe) {
4101                         ret = io_write_prep(req, sqe, force_nonblock);
4102                         if (ret < 0)
4103                                 break;
4104                 }
4105                 ret = io_write(req, nxt, force_nonblock);
4106                 break;
4107         case IORING_OP_FSYNC:
4108                 if (sqe) {
4109                         ret = io_prep_fsync(req, sqe);
4110                         if (ret < 0)
4111                                 break;
4112                 }
4113                 ret = io_fsync(req, nxt, force_nonblock);
4114                 break;
4115         case IORING_OP_POLL_ADD:
4116                 if (sqe) {
4117                         ret = io_poll_add_prep(req, sqe);
4118                         if (ret)
4119                                 break;
4120                 }
4121                 ret = io_poll_add(req, nxt);
4122                 break;
4123         case IORING_OP_POLL_REMOVE:
4124                 if (sqe) {
4125                         ret = io_poll_remove_prep(req, sqe);
4126                         if (ret < 0)
4127                                 break;
4128                 }
4129                 ret = io_poll_remove(req);
4130                 break;
4131         case IORING_OP_SYNC_FILE_RANGE:
4132                 if (sqe) {
4133                         ret = io_prep_sfr(req, sqe);
4134                         if (ret < 0)
4135                                 break;
4136                 }
4137                 ret = io_sync_file_range(req, nxt, force_nonblock);
4138                 break;
4139         case IORING_OP_SENDMSG:
4140         case IORING_OP_SEND:
4141                 if (sqe) {
4142                         ret = io_sendmsg_prep(req, sqe);
4143                         if (ret < 0)
4144                                 break;
4145                 }
4146                 if (req->opcode == IORING_OP_SENDMSG)
4147                         ret = io_sendmsg(req, nxt, force_nonblock);
4148                 else
4149                         ret = io_send(req, nxt, force_nonblock);
4150                 break;
4151         case IORING_OP_RECVMSG:
4152         case IORING_OP_RECV:
4153                 if (sqe) {
4154                         ret = io_recvmsg_prep(req, sqe);
4155                         if (ret)
4156                                 break;
4157                 }
4158                 if (req->opcode == IORING_OP_RECVMSG)
4159                         ret = io_recvmsg(req, nxt, force_nonblock);
4160                 else
4161                         ret = io_recv(req, nxt, force_nonblock);
4162                 break;
4163         case IORING_OP_TIMEOUT:
4164                 if (sqe) {
4165                         ret = io_timeout_prep(req, sqe, false);
4166                         if (ret)
4167                                 break;
4168                 }
4169                 ret = io_timeout(req);
4170                 break;
4171         case IORING_OP_TIMEOUT_REMOVE:
4172                 if (sqe) {
4173                         ret = io_timeout_remove_prep(req, sqe);
4174                         if (ret)
4175                                 break;
4176                 }
4177                 ret = io_timeout_remove(req);
4178                 break;
4179         case IORING_OP_ACCEPT:
4180                 if (sqe) {
4181                         ret = io_accept_prep(req, sqe);
4182                         if (ret)
4183                                 break;
4184                 }
4185                 ret = io_accept(req, nxt, force_nonblock);
4186                 break;
4187         case IORING_OP_CONNECT:
4188                 if (sqe) {
4189                         ret = io_connect_prep(req, sqe);
4190                         if (ret)
4191                                 break;
4192                 }
4193                 ret = io_connect(req, nxt, force_nonblock);
4194                 break;
4195         case IORING_OP_ASYNC_CANCEL:
4196                 if (sqe) {
4197                         ret = io_async_cancel_prep(req, sqe);
4198                         if (ret)
4199                                 break;
4200                 }
4201                 ret = io_async_cancel(req, nxt);
4202                 break;
4203         case IORING_OP_FALLOCATE:
4204                 if (sqe) {
4205                         ret = io_fallocate_prep(req, sqe);
4206                         if (ret)
4207                                 break;
4208                 }
4209                 ret = io_fallocate(req, nxt, force_nonblock);
4210                 break;
4211         case IORING_OP_OPENAT:
4212                 if (sqe) {
4213                         ret = io_openat_prep(req, sqe);
4214                         if (ret)
4215                                 break;
4216                 }
4217                 ret = io_openat(req, nxt, force_nonblock);
4218                 break;
4219         case IORING_OP_CLOSE:
4220                 if (sqe) {
4221                         ret = io_close_prep(req, sqe);
4222                         if (ret)
4223                                 break;
4224                 }
4225                 ret = io_close(req, nxt, force_nonblock);
4226                 break;
4227         case IORING_OP_FILES_UPDATE:
4228                 if (sqe) {
4229                         ret = io_files_update_prep(req, sqe);
4230                         if (ret)
4231                                 break;
4232                 }
4233                 ret = io_files_update(req, force_nonblock);
4234                 break;
4235         case IORING_OP_STATX:
4236                 if (sqe) {
4237                         ret = io_statx_prep(req, sqe);
4238                         if (ret)
4239                                 break;
4240                 }
4241                 ret = io_statx(req, nxt, force_nonblock);
4242                 break;
4243         case IORING_OP_FADVISE:
4244                 if (sqe) {
4245                         ret = io_fadvise_prep(req, sqe);
4246                         if (ret)
4247                                 break;
4248                 }
4249                 ret = io_fadvise(req, nxt, force_nonblock);
4250                 break;
4251         case IORING_OP_MADVISE:
4252                 if (sqe) {
4253                         ret = io_madvise_prep(req, sqe);
4254                         if (ret)
4255                                 break;
4256                 }
4257                 ret = io_madvise(req, nxt, force_nonblock);
4258                 break;
4259         case IORING_OP_OPENAT2:
4260                 if (sqe) {
4261                         ret = io_openat2_prep(req, sqe);
4262                         if (ret)
4263                                 break;
4264                 }
4265                 ret = io_openat2(req, nxt, force_nonblock);
4266                 break;
4267         default:
4268                 ret = -EINVAL;
4269                 break;
4270         }
4271
4272         if (ret)
4273                 return ret;
4274
4275         if (ctx->flags & IORING_SETUP_IOPOLL) {
4276                 const bool in_async = io_wq_current_is_worker();
4277
4278                 if (req->result == -EAGAIN)
4279                         return -EAGAIN;
4280
4281                 /* workqueue context doesn't hold uring_lock, grab it now */
4282                 if (in_async)
4283                         mutex_lock(&ctx->uring_lock);
4284
4285                 io_iopoll_req_issued(req);
4286
4287                 if (in_async)
4288                         mutex_unlock(&ctx->uring_lock);
4289         }
4290
4291         return 0;
4292 }
4293
4294 static void io_wq_submit_work(struct io_wq_work **workptr)
4295 {
4296         struct io_wq_work *work = *workptr;
4297         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4298         struct io_kiocb *nxt = NULL;
4299         int ret = 0;
4300
4301         /* if NO_CANCEL is set, we must still run the work */
4302         if ((work->flags & (IO_WQ_WORK_CANCEL|IO_WQ_WORK_NO_CANCEL)) ==
4303                                 IO_WQ_WORK_CANCEL) {
4304                 ret = -ECANCELED;
4305         }
4306
4307         if (!ret) {
4308                 req->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
4309                 req->in_async = true;
4310                 do {
4311                         ret = io_issue_sqe(req, NULL, &nxt, false);
4312                         /*
4313                          * We can get EAGAIN for polled IO even though we're
4314                          * forcing a sync submission from here, since we can't
4315                          * wait for request slots on the block side.
4316                          */
4317                         if (ret != -EAGAIN)
4318                                 break;
4319                         cond_resched();
4320                 } while (1);
4321         }
4322
4323         /* drop submission reference */
4324         io_put_req(req);
4325
4326         if (ret) {
4327                 req_set_fail_links(req);
4328                 io_cqring_add_event(req, ret);
4329                 io_put_req(req);
4330         }
4331
4332         /* if a dependent link is ready, pass it back */
4333         if (!ret && nxt)
4334                 io_wq_assign_next(workptr, nxt);
4335 }
4336
4337 static int io_req_needs_file(struct io_kiocb *req, int fd)
4338 {
4339         if (!io_op_defs[req->opcode].needs_file)
4340                 return 0;
4341         if (fd == -1 && io_op_defs[req->opcode].fd_non_neg)
4342                 return 0;
4343         return 1;
4344 }
4345
4346 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
4347                                               int index)
4348 {
4349         struct fixed_file_table *table;
4350
4351         table = &ctx->file_data->table[index >> IORING_FILE_TABLE_SHIFT];
4352         return table->files[index & IORING_FILE_TABLE_MASK];;
4353 }
4354
4355 static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req,
4356                            const struct io_uring_sqe *sqe)
4357 {
4358         struct io_ring_ctx *ctx = req->ctx;
4359         unsigned flags;
4360         int fd;
4361
4362         flags = READ_ONCE(sqe->flags);
4363         fd = READ_ONCE(sqe->fd);
4364
4365         if (flags & IOSQE_IO_DRAIN)
4366                 req->flags |= REQ_F_IO_DRAIN;
4367
4368         if (!io_req_needs_file(req, fd))
4369                 return 0;
4370
4371         if (flags & IOSQE_FIXED_FILE) {
4372                 if (unlikely(!ctx->file_data ||
4373                     (unsigned) fd >= ctx->nr_user_files))
4374                         return -EBADF;
4375                 fd = array_index_nospec(fd, ctx->nr_user_files);
4376                 req->file = io_file_from_index(ctx, fd);
4377                 if (!req->file)
4378                         return -EBADF;
4379                 req->flags |= REQ_F_FIXED_FILE;
4380                 percpu_ref_get(&ctx->file_data->refs);
4381         } else {
4382                 if (req->needs_fixed_file)
4383                         return -EBADF;
4384                 trace_io_uring_file_get(ctx, fd);
4385                 req->file = io_file_get(state, fd);
4386                 if (unlikely(!req->file))
4387                         return -EBADF;
4388         }
4389
4390         return 0;
4391 }
4392
4393 static int io_grab_files(struct io_kiocb *req)
4394 {
4395         int ret = -EBADF;
4396         struct io_ring_ctx *ctx = req->ctx;
4397
4398         if (!req->ring_file)
4399                 return -EBADF;
4400
4401         rcu_read_lock();
4402         spin_lock_irq(&ctx->inflight_lock);
4403         /*
4404          * We use the f_ops->flush() handler to ensure that we can flush
4405          * out work accessing these files if the fd is closed. Check if
4406          * the fd has changed since we started down this path, and disallow
4407          * this operation if it has.
4408          */
4409         if (fcheck(req->ring_fd) == req->ring_file) {
4410                 list_add(&req->inflight_entry, &ctx->inflight_list);
4411                 req->flags |= REQ_F_INFLIGHT;
4412                 req->work.files = current->files;
4413                 ret = 0;
4414         }
4415         spin_unlock_irq(&ctx->inflight_lock);
4416         rcu_read_unlock();
4417
4418         return ret;
4419 }
4420
4421 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
4422 {
4423         struct io_timeout_data *data = container_of(timer,
4424                                                 struct io_timeout_data, timer);
4425         struct io_kiocb *req = data->req;
4426         struct io_ring_ctx *ctx = req->ctx;
4427         struct io_kiocb *prev = NULL;
4428         unsigned long flags;
4429
4430         spin_lock_irqsave(&ctx->completion_lock, flags);
4431
4432         /*
4433          * We don't expect the list to be empty, that will only happen if we
4434          * race with the completion of the linked work.
4435          */
4436         if (!list_empty(&req->link_list)) {
4437                 prev = list_entry(req->link_list.prev, struct io_kiocb,
4438                                   link_list);
4439                 if (refcount_inc_not_zero(&prev->refs)) {
4440                         list_del_init(&req->link_list);
4441                         prev->flags &= ~REQ_F_LINK_TIMEOUT;
4442                 } else
4443                         prev = NULL;
4444         }
4445
4446         spin_unlock_irqrestore(&ctx->completion_lock, flags);
4447
4448         if (prev) {
4449                 req_set_fail_links(prev);
4450                 io_async_find_and_cancel(ctx, req, prev->user_data, NULL,
4451                                                 -ETIME);
4452                 io_put_req(prev);
4453         } else {
4454                 io_cqring_add_event(req, -ETIME);
4455                 io_put_req(req);
4456         }
4457         return HRTIMER_NORESTART;
4458 }
4459
4460 static void io_queue_linked_timeout(struct io_kiocb *req)
4461 {
4462         struct io_ring_ctx *ctx = req->ctx;
4463
4464         /*
4465          * If the list is now empty, then our linked request finished before
4466          * we got a chance to setup the timer
4467          */
4468         spin_lock_irq(&ctx->completion_lock);
4469         if (!list_empty(&req->link_list)) {
4470                 struct io_timeout_data *data = &req->io->timeout;
4471
4472                 data->timer.function = io_link_timeout_fn;
4473                 hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
4474                                 data->mode);
4475         }
4476         spin_unlock_irq(&ctx->completion_lock);
4477
4478         /* drop submission reference */
4479         io_put_req(req);
4480 }
4481
4482 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
4483 {
4484         struct io_kiocb *nxt;
4485
4486         if (!(req->flags & REQ_F_LINK))
4487                 return NULL;
4488
4489         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb,
4490                                         link_list);
4491         if (!nxt || nxt->opcode != IORING_OP_LINK_TIMEOUT)
4492                 return NULL;
4493
4494         req->flags |= REQ_F_LINK_TIMEOUT;
4495         return nxt;
4496 }
4497
4498 static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4499 {
4500         struct io_kiocb *linked_timeout;
4501         struct io_kiocb *nxt = NULL;
4502         int ret;
4503
4504 again:
4505         linked_timeout = io_prep_linked_timeout(req);
4506
4507         ret = io_issue_sqe(req, sqe, &nxt, true);
4508
4509         /*
4510          * We async punt it if the file wasn't marked NOWAIT, or if the file
4511          * doesn't support non-blocking read/write attempts
4512          */
4513         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
4514             (req->flags & REQ_F_MUST_PUNT))) {
4515                 if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
4516                         ret = io_grab_files(req);
4517                         if (ret)
4518                                 goto err;
4519                 }
4520
4521                 /*
4522                  * Queued up for async execution, worker will release
4523                  * submit reference when the iocb is actually submitted.
4524                  */
4525                 io_queue_async_work(req);
4526                 goto done_req;
4527         }
4528
4529 err:
4530         /* drop submission reference */
4531         io_put_req(req);
4532
4533         if (linked_timeout) {
4534                 if (!ret)
4535                         io_queue_linked_timeout(linked_timeout);
4536                 else
4537                         io_put_req(linked_timeout);
4538         }
4539
4540         /* and drop final reference, if we failed */
4541         if (ret) {
4542                 io_cqring_add_event(req, ret);
4543                 req_set_fail_links(req);
4544                 io_put_req(req);
4545         }
4546 done_req:
4547         if (nxt) {
4548                 req = nxt;
4549                 nxt = NULL;
4550                 goto again;
4551         }
4552 }
4553
4554 static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4555 {
4556         int ret;
4557
4558         ret = io_req_defer(req, sqe);
4559         if (ret) {
4560                 if (ret != -EIOCBQUEUED) {
4561                         io_cqring_add_event(req, ret);
4562                         req_set_fail_links(req);
4563                         io_double_put_req(req);
4564                 }
4565         } else if (req->flags & REQ_F_FORCE_ASYNC) {
4566                 /*
4567                  * Never try inline submit of IOSQE_ASYNC is set, go straight
4568                  * to async execution.
4569                  */
4570                 req->work.flags |= IO_WQ_WORK_CONCURRENT;
4571                 io_queue_async_work(req);
4572         } else {
4573                 __io_queue_sqe(req, sqe);
4574         }
4575 }
4576
4577 static inline void io_queue_link_head(struct io_kiocb *req)
4578 {
4579         if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
4580                 io_cqring_add_event(req, -ECANCELED);
4581                 io_double_put_req(req);
4582         } else
4583                 io_queue_sqe(req, NULL);
4584 }
4585
4586 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
4587                                 IOSQE_IO_HARDLINK | IOSQE_ASYNC)
4588
4589 static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
4590                           struct io_submit_state *state, struct io_kiocb **link)
4591 {
4592         struct io_ring_ctx *ctx = req->ctx;
4593         unsigned int sqe_flags;
4594         int ret;
4595
4596         sqe_flags = READ_ONCE(sqe->flags);
4597
4598         /* enforce forwards compatibility on users */
4599         if (unlikely(sqe_flags & ~SQE_VALID_FLAGS)) {
4600                 ret = -EINVAL;
4601                 goto err_req;
4602         }
4603         if (sqe_flags & IOSQE_ASYNC)
4604                 req->flags |= REQ_F_FORCE_ASYNC;
4605
4606         ret = io_req_set_file(state, req, sqe);
4607         if (unlikely(ret)) {
4608 err_req:
4609                 io_cqring_add_event(req, ret);
4610                 io_double_put_req(req);
4611                 return false;
4612         }
4613
4614         /*
4615          * If we already have a head request, queue this one for async
4616          * submittal once the head completes. If we don't have a head but
4617          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
4618          * submitted sync once the chain is complete. If none of those
4619          * conditions are true (normal request), then just queue it.
4620          */
4621         if (*link) {
4622                 struct io_kiocb *head = *link;
4623
4624                 if (sqe_flags & IOSQE_IO_DRAIN) {
4625                         head->flags |= REQ_F_IO_DRAIN;
4626                         ctx->drain_next = 1;
4627                 }
4628
4629                 if (sqe_flags & IOSQE_IO_HARDLINK)
4630                         req->flags |= REQ_F_HARDLINK;
4631
4632                 if (io_alloc_async_ctx(req)) {
4633                         ret = -EAGAIN;
4634                         goto err_req;
4635                 }
4636
4637                 ret = io_req_defer_prep(req, sqe);
4638                 if (ret) {
4639                         /* fail even hard links since we don't submit */
4640                         head->flags |= REQ_F_FAIL_LINK;
4641                         goto err_req;
4642                 }
4643                 trace_io_uring_link(ctx, req, head);
4644                 list_add_tail(&req->link_list, &head->link_list);
4645
4646                 /* last request of a link, enqueue the link */
4647                 if (!(sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK))) {
4648                         io_queue_link_head(head);
4649                         *link = NULL;
4650                 }
4651         } else {
4652                 if (unlikely(ctx->drain_next)) {
4653                         req->flags |= REQ_F_IO_DRAIN;
4654                         req->ctx->drain_next = 0;
4655                 }
4656                 if (sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) {
4657                         req->flags |= REQ_F_LINK;
4658                         if (sqe_flags & IOSQE_IO_HARDLINK)
4659                                 req->flags |= REQ_F_HARDLINK;
4660
4661                         INIT_LIST_HEAD(&req->link_list);
4662                         ret = io_req_defer_prep(req, sqe);
4663                         if (ret)
4664                                 req->flags |= REQ_F_FAIL_LINK;
4665                         *link = req;
4666                 } else {
4667                         io_queue_sqe(req, sqe);
4668                 }
4669         }
4670
4671         return true;
4672 }
4673
4674 /*
4675  * Batched submission is done, ensure local IO is flushed out.
4676  */
4677 static void io_submit_state_end(struct io_submit_state *state)
4678 {
4679         blk_finish_plug(&state->plug);
4680         io_file_put(state);
4681         if (state->free_reqs)
4682                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
4683                                         &state->reqs[state->cur_req]);
4684 }
4685
4686 /*
4687  * Start submission side cache.
4688  */
4689 static void io_submit_state_start(struct io_submit_state *state,
4690                                   unsigned int max_ios)
4691 {
4692         blk_start_plug(&state->plug);
4693         state->free_reqs = 0;
4694         state->file = NULL;
4695         state->ios_left = max_ios;
4696 }
4697
4698 static void io_commit_sqring(struct io_ring_ctx *ctx)
4699 {
4700         struct io_rings *rings = ctx->rings;
4701
4702         /*
4703          * Ensure any loads from the SQEs are done at this point,
4704          * since once we write the new head, the application could
4705          * write new data to them.
4706          */
4707         smp_store_release(&rings->sq.head, ctx->cached_sq_head);
4708 }
4709
4710 /*
4711  * Fetch an sqe, if one is available. Note that sqe_ptr will point to memory
4712  * that is mapped by userspace. This means that care needs to be taken to
4713  * ensure that reads are stable, as we cannot rely on userspace always
4714  * being a good citizen. If members of the sqe are validated and then later
4715  * used, it's important that those reads are done through READ_ONCE() to
4716  * prevent a re-load down the line.
4717  */
4718 static bool io_get_sqring(struct io_ring_ctx *ctx, struct io_kiocb *req,
4719                           const struct io_uring_sqe **sqe_ptr)
4720 {
4721         u32 *sq_array = ctx->sq_array;
4722         unsigned head;
4723
4724         /*
4725          * The cached sq head (or cq tail) serves two purposes:
4726          *
4727          * 1) allows us to batch the cost of updating the user visible
4728          *    head updates.
4729          * 2) allows the kernel side to track the head on its own, even
4730          *    though the application is the one updating it.
4731          */
4732         head = READ_ONCE(sq_array[ctx->cached_sq_head & ctx->sq_mask]);
4733         if (likely(head < ctx->sq_entries)) {
4734                 /*
4735                  * All io need record the previous position, if LINK vs DARIN,
4736                  * it can be used to mark the position of the first IO in the
4737                  * link list.
4738                  */
4739                 req->sequence = ctx->cached_sq_head;
4740                 *sqe_ptr = &ctx->sq_sqes[head];
4741                 req->opcode = READ_ONCE((*sqe_ptr)->opcode);
4742                 req->user_data = READ_ONCE((*sqe_ptr)->user_data);
4743                 ctx->cached_sq_head++;
4744                 return true;
4745         }
4746
4747         /* drop invalid entries */
4748         ctx->cached_sq_head++;
4749         ctx->cached_sq_dropped++;
4750         WRITE_ONCE(ctx->rings->sq_dropped, ctx->cached_sq_dropped);
4751         return false;
4752 }
4753
4754 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
4755                           struct file *ring_file, int ring_fd,
4756                           struct mm_struct **mm, bool async)
4757 {
4758         struct io_submit_state state, *statep = NULL;
4759         struct io_kiocb *link = NULL;
4760         int i, submitted = 0;
4761         bool mm_fault = false;
4762
4763         /* if we have a backlog and couldn't flush it all, return BUSY */
4764         if (test_bit(0, &ctx->sq_check_overflow)) {
4765                 if (!list_empty(&ctx->cq_overflow_list) &&
4766                     !io_cqring_overflow_flush(ctx, false))
4767                         return -EBUSY;
4768         }
4769
4770         /* make sure SQ entry isn't read before tail */
4771         nr = min3(nr, ctx->sq_entries, io_sqring_entries(ctx));
4772
4773         if (!percpu_ref_tryget_many(&ctx->refs, nr))
4774                 return -EAGAIN;
4775
4776         if (nr > IO_PLUG_THRESHOLD) {
4777                 io_submit_state_start(&state, nr);
4778                 statep = &state;
4779         }
4780
4781         for (i = 0; i < nr; i++) {
4782                 const struct io_uring_sqe *sqe;
4783                 struct io_kiocb *req;
4784
4785                 req = io_get_req(ctx, statep);
4786                 if (unlikely(!req)) {
4787                         if (!submitted)
4788                                 submitted = -EAGAIN;
4789                         break;
4790                 }
4791                 if (!io_get_sqring(ctx, req, &sqe)) {
4792                         __io_req_do_free(req);
4793                         break;
4794                 }
4795
4796                 /* will complete beyond this point, count as submitted */
4797                 submitted++;
4798
4799                 if (unlikely(req->opcode >= IORING_OP_LAST)) {
4800                         io_cqring_add_event(req, -EINVAL);
4801                         io_double_put_req(req);
4802                         break;
4803                 }
4804
4805                 if (io_op_defs[req->opcode].needs_mm && !*mm) {
4806                         mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
4807                         if (!mm_fault) {
4808                                 use_mm(ctx->sqo_mm);
4809                                 *mm = ctx->sqo_mm;
4810                         }
4811                 }
4812
4813                 req->ring_file = ring_file;
4814                 req->ring_fd = ring_fd;
4815                 req->has_user = *mm != NULL;
4816                 req->in_async = async;
4817                 req->needs_fixed_file = async;
4818                 trace_io_uring_submit_sqe(ctx, req->opcode, req->user_data,
4819                                                 true, async);
4820                 if (!io_submit_sqe(req, sqe, statep, &link))
4821                         break;
4822         }
4823
4824         if (submitted != nr)
4825                 percpu_ref_put_many(&ctx->refs, nr - submitted);
4826         if (link)
4827                 io_queue_link_head(link);
4828         if (statep)
4829                 io_submit_state_end(&state);
4830
4831          /* Commit SQ ring head once we've consumed and submitted all SQEs */
4832         io_commit_sqring(ctx);
4833
4834         return submitted;
4835 }
4836
4837 static int io_sq_thread(void *data)
4838 {
4839         struct io_ring_ctx *ctx = data;
4840         struct mm_struct *cur_mm = NULL;
4841         const struct cred *old_cred;
4842         mm_segment_t old_fs;
4843         DEFINE_WAIT(wait);
4844         unsigned inflight;
4845         unsigned long timeout;
4846         int ret;
4847
4848         complete(&ctx->completions[1]);
4849
4850         old_fs = get_fs();
4851         set_fs(USER_DS);
4852         old_cred = override_creds(ctx->creds);
4853
4854         ret = timeout = inflight = 0;
4855         while (!kthread_should_park()) {
4856                 unsigned int to_submit;
4857
4858                 if (inflight) {
4859                         unsigned nr_events = 0;
4860
4861                         if (ctx->flags & IORING_SETUP_IOPOLL) {
4862                                 /*
4863                                  * inflight is the count of the maximum possible
4864                                  * entries we submitted, but it can be smaller
4865                                  * if we dropped some of them. If we don't have
4866                                  * poll entries available, then we know that we
4867                                  * have nothing left to poll for. Reset the
4868                                  * inflight count to zero in that case.
4869                                  */
4870                                 mutex_lock(&ctx->uring_lock);
4871                                 if (!list_empty(&ctx->poll_list))
4872                                         __io_iopoll_check(ctx, &nr_events, 0);
4873                                 else
4874                                         inflight = 0;
4875                                 mutex_unlock(&ctx->uring_lock);
4876                         } else {
4877                                 /*
4878                                  * Normal IO, just pretend everything completed.
4879                                  * We don't have to poll completions for that.
4880                                  */
4881                                 nr_events = inflight;
4882                         }
4883
4884                         inflight -= nr_events;
4885                         if (!inflight)
4886                                 timeout = jiffies + ctx->sq_thread_idle;
4887                 }
4888
4889                 to_submit = io_sqring_entries(ctx);
4890
4891                 /*
4892                  * If submit got -EBUSY, flag us as needing the application
4893                  * to enter the kernel to reap and flush events.
4894                  */
4895                 if (!to_submit || ret == -EBUSY) {
4896                         /*
4897                          * We're polling. If we're within the defined idle
4898                          * period, then let us spin without work before going
4899                          * to sleep. The exception is if we got EBUSY doing
4900                          * more IO, we should wait for the application to
4901                          * reap events and wake us up.
4902                          */
4903                         if (inflight ||
4904                             (!time_after(jiffies, timeout) && ret != -EBUSY)) {
4905                                 cond_resched();
4906                                 continue;
4907                         }
4908
4909                         /*
4910                          * Drop cur_mm before scheduling, we can't hold it for
4911                          * long periods (or over schedule()). Do this before
4912                          * adding ourselves to the waitqueue, as the unuse/drop
4913                          * may sleep.
4914                          */
4915                         if (cur_mm) {
4916                                 unuse_mm(cur_mm);
4917                                 mmput(cur_mm);
4918                                 cur_mm = NULL;
4919                         }
4920
4921                         prepare_to_wait(&ctx->sqo_wait, &wait,
4922                                                 TASK_INTERRUPTIBLE);
4923
4924                         /* Tell userspace we may need a wakeup call */
4925                         ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
4926                         /* make sure to read SQ tail after writing flags */
4927                         smp_mb();
4928
4929                         to_submit = io_sqring_entries(ctx);
4930                         if (!to_submit || ret == -EBUSY) {
4931                                 if (kthread_should_park()) {
4932                                         finish_wait(&ctx->sqo_wait, &wait);
4933                                         break;
4934                                 }
4935                                 if (signal_pending(current))
4936                                         flush_signals(current);
4937                                 schedule();
4938                                 finish_wait(&ctx->sqo_wait, &wait);
4939
4940                                 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4941                                 continue;
4942                         }
4943                         finish_wait(&ctx->sqo_wait, &wait);
4944
4945                         ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4946                 }
4947
4948                 mutex_lock(&ctx->uring_lock);
4949                 ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
4950                 mutex_unlock(&ctx->uring_lock);
4951                 if (ret > 0)
4952                         inflight += ret;
4953         }
4954
4955         set_fs(old_fs);
4956         if (cur_mm) {
4957                 unuse_mm(cur_mm);
4958                 mmput(cur_mm);
4959         }
4960         revert_creds(old_cred);
4961
4962         kthread_parkme();
4963
4964         return 0;
4965 }
4966
4967 struct io_wait_queue {
4968         struct wait_queue_entry wq;
4969         struct io_ring_ctx *ctx;
4970         unsigned to_wait;
4971         unsigned nr_timeouts;
4972 };
4973
4974 static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
4975 {
4976         struct io_ring_ctx *ctx = iowq->ctx;
4977
4978         /*
4979          * Wake up if we have enough events, or if a timeout occurred since we
4980          * started waiting. For timeouts, we always want to return to userspace,
4981          * regardless of event count.
4982          */
4983         return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
4984                         atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
4985 }
4986
4987 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
4988                             int wake_flags, void *key)
4989 {
4990         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
4991                                                         wq);
4992
4993         /* use noflush == true, as we can't safely rely on locking context */
4994         if (!io_should_wake(iowq, true))
4995                 return -1;
4996
4997         return autoremove_wake_function(curr, mode, wake_flags, key);
4998 }
4999
5000 /*
5001  * Wait until events become available, if we don't already have some. The
5002  * application must reap them itself, as they reside on the shared cq ring.
5003  */
5004 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
5005                           const sigset_t __user *sig, size_t sigsz)
5006 {
5007         struct io_wait_queue iowq = {
5008                 .wq = {
5009                         .private        = current,
5010                         .func           = io_wake_function,
5011                         .entry          = LIST_HEAD_INIT(iowq.wq.entry),
5012                 },
5013                 .ctx            = ctx,
5014                 .to_wait        = min_events,
5015         };
5016         struct io_rings *rings = ctx->rings;
5017         int ret = 0;
5018
5019         if (io_cqring_events(ctx, false) >= min_events)
5020                 return 0;
5021
5022         if (sig) {
5023 #ifdef CONFIG_COMPAT
5024                 if (in_compat_syscall())
5025                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
5026                                                       sigsz);
5027                 else
5028 #endif
5029                         ret = set_user_sigmask(sig, sigsz);
5030
5031                 if (ret)
5032                         return ret;
5033         }
5034
5035         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
5036         trace_io_uring_cqring_wait(ctx, min_events);
5037         do {
5038                 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
5039                                                 TASK_INTERRUPTIBLE);
5040                 if (io_should_wake(&iowq, false))
5041                         break;
5042                 schedule();
5043                 if (signal_pending(current)) {
5044                         ret = -EINTR;
5045                         break;
5046                 }
5047         } while (1);
5048         finish_wait(&ctx->wait, &iowq.wq);
5049
5050         restore_saved_sigmask_unless(ret == -EINTR);
5051
5052         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
5053 }
5054
5055 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
5056 {
5057 #if defined(CONFIG_UNIX)
5058         if (ctx->ring_sock) {
5059                 struct sock *sock = ctx->ring_sock->sk;
5060                 struct sk_buff *skb;
5061
5062                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
5063                         kfree_skb(skb);
5064         }
5065 #else
5066         int i;
5067
5068         for (i = 0; i < ctx->nr_user_files; i++) {
5069                 struct file *file;
5070
5071                 file = io_file_from_index(ctx, i);
5072                 if (file)
5073                         fput(file);
5074         }
5075 #endif
5076 }
5077
5078 static void io_file_ref_kill(struct percpu_ref *ref)
5079 {
5080         struct fixed_file_data *data;
5081
5082         data = container_of(ref, struct fixed_file_data, refs);
5083         complete(&data->done);
5084 }
5085
5086 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
5087 {
5088         struct fixed_file_data *data = ctx->file_data;
5089         unsigned nr_tables, i;
5090
5091         if (!data)
5092                 return -ENXIO;
5093
5094         /* protect against inflight atomic switch, which drops the ref */
5095         flush_work(&data->ref_work);
5096         percpu_ref_get(&data->refs);
5097         percpu_ref_kill_and_confirm(&data->refs, io_file_ref_kill);
5098         wait_for_completion(&data->done);
5099         percpu_ref_put(&data->refs);
5100         percpu_ref_exit(&data->refs);
5101
5102         __io_sqe_files_unregister(ctx);
5103         nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
5104         for (i = 0; i < nr_tables; i++)
5105                 kfree(data->table[i].files);
5106         kfree(data->table);
5107         kfree(data);
5108         ctx->file_data = NULL;
5109         ctx->nr_user_files = 0;
5110         return 0;
5111 }
5112
5113 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
5114 {
5115         if (ctx->sqo_thread) {
5116                 wait_for_completion(&ctx->completions[1]);
5117                 /*
5118                  * The park is a bit of a work-around, without it we get
5119                  * warning spews on shutdown with SQPOLL set and affinity
5120                  * set to a single CPU.
5121                  */
5122                 kthread_park(ctx->sqo_thread);
5123                 kthread_stop(ctx->sqo_thread);
5124                 ctx->sqo_thread = NULL;
5125         }
5126 }
5127
5128 static void io_finish_async(struct io_ring_ctx *ctx)
5129 {
5130         io_sq_thread_stop(ctx);
5131
5132         if (ctx->io_wq) {
5133                 io_wq_destroy(ctx->io_wq);
5134                 ctx->io_wq = NULL;
5135         }
5136 }
5137
5138 #if defined(CONFIG_UNIX)
5139 /*
5140  * Ensure the UNIX gc is aware of our file set, so we are certain that
5141  * the io_uring can be safely unregistered on process exit, even if we have
5142  * loops in the file referencing.
5143  */
5144 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
5145 {
5146         struct sock *sk = ctx->ring_sock->sk;
5147         struct scm_fp_list *fpl;
5148         struct sk_buff *skb;
5149         int i, nr_files;
5150
5151         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
5152                 unsigned long inflight = ctx->user->unix_inflight + nr;
5153
5154                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
5155                         return -EMFILE;
5156         }
5157
5158         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
5159         if (!fpl)
5160                 return -ENOMEM;
5161
5162         skb = alloc_skb(0, GFP_KERNEL);
5163         if (!skb) {
5164                 kfree(fpl);
5165                 return -ENOMEM;
5166         }
5167
5168         skb->sk = sk;
5169
5170         nr_files = 0;
5171         fpl->user = get_uid(ctx->user);
5172         for (i = 0; i < nr; i++) {
5173                 struct file *file = io_file_from_index(ctx, i + offset);
5174
5175                 if (!file)
5176                         continue;
5177                 fpl->fp[nr_files] = get_file(file);
5178                 unix_inflight(fpl->user, fpl->fp[nr_files]);
5179                 nr_files++;
5180         }
5181
5182         if (nr_files) {
5183                 fpl->max = SCM_MAX_FD;
5184                 fpl->count = nr_files;
5185                 UNIXCB(skb).fp = fpl;
5186                 skb->destructor = unix_destruct_scm;
5187                 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
5188                 skb_queue_head(&sk->sk_receive_queue, skb);
5189
5190                 for (i = 0; i < nr_files; i++)
5191                         fput(fpl->fp[i]);
5192         } else {
5193                 kfree_skb(skb);
5194                 kfree(fpl);
5195         }
5196
5197         return 0;
5198 }
5199
5200 /*
5201  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
5202  * causes regular reference counting to break down. We rely on the UNIX
5203  * garbage collection to take care of this problem for us.
5204  */
5205 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
5206 {
5207         unsigned left, total;
5208         int ret = 0;
5209
5210         total = 0;
5211         left = ctx->nr_user_files;
5212         while (left) {
5213                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
5214
5215                 ret = __io_sqe_files_scm(ctx, this_files, total);
5216                 if (ret)
5217                         break;
5218                 left -= this_files;
5219                 total += this_files;
5220         }
5221
5222         if (!ret)
5223                 return 0;
5224
5225         while (total < ctx->nr_user_files) {
5226                 struct file *file = io_file_from_index(ctx, total);
5227
5228                 if (file)
5229                         fput(file);
5230                 total++;
5231         }
5232
5233         return ret;
5234 }
5235 #else
5236 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
5237 {
5238         return 0;
5239 }
5240 #endif
5241
5242 static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
5243                                     unsigned nr_files)
5244 {
5245         int i;
5246
5247         for (i = 0; i < nr_tables; i++) {
5248                 struct fixed_file_table *table = &ctx->file_data->table[i];
5249                 unsigned this_files;
5250
5251                 this_files = min(nr_files, IORING_MAX_FILES_TABLE);
5252                 table->files = kcalloc(this_files, sizeof(struct file *),
5253                                         GFP_KERNEL);
5254                 if (!table->files)
5255                         break;
5256                 nr_files -= this_files;
5257         }
5258
5259         if (i == nr_tables)
5260                 return 0;
5261
5262         for (i = 0; i < nr_tables; i++) {
5263                 struct fixed_file_table *table = &ctx->file_data->table[i];
5264                 kfree(table->files);
5265         }
5266         return 1;
5267 }
5268
5269 static void io_ring_file_put(struct io_ring_ctx *ctx, struct file *file)
5270 {
5271 #if defined(CONFIG_UNIX)
5272         struct sock *sock = ctx->ring_sock->sk;
5273         struct sk_buff_head list, *head = &sock->sk_receive_queue;
5274         struct sk_buff *skb;
5275         int i;
5276
5277         __skb_queue_head_init(&list);
5278
5279         /*
5280          * Find the skb that holds this file in its SCM_RIGHTS. When found,
5281          * remove this entry and rearrange the file array.
5282          */
5283         skb = skb_dequeue(head);
5284         while (skb) {
5285                 struct scm_fp_list *fp;
5286
5287                 fp = UNIXCB(skb).fp;
5288                 for (i = 0; i < fp->count; i++) {
5289                         int left;
5290
5291                         if (fp->fp[i] != file)
5292                                 continue;
5293
5294                         unix_notinflight(fp->user, fp->fp[i]);
5295                         left = fp->count - 1 - i;
5296                         if (left) {
5297                                 memmove(&fp->fp[i], &fp->fp[i + 1],
5298                                                 left * sizeof(struct file *));
5299                         }
5300                         fp->count--;
5301                         if (!fp->count) {
5302                                 kfree_skb(skb);
5303                                 skb = NULL;
5304                         } else {
5305                                 __skb_queue_tail(&list, skb);
5306                         }
5307                         fput(file);
5308                         file = NULL;
5309                         break;
5310                 }
5311
5312                 if (!file)
5313                         break;
5314
5315                 __skb_queue_tail(&list, skb);
5316
5317                 skb = skb_dequeue(head);
5318         }
5319
5320         if (skb_peek(&list)) {
5321                 spin_lock_irq(&head->lock);
5322                 while ((skb = __skb_dequeue(&list)) != NULL)
5323                         __skb_queue_tail(head, skb);
5324                 spin_unlock_irq(&head->lock);
5325         }
5326 #else
5327         fput(file);
5328 #endif
5329 }
5330
5331 struct io_file_put {
5332         struct llist_node llist;
5333         struct file *file;
5334         struct completion *done;
5335 };
5336
5337 static void io_ring_file_ref_switch(struct work_struct *work)
5338 {
5339         struct io_file_put *pfile, *tmp;
5340         struct fixed_file_data *data;
5341         struct llist_node *node;
5342
5343         data = container_of(work, struct fixed_file_data, ref_work);
5344
5345         while ((node = llist_del_all(&data->put_llist)) != NULL) {
5346                 llist_for_each_entry_safe(pfile, tmp, node, llist) {
5347                         io_ring_file_put(data->ctx, pfile->file);
5348                         if (pfile->done)
5349                                 complete(pfile->done);
5350                         else
5351                                 kfree(pfile);
5352                 }
5353         }
5354
5355         percpu_ref_get(&data->refs);
5356         percpu_ref_switch_to_percpu(&data->refs);
5357 }
5358
5359 static void io_file_data_ref_zero(struct percpu_ref *ref)
5360 {
5361         struct fixed_file_data *data;
5362
5363         data = container_of(ref, struct fixed_file_data, refs);
5364
5365         /* we can't safely switch from inside this context, punt to wq */
5366         queue_work(system_wq, &data->ref_work);
5367 }
5368
5369 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
5370                                  unsigned nr_args)
5371 {
5372         __s32 __user *fds = (__s32 __user *) arg;
5373         unsigned nr_tables;
5374         struct file *file;
5375         int fd, ret = 0;
5376         unsigned i;
5377
5378         if (ctx->file_data)
5379                 return -EBUSY;
5380         if (!nr_args)
5381                 return -EINVAL;
5382         if (nr_args > IORING_MAX_FIXED_FILES)
5383                 return -EMFILE;
5384
5385         ctx->file_data = kzalloc(sizeof(*ctx->file_data), GFP_KERNEL);
5386         if (!ctx->file_data)
5387                 return -ENOMEM;
5388         ctx->file_data->ctx = ctx;
5389         init_completion(&ctx->file_data->done);
5390
5391         nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
5392         ctx->file_data->table = kcalloc(nr_tables,
5393                                         sizeof(struct fixed_file_table),
5394                                         GFP_KERNEL);
5395         if (!ctx->file_data->table) {
5396                 kfree(ctx->file_data);
5397                 ctx->file_data = NULL;
5398                 return -ENOMEM;
5399         }
5400
5401         if (percpu_ref_init(&ctx->file_data->refs, io_file_data_ref_zero,
5402                                 PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
5403                 kfree(ctx->file_data->table);
5404                 kfree(ctx->file_data);
5405                 ctx->file_data = NULL;
5406                 return -ENOMEM;
5407         }
5408         ctx->file_data->put_llist.first = NULL;
5409         INIT_WORK(&ctx->file_data->ref_work, io_ring_file_ref_switch);
5410
5411         if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
5412                 percpu_ref_exit(&ctx->file_data->refs);
5413                 kfree(ctx->file_data->table);
5414                 kfree(ctx->file_data);
5415                 ctx->file_data = NULL;
5416                 return -ENOMEM;
5417         }
5418
5419         for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
5420                 struct fixed_file_table *table;
5421                 unsigned index;
5422
5423                 ret = -EFAULT;
5424                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
5425                         break;
5426                 /* allow sparse sets */
5427                 if (fd == -1) {
5428                         ret = 0;
5429                         continue;
5430                 }
5431
5432                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
5433                 index = i & IORING_FILE_TABLE_MASK;
5434                 file = fget(fd);
5435
5436                 ret = -EBADF;
5437                 if (!file)
5438                         break;
5439
5440                 /*
5441                  * Don't allow io_uring instances to be registered. If UNIX
5442                  * isn't enabled, then this causes a reference cycle and this
5443                  * instance can never get freed. If UNIX is enabled we'll
5444                  * handle it just fine, but there's still no point in allowing
5445                  * a ring fd as it doesn't support regular read/write anyway.
5446                  */
5447                 if (file->f_op == &io_uring_fops) {
5448                         fput(file);
5449                         break;
5450                 }
5451                 ret = 0;
5452                 table->files[index] = file;
5453         }
5454
5455         if (ret) {
5456                 for (i = 0; i < ctx->nr_user_files; i++) {
5457                         file = io_file_from_index(ctx, i);
5458                         if (file)
5459                                 fput(file);
5460                 }
5461                 for (i = 0; i < nr_tables; i++)
5462                         kfree(ctx->file_data->table[i].files);
5463
5464                 kfree(ctx->file_data->table);
5465                 kfree(ctx->file_data);
5466                 ctx->file_data = NULL;
5467                 ctx->nr_user_files = 0;
5468                 return ret;
5469         }
5470
5471         ret = io_sqe_files_scm(ctx);
5472         if (ret)
5473                 io_sqe_files_unregister(ctx);
5474
5475         return ret;
5476 }
5477
5478 static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
5479                                 int index)
5480 {
5481 #if defined(CONFIG_UNIX)
5482         struct sock *sock = ctx->ring_sock->sk;
5483         struct sk_buff_head *head = &sock->sk_receive_queue;
5484         struct sk_buff *skb;
5485
5486         /*
5487          * See if we can merge this file into an existing skb SCM_RIGHTS
5488          * file set. If there's no room, fall back to allocating a new skb
5489          * and filling it in.
5490          */
5491         spin_lock_irq(&head->lock);
5492         skb = skb_peek(head);
5493         if (skb) {
5494                 struct scm_fp_list *fpl = UNIXCB(skb).fp;
5495
5496                 if (fpl->count < SCM_MAX_FD) {
5497                         __skb_unlink(skb, head);
5498                         spin_unlock_irq(&head->lock);
5499                         fpl->fp[fpl->count] = get_file(file);
5500                         unix_inflight(fpl->user, fpl->fp[fpl->count]);
5501                         fpl->count++;
5502                         spin_lock_irq(&head->lock);
5503                         __skb_queue_head(head, skb);
5504                 } else {
5505                         skb = NULL;
5506                 }
5507         }
5508         spin_unlock_irq(&head->lock);
5509
5510         if (skb) {
5511                 fput(file);
5512                 return 0;
5513         }
5514
5515         return __io_sqe_files_scm(ctx, 1, index);
5516 #else
5517         return 0;
5518 #endif
5519 }
5520
5521 static void io_atomic_switch(struct percpu_ref *ref)
5522 {
5523         struct fixed_file_data *data;
5524
5525         data = container_of(ref, struct fixed_file_data, refs);
5526         clear_bit(FFD_F_ATOMIC, &data->state);
5527 }
5528
5529 static bool io_queue_file_removal(struct fixed_file_data *data,
5530                                   struct file *file)
5531 {
5532         struct io_file_put *pfile, pfile_stack;
5533         DECLARE_COMPLETION_ONSTACK(done);
5534
5535         /*
5536          * If we fail allocating the struct we need for doing async reomval
5537          * of this file, just punt to sync and wait for it.
5538          */
5539         pfile = kzalloc(sizeof(*pfile), GFP_KERNEL);
5540         if (!pfile) {
5541                 pfile = &pfile_stack;
5542                 pfile->done = &done;
5543         }
5544
5545         pfile->file = file;
5546         llist_add(&pfile->llist, &data->put_llist);
5547
5548         if (pfile == &pfile_stack) {
5549                 if (!test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
5550                         percpu_ref_put(&data->refs);
5551                         percpu_ref_switch_to_atomic(&data->refs,
5552                                                         io_atomic_switch);
5553                 }
5554                 wait_for_completion(&done);
5555                 flush_work(&data->ref_work);
5556                 return false;
5557         }
5558
5559         return true;
5560 }
5561
5562 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
5563                                  struct io_uring_files_update *up,
5564                                  unsigned nr_args)
5565 {
5566         struct fixed_file_data *data = ctx->file_data;
5567         bool ref_switch = false;
5568         struct file *file;
5569         __s32 __user *fds;
5570         int fd, i, err;
5571         __u32 done;
5572
5573         if (check_add_overflow(up->offset, nr_args, &done))
5574                 return -EOVERFLOW;
5575         if (done > ctx->nr_user_files)
5576                 return -EINVAL;
5577
5578         done = 0;
5579         fds = u64_to_user_ptr(up->fds);
5580         while (nr_args) {
5581                 struct fixed_file_table *table;
5582                 unsigned index;
5583
5584                 err = 0;
5585                 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
5586                         err = -EFAULT;
5587                         break;
5588                 }
5589                 i = array_index_nospec(up->offset, ctx->nr_user_files);
5590                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
5591                 index = i & IORING_FILE_TABLE_MASK;
5592                 if (table->files[index]) {
5593                         file = io_file_from_index(ctx, index);
5594                         table->files[index] = NULL;
5595                         if (io_queue_file_removal(data, file))
5596                                 ref_switch = true;
5597                 }
5598                 if (fd != -1) {
5599                         file = fget(fd);
5600                         if (!file) {
5601                                 err = -EBADF;
5602                                 break;
5603                         }
5604                         /*
5605                          * Don't allow io_uring instances to be registered. If
5606                          * UNIX isn't enabled, then this causes a reference
5607                          * cycle and this instance can never get freed. If UNIX
5608                          * is enabled we'll handle it just fine, but there's
5609                          * still no point in allowing a ring fd as it doesn't
5610                          * support regular read/write anyway.
5611                          */
5612                         if (file->f_op == &io_uring_fops) {
5613                                 fput(file);
5614                                 err = -EBADF;
5615                                 break;
5616                         }
5617                         table->files[index] = file;
5618                         err = io_sqe_file_register(ctx, file, i);
5619                         if (err)
5620                                 break;
5621                 }
5622                 nr_args--;
5623                 done++;
5624                 up->offset++;
5625         }
5626
5627         if (ref_switch && !test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
5628                 percpu_ref_put(&data->refs);
5629                 percpu_ref_switch_to_atomic(&data->refs, io_atomic_switch);
5630         }
5631
5632         return done ? done : err;
5633 }
5634 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
5635                                unsigned nr_args)
5636 {
5637         struct io_uring_files_update up;
5638
5639         if (!ctx->file_data)
5640                 return -ENXIO;
5641         if (!nr_args)
5642                 return -EINVAL;
5643         if (copy_from_user(&up, arg, sizeof(up)))
5644                 return -EFAULT;
5645         if (up.resv)
5646                 return -EINVAL;
5647
5648         return __io_sqe_files_update(ctx, &up, nr_args);
5649 }
5650
5651 static void io_put_work(struct io_wq_work *work)
5652 {
5653         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5654
5655         io_put_req(req);
5656 }
5657
5658 static void io_get_work(struct io_wq_work *work)
5659 {
5660         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5661
5662         refcount_inc(&req->refs);
5663 }
5664
5665 static int io_sq_offload_start(struct io_ring_ctx *ctx,
5666                                struct io_uring_params *p)
5667 {
5668         struct io_wq_data data;
5669         unsigned concurrency;
5670         int ret;
5671
5672         init_waitqueue_head(&ctx->sqo_wait);
5673         mmgrab(current->mm);
5674         ctx->sqo_mm = current->mm;
5675
5676         if (ctx->flags & IORING_SETUP_SQPOLL) {
5677                 ret = -EPERM;
5678                 if (!capable(CAP_SYS_ADMIN))
5679                         goto err;
5680
5681                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
5682                 if (!ctx->sq_thread_idle)
5683                         ctx->sq_thread_idle = HZ;
5684
5685                 if (p->flags & IORING_SETUP_SQ_AFF) {
5686                         int cpu = p->sq_thread_cpu;
5687
5688                         ret = -EINVAL;
5689                         if (cpu >= nr_cpu_ids)
5690                                 goto err;
5691                         if (!cpu_online(cpu))
5692                                 goto err;
5693
5694                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
5695                                                         ctx, cpu,
5696                                                         "io_uring-sq");
5697                 } else {
5698                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
5699                                                         "io_uring-sq");
5700                 }
5701                 if (IS_ERR(ctx->sqo_thread)) {
5702                         ret = PTR_ERR(ctx->sqo_thread);
5703                         ctx->sqo_thread = NULL;
5704                         goto err;
5705                 }
5706                 wake_up_process(ctx->sqo_thread);
5707         } else if (p->flags & IORING_SETUP_SQ_AFF) {
5708                 /* Can't have SQ_AFF without SQPOLL */
5709                 ret = -EINVAL;
5710                 goto err;
5711         }
5712
5713         data.mm = ctx->sqo_mm;
5714         data.user = ctx->user;
5715         data.creds = ctx->creds;
5716         data.get_work = io_get_work;
5717         data.put_work = io_put_work;
5718
5719         /* Do QD, or 4 * CPUS, whatever is smallest */
5720         concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
5721         ctx->io_wq = io_wq_create(concurrency, &data);
5722         if (IS_ERR(ctx->io_wq)) {
5723                 ret = PTR_ERR(ctx->io_wq);
5724                 ctx->io_wq = NULL;
5725                 goto err;
5726         }
5727
5728         return 0;
5729 err:
5730         io_finish_async(ctx);
5731         mmdrop(ctx->sqo_mm);
5732         ctx->sqo_mm = NULL;
5733         return ret;
5734 }
5735
5736 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
5737 {
5738         atomic_long_sub(nr_pages, &user->locked_vm);
5739 }
5740
5741 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
5742 {
5743         unsigned long page_limit, cur_pages, new_pages;
5744
5745         /* Don't allow more pages than we can safely lock */
5746         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
5747
5748         do {
5749                 cur_pages = atomic_long_read(&user->locked_vm);
5750                 new_pages = cur_pages + nr_pages;
5751                 if (new_pages > page_limit)
5752                         return -ENOMEM;
5753         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
5754                                         new_pages) != cur_pages);
5755
5756         return 0;
5757 }
5758
5759 static void io_mem_free(void *ptr)
5760 {
5761         struct page *page;
5762
5763         if (!ptr)
5764                 return;
5765
5766         page = virt_to_head_page(ptr);
5767         if (put_page_testzero(page))
5768                 free_compound_page(page);
5769 }
5770
5771 static void *io_mem_alloc(size_t size)
5772 {
5773         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
5774                                 __GFP_NORETRY;
5775
5776         return (void *) __get_free_pages(gfp_flags, get_order(size));
5777 }
5778
5779 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
5780                                 size_t *sq_offset)
5781 {
5782         struct io_rings *rings;
5783         size_t off, sq_array_size;
5784
5785         off = struct_size(rings, cqes, cq_entries);
5786         if (off == SIZE_MAX)
5787                 return SIZE_MAX;
5788
5789 #ifdef CONFIG_SMP
5790         off = ALIGN(off, SMP_CACHE_BYTES);
5791         if (off == 0)
5792                 return SIZE_MAX;
5793 #endif
5794
5795         sq_array_size = array_size(sizeof(u32), sq_entries);
5796         if (sq_array_size == SIZE_MAX)
5797                 return SIZE_MAX;
5798
5799         if (check_add_overflow(off, sq_array_size, &off))
5800                 return SIZE_MAX;
5801
5802         if (sq_offset)
5803                 *sq_offset = off;
5804
5805         return off;
5806 }
5807
5808 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
5809 {
5810         size_t pages;
5811
5812         pages = (size_t)1 << get_order(
5813                 rings_size(sq_entries, cq_entries, NULL));
5814         pages += (size_t)1 << get_order(
5815                 array_size(sizeof(struct io_uring_sqe), sq_entries));
5816
5817         return pages;
5818 }
5819
5820 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
5821 {
5822         int i, j;
5823
5824         if (!ctx->user_bufs)
5825                 return -ENXIO;
5826
5827         for (i = 0; i < ctx->nr_user_bufs; i++) {
5828                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5829
5830                 for (j = 0; j < imu->nr_bvecs; j++)
5831                         put_user_page(imu->bvec[j].bv_page);
5832
5833                 if (ctx->account_mem)
5834                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
5835                 kvfree(imu->bvec);
5836                 imu->nr_bvecs = 0;
5837         }
5838
5839         kfree(ctx->user_bufs);
5840         ctx->user_bufs = NULL;
5841         ctx->nr_user_bufs = 0;
5842         return 0;
5843 }
5844
5845 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
5846                        void __user *arg, unsigned index)
5847 {
5848         struct iovec __user *src;
5849
5850 #ifdef CONFIG_COMPAT
5851         if (ctx->compat) {
5852                 struct compat_iovec __user *ciovs;
5853                 struct compat_iovec ciov;
5854
5855                 ciovs = (struct compat_iovec __user *) arg;
5856                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
5857                         return -EFAULT;
5858
5859                 dst->iov_base = u64_to_user_ptr((u64)ciov.iov_base);
5860                 dst->iov_len = ciov.iov_len;
5861                 return 0;
5862         }
5863 #endif
5864         src = (struct iovec __user *) arg;
5865         if (copy_from_user(dst, &src[index], sizeof(*dst)))
5866                 return -EFAULT;
5867         return 0;
5868 }
5869
5870 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
5871                                   unsigned nr_args)
5872 {
5873         struct vm_area_struct **vmas = NULL;
5874         struct page **pages = NULL;
5875         int i, j, got_pages = 0;
5876         int ret = -EINVAL;
5877
5878         if (ctx->user_bufs)
5879                 return -EBUSY;
5880         if (!nr_args || nr_args > UIO_MAXIOV)
5881                 return -EINVAL;
5882
5883         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
5884                                         GFP_KERNEL);
5885         if (!ctx->user_bufs)
5886                 return -ENOMEM;
5887
5888         for (i = 0; i < nr_args; i++) {
5889                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5890                 unsigned long off, start, end, ubuf;
5891                 int pret, nr_pages;
5892                 struct iovec iov;
5893                 size_t size;
5894
5895                 ret = io_copy_iov(ctx, &iov, arg, i);
5896                 if (ret)
5897                         goto err;
5898
5899                 /*
5900                  * Don't impose further limits on the size and buffer
5901                  * constraints here, we'll -EINVAL later when IO is
5902                  * submitted if they are wrong.
5903                  */
5904                 ret = -EFAULT;
5905                 if (!iov.iov_base || !iov.iov_len)
5906                         goto err;
5907
5908                 /* arbitrary limit, but we need something */
5909                 if (iov.iov_len > SZ_1G)
5910                         goto err;
5911
5912                 ubuf = (unsigned long) iov.iov_base;
5913                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
5914                 start = ubuf >> PAGE_SHIFT;
5915                 nr_pages = end - start;
5916
5917                 if (ctx->account_mem) {
5918                         ret = io_account_mem(ctx->user, nr_pages);
5919                         if (ret)
5920                                 goto err;
5921                 }
5922
5923                 ret = 0;
5924                 if (!pages || nr_pages > got_pages) {
5925                         kfree(vmas);
5926                         kfree(pages);
5927                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
5928                                                 GFP_KERNEL);
5929                         vmas = kvmalloc_array(nr_pages,
5930                                         sizeof(struct vm_area_struct *),
5931                                         GFP_KERNEL);
5932                         if (!pages || !vmas) {
5933                                 ret = -ENOMEM;
5934                                 if (ctx->account_mem)
5935                                         io_unaccount_mem(ctx->user, nr_pages);
5936                                 goto err;
5937                         }
5938                         got_pages = nr_pages;
5939                 }
5940
5941                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
5942                                                 GFP_KERNEL);
5943                 ret = -ENOMEM;
5944                 if (!imu->bvec) {
5945                         if (ctx->account_mem)
5946                                 io_unaccount_mem(ctx->user, nr_pages);
5947                         goto err;
5948                 }
5949
5950                 ret = 0;
5951                 down_read(&current->mm->mmap_sem);
5952                 pret = get_user_pages(ubuf, nr_pages,
5953                                       FOLL_WRITE | FOLL_LONGTERM,
5954                                       pages, vmas);
5955                 if (pret == nr_pages) {
5956                         /* don't support file backed memory */
5957                         for (j = 0; j < nr_pages; j++) {
5958                                 struct vm_area_struct *vma = vmas[j];
5959
5960                                 if (vma->vm_file &&
5961                                     !is_file_hugepages(vma->vm_file)) {
5962                                         ret = -EOPNOTSUPP;
5963                                         break;
5964                                 }
5965                         }
5966                 } else {
5967                         ret = pret < 0 ? pret : -EFAULT;
5968                 }
5969                 up_read(&current->mm->mmap_sem);
5970                 if (ret) {
5971                         /*
5972                          * if we did partial map, or found file backed vmas,
5973                          * release any pages we did get
5974                          */
5975                         if (pret > 0)
5976                                 put_user_pages(pages, pret);
5977                         if (ctx->account_mem)
5978                                 io_unaccount_mem(ctx->user, nr_pages);
5979                         kvfree(imu->bvec);
5980                         goto err;
5981                 }
5982
5983                 off = ubuf & ~PAGE_MASK;
5984                 size = iov.iov_len;
5985                 for (j = 0; j < nr_pages; j++) {
5986                         size_t vec_len;
5987
5988                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
5989                         imu->bvec[j].bv_page = pages[j];
5990                         imu->bvec[j].bv_len = vec_len;
5991                         imu->bvec[j].bv_offset = off;
5992                         off = 0;
5993                         size -= vec_len;
5994                 }
5995                 /* store original address for later verification */
5996                 imu->ubuf = ubuf;
5997                 imu->len = iov.iov_len;
5998                 imu->nr_bvecs = nr_pages;
5999
6000                 ctx->nr_user_bufs++;
6001         }
6002         kvfree(pages);
6003         kvfree(vmas);
6004         return 0;
6005 err:
6006         kvfree(pages);
6007         kvfree(vmas);
6008         io_sqe_buffer_unregister(ctx);
6009         return ret;
6010 }
6011
6012 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
6013 {
6014         __s32 __user *fds = arg;
6015         int fd;
6016
6017         if (ctx->cq_ev_fd)
6018                 return -EBUSY;
6019
6020         if (copy_from_user(&fd, fds, sizeof(*fds)))
6021                 return -EFAULT;
6022
6023         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
6024         if (IS_ERR(ctx->cq_ev_fd)) {
6025                 int ret = PTR_ERR(ctx->cq_ev_fd);
6026                 ctx->cq_ev_fd = NULL;
6027                 return ret;
6028         }
6029
6030         return 0;
6031 }
6032
6033 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
6034 {
6035         if (ctx->cq_ev_fd) {
6036                 eventfd_ctx_put(ctx->cq_ev_fd);
6037                 ctx->cq_ev_fd = NULL;
6038                 return 0;
6039         }
6040
6041         return -ENXIO;
6042 }
6043
6044 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
6045 {
6046         io_finish_async(ctx);
6047         if (ctx->sqo_mm)
6048                 mmdrop(ctx->sqo_mm);
6049
6050         io_iopoll_reap_events(ctx);
6051         io_sqe_buffer_unregister(ctx);
6052         io_sqe_files_unregister(ctx);
6053         io_eventfd_unregister(ctx);
6054
6055 #if defined(CONFIG_UNIX)
6056         if (ctx->ring_sock) {
6057                 ctx->ring_sock->file = NULL; /* so that iput() is called */
6058                 sock_release(ctx->ring_sock);
6059         }
6060 #endif
6061
6062         io_mem_free(ctx->rings);
6063         io_mem_free(ctx->sq_sqes);
6064
6065         percpu_ref_exit(&ctx->refs);
6066         if (ctx->account_mem)
6067                 io_unaccount_mem(ctx->user,
6068                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
6069         free_uid(ctx->user);
6070         put_cred(ctx->creds);
6071         kfree(ctx->completions);
6072         kfree(ctx->cancel_hash);
6073         kmem_cache_free(req_cachep, ctx->fallback_req);
6074         kfree(ctx);
6075 }
6076
6077 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
6078 {
6079         struct io_ring_ctx *ctx = file->private_data;
6080         __poll_t mask = 0;
6081
6082         poll_wait(file, &ctx->cq_wait, wait);
6083         /*
6084          * synchronizes with barrier from wq_has_sleeper call in
6085          * io_commit_cqring
6086          */
6087         smp_rmb();
6088         if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
6089             ctx->rings->sq_ring_entries)
6090                 mask |= EPOLLOUT | EPOLLWRNORM;
6091         if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
6092                 mask |= EPOLLIN | EPOLLRDNORM;
6093
6094         return mask;
6095 }
6096
6097 static int io_uring_fasync(int fd, struct file *file, int on)
6098 {
6099         struct io_ring_ctx *ctx = file->private_data;
6100
6101         return fasync_helper(fd, file, on, &ctx->cq_fasync);
6102 }
6103
6104 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
6105 {
6106         mutex_lock(&ctx->uring_lock);
6107         percpu_ref_kill(&ctx->refs);
6108         mutex_unlock(&ctx->uring_lock);
6109
6110         io_kill_timeouts(ctx);
6111         io_poll_remove_all(ctx);
6112
6113         if (ctx->io_wq)
6114                 io_wq_cancel_all(ctx->io_wq);
6115
6116         io_iopoll_reap_events(ctx);
6117         /* if we failed setting up the ctx, we might not have any rings */
6118         if (ctx->rings)
6119                 io_cqring_overflow_flush(ctx, true);
6120         wait_for_completion(&ctx->completions[0]);
6121         io_ring_ctx_free(ctx);
6122 }
6123
6124 static int io_uring_release(struct inode *inode, struct file *file)
6125 {
6126         struct io_ring_ctx *ctx = file->private_data;
6127
6128         file->private_data = NULL;
6129         io_ring_ctx_wait_and_kill(ctx);
6130         return 0;
6131 }
6132
6133 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
6134                                   struct files_struct *files)
6135 {
6136         struct io_kiocb *req;
6137         DEFINE_WAIT(wait);
6138
6139         while (!list_empty_careful(&ctx->inflight_list)) {
6140                 struct io_kiocb *cancel_req = NULL;
6141
6142                 spin_lock_irq(&ctx->inflight_lock);
6143                 list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
6144                         if (req->work.files != files)
6145                                 continue;
6146                         /* req is being completed, ignore */
6147                         if (!refcount_inc_not_zero(&req->refs))
6148                                 continue;
6149                         cancel_req = req;
6150                         break;
6151                 }
6152                 if (cancel_req)
6153                         prepare_to_wait(&ctx->inflight_wait, &wait,
6154                                                 TASK_UNINTERRUPTIBLE);
6155                 spin_unlock_irq(&ctx->inflight_lock);
6156
6157                 /* We need to keep going until we don't find a matching req */
6158                 if (!cancel_req)
6159                         break;
6160
6161                 io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
6162                 io_put_req(cancel_req);
6163                 schedule();
6164         }
6165         finish_wait(&ctx->inflight_wait, &wait);
6166 }
6167
6168 static int io_uring_flush(struct file *file, void *data)
6169 {
6170         struct io_ring_ctx *ctx = file->private_data;
6171
6172         io_uring_cancel_files(ctx, data);
6173         if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
6174                 io_cqring_overflow_flush(ctx, true);
6175                 io_wq_cancel_all(ctx->io_wq);
6176         }
6177         return 0;
6178 }
6179
6180 static void *io_uring_validate_mmap_request(struct file *file,
6181                                             loff_t pgoff, size_t sz)
6182 {
6183         struct io_ring_ctx *ctx = file->private_data;
6184         loff_t offset = pgoff << PAGE_SHIFT;
6185         struct page *page;
6186         void *ptr;
6187
6188         switch (offset) {
6189         case IORING_OFF_SQ_RING:
6190         case IORING_OFF_CQ_RING:
6191                 ptr = ctx->rings;
6192                 break;
6193         case IORING_OFF_SQES:
6194                 ptr = ctx->sq_sqes;
6195                 break;
6196         default:
6197                 return ERR_PTR(-EINVAL);
6198         }
6199
6200         page = virt_to_head_page(ptr);
6201         if (sz > page_size(page))
6202                 return ERR_PTR(-EINVAL);
6203
6204         return ptr;
6205 }
6206
6207 #ifdef CONFIG_MMU
6208
6209 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
6210 {
6211         size_t sz = vma->vm_end - vma->vm_start;
6212         unsigned long pfn;
6213         void *ptr;
6214
6215         ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
6216         if (IS_ERR(ptr))
6217                 return PTR_ERR(ptr);
6218
6219         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
6220         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
6221 }
6222
6223 #else /* !CONFIG_MMU */
6224
6225 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
6226 {
6227         return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
6228 }
6229
6230 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
6231 {
6232         return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
6233 }
6234
6235 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
6236         unsigned long addr, unsigned long len,
6237         unsigned long pgoff, unsigned long flags)
6238 {
6239         void *ptr;
6240
6241         ptr = io_uring_validate_mmap_request(file, pgoff, len);
6242         if (IS_ERR(ptr))
6243                 return PTR_ERR(ptr);
6244
6245         return (unsigned long) ptr;
6246 }
6247
6248 #endif /* !CONFIG_MMU */
6249
6250 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
6251                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
6252                 size_t, sigsz)
6253 {
6254         struct io_ring_ctx *ctx;
6255         long ret = -EBADF;
6256         int submitted = 0;
6257         struct fd f;
6258
6259         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
6260                 return -EINVAL;
6261
6262         f = fdget(fd);
6263         if (!f.file)
6264                 return -EBADF;
6265
6266         ret = -EOPNOTSUPP;
6267         if (f.file->f_op != &io_uring_fops)
6268                 goto out_fput;
6269
6270         ret = -ENXIO;
6271         ctx = f.file->private_data;
6272         if (!percpu_ref_tryget(&ctx->refs))
6273                 goto out_fput;
6274
6275         /*
6276          * For SQ polling, the thread will do all submissions and completions.
6277          * Just return the requested submit count, and wake the thread if
6278          * we were asked to.
6279          */
6280         ret = 0;
6281         if (ctx->flags & IORING_SETUP_SQPOLL) {
6282                 if (!list_empty_careful(&ctx->cq_overflow_list))
6283                         io_cqring_overflow_flush(ctx, false);
6284                 if (flags & IORING_ENTER_SQ_WAKEUP)
6285                         wake_up(&ctx->sqo_wait);
6286                 submitted = to_submit;
6287         } else if (to_submit) {
6288                 struct mm_struct *cur_mm;
6289
6290                 if (current->mm != ctx->sqo_mm ||
6291                     current_cred() != ctx->creds) {
6292                         ret = -EPERM;
6293                         goto out;
6294                 }
6295
6296                 mutex_lock(&ctx->uring_lock);
6297                 /* already have mm, so io_submit_sqes() won't try to grab it */
6298                 cur_mm = ctx->sqo_mm;
6299                 submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
6300                                            &cur_mm, false);
6301                 mutex_unlock(&ctx->uring_lock);
6302
6303                 if (submitted != to_submit)
6304                         goto out;
6305         }
6306         if (flags & IORING_ENTER_GETEVENTS) {
6307                 unsigned nr_events = 0;
6308
6309                 min_complete = min(min_complete, ctx->cq_entries);
6310
6311                 if (ctx->flags & IORING_SETUP_IOPOLL) {
6312                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
6313                 } else {
6314                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
6315                 }
6316         }
6317
6318 out:
6319         percpu_ref_put(&ctx->refs);
6320 out_fput:
6321         fdput(f);
6322         return submitted ? submitted : ret;
6323 }
6324
6325 static const struct file_operations io_uring_fops = {
6326         .release        = io_uring_release,
6327         .flush          = io_uring_flush,
6328         .mmap           = io_uring_mmap,
6329 #ifndef CONFIG_MMU
6330         .get_unmapped_area = io_uring_nommu_get_unmapped_area,
6331         .mmap_capabilities = io_uring_nommu_mmap_capabilities,
6332 #endif
6333         .poll           = io_uring_poll,
6334         .fasync         = io_uring_fasync,
6335 };
6336
6337 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
6338                                   struct io_uring_params *p)
6339 {
6340         struct io_rings *rings;
6341         size_t size, sq_array_offset;
6342
6343         size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
6344         if (size == SIZE_MAX)
6345                 return -EOVERFLOW;
6346
6347         rings = io_mem_alloc(size);
6348         if (!rings)
6349                 return -ENOMEM;
6350
6351         ctx->rings = rings;
6352         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
6353         rings->sq_ring_mask = p->sq_entries - 1;
6354         rings->cq_ring_mask = p->cq_entries - 1;
6355         rings->sq_ring_entries = p->sq_entries;
6356         rings->cq_ring_entries = p->cq_entries;
6357         ctx->sq_mask = rings->sq_ring_mask;
6358         ctx->cq_mask = rings->cq_ring_mask;
6359         ctx->sq_entries = rings->sq_ring_entries;
6360         ctx->cq_entries = rings->cq_ring_entries;
6361
6362         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
6363         if (size == SIZE_MAX) {
6364                 io_mem_free(ctx->rings);
6365                 ctx->rings = NULL;
6366                 return -EOVERFLOW;
6367         }
6368
6369         ctx->sq_sqes = io_mem_alloc(size);
6370         if (!ctx->sq_sqes) {
6371                 io_mem_free(ctx->rings);
6372                 ctx->rings = NULL;
6373                 return -ENOMEM;
6374         }
6375
6376         return 0;
6377 }
6378
6379 /*
6380  * Allocate an anonymous fd, this is what constitutes the application
6381  * visible backing of an io_uring instance. The application mmaps this
6382  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
6383  * we have to tie this fd to a socket for file garbage collection purposes.
6384  */
6385 static int io_uring_get_fd(struct io_ring_ctx *ctx)
6386 {
6387         struct file *file;
6388         int ret;
6389
6390 #if defined(CONFIG_UNIX)
6391         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
6392                                 &ctx->ring_sock);
6393         if (ret)
6394                 return ret;
6395 #endif
6396
6397         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
6398         if (ret < 0)
6399                 goto err;
6400
6401         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
6402                                         O_RDWR | O_CLOEXEC);
6403         if (IS_ERR(file)) {
6404                 put_unused_fd(ret);
6405                 ret = PTR_ERR(file);
6406                 goto err;
6407         }
6408
6409 #if defined(CONFIG_UNIX)
6410         ctx->ring_sock->file = file;
6411 #endif
6412         fd_install(ret, file);
6413         return ret;
6414 err:
6415 #if defined(CONFIG_UNIX)
6416         sock_release(ctx->ring_sock);
6417         ctx->ring_sock = NULL;
6418 #endif
6419         return ret;
6420 }
6421
6422 static int io_uring_create(unsigned entries, struct io_uring_params *p)
6423 {
6424         struct user_struct *user = NULL;
6425         struct io_ring_ctx *ctx;
6426         bool account_mem;
6427         int ret;
6428
6429         if (!entries)
6430                 return -EINVAL;
6431         if (entries > IORING_MAX_ENTRIES) {
6432                 if (!(p->flags & IORING_SETUP_CLAMP))
6433                         return -EINVAL;
6434                 entries = IORING_MAX_ENTRIES;
6435         }
6436
6437         /*
6438          * Use twice as many entries for the CQ ring. It's possible for the
6439          * application to drive a higher depth than the size of the SQ ring,
6440          * since the sqes are only used at submission time. This allows for
6441          * some flexibility in overcommitting a bit. If the application has
6442          * set IORING_SETUP_CQSIZE, it will have passed in the desired number
6443          * of CQ ring entries manually.
6444          */
6445         p->sq_entries = roundup_pow_of_two(entries);
6446         if (p->flags & IORING_SETUP_CQSIZE) {
6447                 /*
6448                  * If IORING_SETUP_CQSIZE is set, we do the same roundup
6449                  * to a power-of-two, if it isn't already. We do NOT impose
6450                  * any cq vs sq ring sizing.
6451                  */
6452                 if (p->cq_entries < p->sq_entries)
6453                         return -EINVAL;
6454                 if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
6455                         if (!(p->flags & IORING_SETUP_CLAMP))
6456                                 return -EINVAL;
6457                         p->cq_entries = IORING_MAX_CQ_ENTRIES;
6458                 }
6459                 p->cq_entries = roundup_pow_of_two(p->cq_entries);
6460         } else {
6461                 p->cq_entries = 2 * p->sq_entries;
6462         }
6463
6464         user = get_uid(current_user());
6465         account_mem = !capable(CAP_IPC_LOCK);
6466
6467         if (account_mem) {
6468                 ret = io_account_mem(user,
6469                                 ring_pages(p->sq_entries, p->cq_entries));
6470                 if (ret) {
6471                         free_uid(user);
6472                         return ret;
6473                 }
6474         }
6475
6476         ctx = io_ring_ctx_alloc(p);
6477         if (!ctx) {
6478                 if (account_mem)
6479                         io_unaccount_mem(user, ring_pages(p->sq_entries,
6480                                                                 p->cq_entries));
6481                 free_uid(user);
6482                 return -ENOMEM;
6483         }
6484         ctx->compat = in_compat_syscall();
6485         ctx->account_mem = account_mem;
6486         ctx->user = user;
6487         ctx->creds = get_current_cred();
6488
6489         ret = io_allocate_scq_urings(ctx, p);
6490         if (ret)
6491                 goto err;
6492
6493         ret = io_sq_offload_start(ctx, p);
6494         if (ret)
6495                 goto err;
6496
6497         memset(&p->sq_off, 0, sizeof(p->sq_off));
6498         p->sq_off.head = offsetof(struct io_rings, sq.head);
6499         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
6500         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
6501         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
6502         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
6503         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
6504         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
6505
6506         memset(&p->cq_off, 0, sizeof(p->cq_off));
6507         p->cq_off.head = offsetof(struct io_rings, cq.head);
6508         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
6509         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
6510         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
6511         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
6512         p->cq_off.cqes = offsetof(struct io_rings, cqes);
6513
6514         /*
6515          * Install ring fd as the very last thing, so we don't risk someone
6516          * having closed it before we finish setup
6517          */
6518         ret = io_uring_get_fd(ctx);
6519         if (ret < 0)
6520                 goto err;
6521
6522         p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
6523                         IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS;
6524         trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
6525         return ret;
6526 err:
6527         io_ring_ctx_wait_and_kill(ctx);
6528         return ret;
6529 }
6530
6531 /*
6532  * Sets up an aio uring context, and returns the fd. Applications asks for a
6533  * ring size, we return the actual sq/cq ring sizes (among other things) in the
6534  * params structure passed in.
6535  */
6536 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
6537 {
6538         struct io_uring_params p;
6539         long ret;
6540         int i;
6541
6542         if (copy_from_user(&p, params, sizeof(p)))
6543                 return -EFAULT;
6544         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
6545                 if (p.resv[i])
6546                         return -EINVAL;
6547         }
6548
6549         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
6550                         IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
6551                         IORING_SETUP_CLAMP))
6552                 return -EINVAL;
6553
6554         ret = io_uring_create(entries, &p);
6555         if (ret < 0)
6556                 return ret;
6557
6558         if (copy_to_user(params, &p, sizeof(p)))
6559                 return -EFAULT;
6560
6561         return ret;
6562 }
6563
6564 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
6565                 struct io_uring_params __user *, params)
6566 {
6567         return io_uring_setup(entries, params);
6568 }
6569
6570 static int io_probe(struct io_ring_ctx *ctx, void __user *arg, unsigned nr_args)
6571 {
6572         struct io_uring_probe *p;
6573         size_t size;
6574         int i, ret;
6575
6576         size = struct_size(p, ops, nr_args);
6577         if (size == SIZE_MAX)
6578                 return -EOVERFLOW;
6579         p = kzalloc(size, GFP_KERNEL);
6580         if (!p)
6581                 return -ENOMEM;
6582
6583         ret = -EFAULT;
6584         if (copy_from_user(p, arg, size))
6585                 goto out;
6586         ret = -EINVAL;
6587         if (memchr_inv(p, 0, size))
6588                 goto out;
6589
6590         p->last_op = IORING_OP_LAST - 1;
6591         if (nr_args > IORING_OP_LAST)
6592                 nr_args = IORING_OP_LAST;
6593
6594         for (i = 0; i < nr_args; i++) {
6595                 p->ops[i].op = i;
6596                 if (!io_op_defs[i].not_supported)
6597                         p->ops[i].flags = IO_URING_OP_SUPPORTED;
6598         }
6599         p->ops_len = i;
6600
6601         ret = 0;
6602         if (copy_to_user(arg, p, size))
6603                 ret = -EFAULT;
6604 out:
6605         kfree(p);
6606         return ret;
6607 }
6608
6609 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
6610                                void __user *arg, unsigned nr_args)
6611         __releases(ctx->uring_lock)
6612         __acquires(ctx->uring_lock)
6613 {
6614         int ret;
6615
6616         /*
6617          * We're inside the ring mutex, if the ref is already dying, then
6618          * someone else killed the ctx or is already going through
6619          * io_uring_register().
6620          */
6621         if (percpu_ref_is_dying(&ctx->refs))
6622                 return -ENXIO;
6623
6624         if (opcode != IORING_UNREGISTER_FILES &&
6625             opcode != IORING_REGISTER_FILES_UPDATE &&
6626             opcode != IORING_REGISTER_PROBE) {
6627                 percpu_ref_kill(&ctx->refs);
6628
6629                 /*
6630                  * Drop uring mutex before waiting for references to exit. If
6631                  * another thread is currently inside io_uring_enter() it might
6632                  * need to grab the uring_lock to make progress. If we hold it
6633                  * here across the drain wait, then we can deadlock. It's safe
6634                  * to drop the mutex here, since no new references will come in
6635                  * after we've killed the percpu ref.
6636                  */
6637                 mutex_unlock(&ctx->uring_lock);
6638                 ret = wait_for_completion_interruptible(&ctx->completions[0]);
6639                 mutex_lock(&ctx->uring_lock);
6640                 if (ret) {
6641                         percpu_ref_resurrect(&ctx->refs);
6642                         ret = -EINTR;
6643                         goto out;
6644                 }
6645         }
6646
6647         switch (opcode) {
6648         case IORING_REGISTER_BUFFERS:
6649                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
6650                 break;
6651         case IORING_UNREGISTER_BUFFERS:
6652                 ret = -EINVAL;
6653                 if (arg || nr_args)
6654                         break;
6655                 ret = io_sqe_buffer_unregister(ctx);
6656                 break;
6657         case IORING_REGISTER_FILES:
6658                 ret = io_sqe_files_register(ctx, arg, nr_args);
6659                 break;
6660         case IORING_UNREGISTER_FILES:
6661                 ret = -EINVAL;
6662                 if (arg || nr_args)
6663                         break;
6664                 ret = io_sqe_files_unregister(ctx);
6665                 break;
6666         case IORING_REGISTER_FILES_UPDATE:
6667                 ret = io_sqe_files_update(ctx, arg, nr_args);
6668                 break;
6669         case IORING_REGISTER_EVENTFD:
6670         case IORING_REGISTER_EVENTFD_ASYNC:
6671                 ret = -EINVAL;
6672                 if (nr_args != 1)
6673                         break;
6674                 ret = io_eventfd_register(ctx, arg);
6675                 if (ret)
6676                         break;
6677                 if (opcode == IORING_REGISTER_EVENTFD_ASYNC)
6678                         ctx->eventfd_async = 1;
6679                 else
6680                         ctx->eventfd_async = 0;
6681                 break;
6682         case IORING_UNREGISTER_EVENTFD:
6683                 ret = -EINVAL;
6684                 if (arg || nr_args)
6685                         break;
6686                 ret = io_eventfd_unregister(ctx);
6687                 break;
6688         case IORING_REGISTER_PROBE:
6689                 ret = -EINVAL;
6690                 if (!arg || nr_args > 256)
6691                         break;
6692                 ret = io_probe(ctx, arg, nr_args);
6693                 break;
6694         default:
6695                 ret = -EINVAL;
6696                 break;
6697         }
6698
6699
6700         if (opcode != IORING_UNREGISTER_FILES &&
6701             opcode != IORING_REGISTER_FILES_UPDATE &&
6702             opcode != IORING_REGISTER_PROBE) {
6703                 /* bring the ctx back to life */
6704                 percpu_ref_reinit(&ctx->refs);
6705 out:
6706                 reinit_completion(&ctx->completions[0]);
6707         }
6708         return ret;
6709 }
6710
6711 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
6712                 void __user *, arg, unsigned int, nr_args)
6713 {
6714         struct io_ring_ctx *ctx;
6715         long ret = -EBADF;
6716         struct fd f;
6717
6718         f = fdget(fd);
6719         if (!f.file)
6720                 return -EBADF;
6721
6722         ret = -EOPNOTSUPP;
6723         if (f.file->f_op != &io_uring_fops)
6724                 goto out_fput;
6725
6726         ctx = f.file->private_data;
6727
6728         mutex_lock(&ctx->uring_lock);
6729         ret = __io_uring_register(ctx, opcode, arg, nr_args);
6730         mutex_unlock(&ctx->uring_lock);
6731         trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
6732                                                         ctx->cq_ev_fd != NULL, ret);
6733 out_fput:
6734         fdput(f);
6735         return ret;
6736 }
6737
6738 static int __init io_uring_init(void)
6739 {
6740         BUILD_BUG_ON(ARRAY_SIZE(io_op_defs) != IORING_OP_LAST);
6741         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
6742         return 0;
6743 };
6744 __initcall(io_uring_init);