io_uring: add support for send(2) and recv(2)
[linux-2.6-microblaze.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/kthread.h>
60 #include <linux/blkdev.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
63 #include <net/sock.h>
64 #include <net/af_unix.h>
65 #include <net/scm.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72 #include <linux/highmem.h>
73 #include <linux/namei.h>
74 #include <linux/fsnotify.h>
75 #include <linux/fadvise.h>
76
77 #define CREATE_TRACE_POINTS
78 #include <trace/events/io_uring.h>
79
80 #include <uapi/linux/io_uring.h>
81
82 #include "internal.h"
83 #include "io-wq.h"
84
85 #define IORING_MAX_ENTRIES      32768
86 #define IORING_MAX_CQ_ENTRIES   (2 * IORING_MAX_ENTRIES)
87
88 /*
89  * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
90  */
91 #define IORING_FILE_TABLE_SHIFT 9
92 #define IORING_MAX_FILES_TABLE  (1U << IORING_FILE_TABLE_SHIFT)
93 #define IORING_FILE_TABLE_MASK  (IORING_MAX_FILES_TABLE - 1)
94 #define IORING_MAX_FIXED_FILES  (64 * IORING_MAX_FILES_TABLE)
95
96 struct io_uring {
97         u32 head ____cacheline_aligned_in_smp;
98         u32 tail ____cacheline_aligned_in_smp;
99 };
100
101 /*
102  * This data is shared with the application through the mmap at offsets
103  * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
104  *
105  * The offsets to the member fields are published through struct
106  * io_sqring_offsets when calling io_uring_setup.
107  */
108 struct io_rings {
109         /*
110          * Head and tail offsets into the ring; the offsets need to be
111          * masked to get valid indices.
112          *
113          * The kernel controls head of the sq ring and the tail of the cq ring,
114          * and the application controls tail of the sq ring and the head of the
115          * cq ring.
116          */
117         struct io_uring         sq, cq;
118         /*
119          * Bitmasks to apply to head and tail offsets (constant, equals
120          * ring_entries - 1)
121          */
122         u32                     sq_ring_mask, cq_ring_mask;
123         /* Ring sizes (constant, power of 2) */
124         u32                     sq_ring_entries, cq_ring_entries;
125         /*
126          * Number of invalid entries dropped by the kernel due to
127          * invalid index stored in array
128          *
129          * Written by the kernel, shouldn't be modified by the
130          * application (i.e. get number of "new events" by comparing to
131          * cached value).
132          *
133          * After a new SQ head value was read by the application this
134          * counter includes all submissions that were dropped reaching
135          * the new SQ head (and possibly more).
136          */
137         u32                     sq_dropped;
138         /*
139          * Runtime flags
140          *
141          * Written by the kernel, shouldn't be modified by the
142          * application.
143          *
144          * The application needs a full memory barrier before checking
145          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
146          */
147         u32                     sq_flags;
148         /*
149          * Number of completion events lost because the queue was full;
150          * this should be avoided by the application by making sure
151          * there are not more requests pending than there is space in
152          * the completion queue.
153          *
154          * Written by the kernel, shouldn't be modified by the
155          * application (i.e. get number of "new events" by comparing to
156          * cached value).
157          *
158          * As completion events come in out of order this counter is not
159          * ordered with any other data.
160          */
161         u32                     cq_overflow;
162         /*
163          * Ring buffer of completion events.
164          *
165          * The kernel writes completion events fresh every time they are
166          * produced, so the application is allowed to modify pending
167          * entries.
168          */
169         struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
170 };
171
172 struct io_mapped_ubuf {
173         u64             ubuf;
174         size_t          len;
175         struct          bio_vec *bvec;
176         unsigned int    nr_bvecs;
177 };
178
179 struct fixed_file_table {
180         struct file             **files;
181 };
182
183 enum {
184         FFD_F_ATOMIC,
185 };
186
187 struct fixed_file_data {
188         struct fixed_file_table         *table;
189         struct io_ring_ctx              *ctx;
190
191         struct percpu_ref               refs;
192         struct llist_head               put_llist;
193         unsigned long                   state;
194         struct work_struct              ref_work;
195         struct completion               done;
196 };
197
198 struct io_ring_ctx {
199         struct {
200                 struct percpu_ref       refs;
201         } ____cacheline_aligned_in_smp;
202
203         struct {
204                 unsigned int            flags;
205                 bool                    compat;
206                 bool                    account_mem;
207                 bool                    cq_overflow_flushed;
208                 bool                    drain_next;
209
210                 /*
211                  * Ring buffer of indices into array of io_uring_sqe, which is
212                  * mmapped by the application using the IORING_OFF_SQES offset.
213                  *
214                  * This indirection could e.g. be used to assign fixed
215                  * io_uring_sqe entries to operations and only submit them to
216                  * the queue when needed.
217                  *
218                  * The kernel modifies neither the indices array nor the entries
219                  * array.
220                  */
221                 u32                     *sq_array;
222                 unsigned                cached_sq_head;
223                 unsigned                sq_entries;
224                 unsigned                sq_mask;
225                 unsigned                sq_thread_idle;
226                 unsigned                cached_sq_dropped;
227                 atomic_t                cached_cq_overflow;
228                 unsigned long           sq_check_overflow;
229
230                 struct list_head        defer_list;
231                 struct list_head        timeout_list;
232                 struct list_head        cq_overflow_list;
233
234                 wait_queue_head_t       inflight_wait;
235                 struct io_uring_sqe     *sq_sqes;
236         } ____cacheline_aligned_in_smp;
237
238         struct io_rings *rings;
239
240         /* IO offload */
241         struct io_wq            *io_wq;
242         struct task_struct      *sqo_thread;    /* if using sq thread polling */
243         struct mm_struct        *sqo_mm;
244         wait_queue_head_t       sqo_wait;
245
246         /*
247          * If used, fixed file set. Writers must ensure that ->refs is dead,
248          * readers must ensure that ->refs is alive as long as the file* is
249          * used. Only updated through io_uring_register(2).
250          */
251         struct fixed_file_data  *file_data;
252         unsigned                nr_user_files;
253
254         /* if used, fixed mapped user buffers */
255         unsigned                nr_user_bufs;
256         struct io_mapped_ubuf   *user_bufs;
257
258         struct user_struct      *user;
259
260         const struct cred       *creds;
261
262         /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
263         struct completion       *completions;
264
265         /* if all else fails... */
266         struct io_kiocb         *fallback_req;
267
268 #if defined(CONFIG_UNIX)
269         struct socket           *ring_sock;
270 #endif
271
272         struct {
273                 unsigned                cached_cq_tail;
274                 unsigned                cq_entries;
275                 unsigned                cq_mask;
276                 atomic_t                cq_timeouts;
277                 unsigned long           cq_check_overflow;
278                 struct wait_queue_head  cq_wait;
279                 struct fasync_struct    *cq_fasync;
280                 struct eventfd_ctx      *cq_ev_fd;
281         } ____cacheline_aligned_in_smp;
282
283         struct {
284                 struct mutex            uring_lock;
285                 wait_queue_head_t       wait;
286         } ____cacheline_aligned_in_smp;
287
288         struct {
289                 spinlock_t              completion_lock;
290                 struct llist_head       poll_llist;
291
292                 /*
293                  * ->poll_list is protected by the ctx->uring_lock for
294                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
295                  * For SQPOLL, only the single threaded io_sq_thread() will
296                  * manipulate the list, hence no extra locking is needed there.
297                  */
298                 struct list_head        poll_list;
299                 struct hlist_head       *cancel_hash;
300                 unsigned                cancel_hash_bits;
301                 bool                    poll_multi_file;
302
303                 spinlock_t              inflight_lock;
304                 struct list_head        inflight_list;
305         } ____cacheline_aligned_in_smp;
306 };
307
308 /*
309  * First field must be the file pointer in all the
310  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
311  */
312 struct io_poll_iocb {
313         struct file                     *file;
314         union {
315                 struct wait_queue_head  *head;
316                 u64                     addr;
317         };
318         __poll_t                        events;
319         bool                            done;
320         bool                            canceled;
321         struct wait_queue_entry         wait;
322 };
323
324 struct io_close {
325         struct file                     *file;
326         struct file                     *put_file;
327         int                             fd;
328 };
329
330 struct io_timeout_data {
331         struct io_kiocb                 *req;
332         struct hrtimer                  timer;
333         struct timespec64               ts;
334         enum hrtimer_mode               mode;
335         u32                             seq_offset;
336 };
337
338 struct io_accept {
339         struct file                     *file;
340         struct sockaddr __user          *addr;
341         int __user                      *addr_len;
342         int                             flags;
343 };
344
345 struct io_sync {
346         struct file                     *file;
347         loff_t                          len;
348         loff_t                          off;
349         int                             flags;
350         int                             mode;
351 };
352
353 struct io_cancel {
354         struct file                     *file;
355         u64                             addr;
356 };
357
358 struct io_timeout {
359         struct file                     *file;
360         u64                             addr;
361         int                             flags;
362         unsigned                        count;
363 };
364
365 struct io_rw {
366         /* NOTE: kiocb has the file as the first member, so don't do it here */
367         struct kiocb                    kiocb;
368         u64                             addr;
369         u64                             len;
370 };
371
372 struct io_connect {
373         struct file                     *file;
374         struct sockaddr __user          *addr;
375         int                             addr_len;
376 };
377
378 struct io_sr_msg {
379         struct file                     *file;
380         union {
381                 struct user_msghdr __user *msg;
382                 void __user             *buf;
383         };
384         int                             msg_flags;
385         size_t                          len;
386 };
387
388 struct io_open {
389         struct file                     *file;
390         int                             dfd;
391         union {
392                 umode_t                 mode;
393                 unsigned                mask;
394         };
395         const char __user               *fname;
396         struct filename                 *filename;
397         struct statx __user             *buffer;
398         int                             flags;
399 };
400
401 struct io_files_update {
402         struct file                     *file;
403         u64                             arg;
404         u32                             nr_args;
405         u32                             offset;
406 };
407
408 struct io_fadvise {
409         struct file                     *file;
410         u64                             offset;
411         u32                             len;
412         u32                             advice;
413 };
414
415 struct io_madvise {
416         struct file                     *file;
417         u64                             addr;
418         u32                             len;
419         u32                             advice;
420 };
421
422 struct io_async_connect {
423         struct sockaddr_storage         address;
424 };
425
426 struct io_async_msghdr {
427         struct iovec                    fast_iov[UIO_FASTIOV];
428         struct iovec                    *iov;
429         struct sockaddr __user          *uaddr;
430         struct msghdr                   msg;
431 };
432
433 struct io_async_rw {
434         struct iovec                    fast_iov[UIO_FASTIOV];
435         struct iovec                    *iov;
436         ssize_t                         nr_segs;
437         ssize_t                         size;
438 };
439
440 struct io_async_open {
441         struct filename                 *filename;
442 };
443
444 struct io_async_ctx {
445         union {
446                 struct io_async_rw      rw;
447                 struct io_async_msghdr  msg;
448                 struct io_async_connect connect;
449                 struct io_timeout_data  timeout;
450                 struct io_async_open    open;
451         };
452 };
453
454 /*
455  * NOTE! Each of the iocb union members has the file pointer
456  * as the first entry in their struct definition. So you can
457  * access the file pointer through any of the sub-structs,
458  * or directly as just 'ki_filp' in this struct.
459  */
460 struct io_kiocb {
461         union {
462                 struct file             *file;
463                 struct io_rw            rw;
464                 struct io_poll_iocb     poll;
465                 struct io_accept        accept;
466                 struct io_sync          sync;
467                 struct io_cancel        cancel;
468                 struct io_timeout       timeout;
469                 struct io_connect       connect;
470                 struct io_sr_msg        sr_msg;
471                 struct io_open          open;
472                 struct io_close         close;
473                 struct io_files_update  files_update;
474                 struct io_fadvise       fadvise;
475                 struct io_madvise       madvise;
476         };
477
478         struct io_async_ctx             *io;
479         union {
480                 /*
481                  * ring_file is only used in the submission path, and
482                  * llist_node is only used for poll deferred completions
483                  */
484                 struct file             *ring_file;
485                 struct llist_node       llist_node;
486         };
487         int                             ring_fd;
488         bool                            has_user;
489         bool                            in_async;
490         bool                            needs_fixed_file;
491         u8                              opcode;
492
493         struct io_ring_ctx      *ctx;
494         union {
495                 struct list_head        list;
496                 struct hlist_node       hash_node;
497         };
498         struct list_head        link_list;
499         unsigned int            flags;
500         refcount_t              refs;
501 #define REQ_F_NOWAIT            1       /* must not punt to workers */
502 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
503 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
504 #define REQ_F_LINK_NEXT         8       /* already grabbed next link */
505 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
506 #define REQ_F_IO_DRAINED        32      /* drain done */
507 #define REQ_F_LINK              64      /* linked sqes */
508 #define REQ_F_LINK_TIMEOUT      128     /* has linked timeout */
509 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
510 #define REQ_F_DRAIN_LINK        512     /* link should be fully drained */
511 #define REQ_F_TIMEOUT           1024    /* timeout request */
512 #define REQ_F_ISREG             2048    /* regular file */
513 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
514 #define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
515 #define REQ_F_INFLIGHT          16384   /* on inflight list */
516 #define REQ_F_COMP_LOCKED       32768   /* completion under lock */
517 #define REQ_F_HARDLINK          65536   /* doesn't sever on completion < 0 */
518 #define REQ_F_FORCE_ASYNC       131072  /* IOSQE_ASYNC */
519 #define REQ_F_CUR_POS           262144  /* read/write uses file position */
520         u64                     user_data;
521         u32                     result;
522         u32                     sequence;
523
524         struct list_head        inflight_entry;
525
526         struct io_wq_work       work;
527 };
528
529 #define IO_PLUG_THRESHOLD               2
530 #define IO_IOPOLL_BATCH                 8
531
532 struct io_submit_state {
533         struct blk_plug         plug;
534
535         /*
536          * io_kiocb alloc cache
537          */
538         void                    *reqs[IO_IOPOLL_BATCH];
539         unsigned                int free_reqs;
540         unsigned                int cur_req;
541
542         /*
543          * File reference cache
544          */
545         struct file             *file;
546         unsigned int            fd;
547         unsigned int            has_refs;
548         unsigned int            used_refs;
549         unsigned int            ios_left;
550 };
551
552 struct io_op_def {
553         /* needs req->io allocated for deferral/async */
554         unsigned                async_ctx : 1;
555         /* needs current->mm setup, does mm access */
556         unsigned                needs_mm : 1;
557         /* needs req->file assigned */
558         unsigned                needs_file : 1;
559         /* needs req->file assigned IFF fd is >= 0 */
560         unsigned                fd_non_neg : 1;
561         /* hash wq insertion if file is a regular file */
562         unsigned                hash_reg_file : 1;
563         /* unbound wq insertion if file is a non-regular file */
564         unsigned                unbound_nonreg_file : 1;
565 };
566
567 static const struct io_op_def io_op_defs[] = {
568         {
569                 /* IORING_OP_NOP */
570         },
571         {
572                 /* IORING_OP_READV */
573                 .async_ctx              = 1,
574                 .needs_mm               = 1,
575                 .needs_file             = 1,
576                 .unbound_nonreg_file    = 1,
577         },
578         {
579                 /* IORING_OP_WRITEV */
580                 .async_ctx              = 1,
581                 .needs_mm               = 1,
582                 .needs_file             = 1,
583                 .hash_reg_file          = 1,
584                 .unbound_nonreg_file    = 1,
585         },
586         {
587                 /* IORING_OP_FSYNC */
588                 .needs_file             = 1,
589         },
590         {
591                 /* IORING_OP_READ_FIXED */
592                 .needs_file             = 1,
593                 .unbound_nonreg_file    = 1,
594         },
595         {
596                 /* IORING_OP_WRITE_FIXED */
597                 .needs_file             = 1,
598                 .hash_reg_file          = 1,
599                 .unbound_nonreg_file    = 1,
600         },
601         {
602                 /* IORING_OP_POLL_ADD */
603                 .needs_file             = 1,
604                 .unbound_nonreg_file    = 1,
605         },
606         {
607                 /* IORING_OP_POLL_REMOVE */
608         },
609         {
610                 /* IORING_OP_SYNC_FILE_RANGE */
611                 .needs_file             = 1,
612         },
613         {
614                 /* IORING_OP_SENDMSG */
615                 .async_ctx              = 1,
616                 .needs_mm               = 1,
617                 .needs_file             = 1,
618                 .unbound_nonreg_file    = 1,
619         },
620         {
621                 /* IORING_OP_RECVMSG */
622                 .async_ctx              = 1,
623                 .needs_mm               = 1,
624                 .needs_file             = 1,
625                 .unbound_nonreg_file    = 1,
626         },
627         {
628                 /* IORING_OP_TIMEOUT */
629                 .async_ctx              = 1,
630                 .needs_mm               = 1,
631         },
632         {
633                 /* IORING_OP_TIMEOUT_REMOVE */
634         },
635         {
636                 /* IORING_OP_ACCEPT */
637                 .needs_mm               = 1,
638                 .needs_file             = 1,
639                 .unbound_nonreg_file    = 1,
640         },
641         {
642                 /* IORING_OP_ASYNC_CANCEL */
643         },
644         {
645                 /* IORING_OP_LINK_TIMEOUT */
646                 .async_ctx              = 1,
647                 .needs_mm               = 1,
648         },
649         {
650                 /* IORING_OP_CONNECT */
651                 .async_ctx              = 1,
652                 .needs_mm               = 1,
653                 .needs_file             = 1,
654                 .unbound_nonreg_file    = 1,
655         },
656         {
657                 /* IORING_OP_FALLOCATE */
658                 .needs_file             = 1,
659         },
660         {
661                 /* IORING_OP_OPENAT */
662                 .needs_file             = 1,
663                 .fd_non_neg             = 1,
664         },
665         {
666                 /* IORING_OP_CLOSE */
667                 .needs_file             = 1,
668         },
669         {
670                 /* IORING_OP_FILES_UPDATE */
671                 .needs_mm               = 1,
672         },
673         {
674                 /* IORING_OP_STATX */
675                 .needs_mm               = 1,
676                 .needs_file             = 1,
677                 .fd_non_neg             = 1,
678         },
679         {
680                 /* IORING_OP_READ */
681                 .needs_mm               = 1,
682                 .needs_file             = 1,
683                 .unbound_nonreg_file    = 1,
684         },
685         {
686                 /* IORING_OP_WRITE */
687                 .needs_mm               = 1,
688                 .needs_file             = 1,
689                 .unbound_nonreg_file    = 1,
690         },
691         {
692                 /* IORING_OP_FADVISE */
693                 .needs_file             = 1,
694         },
695         {
696                 /* IORING_OP_MADVISE */
697                 .needs_mm               = 1,
698         },
699         {
700                 /* IORING_OP_SEND */
701                 .needs_mm               = 1,
702                 .needs_file             = 1,
703                 .unbound_nonreg_file    = 1,
704         },
705         {
706                 /* IORING_OP_RECV */
707                 .needs_mm               = 1,
708                 .needs_file             = 1,
709                 .unbound_nonreg_file    = 1,
710         },
711 };
712
713 static void io_wq_submit_work(struct io_wq_work **workptr);
714 static void io_cqring_fill_event(struct io_kiocb *req, long res);
715 static void io_put_req(struct io_kiocb *req);
716 static void __io_double_put_req(struct io_kiocb *req);
717 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
718 static void io_queue_linked_timeout(struct io_kiocb *req);
719 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
720                                  struct io_uring_files_update *ip,
721                                  unsigned nr_args);
722
723 static struct kmem_cache *req_cachep;
724
725 static const struct file_operations io_uring_fops;
726
727 struct sock *io_uring_get_socket(struct file *file)
728 {
729 #if defined(CONFIG_UNIX)
730         if (file->f_op == &io_uring_fops) {
731                 struct io_ring_ctx *ctx = file->private_data;
732
733                 return ctx->ring_sock->sk;
734         }
735 #endif
736         return NULL;
737 }
738 EXPORT_SYMBOL(io_uring_get_socket);
739
740 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
741 {
742         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
743
744         complete(&ctx->completions[0]);
745 }
746
747 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
748 {
749         struct io_ring_ctx *ctx;
750         int hash_bits;
751
752         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
753         if (!ctx)
754                 return NULL;
755
756         ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL);
757         if (!ctx->fallback_req)
758                 goto err;
759
760         ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL);
761         if (!ctx->completions)
762                 goto err;
763
764         /*
765          * Use 5 bits less than the max cq entries, that should give us around
766          * 32 entries per hash list if totally full and uniformly spread.
767          */
768         hash_bits = ilog2(p->cq_entries);
769         hash_bits -= 5;
770         if (hash_bits <= 0)
771                 hash_bits = 1;
772         ctx->cancel_hash_bits = hash_bits;
773         ctx->cancel_hash = kmalloc((1U << hash_bits) * sizeof(struct hlist_head),
774                                         GFP_KERNEL);
775         if (!ctx->cancel_hash)
776                 goto err;
777         __hash_init(ctx->cancel_hash, 1U << hash_bits);
778
779         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
780                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
781                 goto err;
782
783         ctx->flags = p->flags;
784         init_waitqueue_head(&ctx->cq_wait);
785         INIT_LIST_HEAD(&ctx->cq_overflow_list);
786         init_completion(&ctx->completions[0]);
787         init_completion(&ctx->completions[1]);
788         mutex_init(&ctx->uring_lock);
789         init_waitqueue_head(&ctx->wait);
790         spin_lock_init(&ctx->completion_lock);
791         init_llist_head(&ctx->poll_llist);
792         INIT_LIST_HEAD(&ctx->poll_list);
793         INIT_LIST_HEAD(&ctx->defer_list);
794         INIT_LIST_HEAD(&ctx->timeout_list);
795         init_waitqueue_head(&ctx->inflight_wait);
796         spin_lock_init(&ctx->inflight_lock);
797         INIT_LIST_HEAD(&ctx->inflight_list);
798         return ctx;
799 err:
800         if (ctx->fallback_req)
801                 kmem_cache_free(req_cachep, ctx->fallback_req);
802         kfree(ctx->completions);
803         kfree(ctx->cancel_hash);
804         kfree(ctx);
805         return NULL;
806 }
807
808 static inline bool __req_need_defer(struct io_kiocb *req)
809 {
810         struct io_ring_ctx *ctx = req->ctx;
811
812         return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
813                                         + atomic_read(&ctx->cached_cq_overflow);
814 }
815
816 static inline bool req_need_defer(struct io_kiocb *req)
817 {
818         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) == REQ_F_IO_DRAIN)
819                 return __req_need_defer(req);
820
821         return false;
822 }
823
824 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
825 {
826         struct io_kiocb *req;
827
828         req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
829         if (req && !req_need_defer(req)) {
830                 list_del_init(&req->list);
831                 return req;
832         }
833
834         return NULL;
835 }
836
837 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
838 {
839         struct io_kiocb *req;
840
841         req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
842         if (req) {
843                 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
844                         return NULL;
845                 if (!__req_need_defer(req)) {
846                         list_del_init(&req->list);
847                         return req;
848                 }
849         }
850
851         return NULL;
852 }
853
854 static void __io_commit_cqring(struct io_ring_ctx *ctx)
855 {
856         struct io_rings *rings = ctx->rings;
857
858         if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
859                 /* order cqe stores with ring update */
860                 smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
861
862                 if (wq_has_sleeper(&ctx->cq_wait)) {
863                         wake_up_interruptible(&ctx->cq_wait);
864                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
865                 }
866         }
867 }
868
869 static inline bool io_prep_async_work(struct io_kiocb *req,
870                                       struct io_kiocb **link)
871 {
872         const struct io_op_def *def = &io_op_defs[req->opcode];
873         bool do_hashed = false;
874
875         if (req->flags & REQ_F_ISREG) {
876                 if (def->hash_reg_file)
877                         do_hashed = true;
878         } else {
879                 if (def->unbound_nonreg_file)
880                         req->work.flags |= IO_WQ_WORK_UNBOUND;
881         }
882         if (def->needs_mm)
883                 req->work.flags |= IO_WQ_WORK_NEEDS_USER;
884
885         *link = io_prep_linked_timeout(req);
886         return do_hashed;
887 }
888
889 static inline void io_queue_async_work(struct io_kiocb *req)
890 {
891         struct io_ring_ctx *ctx = req->ctx;
892         struct io_kiocb *link;
893         bool do_hashed;
894
895         do_hashed = io_prep_async_work(req, &link);
896
897         trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
898                                         req->flags);
899         if (!do_hashed) {
900                 io_wq_enqueue(ctx->io_wq, &req->work);
901         } else {
902                 io_wq_enqueue_hashed(ctx->io_wq, &req->work,
903                                         file_inode(req->file));
904         }
905
906         if (link)
907                 io_queue_linked_timeout(link);
908 }
909
910 static void io_kill_timeout(struct io_kiocb *req)
911 {
912         int ret;
913
914         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
915         if (ret != -1) {
916                 atomic_inc(&req->ctx->cq_timeouts);
917                 list_del_init(&req->list);
918                 io_cqring_fill_event(req, 0);
919                 io_put_req(req);
920         }
921 }
922
923 static void io_kill_timeouts(struct io_ring_ctx *ctx)
924 {
925         struct io_kiocb *req, *tmp;
926
927         spin_lock_irq(&ctx->completion_lock);
928         list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
929                 io_kill_timeout(req);
930         spin_unlock_irq(&ctx->completion_lock);
931 }
932
933 static void io_commit_cqring(struct io_ring_ctx *ctx)
934 {
935         struct io_kiocb *req;
936
937         while ((req = io_get_timeout_req(ctx)) != NULL)
938                 io_kill_timeout(req);
939
940         __io_commit_cqring(ctx);
941
942         while ((req = io_get_deferred_req(ctx)) != NULL) {
943                 req->flags |= REQ_F_IO_DRAINED;
944                 io_queue_async_work(req);
945         }
946 }
947
948 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
949 {
950         struct io_rings *rings = ctx->rings;
951         unsigned tail;
952
953         tail = ctx->cached_cq_tail;
954         /*
955          * writes to the cq entry need to come after reading head; the
956          * control dependency is enough as we're using WRITE_ONCE to
957          * fill the cq entry
958          */
959         if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
960                 return NULL;
961
962         ctx->cached_cq_tail++;
963         return &rings->cqes[tail & ctx->cq_mask];
964 }
965
966 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
967 {
968         if (waitqueue_active(&ctx->wait))
969                 wake_up(&ctx->wait);
970         if (waitqueue_active(&ctx->sqo_wait))
971                 wake_up(&ctx->sqo_wait);
972         if (ctx->cq_ev_fd)
973                 eventfd_signal(ctx->cq_ev_fd, 1);
974 }
975
976 /* Returns true if there are no backlogged entries after the flush */
977 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
978 {
979         struct io_rings *rings = ctx->rings;
980         struct io_uring_cqe *cqe;
981         struct io_kiocb *req;
982         unsigned long flags;
983         LIST_HEAD(list);
984
985         if (!force) {
986                 if (list_empty_careful(&ctx->cq_overflow_list))
987                         return true;
988                 if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
989                     rings->cq_ring_entries))
990                         return false;
991         }
992
993         spin_lock_irqsave(&ctx->completion_lock, flags);
994
995         /* if force is set, the ring is going away. always drop after that */
996         if (force)
997                 ctx->cq_overflow_flushed = true;
998
999         cqe = NULL;
1000         while (!list_empty(&ctx->cq_overflow_list)) {
1001                 cqe = io_get_cqring(ctx);
1002                 if (!cqe && !force)
1003                         break;
1004
1005                 req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
1006                                                 list);
1007                 list_move(&req->list, &list);
1008                 if (cqe) {
1009                         WRITE_ONCE(cqe->user_data, req->user_data);
1010                         WRITE_ONCE(cqe->res, req->result);
1011                         WRITE_ONCE(cqe->flags, 0);
1012                 } else {
1013                         WRITE_ONCE(ctx->rings->cq_overflow,
1014                                 atomic_inc_return(&ctx->cached_cq_overflow));
1015                 }
1016         }
1017
1018         io_commit_cqring(ctx);
1019         if (cqe) {
1020                 clear_bit(0, &ctx->sq_check_overflow);
1021                 clear_bit(0, &ctx->cq_check_overflow);
1022         }
1023         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1024         io_cqring_ev_posted(ctx);
1025
1026         while (!list_empty(&list)) {
1027                 req = list_first_entry(&list, struct io_kiocb, list);
1028                 list_del(&req->list);
1029                 io_put_req(req);
1030         }
1031
1032         return cqe != NULL;
1033 }
1034
1035 static void io_cqring_fill_event(struct io_kiocb *req, long res)
1036 {
1037         struct io_ring_ctx *ctx = req->ctx;
1038         struct io_uring_cqe *cqe;
1039
1040         trace_io_uring_complete(ctx, req->user_data, res);
1041
1042         /*
1043          * If we can't get a cq entry, userspace overflowed the
1044          * submission (by quite a lot). Increment the overflow count in
1045          * the ring.
1046          */
1047         cqe = io_get_cqring(ctx);
1048         if (likely(cqe)) {
1049                 WRITE_ONCE(cqe->user_data, req->user_data);
1050                 WRITE_ONCE(cqe->res, res);
1051                 WRITE_ONCE(cqe->flags, 0);
1052         } else if (ctx->cq_overflow_flushed) {
1053                 WRITE_ONCE(ctx->rings->cq_overflow,
1054                                 atomic_inc_return(&ctx->cached_cq_overflow));
1055         } else {
1056                 if (list_empty(&ctx->cq_overflow_list)) {
1057                         set_bit(0, &ctx->sq_check_overflow);
1058                         set_bit(0, &ctx->cq_check_overflow);
1059                 }
1060                 refcount_inc(&req->refs);
1061                 req->result = res;
1062                 list_add_tail(&req->list, &ctx->cq_overflow_list);
1063         }
1064 }
1065
1066 static void io_cqring_add_event(struct io_kiocb *req, long res)
1067 {
1068         struct io_ring_ctx *ctx = req->ctx;
1069         unsigned long flags;
1070
1071         spin_lock_irqsave(&ctx->completion_lock, flags);
1072         io_cqring_fill_event(req, res);
1073         io_commit_cqring(ctx);
1074         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1075
1076         io_cqring_ev_posted(ctx);
1077 }
1078
1079 static inline bool io_is_fallback_req(struct io_kiocb *req)
1080 {
1081         return req == (struct io_kiocb *)
1082                         ((unsigned long) req->ctx->fallback_req & ~1UL);
1083 }
1084
1085 static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx)
1086 {
1087         struct io_kiocb *req;
1088
1089         req = ctx->fallback_req;
1090         if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req))
1091                 return req;
1092
1093         return NULL;
1094 }
1095
1096 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
1097                                    struct io_submit_state *state)
1098 {
1099         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
1100         struct io_kiocb *req;
1101
1102         if (!state) {
1103                 req = kmem_cache_alloc(req_cachep, gfp);
1104                 if (unlikely(!req))
1105                         goto fallback;
1106         } else if (!state->free_reqs) {
1107                 size_t sz;
1108                 int ret;
1109
1110                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
1111                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
1112
1113                 /*
1114                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
1115                  * retry single alloc to be on the safe side.
1116                  */
1117                 if (unlikely(ret <= 0)) {
1118                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
1119                         if (!state->reqs[0])
1120                                 goto fallback;
1121                         ret = 1;
1122                 }
1123                 state->free_reqs = ret - 1;
1124                 state->cur_req = 1;
1125                 req = state->reqs[0];
1126         } else {
1127                 req = state->reqs[state->cur_req];
1128                 state->free_reqs--;
1129                 state->cur_req++;
1130         }
1131
1132 got_it:
1133         req->io = NULL;
1134         req->ring_file = NULL;
1135         req->file = NULL;
1136         req->ctx = ctx;
1137         req->flags = 0;
1138         /* one is dropped after submission, the other at completion */
1139         refcount_set(&req->refs, 2);
1140         req->result = 0;
1141         INIT_IO_WORK(&req->work, io_wq_submit_work);
1142         return req;
1143 fallback:
1144         req = io_get_fallback_req(ctx);
1145         if (req)
1146                 goto got_it;
1147         percpu_ref_put(&ctx->refs);
1148         return NULL;
1149 }
1150
1151 static void __io_req_do_free(struct io_kiocb *req)
1152 {
1153         if (likely(!io_is_fallback_req(req)))
1154                 kmem_cache_free(req_cachep, req);
1155         else
1156                 clear_bit_unlock(0, (unsigned long *) req->ctx->fallback_req);
1157 }
1158
1159 static void __io_req_aux_free(struct io_kiocb *req)
1160 {
1161         struct io_ring_ctx *ctx = req->ctx;
1162
1163         if (req->io)
1164                 kfree(req->io);
1165         if (req->file) {
1166                 if (req->flags & REQ_F_FIXED_FILE)
1167                         percpu_ref_put(&ctx->file_data->refs);
1168                 else
1169                         fput(req->file);
1170         }
1171 }
1172
1173 static void __io_free_req(struct io_kiocb *req)
1174 {
1175         __io_req_aux_free(req);
1176
1177         if (req->flags & REQ_F_INFLIGHT) {
1178                 struct io_ring_ctx *ctx = req->ctx;
1179                 unsigned long flags;
1180
1181                 spin_lock_irqsave(&ctx->inflight_lock, flags);
1182                 list_del(&req->inflight_entry);
1183                 if (waitqueue_active(&ctx->inflight_wait))
1184                         wake_up(&ctx->inflight_wait);
1185                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
1186         }
1187
1188         percpu_ref_put(&req->ctx->refs);
1189         __io_req_do_free(req);
1190 }
1191
1192 struct req_batch {
1193         void *reqs[IO_IOPOLL_BATCH];
1194         int to_free;
1195         int need_iter;
1196 };
1197
1198 static void io_free_req_many(struct io_ring_ctx *ctx, struct req_batch *rb)
1199 {
1200         if (!rb->to_free)
1201                 return;
1202         if (rb->need_iter) {
1203                 int i, inflight = 0;
1204                 unsigned long flags;
1205
1206                 for (i = 0; i < rb->to_free; i++) {
1207                         struct io_kiocb *req = rb->reqs[i];
1208
1209                         if (req->flags & REQ_F_FIXED_FILE)
1210                                 req->file = NULL;
1211                         if (req->flags & REQ_F_INFLIGHT)
1212                                 inflight++;
1213                         else
1214                                 rb->reqs[i] = NULL;
1215                         __io_req_aux_free(req);
1216                 }
1217                 if (!inflight)
1218                         goto do_free;
1219
1220                 spin_lock_irqsave(&ctx->inflight_lock, flags);
1221                 for (i = 0; i < rb->to_free; i++) {
1222                         struct io_kiocb *req = rb->reqs[i];
1223
1224                         if (req) {
1225                                 list_del(&req->inflight_entry);
1226                                 if (!--inflight)
1227                                         break;
1228                         }
1229                 }
1230                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
1231
1232                 if (waitqueue_active(&ctx->inflight_wait))
1233                         wake_up(&ctx->inflight_wait);
1234         }
1235 do_free:
1236         kmem_cache_free_bulk(req_cachep, rb->to_free, rb->reqs);
1237         percpu_ref_put_many(&ctx->refs, rb->to_free);
1238         percpu_ref_put_many(&ctx->file_data->refs, rb->to_free);
1239         rb->to_free = rb->need_iter = 0;
1240 }
1241
1242 static bool io_link_cancel_timeout(struct io_kiocb *req)
1243 {
1244         struct io_ring_ctx *ctx = req->ctx;
1245         int ret;
1246
1247         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
1248         if (ret != -1) {
1249                 io_cqring_fill_event(req, -ECANCELED);
1250                 io_commit_cqring(ctx);
1251                 req->flags &= ~REQ_F_LINK;
1252                 io_put_req(req);
1253                 return true;
1254         }
1255
1256         return false;
1257 }
1258
1259 static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1260 {
1261         struct io_ring_ctx *ctx = req->ctx;
1262         bool wake_ev = false;
1263
1264         /* Already got next link */
1265         if (req->flags & REQ_F_LINK_NEXT)
1266                 return;
1267
1268         /*
1269          * The list should never be empty when we are called here. But could
1270          * potentially happen if the chain is messed up, check to be on the
1271          * safe side.
1272          */
1273         while (!list_empty(&req->link_list)) {
1274                 struct io_kiocb *nxt = list_first_entry(&req->link_list,
1275                                                 struct io_kiocb, link_list);
1276
1277                 if (unlikely((req->flags & REQ_F_LINK_TIMEOUT) &&
1278                              (nxt->flags & REQ_F_TIMEOUT))) {
1279                         list_del_init(&nxt->link_list);
1280                         wake_ev |= io_link_cancel_timeout(nxt);
1281                         req->flags &= ~REQ_F_LINK_TIMEOUT;
1282                         continue;
1283                 }
1284
1285                 list_del_init(&req->link_list);
1286                 if (!list_empty(&nxt->link_list))
1287                         nxt->flags |= REQ_F_LINK;
1288                 *nxtptr = nxt;
1289                 break;
1290         }
1291
1292         req->flags |= REQ_F_LINK_NEXT;
1293         if (wake_ev)
1294                 io_cqring_ev_posted(ctx);
1295 }
1296
1297 /*
1298  * Called if REQ_F_LINK is set, and we fail the head request
1299  */
1300 static void io_fail_links(struct io_kiocb *req)
1301 {
1302         struct io_ring_ctx *ctx = req->ctx;
1303         unsigned long flags;
1304
1305         spin_lock_irqsave(&ctx->completion_lock, flags);
1306
1307         while (!list_empty(&req->link_list)) {
1308                 struct io_kiocb *link = list_first_entry(&req->link_list,
1309                                                 struct io_kiocb, link_list);
1310
1311                 list_del_init(&link->link_list);
1312                 trace_io_uring_fail_link(req, link);
1313
1314                 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
1315                     link->opcode == IORING_OP_LINK_TIMEOUT) {
1316                         io_link_cancel_timeout(link);
1317                 } else {
1318                         io_cqring_fill_event(link, -ECANCELED);
1319                         __io_double_put_req(link);
1320                 }
1321                 req->flags &= ~REQ_F_LINK_TIMEOUT;
1322         }
1323
1324         io_commit_cqring(ctx);
1325         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1326         io_cqring_ev_posted(ctx);
1327 }
1328
1329 static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
1330 {
1331         if (likely(!(req->flags & REQ_F_LINK)))
1332                 return;
1333
1334         /*
1335          * If LINK is set, we have dependent requests in this chain. If we
1336          * didn't fail this request, queue the first one up, moving any other
1337          * dependencies to the next request. In case of failure, fail the rest
1338          * of the chain.
1339          */
1340         if (req->flags & REQ_F_FAIL_LINK) {
1341                 io_fail_links(req);
1342         } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
1343                         REQ_F_LINK_TIMEOUT) {
1344                 struct io_ring_ctx *ctx = req->ctx;
1345                 unsigned long flags;
1346
1347                 /*
1348                  * If this is a timeout link, we could be racing with the
1349                  * timeout timer. Grab the completion lock for this case to
1350                  * protect against that.
1351                  */
1352                 spin_lock_irqsave(&ctx->completion_lock, flags);
1353                 io_req_link_next(req, nxt);
1354                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1355         } else {
1356                 io_req_link_next(req, nxt);
1357         }
1358 }
1359
1360 static void io_free_req(struct io_kiocb *req)
1361 {
1362         struct io_kiocb *nxt = NULL;
1363
1364         io_req_find_next(req, &nxt);
1365         __io_free_req(req);
1366
1367         if (nxt)
1368                 io_queue_async_work(nxt);
1369 }
1370
1371 /*
1372  * Drop reference to request, return next in chain (if there is one) if this
1373  * was the last reference to this request.
1374  */
1375 __attribute__((nonnull))
1376 static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1377 {
1378         io_req_find_next(req, nxtptr);
1379
1380         if (refcount_dec_and_test(&req->refs))
1381                 __io_free_req(req);
1382 }
1383
1384 static void io_put_req(struct io_kiocb *req)
1385 {
1386         if (refcount_dec_and_test(&req->refs))
1387                 io_free_req(req);
1388 }
1389
1390 /*
1391  * Must only be used if we don't need to care about links, usually from
1392  * within the completion handling itself.
1393  */
1394 static void __io_double_put_req(struct io_kiocb *req)
1395 {
1396         /* drop both submit and complete references */
1397         if (refcount_sub_and_test(2, &req->refs))
1398                 __io_free_req(req);
1399 }
1400
1401 static void io_double_put_req(struct io_kiocb *req)
1402 {
1403         /* drop both submit and complete references */
1404         if (refcount_sub_and_test(2, &req->refs))
1405                 io_free_req(req);
1406 }
1407
1408 static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
1409 {
1410         struct io_rings *rings = ctx->rings;
1411
1412         if (test_bit(0, &ctx->cq_check_overflow)) {
1413                 /*
1414                  * noflush == true is from the waitqueue handler, just ensure
1415                  * we wake up the task, and the next invocation will flush the
1416                  * entries. We cannot safely to it from here.
1417                  */
1418                 if (noflush && !list_empty(&ctx->cq_overflow_list))
1419                         return -1U;
1420
1421                 io_cqring_overflow_flush(ctx, false);
1422         }
1423
1424         /* See comment at the top of this file */
1425         smp_rmb();
1426         return ctx->cached_cq_tail - READ_ONCE(rings->cq.head);
1427 }
1428
1429 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1430 {
1431         struct io_rings *rings = ctx->rings;
1432
1433         /* make sure SQ entry isn't read before tail */
1434         return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
1435 }
1436
1437 static inline bool io_req_multi_free(struct req_batch *rb, struct io_kiocb *req)
1438 {
1439         if ((req->flags & REQ_F_LINK) || io_is_fallback_req(req))
1440                 return false;
1441
1442         if (!(req->flags & REQ_F_FIXED_FILE) || req->io)
1443                 rb->need_iter++;
1444
1445         rb->reqs[rb->to_free++] = req;
1446         if (unlikely(rb->to_free == ARRAY_SIZE(rb->reqs)))
1447                 io_free_req_many(req->ctx, rb);
1448         return true;
1449 }
1450
1451 /*
1452  * Find and free completed poll iocbs
1453  */
1454 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
1455                                struct list_head *done)
1456 {
1457         struct req_batch rb;
1458         struct io_kiocb *req;
1459
1460         rb.to_free = rb.need_iter = 0;
1461         while (!list_empty(done)) {
1462                 req = list_first_entry(done, struct io_kiocb, list);
1463                 list_del(&req->list);
1464
1465                 io_cqring_fill_event(req, req->result);
1466                 (*nr_events)++;
1467
1468                 if (refcount_dec_and_test(&req->refs) &&
1469                     !io_req_multi_free(&rb, req))
1470                         io_free_req(req);
1471         }
1472
1473         io_commit_cqring(ctx);
1474         io_free_req_many(ctx, &rb);
1475 }
1476
1477 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
1478                         long min)
1479 {
1480         struct io_kiocb *req, *tmp;
1481         LIST_HEAD(done);
1482         bool spin;
1483         int ret;
1484
1485         /*
1486          * Only spin for completions if we don't have multiple devices hanging
1487          * off our complete list, and we're under the requested amount.
1488          */
1489         spin = !ctx->poll_multi_file && *nr_events < min;
1490
1491         ret = 0;
1492         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
1493                 struct kiocb *kiocb = &req->rw.kiocb;
1494
1495                 /*
1496                  * Move completed entries to our local list. If we find a
1497                  * request that requires polling, break out and complete
1498                  * the done list first, if we have entries there.
1499                  */
1500                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
1501                         list_move_tail(&req->list, &done);
1502                         continue;
1503                 }
1504                 if (!list_empty(&done))
1505                         break;
1506
1507                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
1508                 if (ret < 0)
1509                         break;
1510
1511                 if (ret && spin)
1512                         spin = false;
1513                 ret = 0;
1514         }
1515
1516         if (!list_empty(&done))
1517                 io_iopoll_complete(ctx, nr_events, &done);
1518
1519         return ret;
1520 }
1521
1522 /*
1523  * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
1524  * non-spinning poll check - we'll still enter the driver poll loop, but only
1525  * as a non-spinning completion check.
1526  */
1527 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
1528                                 long min)
1529 {
1530         while (!list_empty(&ctx->poll_list) && !need_resched()) {
1531                 int ret;
1532
1533                 ret = io_do_iopoll(ctx, nr_events, min);
1534                 if (ret < 0)
1535                         return ret;
1536                 if (!min || *nr_events >= min)
1537                         return 0;
1538         }
1539
1540         return 1;
1541 }
1542
1543 /*
1544  * We can't just wait for polled events to come to us, we have to actively
1545  * find and complete them.
1546  */
1547 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
1548 {
1549         if (!(ctx->flags & IORING_SETUP_IOPOLL))
1550                 return;
1551
1552         mutex_lock(&ctx->uring_lock);
1553         while (!list_empty(&ctx->poll_list)) {
1554                 unsigned int nr_events = 0;
1555
1556                 io_iopoll_getevents(ctx, &nr_events, 1);
1557
1558                 /*
1559                  * Ensure we allow local-to-the-cpu processing to take place,
1560                  * in this case we need to ensure that we reap all events.
1561                  */
1562                 cond_resched();
1563         }
1564         mutex_unlock(&ctx->uring_lock);
1565 }
1566
1567 static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1568                             long min)
1569 {
1570         int iters = 0, ret = 0;
1571
1572         do {
1573                 int tmin = 0;
1574
1575                 /*
1576                  * Don't enter poll loop if we already have events pending.
1577                  * If we do, we can potentially be spinning for commands that
1578                  * already triggered a CQE (eg in error).
1579                  */
1580                 if (io_cqring_events(ctx, false))
1581                         break;
1582
1583                 /*
1584                  * If a submit got punted to a workqueue, we can have the
1585                  * application entering polling for a command before it gets
1586                  * issued. That app will hold the uring_lock for the duration
1587                  * of the poll right here, so we need to take a breather every
1588                  * now and then to ensure that the issue has a chance to add
1589                  * the poll to the issued list. Otherwise we can spin here
1590                  * forever, while the workqueue is stuck trying to acquire the
1591                  * very same mutex.
1592                  */
1593                 if (!(++iters & 7)) {
1594                         mutex_unlock(&ctx->uring_lock);
1595                         mutex_lock(&ctx->uring_lock);
1596                 }
1597
1598                 if (*nr_events < min)
1599                         tmin = min - *nr_events;
1600
1601                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
1602                 if (ret <= 0)
1603                         break;
1604                 ret = 0;
1605         } while (min && !*nr_events && !need_resched());
1606
1607         return ret;
1608 }
1609
1610 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1611                            long min)
1612 {
1613         int ret;
1614
1615         /*
1616          * We disallow the app entering submit/complete with polling, but we
1617          * still need to lock the ring to prevent racing with polled issue
1618          * that got punted to a workqueue.
1619          */
1620         mutex_lock(&ctx->uring_lock);
1621         ret = __io_iopoll_check(ctx, nr_events, min);
1622         mutex_unlock(&ctx->uring_lock);
1623         return ret;
1624 }
1625
1626 static void kiocb_end_write(struct io_kiocb *req)
1627 {
1628         /*
1629          * Tell lockdep we inherited freeze protection from submission
1630          * thread.
1631          */
1632         if (req->flags & REQ_F_ISREG) {
1633                 struct inode *inode = file_inode(req->file);
1634
1635                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
1636         }
1637         file_end_write(req->file);
1638 }
1639
1640 static inline void req_set_fail_links(struct io_kiocb *req)
1641 {
1642         if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
1643                 req->flags |= REQ_F_FAIL_LINK;
1644 }
1645
1646 static void io_complete_rw_common(struct kiocb *kiocb, long res)
1647 {
1648         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1649
1650         if (kiocb->ki_flags & IOCB_WRITE)
1651                 kiocb_end_write(req);
1652
1653         if (res != req->result)
1654                 req_set_fail_links(req);
1655         io_cqring_add_event(req, res);
1656 }
1657
1658 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
1659 {
1660         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1661
1662         io_complete_rw_common(kiocb, res);
1663         io_put_req(req);
1664 }
1665
1666 static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
1667 {
1668         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1669         struct io_kiocb *nxt = NULL;
1670
1671         io_complete_rw_common(kiocb, res);
1672         io_put_req_find_next(req, &nxt);
1673
1674         return nxt;
1675 }
1676
1677 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1678 {
1679         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1680
1681         if (kiocb->ki_flags & IOCB_WRITE)
1682                 kiocb_end_write(req);
1683
1684         if (res != req->result)
1685                 req_set_fail_links(req);
1686         req->result = res;
1687         if (res != -EAGAIN)
1688                 req->flags |= REQ_F_IOPOLL_COMPLETED;
1689 }
1690
1691 /*
1692  * After the iocb has been issued, it's safe to be found on the poll list.
1693  * Adding the kiocb to the list AFTER submission ensures that we don't
1694  * find it from a io_iopoll_getevents() thread before the issuer is done
1695  * accessing the kiocb cookie.
1696  */
1697 static void io_iopoll_req_issued(struct io_kiocb *req)
1698 {
1699         struct io_ring_ctx *ctx = req->ctx;
1700
1701         /*
1702          * Track whether we have multiple files in our lists. This will impact
1703          * how we do polling eventually, not spinning if we're on potentially
1704          * different devices.
1705          */
1706         if (list_empty(&ctx->poll_list)) {
1707                 ctx->poll_multi_file = false;
1708         } else if (!ctx->poll_multi_file) {
1709                 struct io_kiocb *list_req;
1710
1711                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1712                                                 list);
1713                 if (list_req->file != req->file)
1714                         ctx->poll_multi_file = true;
1715         }
1716
1717         /*
1718          * For fast devices, IO may have already completed. If it has, add
1719          * it to the front so we find it first.
1720          */
1721         if (req->flags & REQ_F_IOPOLL_COMPLETED)
1722                 list_add(&req->list, &ctx->poll_list);
1723         else
1724                 list_add_tail(&req->list, &ctx->poll_list);
1725 }
1726
1727 static void io_file_put(struct io_submit_state *state)
1728 {
1729         if (state->file) {
1730                 int diff = state->has_refs - state->used_refs;
1731
1732                 if (diff)
1733                         fput_many(state->file, diff);
1734                 state->file = NULL;
1735         }
1736 }
1737
1738 /*
1739  * Get as many references to a file as we have IOs left in this submission,
1740  * assuming most submissions are for one file, or at least that each file
1741  * has more than one submission.
1742  */
1743 static struct file *io_file_get(struct io_submit_state *state, int fd)
1744 {
1745         if (!state)
1746                 return fget(fd);
1747
1748         if (state->file) {
1749                 if (state->fd == fd) {
1750                         state->used_refs++;
1751                         state->ios_left--;
1752                         return state->file;
1753                 }
1754                 io_file_put(state);
1755         }
1756         state->file = fget_many(fd, state->ios_left);
1757         if (!state->file)
1758                 return NULL;
1759
1760         state->fd = fd;
1761         state->has_refs = state->ios_left;
1762         state->used_refs = 1;
1763         state->ios_left--;
1764         return state->file;
1765 }
1766
1767 /*
1768  * If we tracked the file through the SCM inflight mechanism, we could support
1769  * any file. For now, just ensure that anything potentially problematic is done
1770  * inline.
1771  */
1772 static bool io_file_supports_async(struct file *file)
1773 {
1774         umode_t mode = file_inode(file)->i_mode;
1775
1776         if (S_ISBLK(mode) || S_ISCHR(mode) || S_ISSOCK(mode))
1777                 return true;
1778         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1779                 return true;
1780
1781         return false;
1782 }
1783
1784 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1785                       bool force_nonblock)
1786 {
1787         struct io_ring_ctx *ctx = req->ctx;
1788         struct kiocb *kiocb = &req->rw.kiocb;
1789         unsigned ioprio;
1790         int ret;
1791
1792         if (!req->file)
1793                 return -EBADF;
1794
1795         if (S_ISREG(file_inode(req->file)->i_mode))
1796                 req->flags |= REQ_F_ISREG;
1797
1798         kiocb->ki_pos = READ_ONCE(sqe->off);
1799         if (kiocb->ki_pos == -1 && !(req->file->f_mode & FMODE_STREAM)) {
1800                 req->flags |= REQ_F_CUR_POS;
1801                 kiocb->ki_pos = req->file->f_pos;
1802         }
1803         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1804         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1805
1806         ioprio = READ_ONCE(sqe->ioprio);
1807         if (ioprio) {
1808                 ret = ioprio_check_cap(ioprio);
1809                 if (ret)
1810                         return ret;
1811
1812                 kiocb->ki_ioprio = ioprio;
1813         } else
1814                 kiocb->ki_ioprio = get_current_ioprio();
1815
1816         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1817         if (unlikely(ret))
1818                 return ret;
1819
1820         /* don't allow async punt if RWF_NOWAIT was requested */
1821         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1822             (req->file->f_flags & O_NONBLOCK))
1823                 req->flags |= REQ_F_NOWAIT;
1824
1825         if (force_nonblock)
1826                 kiocb->ki_flags |= IOCB_NOWAIT;
1827
1828         if (ctx->flags & IORING_SETUP_IOPOLL) {
1829                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1830                     !kiocb->ki_filp->f_op->iopoll)
1831                         return -EOPNOTSUPP;
1832
1833                 kiocb->ki_flags |= IOCB_HIPRI;
1834                 kiocb->ki_complete = io_complete_rw_iopoll;
1835                 req->result = 0;
1836         } else {
1837                 if (kiocb->ki_flags & IOCB_HIPRI)
1838                         return -EINVAL;
1839                 kiocb->ki_complete = io_complete_rw;
1840         }
1841
1842         req->rw.addr = READ_ONCE(sqe->addr);
1843         req->rw.len = READ_ONCE(sqe->len);
1844         /* we own ->private, reuse it for the buffer index */
1845         req->rw.kiocb.private = (void *) (unsigned long)
1846                                         READ_ONCE(sqe->buf_index);
1847         return 0;
1848 }
1849
1850 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1851 {
1852         switch (ret) {
1853         case -EIOCBQUEUED:
1854                 break;
1855         case -ERESTARTSYS:
1856         case -ERESTARTNOINTR:
1857         case -ERESTARTNOHAND:
1858         case -ERESTART_RESTARTBLOCK:
1859                 /*
1860                  * We can't just restart the syscall, since previously
1861                  * submitted sqes may already be in progress. Just fail this
1862                  * IO with EINTR.
1863                  */
1864                 ret = -EINTR;
1865                 /* fall through */
1866         default:
1867                 kiocb->ki_complete(kiocb, ret, 0);
1868         }
1869 }
1870
1871 static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
1872                        bool in_async)
1873 {
1874         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1875
1876         if (req->flags & REQ_F_CUR_POS)
1877                 req->file->f_pos = kiocb->ki_pos;
1878         if (in_async && ret >= 0 && kiocb->ki_complete == io_complete_rw)
1879                 *nxt = __io_complete_rw(kiocb, ret);
1880         else
1881                 io_rw_done(kiocb, ret);
1882 }
1883
1884 static ssize_t io_import_fixed(struct io_kiocb *req, int rw,
1885                                struct iov_iter *iter)
1886 {
1887         struct io_ring_ctx *ctx = req->ctx;
1888         size_t len = req->rw.len;
1889         struct io_mapped_ubuf *imu;
1890         unsigned index, buf_index;
1891         size_t offset;
1892         u64 buf_addr;
1893
1894         /* attempt to use fixed buffers without having provided iovecs */
1895         if (unlikely(!ctx->user_bufs))
1896                 return -EFAULT;
1897
1898         buf_index = (unsigned long) req->rw.kiocb.private;
1899         if (unlikely(buf_index >= ctx->nr_user_bufs))
1900                 return -EFAULT;
1901
1902         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1903         imu = &ctx->user_bufs[index];
1904         buf_addr = req->rw.addr;
1905
1906         /* overflow */
1907         if (buf_addr + len < buf_addr)
1908                 return -EFAULT;
1909         /* not inside the mapped region */
1910         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1911                 return -EFAULT;
1912
1913         /*
1914          * May not be a start of buffer, set size appropriately
1915          * and advance us to the beginning.
1916          */
1917         offset = buf_addr - imu->ubuf;
1918         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1919
1920         if (offset) {
1921                 /*
1922                  * Don't use iov_iter_advance() here, as it's really slow for
1923                  * using the latter parts of a big fixed buffer - it iterates
1924                  * over each segment manually. We can cheat a bit here, because
1925                  * we know that:
1926                  *
1927                  * 1) it's a BVEC iter, we set it up
1928                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1929                  *    first and last bvec
1930                  *
1931                  * So just find our index, and adjust the iterator afterwards.
1932                  * If the offset is within the first bvec (or the whole first
1933                  * bvec, just use iov_iter_advance(). This makes it easier
1934                  * since we can just skip the first segment, which may not
1935                  * be PAGE_SIZE aligned.
1936                  */
1937                 const struct bio_vec *bvec = imu->bvec;
1938
1939                 if (offset <= bvec->bv_len) {
1940                         iov_iter_advance(iter, offset);
1941                 } else {
1942                         unsigned long seg_skip;
1943
1944                         /* skip first vec */
1945                         offset -= bvec->bv_len;
1946                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1947
1948                         iter->bvec = bvec + seg_skip;
1949                         iter->nr_segs -= seg_skip;
1950                         iter->count -= bvec->bv_len + offset;
1951                         iter->iov_offset = offset & ~PAGE_MASK;
1952                 }
1953         }
1954
1955         return len;
1956 }
1957
1958 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
1959                                struct iovec **iovec, struct iov_iter *iter)
1960 {
1961         void __user *buf = u64_to_user_ptr(req->rw.addr);
1962         size_t sqe_len = req->rw.len;
1963         u8 opcode;
1964
1965         opcode = req->opcode;
1966         if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
1967                 *iovec = NULL;
1968                 return io_import_fixed(req, rw, iter);
1969         }
1970
1971         /* buffer index only valid with fixed read/write */
1972         if (req->rw.kiocb.private)
1973                 return -EINVAL;
1974
1975         if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
1976                 ssize_t ret;
1977                 ret = import_single_range(rw, buf, sqe_len, *iovec, iter);
1978                 *iovec = NULL;
1979                 return ret;
1980         }
1981
1982         if (req->io) {
1983                 struct io_async_rw *iorw = &req->io->rw;
1984
1985                 *iovec = iorw->iov;
1986                 iov_iter_init(iter, rw, *iovec, iorw->nr_segs, iorw->size);
1987                 if (iorw->iov == iorw->fast_iov)
1988                         *iovec = NULL;
1989                 return iorw->size;
1990         }
1991
1992         if (!req->has_user)
1993                 return -EFAULT;
1994
1995 #ifdef CONFIG_COMPAT
1996         if (req->ctx->compat)
1997                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1998                                                 iovec, iter);
1999 #endif
2000
2001         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
2002 }
2003
2004 /*
2005  * For files that don't have ->read_iter() and ->write_iter(), handle them
2006  * by looping over ->read() or ->write() manually.
2007  */
2008 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
2009                            struct iov_iter *iter)
2010 {
2011         ssize_t ret = 0;
2012
2013         /*
2014          * Don't support polled IO through this interface, and we can't
2015          * support non-blocking either. For the latter, this just causes
2016          * the kiocb to be handled from an async context.
2017          */
2018         if (kiocb->ki_flags & IOCB_HIPRI)
2019                 return -EOPNOTSUPP;
2020         if (kiocb->ki_flags & IOCB_NOWAIT)
2021                 return -EAGAIN;
2022
2023         while (iov_iter_count(iter)) {
2024                 struct iovec iovec;
2025                 ssize_t nr;
2026
2027                 if (!iov_iter_is_bvec(iter)) {
2028                         iovec = iov_iter_iovec(iter);
2029                 } else {
2030                         /* fixed buffers import bvec */
2031                         iovec.iov_base = kmap(iter->bvec->bv_page)
2032                                                 + iter->iov_offset;
2033                         iovec.iov_len = min(iter->count,
2034                                         iter->bvec->bv_len - iter->iov_offset);
2035                 }
2036
2037                 if (rw == READ) {
2038                         nr = file->f_op->read(file, iovec.iov_base,
2039                                               iovec.iov_len, &kiocb->ki_pos);
2040                 } else {
2041                         nr = file->f_op->write(file, iovec.iov_base,
2042                                                iovec.iov_len, &kiocb->ki_pos);
2043                 }
2044
2045                 if (iov_iter_is_bvec(iter))
2046                         kunmap(iter->bvec->bv_page);
2047
2048                 if (nr < 0) {
2049                         if (!ret)
2050                                 ret = nr;
2051                         break;
2052                 }
2053                 ret += nr;
2054                 if (nr != iovec.iov_len)
2055                         break;
2056                 iov_iter_advance(iter, nr);
2057         }
2058
2059         return ret;
2060 }
2061
2062 static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size,
2063                           struct iovec *iovec, struct iovec *fast_iov,
2064                           struct iov_iter *iter)
2065 {
2066         req->io->rw.nr_segs = iter->nr_segs;
2067         req->io->rw.size = io_size;
2068         req->io->rw.iov = iovec;
2069         if (!req->io->rw.iov) {
2070                 req->io->rw.iov = req->io->rw.fast_iov;
2071                 memcpy(req->io->rw.iov, fast_iov,
2072                         sizeof(struct iovec) * iter->nr_segs);
2073         }
2074 }
2075
2076 static int io_alloc_async_ctx(struct io_kiocb *req)
2077 {
2078         if (!io_op_defs[req->opcode].async_ctx)
2079                 return 0;
2080         req->io = kmalloc(sizeof(*req->io), GFP_KERNEL);
2081         return req->io == NULL;
2082 }
2083
2084 static void io_rw_async(struct io_wq_work **workptr)
2085 {
2086         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2087         struct iovec *iov = NULL;
2088
2089         if (req->io->rw.iov != req->io->rw.fast_iov)
2090                 iov = req->io->rw.iov;
2091         io_wq_submit_work(workptr);
2092         kfree(iov);
2093 }
2094
2095 static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
2096                              struct iovec *iovec, struct iovec *fast_iov,
2097                              struct iov_iter *iter)
2098 {
2099         if (req->opcode == IORING_OP_READ_FIXED ||
2100             req->opcode == IORING_OP_WRITE_FIXED)
2101                 return 0;
2102         if (!req->io && io_alloc_async_ctx(req))
2103                 return -ENOMEM;
2104
2105         io_req_map_rw(req, io_size, iovec, fast_iov, iter);
2106         req->work.func = io_rw_async;
2107         return 0;
2108 }
2109
2110 static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2111                         bool force_nonblock)
2112 {
2113         struct io_async_ctx *io;
2114         struct iov_iter iter;
2115         ssize_t ret;
2116
2117         ret = io_prep_rw(req, sqe, force_nonblock);
2118         if (ret)
2119                 return ret;
2120
2121         if (unlikely(!(req->file->f_mode & FMODE_READ)))
2122                 return -EBADF;
2123
2124         if (!req->io)
2125                 return 0;
2126
2127         io = req->io;
2128         io->rw.iov = io->rw.fast_iov;
2129         req->io = NULL;
2130         ret = io_import_iovec(READ, req, &io->rw.iov, &iter);
2131         req->io = io;
2132         if (ret < 0)
2133                 return ret;
2134
2135         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
2136         return 0;
2137 }
2138
2139 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
2140                    bool force_nonblock)
2141 {
2142         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2143         struct kiocb *kiocb = &req->rw.kiocb;
2144         struct iov_iter iter;
2145         size_t iov_count;
2146         ssize_t io_size, ret;
2147
2148         ret = io_import_iovec(READ, req, &iovec, &iter);
2149         if (ret < 0)
2150                 return ret;
2151
2152         /* Ensure we clear previously set non-block flag */
2153         if (!force_nonblock)
2154                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
2155
2156         req->result = 0;
2157         io_size = ret;
2158         if (req->flags & REQ_F_LINK)
2159                 req->result = io_size;
2160
2161         /*
2162          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
2163          * we know to async punt it even if it was opened O_NONBLOCK
2164          */
2165         if (force_nonblock && !io_file_supports_async(req->file)) {
2166                 req->flags |= REQ_F_MUST_PUNT;
2167                 goto copy_iov;
2168         }
2169
2170         iov_count = iov_iter_count(&iter);
2171         ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count);
2172         if (!ret) {
2173                 ssize_t ret2;
2174
2175                 if (req->file->f_op->read_iter)
2176                         ret2 = call_read_iter(req->file, kiocb, &iter);
2177                 else
2178                         ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
2179
2180                 /* Catch -EAGAIN return for forced non-blocking submission */
2181                 if (!force_nonblock || ret2 != -EAGAIN) {
2182                         kiocb_done(kiocb, ret2, nxt, req->in_async);
2183                 } else {
2184 copy_iov:
2185                         ret = io_setup_async_rw(req, io_size, iovec,
2186                                                 inline_vecs, &iter);
2187                         if (ret)
2188                                 goto out_free;
2189                         return -EAGAIN;
2190                 }
2191         }
2192 out_free:
2193         if (!io_wq_current_is_worker())
2194                 kfree(iovec);
2195         return ret;
2196 }
2197
2198 static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2199                          bool force_nonblock)
2200 {
2201         struct io_async_ctx *io;
2202         struct iov_iter iter;
2203         ssize_t ret;
2204
2205         ret = io_prep_rw(req, sqe, force_nonblock);
2206         if (ret)
2207                 return ret;
2208
2209         if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
2210                 return -EBADF;
2211
2212         if (!req->io)
2213                 return 0;
2214
2215         io = req->io;
2216         io->rw.iov = io->rw.fast_iov;
2217         req->io = NULL;
2218         ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter);
2219         req->io = io;
2220         if (ret < 0)
2221                 return ret;
2222
2223         io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
2224         return 0;
2225 }
2226
2227 static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
2228                     bool force_nonblock)
2229 {
2230         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2231         struct kiocb *kiocb = &req->rw.kiocb;
2232         struct iov_iter iter;
2233         size_t iov_count;
2234         ssize_t ret, io_size;
2235
2236         ret = io_import_iovec(WRITE, req, &iovec, &iter);
2237         if (ret < 0)
2238                 return ret;
2239
2240         /* Ensure we clear previously set non-block flag */
2241         if (!force_nonblock)
2242                 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
2243
2244         req->result = 0;
2245         io_size = ret;
2246         if (req->flags & REQ_F_LINK)
2247                 req->result = io_size;
2248
2249         /*
2250          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
2251          * we know to async punt it even if it was opened O_NONBLOCK
2252          */
2253         if (force_nonblock && !io_file_supports_async(req->file)) {
2254                 req->flags |= REQ_F_MUST_PUNT;
2255                 goto copy_iov;
2256         }
2257
2258         /* file path doesn't support NOWAIT for non-direct_IO */
2259         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
2260             (req->flags & REQ_F_ISREG))
2261                 goto copy_iov;
2262
2263         iov_count = iov_iter_count(&iter);
2264         ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count);
2265         if (!ret) {
2266                 ssize_t ret2;
2267
2268                 /*
2269                  * Open-code file_start_write here to grab freeze protection,
2270                  * which will be released by another thread in
2271                  * io_complete_rw().  Fool lockdep by telling it the lock got
2272                  * released so that it doesn't complain about the held lock when
2273                  * we return to userspace.
2274                  */
2275                 if (req->flags & REQ_F_ISREG) {
2276                         __sb_start_write(file_inode(req->file)->i_sb,
2277                                                 SB_FREEZE_WRITE, true);
2278                         __sb_writers_release(file_inode(req->file)->i_sb,
2279                                                 SB_FREEZE_WRITE);
2280                 }
2281                 kiocb->ki_flags |= IOCB_WRITE;
2282
2283                 if (req->file->f_op->write_iter)
2284                         ret2 = call_write_iter(req->file, kiocb, &iter);
2285                 else
2286                         ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter);
2287                 if (!force_nonblock || ret2 != -EAGAIN) {
2288                         kiocb_done(kiocb, ret2, nxt, req->in_async);
2289                 } else {
2290 copy_iov:
2291                         ret = io_setup_async_rw(req, io_size, iovec,
2292                                                 inline_vecs, &iter);
2293                         if (ret)
2294                                 goto out_free;
2295                         return -EAGAIN;
2296                 }
2297         }
2298 out_free:
2299         if (!io_wq_current_is_worker())
2300                 kfree(iovec);
2301         return ret;
2302 }
2303
2304 /*
2305  * IORING_OP_NOP just posts a completion event, nothing else.
2306  */
2307 static int io_nop(struct io_kiocb *req)
2308 {
2309         struct io_ring_ctx *ctx = req->ctx;
2310
2311         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2312                 return -EINVAL;
2313
2314         io_cqring_add_event(req, 0);
2315         io_put_req(req);
2316         return 0;
2317 }
2318
2319 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2320 {
2321         struct io_ring_ctx *ctx = req->ctx;
2322
2323         if (!req->file)
2324                 return -EBADF;
2325
2326         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2327                 return -EINVAL;
2328         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2329                 return -EINVAL;
2330
2331         req->sync.flags = READ_ONCE(sqe->fsync_flags);
2332         if (unlikely(req->sync.flags & ~IORING_FSYNC_DATASYNC))
2333                 return -EINVAL;
2334
2335         req->sync.off = READ_ONCE(sqe->off);
2336         req->sync.len = READ_ONCE(sqe->len);
2337         return 0;
2338 }
2339
2340 static bool io_req_cancelled(struct io_kiocb *req)
2341 {
2342         if (req->work.flags & IO_WQ_WORK_CANCEL) {
2343                 req_set_fail_links(req);
2344                 io_cqring_add_event(req, -ECANCELED);
2345                 io_put_req(req);
2346                 return true;
2347         }
2348
2349         return false;
2350 }
2351
2352 static void io_link_work_cb(struct io_wq_work **workptr)
2353 {
2354         struct io_wq_work *work = *workptr;
2355         struct io_kiocb *link = work->data;
2356
2357         io_queue_linked_timeout(link);
2358         work->func = io_wq_submit_work;
2359 }
2360
2361 static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt)
2362 {
2363         struct io_kiocb *link;
2364
2365         io_prep_async_work(nxt, &link);
2366         *workptr = &nxt->work;
2367         if (link) {
2368                 nxt->work.flags |= IO_WQ_WORK_CB;
2369                 nxt->work.func = io_link_work_cb;
2370                 nxt->work.data = link;
2371         }
2372 }
2373
2374 static void io_fsync_finish(struct io_wq_work **workptr)
2375 {
2376         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2377         loff_t end = req->sync.off + req->sync.len;
2378         struct io_kiocb *nxt = NULL;
2379         int ret;
2380
2381         if (io_req_cancelled(req))
2382                 return;
2383
2384         ret = vfs_fsync_range(req->file, req->sync.off,
2385                                 end > 0 ? end : LLONG_MAX,
2386                                 req->sync.flags & IORING_FSYNC_DATASYNC);
2387         if (ret < 0)
2388                 req_set_fail_links(req);
2389         io_cqring_add_event(req, ret);
2390         io_put_req_find_next(req, &nxt);
2391         if (nxt)
2392                 io_wq_assign_next(workptr, nxt);
2393 }
2394
2395 static int io_fsync(struct io_kiocb *req, struct io_kiocb **nxt,
2396                     bool force_nonblock)
2397 {
2398         struct io_wq_work *work, *old_work;
2399
2400         /* fsync always requires a blocking context */
2401         if (force_nonblock) {
2402                 io_put_req(req);
2403                 req->work.func = io_fsync_finish;
2404                 return -EAGAIN;
2405         }
2406
2407         work = old_work = &req->work;
2408         io_fsync_finish(&work);
2409         if (work && work != old_work)
2410                 *nxt = container_of(work, struct io_kiocb, work);
2411         return 0;
2412 }
2413
2414 static void io_fallocate_finish(struct io_wq_work **workptr)
2415 {
2416         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2417         struct io_kiocb *nxt = NULL;
2418         int ret;
2419
2420         ret = vfs_fallocate(req->file, req->sync.mode, req->sync.off,
2421                                 req->sync.len);
2422         if (ret < 0)
2423                 req_set_fail_links(req);
2424         io_cqring_add_event(req, ret);
2425         io_put_req_find_next(req, &nxt);
2426         if (nxt)
2427                 io_wq_assign_next(workptr, nxt);
2428 }
2429
2430 static int io_fallocate_prep(struct io_kiocb *req,
2431                              const struct io_uring_sqe *sqe)
2432 {
2433         if (sqe->ioprio || sqe->buf_index || sqe->rw_flags)
2434                 return -EINVAL;
2435
2436         req->sync.off = READ_ONCE(sqe->off);
2437         req->sync.len = READ_ONCE(sqe->addr);
2438         req->sync.mode = READ_ONCE(sqe->len);
2439         return 0;
2440 }
2441
2442 static int io_fallocate(struct io_kiocb *req, struct io_kiocb **nxt,
2443                         bool force_nonblock)
2444 {
2445         struct io_wq_work *work, *old_work;
2446
2447         /* fallocate always requiring blocking context */
2448         if (force_nonblock) {
2449                 io_put_req(req);
2450                 req->work.func = io_fallocate_finish;
2451                 return -EAGAIN;
2452         }
2453
2454         work = old_work = &req->work;
2455         io_fallocate_finish(&work);
2456         if (work && work != old_work)
2457                 *nxt = container_of(work, struct io_kiocb, work);
2458
2459         return 0;
2460 }
2461
2462 static int io_openat_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2463 {
2464         int ret;
2465
2466         if (sqe->ioprio || sqe->buf_index)
2467                 return -EINVAL;
2468
2469         req->open.dfd = READ_ONCE(sqe->fd);
2470         req->open.mode = READ_ONCE(sqe->len);
2471         req->open.fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2472         req->open.flags = READ_ONCE(sqe->open_flags);
2473
2474         req->open.filename = getname(req->open.fname);
2475         if (IS_ERR(req->open.filename)) {
2476                 ret = PTR_ERR(req->open.filename);
2477                 req->open.filename = NULL;
2478                 return ret;
2479         }
2480
2481         return 0;
2482 }
2483
2484 static int io_openat(struct io_kiocb *req, struct io_kiocb **nxt,
2485                      bool force_nonblock)
2486 {
2487         struct open_flags op;
2488         struct open_how how;
2489         struct file *file;
2490         int ret;
2491
2492         if (force_nonblock) {
2493                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2494                 return -EAGAIN;
2495         }
2496
2497         how = build_open_how(req->open.flags, req->open.mode);
2498         ret = build_open_flags(&how, &op);
2499         if (ret)
2500                 goto err;
2501
2502         ret = get_unused_fd_flags(how.flags);
2503         if (ret < 0)
2504                 goto err;
2505
2506         file = do_filp_open(req->open.dfd, req->open.filename, &op);
2507         if (IS_ERR(file)) {
2508                 put_unused_fd(ret);
2509                 ret = PTR_ERR(file);
2510         } else {
2511                 fsnotify_open(file);
2512                 fd_install(ret, file);
2513         }
2514 err:
2515         putname(req->open.filename);
2516         if (ret < 0)
2517                 req_set_fail_links(req);
2518         io_cqring_add_event(req, ret);
2519         io_put_req_find_next(req, nxt);
2520         return 0;
2521 }
2522
2523 static int io_madvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2524 {
2525 #if defined(CONFIG_ADVISE_SYSCALLS) && defined(CONFIG_MMU)
2526         if (sqe->ioprio || sqe->buf_index || sqe->off)
2527                 return -EINVAL;
2528
2529         req->madvise.addr = READ_ONCE(sqe->addr);
2530         req->madvise.len = READ_ONCE(sqe->len);
2531         req->madvise.advice = READ_ONCE(sqe->fadvise_advice);
2532         return 0;
2533 #else
2534         return -EOPNOTSUPP;
2535 #endif
2536 }
2537
2538 static int io_madvise(struct io_kiocb *req, struct io_kiocb **nxt,
2539                       bool force_nonblock)
2540 {
2541 #if defined(CONFIG_ADVISE_SYSCALLS) && defined(CONFIG_MMU)
2542         struct io_madvise *ma = &req->madvise;
2543         int ret;
2544
2545         if (force_nonblock)
2546                 return -EAGAIN;
2547
2548         ret = do_madvise(ma->addr, ma->len, ma->advice);
2549         if (ret < 0)
2550                 req_set_fail_links(req);
2551         io_cqring_add_event(req, ret);
2552         io_put_req_find_next(req, nxt);
2553         return 0;
2554 #else
2555         return -EOPNOTSUPP;
2556 #endif
2557 }
2558
2559 static int io_fadvise_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2560 {
2561         if (sqe->ioprio || sqe->buf_index || sqe->addr)
2562                 return -EINVAL;
2563
2564         req->fadvise.offset = READ_ONCE(sqe->off);
2565         req->fadvise.len = READ_ONCE(sqe->len);
2566         req->fadvise.advice = READ_ONCE(sqe->fadvise_advice);
2567         return 0;
2568 }
2569
2570 static int io_fadvise(struct io_kiocb *req, struct io_kiocb **nxt,
2571                       bool force_nonblock)
2572 {
2573         struct io_fadvise *fa = &req->fadvise;
2574         int ret;
2575
2576         /* DONTNEED may block, others _should_ not */
2577         if (fa->advice == POSIX_FADV_DONTNEED && force_nonblock)
2578                 return -EAGAIN;
2579
2580         ret = vfs_fadvise(req->file, fa->offset, fa->len, fa->advice);
2581         if (ret < 0)
2582                 req_set_fail_links(req);
2583         io_cqring_add_event(req, ret);
2584         io_put_req_find_next(req, nxt);
2585         return 0;
2586 }
2587
2588 static int io_statx_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2589 {
2590         unsigned lookup_flags;
2591         int ret;
2592
2593         if (sqe->ioprio || sqe->buf_index)
2594                 return -EINVAL;
2595
2596         req->open.dfd = READ_ONCE(sqe->fd);
2597         req->open.mask = READ_ONCE(sqe->len);
2598         req->open.fname = u64_to_user_ptr(READ_ONCE(sqe->addr));
2599         req->open.buffer = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2600         req->open.flags = READ_ONCE(sqe->statx_flags);
2601
2602         if (vfs_stat_set_lookup_flags(&lookup_flags, req->open.flags))
2603                 return -EINVAL;
2604
2605         req->open.filename = getname_flags(req->open.fname, lookup_flags, NULL);
2606         if (IS_ERR(req->open.filename)) {
2607                 ret = PTR_ERR(req->open.filename);
2608                 req->open.filename = NULL;
2609                 return ret;
2610         }
2611
2612         return 0;
2613 }
2614
2615 static int io_statx(struct io_kiocb *req, struct io_kiocb **nxt,
2616                     bool force_nonblock)
2617 {
2618         struct io_open *ctx = &req->open;
2619         unsigned lookup_flags;
2620         struct path path;
2621         struct kstat stat;
2622         int ret;
2623
2624         if (force_nonblock)
2625                 return -EAGAIN;
2626
2627         if (vfs_stat_set_lookup_flags(&lookup_flags, ctx->flags))
2628                 return -EINVAL;
2629
2630 retry:
2631         /* filename_lookup() drops it, keep a reference */
2632         ctx->filename->refcnt++;
2633
2634         ret = filename_lookup(ctx->dfd, ctx->filename, lookup_flags, &path,
2635                                 NULL);
2636         if (ret)
2637                 goto err;
2638
2639         ret = vfs_getattr(&path, &stat, ctx->mask, ctx->flags);
2640         path_put(&path);
2641         if (retry_estale(ret, lookup_flags)) {
2642                 lookup_flags |= LOOKUP_REVAL;
2643                 goto retry;
2644         }
2645         if (!ret)
2646                 ret = cp_statx(&stat, ctx->buffer);
2647 err:
2648         putname(ctx->filename);
2649         if (ret < 0)
2650                 req_set_fail_links(req);
2651         io_cqring_add_event(req, ret);
2652         io_put_req_find_next(req, nxt);
2653         return 0;
2654 }
2655
2656 static int io_close_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2657 {
2658         /*
2659          * If we queue this for async, it must not be cancellable. That would
2660          * leave the 'file' in an undeterminate state.
2661          */
2662         req->work.flags |= IO_WQ_WORK_NO_CANCEL;
2663
2664         if (sqe->ioprio || sqe->off || sqe->addr || sqe->len ||
2665             sqe->rw_flags || sqe->buf_index)
2666                 return -EINVAL;
2667         if (sqe->flags & IOSQE_FIXED_FILE)
2668                 return -EINVAL;
2669
2670         req->close.fd = READ_ONCE(sqe->fd);
2671         if (req->file->f_op == &io_uring_fops ||
2672             req->close.fd == req->ring_fd)
2673                 return -EBADF;
2674
2675         return 0;
2676 }
2677
2678 static void io_close_finish(struct io_wq_work **workptr)
2679 {
2680         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2681         struct io_kiocb *nxt = NULL;
2682
2683         /* Invoked with files, we need to do the close */
2684         if (req->work.files) {
2685                 int ret;
2686
2687                 ret = filp_close(req->close.put_file, req->work.files);
2688                 if (ret < 0) {
2689                         req_set_fail_links(req);
2690                 }
2691                 io_cqring_add_event(req, ret);
2692         }
2693
2694         fput(req->close.put_file);
2695
2696         /* we bypassed the re-issue, drop the submission reference */
2697         io_put_req(req);
2698         io_put_req_find_next(req, &nxt);
2699         if (nxt)
2700                 io_wq_assign_next(workptr, nxt);
2701 }
2702
2703 static int io_close(struct io_kiocb *req, struct io_kiocb **nxt,
2704                     bool force_nonblock)
2705 {
2706         int ret;
2707
2708         req->close.put_file = NULL;
2709         ret = __close_fd_get_file(req->close.fd, &req->close.put_file);
2710         if (ret < 0)
2711                 return ret;
2712
2713         /* if the file has a flush method, be safe and punt to async */
2714         if (req->close.put_file->f_op->flush && !io_wq_current_is_worker()) {
2715                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2716                 goto eagain;
2717         }
2718
2719         /*
2720          * No ->flush(), safely close from here and just punt the
2721          * fput() to async context.
2722          */
2723         ret = filp_close(req->close.put_file, current->files);
2724
2725         if (ret < 0)
2726                 req_set_fail_links(req);
2727         io_cqring_add_event(req, ret);
2728
2729         if (io_wq_current_is_worker()) {
2730                 struct io_wq_work *old_work, *work;
2731
2732                 old_work = work = &req->work;
2733                 io_close_finish(&work);
2734                 if (work && work != old_work)
2735                         *nxt = container_of(work, struct io_kiocb, work);
2736                 return 0;
2737         }
2738
2739 eagain:
2740         req->work.func = io_close_finish;
2741         return -EAGAIN;
2742 }
2743
2744 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2745 {
2746         struct io_ring_ctx *ctx = req->ctx;
2747
2748         if (!req->file)
2749                 return -EBADF;
2750
2751         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2752                 return -EINVAL;
2753         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2754                 return -EINVAL;
2755
2756         req->sync.off = READ_ONCE(sqe->off);
2757         req->sync.len = READ_ONCE(sqe->len);
2758         req->sync.flags = READ_ONCE(sqe->sync_range_flags);
2759         return 0;
2760 }
2761
2762 static void io_sync_file_range_finish(struct io_wq_work **workptr)
2763 {
2764         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2765         struct io_kiocb *nxt = NULL;
2766         int ret;
2767
2768         if (io_req_cancelled(req))
2769                 return;
2770
2771         ret = sync_file_range(req->file, req->sync.off, req->sync.len,
2772                                 req->sync.flags);
2773         if (ret < 0)
2774                 req_set_fail_links(req);
2775         io_cqring_add_event(req, ret);
2776         io_put_req_find_next(req, &nxt);
2777         if (nxt)
2778                 io_wq_assign_next(workptr, nxt);
2779 }
2780
2781 static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt,
2782                               bool force_nonblock)
2783 {
2784         struct io_wq_work *work, *old_work;
2785
2786         /* sync_file_range always requires a blocking context */
2787         if (force_nonblock) {
2788                 io_put_req(req);
2789                 req->work.func = io_sync_file_range_finish;
2790                 return -EAGAIN;
2791         }
2792
2793         work = old_work = &req->work;
2794         io_sync_file_range_finish(&work);
2795         if (work && work != old_work)
2796                 *nxt = container_of(work, struct io_kiocb, work);
2797         return 0;
2798 }
2799
2800 #if defined(CONFIG_NET)
2801 static void io_sendrecv_async(struct io_wq_work **workptr)
2802 {
2803         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2804         struct iovec *iov = NULL;
2805
2806         if (req->io->rw.iov != req->io->rw.fast_iov)
2807                 iov = req->io->msg.iov;
2808         io_wq_submit_work(workptr);
2809         kfree(iov);
2810 }
2811 #endif
2812
2813 static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2814 {
2815 #if defined(CONFIG_NET)
2816         struct io_sr_msg *sr = &req->sr_msg;
2817         struct io_async_ctx *io = req->io;
2818
2819         sr->msg_flags = READ_ONCE(sqe->msg_flags);
2820         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2821         sr->len = READ_ONCE(sqe->len);
2822
2823         if (!io || req->opcode == IORING_OP_SEND)
2824                 return 0;
2825
2826         io->msg.iov = io->msg.fast_iov;
2827         return sendmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2828                                         &io->msg.iov);
2829 #else
2830         return -EOPNOTSUPP;
2831 #endif
2832 }
2833
2834 static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2835                       bool force_nonblock)
2836 {
2837 #if defined(CONFIG_NET)
2838         struct io_async_msghdr *kmsg = NULL;
2839         struct socket *sock;
2840         int ret;
2841
2842         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2843                 return -EINVAL;
2844
2845         sock = sock_from_file(req->file, &ret);
2846         if (sock) {
2847                 struct io_async_ctx io;
2848                 struct sockaddr_storage addr;
2849                 unsigned flags;
2850
2851                 if (req->io) {
2852                         kmsg = &req->io->msg;
2853                         kmsg->msg.msg_name = &addr;
2854                         /* if iov is set, it's allocated already */
2855                         if (!kmsg->iov)
2856                                 kmsg->iov = kmsg->fast_iov;
2857                         kmsg->msg.msg_iter.iov = kmsg->iov;
2858                 } else {
2859                         struct io_sr_msg *sr = &req->sr_msg;
2860
2861                         kmsg = &io.msg;
2862                         kmsg->msg.msg_name = &addr;
2863
2864                         io.msg.iov = io.msg.fast_iov;
2865                         ret = sendmsg_copy_msghdr(&io.msg.msg, sr->msg,
2866                                         sr->msg_flags, &io.msg.iov);
2867                         if (ret)
2868                                 return ret;
2869                 }
2870
2871                 flags = req->sr_msg.msg_flags;
2872                 if (flags & MSG_DONTWAIT)
2873                         req->flags |= REQ_F_NOWAIT;
2874                 else if (force_nonblock)
2875                         flags |= MSG_DONTWAIT;
2876
2877                 ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags);
2878                 if (force_nonblock && ret == -EAGAIN) {
2879                         if (req->io)
2880                                 return -EAGAIN;
2881                         if (io_alloc_async_ctx(req))
2882                                 return -ENOMEM;
2883                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2884                         req->work.func = io_sendrecv_async;
2885                         return -EAGAIN;
2886                 }
2887                 if (ret == -ERESTARTSYS)
2888                         ret = -EINTR;
2889         }
2890
2891         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
2892                 kfree(kmsg->iov);
2893         io_cqring_add_event(req, ret);
2894         if (ret < 0)
2895                 req_set_fail_links(req);
2896         io_put_req_find_next(req, nxt);
2897         return 0;
2898 #else
2899         return -EOPNOTSUPP;
2900 #endif
2901 }
2902
2903 static int io_send(struct io_kiocb *req, struct io_kiocb **nxt,
2904                    bool force_nonblock)
2905 {
2906 #if defined(CONFIG_NET)
2907         struct socket *sock;
2908         int ret;
2909
2910         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2911                 return -EINVAL;
2912
2913         sock = sock_from_file(req->file, &ret);
2914         if (sock) {
2915                 struct io_sr_msg *sr = &req->sr_msg;
2916                 struct msghdr msg;
2917                 struct iovec iov;
2918                 unsigned flags;
2919
2920                 ret = import_single_range(WRITE, sr->buf, sr->len, &iov,
2921                                                 &msg.msg_iter);
2922                 if (ret)
2923                         return ret;
2924
2925                 msg.msg_name = NULL;
2926                 msg.msg_control = NULL;
2927                 msg.msg_controllen = 0;
2928                 msg.msg_namelen = 0;
2929
2930                 flags = req->sr_msg.msg_flags;
2931                 if (flags & MSG_DONTWAIT)
2932                         req->flags |= REQ_F_NOWAIT;
2933                 else if (force_nonblock)
2934                         flags |= MSG_DONTWAIT;
2935
2936                 ret = __sys_sendmsg_sock(sock, &msg, flags);
2937                 if (force_nonblock && ret == -EAGAIN)
2938                         return -EAGAIN;
2939                 if (ret == -ERESTARTSYS)
2940                         ret = -EINTR;
2941         }
2942
2943         io_cqring_add_event(req, ret);
2944         if (ret < 0)
2945                 req_set_fail_links(req);
2946         io_put_req_find_next(req, nxt);
2947         return 0;
2948 #else
2949         return -EOPNOTSUPP;
2950 #endif
2951 }
2952
2953 static int io_recvmsg_prep(struct io_kiocb *req,
2954                            const struct io_uring_sqe *sqe)
2955 {
2956 #if defined(CONFIG_NET)
2957         struct io_sr_msg *sr = &req->sr_msg;
2958         struct io_async_ctx *io = req->io;
2959
2960         sr->msg_flags = READ_ONCE(sqe->msg_flags);
2961         sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2962
2963         if (!io || req->opcode == IORING_OP_RECV)
2964                 return 0;
2965
2966         io->msg.iov = io->msg.fast_iov;
2967         return recvmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2968                                         &io->msg.uaddr, &io->msg.iov);
2969 #else
2970         return -EOPNOTSUPP;
2971 #endif
2972 }
2973
2974 static int io_recvmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2975                       bool force_nonblock)
2976 {
2977 #if defined(CONFIG_NET)
2978         struct io_async_msghdr *kmsg = NULL;
2979         struct socket *sock;
2980         int ret;
2981
2982         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2983                 return -EINVAL;
2984
2985         sock = sock_from_file(req->file, &ret);
2986         if (sock) {
2987                 struct io_async_ctx io;
2988                 struct sockaddr_storage addr;
2989                 unsigned flags;
2990
2991                 if (req->io) {
2992                         kmsg = &req->io->msg;
2993                         kmsg->msg.msg_name = &addr;
2994                         /* if iov is set, it's allocated already */
2995                         if (!kmsg->iov)
2996                                 kmsg->iov = kmsg->fast_iov;
2997                         kmsg->msg.msg_iter.iov = kmsg->iov;
2998                 } else {
2999                         struct io_sr_msg *sr = &req->sr_msg;
3000
3001                         kmsg = &io.msg;
3002                         kmsg->msg.msg_name = &addr;
3003
3004                         io.msg.iov = io.msg.fast_iov;
3005                         ret = recvmsg_copy_msghdr(&io.msg.msg, sr->msg,
3006                                         sr->msg_flags, &io.msg.uaddr,
3007                                         &io.msg.iov);
3008                         if (ret)
3009                                 return ret;
3010                 }
3011
3012                 flags = req->sr_msg.msg_flags;
3013                 if (flags & MSG_DONTWAIT)
3014                         req->flags |= REQ_F_NOWAIT;
3015                 else if (force_nonblock)
3016                         flags |= MSG_DONTWAIT;
3017
3018                 ret = __sys_recvmsg_sock(sock, &kmsg->msg, req->sr_msg.msg,
3019                                                 kmsg->uaddr, flags);
3020                 if (force_nonblock && ret == -EAGAIN) {
3021                         if (req->io)
3022                                 return -EAGAIN;
3023                         if (io_alloc_async_ctx(req))
3024                                 return -ENOMEM;
3025                         memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
3026                         req->work.func = io_sendrecv_async;
3027                         return -EAGAIN;
3028                 }
3029                 if (ret == -ERESTARTSYS)
3030                         ret = -EINTR;
3031         }
3032
3033         if (!io_wq_current_is_worker() && kmsg && kmsg->iov != kmsg->fast_iov)
3034                 kfree(kmsg->iov);
3035         io_cqring_add_event(req, ret);
3036         if (ret < 0)
3037                 req_set_fail_links(req);
3038         io_put_req_find_next(req, nxt);
3039         return 0;
3040 #else
3041         return -EOPNOTSUPP;
3042 #endif
3043 }
3044
3045 static int io_recv(struct io_kiocb *req, struct io_kiocb **nxt,
3046                    bool force_nonblock)
3047 {
3048 #if defined(CONFIG_NET)
3049         struct socket *sock;
3050         int ret;
3051
3052         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3053                 return -EINVAL;
3054
3055         sock = sock_from_file(req->file, &ret);
3056         if (sock) {
3057                 struct io_sr_msg *sr = &req->sr_msg;
3058                 struct msghdr msg;
3059                 struct iovec iov;
3060                 unsigned flags;
3061
3062                 ret = import_single_range(READ, sr->buf, sr->len, &iov,
3063                                                 &msg.msg_iter);
3064                 if (ret)
3065                         return ret;
3066
3067                 msg.msg_name = NULL;
3068                 msg.msg_control = NULL;
3069                 msg.msg_controllen = 0;
3070                 msg.msg_namelen = 0;
3071                 msg.msg_iocb = NULL;
3072                 msg.msg_flags = 0;
3073
3074                 flags = req->sr_msg.msg_flags;
3075                 if (flags & MSG_DONTWAIT)
3076                         req->flags |= REQ_F_NOWAIT;
3077                 else if (force_nonblock)
3078                         flags |= MSG_DONTWAIT;
3079
3080                 ret = __sys_recvmsg_sock(sock, &msg, NULL, NULL, flags);
3081                 if (force_nonblock && ret == -EAGAIN)
3082                         return -EAGAIN;
3083                 if (ret == -ERESTARTSYS)
3084                         ret = -EINTR;
3085         }
3086
3087         io_cqring_add_event(req, ret);
3088         if (ret < 0)
3089                 req_set_fail_links(req);
3090         io_put_req_find_next(req, nxt);
3091         return 0;
3092 #else
3093         return -EOPNOTSUPP;
3094 #endif
3095 }
3096
3097
3098 static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3099 {
3100 #if defined(CONFIG_NET)
3101         struct io_accept *accept = &req->accept;
3102
3103         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
3104                 return -EINVAL;
3105         if (sqe->ioprio || sqe->len || sqe->buf_index)
3106                 return -EINVAL;
3107
3108         accept->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
3109         accept->addr_len = u64_to_user_ptr(READ_ONCE(sqe->addr2));
3110         accept->flags = READ_ONCE(sqe->accept_flags);
3111         return 0;
3112 #else
3113         return -EOPNOTSUPP;
3114 #endif
3115 }
3116
3117 #if defined(CONFIG_NET)
3118 static int __io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
3119                        bool force_nonblock)
3120 {
3121         struct io_accept *accept = &req->accept;
3122         unsigned file_flags;
3123         int ret;
3124
3125         file_flags = force_nonblock ? O_NONBLOCK : 0;
3126         ret = __sys_accept4_file(req->file, file_flags, accept->addr,
3127                                         accept->addr_len, accept->flags);
3128         if (ret == -EAGAIN && force_nonblock)
3129                 return -EAGAIN;
3130         if (ret == -ERESTARTSYS)
3131                 ret = -EINTR;
3132         if (ret < 0)
3133                 req_set_fail_links(req);
3134         io_cqring_add_event(req, ret);
3135         io_put_req_find_next(req, nxt);
3136         return 0;
3137 }
3138
3139 static void io_accept_finish(struct io_wq_work **workptr)
3140 {
3141         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
3142         struct io_kiocb *nxt = NULL;
3143
3144         if (io_req_cancelled(req))
3145                 return;
3146         __io_accept(req, &nxt, false);
3147         if (nxt)
3148                 io_wq_assign_next(workptr, nxt);
3149 }
3150 #endif
3151
3152 static int io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
3153                      bool force_nonblock)
3154 {
3155 #if defined(CONFIG_NET)
3156         int ret;
3157
3158         ret = __io_accept(req, nxt, force_nonblock);
3159         if (ret == -EAGAIN && force_nonblock) {
3160                 req->work.func = io_accept_finish;
3161                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
3162                 io_put_req(req);
3163                 return -EAGAIN;
3164         }
3165         return 0;
3166 #else
3167         return -EOPNOTSUPP;
3168 #endif
3169 }
3170
3171 static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3172 {
3173 #if defined(CONFIG_NET)
3174         struct io_connect *conn = &req->connect;
3175         struct io_async_ctx *io = req->io;
3176
3177         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
3178                 return -EINVAL;
3179         if (sqe->ioprio || sqe->len || sqe->buf_index || sqe->rw_flags)
3180                 return -EINVAL;
3181
3182         conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
3183         conn->addr_len =  READ_ONCE(sqe->addr2);
3184
3185         if (!io)
3186                 return 0;
3187
3188         return move_addr_to_kernel(conn->addr, conn->addr_len,
3189                                         &io->connect.address);
3190 #else
3191         return -EOPNOTSUPP;
3192 #endif
3193 }
3194
3195 static int io_connect(struct io_kiocb *req, struct io_kiocb **nxt,
3196                       bool force_nonblock)
3197 {
3198 #if defined(CONFIG_NET)
3199         struct io_async_ctx __io, *io;
3200         unsigned file_flags;
3201         int ret;
3202
3203         if (req->io) {
3204                 io = req->io;
3205         } else {
3206                 ret = move_addr_to_kernel(req->connect.addr,
3207                                                 req->connect.addr_len,
3208                                                 &__io.connect.address);
3209                 if (ret)
3210                         goto out;
3211                 io = &__io;
3212         }
3213
3214         file_flags = force_nonblock ? O_NONBLOCK : 0;
3215
3216         ret = __sys_connect_file(req->file, &io->connect.address,
3217                                         req->connect.addr_len, file_flags);
3218         if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) {
3219                 if (req->io)
3220                         return -EAGAIN;
3221                 if (io_alloc_async_ctx(req)) {
3222                         ret = -ENOMEM;
3223                         goto out;
3224                 }
3225                 memcpy(&req->io->connect, &__io.connect, sizeof(__io.connect));
3226                 return -EAGAIN;
3227         }
3228         if (ret == -ERESTARTSYS)
3229                 ret = -EINTR;
3230 out:
3231         if (ret < 0)
3232                 req_set_fail_links(req);
3233         io_cqring_add_event(req, ret);
3234         io_put_req_find_next(req, nxt);
3235         return 0;
3236 #else
3237         return -EOPNOTSUPP;
3238 #endif
3239 }
3240
3241 static void io_poll_remove_one(struct io_kiocb *req)
3242 {
3243         struct io_poll_iocb *poll = &req->poll;
3244
3245         spin_lock(&poll->head->lock);
3246         WRITE_ONCE(poll->canceled, true);
3247         if (!list_empty(&poll->wait.entry)) {
3248                 list_del_init(&poll->wait.entry);
3249                 io_queue_async_work(req);
3250         }
3251         spin_unlock(&poll->head->lock);
3252         hash_del(&req->hash_node);
3253 }
3254
3255 static void io_poll_remove_all(struct io_ring_ctx *ctx)
3256 {
3257         struct hlist_node *tmp;
3258         struct io_kiocb *req;
3259         int i;
3260
3261         spin_lock_irq(&ctx->completion_lock);
3262         for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
3263                 struct hlist_head *list;
3264
3265                 list = &ctx->cancel_hash[i];
3266                 hlist_for_each_entry_safe(req, tmp, list, hash_node)
3267                         io_poll_remove_one(req);
3268         }
3269         spin_unlock_irq(&ctx->completion_lock);
3270 }
3271
3272 static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
3273 {
3274         struct hlist_head *list;
3275         struct io_kiocb *req;
3276
3277         list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)];
3278         hlist_for_each_entry(req, list, hash_node) {
3279                 if (sqe_addr == req->user_data) {
3280                         io_poll_remove_one(req);
3281                         return 0;
3282                 }
3283         }
3284
3285         return -ENOENT;
3286 }
3287
3288 static int io_poll_remove_prep(struct io_kiocb *req,
3289                                const struct io_uring_sqe *sqe)
3290 {
3291         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3292                 return -EINVAL;
3293         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
3294             sqe->poll_events)
3295                 return -EINVAL;
3296
3297         req->poll.addr = READ_ONCE(sqe->addr);
3298         return 0;
3299 }
3300
3301 /*
3302  * Find a running poll command that matches one specified in sqe->addr,
3303  * and remove it if found.
3304  */
3305 static int io_poll_remove(struct io_kiocb *req)
3306 {
3307         struct io_ring_ctx *ctx = req->ctx;
3308         u64 addr;
3309         int ret;
3310
3311         addr = req->poll.addr;
3312         spin_lock_irq(&ctx->completion_lock);
3313         ret = io_poll_cancel(ctx, addr);
3314         spin_unlock_irq(&ctx->completion_lock);
3315
3316         io_cqring_add_event(req, ret);
3317         if (ret < 0)
3318                 req_set_fail_links(req);
3319         io_put_req(req);
3320         return 0;
3321 }
3322
3323 static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
3324 {
3325         struct io_ring_ctx *ctx = req->ctx;
3326
3327         req->poll.done = true;
3328         if (error)
3329                 io_cqring_fill_event(req, error);
3330         else
3331                 io_cqring_fill_event(req, mangle_poll(mask));
3332         io_commit_cqring(ctx);
3333 }
3334
3335 static void io_poll_complete_work(struct io_wq_work **workptr)
3336 {
3337         struct io_wq_work *work = *workptr;
3338         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3339         struct io_poll_iocb *poll = &req->poll;
3340         struct poll_table_struct pt = { ._key = poll->events };
3341         struct io_ring_ctx *ctx = req->ctx;
3342         struct io_kiocb *nxt = NULL;
3343         __poll_t mask = 0;
3344         int ret = 0;
3345
3346         if (work->flags & IO_WQ_WORK_CANCEL) {
3347                 WRITE_ONCE(poll->canceled, true);
3348                 ret = -ECANCELED;
3349         } else if (READ_ONCE(poll->canceled)) {
3350                 ret = -ECANCELED;
3351         }
3352
3353         if (ret != -ECANCELED)
3354                 mask = vfs_poll(poll->file, &pt) & poll->events;
3355
3356         /*
3357          * Note that ->ki_cancel callers also delete iocb from active_reqs after
3358          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
3359          * synchronize with them.  In the cancellation case the list_del_init
3360          * itself is not actually needed, but harmless so we keep it in to
3361          * avoid further branches in the fast path.
3362          */
3363         spin_lock_irq(&ctx->completion_lock);
3364         if (!mask && ret != -ECANCELED) {
3365                 add_wait_queue(poll->head, &poll->wait);
3366                 spin_unlock_irq(&ctx->completion_lock);
3367                 return;
3368         }
3369         hash_del(&req->hash_node);
3370         io_poll_complete(req, mask, ret);
3371         spin_unlock_irq(&ctx->completion_lock);
3372
3373         io_cqring_ev_posted(ctx);
3374
3375         if (ret < 0)
3376                 req_set_fail_links(req);
3377         io_put_req_find_next(req, &nxt);
3378         if (nxt)
3379                 io_wq_assign_next(workptr, nxt);
3380 }
3381
3382 static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes)
3383 {
3384         struct io_kiocb *req, *tmp;
3385         struct req_batch rb;
3386
3387         rb.to_free = rb.need_iter = 0;
3388         spin_lock_irq(&ctx->completion_lock);
3389         llist_for_each_entry_safe(req, tmp, nodes, llist_node) {
3390                 hash_del(&req->hash_node);
3391                 io_poll_complete(req, req->result, 0);
3392
3393                 if (refcount_dec_and_test(&req->refs) &&
3394                     !io_req_multi_free(&rb, req)) {
3395                         req->flags |= REQ_F_COMP_LOCKED;
3396                         io_free_req(req);
3397                 }
3398         }
3399         spin_unlock_irq(&ctx->completion_lock);
3400
3401         io_cqring_ev_posted(ctx);
3402         io_free_req_many(ctx, &rb);
3403 }
3404
3405 static void io_poll_flush(struct io_wq_work **workptr)
3406 {
3407         struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
3408         struct llist_node *nodes;
3409
3410         nodes = llist_del_all(&req->ctx->poll_llist);
3411         if (nodes)
3412                 __io_poll_flush(req->ctx, nodes);
3413 }
3414
3415 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
3416                         void *key)
3417 {
3418         struct io_poll_iocb *poll = wait->private;
3419         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
3420         struct io_ring_ctx *ctx = req->ctx;
3421         __poll_t mask = key_to_poll(key);
3422
3423         /* for instances that support it check for an event match first: */
3424         if (mask && !(mask & poll->events))
3425                 return 0;
3426
3427         list_del_init(&poll->wait.entry);
3428
3429         /*
3430          * Run completion inline if we can. We're using trylock here because
3431          * we are violating the completion_lock -> poll wq lock ordering.
3432          * If we have a link timeout we're going to need the completion_lock
3433          * for finalizing the request, mark us as having grabbed that already.
3434          */
3435         if (mask) {
3436                 unsigned long flags;
3437
3438                 if (llist_empty(&ctx->poll_llist) &&
3439                     spin_trylock_irqsave(&ctx->completion_lock, flags)) {
3440                         hash_del(&req->hash_node);
3441                         io_poll_complete(req, mask, 0);
3442                         req->flags |= REQ_F_COMP_LOCKED;
3443                         io_put_req(req);
3444                         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3445
3446                         io_cqring_ev_posted(ctx);
3447                         req = NULL;
3448                 } else {
3449                         req->result = mask;
3450                         req->llist_node.next = NULL;
3451                         /* if the list wasn't empty, we're done */
3452                         if (!llist_add(&req->llist_node, &ctx->poll_llist))
3453                                 req = NULL;
3454                         else
3455                                 req->work.func = io_poll_flush;
3456                 }
3457         }
3458         if (req)
3459                 io_queue_async_work(req);
3460
3461         return 1;
3462 }
3463
3464 struct io_poll_table {
3465         struct poll_table_struct pt;
3466         struct io_kiocb *req;
3467         int error;
3468 };
3469
3470 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
3471                                struct poll_table_struct *p)
3472 {
3473         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
3474
3475         if (unlikely(pt->req->poll.head)) {
3476                 pt->error = -EINVAL;
3477                 return;
3478         }
3479
3480         pt->error = 0;
3481         pt->req->poll.head = head;
3482         add_wait_queue(head, &pt->req->poll.wait);
3483 }
3484
3485 static void io_poll_req_insert(struct io_kiocb *req)
3486 {
3487         struct io_ring_ctx *ctx = req->ctx;
3488         struct hlist_head *list;
3489
3490         list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)];
3491         hlist_add_head(&req->hash_node, list);
3492 }
3493
3494 static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3495 {
3496         struct io_poll_iocb *poll = &req->poll;
3497         u16 events;
3498
3499         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3500                 return -EINVAL;
3501         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
3502                 return -EINVAL;
3503         if (!poll->file)
3504                 return -EBADF;
3505
3506         events = READ_ONCE(sqe->poll_events);
3507         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
3508         return 0;
3509 }
3510
3511 static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt)
3512 {
3513         struct io_poll_iocb *poll = &req->poll;
3514         struct io_ring_ctx *ctx = req->ctx;
3515         struct io_poll_table ipt;
3516         bool cancel = false;
3517         __poll_t mask;
3518
3519         INIT_IO_WORK(&req->work, io_poll_complete_work);
3520         INIT_HLIST_NODE(&req->hash_node);
3521
3522         poll->head = NULL;
3523         poll->done = false;
3524         poll->canceled = false;
3525
3526         ipt.pt._qproc = io_poll_queue_proc;
3527         ipt.pt._key = poll->events;
3528         ipt.req = req;
3529         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
3530
3531         /* initialized the list so that we can do list_empty checks */
3532         INIT_LIST_HEAD(&poll->wait.entry);
3533         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
3534         poll->wait.private = poll;
3535
3536         INIT_LIST_HEAD(&req->list);
3537
3538         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
3539
3540         spin_lock_irq(&ctx->completion_lock);
3541         if (likely(poll->head)) {
3542                 spin_lock(&poll->head->lock);
3543                 if (unlikely(list_empty(&poll->wait.entry))) {
3544                         if (ipt.error)
3545                                 cancel = true;
3546                         ipt.error = 0;
3547                         mask = 0;
3548                 }
3549                 if (mask || ipt.error)
3550                         list_del_init(&poll->wait.entry);
3551                 else if (cancel)
3552                         WRITE_ONCE(poll->canceled, true);
3553                 else if (!poll->done) /* actually waiting for an event */
3554                         io_poll_req_insert(req);
3555                 spin_unlock(&poll->head->lock);
3556         }
3557         if (mask) { /* no async, we'd stolen it */
3558                 ipt.error = 0;
3559                 io_poll_complete(req, mask, 0);
3560         }
3561         spin_unlock_irq(&ctx->completion_lock);
3562
3563         if (mask) {
3564                 io_cqring_ev_posted(ctx);
3565                 io_put_req_find_next(req, nxt);
3566         }
3567         return ipt.error;
3568 }
3569
3570 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
3571 {
3572         struct io_timeout_data *data = container_of(timer,
3573                                                 struct io_timeout_data, timer);
3574         struct io_kiocb *req = data->req;
3575         struct io_ring_ctx *ctx = req->ctx;
3576         unsigned long flags;
3577
3578         atomic_inc(&ctx->cq_timeouts);
3579
3580         spin_lock_irqsave(&ctx->completion_lock, flags);
3581         /*
3582          * We could be racing with timeout deletion. If the list is empty,
3583          * then timeout lookup already found it and will be handling it.
3584          */
3585         if (!list_empty(&req->list)) {
3586                 struct io_kiocb *prev;
3587
3588                 /*
3589                  * Adjust the reqs sequence before the current one because it
3590                  * will consume a slot in the cq_ring and the cq_tail
3591                  * pointer will be increased, otherwise other timeout reqs may
3592                  * return in advance without waiting for enough wait_nr.
3593                  */
3594                 prev = req;
3595                 list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
3596                         prev->sequence++;
3597                 list_del_init(&req->list);
3598         }
3599
3600         io_cqring_fill_event(req, -ETIME);
3601         io_commit_cqring(ctx);
3602         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3603
3604         io_cqring_ev_posted(ctx);
3605         req_set_fail_links(req);
3606         io_put_req(req);
3607         return HRTIMER_NORESTART;
3608 }
3609
3610 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
3611 {
3612         struct io_kiocb *req;
3613         int ret = -ENOENT;
3614
3615         list_for_each_entry(req, &ctx->timeout_list, list) {
3616                 if (user_data == req->user_data) {
3617                         list_del_init(&req->list);
3618                         ret = 0;
3619                         break;
3620                 }
3621         }
3622
3623         if (ret == -ENOENT)
3624                 return ret;
3625
3626         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
3627         if (ret == -1)
3628                 return -EALREADY;
3629
3630         req_set_fail_links(req);
3631         io_cqring_fill_event(req, -ECANCELED);
3632         io_put_req(req);
3633         return 0;
3634 }
3635
3636 static int io_timeout_remove_prep(struct io_kiocb *req,
3637                                   const struct io_uring_sqe *sqe)
3638 {
3639         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3640                 return -EINVAL;
3641         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)
3642                 return -EINVAL;
3643
3644         req->timeout.addr = READ_ONCE(sqe->addr);
3645         req->timeout.flags = READ_ONCE(sqe->timeout_flags);
3646         if (req->timeout.flags)
3647                 return -EINVAL;
3648
3649         return 0;
3650 }
3651
3652 /*
3653  * Remove or update an existing timeout command
3654  */
3655 static int io_timeout_remove(struct io_kiocb *req)
3656 {
3657         struct io_ring_ctx *ctx = req->ctx;
3658         int ret;
3659
3660         spin_lock_irq(&ctx->completion_lock);
3661         ret = io_timeout_cancel(ctx, req->timeout.addr);
3662
3663         io_cqring_fill_event(req, ret);
3664         io_commit_cqring(ctx);
3665         spin_unlock_irq(&ctx->completion_lock);
3666         io_cqring_ev_posted(ctx);
3667         if (ret < 0)
3668                 req_set_fail_links(req);
3669         io_put_req(req);
3670         return 0;
3671 }
3672
3673 static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
3674                            bool is_timeout_link)
3675 {
3676         struct io_timeout_data *data;
3677         unsigned flags;
3678
3679         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3680                 return -EINVAL;
3681         if (sqe->ioprio || sqe->buf_index || sqe->len != 1)
3682                 return -EINVAL;
3683         if (sqe->off && is_timeout_link)
3684                 return -EINVAL;
3685         flags = READ_ONCE(sqe->timeout_flags);
3686         if (flags & ~IORING_TIMEOUT_ABS)
3687                 return -EINVAL;
3688
3689         req->timeout.count = READ_ONCE(sqe->off);
3690
3691         if (!req->io && io_alloc_async_ctx(req))
3692                 return -ENOMEM;
3693
3694         data = &req->io->timeout;
3695         data->req = req;
3696         req->flags |= REQ_F_TIMEOUT;
3697
3698         if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))
3699                 return -EFAULT;
3700
3701         if (flags & IORING_TIMEOUT_ABS)
3702                 data->mode = HRTIMER_MODE_ABS;
3703         else
3704                 data->mode = HRTIMER_MODE_REL;
3705
3706         hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
3707         return 0;
3708 }
3709
3710 static int io_timeout(struct io_kiocb *req)
3711 {
3712         unsigned count;
3713         struct io_ring_ctx *ctx = req->ctx;
3714         struct io_timeout_data *data;
3715         struct list_head *entry;
3716         unsigned span = 0;
3717
3718         data = &req->io->timeout;
3719
3720         /*
3721          * sqe->off holds how many events that need to occur for this
3722          * timeout event to be satisfied. If it isn't set, then this is
3723          * a pure timeout request, sequence isn't used.
3724          */
3725         count = req->timeout.count;
3726         if (!count) {
3727                 req->flags |= REQ_F_TIMEOUT_NOSEQ;
3728                 spin_lock_irq(&ctx->completion_lock);
3729                 entry = ctx->timeout_list.prev;
3730                 goto add;
3731         }
3732
3733         req->sequence = ctx->cached_sq_head + count - 1;
3734         data->seq_offset = count;
3735
3736         /*
3737          * Insertion sort, ensuring the first entry in the list is always
3738          * the one we need first.
3739          */
3740         spin_lock_irq(&ctx->completion_lock);
3741         list_for_each_prev(entry, &ctx->timeout_list) {
3742                 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
3743                 unsigned nxt_sq_head;
3744                 long long tmp, tmp_nxt;
3745                 u32 nxt_offset = nxt->io->timeout.seq_offset;
3746
3747                 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
3748                         continue;
3749
3750                 /*
3751                  * Since cached_sq_head + count - 1 can overflow, use type long
3752                  * long to store it.
3753                  */
3754                 tmp = (long long)ctx->cached_sq_head + count - 1;
3755                 nxt_sq_head = nxt->sequence - nxt_offset + 1;
3756                 tmp_nxt = (long long)nxt_sq_head + nxt_offset - 1;
3757
3758                 /*
3759                  * cached_sq_head may overflow, and it will never overflow twice
3760                  * once there is some timeout req still be valid.
3761                  */
3762                 if (ctx->cached_sq_head < nxt_sq_head)
3763                         tmp += UINT_MAX;
3764
3765                 if (tmp > tmp_nxt)
3766                         break;
3767
3768                 /*
3769                  * Sequence of reqs after the insert one and itself should
3770                  * be adjusted because each timeout req consumes a slot.
3771                  */
3772                 span++;
3773                 nxt->sequence++;
3774         }
3775         req->sequence -= span;
3776 add:
3777         list_add(&req->list, entry);
3778         data->timer.function = io_timeout_fn;
3779         hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
3780         spin_unlock_irq(&ctx->completion_lock);
3781         return 0;
3782 }
3783
3784 static bool io_cancel_cb(struct io_wq_work *work, void *data)
3785 {
3786         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3787
3788         return req->user_data == (unsigned long) data;
3789 }
3790
3791 static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
3792 {
3793         enum io_wq_cancel cancel_ret;
3794         int ret = 0;
3795
3796         cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
3797         switch (cancel_ret) {
3798         case IO_WQ_CANCEL_OK:
3799                 ret = 0;
3800                 break;
3801         case IO_WQ_CANCEL_RUNNING:
3802                 ret = -EALREADY;
3803                 break;
3804         case IO_WQ_CANCEL_NOTFOUND:
3805                 ret = -ENOENT;
3806                 break;
3807         }
3808
3809         return ret;
3810 }
3811
3812 static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
3813                                      struct io_kiocb *req, __u64 sqe_addr,
3814                                      struct io_kiocb **nxt, int success_ret)
3815 {
3816         unsigned long flags;
3817         int ret;
3818
3819         ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
3820         if (ret != -ENOENT) {
3821                 spin_lock_irqsave(&ctx->completion_lock, flags);
3822                 goto done;
3823         }
3824
3825         spin_lock_irqsave(&ctx->completion_lock, flags);
3826         ret = io_timeout_cancel(ctx, sqe_addr);
3827         if (ret != -ENOENT)
3828                 goto done;
3829         ret = io_poll_cancel(ctx, sqe_addr);
3830 done:
3831         if (!ret)
3832                 ret = success_ret;
3833         io_cqring_fill_event(req, ret);
3834         io_commit_cqring(ctx);
3835         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3836         io_cqring_ev_posted(ctx);
3837
3838         if (ret < 0)
3839                 req_set_fail_links(req);
3840         io_put_req_find_next(req, nxt);
3841 }
3842
3843 static int io_async_cancel_prep(struct io_kiocb *req,
3844                                 const struct io_uring_sqe *sqe)
3845 {
3846         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3847                 return -EINVAL;
3848         if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||
3849             sqe->cancel_flags)
3850                 return -EINVAL;
3851
3852         req->cancel.addr = READ_ONCE(sqe->addr);
3853         return 0;
3854 }
3855
3856 static int io_async_cancel(struct io_kiocb *req, struct io_kiocb **nxt)
3857 {
3858         struct io_ring_ctx *ctx = req->ctx;
3859
3860         io_async_find_and_cancel(ctx, req, req->cancel.addr, nxt, 0);
3861         return 0;
3862 }
3863
3864 static int io_files_update_prep(struct io_kiocb *req,
3865                                 const struct io_uring_sqe *sqe)
3866 {
3867         if (sqe->flags || sqe->ioprio || sqe->rw_flags)
3868                 return -EINVAL;
3869
3870         req->files_update.offset = READ_ONCE(sqe->off);
3871         req->files_update.nr_args = READ_ONCE(sqe->len);
3872         if (!req->files_update.nr_args)
3873                 return -EINVAL;
3874         req->files_update.arg = READ_ONCE(sqe->addr);
3875         return 0;
3876 }
3877
3878 static int io_files_update(struct io_kiocb *req, bool force_nonblock)
3879 {
3880         struct io_ring_ctx *ctx = req->ctx;
3881         struct io_uring_files_update up;
3882         int ret;
3883
3884         if (force_nonblock) {
3885                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
3886                 return -EAGAIN;
3887         }
3888
3889         up.offset = req->files_update.offset;
3890         up.fds = req->files_update.arg;
3891
3892         mutex_lock(&ctx->uring_lock);
3893         ret = __io_sqe_files_update(ctx, &up, req->files_update.nr_args);
3894         mutex_unlock(&ctx->uring_lock);
3895
3896         if (ret < 0)
3897                 req_set_fail_links(req);
3898         io_cqring_add_event(req, ret);
3899         io_put_req(req);
3900         return 0;
3901 }
3902
3903 static int io_req_defer_prep(struct io_kiocb *req,
3904                              const struct io_uring_sqe *sqe)
3905 {
3906         ssize_t ret = 0;
3907
3908         switch (req->opcode) {
3909         case IORING_OP_NOP:
3910                 break;
3911         case IORING_OP_READV:
3912         case IORING_OP_READ_FIXED:
3913         case IORING_OP_READ:
3914                 ret = io_read_prep(req, sqe, true);
3915                 break;
3916         case IORING_OP_WRITEV:
3917         case IORING_OP_WRITE_FIXED:
3918         case IORING_OP_WRITE:
3919                 ret = io_write_prep(req, sqe, true);
3920                 break;
3921         case IORING_OP_POLL_ADD:
3922                 ret = io_poll_add_prep(req, sqe);
3923                 break;
3924         case IORING_OP_POLL_REMOVE:
3925                 ret = io_poll_remove_prep(req, sqe);
3926                 break;
3927         case IORING_OP_FSYNC:
3928                 ret = io_prep_fsync(req, sqe);
3929                 break;
3930         case IORING_OP_SYNC_FILE_RANGE:
3931                 ret = io_prep_sfr(req, sqe);
3932                 break;
3933         case IORING_OP_SENDMSG:
3934         case IORING_OP_SEND:
3935                 ret = io_sendmsg_prep(req, sqe);
3936                 break;
3937         case IORING_OP_RECVMSG:
3938         case IORING_OP_RECV:
3939                 ret = io_recvmsg_prep(req, sqe);
3940                 break;
3941         case IORING_OP_CONNECT:
3942                 ret = io_connect_prep(req, sqe);
3943                 break;
3944         case IORING_OP_TIMEOUT:
3945                 ret = io_timeout_prep(req, sqe, false);
3946                 break;
3947         case IORING_OP_TIMEOUT_REMOVE:
3948                 ret = io_timeout_remove_prep(req, sqe);
3949                 break;
3950         case IORING_OP_ASYNC_CANCEL:
3951                 ret = io_async_cancel_prep(req, sqe);
3952                 break;
3953         case IORING_OP_LINK_TIMEOUT:
3954                 ret = io_timeout_prep(req, sqe, true);
3955                 break;
3956         case IORING_OP_ACCEPT:
3957                 ret = io_accept_prep(req, sqe);
3958                 break;
3959         case IORING_OP_FALLOCATE:
3960                 ret = io_fallocate_prep(req, sqe);
3961                 break;
3962         case IORING_OP_OPENAT:
3963                 ret = io_openat_prep(req, sqe);
3964                 break;
3965         case IORING_OP_CLOSE:
3966                 ret = io_close_prep(req, sqe);
3967                 break;
3968         case IORING_OP_FILES_UPDATE:
3969                 ret = io_files_update_prep(req, sqe);
3970                 break;
3971         case IORING_OP_STATX:
3972                 ret = io_statx_prep(req, sqe);
3973                 break;
3974         case IORING_OP_FADVISE:
3975                 ret = io_fadvise_prep(req, sqe);
3976                 break;
3977         case IORING_OP_MADVISE:
3978                 ret = io_madvise_prep(req, sqe);
3979                 break;
3980         default:
3981                 printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n",
3982                                 req->opcode);
3983                 ret = -EINVAL;
3984                 break;
3985         }
3986
3987         return ret;
3988 }
3989
3990 static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3991 {
3992         struct io_ring_ctx *ctx = req->ctx;
3993         int ret;
3994
3995         /* Still need defer if there is pending req in defer list. */
3996         if (!req_need_defer(req) && list_empty(&ctx->defer_list))
3997                 return 0;
3998
3999         if (!req->io && io_alloc_async_ctx(req))
4000                 return -EAGAIN;
4001
4002         ret = io_req_defer_prep(req, sqe);
4003         if (ret < 0)
4004                 return ret;
4005
4006         spin_lock_irq(&ctx->completion_lock);
4007         if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
4008                 spin_unlock_irq(&ctx->completion_lock);
4009                 return 0;
4010         }
4011
4012         trace_io_uring_defer(ctx, req, req->user_data);
4013         list_add_tail(&req->list, &ctx->defer_list);
4014         spin_unlock_irq(&ctx->completion_lock);
4015         return -EIOCBQUEUED;
4016 }
4017
4018 static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
4019                         struct io_kiocb **nxt, bool force_nonblock)
4020 {
4021         struct io_ring_ctx *ctx = req->ctx;
4022         int ret;
4023
4024         switch (req->opcode) {
4025         case IORING_OP_NOP:
4026                 ret = io_nop(req);
4027                 break;
4028         case IORING_OP_READV:
4029         case IORING_OP_READ_FIXED:
4030         case IORING_OP_READ:
4031                 if (sqe) {
4032                         ret = io_read_prep(req, sqe, force_nonblock);
4033                         if (ret < 0)
4034                                 break;
4035                 }
4036                 ret = io_read(req, nxt, force_nonblock);
4037                 break;
4038         case IORING_OP_WRITEV:
4039         case IORING_OP_WRITE_FIXED:
4040         case IORING_OP_WRITE:
4041                 if (sqe) {
4042                         ret = io_write_prep(req, sqe, force_nonblock);
4043                         if (ret < 0)
4044                                 break;
4045                 }
4046                 ret = io_write(req, nxt, force_nonblock);
4047                 break;
4048         case IORING_OP_FSYNC:
4049                 if (sqe) {
4050                         ret = io_prep_fsync(req, sqe);
4051                         if (ret < 0)
4052                                 break;
4053                 }
4054                 ret = io_fsync(req, nxt, force_nonblock);
4055                 break;
4056         case IORING_OP_POLL_ADD:
4057                 if (sqe) {
4058                         ret = io_poll_add_prep(req, sqe);
4059                         if (ret)
4060                                 break;
4061                 }
4062                 ret = io_poll_add(req, nxt);
4063                 break;
4064         case IORING_OP_POLL_REMOVE:
4065                 if (sqe) {
4066                         ret = io_poll_remove_prep(req, sqe);
4067                         if (ret < 0)
4068                                 break;
4069                 }
4070                 ret = io_poll_remove(req);
4071                 break;
4072         case IORING_OP_SYNC_FILE_RANGE:
4073                 if (sqe) {
4074                         ret = io_prep_sfr(req, sqe);
4075                         if (ret < 0)
4076                                 break;
4077                 }
4078                 ret = io_sync_file_range(req, nxt, force_nonblock);
4079                 break;
4080         case IORING_OP_SENDMSG:
4081         case IORING_OP_SEND:
4082                 if (sqe) {
4083                         ret = io_sendmsg_prep(req, sqe);
4084                         if (ret < 0)
4085                                 break;
4086                 }
4087                 if (req->opcode == IORING_OP_SENDMSG)
4088                         ret = io_sendmsg(req, nxt, force_nonblock);
4089                 else
4090                         ret = io_send(req, nxt, force_nonblock);
4091                 break;
4092         case IORING_OP_RECVMSG:
4093         case IORING_OP_RECV:
4094                 if (sqe) {
4095                         ret = io_recvmsg_prep(req, sqe);
4096                         if (ret)
4097                                 break;
4098                 }
4099                 if (req->opcode == IORING_OP_RECVMSG)
4100                         ret = io_recvmsg(req, nxt, force_nonblock);
4101                 else
4102                         ret = io_recv(req, nxt, force_nonblock);
4103                 break;
4104         case IORING_OP_TIMEOUT:
4105                 if (sqe) {
4106                         ret = io_timeout_prep(req, sqe, false);
4107                         if (ret)
4108                                 break;
4109                 }
4110                 ret = io_timeout(req);
4111                 break;
4112         case IORING_OP_TIMEOUT_REMOVE:
4113                 if (sqe) {
4114                         ret = io_timeout_remove_prep(req, sqe);
4115                         if (ret)
4116                                 break;
4117                 }
4118                 ret = io_timeout_remove(req);
4119                 break;
4120         case IORING_OP_ACCEPT:
4121                 if (sqe) {
4122                         ret = io_accept_prep(req, sqe);
4123                         if (ret)
4124                                 break;
4125                 }
4126                 ret = io_accept(req, nxt, force_nonblock);
4127                 break;
4128         case IORING_OP_CONNECT:
4129                 if (sqe) {
4130                         ret = io_connect_prep(req, sqe);
4131                         if (ret)
4132                                 break;
4133                 }
4134                 ret = io_connect(req, nxt, force_nonblock);
4135                 break;
4136         case IORING_OP_ASYNC_CANCEL:
4137                 if (sqe) {
4138                         ret = io_async_cancel_prep(req, sqe);
4139                         if (ret)
4140                                 break;
4141                 }
4142                 ret = io_async_cancel(req, nxt);
4143                 break;
4144         case IORING_OP_FALLOCATE:
4145                 if (sqe) {
4146                         ret = io_fallocate_prep(req, sqe);
4147                         if (ret)
4148                                 break;
4149                 }
4150                 ret = io_fallocate(req, nxt, force_nonblock);
4151                 break;
4152         case IORING_OP_OPENAT:
4153                 if (sqe) {
4154                         ret = io_openat_prep(req, sqe);
4155                         if (ret)
4156                                 break;
4157                 }
4158                 ret = io_openat(req, nxt, force_nonblock);
4159                 break;
4160         case IORING_OP_CLOSE:
4161                 if (sqe) {
4162                         ret = io_close_prep(req, sqe);
4163                         if (ret)
4164                                 break;
4165                 }
4166                 ret = io_close(req, nxt, force_nonblock);
4167                 break;
4168         case IORING_OP_FILES_UPDATE:
4169                 if (sqe) {
4170                         ret = io_files_update_prep(req, sqe);
4171                         if (ret)
4172                                 break;
4173                 }
4174                 ret = io_files_update(req, force_nonblock);
4175                 break;
4176         case IORING_OP_STATX:
4177                 if (sqe) {
4178                         ret = io_statx_prep(req, sqe);
4179                         if (ret)
4180                                 break;
4181                 }
4182                 ret = io_statx(req, nxt, force_nonblock);
4183                 break;
4184         case IORING_OP_FADVISE:
4185                 if (sqe) {
4186                         ret = io_fadvise_prep(req, sqe);
4187                         if (ret)
4188                                 break;
4189                 }
4190                 ret = io_fadvise(req, nxt, force_nonblock);
4191                 break;
4192         case IORING_OP_MADVISE:
4193                 if (sqe) {
4194                         ret = io_madvise_prep(req, sqe);
4195                         if (ret)
4196                                 break;
4197                 }
4198                 ret = io_madvise(req, nxt, force_nonblock);
4199                 break;
4200         default:
4201                 ret = -EINVAL;
4202                 break;
4203         }
4204
4205         if (ret)
4206                 return ret;
4207
4208         if (ctx->flags & IORING_SETUP_IOPOLL) {
4209                 const bool in_async = io_wq_current_is_worker();
4210
4211                 if (req->result == -EAGAIN)
4212                         return -EAGAIN;
4213
4214                 /* workqueue context doesn't hold uring_lock, grab it now */
4215                 if (in_async)
4216                         mutex_lock(&ctx->uring_lock);
4217
4218                 io_iopoll_req_issued(req);
4219
4220                 if (in_async)
4221                         mutex_unlock(&ctx->uring_lock);
4222         }
4223
4224         return 0;
4225 }
4226
4227 static void io_wq_submit_work(struct io_wq_work **workptr)
4228 {
4229         struct io_wq_work *work = *workptr;
4230         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4231         struct io_kiocb *nxt = NULL;
4232         int ret = 0;
4233
4234         /* if NO_CANCEL is set, we must still run the work */
4235         if ((work->flags & (IO_WQ_WORK_CANCEL|IO_WQ_WORK_NO_CANCEL)) ==
4236                                 IO_WQ_WORK_CANCEL) {
4237                 ret = -ECANCELED;
4238         }
4239
4240         if (!ret) {
4241                 req->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
4242                 req->in_async = true;
4243                 do {
4244                         ret = io_issue_sqe(req, NULL, &nxt, false);
4245                         /*
4246                          * We can get EAGAIN for polled IO even though we're
4247                          * forcing a sync submission from here, since we can't
4248                          * wait for request slots on the block side.
4249                          */
4250                         if (ret != -EAGAIN)
4251                                 break;
4252                         cond_resched();
4253                 } while (1);
4254         }
4255
4256         /* drop submission reference */
4257         io_put_req(req);
4258
4259         if (ret) {
4260                 req_set_fail_links(req);
4261                 io_cqring_add_event(req, ret);
4262                 io_put_req(req);
4263         }
4264
4265         /* if a dependent link is ready, pass it back */
4266         if (!ret && nxt)
4267                 io_wq_assign_next(workptr, nxt);
4268 }
4269
4270 static int io_req_needs_file(struct io_kiocb *req, int fd)
4271 {
4272         if (!io_op_defs[req->opcode].needs_file)
4273                 return 0;
4274         if (fd == -1 && io_op_defs[req->opcode].fd_non_neg)
4275                 return 0;
4276         return 1;
4277 }
4278
4279 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
4280                                               int index)
4281 {
4282         struct fixed_file_table *table;
4283
4284         table = &ctx->file_data->table[index >> IORING_FILE_TABLE_SHIFT];
4285         return table->files[index & IORING_FILE_TABLE_MASK];;
4286 }
4287
4288 static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req,
4289                            const struct io_uring_sqe *sqe)
4290 {
4291         struct io_ring_ctx *ctx = req->ctx;
4292         unsigned flags;
4293         int fd;
4294
4295         flags = READ_ONCE(sqe->flags);
4296         fd = READ_ONCE(sqe->fd);
4297
4298         if (flags & IOSQE_IO_DRAIN)
4299                 req->flags |= REQ_F_IO_DRAIN;
4300
4301         if (!io_req_needs_file(req, fd))
4302                 return 0;
4303
4304         if (flags & IOSQE_FIXED_FILE) {
4305                 if (unlikely(!ctx->file_data ||
4306                     (unsigned) fd >= ctx->nr_user_files))
4307                         return -EBADF;
4308                 fd = array_index_nospec(fd, ctx->nr_user_files);
4309                 req->file = io_file_from_index(ctx, fd);
4310                 if (!req->file)
4311                         return -EBADF;
4312                 req->flags |= REQ_F_FIXED_FILE;
4313                 percpu_ref_get(&ctx->file_data->refs);
4314         } else {
4315                 if (req->needs_fixed_file)
4316                         return -EBADF;
4317                 trace_io_uring_file_get(ctx, fd);
4318                 req->file = io_file_get(state, fd);
4319                 if (unlikely(!req->file))
4320                         return -EBADF;
4321         }
4322
4323         return 0;
4324 }
4325
4326 static int io_grab_files(struct io_kiocb *req)
4327 {
4328         int ret = -EBADF;
4329         struct io_ring_ctx *ctx = req->ctx;
4330
4331         if (!req->ring_file)
4332                 return -EBADF;
4333
4334         rcu_read_lock();
4335         spin_lock_irq(&ctx->inflight_lock);
4336         /*
4337          * We use the f_ops->flush() handler to ensure that we can flush
4338          * out work accessing these files if the fd is closed. Check if
4339          * the fd has changed since we started down this path, and disallow
4340          * this operation if it has.
4341          */
4342         if (fcheck(req->ring_fd) == req->ring_file) {
4343                 list_add(&req->inflight_entry, &ctx->inflight_list);
4344                 req->flags |= REQ_F_INFLIGHT;
4345                 req->work.files = current->files;
4346                 ret = 0;
4347         }
4348         spin_unlock_irq(&ctx->inflight_lock);
4349         rcu_read_unlock();
4350
4351         return ret;
4352 }
4353
4354 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
4355 {
4356         struct io_timeout_data *data = container_of(timer,
4357                                                 struct io_timeout_data, timer);
4358         struct io_kiocb *req = data->req;
4359         struct io_ring_ctx *ctx = req->ctx;
4360         struct io_kiocb *prev = NULL;
4361         unsigned long flags;
4362
4363         spin_lock_irqsave(&ctx->completion_lock, flags);
4364
4365         /*
4366          * We don't expect the list to be empty, that will only happen if we
4367          * race with the completion of the linked work.
4368          */
4369         if (!list_empty(&req->link_list)) {
4370                 prev = list_entry(req->link_list.prev, struct io_kiocb,
4371                                   link_list);
4372                 if (refcount_inc_not_zero(&prev->refs)) {
4373                         list_del_init(&req->link_list);
4374                         prev->flags &= ~REQ_F_LINK_TIMEOUT;
4375                 } else
4376                         prev = NULL;
4377         }
4378
4379         spin_unlock_irqrestore(&ctx->completion_lock, flags);
4380
4381         if (prev) {
4382                 req_set_fail_links(prev);
4383                 io_async_find_and_cancel(ctx, req, prev->user_data, NULL,
4384                                                 -ETIME);
4385                 io_put_req(prev);
4386         } else {
4387                 io_cqring_add_event(req, -ETIME);
4388                 io_put_req(req);
4389         }
4390         return HRTIMER_NORESTART;
4391 }
4392
4393 static void io_queue_linked_timeout(struct io_kiocb *req)
4394 {
4395         struct io_ring_ctx *ctx = req->ctx;
4396
4397         /*
4398          * If the list is now empty, then our linked request finished before
4399          * we got a chance to setup the timer
4400          */
4401         spin_lock_irq(&ctx->completion_lock);
4402         if (!list_empty(&req->link_list)) {
4403                 struct io_timeout_data *data = &req->io->timeout;
4404
4405                 data->timer.function = io_link_timeout_fn;
4406                 hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
4407                                 data->mode);
4408         }
4409         spin_unlock_irq(&ctx->completion_lock);
4410
4411         /* drop submission reference */
4412         io_put_req(req);
4413 }
4414
4415 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
4416 {
4417         struct io_kiocb *nxt;
4418
4419         if (!(req->flags & REQ_F_LINK))
4420                 return NULL;
4421
4422         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb,
4423                                         link_list);
4424         if (!nxt || nxt->opcode != IORING_OP_LINK_TIMEOUT)
4425                 return NULL;
4426
4427         req->flags |= REQ_F_LINK_TIMEOUT;
4428         return nxt;
4429 }
4430
4431 static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4432 {
4433         struct io_kiocb *linked_timeout;
4434         struct io_kiocb *nxt = NULL;
4435         int ret;
4436
4437 again:
4438         linked_timeout = io_prep_linked_timeout(req);
4439
4440         ret = io_issue_sqe(req, sqe, &nxt, true);
4441
4442         /*
4443          * We async punt it if the file wasn't marked NOWAIT, or if the file
4444          * doesn't support non-blocking read/write attempts
4445          */
4446         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
4447             (req->flags & REQ_F_MUST_PUNT))) {
4448                 if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
4449                         ret = io_grab_files(req);
4450                         if (ret)
4451                                 goto err;
4452                 }
4453
4454                 /*
4455                  * Queued up for async execution, worker will release
4456                  * submit reference when the iocb is actually submitted.
4457                  */
4458                 io_queue_async_work(req);
4459                 goto done_req;
4460         }
4461
4462 err:
4463         /* drop submission reference */
4464         io_put_req(req);
4465
4466         if (linked_timeout) {
4467                 if (!ret)
4468                         io_queue_linked_timeout(linked_timeout);
4469                 else
4470                         io_put_req(linked_timeout);
4471         }
4472
4473         /* and drop final reference, if we failed */
4474         if (ret) {
4475                 io_cqring_add_event(req, ret);
4476                 req_set_fail_links(req);
4477                 io_put_req(req);
4478         }
4479 done_req:
4480         if (nxt) {
4481                 req = nxt;
4482                 nxt = NULL;
4483                 goto again;
4484         }
4485 }
4486
4487 static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4488 {
4489         int ret;
4490
4491         if (unlikely(req->ctx->drain_next)) {
4492                 req->flags |= REQ_F_IO_DRAIN;
4493                 req->ctx->drain_next = false;
4494         }
4495         req->ctx->drain_next = (req->flags & REQ_F_DRAIN_LINK);
4496
4497         ret = io_req_defer(req, sqe);
4498         if (ret) {
4499                 if (ret != -EIOCBQUEUED) {
4500                         io_cqring_add_event(req, ret);
4501                         req_set_fail_links(req);
4502                         io_double_put_req(req);
4503                 }
4504         } else if (req->flags & REQ_F_FORCE_ASYNC) {
4505                 /*
4506                  * Never try inline submit of IOSQE_ASYNC is set, go straight
4507                  * to async execution.
4508                  */
4509                 req->work.flags |= IO_WQ_WORK_CONCURRENT;
4510                 io_queue_async_work(req);
4511         } else {
4512                 __io_queue_sqe(req, sqe);
4513         }
4514 }
4515
4516 static inline void io_queue_link_head(struct io_kiocb *req)
4517 {
4518         if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
4519                 io_cqring_add_event(req, -ECANCELED);
4520                 io_double_put_req(req);
4521         } else
4522                 io_queue_sqe(req, NULL);
4523 }
4524
4525 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
4526                                 IOSQE_IO_HARDLINK | IOSQE_ASYNC)
4527
4528 static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
4529                           struct io_submit_state *state, struct io_kiocb **link)
4530 {
4531         struct io_ring_ctx *ctx = req->ctx;
4532         unsigned int sqe_flags;
4533         int ret;
4534
4535         sqe_flags = READ_ONCE(sqe->flags);
4536
4537         /* enforce forwards compatibility on users */
4538         if (unlikely(sqe_flags & ~SQE_VALID_FLAGS)) {
4539                 ret = -EINVAL;
4540                 goto err_req;
4541         }
4542         if (sqe_flags & IOSQE_ASYNC)
4543                 req->flags |= REQ_F_FORCE_ASYNC;
4544
4545         ret = io_req_set_file(state, req, sqe);
4546         if (unlikely(ret)) {
4547 err_req:
4548                 io_cqring_add_event(req, ret);
4549                 io_double_put_req(req);
4550                 return false;
4551         }
4552
4553         /*
4554          * If we already have a head request, queue this one for async
4555          * submittal once the head completes. If we don't have a head but
4556          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
4557          * submitted sync once the chain is complete. If none of those
4558          * conditions are true (normal request), then just queue it.
4559          */
4560         if (*link) {
4561                 struct io_kiocb *head = *link;
4562
4563                 if (sqe_flags & IOSQE_IO_DRAIN)
4564                         head->flags |= REQ_F_DRAIN_LINK | REQ_F_IO_DRAIN;
4565
4566                 if (sqe_flags & IOSQE_IO_HARDLINK)
4567                         req->flags |= REQ_F_HARDLINK;
4568
4569                 if (io_alloc_async_ctx(req)) {
4570                         ret = -EAGAIN;
4571                         goto err_req;
4572                 }
4573
4574                 ret = io_req_defer_prep(req, sqe);
4575                 if (ret) {
4576                         /* fail even hard links since we don't submit */
4577                         head->flags |= REQ_F_FAIL_LINK;
4578                         goto err_req;
4579                 }
4580                 trace_io_uring_link(ctx, req, head);
4581                 list_add_tail(&req->link_list, &head->link_list);
4582
4583                 /* last request of a link, enqueue the link */
4584                 if (!(sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK))) {
4585                         io_queue_link_head(head);
4586                         *link = NULL;
4587                 }
4588         } else if (sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) {
4589                 req->flags |= REQ_F_LINK;
4590                 if (sqe_flags & IOSQE_IO_HARDLINK)
4591                         req->flags |= REQ_F_HARDLINK;
4592
4593                 INIT_LIST_HEAD(&req->link_list);
4594                 ret = io_req_defer_prep(req, sqe);
4595                 if (ret)
4596                         req->flags |= REQ_F_FAIL_LINK;
4597                 *link = req;
4598         } else {
4599                 io_queue_sqe(req, sqe);
4600         }
4601
4602         return true;
4603 }
4604
4605 /*
4606  * Batched submission is done, ensure local IO is flushed out.
4607  */
4608 static void io_submit_state_end(struct io_submit_state *state)
4609 {
4610         blk_finish_plug(&state->plug);
4611         io_file_put(state);
4612         if (state->free_reqs)
4613                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
4614                                         &state->reqs[state->cur_req]);
4615 }
4616
4617 /*
4618  * Start submission side cache.
4619  */
4620 static void io_submit_state_start(struct io_submit_state *state,
4621                                   unsigned int max_ios)
4622 {
4623         blk_start_plug(&state->plug);
4624         state->free_reqs = 0;
4625         state->file = NULL;
4626         state->ios_left = max_ios;
4627 }
4628
4629 static void io_commit_sqring(struct io_ring_ctx *ctx)
4630 {
4631         struct io_rings *rings = ctx->rings;
4632
4633         /*
4634          * Ensure any loads from the SQEs are done at this point,
4635          * since once we write the new head, the application could
4636          * write new data to them.
4637          */
4638         smp_store_release(&rings->sq.head, ctx->cached_sq_head);
4639 }
4640
4641 /*
4642  * Fetch an sqe, if one is available. Note that sqe_ptr will point to memory
4643  * that is mapped by userspace. This means that care needs to be taken to
4644  * ensure that reads are stable, as we cannot rely on userspace always
4645  * being a good citizen. If members of the sqe are validated and then later
4646  * used, it's important that those reads are done through READ_ONCE() to
4647  * prevent a re-load down the line.
4648  */
4649 static bool io_get_sqring(struct io_ring_ctx *ctx, struct io_kiocb *req,
4650                           const struct io_uring_sqe **sqe_ptr)
4651 {
4652         u32 *sq_array = ctx->sq_array;
4653         unsigned head;
4654
4655         /*
4656          * The cached sq head (or cq tail) serves two purposes:
4657          *
4658          * 1) allows us to batch the cost of updating the user visible
4659          *    head updates.
4660          * 2) allows the kernel side to track the head on its own, even
4661          *    though the application is the one updating it.
4662          */
4663         head = READ_ONCE(sq_array[ctx->cached_sq_head & ctx->sq_mask]);
4664         if (likely(head < ctx->sq_entries)) {
4665                 /*
4666                  * All io need record the previous position, if LINK vs DARIN,
4667                  * it can be used to mark the position of the first IO in the
4668                  * link list.
4669                  */
4670                 req->sequence = ctx->cached_sq_head;
4671                 *sqe_ptr = &ctx->sq_sqes[head];
4672                 req->opcode = READ_ONCE((*sqe_ptr)->opcode);
4673                 req->user_data = READ_ONCE((*sqe_ptr)->user_data);
4674                 ctx->cached_sq_head++;
4675                 return true;
4676         }
4677
4678         /* drop invalid entries */
4679         ctx->cached_sq_head++;
4680         ctx->cached_sq_dropped++;
4681         WRITE_ONCE(ctx->rings->sq_dropped, ctx->cached_sq_dropped);
4682         return false;
4683 }
4684
4685 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
4686                           struct file *ring_file, int ring_fd,
4687                           struct mm_struct **mm, bool async)
4688 {
4689         struct io_submit_state state, *statep = NULL;
4690         struct io_kiocb *link = NULL;
4691         int i, submitted = 0;
4692         bool mm_fault = false;
4693
4694         /* if we have a backlog and couldn't flush it all, return BUSY */
4695         if (test_bit(0, &ctx->sq_check_overflow)) {
4696                 if (!list_empty(&ctx->cq_overflow_list) &&
4697                     !io_cqring_overflow_flush(ctx, false))
4698                         return -EBUSY;
4699         }
4700
4701         /* make sure SQ entry isn't read before tail */
4702         nr = min3(nr, ctx->sq_entries, io_sqring_entries(ctx));
4703
4704         if (!percpu_ref_tryget_many(&ctx->refs, nr))
4705                 return -EAGAIN;
4706
4707         if (nr > IO_PLUG_THRESHOLD) {
4708                 io_submit_state_start(&state, nr);
4709                 statep = &state;
4710         }
4711
4712         for (i = 0; i < nr; i++) {
4713                 const struct io_uring_sqe *sqe;
4714                 struct io_kiocb *req;
4715
4716                 req = io_get_req(ctx, statep);
4717                 if (unlikely(!req)) {
4718                         if (!submitted)
4719                                 submitted = -EAGAIN;
4720                         break;
4721                 }
4722                 if (!io_get_sqring(ctx, req, &sqe)) {
4723                         __io_req_do_free(req);
4724                         break;
4725                 }
4726
4727                 /* will complete beyond this point, count as submitted */
4728                 submitted++;
4729
4730                 if (unlikely(req->opcode >= IORING_OP_LAST)) {
4731                         io_cqring_add_event(req, -EINVAL);
4732                         io_double_put_req(req);
4733                         break;
4734                 }
4735
4736                 if (io_op_defs[req->opcode].needs_mm && !*mm) {
4737                         mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
4738                         if (!mm_fault) {
4739                                 use_mm(ctx->sqo_mm);
4740                                 *mm = ctx->sqo_mm;
4741                         }
4742                 }
4743
4744                 req->ring_file = ring_file;
4745                 req->ring_fd = ring_fd;
4746                 req->has_user = *mm != NULL;
4747                 req->in_async = async;
4748                 req->needs_fixed_file = async;
4749                 trace_io_uring_submit_sqe(ctx, req->user_data, true, async);
4750                 if (!io_submit_sqe(req, sqe, statep, &link))
4751                         break;
4752         }
4753
4754         if (submitted != nr)
4755                 percpu_ref_put_many(&ctx->refs, nr - submitted);
4756         if (link)
4757                 io_queue_link_head(link);
4758         if (statep)
4759                 io_submit_state_end(&state);
4760
4761          /* Commit SQ ring head once we've consumed and submitted all SQEs */
4762         io_commit_sqring(ctx);
4763
4764         return submitted;
4765 }
4766
4767 static int io_sq_thread(void *data)
4768 {
4769         struct io_ring_ctx *ctx = data;
4770         struct mm_struct *cur_mm = NULL;
4771         const struct cred *old_cred;
4772         mm_segment_t old_fs;
4773         DEFINE_WAIT(wait);
4774         unsigned inflight;
4775         unsigned long timeout;
4776         int ret;
4777
4778         complete(&ctx->completions[1]);
4779
4780         old_fs = get_fs();
4781         set_fs(USER_DS);
4782         old_cred = override_creds(ctx->creds);
4783
4784         ret = timeout = inflight = 0;
4785         while (!kthread_should_park()) {
4786                 unsigned int to_submit;
4787
4788                 if (inflight) {
4789                         unsigned nr_events = 0;
4790
4791                         if (ctx->flags & IORING_SETUP_IOPOLL) {
4792                                 /*
4793                                  * inflight is the count of the maximum possible
4794                                  * entries we submitted, but it can be smaller
4795                                  * if we dropped some of them. If we don't have
4796                                  * poll entries available, then we know that we
4797                                  * have nothing left to poll for. Reset the
4798                                  * inflight count to zero in that case.
4799                                  */
4800                                 mutex_lock(&ctx->uring_lock);
4801                                 if (!list_empty(&ctx->poll_list))
4802                                         __io_iopoll_check(ctx, &nr_events, 0);
4803                                 else
4804                                         inflight = 0;
4805                                 mutex_unlock(&ctx->uring_lock);
4806                         } else {
4807                                 /*
4808                                  * Normal IO, just pretend everything completed.
4809                                  * We don't have to poll completions for that.
4810                                  */
4811                                 nr_events = inflight;
4812                         }
4813
4814                         inflight -= nr_events;
4815                         if (!inflight)
4816                                 timeout = jiffies + ctx->sq_thread_idle;
4817                 }
4818
4819                 to_submit = io_sqring_entries(ctx);
4820
4821                 /*
4822                  * If submit got -EBUSY, flag us as needing the application
4823                  * to enter the kernel to reap and flush events.
4824                  */
4825                 if (!to_submit || ret == -EBUSY) {
4826                         /*
4827                          * We're polling. If we're within the defined idle
4828                          * period, then let us spin without work before going
4829                          * to sleep. The exception is if we got EBUSY doing
4830                          * more IO, we should wait for the application to
4831                          * reap events and wake us up.
4832                          */
4833                         if (inflight ||
4834                             (!time_after(jiffies, timeout) && ret != -EBUSY)) {
4835                                 cond_resched();
4836                                 continue;
4837                         }
4838
4839                         /*
4840                          * Drop cur_mm before scheduling, we can't hold it for
4841                          * long periods (or over schedule()). Do this before
4842                          * adding ourselves to the waitqueue, as the unuse/drop
4843                          * may sleep.
4844                          */
4845                         if (cur_mm) {
4846                                 unuse_mm(cur_mm);
4847                                 mmput(cur_mm);
4848                                 cur_mm = NULL;
4849                         }
4850
4851                         prepare_to_wait(&ctx->sqo_wait, &wait,
4852                                                 TASK_INTERRUPTIBLE);
4853
4854                         /* Tell userspace we may need a wakeup call */
4855                         ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
4856                         /* make sure to read SQ tail after writing flags */
4857                         smp_mb();
4858
4859                         to_submit = io_sqring_entries(ctx);
4860                         if (!to_submit || ret == -EBUSY) {
4861                                 if (kthread_should_park()) {
4862                                         finish_wait(&ctx->sqo_wait, &wait);
4863                                         break;
4864                                 }
4865                                 if (signal_pending(current))
4866                                         flush_signals(current);
4867                                 schedule();
4868                                 finish_wait(&ctx->sqo_wait, &wait);
4869
4870                                 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4871                                 continue;
4872                         }
4873                         finish_wait(&ctx->sqo_wait, &wait);
4874
4875                         ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
4876                 }
4877
4878                 mutex_lock(&ctx->uring_lock);
4879                 ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
4880                 mutex_unlock(&ctx->uring_lock);
4881                 if (ret > 0)
4882                         inflight += ret;
4883         }
4884
4885         set_fs(old_fs);
4886         if (cur_mm) {
4887                 unuse_mm(cur_mm);
4888                 mmput(cur_mm);
4889         }
4890         revert_creds(old_cred);
4891
4892         kthread_parkme();
4893
4894         return 0;
4895 }
4896
4897 struct io_wait_queue {
4898         struct wait_queue_entry wq;
4899         struct io_ring_ctx *ctx;
4900         unsigned to_wait;
4901         unsigned nr_timeouts;
4902 };
4903
4904 static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
4905 {
4906         struct io_ring_ctx *ctx = iowq->ctx;
4907
4908         /*
4909          * Wake up if we have enough events, or if a timeout occurred since we
4910          * started waiting. For timeouts, we always want to return to userspace,
4911          * regardless of event count.
4912          */
4913         return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
4914                         atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
4915 }
4916
4917 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
4918                             int wake_flags, void *key)
4919 {
4920         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
4921                                                         wq);
4922
4923         /* use noflush == true, as we can't safely rely on locking context */
4924         if (!io_should_wake(iowq, true))
4925                 return -1;
4926
4927         return autoremove_wake_function(curr, mode, wake_flags, key);
4928 }
4929
4930 /*
4931  * Wait until events become available, if we don't already have some. The
4932  * application must reap them itself, as they reside on the shared cq ring.
4933  */
4934 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
4935                           const sigset_t __user *sig, size_t sigsz)
4936 {
4937         struct io_wait_queue iowq = {
4938                 .wq = {
4939                         .private        = current,
4940                         .func           = io_wake_function,
4941                         .entry          = LIST_HEAD_INIT(iowq.wq.entry),
4942                 },
4943                 .ctx            = ctx,
4944                 .to_wait        = min_events,
4945         };
4946         struct io_rings *rings = ctx->rings;
4947         int ret = 0;
4948
4949         if (io_cqring_events(ctx, false) >= min_events)
4950                 return 0;
4951
4952         if (sig) {
4953 #ifdef CONFIG_COMPAT
4954                 if (in_compat_syscall())
4955                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
4956                                                       sigsz);
4957                 else
4958 #endif
4959                         ret = set_user_sigmask(sig, sigsz);
4960
4961                 if (ret)
4962                         return ret;
4963         }
4964
4965         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
4966         trace_io_uring_cqring_wait(ctx, min_events);
4967         do {
4968                 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
4969                                                 TASK_INTERRUPTIBLE);
4970                 if (io_should_wake(&iowq, false))
4971                         break;
4972                 schedule();
4973                 if (signal_pending(current)) {
4974                         ret = -EINTR;
4975                         break;
4976                 }
4977         } while (1);
4978         finish_wait(&ctx->wait, &iowq.wq);
4979
4980         restore_saved_sigmask_unless(ret == -EINTR);
4981
4982         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
4983 }
4984
4985 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
4986 {
4987 #if defined(CONFIG_UNIX)
4988         if (ctx->ring_sock) {
4989                 struct sock *sock = ctx->ring_sock->sk;
4990                 struct sk_buff *skb;
4991
4992                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
4993                         kfree_skb(skb);
4994         }
4995 #else
4996         int i;
4997
4998         for (i = 0; i < ctx->nr_user_files; i++) {
4999                 struct file *file;
5000
5001                 file = io_file_from_index(ctx, i);
5002                 if (file)
5003                         fput(file);
5004         }
5005 #endif
5006 }
5007
5008 static void io_file_ref_kill(struct percpu_ref *ref)
5009 {
5010         struct fixed_file_data *data;
5011
5012         data = container_of(ref, struct fixed_file_data, refs);
5013         complete(&data->done);
5014 }
5015
5016 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
5017 {
5018         struct fixed_file_data *data = ctx->file_data;
5019         unsigned nr_tables, i;
5020
5021         if (!data)
5022                 return -ENXIO;
5023
5024         /* protect against inflight atomic switch, which drops the ref */
5025         flush_work(&data->ref_work);
5026         percpu_ref_get(&data->refs);
5027         percpu_ref_kill_and_confirm(&data->refs, io_file_ref_kill);
5028         wait_for_completion(&data->done);
5029         percpu_ref_put(&data->refs);
5030         percpu_ref_exit(&data->refs);
5031
5032         __io_sqe_files_unregister(ctx);
5033         nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
5034         for (i = 0; i < nr_tables; i++)
5035                 kfree(data->table[i].files);
5036         kfree(data->table);
5037         kfree(data);
5038         ctx->file_data = NULL;
5039         ctx->nr_user_files = 0;
5040         return 0;
5041 }
5042
5043 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
5044 {
5045         if (ctx->sqo_thread) {
5046                 wait_for_completion(&ctx->completions[1]);
5047                 /*
5048                  * The park is a bit of a work-around, without it we get
5049                  * warning spews on shutdown with SQPOLL set and affinity
5050                  * set to a single CPU.
5051                  */
5052                 kthread_park(ctx->sqo_thread);
5053                 kthread_stop(ctx->sqo_thread);
5054                 ctx->sqo_thread = NULL;
5055         }
5056 }
5057
5058 static void io_finish_async(struct io_ring_ctx *ctx)
5059 {
5060         io_sq_thread_stop(ctx);
5061
5062         if (ctx->io_wq) {
5063                 io_wq_destroy(ctx->io_wq);
5064                 ctx->io_wq = NULL;
5065         }
5066 }
5067
5068 #if defined(CONFIG_UNIX)
5069 /*
5070  * Ensure the UNIX gc is aware of our file set, so we are certain that
5071  * the io_uring can be safely unregistered on process exit, even if we have
5072  * loops in the file referencing.
5073  */
5074 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
5075 {
5076         struct sock *sk = ctx->ring_sock->sk;
5077         struct scm_fp_list *fpl;
5078         struct sk_buff *skb;
5079         int i, nr_files;
5080
5081         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
5082                 unsigned long inflight = ctx->user->unix_inflight + nr;
5083
5084                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
5085                         return -EMFILE;
5086         }
5087
5088         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
5089         if (!fpl)
5090                 return -ENOMEM;
5091
5092         skb = alloc_skb(0, GFP_KERNEL);
5093         if (!skb) {
5094                 kfree(fpl);
5095                 return -ENOMEM;
5096         }
5097
5098         skb->sk = sk;
5099
5100         nr_files = 0;
5101         fpl->user = get_uid(ctx->user);
5102         for (i = 0; i < nr; i++) {
5103                 struct file *file = io_file_from_index(ctx, i + offset);
5104
5105                 if (!file)
5106                         continue;
5107                 fpl->fp[nr_files] = get_file(file);
5108                 unix_inflight(fpl->user, fpl->fp[nr_files]);
5109                 nr_files++;
5110         }
5111
5112         if (nr_files) {
5113                 fpl->max = SCM_MAX_FD;
5114                 fpl->count = nr_files;
5115                 UNIXCB(skb).fp = fpl;
5116                 skb->destructor = unix_destruct_scm;
5117                 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
5118                 skb_queue_head(&sk->sk_receive_queue, skb);
5119
5120                 for (i = 0; i < nr_files; i++)
5121                         fput(fpl->fp[i]);
5122         } else {
5123                 kfree_skb(skb);
5124                 kfree(fpl);
5125         }
5126
5127         return 0;
5128 }
5129
5130 /*
5131  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
5132  * causes regular reference counting to break down. We rely on the UNIX
5133  * garbage collection to take care of this problem for us.
5134  */
5135 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
5136 {
5137         unsigned left, total;
5138         int ret = 0;
5139
5140         total = 0;
5141         left = ctx->nr_user_files;
5142         while (left) {
5143                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
5144
5145                 ret = __io_sqe_files_scm(ctx, this_files, total);
5146                 if (ret)
5147                         break;
5148                 left -= this_files;
5149                 total += this_files;
5150         }
5151
5152         if (!ret)
5153                 return 0;
5154
5155         while (total < ctx->nr_user_files) {
5156                 struct file *file = io_file_from_index(ctx, total);
5157
5158                 if (file)
5159                         fput(file);
5160                 total++;
5161         }
5162
5163         return ret;
5164 }
5165 #else
5166 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
5167 {
5168         return 0;
5169 }
5170 #endif
5171
5172 static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
5173                                     unsigned nr_files)
5174 {
5175         int i;
5176
5177         for (i = 0; i < nr_tables; i++) {
5178                 struct fixed_file_table *table = &ctx->file_data->table[i];
5179                 unsigned this_files;
5180
5181                 this_files = min(nr_files, IORING_MAX_FILES_TABLE);
5182                 table->files = kcalloc(this_files, sizeof(struct file *),
5183                                         GFP_KERNEL);
5184                 if (!table->files)
5185                         break;
5186                 nr_files -= this_files;
5187         }
5188
5189         if (i == nr_tables)
5190                 return 0;
5191
5192         for (i = 0; i < nr_tables; i++) {
5193                 struct fixed_file_table *table = &ctx->file_data->table[i];
5194                 kfree(table->files);
5195         }
5196         return 1;
5197 }
5198
5199 static void io_ring_file_put(struct io_ring_ctx *ctx, struct file *file)
5200 {
5201 #if defined(CONFIG_UNIX)
5202         struct sock *sock = ctx->ring_sock->sk;
5203         struct sk_buff_head list, *head = &sock->sk_receive_queue;
5204         struct sk_buff *skb;
5205         int i;
5206
5207         __skb_queue_head_init(&list);
5208
5209         /*
5210          * Find the skb that holds this file in its SCM_RIGHTS. When found,
5211          * remove this entry and rearrange the file array.
5212          */
5213         skb = skb_dequeue(head);
5214         while (skb) {
5215                 struct scm_fp_list *fp;
5216
5217                 fp = UNIXCB(skb).fp;
5218                 for (i = 0; i < fp->count; i++) {
5219                         int left;
5220
5221                         if (fp->fp[i] != file)
5222                                 continue;
5223
5224                         unix_notinflight(fp->user, fp->fp[i]);
5225                         left = fp->count - 1 - i;
5226                         if (left) {
5227                                 memmove(&fp->fp[i], &fp->fp[i + 1],
5228                                                 left * sizeof(struct file *));
5229                         }
5230                         fp->count--;
5231                         if (!fp->count) {
5232                                 kfree_skb(skb);
5233                                 skb = NULL;
5234                         } else {
5235                                 __skb_queue_tail(&list, skb);
5236                         }
5237                         fput(file);
5238                         file = NULL;
5239                         break;
5240                 }
5241
5242                 if (!file)
5243                         break;
5244
5245                 __skb_queue_tail(&list, skb);
5246
5247                 skb = skb_dequeue(head);
5248         }
5249
5250         if (skb_peek(&list)) {
5251                 spin_lock_irq(&head->lock);
5252                 while ((skb = __skb_dequeue(&list)) != NULL)
5253                         __skb_queue_tail(head, skb);
5254                 spin_unlock_irq(&head->lock);
5255         }
5256 #else
5257         fput(file);
5258 #endif
5259 }
5260
5261 struct io_file_put {
5262         struct llist_node llist;
5263         struct file *file;
5264         struct completion *done;
5265 };
5266
5267 static void io_ring_file_ref_switch(struct work_struct *work)
5268 {
5269         struct io_file_put *pfile, *tmp;
5270         struct fixed_file_data *data;
5271         struct llist_node *node;
5272
5273         data = container_of(work, struct fixed_file_data, ref_work);
5274
5275         while ((node = llist_del_all(&data->put_llist)) != NULL) {
5276                 llist_for_each_entry_safe(pfile, tmp, node, llist) {
5277                         io_ring_file_put(data->ctx, pfile->file);
5278                         if (pfile->done)
5279                                 complete(pfile->done);
5280                         else
5281                                 kfree(pfile);
5282                 }
5283         }
5284
5285         percpu_ref_get(&data->refs);
5286         percpu_ref_switch_to_percpu(&data->refs);
5287 }
5288
5289 static void io_file_data_ref_zero(struct percpu_ref *ref)
5290 {
5291         struct fixed_file_data *data;
5292
5293         data = container_of(ref, struct fixed_file_data, refs);
5294
5295         /* we can't safely switch from inside this context, punt to wq */
5296         queue_work(system_wq, &data->ref_work);
5297 }
5298
5299 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
5300                                  unsigned nr_args)
5301 {
5302         __s32 __user *fds = (__s32 __user *) arg;
5303         unsigned nr_tables;
5304         struct file *file;
5305         int fd, ret = 0;
5306         unsigned i;
5307
5308         if (ctx->file_data)
5309                 return -EBUSY;
5310         if (!nr_args)
5311                 return -EINVAL;
5312         if (nr_args > IORING_MAX_FIXED_FILES)
5313                 return -EMFILE;
5314
5315         ctx->file_data = kzalloc(sizeof(*ctx->file_data), GFP_KERNEL);
5316         if (!ctx->file_data)
5317                 return -ENOMEM;
5318         ctx->file_data->ctx = ctx;
5319         init_completion(&ctx->file_data->done);
5320
5321         nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
5322         ctx->file_data->table = kcalloc(nr_tables,
5323                                         sizeof(struct fixed_file_table),
5324                                         GFP_KERNEL);
5325         if (!ctx->file_data->table) {
5326                 kfree(ctx->file_data);
5327                 ctx->file_data = NULL;
5328                 return -ENOMEM;
5329         }
5330
5331         if (percpu_ref_init(&ctx->file_data->refs, io_file_data_ref_zero,
5332                                 PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
5333                 kfree(ctx->file_data->table);
5334                 kfree(ctx->file_data);
5335                 ctx->file_data = NULL;
5336                 return -ENOMEM;
5337         }
5338         ctx->file_data->put_llist.first = NULL;
5339         INIT_WORK(&ctx->file_data->ref_work, io_ring_file_ref_switch);
5340
5341         if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
5342                 percpu_ref_exit(&ctx->file_data->refs);
5343                 kfree(ctx->file_data->table);
5344                 kfree(ctx->file_data);
5345                 ctx->file_data = NULL;
5346                 return -ENOMEM;
5347         }
5348
5349         for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
5350                 struct fixed_file_table *table;
5351                 unsigned index;
5352
5353                 ret = -EFAULT;
5354                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
5355                         break;
5356                 /* allow sparse sets */
5357                 if (fd == -1) {
5358                         ret = 0;
5359                         continue;
5360                 }
5361
5362                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
5363                 index = i & IORING_FILE_TABLE_MASK;
5364                 file = fget(fd);
5365
5366                 ret = -EBADF;
5367                 if (!file)
5368                         break;
5369
5370                 /*
5371                  * Don't allow io_uring instances to be registered. If UNIX
5372                  * isn't enabled, then this causes a reference cycle and this
5373                  * instance can never get freed. If UNIX is enabled we'll
5374                  * handle it just fine, but there's still no point in allowing
5375                  * a ring fd as it doesn't support regular read/write anyway.
5376                  */
5377                 if (file->f_op == &io_uring_fops) {
5378                         fput(file);
5379                         break;
5380                 }
5381                 ret = 0;
5382                 table->files[index] = file;
5383         }
5384
5385         if (ret) {
5386                 for (i = 0; i < ctx->nr_user_files; i++) {
5387                         file = io_file_from_index(ctx, i);
5388                         if (file)
5389                                 fput(file);
5390                 }
5391                 for (i = 0; i < nr_tables; i++)
5392                         kfree(ctx->file_data->table[i].files);
5393
5394                 kfree(ctx->file_data->table);
5395                 kfree(ctx->file_data);
5396                 ctx->file_data = NULL;
5397                 ctx->nr_user_files = 0;
5398                 return ret;
5399         }
5400
5401         ret = io_sqe_files_scm(ctx);
5402         if (ret)
5403                 io_sqe_files_unregister(ctx);
5404
5405         return ret;
5406 }
5407
5408 static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
5409                                 int index)
5410 {
5411 #if defined(CONFIG_UNIX)
5412         struct sock *sock = ctx->ring_sock->sk;
5413         struct sk_buff_head *head = &sock->sk_receive_queue;
5414         struct sk_buff *skb;
5415
5416         /*
5417          * See if we can merge this file into an existing skb SCM_RIGHTS
5418          * file set. If there's no room, fall back to allocating a new skb
5419          * and filling it in.
5420          */
5421         spin_lock_irq(&head->lock);
5422         skb = skb_peek(head);
5423         if (skb) {
5424                 struct scm_fp_list *fpl = UNIXCB(skb).fp;
5425
5426                 if (fpl->count < SCM_MAX_FD) {
5427                         __skb_unlink(skb, head);
5428                         spin_unlock_irq(&head->lock);
5429                         fpl->fp[fpl->count] = get_file(file);
5430                         unix_inflight(fpl->user, fpl->fp[fpl->count]);
5431                         fpl->count++;
5432                         spin_lock_irq(&head->lock);
5433                         __skb_queue_head(head, skb);
5434                 } else {
5435                         skb = NULL;
5436                 }
5437         }
5438         spin_unlock_irq(&head->lock);
5439
5440         if (skb) {
5441                 fput(file);
5442                 return 0;
5443         }
5444
5445         return __io_sqe_files_scm(ctx, 1, index);
5446 #else
5447         return 0;
5448 #endif
5449 }
5450
5451 static void io_atomic_switch(struct percpu_ref *ref)
5452 {
5453         struct fixed_file_data *data;
5454
5455         data = container_of(ref, struct fixed_file_data, refs);
5456         clear_bit(FFD_F_ATOMIC, &data->state);
5457 }
5458
5459 static bool io_queue_file_removal(struct fixed_file_data *data,
5460                                   struct file *file)
5461 {
5462         struct io_file_put *pfile, pfile_stack;
5463         DECLARE_COMPLETION_ONSTACK(done);
5464
5465         /*
5466          * If we fail allocating the struct we need for doing async reomval
5467          * of this file, just punt to sync and wait for it.
5468          */
5469         pfile = kzalloc(sizeof(*pfile), GFP_KERNEL);
5470         if (!pfile) {
5471                 pfile = &pfile_stack;
5472                 pfile->done = &done;
5473         }
5474
5475         pfile->file = file;
5476         llist_add(&pfile->llist, &data->put_llist);
5477
5478         if (pfile == &pfile_stack) {
5479                 if (!test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
5480                         percpu_ref_put(&data->refs);
5481                         percpu_ref_switch_to_atomic(&data->refs,
5482                                                         io_atomic_switch);
5483                 }
5484                 wait_for_completion(&done);
5485                 flush_work(&data->ref_work);
5486                 return false;
5487         }
5488
5489         return true;
5490 }
5491
5492 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
5493                                  struct io_uring_files_update *up,
5494                                  unsigned nr_args)
5495 {
5496         struct fixed_file_data *data = ctx->file_data;
5497         bool ref_switch = false;
5498         struct file *file;
5499         __s32 __user *fds;
5500         int fd, i, err;
5501         __u32 done;
5502
5503         if (check_add_overflow(up->offset, nr_args, &done))
5504                 return -EOVERFLOW;
5505         if (done > ctx->nr_user_files)
5506                 return -EINVAL;
5507
5508         done = 0;
5509         fds = u64_to_user_ptr(up->fds);
5510         while (nr_args) {
5511                 struct fixed_file_table *table;
5512                 unsigned index;
5513
5514                 err = 0;
5515                 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
5516                         err = -EFAULT;
5517                         break;
5518                 }
5519                 i = array_index_nospec(up->offset, ctx->nr_user_files);
5520                 table = &ctx->file_data->table[i >> IORING_FILE_TABLE_SHIFT];
5521                 index = i & IORING_FILE_TABLE_MASK;
5522                 if (table->files[index]) {
5523                         file = io_file_from_index(ctx, index);
5524                         table->files[index] = NULL;
5525                         if (io_queue_file_removal(data, file))
5526                                 ref_switch = true;
5527                 }
5528                 if (fd != -1) {
5529                         file = fget(fd);
5530                         if (!file) {
5531                                 err = -EBADF;
5532                                 break;
5533                         }
5534                         /*
5535                          * Don't allow io_uring instances to be registered. If
5536                          * UNIX isn't enabled, then this causes a reference
5537                          * cycle and this instance can never get freed. If UNIX
5538                          * is enabled we'll handle it just fine, but there's
5539                          * still no point in allowing a ring fd as it doesn't
5540                          * support regular read/write anyway.
5541                          */
5542                         if (file->f_op == &io_uring_fops) {
5543                                 fput(file);
5544                                 err = -EBADF;
5545                                 break;
5546                         }
5547                         table->files[index] = file;
5548                         err = io_sqe_file_register(ctx, file, i);
5549                         if (err)
5550                                 break;
5551                 }
5552                 nr_args--;
5553                 done++;
5554                 up->offset++;
5555         }
5556
5557         if (ref_switch && !test_and_set_bit(FFD_F_ATOMIC, &data->state)) {
5558                 percpu_ref_put(&data->refs);
5559                 percpu_ref_switch_to_atomic(&data->refs, io_atomic_switch);
5560         }
5561
5562         return done ? done : err;
5563 }
5564 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
5565                                unsigned nr_args)
5566 {
5567         struct io_uring_files_update up;
5568
5569         if (!ctx->file_data)
5570                 return -ENXIO;
5571         if (!nr_args)
5572                 return -EINVAL;
5573         if (copy_from_user(&up, arg, sizeof(up)))
5574                 return -EFAULT;
5575         if (up.resv)
5576                 return -EINVAL;
5577
5578         return __io_sqe_files_update(ctx, &up, nr_args);
5579 }
5580
5581 static void io_put_work(struct io_wq_work *work)
5582 {
5583         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5584
5585         io_put_req(req);
5586 }
5587
5588 static void io_get_work(struct io_wq_work *work)
5589 {
5590         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
5591
5592         refcount_inc(&req->refs);
5593 }
5594
5595 static int io_sq_offload_start(struct io_ring_ctx *ctx,
5596                                struct io_uring_params *p)
5597 {
5598         struct io_wq_data data;
5599         unsigned concurrency;
5600         int ret;
5601
5602         init_waitqueue_head(&ctx->sqo_wait);
5603         mmgrab(current->mm);
5604         ctx->sqo_mm = current->mm;
5605
5606         if (ctx->flags & IORING_SETUP_SQPOLL) {
5607                 ret = -EPERM;
5608                 if (!capable(CAP_SYS_ADMIN))
5609                         goto err;
5610
5611                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
5612                 if (!ctx->sq_thread_idle)
5613                         ctx->sq_thread_idle = HZ;
5614
5615                 if (p->flags & IORING_SETUP_SQ_AFF) {
5616                         int cpu = p->sq_thread_cpu;
5617
5618                         ret = -EINVAL;
5619                         if (cpu >= nr_cpu_ids)
5620                                 goto err;
5621                         if (!cpu_online(cpu))
5622                                 goto err;
5623
5624                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
5625                                                         ctx, cpu,
5626                                                         "io_uring-sq");
5627                 } else {
5628                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
5629                                                         "io_uring-sq");
5630                 }
5631                 if (IS_ERR(ctx->sqo_thread)) {
5632                         ret = PTR_ERR(ctx->sqo_thread);
5633                         ctx->sqo_thread = NULL;
5634                         goto err;
5635                 }
5636                 wake_up_process(ctx->sqo_thread);
5637         } else if (p->flags & IORING_SETUP_SQ_AFF) {
5638                 /* Can't have SQ_AFF without SQPOLL */
5639                 ret = -EINVAL;
5640                 goto err;
5641         }
5642
5643         data.mm = ctx->sqo_mm;
5644         data.user = ctx->user;
5645         data.creds = ctx->creds;
5646         data.get_work = io_get_work;
5647         data.put_work = io_put_work;
5648
5649         /* Do QD, or 4 * CPUS, whatever is smallest */
5650         concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
5651         ctx->io_wq = io_wq_create(concurrency, &data);
5652         if (IS_ERR(ctx->io_wq)) {
5653                 ret = PTR_ERR(ctx->io_wq);
5654                 ctx->io_wq = NULL;
5655                 goto err;
5656         }
5657
5658         return 0;
5659 err:
5660         io_finish_async(ctx);
5661         mmdrop(ctx->sqo_mm);
5662         ctx->sqo_mm = NULL;
5663         return ret;
5664 }
5665
5666 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
5667 {
5668         atomic_long_sub(nr_pages, &user->locked_vm);
5669 }
5670
5671 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
5672 {
5673         unsigned long page_limit, cur_pages, new_pages;
5674
5675         /* Don't allow more pages than we can safely lock */
5676         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
5677
5678         do {
5679                 cur_pages = atomic_long_read(&user->locked_vm);
5680                 new_pages = cur_pages + nr_pages;
5681                 if (new_pages > page_limit)
5682                         return -ENOMEM;
5683         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
5684                                         new_pages) != cur_pages);
5685
5686         return 0;
5687 }
5688
5689 static void io_mem_free(void *ptr)
5690 {
5691         struct page *page;
5692
5693         if (!ptr)
5694                 return;
5695
5696         page = virt_to_head_page(ptr);
5697         if (put_page_testzero(page))
5698                 free_compound_page(page);
5699 }
5700
5701 static void *io_mem_alloc(size_t size)
5702 {
5703         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
5704                                 __GFP_NORETRY;
5705
5706         return (void *) __get_free_pages(gfp_flags, get_order(size));
5707 }
5708
5709 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
5710                                 size_t *sq_offset)
5711 {
5712         struct io_rings *rings;
5713         size_t off, sq_array_size;
5714
5715         off = struct_size(rings, cqes, cq_entries);
5716         if (off == SIZE_MAX)
5717                 return SIZE_MAX;
5718
5719 #ifdef CONFIG_SMP
5720         off = ALIGN(off, SMP_CACHE_BYTES);
5721         if (off == 0)
5722                 return SIZE_MAX;
5723 #endif
5724
5725         sq_array_size = array_size(sizeof(u32), sq_entries);
5726         if (sq_array_size == SIZE_MAX)
5727                 return SIZE_MAX;
5728
5729         if (check_add_overflow(off, sq_array_size, &off))
5730                 return SIZE_MAX;
5731
5732         if (sq_offset)
5733                 *sq_offset = off;
5734
5735         return off;
5736 }
5737
5738 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
5739 {
5740         size_t pages;
5741
5742         pages = (size_t)1 << get_order(
5743                 rings_size(sq_entries, cq_entries, NULL));
5744         pages += (size_t)1 << get_order(
5745                 array_size(sizeof(struct io_uring_sqe), sq_entries));
5746
5747         return pages;
5748 }
5749
5750 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
5751 {
5752         int i, j;
5753
5754         if (!ctx->user_bufs)
5755                 return -ENXIO;
5756
5757         for (i = 0; i < ctx->nr_user_bufs; i++) {
5758                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5759
5760                 for (j = 0; j < imu->nr_bvecs; j++)
5761                         put_user_page(imu->bvec[j].bv_page);
5762
5763                 if (ctx->account_mem)
5764                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
5765                 kvfree(imu->bvec);
5766                 imu->nr_bvecs = 0;
5767         }
5768
5769         kfree(ctx->user_bufs);
5770         ctx->user_bufs = NULL;
5771         ctx->nr_user_bufs = 0;
5772         return 0;
5773 }
5774
5775 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
5776                        void __user *arg, unsigned index)
5777 {
5778         struct iovec __user *src;
5779
5780 #ifdef CONFIG_COMPAT
5781         if (ctx->compat) {
5782                 struct compat_iovec __user *ciovs;
5783                 struct compat_iovec ciov;
5784
5785                 ciovs = (struct compat_iovec __user *) arg;
5786                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
5787                         return -EFAULT;
5788
5789                 dst->iov_base = u64_to_user_ptr((u64)ciov.iov_base);
5790                 dst->iov_len = ciov.iov_len;
5791                 return 0;
5792         }
5793 #endif
5794         src = (struct iovec __user *) arg;
5795         if (copy_from_user(dst, &src[index], sizeof(*dst)))
5796                 return -EFAULT;
5797         return 0;
5798 }
5799
5800 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
5801                                   unsigned nr_args)
5802 {
5803         struct vm_area_struct **vmas = NULL;
5804         struct page **pages = NULL;
5805         int i, j, got_pages = 0;
5806         int ret = -EINVAL;
5807
5808         if (ctx->user_bufs)
5809                 return -EBUSY;
5810         if (!nr_args || nr_args > UIO_MAXIOV)
5811                 return -EINVAL;
5812
5813         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
5814                                         GFP_KERNEL);
5815         if (!ctx->user_bufs)
5816                 return -ENOMEM;
5817
5818         for (i = 0; i < nr_args; i++) {
5819                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
5820                 unsigned long off, start, end, ubuf;
5821                 int pret, nr_pages;
5822                 struct iovec iov;
5823                 size_t size;
5824
5825                 ret = io_copy_iov(ctx, &iov, arg, i);
5826                 if (ret)
5827                         goto err;
5828
5829                 /*
5830                  * Don't impose further limits on the size and buffer
5831                  * constraints here, we'll -EINVAL later when IO is
5832                  * submitted if they are wrong.
5833                  */
5834                 ret = -EFAULT;
5835                 if (!iov.iov_base || !iov.iov_len)
5836                         goto err;
5837
5838                 /* arbitrary limit, but we need something */
5839                 if (iov.iov_len > SZ_1G)
5840                         goto err;
5841
5842                 ubuf = (unsigned long) iov.iov_base;
5843                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
5844                 start = ubuf >> PAGE_SHIFT;
5845                 nr_pages = end - start;
5846
5847                 if (ctx->account_mem) {
5848                         ret = io_account_mem(ctx->user, nr_pages);
5849                         if (ret)
5850                                 goto err;
5851                 }
5852
5853                 ret = 0;
5854                 if (!pages || nr_pages > got_pages) {
5855                         kfree(vmas);
5856                         kfree(pages);
5857                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
5858                                                 GFP_KERNEL);
5859                         vmas = kvmalloc_array(nr_pages,
5860                                         sizeof(struct vm_area_struct *),
5861                                         GFP_KERNEL);
5862                         if (!pages || !vmas) {
5863                                 ret = -ENOMEM;
5864                                 if (ctx->account_mem)
5865                                         io_unaccount_mem(ctx->user, nr_pages);
5866                                 goto err;
5867                         }
5868                         got_pages = nr_pages;
5869                 }
5870
5871                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
5872                                                 GFP_KERNEL);
5873                 ret = -ENOMEM;
5874                 if (!imu->bvec) {
5875                         if (ctx->account_mem)
5876                                 io_unaccount_mem(ctx->user, nr_pages);
5877                         goto err;
5878                 }
5879
5880                 ret = 0;
5881                 down_read(&current->mm->mmap_sem);
5882                 pret = get_user_pages(ubuf, nr_pages,
5883                                       FOLL_WRITE | FOLL_LONGTERM,
5884                                       pages, vmas);
5885                 if (pret == nr_pages) {
5886                         /* don't support file backed memory */
5887                         for (j = 0; j < nr_pages; j++) {
5888                                 struct vm_area_struct *vma = vmas[j];
5889
5890                                 if (vma->vm_file &&
5891                                     !is_file_hugepages(vma->vm_file)) {
5892                                         ret = -EOPNOTSUPP;
5893                                         break;
5894                                 }
5895                         }
5896                 } else {
5897                         ret = pret < 0 ? pret : -EFAULT;
5898                 }
5899                 up_read(&current->mm->mmap_sem);
5900                 if (ret) {
5901                         /*
5902                          * if we did partial map, or found file backed vmas,
5903                          * release any pages we did get
5904                          */
5905                         if (pret > 0)
5906                                 put_user_pages(pages, pret);
5907                         if (ctx->account_mem)
5908                                 io_unaccount_mem(ctx->user, nr_pages);
5909                         kvfree(imu->bvec);
5910                         goto err;
5911                 }
5912
5913                 off = ubuf & ~PAGE_MASK;
5914                 size = iov.iov_len;
5915                 for (j = 0; j < nr_pages; j++) {
5916                         size_t vec_len;
5917
5918                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
5919                         imu->bvec[j].bv_page = pages[j];
5920                         imu->bvec[j].bv_len = vec_len;
5921                         imu->bvec[j].bv_offset = off;
5922                         off = 0;
5923                         size -= vec_len;
5924                 }
5925                 /* store original address for later verification */
5926                 imu->ubuf = ubuf;
5927                 imu->len = iov.iov_len;
5928                 imu->nr_bvecs = nr_pages;
5929
5930                 ctx->nr_user_bufs++;
5931         }
5932         kvfree(pages);
5933         kvfree(vmas);
5934         return 0;
5935 err:
5936         kvfree(pages);
5937         kvfree(vmas);
5938         io_sqe_buffer_unregister(ctx);
5939         return ret;
5940 }
5941
5942 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
5943 {
5944         __s32 __user *fds = arg;
5945         int fd;
5946
5947         if (ctx->cq_ev_fd)
5948                 return -EBUSY;
5949
5950         if (copy_from_user(&fd, fds, sizeof(*fds)))
5951                 return -EFAULT;
5952
5953         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
5954         if (IS_ERR(ctx->cq_ev_fd)) {
5955                 int ret = PTR_ERR(ctx->cq_ev_fd);
5956                 ctx->cq_ev_fd = NULL;
5957                 return ret;
5958         }
5959
5960         return 0;
5961 }
5962
5963 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
5964 {
5965         if (ctx->cq_ev_fd) {
5966                 eventfd_ctx_put(ctx->cq_ev_fd);
5967                 ctx->cq_ev_fd = NULL;
5968                 return 0;
5969         }
5970
5971         return -ENXIO;
5972 }
5973
5974 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
5975 {
5976         io_finish_async(ctx);
5977         if (ctx->sqo_mm)
5978                 mmdrop(ctx->sqo_mm);
5979
5980         io_iopoll_reap_events(ctx);
5981         io_sqe_buffer_unregister(ctx);
5982         io_sqe_files_unregister(ctx);
5983         io_eventfd_unregister(ctx);
5984
5985 #if defined(CONFIG_UNIX)
5986         if (ctx->ring_sock) {
5987                 ctx->ring_sock->file = NULL; /* so that iput() is called */
5988                 sock_release(ctx->ring_sock);
5989         }
5990 #endif
5991
5992         io_mem_free(ctx->rings);
5993         io_mem_free(ctx->sq_sqes);
5994
5995         percpu_ref_exit(&ctx->refs);
5996         if (ctx->account_mem)
5997                 io_unaccount_mem(ctx->user,
5998                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
5999         free_uid(ctx->user);
6000         put_cred(ctx->creds);
6001         kfree(ctx->completions);
6002         kfree(ctx->cancel_hash);
6003         kmem_cache_free(req_cachep, ctx->fallback_req);
6004         kfree(ctx);
6005 }
6006
6007 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
6008 {
6009         struct io_ring_ctx *ctx = file->private_data;
6010         __poll_t mask = 0;
6011
6012         poll_wait(file, &ctx->cq_wait, wait);
6013         /*
6014          * synchronizes with barrier from wq_has_sleeper call in
6015          * io_commit_cqring
6016          */
6017         smp_rmb();
6018         if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
6019             ctx->rings->sq_ring_entries)
6020                 mask |= EPOLLOUT | EPOLLWRNORM;
6021         if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
6022                 mask |= EPOLLIN | EPOLLRDNORM;
6023
6024         return mask;
6025 }
6026
6027 static int io_uring_fasync(int fd, struct file *file, int on)
6028 {
6029         struct io_ring_ctx *ctx = file->private_data;
6030
6031         return fasync_helper(fd, file, on, &ctx->cq_fasync);
6032 }
6033
6034 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
6035 {
6036         mutex_lock(&ctx->uring_lock);
6037         percpu_ref_kill(&ctx->refs);
6038         mutex_unlock(&ctx->uring_lock);
6039
6040         io_kill_timeouts(ctx);
6041         io_poll_remove_all(ctx);
6042
6043         if (ctx->io_wq)
6044                 io_wq_cancel_all(ctx->io_wq);
6045
6046         io_iopoll_reap_events(ctx);
6047         /* if we failed setting up the ctx, we might not have any rings */
6048         if (ctx->rings)
6049                 io_cqring_overflow_flush(ctx, true);
6050         wait_for_completion(&ctx->completions[0]);
6051         io_ring_ctx_free(ctx);
6052 }
6053
6054 static int io_uring_release(struct inode *inode, struct file *file)
6055 {
6056         struct io_ring_ctx *ctx = file->private_data;
6057
6058         file->private_data = NULL;
6059         io_ring_ctx_wait_and_kill(ctx);
6060         return 0;
6061 }
6062
6063 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
6064                                   struct files_struct *files)
6065 {
6066         struct io_kiocb *req;
6067         DEFINE_WAIT(wait);
6068
6069         while (!list_empty_careful(&ctx->inflight_list)) {
6070                 struct io_kiocb *cancel_req = NULL;
6071
6072                 spin_lock_irq(&ctx->inflight_lock);
6073                 list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
6074                         if (req->work.files != files)
6075                                 continue;
6076                         /* req is being completed, ignore */
6077                         if (!refcount_inc_not_zero(&req->refs))
6078                                 continue;
6079                         cancel_req = req;
6080                         break;
6081                 }
6082                 if (cancel_req)
6083                         prepare_to_wait(&ctx->inflight_wait, &wait,
6084                                                 TASK_UNINTERRUPTIBLE);
6085                 spin_unlock_irq(&ctx->inflight_lock);
6086
6087                 /* We need to keep going until we don't find a matching req */
6088                 if (!cancel_req)
6089                         break;
6090
6091                 io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
6092                 io_put_req(cancel_req);
6093                 schedule();
6094         }
6095         finish_wait(&ctx->inflight_wait, &wait);
6096 }
6097
6098 static int io_uring_flush(struct file *file, void *data)
6099 {
6100         struct io_ring_ctx *ctx = file->private_data;
6101
6102         io_uring_cancel_files(ctx, data);
6103         if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
6104                 io_cqring_overflow_flush(ctx, true);
6105                 io_wq_cancel_all(ctx->io_wq);
6106         }
6107         return 0;
6108 }
6109
6110 static void *io_uring_validate_mmap_request(struct file *file,
6111                                             loff_t pgoff, size_t sz)
6112 {
6113         struct io_ring_ctx *ctx = file->private_data;
6114         loff_t offset = pgoff << PAGE_SHIFT;
6115         struct page *page;
6116         void *ptr;
6117
6118         switch (offset) {
6119         case IORING_OFF_SQ_RING:
6120         case IORING_OFF_CQ_RING:
6121                 ptr = ctx->rings;
6122                 break;
6123         case IORING_OFF_SQES:
6124                 ptr = ctx->sq_sqes;
6125                 break;
6126         default:
6127                 return ERR_PTR(-EINVAL);
6128         }
6129
6130         page = virt_to_head_page(ptr);
6131         if (sz > page_size(page))
6132                 return ERR_PTR(-EINVAL);
6133
6134         return ptr;
6135 }
6136
6137 #ifdef CONFIG_MMU
6138
6139 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
6140 {
6141         size_t sz = vma->vm_end - vma->vm_start;
6142         unsigned long pfn;
6143         void *ptr;
6144
6145         ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
6146         if (IS_ERR(ptr))
6147                 return PTR_ERR(ptr);
6148
6149         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
6150         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
6151 }
6152
6153 #else /* !CONFIG_MMU */
6154
6155 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
6156 {
6157         return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
6158 }
6159
6160 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
6161 {
6162         return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
6163 }
6164
6165 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
6166         unsigned long addr, unsigned long len,
6167         unsigned long pgoff, unsigned long flags)
6168 {
6169         void *ptr;
6170
6171         ptr = io_uring_validate_mmap_request(file, pgoff, len);
6172         if (IS_ERR(ptr))
6173                 return PTR_ERR(ptr);
6174
6175         return (unsigned long) ptr;
6176 }
6177
6178 #endif /* !CONFIG_MMU */
6179
6180 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
6181                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
6182                 size_t, sigsz)
6183 {
6184         struct io_ring_ctx *ctx;
6185         long ret = -EBADF;
6186         int submitted = 0;
6187         struct fd f;
6188
6189         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
6190                 return -EINVAL;
6191
6192         f = fdget(fd);
6193         if (!f.file)
6194                 return -EBADF;
6195
6196         ret = -EOPNOTSUPP;
6197         if (f.file->f_op != &io_uring_fops)
6198                 goto out_fput;
6199
6200         ret = -ENXIO;
6201         ctx = f.file->private_data;
6202         if (!percpu_ref_tryget(&ctx->refs))
6203                 goto out_fput;
6204
6205         /*
6206          * For SQ polling, the thread will do all submissions and completions.
6207          * Just return the requested submit count, and wake the thread if
6208          * we were asked to.
6209          */
6210         ret = 0;
6211         if (ctx->flags & IORING_SETUP_SQPOLL) {
6212                 if (!list_empty_careful(&ctx->cq_overflow_list))
6213                         io_cqring_overflow_flush(ctx, false);
6214                 if (flags & IORING_ENTER_SQ_WAKEUP)
6215                         wake_up(&ctx->sqo_wait);
6216                 submitted = to_submit;
6217         } else if (to_submit) {
6218                 struct mm_struct *cur_mm;
6219
6220                 if (current->mm != ctx->sqo_mm ||
6221                     current_cred() != ctx->creds) {
6222                         ret = -EPERM;
6223                         goto out;
6224                 }
6225
6226                 mutex_lock(&ctx->uring_lock);
6227                 /* already have mm, so io_submit_sqes() won't try to grab it */
6228                 cur_mm = ctx->sqo_mm;
6229                 submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
6230                                            &cur_mm, false);
6231                 mutex_unlock(&ctx->uring_lock);
6232
6233                 if (submitted != to_submit)
6234                         goto out;
6235         }
6236         if (flags & IORING_ENTER_GETEVENTS) {
6237                 unsigned nr_events = 0;
6238
6239                 min_complete = min(min_complete, ctx->cq_entries);
6240
6241                 if (ctx->flags & IORING_SETUP_IOPOLL) {
6242                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
6243                 } else {
6244                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
6245                 }
6246         }
6247
6248 out:
6249         percpu_ref_put(&ctx->refs);
6250 out_fput:
6251         fdput(f);
6252         return submitted ? submitted : ret;
6253 }
6254
6255 static const struct file_operations io_uring_fops = {
6256         .release        = io_uring_release,
6257         .flush          = io_uring_flush,
6258         .mmap           = io_uring_mmap,
6259 #ifndef CONFIG_MMU
6260         .get_unmapped_area = io_uring_nommu_get_unmapped_area,
6261         .mmap_capabilities = io_uring_nommu_mmap_capabilities,
6262 #endif
6263         .poll           = io_uring_poll,
6264         .fasync         = io_uring_fasync,
6265 };
6266
6267 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
6268                                   struct io_uring_params *p)
6269 {
6270         struct io_rings *rings;
6271         size_t size, sq_array_offset;
6272
6273         size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
6274         if (size == SIZE_MAX)
6275                 return -EOVERFLOW;
6276
6277         rings = io_mem_alloc(size);
6278         if (!rings)
6279                 return -ENOMEM;
6280
6281         ctx->rings = rings;
6282         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
6283         rings->sq_ring_mask = p->sq_entries - 1;
6284         rings->cq_ring_mask = p->cq_entries - 1;
6285         rings->sq_ring_entries = p->sq_entries;
6286         rings->cq_ring_entries = p->cq_entries;
6287         ctx->sq_mask = rings->sq_ring_mask;
6288         ctx->cq_mask = rings->cq_ring_mask;
6289         ctx->sq_entries = rings->sq_ring_entries;
6290         ctx->cq_entries = rings->cq_ring_entries;
6291
6292         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
6293         if (size == SIZE_MAX) {
6294                 io_mem_free(ctx->rings);
6295                 ctx->rings = NULL;
6296                 return -EOVERFLOW;
6297         }
6298
6299         ctx->sq_sqes = io_mem_alloc(size);
6300         if (!ctx->sq_sqes) {
6301                 io_mem_free(ctx->rings);
6302                 ctx->rings = NULL;
6303                 return -ENOMEM;
6304         }
6305
6306         return 0;
6307 }
6308
6309 /*
6310  * Allocate an anonymous fd, this is what constitutes the application
6311  * visible backing of an io_uring instance. The application mmaps this
6312  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
6313  * we have to tie this fd to a socket for file garbage collection purposes.
6314  */
6315 static int io_uring_get_fd(struct io_ring_ctx *ctx)
6316 {
6317         struct file *file;
6318         int ret;
6319
6320 #if defined(CONFIG_UNIX)
6321         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
6322                                 &ctx->ring_sock);
6323         if (ret)
6324                 return ret;
6325 #endif
6326
6327         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
6328         if (ret < 0)
6329                 goto err;
6330
6331         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
6332                                         O_RDWR | O_CLOEXEC);
6333         if (IS_ERR(file)) {
6334                 put_unused_fd(ret);
6335                 ret = PTR_ERR(file);
6336                 goto err;
6337         }
6338
6339 #if defined(CONFIG_UNIX)
6340         ctx->ring_sock->file = file;
6341 #endif
6342         fd_install(ret, file);
6343         return ret;
6344 err:
6345 #if defined(CONFIG_UNIX)
6346         sock_release(ctx->ring_sock);
6347         ctx->ring_sock = NULL;
6348 #endif
6349         return ret;
6350 }
6351
6352 static int io_uring_create(unsigned entries, struct io_uring_params *p)
6353 {
6354         struct user_struct *user = NULL;
6355         struct io_ring_ctx *ctx;
6356         bool account_mem;
6357         int ret;
6358
6359         if (!entries)
6360                 return -EINVAL;
6361         if (entries > IORING_MAX_ENTRIES) {
6362                 if (!(p->flags & IORING_SETUP_CLAMP))
6363                         return -EINVAL;
6364                 entries = IORING_MAX_ENTRIES;
6365         }
6366
6367         /*
6368          * Use twice as many entries for the CQ ring. It's possible for the
6369          * application to drive a higher depth than the size of the SQ ring,
6370          * since the sqes are only used at submission time. This allows for
6371          * some flexibility in overcommitting a bit. If the application has
6372          * set IORING_SETUP_CQSIZE, it will have passed in the desired number
6373          * of CQ ring entries manually.
6374          */
6375         p->sq_entries = roundup_pow_of_two(entries);
6376         if (p->flags & IORING_SETUP_CQSIZE) {
6377                 /*
6378                  * If IORING_SETUP_CQSIZE is set, we do the same roundup
6379                  * to a power-of-two, if it isn't already. We do NOT impose
6380                  * any cq vs sq ring sizing.
6381                  */
6382                 if (p->cq_entries < p->sq_entries)
6383                         return -EINVAL;
6384                 if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
6385                         if (!(p->flags & IORING_SETUP_CLAMP))
6386                                 return -EINVAL;
6387                         p->cq_entries = IORING_MAX_CQ_ENTRIES;
6388                 }
6389                 p->cq_entries = roundup_pow_of_two(p->cq_entries);
6390         } else {
6391                 p->cq_entries = 2 * p->sq_entries;
6392         }
6393
6394         user = get_uid(current_user());
6395         account_mem = !capable(CAP_IPC_LOCK);
6396
6397         if (account_mem) {
6398                 ret = io_account_mem(user,
6399                                 ring_pages(p->sq_entries, p->cq_entries));
6400                 if (ret) {
6401                         free_uid(user);
6402                         return ret;
6403                 }
6404         }
6405
6406         ctx = io_ring_ctx_alloc(p);
6407         if (!ctx) {
6408                 if (account_mem)
6409                         io_unaccount_mem(user, ring_pages(p->sq_entries,
6410                                                                 p->cq_entries));
6411                 free_uid(user);
6412                 return -ENOMEM;
6413         }
6414         ctx->compat = in_compat_syscall();
6415         ctx->account_mem = account_mem;
6416         ctx->user = user;
6417         ctx->creds = get_current_cred();
6418
6419         ret = io_allocate_scq_urings(ctx, p);
6420         if (ret)
6421                 goto err;
6422
6423         ret = io_sq_offload_start(ctx, p);
6424         if (ret)
6425                 goto err;
6426
6427         memset(&p->sq_off, 0, sizeof(p->sq_off));
6428         p->sq_off.head = offsetof(struct io_rings, sq.head);
6429         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
6430         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
6431         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
6432         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
6433         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
6434         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
6435
6436         memset(&p->cq_off, 0, sizeof(p->cq_off));
6437         p->cq_off.head = offsetof(struct io_rings, cq.head);
6438         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
6439         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
6440         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
6441         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
6442         p->cq_off.cqes = offsetof(struct io_rings, cqes);
6443
6444         /*
6445          * Install ring fd as the very last thing, so we don't risk someone
6446          * having closed it before we finish setup
6447          */
6448         ret = io_uring_get_fd(ctx);
6449         if (ret < 0)
6450                 goto err;
6451
6452         p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
6453                         IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS;
6454         trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
6455         return ret;
6456 err:
6457         io_ring_ctx_wait_and_kill(ctx);
6458         return ret;
6459 }
6460
6461 /*
6462  * Sets up an aio uring context, and returns the fd. Applications asks for a
6463  * ring size, we return the actual sq/cq ring sizes (among other things) in the
6464  * params structure passed in.
6465  */
6466 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
6467 {
6468         struct io_uring_params p;
6469         long ret;
6470         int i;
6471
6472         if (copy_from_user(&p, params, sizeof(p)))
6473                 return -EFAULT;
6474         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
6475                 if (p.resv[i])
6476                         return -EINVAL;
6477         }
6478
6479         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
6480                         IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
6481                         IORING_SETUP_CLAMP))
6482                 return -EINVAL;
6483
6484         ret = io_uring_create(entries, &p);
6485         if (ret < 0)
6486                 return ret;
6487
6488         if (copy_to_user(params, &p, sizeof(p)))
6489                 return -EFAULT;
6490
6491         return ret;
6492 }
6493
6494 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
6495                 struct io_uring_params __user *, params)
6496 {
6497         return io_uring_setup(entries, params);
6498 }
6499
6500 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
6501                                void __user *arg, unsigned nr_args)
6502         __releases(ctx->uring_lock)
6503         __acquires(ctx->uring_lock)
6504 {
6505         int ret;
6506
6507         /*
6508          * We're inside the ring mutex, if the ref is already dying, then
6509          * someone else killed the ctx or is already going through
6510          * io_uring_register().
6511          */
6512         if (percpu_ref_is_dying(&ctx->refs))
6513                 return -ENXIO;
6514
6515         if (opcode != IORING_UNREGISTER_FILES &&
6516             opcode != IORING_REGISTER_FILES_UPDATE) {
6517                 percpu_ref_kill(&ctx->refs);
6518
6519                 /*
6520                  * Drop uring mutex before waiting for references to exit. If
6521                  * another thread is currently inside io_uring_enter() it might
6522                  * need to grab the uring_lock to make progress. If we hold it
6523                  * here across the drain wait, then we can deadlock. It's safe
6524                  * to drop the mutex here, since no new references will come in
6525                  * after we've killed the percpu ref.
6526                  */
6527                 mutex_unlock(&ctx->uring_lock);
6528                 wait_for_completion(&ctx->completions[0]);
6529                 mutex_lock(&ctx->uring_lock);
6530         }
6531
6532         switch (opcode) {
6533         case IORING_REGISTER_BUFFERS:
6534                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
6535                 break;
6536         case IORING_UNREGISTER_BUFFERS:
6537                 ret = -EINVAL;
6538                 if (arg || nr_args)
6539                         break;
6540                 ret = io_sqe_buffer_unregister(ctx);
6541                 break;
6542         case IORING_REGISTER_FILES:
6543                 ret = io_sqe_files_register(ctx, arg, nr_args);
6544                 break;
6545         case IORING_UNREGISTER_FILES:
6546                 ret = -EINVAL;
6547                 if (arg || nr_args)
6548                         break;
6549                 ret = io_sqe_files_unregister(ctx);
6550                 break;
6551         case IORING_REGISTER_FILES_UPDATE:
6552                 ret = io_sqe_files_update(ctx, arg, nr_args);
6553                 break;
6554         case IORING_REGISTER_EVENTFD:
6555                 ret = -EINVAL;
6556                 if (nr_args != 1)
6557                         break;
6558                 ret = io_eventfd_register(ctx, arg);
6559                 break;
6560         case IORING_UNREGISTER_EVENTFD:
6561                 ret = -EINVAL;
6562                 if (arg || nr_args)
6563                         break;
6564                 ret = io_eventfd_unregister(ctx);
6565                 break;
6566         default:
6567                 ret = -EINVAL;
6568                 break;
6569         }
6570
6571
6572         if (opcode != IORING_UNREGISTER_FILES &&
6573             opcode != IORING_REGISTER_FILES_UPDATE) {
6574                 /* bring the ctx back to life */
6575                 reinit_completion(&ctx->completions[0]);
6576                 percpu_ref_reinit(&ctx->refs);
6577         }
6578         return ret;
6579 }
6580
6581 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
6582                 void __user *, arg, unsigned int, nr_args)
6583 {
6584         struct io_ring_ctx *ctx;
6585         long ret = -EBADF;
6586         struct fd f;
6587
6588         f = fdget(fd);
6589         if (!f.file)
6590                 return -EBADF;
6591
6592         ret = -EOPNOTSUPP;
6593         if (f.file->f_op != &io_uring_fops)
6594                 goto out_fput;
6595
6596         ctx = f.file->private_data;
6597
6598         mutex_lock(&ctx->uring_lock);
6599         ret = __io_uring_register(ctx, opcode, arg, nr_args);
6600         mutex_unlock(&ctx->uring_lock);
6601         trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
6602                                                         ctx->cq_ev_fd != NULL, ret);
6603 out_fput:
6604         fdput(f);
6605         return ret;
6606 }
6607
6608 static int __init io_uring_init(void)
6609 {
6610         BUILD_BUG_ON(ARRAY_SIZE(io_op_defs) != IORING_OP_LAST);
6611         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
6612         return 0;
6613 };
6614 __initcall(io_uring_init);